"""WebSocket broadcaster that pushes sync-status updates to connected clients."""

import asyncio
import logging
from typing import Any

from fastapi import WebSocket
from fastapi.encoders import jsonable_encoder
from fastapi.websockets import WebSocketState

logger = logging.getLogger("uvicorn.error")


class SyncStatusBroadcaster:
    """Track accepted WebSocket connections and push ``sync_status`` events.

    The set of connections is guarded by an ``asyncio.Lock`` so that
    registering, unregistering and snapshotting for a broadcast do not
    interleave.
    """

    def __init__(self) -> None:
        """Create a broadcaster with no registered connections."""
        self._connections: set[WebSocket] = set()
        self._lock = asyncio.Lock()

    @staticmethod
    def _build_payload(status: dict[str, Any]) -> dict[str, Any]:
        """Wrap *status* in the ``sync_status`` event envelope sent to clients."""
        return {"event": "sync_status", "data": jsonable_encoder(status)}

    @staticmethod
    def _is_open(websocket: WebSocket) -> bool:
        """Return True when both sides of *websocket* are still connected."""
        return (
            websocket.client_state == WebSocketState.CONNECTED
            and websocket.application_state == WebSocketState.CONNECTED
        )

    async def connect(self, websocket: WebSocket) -> None:
        """Accept *websocket* and register it for future broadcasts.

        The connection is only registered after ``accept()`` succeeds, so a
        failed handshake never leaves a dead entry behind.
        """
        await websocket.accept()
        async with self._lock:
            self._connections.add(websocket)

    async def disconnect(self, websocket: WebSocket) -> None:
        """Unregister *websocket*; a no-op if it is not registered."""
        async with self._lock:
            self._connections.discard(websocket)

    async def send_status(self, websocket: WebSocket, status: dict[str, Any]) -> None:
        """Send *status* as a ``sync_status`` event to a single connection.

        Errors raised by ``send_json`` propagate to the caller.
        """
        await websocket.send_json(self._build_payload(status))

    async def broadcast_status(self, status: dict[str, Any]) -> None:
        """Send *status* as a ``sync_status`` event to every registered connection.

        Connections that are no longer open, or whose send fails, are
        unregistered. A failure on one connection does not prevent delivery
        to the remaining ones.
        """
        # Snapshot under the lock, then send without holding it so that
        # disconnect() (which takes the same lock) can be called from the loop.
        async with self._lock:
            connections = list(self._connections)
        if not connections:
            return

        # Encode once; the same payload goes to every client.
        payload = self._build_payload(status)
        for ws in connections:
            # Skip (and forget) connections that have already been closed.
            if not self._is_open(ws):
                await self.disconnect(ws)
                continue
            try:
                await ws.send_json(payload)
            except Exception:
                # Best-effort delivery: drop the failing connection, but keep
                # the traceback so unexpected errors are not mistaken for a
                # plain disconnect.
                logger.warning("同步状态推送失败,连接可能已关闭", exc_info=True)
                await self.disconnect(ws)


sync_broadcaster = SyncStatusBroadcaster()