| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462 |
- import asyncio
- import logging
- import secrets
- from datetime import datetime, timedelta
- from typing import Iterable, Optional
- import httpx
- from sqlalchemy import select
- from sqlalchemy.ext.asyncio import AsyncSession
- from backend.app.core.config import settings
- from backend.app.core.security import hash_password
- from backend.app.db.session import SessionLocal
- from backend.app.models import Campus, Department, Role, SyncState, User
- from backend.app.services.sync_ws import sync_broadcaster
# Messages from this module are emitted through the logger named "uvicorn.error".
logger = logging.getLogger("uvicorn.error")

# Primary key of the single SyncState row this module reads and writes.
SYNC_STATE_KEY = "center"
# Name of the role looked up (or created) for users inserted by the sync.
PENDING_ROLE_NAME = "待授权"
# Number of processed records between cooperative yields to the event loop.
SYNC_YIELD_EVERY = 200

# The most recently scheduled sync task (None until the first trigger).
# _sync_task_lock serialises reads and replacements of this reference.
_sync_task: Optional[asyncio.Task] = None
_sync_task_lock = asyncio.Lock()
def _build_center_url(path: str) -> str:
    """Join ``settings.center_base_url`` and *path* with exactly one ``/`` between them.

    Trailing slashes on the base URL and leading slashes on *path* are dropped
    before joining.
    """
    prefix = settings.center_base_url.rstrip("/")
    suffix = path.lstrip("/")
    return "/".join((prefix, suffix))
- def _normalize_external(value) -> Optional[str]:
- if value is None:
- return None
- text = str(value).strip()
- if not text or text == "0":
- return None
- return text
async def upsert_sync_token(db: AsyncSession, token: str) -> SyncState:
    """Store *token* on the ``SyncState`` row keyed by ``SYNC_STATE_KEY``.

    The row is created and added to the session when it does not exist yet.
    Nothing is committed here; the caller owns the transaction.

    Returns:
        The new or updated ``SyncState`` instance.
    """
    record = await db.get(SyncState, SYNC_STATE_KEY)
    if record:
        record.token = token
    else:
        record = SyncState(key=SYNC_STATE_KEY, token=token)
        db.add(record)
    # NOTE(review): the timestamp is refreshed for both new and existing rows;
    # confirm this matches the intended nesting of the original statement.
    record.updated_at = datetime.utcnow()
    return record
async def get_sync_token(db: AsyncSession) -> Optional[str]:
    """Return the token stored on the sync-state row, or ``None`` if the row is absent."""
    record = await db.get(SyncState, SYNC_STATE_KEY)
    if not record:
        return None
    return record.token
async def mark_sync_completed(db: AsyncSession) -> None:
    """Stamp ``last_sync_at`` on the sync-state row with the current naive UTC time.

    Does nothing when the row does not exist. The change is not committed here.
    """
    record = await db.get(SyncState, SYNC_STATE_KEY)
    if not record:
        return
    record.last_sync_at = datetime.utcnow()
async def get_sync_status(db: AsyncSession) -> dict:
    """Build a status snapshot for the sync.

    Returns:
        A dict with ``"running"`` (True while the module-level sync task exists
        and is not done) and ``"last_sync_at"`` (the value stored on the
        sync-state row, or ``None`` when the row is absent).
    """
    record = await db.get(SyncState, SYNC_STATE_KEY)
    last_sync_at = record.last_sync_at if record else None
    async with _sync_task_lock:
        current = _sync_task
        running = not (current is None or current.done())
    return {"running": running, "last_sync_at": last_sync_at}
async def _notify_sync_status() -> None:
    """Read the current sync status and push it through ``sync_broadcaster``.

    Best effort: any exception is logged with its traceback and suppressed, so
    a failed broadcast never propagates to the caller.
    """
    try:
        async with SessionLocal() as session:
            snapshot = await get_sync_status(session)
        # The snapshot is a plain dict, so the session is no longer needed.
        await sync_broadcaster.broadcast_status(snapshot)
    except Exception:
        logger.exception("广播同步状态失败")
async def get_or_create_pending_role(db: AsyncSession) -> Role:
    """Return the role named ``PENDING_ROLE_NAME``, creating it when missing.

    A newly created role is added to the session and flushed (not committed)
    before being returned.
    """
    query = select(Role).where(Role.name == PENDING_ROLE_NAME)
    existing = (await db.execute(query)).scalar_one_or_none()
    if existing:
        return existing

    created = Role(name=PENDING_ROLE_NAME, description="等待管理员授权的默认角色", is_system=False)
    db.add(created)
    await db.flush()
    return created
