- 日志时间改为北京时间(TZ=Asia/Shanghai) - 评论达上限后继续执行点赞/收藏/转发 - 用户信息同步改用 PATCH /v2/users/current - 一键登出全部功能 - 一键登出全部前端按钮 - update.sh 一键更新脚本
359 lines
17 KiB
Python
Executable File
359 lines
17 KiB
Python
Executable File
"""虚拟用户业务服务"""
|
||
import io
|
||
import uuid
|
||
from datetime import datetime, timezone
|
||
|
||
def _fmt_dt(dt):
|
||
if dt is None: return None
|
||
if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc)
|
||
return dt.isoformat()
|
||
from typing import List, Optional, Tuple
|
||
import pandas as pd
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
from sqlalchemy import select, update, delete, func, and_, or_
|
||
from fastapi import HTTPException
|
||
|
||
from app.models import VirtualUser, UserPersonality
|
||
from app.schemas import UserCreateRequest, UserUpdateRequest
|
||
from app.utils.crypto import encrypt, decrypt
|
||
from app.services.ai_service import ai_service
|
||
from app.core.logger import logger
|
||
|
||
STATUS_LABELS = {0: "未登录", 1: "登录中", 2: "已登录", 3: "登录失效", 4: "封禁"}
|
||
ACTIVITY_LABELS = {0: "低", 1: "中", 2: "高"}
|
||
ACTIVITY_COMMENT_LIMITS = {0: (3, 5), 1: (8, 15), 2: (20, 30)}
|
||
|
||
|
||
class UserService:
|
||
|
||
async def get_users(
|
||
self, db: AsyncSession,
|
||
page: int = 1, page_size: int = 20,
|
||
keyword: str = None, status: int = None,
|
||
is_enabled: int = None
|
||
) -> Tuple[int, List[dict]]:
|
||
query = select(VirtualUser)
|
||
conditions = []
|
||
if keyword:
|
||
conditions.append(
|
||
or_(VirtualUser.nickname.like(f"%{keyword}%"),
|
||
VirtualUser.account.like(f"%{keyword}%"))
|
||
)
|
||
if status is not None:
|
||
conditions.append(VirtualUser.status == status)
|
||
if is_enabled is not None:
|
||
conditions.append(VirtualUser.is_enabled == is_enabled)
|
||
if conditions:
|
||
query = query.where(and_(*conditions))
|
||
|
||
count_result = await db.execute(
|
||
select(func.count()).select_from(query.subquery())
|
||
)
|
||
total = count_result.scalar()
|
||
|
||
query = query.offset((page - 1) * page_size).limit(page_size).order_by(VirtualUser.created_at.desc())
|
||
result = await db.execute(query)
|
||
users = result.scalars().all()
|
||
|
||
items = []
|
||
for u in users:
|
||
# 获取人格
|
||
p_result = await db.execute(select(UserPersonality).where(UserPersonality.user_id == u.id))
|
||
personality = p_result.scalar_one_or_none()
|
||
items.append(self._format_user(u, personality))
|
||
return total, items
|
||
|
||
async def create_user(self, db: AsyncSession, req: UserCreateRequest) -> dict:
|
||
# 检查账号重复
|
||
existing = await db.execute(select(VirtualUser).where(VirtualUser.account == req.account))
|
||
if existing.scalar_one_or_none():
|
||
raise HTTPException(status_code=400, detail="账号已存在")
|
||
# 昵称选填:为空则自动生成
|
||
nickname = req.nickname or f"用户{req.account[-4:]}"
|
||
# 检查昵称重复(自动生成的若冲突则加随机后缀)
|
||
existing_nick = await db.execute(select(VirtualUser).where(VirtualUser.nickname == nickname))
|
||
if existing_nick.scalar_one_or_none():
|
||
import random, string
|
||
nickname = nickname + "_" + "".join(random.choices(string.digits, k=4))
|
||
|
||
user = VirtualUser(
|
||
nickname=nickname,
|
||
account=req.account,
|
||
password_enc=encrypt(req.password),
|
||
avatar_url=req.avatar_url,
|
||
activity_level=req.activity_level,
|
||
daily_comment_limit=req.daily_comment_limit,
|
||
daily_like_limit=req.daily_like_limit,
|
||
remark=req.remark,
|
||
status=0,
|
||
is_enabled=1,
|
||
)
|
||
db.add(user)
|
||
await db.flush()
|
||
# 自动生成AI人格
|
||
try:
|
||
await self._generate_personality(db, user)
|
||
except Exception as e:
|
||
logger.warning(f"人格生成失败,跳过: {e}")
|
||
await db.commit()
|
||
await db.refresh(user)
|
||
p_result = await db.execute(select(UserPersonality).where(UserPersonality.user_id == user.id))
|
||
personality = p_result.scalar_one_or_none()
|
||
return self._format_user(user, personality)
|
||
|
||
async def update_user(self, db: AsyncSession, user_id: int, req: UserUpdateRequest) -> dict:
|
||
user = await self._get_or_404(db, user_id)
|
||
if req.nickname and req.nickname != user.nickname:
|
||
existing = await db.execute(
|
||
select(VirtualUser).where(VirtualUser.nickname == req.nickname, VirtualUser.id != user_id)
|
||
)
|
||
if existing.scalar_one_or_none():
|
||
raise HTTPException(status_code=400, detail="昵称已被使用")
|
||
user.nickname = req.nickname
|
||
if req.password:
|
||
user.password_enc = encrypt(req.password)
|
||
if req.avatar_url is not None:
|
||
user.avatar_url = req.avatar_url
|
||
if req.activity_level is not None:
|
||
user.activity_level = req.activity_level
|
||
if req.daily_comment_limit is not None:
|
||
user.daily_comment_limit = req.daily_comment_limit
|
||
if req.daily_like_limit is not None:
|
||
user.daily_like_limit = req.daily_like_limit
|
||
if req.remark is not None:
|
||
user.remark = req.remark
|
||
if req.is_enabled is not None:
|
||
user.is_enabled = req.is_enabled
|
||
if req.is_enabled == 0:
|
||
user.status = 0 # 禁用后重置状态
|
||
await db.commit()
|
||
await db.refresh(user)
|
||
p_result = await db.execute(select(UserPersonality).where(UserPersonality.user_id == user.id))
|
||
personality = p_result.scalar_one_or_none()
|
||
return self._format_user(user, personality)
|
||
|
||
async def delete_user(self, db: AsyncSession, user_id: int):
|
||
user = await self._get_or_404(db, user_id)
|
||
await db.execute(delete(UserPersonality).where(UserPersonality.user_id == user_id))
|
||
await db.delete(user)
|
||
await db.commit()
|
||
|
||
async def batch_action(self, db: AsyncSession, user_ids: List[int], action: str):
|
||
"""批量操作"""
|
||
if action == "enable":
|
||
await db.execute(update(VirtualUser).where(VirtualUser.id.in_(user_ids)).values(is_enabled=1))
|
||
elif action == "disable":
|
||
await db.execute(update(VirtualUser).where(VirtualUser.id.in_(user_ids)).values(is_enabled=0, status=0))
|
||
elif action == "logout":
|
||
await db.execute(update(VirtualUser).where(VirtualUser.id.in_(user_ids)).values(status=0, session_token=None))
|
||
elif action == "delete":
|
||
await db.execute(delete(UserPersonality).where(UserPersonality.user_id.in_(user_ids)))
|
||
await db.execute(delete(VirtualUser).where(VirtualUser.id.in_(user_ids)))
|
||
await db.commit()
|
||
return {"affected": len(user_ids)}
|
||
|
||
async def generate_personality(self, db: AsyncSession, user_id: int) -> dict:
|
||
"""为用户生成/重新生成AI人格"""
|
||
user = await self._get_or_404(db, user_id)
|
||
# 删除旧人格
|
||
await db.execute(delete(UserPersonality).where(UserPersonality.user_id == user_id))
|
||
personality = await self._generate_personality(db, user)
|
||
await db.commit()
|
||
return self._format_personality(personality)
|
||
|
||
async def update_personality(self, db: AsyncSession, user_id: int, req) -> dict:
|
||
p_result = await db.execute(select(UserPersonality).where(UserPersonality.user_id == user_id))
|
||
personality = p_result.scalar_one_or_none()
|
||
if not personality:
|
||
raise HTTPException(status_code=404, detail="人格不存在")
|
||
for field, val in req.model_dump(exclude_none=True).items():
|
||
setattr(personality, field, val)
|
||
# 重新生成提示词
|
||
personality.comment_style_prompt = self._build_style_prompt(personality)
|
||
await db.commit()
|
||
await db.refresh(personality)
|
||
return self._format_personality(personality)
|
||
|
||
async def import_from_excel(self, db: AsyncSession, file_content: bytes) -> dict:
|
||
"""Excel批量导入 - 每行独立事务,互不影响"""
|
||
try:
|
||
df = pd.read_excel(io.BytesIO(file_content), engine='openpyxl')
|
||
except Exception:
|
||
df = pd.read_excel(io.BytesIO(file_content))
|
||
|
||
df.columns = [str(c).strip() for c in df.columns]
|
||
required_cols = {"新闻平台账号", "登录密码", "昵称"}
|
||
if not required_cols.issubset(set(df.columns)):
|
||
raise HTTPException(
|
||
status_code=400,
|
||
detail=f"缺少必填列: {required_cols - set(df.columns)},当前列: {list(df.columns)}"
|
||
)
|
||
|
||
success_count = 0
|
||
error_list = []
|
||
|
||
for idx, row in df.iterrows():
|
||
row_num = idx + 2
|
||
row_account = ""
|
||
try:
|
||
# 账号可能是数字类型(手机号),统一转为字符串
|
||
account = str(row.get("新闻平台账号", "") or "").strip().split(".")[0] # 去掉 .0 后缀
|
||
password = str(row.get("登录密码", "") or "").strip()
|
||
nickname = str(row.get("昵称", "") or "").strip()
|
||
row_account = account
|
||
|
||
if account.lower() in ("nan", "none", ""):
|
||
error_list.append({"row": row_num, "error": "账号为空"}); continue
|
||
if password.lower() in ("nan", "none", ""):
|
||
error_list.append({"row": row_num, "account": account, "error": "密码为空"}); continue
|
||
if len(password) < 6:
|
||
error_list.append({"row": row_num, "account": account, "error": "密码不足6位"}); continue
|
||
# 昵称选填:为空时自动用账号末4位生成
|
||
if nickname.lower() in ("nan", "none", ""):
|
||
nickname = f"用户{account[-4:]}"
|
||
|
||
existing = await db.execute(select(VirtualUser).where(VirtualUser.account == account))
|
||
if existing.scalar_one_or_none():
|
||
error_list.append({"row": row_num, "account": account, "error": "账号已存在"}); continue
|
||
existing_nick = await db.execute(select(VirtualUser).where(VirtualUser.nickname == nickname))
|
||
if existing_nick.scalar_one_or_none():
|
||
error_list.append({"row": row_num, "account": account, "error": "昵称已被使用"}); continue
|
||
|
||
avatar = str(row.get("头像链接", "") or "").strip()
|
||
remark = str(row.get("备注", "") or "").strip()
|
||
|
||
user = VirtualUser(
|
||
nickname=nickname, account=account,
|
||
password_enc=encrypt(password),
|
||
avatar_url=avatar if avatar.lower() not in ("nan","none","") else None,
|
||
remark=remark if remark.lower() not in ("nan","none","") else None,
|
||
status=0, is_enabled=1, activity_level=1,
|
||
)
|
||
db.add(user)
|
||
await db.flush()
|
||
await db.commit()
|
||
|
||
try:
|
||
await db.refresh(user)
|
||
await self._generate_personality(db, user)
|
||
await db.commit()
|
||
except Exception as pe:
|
||
logger.warning(f"第{row_num}行人格生成跳过: {pe}")
|
||
await db.rollback()
|
||
|
||
success_count += 1
|
||
|
||
except Exception as e:
|
||
await db.rollback()
|
||
error_list.append({"row": row_num, "account": row_account, "error": str(e)})
|
||
logger.warning(f"导入第{row_num}行失败: {e}")
|
||
|
||
return {"success": success_count, "failed": len(error_list), "errors": error_list}
|
||
|
||
async def export_to_excel(self, db: AsyncSession) -> bytes:
|
||
"""导出全量用户数据(不含密码)"""
|
||
result = await db.execute(select(VirtualUser).order_by(VirtualUser.created_at.desc()))
|
||
users = result.scalars().all()
|
||
rows = []
|
||
for u in users:
|
||
p_result = await db.execute(select(UserPersonality).where(UserPersonality.user_id == u.id))
|
||
p = p_result.scalar_one_or_none()
|
||
rows.append({
|
||
"ID": u.id, "昵称": u.nickname, "账号": u.account,
|
||
"状态": STATUS_LABELS.get(u.status, "未知"),
|
||
"活跃度": ACTIVITY_LABELS.get(u.activity_level, "中"),
|
||
"性格": p.character_type if p else "", "语言风格": p.language_style if p else "",
|
||
"兴趣偏好": ",".join(p.interest_tags or []) if p else "",
|
||
"互动倾向": p.interact_tendency if p else "",
|
||
"累计互动": u.total_interactions, "今日评论": u.today_comment_count,
|
||
"今日点赞": u.today_like_count, "最后登录": u.last_login_at,
|
||
"最后互动": u.last_interact_at, "备注": u.remark,
|
||
"是否启用": "是" if u.is_enabled else "否", "创建时间": u.created_at,
|
||
})
|
||
df = pd.DataFrame(rows)
|
||
buf = io.BytesIO()
|
||
df.to_excel(buf, index=False, sheet_name="虚拟用户")
|
||
buf.seek(0)
|
||
return buf.read()
|
||
|
||
async def get_excel_template(self) -> bytes:
|
||
"""获取导入模板(账号+密码必填,其他选填)"""
|
||
df = pd.DataFrame(columns=["新闻平台账号", "登录密码", "昵称(选填)", "头像链接(选填)", "备注(选填)"])
|
||
df.loc[0] = ["13800138000", "password123", "(留空自动生成)", "", ""]
|
||
buf = io.BytesIO()
|
||
df.to_excel(buf, index=False, sheet_name="导入模板")
|
||
buf.seek(0)
|
||
return buf.read()
|
||
|
||
async def _generate_personality(self, db: AsyncSession, user: VirtualUser) -> UserPersonality:
|
||
"""调用AI生成人格"""
|
||
result = await ai_service.generate_personality(user.nickname, user.account)
|
||
personality = UserPersonality(
|
||
user_id=user.id,
|
||
character_type=result.get("character_type", "温和"),
|
||
language_style=result.get("language_style", "幽默"),
|
||
interest_tags=result.get("interest_tags", ["科技"]),
|
||
interact_tendency=result.get("interact_tendency", "爱评论"),
|
||
word_count_min=result.get("word_count_min", 20),
|
||
word_count_max=result.get("word_count_max", 80),
|
||
personality_desc=result.get("personality_desc", ""),
|
||
)
|
||
personality.comment_style_prompt = self._build_style_prompt(personality)
|
||
db.add(personality)
|
||
await db.flush()
|
||
return personality
|
||
|
||
def _build_style_prompt(self, p: UserPersonality) -> str:
|
||
interests = "、".join(p.interest_tags or []) if p.interest_tags else "综合"
|
||
return (
|
||
f"你是一个{p.character_type}性格、{p.language_style}语言风格的新闻读者,"
|
||
f"主要对{interests}类内容感兴趣,互动倾向是{p.interact_tendency}。"
|
||
f"评论字数控制在{p.word_count_min}~{p.word_count_max}字。"
|
||
f"个人简介:{p.personality_desc}"
|
||
)
|
||
|
||
def _format_user(self, u: VirtualUser, p: Optional[UserPersonality]) -> dict:
|
||
return {
|
||
"id": u.id, "nickname": u.nickname, "account": u.account,
|
||
"avatar_url": u.avatar_url,
|
||
"real_name": getattr(u, "real_name", None),
|
||
"sex": getattr(u, "sex", 0),
|
||
"platform_uid": getattr(u, "platform_uid", None),
|
||
"status": u.status,
|
||
"status_label": STATUS_LABELS.get(u.status, "未知"),
|
||
"activity_level": u.activity_level,
|
||
"activity_label": ACTIVITY_LABELS.get(u.activity_level, "中"),
|
||
"daily_comment_limit": u.daily_comment_limit,
|
||
"daily_like_limit": u.daily_like_limit,
|
||
"today_comment_count": u.today_comment_count,
|
||
"today_like_count": u.today_like_count,
|
||
"total_interactions": u.total_interactions,
|
||
"last_login_at": _fmt_dt(u.last_login_at),
|
||
"last_interact_at": _fmt_dt(u.last_interact_at),
|
||
"remark": u.remark, "is_enabled": u.is_enabled,
|
||
"created_at": _fmt_dt(u.created_at),
|
||
"personality": self._format_personality(p) if p else None,
|
||
}
|
||
|
||
def _format_personality(self, p: Optional[UserPersonality]) -> Optional[dict]:
|
||
if not p:
|
||
return None
|
||
return {
|
||
"id": p.id, "user_id": p.user_id,
|
||
"character_type": p.character_type, "language_style": p.language_style,
|
||
"interest_tags": p.interest_tags or [], "interact_tendency": p.interact_tendency,
|
||
"word_count_min": p.word_count_min, "word_count_max": p.word_count_max,
|
||
"personality_desc": p.personality_desc,
|
||
"updated_at": _fmt_dt(p.updated_at),
|
||
}
|
||
|
||
async def _get_or_404(self, db: AsyncSession, user_id: int) -> VirtualUser:
|
||
result = await db.execute(select(VirtualUser).where(VirtualUser.id == user_id))
|
||
user = result.scalar_one_or_none()
|
||
if not user:
|
||
raise HTTPException(status_code=404, detail="用户不存在")
|
||
return user
|
||
|
||
|
||
user_service = UserService()
|