Files
huihuiSquare/backend/app/services/stats_service.py
stefanfeng cd07776914 feat: 多项功能更新
- 日志时间改为北京时间(TZ=Asia/Shanghai)
- 评论达上限后继续执行点赞/收藏/转发
- 用户信息同步改用 PATCH /v2/users/current
- 一键登出全部功能
- 一键登出全部前端按钮
- update.sh 一键更新脚本
2026-03-31 10:29:26 +08:00

252 lines
9.7 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""数据统计服务"""
from datetime import datetime, date, timedelta, timezone
def _fmt_dt(dt):
"""统一输出 UTC 时间,带时区标识,让前端正确解析为 +8"""
if dt is None:
return None
if dt.tzinfo is None:
# 数据库存的是 UTC补上时区信息
dt = dt.replace(tzinfo=timezone.utc)
return dt.isoformat()
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_
from app.models import VirtualUser, InteractionRecord, TokenStat, SystemConfig
from app.core.logger import logger
class StatsService:
async def get_dashboard(self, db: AsyncSession) -> dict:
"""获取控制台数据"""
today = date.today()
now = datetime.now()
month_start = today.replace(day=1)
# 用户统计
user_stats = await self._get_user_stats(db)
# 今日互动统计
today_stats = await self._get_today_stats(db, today)
# 本月互动统计
monthly_stats = await self._get_monthly_stats(db, month_start, today)
# Token统计
token_stats = await self._get_token_stats(db, today)
# 系统状态
system_status = await self._get_system_status(db, now)
# 在线用户数
online_count_result = await db.execute(
select(func.count()).where(VirtualUser.status == 2)
)
online_count = online_count_result.scalar() or 0
return {
"user_stats": user_stats,
"today_interactions": today_stats,
"monthly_stats": monthly_stats,
"token_stats": token_stats,
"system_status": system_status,
"online_users": online_count,
}
async def _get_user_stats(self, db: AsyncSession) -> dict:
total = await db.execute(select(func.count()).select_from(VirtualUser))
normal = await db.execute(select(func.count()).where(VirtualUser.is_enabled == 1))
banned = await db.execute(select(func.count()).where(VirtualUser.status == 4))
abnormal = await db.execute(select(func.count()).where(VirtualUser.status == 3))
return {
"total": total.scalar() or 0,
"normal": normal.scalar() or 0,
"banned": banned.scalar() or 0,
"abnormal": abnormal.scalar() or 0,
}
async def _get_today_stats(self, db: AsyncSession, today: date) -> dict:
result = await db.execute(
select(
InteractionRecord.interact_type,
func.count().label("cnt"),
).where(
func.date(InteractionRecord.executed_at) == today,
InteractionRecord.status == 1,
).group_by(InteractionRecord.interact_type)
)
rows = result.all()
stats = {"comment": 0, "reply": 0, "like": 0, "collect": 0, "forward": 0, "total": 0}
for row in rows:
if row.interact_type in stats:
stats[row.interact_type] = row.cnt
stats["total"] += row.cnt
return stats
async def _get_monthly_stats(self, db: AsyncSession, month_start: date, today: date) -> dict:
result = await db.execute(
select(func.count()).where(
InteractionRecord.executed_at >= month_start,
InteractionRecord.status == 1,
)
)
return {"total": result.scalar() or 0, "month_start": month_start.isoformat()}
async def _get_token_stats(self, db: AsyncSession, today: date) -> dict:
# 今日
today_stat = await db.execute(select(TokenStat).where(TokenStat.stat_date == today))
today_row = today_stat.scalar_one_or_none()
# 每日限额
limit_cfg = await db.execute(
select(SystemConfig).where(SystemConfig.config_key == "daily_token_limit")
)
limit_row = limit_cfg.scalar_one_or_none()
daily_limit = int(limit_row.config_value) if limit_row else 100000
today_used = today_row.total_tokens if today_row else 0
return {
"today_used": today_used,
"daily_limit": daily_limit,
"remaining": max(0, daily_limit - today_used),
"today_calls": today_row.call_count if today_row else 0,
}
async def _get_system_status(self, db: AsyncSession, now: datetime) -> dict:
start_cfg = await db.execute(
select(SystemConfig).where(SystemConfig.config_key == "system_start_time")
)
start_row = start_cfg.scalar_one_or_none()
uptime = ""
if start_row and start_row.config_value:
try:
start_time = datetime.fromisoformat(start_row.config_value)
delta = now - start_time
hours, rem = divmod(int(delta.total_seconds()), 3600)
mins = rem // 60
uptime = f"{hours}小时{mins}分钟"
except Exception:
uptime = "未知"
scheduler_cfg = await db.execute(
select(SystemConfig).where(SystemConfig.config_key == "scheduler_enabled")
)
scheduler_row = scheduler_cfg.scalar_one_or_none()
return {
"uptime": uptime,
"scheduler_enabled": (scheduler_row.config_value == "true") if scheduler_row else True,
"current_time": now.isoformat(),
}
async def get_token_trend(self, db: AsyncSession, days: int = 30) -> list:
"""Token消耗趋势近N天"""
end_date = date.today()
start_date = end_date - timedelta(days=days - 1)
result = await db.execute(
select(TokenStat).where(
TokenStat.stat_date >= start_date,
TokenStat.stat_date <= end_date,
).order_by(TokenStat.stat_date)
)
rows = result.scalars().all()
stat_map = {r.stat_date.isoformat(): r.total_tokens for r in rows}
trend = []
for i in range(days):
d = (start_date + timedelta(days=i)).isoformat()
trend.append({"date": d, "tokens": stat_map.get(d, 0)})
return trend
async def get_monthly_token_trend(self, db: AsyncSession) -> list:
"""近12个月Token消耗"""
today = date.today()
months = []
for i in range(11, -1, -1):
if today.month - i <= 0:
year = today.year - 1
month = today.month - i + 12
else:
year = today.year
month = today.month - i
months.append((year, month))
trend = []
for year, month in months:
start = date(year, month, 1)
if month == 12:
end = date(year + 1, 1, 1) - timedelta(days=1)
else:
end = date(year, month + 1, 1) - timedelta(days=1)
result = await db.execute(
select(func.sum(TokenStat.total_tokens)).where(
TokenStat.stat_date >= start, TokenStat.stat_date <= end
)
)
total = result.scalar() or 0
trend.append({"month": f"{year}-{month:02d}", "tokens": total})
return trend
async def get_interaction_records(
self, db: AsyncSession,
page: int = 1, page_size: int = 20,
user_id: int = None, interact_type: str = None,
status: int = None, start_date: str = None,
end_date: str = None, keyword: str = None
) -> dict:
query = select(InteractionRecord)
conditions = []
if user_id:
conditions.append(InteractionRecord.user_id == user_id)
if interact_type:
conditions.append(InteractionRecord.interact_type == interact_type)
if status is not None:
conditions.append(InteractionRecord.status == status)
if start_date:
conditions.append(InteractionRecord.executed_at >= start_date)
if end_date:
conditions.append(InteractionRecord.executed_at <= end_date + " 23:59:59")
if keyword:
from sqlalchemy import or_
conditions.append(
or_(InteractionRecord.article_title.like(f"%{keyword}%"),
InteractionRecord.content.like(f"%{keyword}%"),
InteractionRecord.user_nickname.like(f"%{keyword}%"))
)
if conditions:
query = query.where(and_(*conditions))
count_q = select(func.count()).select_from(query.subquery())
total = (await db.execute(count_q)).scalar()
query = query.order_by(InteractionRecord.executed_at.desc()).offset(
(page - 1) * page_size
).limit(page_size)
result = await db.execute(query)
records = result.scalars().all()
INTERACT_LABELS = {
"comment": "评论", "reply": "回复", "like": "点赞",
"collect": "收藏", "forward": "转发"
}
STATUS_LABELS = {0: "执行中", 1: "成功", 2: "失败"}
items = []
for r in records:
items.append({
"id": r.id, "user_id": r.user_id,
"user_nickname": r.user_nickname, "user_account": r.user_account,
"article_id": r.article_id, "article_title": r.article_title,
"interact_type": r.interact_type,
"interact_type_label": INTERACT_LABELS.get(r.interact_type, r.interact_type),
"content": r.content, "token_consumed": r.token_consumed,
"status": r.status, "status_label": STATUS_LABELS.get(r.status, "未知"),
"error_msg": r.error_msg, "retry_count": r.retry_count,
"executed_at": _fmt_dt(r.executed_at),
})
return {"total": total, "page": page, "page_size": page_size, "items": items}
stats_service = StatsService()