async def resolve_role_by_external_ids(db: AsyncSession, external_ids: Iterable[str]) -> Optional[Role]:
    """Return the first role whose ``external_role_id`` matches one of *external_ids*.

    Ids are tried in iteration order, one query per id; ``None`` is returned
    when none of them matches.
    """
    for candidate_id in external_ids:
        query = select(Role).where(Role.external_role_id == candidate_id)
        match = (await db.execute(query)).scalar_one_or_none()
        if match:
            return match
    return None
async def resolve_department_by_external_id(db: AsyncSession, external_dept_id: Optional[str]) -> Optional[Department]:
    """Look up a department by its external id.

    Returns ``None`` without querying when *external_dept_id* is empty or
    ``None``, and ``None`` when no department matches.
    """
    if external_dept_id:
        query = select(Department).where(Department.external_dept_id == external_dept_id)
        return (await db.execute(query)).scalar_one_or_none()
    return None
async def _fetch_json(client: httpx.AsyncClient, url: str, token: str):
    """GET *url* with the sync token in a ``token`` header and return the decoded JSON body.

    Raises whatever ``raise_for_status()`` raises for an error HTTP status.
    """
    auth_headers = {"token": token}
    reply = await client.get(url, headers=auth_headers)
    reply.raise_for_status()
    return reply.json()
async def _sync_campuses_and_departments(db: AsyncSession, items: list[dict]) -> dict[str, Department]:
    """Upsert campuses and departments from the center department payload.

    For every record the external department id is the first non-empty value
    of ``departmentId``, ``id`` and ``code`` (after ``_normalize_external``).
    Records without a department id or without ``hospId`` are skipped.

    Campus resolution: by ``external_hosp_id``; otherwise by the generated
    name ``院区{hospId}`` (adopting an unlinked campus of that name), otherwise
    a new campus is created, suffixed with the hospital id if the generated
    name is already taken.

    Department resolution: by ``external_dept_id`` (the row is then renamed
    and moved to the resolved campus); otherwise by ``(campus_id, name)``
    (adopting an unlinked row, or creating a suffixed sibling when the name
    belongs to a different external id); otherwise a new row is created.

    New rows are flushed immediately so their primary keys are available.
    Nothing is committed here.

    Returns:
        A mapping from every non-empty external key of a processed record
        (resolved id, ``id``, ``departmentId``, ``code``) to its department.
    """
    campuses = (await db.execute(select(Campus))).scalars().all()
    campus_by_external = {c.external_hosp_id: c for c in campuses if c.external_hosp_id}
    campus_by_name = {c.name: c for c in campuses}

    departments = (await db.execute(select(Department))).scalars().all()
    dept_by_external = {d.external_dept_id: d for d in departments if d.external_dept_id}
    dept_by_campus_name = {(d.campus_id, d.name): d for d in departments}

    updated_departments: dict[str, Department] = {}
    for index, item in enumerate(items, start=1):
        dept_id = _normalize_external(item.get("departmentId"))
        dept_code = _normalize_external(item.get("code"))
        dept_primary_id = _normalize_external(item.get("id"))
        external_dept_id = dept_id or dept_primary_id or dept_code
        hosp_id = _normalize_external(item.get("hospId"))
        if not external_dept_id or not hosp_id:
            continue
        # Built after the guard so the fallback never embeds a missing id.
        name = (item.get("name") or "").strip() or f"科室{external_dept_id}"

        campus = campus_by_external.get(hosp_id)
        if not campus:
            campus_name = f"院区{hosp_id}"
            campus = campus_by_name.get(campus_name)
            if campus and not campus.external_hosp_id:
                # Adopt an existing, not-yet-linked campus with the generated name.
                campus.external_hosp_id = hosp_id
            else:
                if campus_name in campus_by_name and (not campus or campus.external_hosp_id):
                    # The generated name belongs to another hospital id: disambiguate.
                    campus_name = f"{campus_name}({hosp_id})"
                campus = Campus(name=campus_name, external_hosp_id=hosp_id)
                db.add(campus)
                await db.flush()
            campus_by_external[hosp_id] = campus
            campus_by_name[campus.name] = campus

        dept = dept_by_external.get(external_dept_id)
        if not dept:
            dept = dept_by_campus_name.get((campus.id, name))
            if dept and not dept.external_dept_id:
                dept.external_dept_id = external_dept_id
            elif dept and dept.external_dept_id and dept.external_dept_id != external_dept_id:
                # Same name but a different external id: use a suffixed name.
                unique_name = f"{name}({external_dept_id})"
                existing_unique = dept_by_campus_name.get((campus.id, unique_name))
                if existing_unique:
                    dept = existing_unique
                    if not dept.external_dept_id:
                        dept.external_dept_id = external_dept_id
                else:
                    dept = Department(campus_id=campus.id, name=unique_name, external_dept_id=external_dept_id)
                    db.add(dept)
                    await db.flush()
                    dept_by_external[external_dept_id] = dept
                    dept_by_campus_name[(dept.campus_id, dept.name)] = dept
            elif not dept:
                dept = Department(campus_id=campus.id, name=name, external_dept_id=external_dept_id)
                db.add(dept)
                await db.flush()
                dept_by_external[external_dept_id] = dept
                dept_by_campus_name[(dept.campus_id, dept.name)] = dept
        else:
            # Would the incoming name collide with another row's (campus_id, name)?
            conflict_dept = dept_by_campus_name.get((campus.id, name))
            if conflict_dept and conflict_dept.id != dept.id:
                # Another department already holds this name; append the external id.
                name = f"{name}({external_dept_id})"

            dept.name = name
            dept.campus_id = campus.id
            # Keep the name index in step with the rename/move; otherwise a
            # later record with the same name in this payload would not see
            # this row and would insert a duplicate (campus_id, name).
            dept_by_campus_name.setdefault((campus.id, name), dept)

        for key in {external_dept_id, dept_primary_id, dept_id, dept_code}:
            if key:
                updated_departments[key] = dept

        if index % SYNC_YIELD_EVERY == 0:
            # Give other coroutines a turn during large payloads.
            await asyncio.sleep(0)
    return updated_departments
