| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- import asyncio
- import json
- from typing import Any
- from fastapi import WebSocket
- class ConnectionManager:
- """Manages WebSocket connections and broadcasts."""
- def __init__(self):
- self.active_connections: list[WebSocket] = []
- self._lock = asyncio.Lock()
- async def connect(self, websocket: WebSocket):
- """Accept a new WebSocket connection."""
- await websocket.accept()
- async with self._lock:
- self.active_connections.append(websocket)
- async def disconnect(self, websocket: WebSocket):
- """Remove a WebSocket connection."""
- async with self._lock:
- if websocket in self.active_connections:
- self.active_connections.remove(websocket)
- async def broadcast(self, message: dict[str, Any]):
- """Broadcast a message to all connected clients."""
- if not self.active_connections:
- return
- data = json.dumps(message)
- async with self._lock:
- disconnected = []
- for connection in self.active_connections:
- try:
- await connection.send_text(data)
- except Exception:
- disconnected.append(connection)
- # Clean up disconnected clients
- for conn in disconnected:
- if conn in self.active_connections:
- self.active_connections.remove(conn)
- async def send_printer_status(self, printer_id: int, status: dict):
- """Send printer status update to all clients."""
- await self.broadcast({
- "type": "printer_status",
- "printer_id": printer_id,
- "data": status,
- })
- async def send_print_start(self, printer_id: int, data: dict):
- """Notify clients that a print has started."""
- await self.broadcast({
- "type": "print_start",
- "printer_id": printer_id,
- "data": data,
- })
- async def send_print_complete(self, printer_id: int, data: dict):
- """Notify clients that a print has completed."""
- await self.broadcast({
- "type": "print_complete",
- "printer_id": printer_id,
- "data": data,
- })
- async def send_archive_created(self, archive: dict):
- """Notify clients that a new archive was created."""
- await self.broadcast({
- "type": "archive_created",
- "data": archive,
- })
- async def send_archive_updated(self, archive: dict):
- """Notify clients that an archive was updated."""
- await self.broadcast({
- "type": "archive_updated",
- "data": archive,
- })
- # Global connection manager
- ws_manager = ConnectionManager()
|