"""数据统计服务""" from datetime import datetime, date, timedelta, timezone def _fmt_dt(dt): """统一输出 UTC 时间,带时区标识,让前端正确解析为 +8""" if dt is None: return None if dt.tzinfo is None: # 数据库存的是 UTC,补上时区信息 dt = dt.replace(tzinfo=timezone.utc) return dt.isoformat() from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func, and_ from app.models import VirtualUser, InteractionRecord, TokenStat, SystemConfig from app.core.logger import logger class StatsService: async def get_dashboard(self, db: AsyncSession) -> dict: """获取控制台数据""" today = date.today() now = datetime.now() month_start = today.replace(day=1) # 用户统计 user_stats = await self._get_user_stats(db) # 今日互动统计 today_stats = await self._get_today_stats(db, today) # 本月互动统计 monthly_stats = await self._get_monthly_stats(db, month_start, today) # Token统计 token_stats = await self._get_token_stats(db, today) # 系统状态 system_status = await self._get_system_status(db, now) # 在线用户数 online_count_result = await db.execute( select(func.count()).where(VirtualUser.status == 2) ) online_count = online_count_result.scalar() or 0 return { "user_stats": user_stats, "today_interactions": today_stats, "monthly_stats": monthly_stats, "token_stats": token_stats, "system_status": system_status, "online_users": online_count, } async def _get_user_stats(self, db: AsyncSession) -> dict: total = await db.execute(select(func.count()).select_from(VirtualUser)) normal = await db.execute(select(func.count()).where(VirtualUser.is_enabled == 1)) banned = await db.execute(select(func.count()).where(VirtualUser.status == 4)) abnormal = await db.execute(select(func.count()).where(VirtualUser.status == 3)) return { "total": total.scalar() or 0, "normal": normal.scalar() or 0, "banned": banned.scalar() or 0, "abnormal": abnormal.scalar() or 0, } async def _get_today_stats(self, db: AsyncSession, today: date) -> dict: result = await db.execute( select( InteractionRecord.interact_type, func.count().label("cnt"), ).where( func.date(InteractionRecord.executed_at) == today, InteractionRecord.status == 1, ).group_by(InteractionRecord.interact_type) ) rows = result.all() stats = {"comment": 0, "reply": 0, "like": 0, "collect": 0, "forward": 0, "total": 0} for row in rows: if row.interact_type in stats: stats[row.interact_type] = row.cnt stats["total"] += row.cnt return stats async def _get_monthly_stats(self, db: AsyncSession, month_start: date, today: date) -> dict: result = await db.execute( select(func.count()).where( InteractionRecord.executed_at >= month_start, InteractionRecord.status == 1, ) ) return {"total": result.scalar() or 0, "month_start": month_start.isoformat()} async def _get_token_stats(self, db: AsyncSession, today: date) -> dict: # 今日 today_stat = await db.execute(select(TokenStat).where(TokenStat.stat_date == today)) today_row = today_stat.scalar_one_or_none() # 每日限额 limit_cfg = await db.execute( select(SystemConfig).where(SystemConfig.config_key == "daily_token_limit") ) limit_row = limit_cfg.scalar_one_or_none() daily_limit = int(limit_row.config_value) if limit_row else 100000 today_used = today_row.total_tokens if today_row else 0 return { "today_used": today_used, "daily_limit": daily_limit, "remaining": max(0, daily_limit - today_used), "today_calls": today_row.call_count if today_row else 0, } async def _get_system_status(self, db: AsyncSession, now: datetime) -> dict: start_cfg = await db.execute( select(SystemConfig).where(SystemConfig.config_key == "system_start_time") ) start_row = start_cfg.scalar_one_or_none() uptime = "" if start_row and start_row.config_value: try: start_time = datetime.fromisoformat(start_row.config_value) delta = now - start_time hours, rem = divmod(int(delta.total_seconds()), 3600) mins = rem // 60 uptime = f"{hours}小时{mins}分钟" except Exception: uptime = "未知" scheduler_cfg = await db.execute( select(SystemConfig).where(SystemConfig.config_key == "scheduler_enabled") ) scheduler_row = scheduler_cfg.scalar_one_or_none() return { "uptime": uptime, "scheduler_enabled": (scheduler_row.config_value == "true") if scheduler_row else True, "current_time": now.isoformat(), } async def get_token_trend(self, db: AsyncSession, days: int = 30) -> list: """Token消耗趋势(近N天)""" end_date = date.today() start_date = end_date - timedelta(days=days - 1) result = await db.execute( select(TokenStat).where( TokenStat.stat_date >= start_date, TokenStat.stat_date <= end_date, ).order_by(TokenStat.stat_date) ) rows = result.scalars().all() stat_map = {r.stat_date.isoformat(): r.total_tokens for r in rows} trend = [] for i in range(days): d = (start_date + timedelta(days=i)).isoformat() trend.append({"date": d, "tokens": stat_map.get(d, 0)}) return trend async def get_monthly_token_trend(self, db: AsyncSession) -> list: """近12个月Token消耗""" today = date.today() months = [] for i in range(11, -1, -1): if today.month - i <= 0: year = today.year - 1 month = today.month - i + 12 else: year = today.year month = today.month - i months.append((year, month)) trend = [] for year, month in months: start = date(year, month, 1) if month == 12: end = date(year + 1, 1, 1) - timedelta(days=1) else: end = date(year, month + 1, 1) - timedelta(days=1) result = await db.execute( select(func.sum(TokenStat.total_tokens)).where( TokenStat.stat_date >= start, TokenStat.stat_date <= end ) ) total = result.scalar() or 0 trend.append({"month": f"{year}-{month:02d}", "tokens": total}) return trend async def get_interaction_records( self, db: AsyncSession, page: int = 1, page_size: int = 20, user_id: int = None, interact_type: str = None, status: int = None, start_date: str = None, end_date: str = None, keyword: str = None ) -> dict: query = select(InteractionRecord) conditions = [] if user_id: conditions.append(InteractionRecord.user_id == user_id) if interact_type: conditions.append(InteractionRecord.interact_type == interact_type) if status is not None: conditions.append(InteractionRecord.status == status) if start_date: conditions.append(InteractionRecord.executed_at >= start_date) if end_date: conditions.append(InteractionRecord.executed_at <= end_date + " 23:59:59") if keyword: from sqlalchemy import or_ conditions.append( or_(InteractionRecord.article_title.like(f"%{keyword}%"), InteractionRecord.content.like(f"%{keyword}%"), InteractionRecord.user_nickname.like(f"%{keyword}%")) ) if conditions: query = query.where(and_(*conditions)) count_q = select(func.count()).select_from(query.subquery()) total = (await db.execute(count_q)).scalar() query = query.order_by(InteractionRecord.executed_at.desc()).offset( (page - 1) * page_size ).limit(page_size) result = await db.execute(query) records = result.scalars().all() INTERACT_LABELS = { "comment": "评论", "reply": "回复", "like": "点赞", "collect": "收藏", "forward": "转发" } STATUS_LABELS = {0: "执行中", 1: "成功", 2: "失败"} items = [] for r in records: items.append({ "id": r.id, "user_id": r.user_id, "user_nickname": r.user_nickname, "user_account": r.user_account, "article_id": r.article_id, "article_title": r.article_title, "interact_type": r.interact_type, "interact_type_label": INTERACT_LABELS.get(r.interact_type, r.interact_type), "content": r.content, "token_consumed": r.token_consumed, "status": r.status, "status_label": STATUS_LABELS.get(r.status, "未知"), "error_msg": r.error_msg, "retry_count": r.retry_count, "executed_at": _fmt_dt(r.executed_at), }) return {"total": total, "page": page, "page_size": page_size, "items": items} stats_service = StatsService()