async def _sync_roles(db: AsyncSession, items: list[dict]) -> dict[str, Role]:
    """Upsert roles from the center role payload.

    The external id is ``roleId`` falling back to ``roleCode``; records
    without one are skipped. The display name is the first non-blank value of
    ``remark``, ``roleName`` and the generated ``角色{external id}``.

    Resolution order: by ``external_role_id``; otherwise by name (adopting a
    role that has no external id yet); otherwise a new role is created, with
    the external id appended to the name when the plain name is taken by a
    role linked to a different external id. Roles already linked by external
    id are left unchanged. New roles are flushed but not committed.

    Returns:
        A mapping from external role id to the matching ``Role``.
    """
    roles = (await db.execute(select(Role))).scalars().all()
    role_by_external = {r.external_role_id: r for r in roles if r.external_role_id}
    role_by_name = {r.name: r for r in roles}

    updated_roles: dict[str, Role] = {}
    for item in items:
        external_role_id = str(item.get("roleId") or item.get("roleCode") or "").strip()
        if not external_role_id:
            continue

        # Strip each candidate separately so a whitespace-only remark falls
        # through to the next candidate instead of producing an empty name.
        remark = str(item.get("remark") or "").strip()
        role_label = str(item.get("roleName") or "").strip()
        name_candidate = remark or role_label or f"角色{external_role_id}"

        role = role_by_external.get(external_role_id)
        if not role:
            role = role_by_name.get(name_candidate)
            if role and not role.external_role_id:
                role.external_role_id = external_role_id
            else:
                role_name = name_candidate
                if role_name in role_by_name and (not role or role.external_role_id):
                    role_name = f"{role_name}({external_role_id})"
                role = Role(
                    name=role_name,
                    description="中台角色同步",
                    is_system=False,
                    external_role_id=external_role_id,
                )
                db.add(role)
                await db.flush()
            role_by_external[external_role_id] = role
            role_by_name[role.name] = role
        updated_roles[external_role_id] = role
    return updated_roles
def _center_user_identity(item: dict) -> tuple[str, str]:
    """Return ``(external_user_id, account)`` for one payload record, both stripped.

    The external id is ``userId`` falling back to ``id`` (empty string when
    neither is usable). The account is ``empNo``, falling back to the external
    id when ``empNo`` is missing or blank.
    """
    external_user_id = str(item.get("userId") or item.get("id") or "").strip()
    account = str(item.get("empNo") or "").strip() or external_user_id
    return external_user_id, account


