center_sync.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462
  1. import asyncio
  2. import logging
  3. import secrets
  4. from datetime import datetime, timedelta
  5. from typing import Iterable, Optional
  6. import httpx
  7. from sqlalchemy import select
  8. from sqlalchemy.ext.asyncio import AsyncSession
  9. from backend.app.core.config import settings
  10. from backend.app.core.security import hash_password
  11. from backend.app.db.session import SessionLocal
  12. from backend.app.models import Campus, Department, Role, SyncState, User
  13. from backend.app.services.sync_ws import sync_broadcaster
# Module logger; obtained under the "uvicorn.error" logger name.
logger = logging.getLogger("uvicorn.error")
# Primary key of the SyncState row this module reads and writes.
SYNC_STATE_KEY = "center"
# Name of the role given to users that the sync creates.
PENDING_ROLE_NAME = "待授权"
# Most recently scheduled sync task, or None when none has been scheduled yet.
_sync_task: asyncio.Task | None = None
# Held while _sync_task is inspected or replaced.
_sync_task_lock = asyncio.Lock()
# Number of processed items between cooperative yields to the event loop.
SYNC_YIELD_EVERY = 200
  20. def _build_center_url(path: str) -> str:
  21. base = settings.center_base_url.rstrip("/")
  22. return f"{base}/{path.lstrip('/')}"
  23. def _normalize_external(value) -> Optional[str]:
  24. if value is None:
  25. return None
  26. text = str(value).strip()
  27. if not text or text == "0":
  28. return None
  29. return text
  30. async def upsert_sync_token(db: AsyncSession, token: str) -> SyncState:
  31. state = await db.get(SyncState, SYNC_STATE_KEY)
  32. if not state:
  33. state = SyncState(key=SYNC_STATE_KEY, token=token)
  34. db.add(state)
  35. else:
  36. state.token = token
  37. state.updated_at = datetime.utcnow()
  38. return state
  39. async def get_sync_token(db: AsyncSession) -> Optional[str]:
  40. state = await db.get(SyncState, SYNC_STATE_KEY)
  41. return state.token if state else None
  42. async def mark_sync_completed(db: AsyncSession) -> None:
  43. state = await db.get(SyncState, SYNC_STATE_KEY)
  44. if state:
  45. state.last_sync_at = datetime.utcnow()
  46. async def get_sync_status(db: AsyncSession) -> dict:
  47. state = await db.get(SyncState, SYNC_STATE_KEY)
  48. async with _sync_task_lock:
  49. running = _sync_task is not None and not _sync_task.done()
  50. return {
  51. "running": running,
  52. "last_sync_at": state.last_sync_at if state else None,
  53. }
  54. async def _notify_sync_status() -> None:
  55. try:
  56. async with SessionLocal() as db:
  57. status = await get_sync_status(db)
  58. await sync_broadcaster.broadcast_status(status)
  59. except Exception:
  60. logger.exception("广播同步状态失败")
  61. async def get_or_create_pending_role(db: AsyncSession) -> Role:
  62. result = await db.execute(select(Role).where(Role.name == PENDING_ROLE_NAME))
  63. role = result.scalar_one_or_none()
  64. if role:
  65. return role
  66. role = Role(name=PENDING_ROLE_NAME, description="等待管理员授权的默认角色", is_system=False)
  67. db.add(role)
  68. await db.flush()
  69. return role
  70. async def resolve_role_by_external_ids(db: AsyncSession, external_ids: Iterable[str]) -> Optional[Role]:
  71. for external_id in external_ids:
  72. result = await db.execute(select(Role).where(Role.external_role_id == external_id))
  73. role = result.scalar_one_or_none()
  74. if role:
  75. return role
  76. return None
  77. async def resolve_department_by_external_id(db: AsyncSession, external_dept_id: Optional[str]) -> Optional[Department]:
  78. if not external_dept_id:
  79. return None
  80. result = await db.execute(select(Department).where(Department.external_dept_id == external_dept_id))
  81. return result.scalar_one_or_none()
  82. async def _fetch_json(client: httpx.AsyncClient, url: str, token: str):
  83. response = await client.get(url, headers={"token": token})
  84. response.raise_for_status()
  85. return response.json()
