sync_ws.py 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. import asyncio
  2. import logging
  3. from typing import Any
  4. from fastapi import WebSocket
  5. from fastapi.encoders import jsonable_encoder
  6. logger = logging.getLogger("uvicorn.error")
  7. class SyncStatusBroadcaster:
  8. def __init__(self) -> None:
  9. self._connections: set[WebSocket] = set()
  10. self._lock = asyncio.Lock()
  11. async def connect(self, websocket: WebSocket) -> None:
  12. await websocket.accept()
  13. async with self._lock:
  14. self._connections.add(websocket)
  15. async def disconnect(self, websocket: WebSocket) -> None:
  16. async with self._lock:
  17. self._connections.discard(websocket)
  18. async def send_status(self, websocket: WebSocket, status: dict[str, Any]) -> None:
  19. payload = {"event": "sync_status", "data": jsonable_encoder(status)}
  20. await websocket.send_json(payload)
  21. async def broadcast_status(self, status: dict[str, Any]) -> None:
  22. async with self._lock:
  23. connections = list(self._connections)
  24. if not connections:
  25. return
  26. payload = {"event": "sync_status", "data": jsonable_encoder(status)}
  27. for ws in connections:
  28. try:
  29. # 检查连接是否仍然打开
  30. if ws.client_state.name != "CONNECTED":
  31. await self.disconnect(ws)
  32. continue
  33. await ws.send_json(payload)
  34. except Exception:
  35. logger.warning("同步状态推送失败,连接可能已关闭")
  36. await self.disconnect(ws)
  37. sync_broadcaster = SyncStatusBroadcaster()