websocket.py 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. import asyncio
  2. import json
  3. from typing import Any
  4. from fastapi import WebSocket
  5. class ConnectionManager:
  6. """Manages WebSocket connections and broadcasts."""
  7. def __init__(self):
  8. self.active_connections: list[WebSocket] = []
  9. self._lock = asyncio.Lock()
  10. async def connect(self, websocket: WebSocket):
  11. """Accept a new WebSocket connection."""
  12. await websocket.accept()
  13. async with self._lock:
  14. self.active_connections.append(websocket)
  15. async def disconnect(self, websocket: WebSocket):
  16. """Remove a WebSocket connection."""
  17. async with self._lock:
  18. if websocket in self.active_connections:
  19. self.active_connections.remove(websocket)
  20. async def broadcast(self, message: dict[str, Any]):
  21. """Broadcast a message to all connected clients."""
  22. if not self.active_connections:
  23. return
  24. data = json.dumps(message)
  25. async with self._lock:
  26. disconnected = []
  27. for connection in self.active_connections:
  28. try:
  29. await connection.send_text(data)
  30. except Exception:
  31. disconnected.append(connection)
  32. # Clean up disconnected clients
  33. for conn in disconnected:
  34. if conn in self.active_connections:
  35. self.active_connections.remove(conn)
  36. async def send_printer_status(self, printer_id: int, status: dict):
  37. """Send printer status update to all clients."""
  38. await self.broadcast({
  39. "type": "printer_status",
  40. "printer_id": printer_id,
  41. "data": status,
  42. })
  43. async def send_print_start(self, printer_id: int, data: dict):
  44. """Notify clients that a print has started."""
  45. await self.broadcast({
  46. "type": "print_start",
  47. "printer_id": printer_id,
  48. "data": data,
  49. })
  50. async def send_print_complete(self, printer_id: int, data: dict):
  51. """Notify clients that a print has completed."""
  52. await self.broadcast({
  53. "type": "print_complete",
  54. "printer_id": printer_id,
  55. "data": data,
  56. })
  57. async def send_archive_created(self, archive: dict):
  58. """Notify clients that a new archive was created."""
  59. await self.broadcast({
  60. "type": "archive_created",
  61. "data": archive,
  62. })
  63. async def send_archive_updated(self, archive: dict):
  64. """Notify clients that an archive was updated."""
  65. await self.broadcast({
  66. "type": "archive_updated",
  67. "data": archive,
  68. })
  69. # Global connection manager
  70. ws_manager = ConnectionManager()