def _upsert_center_user(
    db: AsyncSession,
    item: dict,
    external_user_id: str,
    account: str,
    dept_by_external: dict[str, Department],
    existing_users: dict[str, User],
    users_by_account: dict[str, User],
    pending_role_id,
) -> None:
    """Create or update the local user for one payload record.

    Looks the user up by external id, then by account; both lookup tables are
    updated so later records in the same payload see the result.
    """
    name = (item.get("name") or "").strip() or account
    phone = (item.get("phoneNumber") or "").strip() or None
    title = (item.get("jobTitleName") or item.get("professionalTitleName") or "").strip() or None
    avatar = (item.get("avatarUrl") or "").strip() or None
    status = "active" if str(item.get("activeFlag") or "0") == "1" else "inactive"

    dept_external_id = _normalize_external(item.get("departmentId"))
    dept_code = _normalize_external(item.get("departmentCode"))
    dept = dept_by_external.get(dept_external_id) if dept_external_id else None
    if not dept and dept_code:
        dept = dept_by_external.get(dept_code)
    campus_id = dept.campus_id if dept else None

    user = existing_users.get(external_user_id)
    if user is None:
        # Account collision: link the external id to the user who already
        # owns this account instead of inserting a second row.
        user = users_by_account.get(account)

    if user is None:
        user = User(
            name=name,
            account=account,
            phone=phone,
            title=title,
            avatar=avatar,
            role_id=pending_role_id,
            status=status,
            # Random throwaway password; hashed synchronously for each new user.
            password_hash=hash_password(secrets.token_urlsafe(24)),
            token_version=1,
            external_user_id=external_user_id,
            dept_id=dept.id if dept else None,
            campus_id=campus_id,
        )
        db.add(user)
        users_by_account[account] = user
        # Registered by external id as well, so a repeated id later in the
        # payload updates this object rather than inserting a duplicate.
        existing_users[external_user_id] = user
        return

    user.external_user_id = external_user_id
    user.name = name
    user.phone = phone
    user.title = title
    user.avatar = avatar
    # Keep the previous department/campus when the payload does not resolve one.
    user.dept_id = dept.id if dept else user.dept_id
    user.campus_id = campus_id or user.campus_id
    if user.status != status:
        user.status = status
        user.token_version = (user.token_version or 1) + 1
    existing_users[external_user_id] = user


async def _sync_users(db: AsyncSession, items: list[dict], dept_by_external: dict[str, Department]) -> None:
    """Upsert users from the center payload and deactivate users missing from it.

    Records without ``userId``/``id`` are ignored. New users get the pending
    role, a random password and ``token_version`` 1. Existing users have
    their profile fields refreshed; a status change bumps ``token_version``.

    After the payload is processed, every local user that has an
    ``external_user_id`` not present in the payload is set to ``"inactive"``
    (with a ``token_version`` bump). An empty *items* list therefore
    deactivates all externally linked users.

    Args:
        db: Session to work in; changes are flushed periodically, never committed.
        items: User records from the center API.
        dept_by_external: External department key -> ``Department``.
    """
    # Identities are normalised once, so the prefetch queries, the main loop
    # and the deactivation pass all compare the same stripped values.
    identities = [_center_user_identity(item) for item in items]
    external_ids = list(dict.fromkeys(ext for ext, _ in identities if ext))
    accounts = list(dict.fromkeys(acc for ext, acc in identities if ext))
    external_id_set = set(external_ids)

    existing_users: dict[str, User] = {}
    if external_ids:
        result = await db.execute(select(User).where(User.external_user_id.in_(external_ids)))
        existing_users = {user.external_user_id: user for user in result.scalars().all() if user.external_user_id}

    # Users preloaded by account, used to resolve account collisions.
    users_by_account: dict[str, User] = {}
    if accounts:
        result = await db.execute(select(User).where(User.account.in_(accounts)))
        users_by_account = {user.account: user for user in result.scalars().all()}

    pending_role = await get_or_create_pending_role(db)

    for index, (item, (external_user_id, account)) in enumerate(zip(items, identities), start=1):
        if external_user_id:
            _upsert_center_user(
                db,
                item,
                external_user_id,
                account,
                dept_by_external,
                existing_users,
                users_by_account,
                pending_role.id,
            )
        # Runs for every record, including creations, so a first-time import
        # still flushes and yields to the event loop at a steady cadence.
        if index % SYNC_YIELD_EVERY == 0:
            await db.flush()
            await asyncio.sleep(0)

    result = await db.execute(select(User).where(User.external_user_id.isnot(None)))
    for user in result.scalars().all():
        if user.external_user_id not in external_id_set and user.status != "inactive":
            user.status = "inactive"
            user.token_version = (user.token_version or 1) + 1
