"""Schedule statistics endpoints."""

from datetime import date

from fastapi import APIRouter, Depends, Query
from sqlalchemy import func, select, case
from sqlalchemy.ext.asyncio import AsyncSession

from backend.app.core.dependencies import require_permissions, get_current_user
from backend.app.db.session import get_db
from backend.app.models import ScheduleItem, User

router = APIRouter(prefix="/statistics", tags=["statistics"])

# Role names (administrator, scheduler) that may query every department.
_UNRESTRICTED_ROLES = frozenset({"管理员", "排班员"})


def is_limited_scope(user: User) -> bool:
    """Return True when *user* has a role outside the unrestricted set.

    A user whose ``role`` is falsy (e.g. ``None``) is reported as NOT limited,
    which matches the previous truthiness of this function.
    NOTE(review): that means a role-less user is treated as unrestricted here;
    presumably ``require_permissions`` rejects such users first -- confirm.
    """
    role = user.role
    # bool(...) keeps the declared return type honest: the bare ``and``
    # expression would otherwise hand back None or the role object itself.
    return bool(role) and role.name not in _UNRESTRICTED_ROLES


async def _count_by(db: AsyncSession, column, conditions) -> dict[str, int]:
    """Count rows matching *conditions*, grouped by *column*, keyed by str(value)."""
    result = await db.execute(
        select(column, func.count()).where(*conditions).group_by(column)
    )
    return {str(key): count for key, count in result.all()}


# No return annotation on purpose: FastAPI would use it as the response model.
@router.get("/schedule", dependencies=[Depends(require_permissions(["statistics.view"]))])
async def get_schedule_stats(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
    dept_id: str | None = Query(default=None),
    start: date | None = Query(default=None),
    end: date | None = Query(default=None),
):
    """Return schedule item counts by department, shift and staff member.

    Items tagged ``"stopClinic"`` are excluded from every figure.

    Args:
        db: Async database session.
        current_user: Authenticated user; limited-scope users only see their
            own department and the ``dept_id`` filter is ignored for them.
        dept_id: Optional department filter for unrestricted users.
        start: Optional inclusive lower bound on the item date.
        end: Optional inclusive upper bound on the item date.

    Returns:
        A dict with ``byDept`` and ``byShift`` (id -> count), ``byStaff``
        (id -> ``{"normal": n, "substitute": n}``) and ``total``.
    """
    conditions = []

    if is_limited_scope(current_user):
        if not current_user.dept_id:
            # A limited user without a department has nothing to see.
            return {"byDept": {}, "byStaff": {}, "byShift": {}, "total": 0}
        conditions.append(ScheduleItem.dept_id == current_user.dept_id)
    elif dept_id:
        conditions.append(ScheduleItem.dept_id == dept_id)

    # NULL-safe inequality: a plain ``!=`` evaluates to NULL for rows whose tag
    # is NULL, which would silently drop untagged items from every statistic.
    # Results are unchanged if the column turns out to be NOT NULL.
    conditions.append(ScheduleItem.tag.is_distinct_from("stopClinic"))

    if start is not None:
        conditions.append(ScheduleItem.date >= start)
    if end is not None:
        conditions.append(ScheduleItem.date <= end)

    # Count directly on the table instead of wrapping a full-entity subquery.
    total_result = await db.execute(
        select(func.count()).select_from(ScheduleItem).where(*conditions)
    )
    total = total_result.scalar_one()

    by_dept = await _count_by(db, ScheduleItem.dept_id, conditions)
    by_shift = await _count_by(db, ScheduleItem.shift_id, conditions)

    # COUNT ignores NULLs, and CASE without ELSE yields NULL, so each COUNT
    # only tallies the rows matching its own predicate.
    substitute_count = func.count(
        case((ScheduleItem.tag == "substitute", 1))
    ).label("substitute")
    # NULL-safe so untagged rows count as "normal" and the two buckets always
    # add up to the staff member's total.
    normal_count = func.count(
        case((ScheduleItem.tag.is_distinct_from("substitute"), 1))
    ).label("normal")

    staff_result = await db.execute(
        select(ScheduleItem.staff_id, substitute_count, normal_count)
        .where(*conditions)
        .group_by(ScheduleItem.staff_id)
    )
    by_staff = {
        str(staff_id): {"normal": normal, "substitute": substitute}
        for staff_id, substitute, normal in staff_result.all()
    }

    return {
        "byDept": by_dept,
        "byShift": by_shift,
        "byStaff": by_staff,
        "total": total,
    }