"""Pull departments, roles and users from the center service and mirror them locally.

The module fetches three JSON endpoints below ``settings.center_base_url``,
upserts the results into the local database, and exposes helpers to trigger,
restart and schedule that synchronisation as a single background task.
"""
import asyncio
import logging
import secrets
from datetime import datetime, timedelta, timezone
from typing import Iterable, Optional

import httpx
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from backend.app.core.config import settings
from backend.app.core.security import hash_password
from backend.app.db.session import SessionLocal
from backend.app.models import Campus, Department, Role, SyncState, User
from backend.app.services.sync_ws import sync_broadcaster

logger = logging.getLogger("uvicorn.error")

# Primary key of the single SyncState row used by this module.
SYNC_STATE_KEY = "center"
# Name of the role assigned to users created by the sync.
PENDING_ROLE_NAME = "待授权"
# The currently scheduled sync task (if any); guarded by _sync_task_lock.
_sync_task: asyncio.Task | None = None
_sync_task_lock = asyncio.Lock()
# Yield control to the event loop every N processed items.
SYNC_YIELD_EVERY = 200
# Strong references to fire-and-forget tasks so they cannot be garbage
# collected before they finish (the event loop only keeps weak references).
_background_tasks: set[asyncio.Task] = set()


def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Produces the same kind of value as the deprecated ``datetime.utcnow()``.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _build_center_url(path: str) -> str:
    """Join ``settings.center_base_url`` and *path* with exactly one slash."""
    base = settings.center_base_url.rstrip("/")
    return f"{base}/{path.lstrip('/')}"


def _normalize_external(value) -> Optional[str]:
    """Convert an external identifier to a stripped string.

    Returns ``None`` for ``None``, blank values and the literal ``"0"``.
    """
    if value is None:
        return None
    text = str(value).strip()
    if not text or text == "0":
        return None
    return text


def _external_user_id(item: dict) -> str:
    """Return the stripped external user id of a user record ('' if absent)."""
    return str(item.get("userId") or item.get("id") or "").strip()


async def upsert_sync_token(db: AsyncSession, token: str) -> SyncState:
    """Store *token* in the sync-state row, creating the row if needed.

    The caller is responsible for committing the session.
    """
    state = await db.get(SyncState, SYNC_STATE_KEY)
    if not state:
        state = SyncState(key=SYNC_STATE_KEY, token=token)
        db.add(state)
    else:
        state.token = token
    state.updated_at = _utcnow()
    return state


async def get_sync_token(db: AsyncSession) -> Optional[str]:
    """Return the stored sync token, or ``None`` when no state row exists."""
    state = await db.get(SyncState, SYNC_STATE_KEY)
    return state.token if state else None


async def mark_sync_completed(db: AsyncSession) -> None:
    """Stamp ``last_sync_at`` on the sync-state row, if the row exists."""
    state = await db.get(SyncState, SYNC_STATE_KEY)
    if state:
        state.last_sync_at = _utcnow()


async def get_sync_status(db: AsyncSession) -> dict:
    """Return ``{"running": bool, "last_sync_at": ...}`` for the sync task."""
    state = await db.get(SyncState, SYNC_STATE_KEY)
    async with _sync_task_lock:
        running = _sync_task is not None and not _sync_task.done()
    return {
        "running": running,
        "last_sync_at": state.last_sync_at if state else None,
    }


async def _notify_sync_status() -> None:
    """Broadcast the current sync status; failures are logged, never raised."""
    try:
        async with SessionLocal() as db:
            status = await get_sync_status(db)
            await sync_broadcaster.broadcast_status(status)
    except Exception:
        logger.exception("广播同步状态失败")


async def get_or_create_pending_role(db: AsyncSession) -> Role:
    """Return the role named ``PENDING_ROLE_NAME``, creating and flushing it if missing."""
    result = await db.execute(select(Role).where(Role.name == PENDING_ROLE_NAME))
    role = result.scalar_one_or_none()
    if role:
        return role
    role = Role(name=PENDING_ROLE_NAME, description="等待管理员授权的默认角色", is_system=False)
    db.add(role)
    await db.flush()
    return role


async def resolve_role_by_external_ids(db: AsyncSession, external_ids: Iterable[str]) -> Optional[Role]:
    """Return the first role whose ``external_role_id`` matches one of *external_ids*.

    Ids are tried in iteration order; ``None`` is returned when none match.
    """
    for external_id in external_ids:
        result = await db.execute(select(Role).where(Role.external_role_id == external_id))
        role = result.scalar_one_or_none()
        if role:
            return role
    return None