def _center_payload_items(payload, *, nested_key: Optional[str] = None) -> Optional[list]:
    """Return the record list carried by a center response, or ``None`` if none is found.

    Accepted shapes: a bare list; a dict whose ``data`` is a list; and, when
    *nested_key* is given, a dict whose ``data`` is a dict holding a list
    under *nested_key*.
    """
    if isinstance(payload, list):
        return payload
    if not isinstance(payload, dict):
        return None
    data = payload.get("data")
    if isinstance(data, list):
        return data
    if nested_key is not None and isinstance(data, dict):
        nested = data.get(nested_key)
        if isinstance(nested, list):
            return nested
    return None


async def sync_center_data(token: str) -> None:
    """Pull departments, roles and users from the center and persist them.

    Returns immediately (with a warning) when ``settings.center_base_url`` is
    empty. Otherwise the three endpoints are fetched sequentially, their
    result codes are checked, and everything is written in one transaction
    that is committed at the end. A status broadcast is sent after the
    database phase whether it succeeded or failed.

    Args:
        token: Value sent in the ``token`` header of every request.

    Raises:
        ValueError: An endpoint reported a result code other than 200, or the
            user response carries no recognisable record list.
        Exception: HTTP/transport errors and database errors are logged and
            re-raised unchanged.
    """
    if not settings.center_base_url:
        logger.warning("CENTER_BASE_URL 未配置,跳过中台同步")
        return

    logger.info("开始拉取中台数据...")
    async with httpx.AsyncClient(timeout=settings.sso_timeout_seconds) as client:
        dept_url = _build_center_url("/gateway/centerSys/api/getAllDepartment")
        user_url = _build_center_url("/gateway/centerSys/api/getAllUser")
        role_url = _build_center_url("/gateway/centerSys/api/getRoleList")
        try:
            dept_payload = await _fetch_json(client, dept_url, token)
            logger.info("已获取科室数据")
            role_payload = await _fetch_json(client, role_url, token)
            logger.info("已获取角色数据")
            user_payload = await _fetch_json(client, user_url, token)
            logger.info("已获取人员数据")
        except Exception as e:
            logger.error("从发现中心请求数据失败: %s", e)
            raise

    if isinstance(dept_payload, dict):
        code = dept_payload.get("code") or dept_payload.get("status")
        if code is not None and code != 200:
            raise ValueError(f"科室接口返回异常 code={code}")
    if isinstance(role_payload, dict):
        code = role_payload.get("code")
        if code is not None and code != 200:
            raise ValueError(f"角色接口返回异常 code={code}")
    if isinstance(user_payload, dict):
        status = user_payload.get("status")
        if status is not None and status != 200:
            raise ValueError(f"人员接口返回异常 status={status}")

    dept_items = _center_payload_items(dept_payload) or []
    role_items = _center_payload_items(role_payload, nested_key="list") or []
    user_items = _center_payload_items(user_payload)
    if user_items is None:
        # _sync_users deactivates every linked user that is absent from the
        # payload, so an unreadable response must not be treated as "no users".
        raise ValueError("人员接口返回的数据格式无法识别")

    logger.info("解析到数据: 科室=%d, 角色=%d, 人员=%d", len(dept_items), len(role_items), len(user_items))

    try:
        async with SessionLocal() as db:
            dept_by_external = await _sync_campuses_and_departments(db, dept_items)
            await _sync_roles(db, role_items)
            await _sync_users(db, user_items, dept_by_external)
            await mark_sync_completed(db)
            await db.commit()
        logger.info("中台同步完成并已存入数据库")
    except Exception as e:
        logger.error("中台同步写入数据库失败: %s", e)
        raise
    finally:
        # Broadcast the latest status on success and on failure alike, so
        # clients can stop showing a loading state.
        await _notify_sync_status()
async def _cancel_then_sync(task: asyncio.Task, token: str) -> None:
    """Wait for the superseded *task* to finish, then run a fresh sync.

    The caller is expected to have requested cancellation of *task* already.
    Its outcome (cancelled or failed) never prevents the new sync from
    starting. If this coroutine's own task is cancelled while waiting, the
    cancellation is honoured instead of being mistaken for the predecessor's.

    Args:
        task: The previous sync task to wait for.
        token: Token passed to ``sync_center_data``.
    """
    try:
        await task
    except asyncio.CancelledError:
        # Either the predecessor finished cancelling (expected) or this task
        # itself is being cancelled by a newer restart. Task.cancelling()
        # (Python 3.11+) distinguishes the two; on older interpreters the
        # check is unavailable and the sync proceeds as before.
        current = asyncio.current_task()
        cancelling = getattr(current, "cancelling", None)
        if cancelling is not None and cancelling():
            raise
    except Exception:
        # The predecessor failed on its own; that must not block the restart.
        logger.exception("被取消的同步任务异常退出")
    await sync_center_data(token)
