Files
huihuiSquare/backend/app/services/ai_service.py
stefanfeng 958eaeda8a fix: 多项修复
- main.py: 加 _CNJSONResponse 修复 datetime 序列化时区(+00:00→+08:00)
- schemas/__init__.py: 加 _fmt_dt 函数和 sync_to_platform 字段
- ai_service.py: 评论 max_tokens 从 300 提升到 500 避免截断
- scheduler.py: datetime.utcnow() 全部改为 datetime.now()(北京时间)
- docker-compose.yml: MySQL 容器加 TZ=Asia/Shanghai
- Interactions.vue: 文章标题链接从系统配置读取域名,格式为 {域名}/huihui-h5/#/news/share?id={id}&login=no
2026-04-01 18:07:42 +08:00

259 lines
11 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""AI服务 - 人格生成、内容创作"""
import json
import random
import re
from typing import Optional
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update
from app.models import AIModelConfig, TokenStat
from app.utils.crypto import decrypt
from app.core.logger import logger
from datetime import date
class AIService:
"""AI大模型服务"""
# 人格候选池
CHARACTER_TYPES = ["开朗", "内敛", "毒舌", "温和", "理性", "感性", "幽默", "严谨"]
LANGUAGE_STYLES = ["严肃", "幽默", "文艺", "吐槽", "口语化", "学术", "简洁", "丰富"]
INTEREST_TAGS_POOL = [
"科技", "财经", "娱乐", "体育", "政治", "文化", "教育", "医疗",
"汽车", "房产", "旅游", "美食", "军事", "国际", "环保", "农业"
]
INTERACT_TENDENCIES = ["爱评论", "爱点赞", "爱收藏", "潜水", "爱转发", "爱回复"]
async def _get_default_model(self, db: AsyncSession) -> Optional[AIModelConfig]:
result = await db.execute(
select(AIModelConfig).where(
AIModelConfig.is_default == 1, AIModelConfig.is_enabled == 1
)
)
return result.scalar_one_or_none()
async def _call_api(
self, db: AsyncSession, prompt: str, system_prompt: str = None,
max_tokens: int = None
) -> tuple[str, int]:
"""调用AI接口返回(内容, token数)"""
model = await self._get_default_model(db)
if not model:
# 无模型配置时返回随机预设
return "", 0
api_key = decrypt(model.api_key_enc) if model.api_key_enc else ""
base_url = model.api_base_url or "https://api.openai.com/v1"
headers = {"Content-Type": "application/json"}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": prompt})
payload = {
"model": model.model_version or "gpt-3.5-turbo",
"messages": messages,
"temperature": model.temperature,
"max_tokens": max_tokens or model.max_tokens,
}
import asyncio as _asyncio
last_err = None
for attempt in range(3): # 最多重试3次
try:
async with httpx.AsyncClient(timeout=model.timeout_seconds) as client:
resp = await client.post(
f"{base_url}/chat/completions",
headers=headers,
json=payload,
)
# 429 限流:等待后重试
if resp.status_code == 429:
wait = 30 * (attempt + 1) # 30s, 60s, 90s
logger.warning(f"AI接口限流(429){wait}s后重试({attempt+1}/3)")
await _asyncio.sleep(wait)
continue
resp.raise_for_status()
data = resp.json()
text = data["choices"][0]["message"]["content"].strip()
tokens = data.get("usage", {}).get("total_tokens", 0)
await self._record_token_usage(db, tokens, data.get("usage", {}), model.model_name)
logger.bind(ai_call=True).info(
f"AI调用成功 model={model.model_name} tokens={tokens}"
)
return text, tokens
except Exception as e:
last_err = e
if attempt < 2:
await _asyncio.sleep(5 * (attempt + 1))
logger.error(f"AI调用失败(已重试3次): {last_err}")
return "", 0
async def generate_personality(self, nickname: str, account: str) -> dict:
"""生成用户人格含fallback随机生成"""
# 如无AI配置使用随机生成
from app.core.database import AsyncSessionLocal
try:
async with AsyncSessionLocal() as db:
model = await self._get_default_model(db)
if not model:
return self._random_personality()
prompt = f"""请为以下虚拟新闻读者生成一个独特的人格档案,要求真实自然、贴合中国用户特征。
用户昵称:{nickname}
请严格以JSON格式返回不要有其他内容
{{
"character_type": "从[开朗/内敛/毒舌/温和/理性/感性/幽默/严谨]中选一个",
"language_style": "从[严肃/幽默/文艺/吐槽/口语化/学术/简洁/丰富]中选一个",
"interest_tags": ["兴趣1", "兴趣2", "兴趣3"],
"interact_tendency": "从[爱评论/爱点赞/爱收藏/潜水/爱转发/爱回复]中选一个",
"word_count_min": 最少字数(10-50整数),
"word_count_max": 最多字数(50-200整数),
"personality_desc": "一句话描述此人的性格特点30字以内"
}}"""
content, _ = await self._call_api(db, prompt, max_tokens=300)
if content:
try:
# 提取JSON
json_match = re.search(r'\{.*\}', content, re.DOTALL)
if json_match:
return json.loads(json_match.group())
except Exception:
pass
return self._random_personality()
except Exception as e:
logger.error(f"人格生成失败: {e}")
return self._random_personality()
def _random_personality(self) -> dict:
"""随机生成人格无AI时的备用方案"""
interests = random.sample(self.INTEREST_TAGS_POOL, random.randint(2, 4))
char = random.choice(self.CHARACTER_TYPES)
style = random.choice(self.LANGUAGE_STYLES)
tendency = random.choice(self.INTERACT_TENDENCIES)
w_min = random.randint(15, 40)
w_max = random.randint(60, 150)
return {
"character_type": char,
"language_style": style,
"interest_tags": interests,
"interact_tendency": tendency,
"word_count_min": w_min,
"word_count_max": w_max,
"personality_desc": f"一个{char}性格、{tendency}的新闻读者",
}
async def generate_comment(
self, db: AsyncSession, article_title: str, article_content: str,
personality_prompt: str, word_min: int = 20, word_max: int = 80
) -> tuple[str, int]:
"""生成文章评论"""
system_prompt = f"""你是一名真实的社区用户,正在阅读新闻后发表评论。{personality_prompt}
重要规则:
- 评论必须积极正面、文明友善,绝对不包含任何政治敏感、色情、暴力、侮辱、歧视内容
- 不要提及具体政治人物、党派、政策批评、社会矛盾等敏感话题
- 内容围绕文章本身展开,表达个人感受、分享观点、提出建设性问题
- 语言朴实自然,像普通网友留言,不夸张不煽情"""
prompt = f"""请根据以下新闻文章写一条评论。
文章标题:{article_title}
文章摘要:{article_content[:200] if article_content else '(无摘要)'}
要求:
1. 评论字数 {word_min}~{word_max}
2. 内容积极正面,贴近文章主题
3. 语气自然真实,符合普通读者口吻
4. 必须是完整的句子,不能被截断,以句号/感叹号/问号结尾
5. 只输出评论正文,不要加任何前缀或解释
评论:"""
return await self._call_api(db, prompt, system_prompt, max_tokens=500)
async def generate_reply(
self, db: AsyncSession, article_title: str, parent_comment: str,
personality_prompt: str, word_min: int = 15, word_max: int = 60
) -> tuple[str, int]:
"""生成回复"""
system_prompt = f"""你是一名真实的社区用户。{personality_prompt}
重要规则:回复必须积极正面、文明友善,不含任何敏感违规内容。"""
prompt = f"""文章:{article_title}
原评论:{parent_comment}
请对上面的评论写一条友善自然的回复,{word_min}~{word_max}字,直接输出回复内容。"""
return await self._call_api(db, prompt, system_prompt, max_tokens=150)
async def test_model(self, db: AsyncSession, model_id: int, test_prompt: str) -> dict:
"""测试模型可用性"""
result = await db.execute(select(AIModelConfig).where(AIModelConfig.id == model_id))
model = result.scalar_one_or_none()
if not model:
return {"success": False, "error": "模型不存在"}
api_key = decrypt(model.api_key_enc) if model.api_key_enc else ""
base_url = model.api_base_url or "https://api.openai.com/v1"
headers = {"Content-Type": "application/json"}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
payload = {
"model": model.model_version or "gpt-3.5-turbo",
"messages": [{"role": "user", "content": test_prompt}],
"max_tokens": 200,
}
try:
import time
start = time.time()
async with httpx.AsyncClient(timeout=model.timeout_seconds) as client:
resp = await client.post(f"{base_url}/chat/completions", headers=headers, json=payload)
resp.raise_for_status()
data = resp.json()
elapsed = round(time.time() - start, 2)
content = data["choices"][0]["message"]["content"]
tokens = data.get("usage", {}).get("total_tokens", 0)
return {
"success": True, "content": content,
"tokens": tokens, "elapsed_seconds": elapsed,
}
except Exception as e:
return {"success": False, "error": str(e)}
async def _record_token_usage(
self, db: AsyncSession, total: int, usage: dict, model_name: str
):
"""记录Token消耗"""
today = date.today()
from sqlalchemy.dialects.mysql import insert as mysql_insert
try:
existing = await db.execute(
select(TokenStat).where(TokenStat.stat_date == today)
)
stat = existing.scalar_one_or_none()
if stat:
stat.total_tokens += total
stat.prompt_tokens += usage.get("prompt_tokens", 0)
stat.completion_tokens += usage.get("completion_tokens", 0)
stat.call_count += 1
else:
stat = TokenStat(
stat_date=today,
model_name=model_name,
total_tokens=total,
prompt_tokens=usage.get("prompt_tokens", 0),
completion_tokens=usage.get("completion_tokens", 0),
call_count=1,
)
db.add(stat)
except Exception as e:
logger.error(f"记录Token消耗失败: {e}")
ai_service = AIService()