async def resolve_department_by_external_id(
    db: AsyncSession, external_dept_id: Optional[str]
) -> Optional[Department]:
    """Look up a department by its external id; falsy ids resolve to ``None``."""
    if not external_dept_id:
        return None
    result = await db.execute(
        select(Department).where(Department.external_dept_id == external_dept_id)
    )
    return result.scalar_one_or_none()


async def _fetch_json(client: httpx.AsyncClient, url: str, token: str):
    """GET *url* with the ``token`` header and return the decoded JSON body.

    Raises whatever ``response.raise_for_status()`` raises for error responses.
    """
    response = await client.get(url, headers={"token": token})
    response.raise_for_status()
    return response.json()


async def _sync_campuses_and_departments(db: AsyncSession, items: list[dict]) -> dict[str, Department]:
    """Upsert campuses and departments from the department payload.

    Args:
        db: Open session; rows are added/flushed but not committed here.
        items: Department records. The keys read are ``departmentId``,
            ``code``, ``id``, ``hospId`` and ``name``.

    Returns:
        Mapping from every non-empty external key of a record (department
        id, primary id, code) to the local ``Department`` it resolved to.
    """
    campus_result = await db.execute(select(Campus))
    campuses = campus_result.scalars().all()
    campus_by_external = {c.external_hosp_id: c for c in campuses if c.external_hosp_id}
    campus_by_name = {c.name: c for c in campuses}

    dept_result = await db.execute(select(Department))
    departments = dept_result.scalars().all()
    dept_by_external = {d.external_dept_id: d for d in departments if d.external_dept_id}
    dept_by_campus_name = {(d.campus_id, d.name): d for d in departments}

    updated_departments: dict[str, Department] = {}
    for index, item in enumerate(items, start=1):
        dept_id = _normalize_external(item.get("departmentId"))
        dept_code = _normalize_external(item.get("code"))
        dept_primary_id = _normalize_external(item.get("id"))
        external_dept_id = dept_id or dept_primary_id or dept_code
        hosp_id = _normalize_external(item.get("hospId"))
        name = (item.get("name") or "").strip() or f"科室{external_dept_id}"
        if not external_dept_id or not hosp_id:
            continue

        campus = campus_by_external.get(hosp_id)
        if not campus:
            campus_name = f"院区{hosp_id}"
            campus = campus_by_name.get(campus_name)
            if campus and not campus.external_hosp_id:
                # Adopt an unlinked campus that already carries the generated name.
                campus.external_hosp_id = hosp_id
            else:
                if campus_name in campus_by_name and (not campus or campus.external_hosp_id):
                    # Name is taken by a campus linked elsewhere: disambiguate.
                    campus_name = f"{campus_name}({hosp_id})"
                campus = Campus(name=campus_name, external_hosp_id=hosp_id)
                db.add(campus)
                await db.flush()  # populate campus.id for the department rows
            campus_by_external[hosp_id] = campus
            campus_by_name[campus.name] = campus

        dept = dept_by_external.get(external_dept_id)
        if not dept:
            dept = dept_by_campus_name.get((campus.id, name))
            if dept and not dept.external_dept_id:
                dept.external_dept_id = external_dept_id
            elif dept and dept.external_dept_id and dept.external_dept_id != external_dept_id:
                # Same name but a different external id: add a suffix to tell them apart.
                unique_name = f"{name}({external_dept_id})"
                existing_unique = dept_by_campus_name.get((campus.id, unique_name))
                if existing_unique:
                    dept = existing_unique
                    if not dept.external_dept_id:
                        dept.external_dept_id = external_dept_id
                else:
                    dept = Department(campus_id=campus.id, name=unique_name, external_dept_id=external_dept_id)
                    db.add(dept)
                    await db.flush()
                    dept_by_external[external_dept_id] = dept
                    dept_by_campus_name[(dept.campus_id, dept.name)] = dept
            elif not dept:
                dept = Department(campus_id=campus.id, name=name, external_dept_id=external_dept_id)
                db.add(dept)
                await db.flush()
                dept_by_external[external_dept_id] = dept
                dept_by_campus_name[(dept.campus_id, dept.name)] = dept
        else:
            # Check whether the updated name would collide on (campus_id, name).
            conflict_dept = dept_by_campus_name.get((campus.id, name))
            if conflict_dept and conflict_dept.id != dept.id:
                # Another department already uses the name: append the external id.
                name = f"{name}({external_dept_id})"
            # Keep the (campus_id, name) index in step with the rename/move so
            # later rows in this payload are checked against current values.
            old_key = (dept.campus_id, dept.name)
            if dept_by_campus_name.get(old_key) is dept:
                del dept_by_campus_name[old_key]
            dept.name = name
            dept.campus_id = campus.id
            dept_by_campus_name[(campus.id, name)] = dept

        for key in {external_dept_id, dept_primary_id, dept_id, dept_code}:
            if key:
                updated_departments[key] = dept
        if index % SYNC_YIELD_EVERY == 0:
            await asyncio.sleep(0)  # let other coroutines run
    return updated_departments