# Strong references to in-flight broadcast tasks. The event loop only keeps
# weak references to tasks, so a task nobody else references may be
# garbage-collected before it completes.
_status_broadcast_tasks: set[asyncio.Task] = set()


def _schedule_status_broadcast(_: asyncio.Task) -> None:
    """Task done-callback that schedules a status broadcast.

    The finished task passed by asyncio is ignored; the broadcast reads the
    current status itself.
    """
    broadcast = asyncio.create_task(_notify_sync_status())
    _status_broadcast_tasks.add(broadcast)
    broadcast.add_done_callback(_status_broadcast_tasks.discard)
async def trigger_center_sync(token: str, *, restart: bool, wait: bool) -> bool:
    """Start a sync task, optionally replacing one that is still running.

    Args:
        token: Token forwarded to the sync.
        restart: When a sync is already running, cancel it and queue a new one
            behind it (True) or leave it alone and report ``False`` (False).
        wait: Await the scheduled task before returning.

    Returns:
        ``False`` if a sync was running and *restart* is false; ``True``
        otherwise.

    Raises:
        Whatever the awaited task raises when *wait* is true, including
        ``asyncio.CancelledError`` if that task is cancelled.
    """
    global _sync_task
    async with _sync_task_lock:
        previous = _sync_task
        busy = previous is not None and not previous.done()
        if busy and not restart:
            return False
        if busy:
            previous.cancel()
            job = _cancel_then_sync(previous, token)
        else:
            job = sync_center_data(token)
        task = _sync_task = asyncio.create_task(job)
        task.add_done_callback(_schedule_status_broadcast)

    # Sent after the lock is released: building the status takes the same lock.
    await _notify_sync_status()
    if wait:
        await task
    return True
async def run_center_sync_once() -> None:
    """Run one sync with the stored token and wait for it to finish.

    Logs a warning and returns when no token is stored. Never restarts a sync
    that is already running; in that case an info message is logged instead.
    """
    async with SessionLocal() as session:
        token = await get_sync_token(session)

    if token:
        launched = await trigger_center_sync(token, restart=False, wait=True)
        if not launched:
            logger.info("已有同步任务在执行,跳过本次同步")
        return

    logger.warning("未找到中台同步 token,跳过本次同步")
async def _run_scheduled_sync() -> None:
    """Run one scheduled sync without letting its outcome end the scheduler."""
    try:
        await run_center_sync_once()
    except asyncio.CancelledError:
        # The awaited sync task is cancelled when another caller restarts the
        # sync. That is not a request to stop the scheduler, so it is
        # swallowed unless this task itself is being cancelled.
        # Task.cancelling() exists on Python 3.11+; without it the two cases
        # cannot be told apart and the error is re-raised as before.
        current = asyncio.current_task()
        cancelling = getattr(current, "cancelling", None)
        if cancelling is None or cancelling():
            raise
        logger.warning("计划同步被新的同步请求取代")
    except Exception:
        logger.exception("中台同步失败")


async def run_center_sync_loop(stop_event: asyncio.Event) -> None:
    """Run the sync once a day at the configured local time until *stop_event* is set.

    Each iteration computes the next occurrence of
    ``settings.center_sync_hour``:``settings.center_sync_minute`` from the
    local wall clock, then waits for either that moment or the stop event.
    Sync failures are logged and do not end the loop.

    Args:
        stop_event: Setting this event ends the loop at the next wake-up.
    """
    while not stop_event.is_set():
        now = datetime.now()
        next_run = now.replace(
            hour=settings.center_sync_hour,
            minute=settings.center_sync_minute,
            second=0,
            microsecond=0,
        )
        if next_run <= now:
            # Today's slot has already passed; schedule for tomorrow.
            next_run += timedelta(days=1)
        delay = (next_run - now).total_seconds()
        try:
            # Returns normally only when the stop event is set; the loop
            # condition then ends the loop.
            await asyncio.wait_for(stop_event.wait(), timeout=delay)
        except asyncio.TimeoutError:
            await _run_scheduled_sync()
|