Files
huihuiSquare/backend/app/services/token_service.py
yuqianqian10204095yu cebc0a288f 1.0.0初始化源代码
2026-03-23 15:40:36 +08:00

174 lines
5.3 KiB
Python

"""
Token 使用统计服务
"""
import logging
from datetime import date, datetime, timedelta
from typing import Any, Dict, List, Optional

from sqlalchemy import and_, extract, func
from sqlalchemy.orm import Session

from app.core.config import settings
from app.models.token_usage import TokenUsage
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class TokenService:
    """Token usage statistics service.

    Aggregates ``TokenUsage.tokens_used`` over various date ranges using the
    SQLAlchemy session supplied at construction time. "Today" is always
    derived from the local clock (``date.today()`` / ``datetime.now()``).
    """

    def __init__(self, db: Session):
        """Keep the session that every query in this service runs on.

        :param db: SQLAlchemy session
        """
        self.db = db

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _sum_tokens(self, *criteria) -> int:
        """Return ``SUM(tokens_used)`` over the rows matching *criteria*.

        ``SUM`` is NULL when nothing matches, and depending on the database
        driver the value may come back as a non-int numeric; both cases are
        normalised so callers always receive a plain ``int``.
        """
        total = (
            self.db.query(func.sum(TokenUsage.tokens_used))
            .filter(*criteria)  # multiple criteria are AND-ed together
            .scalar()
        )
        return int(total or 0)

    def _daily_totals(self, start_date: date, end_date: date, *extra_criteria):
        """Return rows of ``(usage_date, total_tokens)`` grouped per day.

        Only days inside ``[start_date, end_date]`` that actually have rows
        are returned, ordered by day ascending.
        """
        day = func.date(TokenUsage.usage_date)
        return (
            self.db.query(
                day.label('usage_date'),
                func.sum(TokenUsage.tokens_used).label('total_tokens'),
            )
            .filter(*extra_criteria, day >= start_date, day <= end_date)
            .group_by(day)
            .order_by(day)
            .all()
        )

    # ------------------------------------------------------------------
    # Single-value statistics
    # ------------------------------------------------------------------

    def get_today_usage(self) -> int:
        """Return the number of tokens used today."""
        return self._sum_tokens(func.date(TokenUsage.usage_date) == date.today())

    def get_yesterday_usage(self) -> int:
        """Return the number of tokens used yesterday."""
        yesterday = date.today() - timedelta(days=1)
        return self._sum_tokens(func.date(TokenUsage.usage_date) == yesterday)

    def get_month_usage(self, year: Optional[int] = None, month: Optional[int] = None) -> int:
        """Return the number of tokens used in a calendar month.

        :param year: four-digit year; defaults to the current year
        :param month: month number 1-12; defaults to the current month
        :return: total tokens used in that month (0 when there are no rows)
        """
        if not year or not month:
            # Fill in each missing part on its own so that a caller who
            # supplies only one of the two does not have it discarded.
            now = datetime.now()
            year = year or now.year
            month = month or now.month
        return self._sum_tokens(
            extract('year', TokenUsage.usage_date) == year,
            extract('month', TokenUsage.usage_date) == month,
        )

    def get_remaining_tokens(self) -> int:
        """Return today's remaining token budget, never below zero."""
        return max(0, settings.MAX_TOKENS_PER_DAY - self.get_today_usage())

    def check_token_limit_exceeded(self) -> bool:
        """Return True once today's usage has reached the daily limit."""
        return self.get_today_usage() >= settings.MAX_TOKENS_PER_DAY

    # ------------------------------------------------------------------
    # Series (for charts)
    # ------------------------------------------------------------------

    def get_daily_usages(self, days: int = 30) -> List[Dict[str, Any]]:
        """Return per-day token usage for the last *days* days, today included.

        Days without any usage rows are present in the result with 0 tokens,
        so the series is contiguous.

        :param days: number of days to cover; values <= 0 give an empty list
        :return: list of ``{"date": "YYYY-MM-DD", "tokens": int}``, oldest first
        """
        end_date = date.today()
        start_date = end_date - timedelta(days=days - 1)

        # Key by the string form so the lookup works whether the database
        # hands back the day as a date object or as text.
        totals = {
            str(row.usage_date): row.total_tokens
            for row in self._daily_totals(start_date, end_date)
        }

        daily_usages = []
        current_date = start_date
        while current_date <= end_date:
            date_str = str(current_date)
            daily_usages.append({
                "date": date_str,
                "tokens": int(totals.get(date_str) or 0),
            })
            current_date += timedelta(days=1)
        return daily_usages

    def get_monthly_usages(self, months: int = 12) -> List[Dict[str, Any]]:
        """Return per-month token usage for the last *months* calendar months.

        The series ends with the current month and contains every calendar
        month exactly once. Months are derived by calendar arithmetic rather
        than by stepping back in 30-day increments, which could repeat one
        month and skip another.

        :param months: number of months to cover; values <= 0 give an empty list
        :return: list of ``{"month": "YYYY-MM", "tokens": int}``, oldest first
        """
        now = datetime.now()
        # Months since year 0, so that subtraction and divmod handle year wrap.
        current_index = now.year * 12 + (now.month - 1)

        results = []
        for offset in range(months - 1, -1, -1):
            year, month_zero_based = divmod(current_index - offset, 12)
            month = month_zero_based + 1
            results.append({
                "month": f"{year}-{month:02d}",
                "tokens": self.get_month_usage(year, month),
            })
        return results

    def get_user_token_usage(
        self,
        user_id: int,
        days: int = 30
    ) -> List[Dict[str, Any]]:
        """Return per-day token usage for one user over the last *days* days.

        Unlike :meth:`get_daily_usages`, only days that have usage rows are
        returned; missing days are not filled with zeros.

        :param user_id: value matched against ``TokenUsage.virtual_user_id``
        :param days: number of days to cover, today included
        :return: list of ``{"date": "YYYY-MM-DD", "tokens": int}``, oldest first
        """
        end_date = date.today()
        start_date = end_date - timedelta(days=days - 1)
        rows = self._daily_totals(
            start_date,
            end_date,
            TokenUsage.virtual_user_id == user_id,
        )
        return [
            {"date": str(row.usage_date), "tokens": int(row.total_tokens or 0)}
            for row in rows
        ]
# Factory function
def get_token_service(db: Session) -> TokenService:
    """Build a ``TokenService`` bound to the given database session."""
    service = TokenService(db)
    return service