async def _sync_roles(db: AsyncSession, items: list[dict]) -> dict[str, Role]:
    """Link or create local roles for the role payload.

    Existing roles are matched by external id first, then by name; matched
    roles are not renamed. Returns a mapping of external role id to ``Role``.
    """
    role_result = await db.execute(select(Role))
    roles = role_result.scalars().all()
    role_by_external = {r.external_role_id: r for r in roles if r.external_role_id}
    role_by_name = {r.name: r for r in roles}

    updated_roles: dict[str, Role] = {}
    for item in items:
        external_role_id = str(item.get("roleId") or item.get("roleCode") or "").strip()
        if not external_role_id:
            continue
        name_candidate = (item.get("remark") or item.get("roleName") or f"角色{external_role_id}").strip()
        role = role_by_external.get(external_role_id)
        if not role:
            role = role_by_name.get(name_candidate)
            if role and not role.external_role_id:
                # Adopt an unlinked local role with the same name.
                role.external_role_id = external_role_id
            else:
                role_name = name_candidate
                if role_name in role_by_name and (not role or role.external_role_id):
                    role_name = f"{role_name}({external_role_id})"
                role = Role(
                    name=role_name,
                    description="中台角色同步",
                    is_system=False,
                    external_role_id=external_role_id,
                )
                db.add(role)
                await db.flush()
            role_by_external[external_role_id] = role
            role_by_name[role.name] = role
        updated_roles[external_role_id] = role
    return updated_roles


async def _sync_users(db: AsyncSession, items: list[dict], dept_by_external: dict[str, Department]) -> None:
    """Upsert users from the user payload and deactivate the ones that vanished.

    Args:
        db: Open session; changes are flushed periodically but not committed.
        items: User records. The keys read are ``userId``/``id``, ``empNo``,
            ``name``, ``phoneNumber``, ``jobTitleName``/``professionalTitleName``,
            ``avatarUrl``, ``activeFlag``, ``departmentId`` and ``departmentCode``.
        dept_by_external: External department key -> local ``Department``.

    A status change bumps ``token_version``. New users get the pending role
    and a random password hash.
    """
    # Normalise ids exactly the way the main loop does, so the preload queries
    # and the final "still present" check all agree on the same strings.
    external_ids: list[str] = []
    accounts: list[str] = []
    for item in items:
        external_id = _external_user_id(item)
        if not external_id:
            continue  # the main loop skips these records as well
        external_ids.append(external_id)
        account = str(item.get("empNo") or external_id).strip()
        if account:
            accounts.append(account)
    external_id_set = set(external_ids)

    existing_users: dict[str, User] = {}
    if external_ids:
        result = await db.execute(select(User).where(User.external_user_id.in_(external_ids)))
        existing_users = {
            user.external_user_id: user
            for user in result.scalars().all()
            if user.external_user_id
        }

    # Preload users by account so account collisions can be resolved in memory.
    users_by_account: dict[str, User] = {}
    if accounts:
        result = await db.execute(select(User).where(User.account.in_(accounts)))
        users_by_account = {user.account: user for user in result.scalars().all()}

    pending_role = await get_or_create_pending_role(db)

    for index, item in enumerate(items, start=1):
        external_user_id = _external_user_id(item)
        if not external_user_id:
            continue
        account = str(item.get("empNo") or external_user_id).strip()
        name = (item.get("name") or account).strip()
        phone = (item.get("phoneNumber") or "").strip() or None
        title = (item.get("jobTitleName") or item.get("professionalTitleName") or "").strip() or None
        avatar = (item.get("avatarUrl") or "").strip() or None
        status = "active" if str(item.get("activeFlag") or "0") == "1" else "inactive"

        dept_external_id = _normalize_external(item.get("departmentId"))
        dept_code = _normalize_external(item.get("departmentCode"))
        dept = None
        if dept_external_id:
            dept = dept_by_external.get(dept_external_id)
        if not dept and dept_code:
            dept = dept_by_external.get(dept_code)
        campus_id = dept.campus_id if dept else None

        user = existing_users.get(external_user_id)
        if user is None:
            # Account already exists: attach the external id to that user
            # instead of creating a second row with the same account.
            user = users_by_account.get(account)
            if user is not None:
                existing_users[external_user_id] = user

        if user is None:
            user = User(
                name=name,
                account=account,
                phone=phone,
                title=title,
                avatar=avatar,
                role_id=pending_role.id,
                status=status,
                password_hash=hash_password(secrets.token_urlsafe(24)),
                token_version=1,
                external_user_id=external_user_id,
                dept_id=dept.id if dept else None,
                campus_id=campus_id,
            )
            db.add(user)
            users_by_account[account] = user
        else:
            user.name = name
            user.phone = phone
            user.title = title
            user.avatar = avatar
            user.external_user_id = external_user_id
            # Keep the previous department/campus when the payload has none.
            user.dept_id = dept.id if dept else user.dept_id
            user.campus_id = campus_id or user.campus_id
            if user.status != status:
                user.status = status
                user.token_version = (user.token_version or 1) + 1

        if index % SYNC_YIELD_EVERY == 0:
            await db.flush()
            await asyncio.sleep(0)  # let other coroutines run

    # NOTE(review): with an empty `items` list every externally linked user is
    # marked inactive and gets a new token_version below — confirm that an
    # empty payload should be treated as authoritative.
    result = await db.execute(select(User).where(User.external_user_id.isnot(None)))
    for user in result.scalars().all():
        if user.external_user_id not in external_id_set and user.status != "inactive":
            user.status = "inactive"
            user.token_version = (user.token_version or 1) + 1