async def _sync_campuses_and_departments(db: AsyncSession, items: list[dict]) -> dict[str, Department]:
    """Upsert campuses and departments from the center department list.

    Each item is read through the keys ``departmentId``, ``code``, ``id``,
    ``hospId`` and ``name``. Items without a usable department id or hospital
    id are skipped. New rows are flushed right away so their primary keys are
    available; nothing is committed here.

    NOTE(review): the block structure was reconstructed from a paste that had
    lost its indentation — confirm the nesting of the index-update statements.

    Returns:
        Mapping from every non-empty external key of a processed item (the
        chosen id, ``id``, ``departmentId`` and ``code``) to its ``Department``.
    """
    # Load all campuses once and index them by external id and by name.
    campus_result = await db.execute(select(Campus))
    campuses = campus_result.scalars().all()
    campus_by_external = {c.external_hosp_id: c for c in campuses if c.external_hosp_id}
    campus_by_name = {c.name: c for c in campuses}
    # Same for departments: by external id and by (campus_id, name).
    dept_result = await db.execute(select(Department))
    departments = dept_result.scalars().all()
    dept_by_external = {d.external_dept_id: d for d in departments if d.external_dept_id}
    dept_by_campus_name = {(d.campus_id, d.name): d for d in departments}
    updated_departments: dict[str, Department] = {}
    for index, item in enumerate(items, start=1):
        dept_id = _normalize_external(item.get("departmentId"))
        dept_code = _normalize_external(item.get("code"))
        dept_primary_id = _normalize_external(item.get("id"))
        # Preference order for the external id: departmentId, then id, then code.
        external_dept_id = dept_id or dept_primary_id or dept_code
        hosp_id = _normalize_external(item.get("hospId"))
        # Fall back to a generated name when the item has no usable name.
        name = (item.get("name") or "").strip() or f"科室{external_dept_id}"
        if not external_dept_id or not hosp_id:
            continue
        campus = campus_by_external.get(hosp_id)
        if not campus:
            # No campus carries this external id yet: try the generated name.
            campus_name = f"院区{hosp_id}"
            campus = campus_by_name.get(campus_name)
            if campus and not campus.external_hosp_id:
                # Adopt an unlinked campus that already has the generated name.
                campus.external_hosp_id = hosp_id
            else:
                # The name is taken by a campus linked to another id: disambiguate.
                if campus_name in campus_by_name and (not campus or campus.external_hosp_id):
                    campus_name = f"{campus_name}({hosp_id})"
                campus = Campus(name=campus_name, external_hosp_id=hosp_id)
                db.add(campus)
                # Flush so campus.id is populated for the department rows below.
                await db.flush()
            campus_by_external[hosp_id] = campus
            campus_by_name[campus.name] = campus
        dept = dept_by_external.get(external_dept_id)
        if not dept:
            # Not known by external id: try the same name within this campus.
            dept = dept_by_campus_name.get((campus.id, name))
            if dept and not dept.external_dept_id:
                # Adopt an unlinked department with the same name.
                dept.external_dept_id = external_dept_id
            elif dept and dept.external_dept_id and dept.external_dept_id != external_dept_id:
                # Same department name but a different external id: add a suffix to tell them apart.
                unique_name = f"{name}({external_dept_id})"
                existing_unique = dept_by_campus_name.get((campus.id, unique_name))
                if existing_unique:
                    dept = existing_unique
                    if not dept.external_dept_id:
                        dept.external_dept_id = external_dept_id
                else:
                    dept = Department(campus_id=campus.id, name=unique_name, external_dept_id=external_dept_id)
                    db.add(dept)
                    await db.flush()
                dept_by_external[external_dept_id] = dept
                dept_by_campus_name[(dept.campus_id, dept.name)] = dept
            elif not dept:
                # Nothing matches at all: create the department under its own name.
                dept = Department(campus_id=campus.id, name=name, external_dept_id=external_dept_id)
                db.add(dept)
                await db.flush()
            dept_by_external[external_dept_id] = dept
            dept_by_campus_name[(dept.campus_id, dept.name)] = dept
        else:
            # Check whether the updated name would collide on (campus_id, name).
            conflict_dept = dept_by_campus_name.get((campus.id, name))
            if conflict_dept and conflict_dept.id != dept.id:
                # Another department already has this name; append the external id to avoid the clash.
                name = f"{name}({external_dept_id})"
            # NOTE(review): dept_by_campus_name is not refreshed after this rename/move.
            dept.name = name
            dept.campus_id = campus.id
        # Register the department under every non-empty key the item supplied.
        for key in {external_dept_id, dept_primary_id, dept_id, dept_code}:
            if key:
                updated_departments[key] = dept
        # Yield to the event loop periodically during long runs.
        if index % SYNC_YIELD_EVERY == 0:
            await asyncio.sleep(0)
    return updated_departments
  158. async def _sync_roles(db: AsyncSession, items: list[dict]) -> dict[str, Role]:
  159. role_result = await db.execute(select(Role))
  160. roles = role_result.scalars().all()
  161. role_by_external = {r.external_role_id: r for r in roles if r.external_role_id}
  162. role_by_name = {r.name: r for r in roles}
  163. updated_roles: dict[str, Role] = {}
  164. for item in items:
  165. external_role_id = str(item.get("roleId") or item.get("roleCode") or "").strip()
  166. if not external_role_id:
  167. continue
  168. name_candidate = (item.get("remark") or item.get("roleName") or f"角色{external_role_id}").strip()
  169. role = role_by_external.get(external_role_id)
  170. if not role:
  171. role = role_by_name.get(name_candidate)
  172. if role and not role.external_role_id:
  173. role.external_role_id = external_role_id
  174. else:
  175. role_name = name_candidate
  176. if role_name in role_by_name and (not role or role.external_role_id):
  177. role_name = f"{role_name}({external_role_id})"
  178. role = Role(name=role_name, description="中台角色同步", is_system=False, external_role_id=external_role_id)
  179. db.add(role)
  180. await db.flush()
  181. role_by_external[external_role_id] = role
  182. role_by_name[role.name] = role
  183. updated_roles[external_role_id] = role
  184. return updated_roles
  185. async def _sync_users(db: AsyncSession, items: list[dict], dept_by_external: dict[str, Department]) -> None:
  186. external_ids = []
  187. accounts = []
  188. for item in items:
  189. external_id = item.get("userId") or item.get("id")
  190. if external_id is not None:
  191. external_ids.append(str(external_id))
  192. account = str(item.get("empNo") or external_id or "").strip()
  193. if account:
  194. accounts.append(account)
  195. external_id_set = set(external_ids)
  196. existing_users: dict[str, User] = {}
  197. if external_ids:
  198. result = await db.execute(select(User).where(User.external_user_id.in_(external_ids)))
  199. existing_users = {user.external_user_id: user for user in result.scalars().all() if user.external_user_id}
  200. # 预加载按账号查找的用户(用于处理账号冲突)
  201. users_by_account: dict[str, User] = {}
  202. if accounts:
  203. result = await db.execute(select(User).where(User.account.in_(accounts)))
  204. users_by_account = {user.account: user for user in result.scalars().all()}
  205. pending_role = await get_or_create_pending_role(db)
  206. for index, item in enumerate(items, start=1):
  207. external_user_id = str(item.get("userId") or item.get("id") or "").strip()
  208. if not external_user_id:
  209. continue
  210. account = str(item.get("empNo") or external_user_id).strip()
  211. name = (item.get("name") or account).strip()
  212. phone = (item.get("phoneNumber") or "").strip() or None
  213. title = (item.get("jobTitleName") or item.get("professionalTitleName") or "").strip() or None
  214. avatar = (item.get("avatarUrl") or "").strip() or None
  215. status = "active" if str(item.get("activeFlag") or "0") == "1" else "inactive"
  216. dept_external_id = _normalize_external(item.get("departmentId"))
  217. dept_code = _normalize_external(item.get("departmentCode"))
  218. dept = None
  219. if dept_external_id:
  220. dept = dept_by_external.get(dept_external_id)
  221. if not dept and dept_code:
  222. dept = dept_by_external.get(dept_code)
  223. campus_id = dept.campus_id if dept else None
  224. user = existing_users.get(external_user_id)
  225. if not user:
  226. # 检查账号是否已存在(处理账号冲突)
  227. existing_by_account = users_by_account.get(account)
  228. if existing_by_account:
  229. # 账号已存在,关联 external_user_id 到已有用户
  230. user = existing_by_account
  231. user.external_user_id = external_user_id
  232. user.name = name
  233. user.phone = phone
  234. user.title = title
  235. user.avatar = avatar
  236. user.dept_id = dept.id if dept else user.dept_id
  237. user.campus_id = campus_id or user.campus_id
  238. if user.status != status:
  239. user.status = status
  240. user.token_version = (user.token_version or 1) + 1
  241. existing_users[external_user_id] = user
  242. else:
  243. user = User(
  244. name=name,
  245. account=account,
  246. phone=phone,
  247. title=title,
  248. avatar=avatar,
  249. role_id=pending_role.id,
  250. status=status,
  251. password_hash=hash_password(secrets.token_urlsafe(24)),
  252. token_version=1,
  253. external_user_id=external_user_id,
  254. dept_id=dept.id if dept else None,
  255. campus_id=campus_id
  256. )
  257. db.add(user)
  258. users_by_account[account] = user
  259. continue
  260. user.name = name
  261. user.phone = phone
  262. user.title = title
  263. user.avatar = avatar
  264. user.external_user_id = external_user_id
  265. user.dept_id = dept.id if dept else user.dept_id
  266. user.campus_id = campus_id or user.campus_id
  267. if user.status != status:
  268. user.status = status
  269. user.token_version = (user.token_version or 1) + 1
  270. if index % SYNC_YIELD_EVERY == 0:
  271. await db.flush()
  272. await asyncio.sleep(0)
  273. result = await db.execute(select(User).where(User.external_user_id.isnot(None)))
  274. for user in result.scalars().all():
  275. if user.external_user_id not in external_id_set and user.status != "inactive":
  276. user.status = "inactive"
  277. user.token_version = (user.token_version or 1) + 1
  278. async def sync_center_data(token: str) -> None:
  279. if not settings.center_base_url:
  280. logger.warning("CENTER_BASE_URL 未配置,跳过中台同步")
  281. return
  282. logger.info("开始拉取中台数据...")
  283. async with httpx.AsyncClient(timeout=settings.sso_timeout_seconds) as client:
  284. dept_url = _build_center_url("/gateway/centerSys/api/getAllDepartment")
  285. user_url = _build_center_url("/gateway/centerSys/api/getAllUser")
  286. role_url = _build_center_url("/gateway/centerSys/api/getRoleList")
  287. try:
  288. dept_payload = await _fetch_json(client, dept_url, token)
  289. logger.info("已获取科室数据")
  290. role_payload = await _fetch_json(client, role_url, token)
  291. logger.info("已获取角色数据")
  292. user_payload = await _fetch_json(client, user_url, token)
  293. logger.info("已获取人员数据")
  294. except Exception as e:
  295. logger.error(f"从发现中心请求数据失败: {str(e)}")
  296. raise
  297. if isinstance(dept_payload, dict):
  298. code = dept_payload.get("code") or dept_payload.get("status")
  299. if code is not None and code != 200:
  300. raise ValueError(f"科室接口返回异常 code={code}")
  301. if isinstance(role_payload, dict):
  302. code = role_payload.get("code")
  303. if code is not None and code != 200:
  304. raise ValueError(f"角色接口返回异常 code={code}")
  305. if isinstance(user_payload, dict):
  306. status = user_payload.get("status")
  307. if status is not None and status != 200:
  308. raise ValueError(f"人员接口返回异常 status={status}")
  309. dept_items = []
  310. if isinstance(dept_payload, list):
  311. dept_items = dept_payload
  312. elif isinstance(dept_payload, dict) and isinstance(dept_payload.get("data"), list):
  313. dept_items = dept_payload.get("data", [])
  314. role_items = []
  315. if isinstance(role_payload, dict):
  316. role_data = role_payload.get("data") or {}
  317. role_items = role_data.get("list") or []
  318. user_items = []
  319. if isinstance(user_payload, dict):
  320. user_items = user_payload.get("data") or []
  321. logger.info(f"解析到数据: 科室={len(dept_items)}, 角色={len(role_items)}, 人员={len(user_items)}")
  322. try:
  323. async with SessionLocal() as db:
  324. dept_by_external = await _sync_campuses_and_departments(db, dept_items)
  325. await _sync_roles(db, role_items)
  326. await _sync_users(db, user_items, dept_by_external)
  327. await mark_sync_completed(db)
  328. await db.commit()
  329. logger.info("中台同步完成并已存入数据库")
  330. except Exception as e:
  331. logger.error(f"中台同步写入数据库失败: {str(e)}")
  332. raise
  333. finally:
  334. # 确保无论成功失败,都广播一次最新的状态(结束 loading)
  335. await _notify_sync_status()
  336. async def _cancel_then_sync(task: asyncio.Task, token: str) -> None:
  337. try:
  338. await task
  339. except asyncio.CancelledError:
  340. pass
  341. await sync_center_data(token)
  342. def _schedule_status_broadcast(_: asyncio.Task) -> None:
  343. asyncio.create_task(_notify_sync_status())
  344. async def trigger_center_sync(token: str, *, restart: bool, wait: bool) -> bool:
  345. global _sync_task
  346. async with _sync_task_lock:
  347. if _sync_task and not _sync_task.done():
  348. if not restart:
  349. return False
  350. _sync_task.cancel()
  351. _sync_task = asyncio.create_task(_cancel_then_sync(_sync_task, token))
  352. else:
  353. _sync_task = asyncio.create_task(sync_center_data(token))
  354. task = _sync_task
  355. task.add_done_callback(_schedule_status_broadcast)
  356. await _notify_sync_status()
  357. if wait:
  358. await task
  359. return True
  360. async def run_center_sync_once() -> None:
  361. async with SessionLocal() as db:
  362. token = await get_sync_token(db)
  363. if not token:
  364. logger.warning("未找到中台同步 token,跳过本次同步")
  365. return
  366. started = await trigger_center_sync(token, restart=False, wait=True)
  367. if not started:
  368. logger.info("已有同步任务在执行,跳过本次同步")
  369. async def run_center_sync_loop(stop_event: asyncio.Event) -> None:
  370. while not stop_event.is_set():
  371. now = datetime.now()
  372. next_run = now.replace(
  373. hour=settings.center_sync_hour,
  374. minute=settings.center_sync_minute,
  375. second=0,
  376. microsecond=0
  377. )
  378. if next_run <= now:
  379. next_run = next_run + timedelta(days=1)
  380. delay = (next_run - now).total_seconds()
  381. try:
  382. await asyncio.wait_for(stop_event.wait(), timeout=delay)
  383. except asyncio.TimeoutError:
  384. try:
  385. await run_center_sync_once()
  386. except Exception:
  387. logger.exception("中台同步失败")