async def sync_center_data(token: str) -> None:
    """Fetch departments, roles and users from the center and persist them.

    Returns immediately (with a warning) when ``settings.center_base_url`` is
    not configured.

    Raises:
        ValueError: An endpoint reported a ``code``/``status`` other than 200.
        Exception: Fetch or database errors are logged and re-raised.
    """
    if not settings.center_base_url:
        logger.warning("CENTER_BASE_URL 未配置,跳过中台同步")
        return

    logger.info("开始拉取中台数据...")
    async with httpx.AsyncClient(timeout=settings.sso_timeout_seconds) as client:
        dept_url = _build_center_url("/gateway/centerSys/api/getAllDepartment")
        user_url = _build_center_url("/gateway/centerSys/api/getAllUser")
        role_url = _build_center_url("/gateway/centerSys/api/getRoleList")
        try:
            dept_payload = await _fetch_json(client, dept_url, token)
            logger.info("已获取科室数据")
            role_payload = await _fetch_json(client, role_url, token)
            logger.info("已获取角色数据")
            user_payload = await _fetch_json(client, user_url, token)
            logger.info("已获取人员数据")
        except Exception as e:
            logger.error(f"从发现中心请求数据失败: {str(e)}")
            raise

    if isinstance(dept_payload, dict):
        code = dept_payload.get("code") or dept_payload.get("status")
        if code is not None and code != 200:
            raise ValueError(f"科室接口返回异常 code={code}")
    if isinstance(role_payload, dict):
        code = role_payload.get("code")
        if code is not None and code != 200:
            raise ValueError(f"角色接口返回异常 code={code}")
    if isinstance(user_payload, dict):
        status = user_payload.get("status")
        if status is not None and status != 200:
            raise ValueError(f"人员接口返回异常 status={status}")

    dept_items = []
    if isinstance(dept_payload, list):
        dept_items = dept_payload
    elif isinstance(dept_payload, dict) and isinstance(dept_payload.get("data"), list):
        dept_items = dept_payload.get("data", [])

    role_items = []
    if isinstance(role_payload, dict):
        role_data = role_payload.get("data") or {}
        if isinstance(role_data, dict):
            role_items = role_data.get("list") or []
        elif isinstance(role_data, list):
            # Tolerate a bare list under "data" instead of failing on `.get`.
            role_items = role_data

    user_items = []
    if isinstance(user_payload, dict):
        user_items = user_payload.get("data") or []

    logger.info(f"解析到数据: 科室={len(dept_items)}, 角色={len(role_items)}, 人员={len(user_items)}")

    try:
        async with SessionLocal() as db:
            dept_by_external = await _sync_campuses_and_departments(db, dept_items)
            await _sync_roles(db, role_items)
            await _sync_users(db, user_items, dept_by_external)
            await mark_sync_completed(db)
            await db.commit()
        logger.info("中台同步完成并已存入数据库")
    except Exception as e:
        logger.error(f"中台同步写入数据库失败: {str(e)}")
        raise
    finally:
        # Broadcast the latest status on success and on failure alike, so
        # listeners can leave their loading state.
        await _notify_sync_status()


async def _cancel_then_sync(task: asyncio.Task, token: str) -> None:
    """Wait for the cancelled previous *task* to finish, then run a new sync.

    A failure of the previous task is logged and does not prevent the new
    sync. If this coroutine's own task is cancelled while waiting (it was
    superseded by a newer restart), the cancellation is propagated instead of
    running a sync with the superseded token.
    """
    try:
        await task
    except asyncio.CancelledError:
        # Distinguish "the previous task was cancelled" (expected) from "we
        # are being cancelled". Task.cancelling() exists on Python 3.11+;
        # on older versions the cancellation is swallowed as before.
        current = asyncio.current_task()
        cancelling = getattr(current, "cancelling", None)
        if cancelling is not None and cancelling():
            raise
    except Exception:
        logger.exception("上一次中台同步任务异常结束,继续执行新的同步")
    await sync_center_data(token)


def _schedule_status_broadcast(_: asyncio.Task) -> None:
    """Done-callback: broadcast the sync status once the sync task has finished."""
    broadcast = asyncio.create_task(_notify_sync_status())
    # Hold a strong reference until completion; see _background_tasks.
    _background_tasks.add(broadcast)
    broadcast.add_done_callback(_background_tasks.discard)


async def trigger_center_sync(token: str, *, restart: bool, wait: bool) -> bool:
    """Start a sync task, optionally replacing one that is still running.

    Args:
        token: Token sent to the center endpoints.
        restart: When a sync is already running, cancel it and schedule a new
            one; otherwise leave it alone.
        wait: Await the scheduled task before returning (its exception, if
            any, propagates to the caller).

    Returns:
        ``False`` if a sync was already running and *restart* is false,
        ``True`` otherwise.
    """
    global _sync_task
    async with _sync_task_lock:
        if _sync_task and not _sync_task.done():
            if not restart:
                return False
            _sync_task.cancel()
            _sync_task = asyncio.create_task(_cancel_then_sync(_sync_task, token))
        else:
            _sync_task = asyncio.create_task(sync_center_data(token))
        task = _sync_task
        task.add_done_callback(_schedule_status_broadcast)
    # Outside the lock: get_sync_status() acquires the same non-reentrant lock.
    await _notify_sync_status()
    if wait:
        await task
    return True


async def run_center_sync_once() -> None:
    """Run one sync with the stored token, unless none is stored or one is running."""
    async with SessionLocal() as db:
        token = await get_sync_token(db)
    # The session is closed here; only the plain token string is needed below.
    if not token:
        logger.warning("未找到中台同步 token,跳过本次同步")
        return
    started = await trigger_center_sync(token, restart=False, wait=True)
    if not started:
        logger.info("已有同步任务在执行,跳过本次同步")


async def run_center_sync_loop(stop_event: asyncio.Event) -> None:
    """Run the sync once a day at the configured local time until *stop_event* is set.

    The run time is ``settings.center_sync_hour``:``settings.center_sync_minute``
    as evaluated by ``datetime.now()``.
    """
    while not stop_event.is_set():
        now = datetime.now()
        next_run = now.replace(
            hour=settings.center_sync_hour,
            minute=settings.center_sync_minute,
            second=0,
            microsecond=0,
        )
        if next_run <= now:
            next_run = next_run + timedelta(days=1)
        delay = (next_run - now).total_seconds()
        try:
            # Sleep until the next run, waking early if a stop is requested.
            await asyncio.wait_for(stop_event.wait(), timeout=delay)
        except asyncio.TimeoutError:
            try:
                await run_center_sync_once()
            except Exception:
                # NOTE(review): asyncio.CancelledError is not an Exception
                # subclass; if the awaited sync task is cancelled by a restart
                # elsewhere, it propagates from here and ends this loop.
                logger.exception("中台同步失败")