Files
huihuiSquare/backend/app/services/user_service.py
stefanfeng cd07776914 feat: 多项功能更新
- 日志时间改为北京时间(TZ=Asia/Shanghai)
- 评论达上限后继续执行点赞/收藏/转发
- 用户信息同步改用 PATCH /v2/users/current
- 一键登出全部功能
- 一键登出全部前端按钮
- update.sh 一键更新脚本
2026-03-31 10:29:26 +08:00

359 lines
17 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""虚拟用户业务服务"""
import io
import uuid
from datetime import datetime, timezone
def _fmt_dt(dt):
if dt is None: return None
if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc)
return dt.isoformat()
from typing import List, Optional, Tuple
import pandas as pd
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete, func, and_, or_
from fastapi import HTTPException
from app.models import VirtualUser, UserPersonality
from app.schemas import UserCreateRequest, UserUpdateRequest
from app.utils.crypto import encrypt, decrypt
from app.services.ai_service import ai_service
from app.core.logger import logger
STATUS_LABELS = {0: "未登录", 1: "登录中", 2: "已登录", 3: "登录失效", 4: "封禁"}
ACTIVITY_LABELS = {0: "", 1: "", 2: ""}
ACTIVITY_COMMENT_LIMITS = {0: (3, 5), 1: (8, 15), 2: (20, 30)}
class UserService:
async def get_users(
self, db: AsyncSession,
page: int = 1, page_size: int = 20,
keyword: str = None, status: int = None,
is_enabled: int = None
) -> Tuple[int, List[dict]]:
query = select(VirtualUser)
conditions = []
if keyword:
conditions.append(
or_(VirtualUser.nickname.like(f"%{keyword}%"),
VirtualUser.account.like(f"%{keyword}%"))
)
if status is not None:
conditions.append(VirtualUser.status == status)
if is_enabled is not None:
conditions.append(VirtualUser.is_enabled == is_enabled)
if conditions:
query = query.where(and_(*conditions))
count_result = await db.execute(
select(func.count()).select_from(query.subquery())
)
total = count_result.scalar()
query = query.offset((page - 1) * page_size).limit(page_size).order_by(VirtualUser.created_at.desc())
result = await db.execute(query)
users = result.scalars().all()
items = []
for u in users:
# 获取人格
p_result = await db.execute(select(UserPersonality).where(UserPersonality.user_id == u.id))
personality = p_result.scalar_one_or_none()
items.append(self._format_user(u, personality))
return total, items
async def create_user(self, db: AsyncSession, req: UserCreateRequest) -> dict:
# 检查账号重复
existing = await db.execute(select(VirtualUser).where(VirtualUser.account == req.account))
if existing.scalar_one_or_none():
raise HTTPException(status_code=400, detail="账号已存在")
# 昵称选填:为空则自动生成
nickname = req.nickname or f"用户{req.account[-4:]}"
# 检查昵称重复(自动生成的若冲突则加随机后缀)
existing_nick = await db.execute(select(VirtualUser).where(VirtualUser.nickname == nickname))
if existing_nick.scalar_one_or_none():
import random, string
nickname = nickname + "_" + "".join(random.choices(string.digits, k=4))
user = VirtualUser(
nickname=nickname,
account=req.account,
password_enc=encrypt(req.password),
avatar_url=req.avatar_url,
activity_level=req.activity_level,
daily_comment_limit=req.daily_comment_limit,
daily_like_limit=req.daily_like_limit,
remark=req.remark,
status=0,
is_enabled=1,
)
db.add(user)
await db.flush()
# 自动生成AI人格
try:
await self._generate_personality(db, user)
except Exception as e:
logger.warning(f"人格生成失败,跳过: {e}")
await db.commit()
await db.refresh(user)
p_result = await db.execute(select(UserPersonality).where(UserPersonality.user_id == user.id))
personality = p_result.scalar_one_or_none()
return self._format_user(user, personality)
async def update_user(self, db: AsyncSession, user_id: int, req: UserUpdateRequest) -> dict:
user = await self._get_or_404(db, user_id)
if req.nickname and req.nickname != user.nickname:
existing = await db.execute(
select(VirtualUser).where(VirtualUser.nickname == req.nickname, VirtualUser.id != user_id)
)
if existing.scalar_one_or_none():
raise HTTPException(status_code=400, detail="昵称已被使用")
user.nickname = req.nickname
if req.password:
user.password_enc = encrypt(req.password)
if req.avatar_url is not None:
user.avatar_url = req.avatar_url
if req.activity_level is not None:
user.activity_level = req.activity_level
if req.daily_comment_limit is not None:
user.daily_comment_limit = req.daily_comment_limit
if req.daily_like_limit is not None:
user.daily_like_limit = req.daily_like_limit
if req.remark is not None:
user.remark = req.remark
if req.is_enabled is not None:
user.is_enabled = req.is_enabled
if req.is_enabled == 0:
user.status = 0 # 禁用后重置状态
await db.commit()
await db.refresh(user)
p_result = await db.execute(select(UserPersonality).where(UserPersonality.user_id == user.id))
personality = p_result.scalar_one_or_none()
return self._format_user(user, personality)
async def delete_user(self, db: AsyncSession, user_id: int):
user = await self._get_or_404(db, user_id)
await db.execute(delete(UserPersonality).where(UserPersonality.user_id == user_id))
await db.delete(user)
await db.commit()
async def batch_action(self, db: AsyncSession, user_ids: List[int], action: str):
"""批量操作"""
if action == "enable":
await db.execute(update(VirtualUser).where(VirtualUser.id.in_(user_ids)).values(is_enabled=1))
elif action == "disable":
await db.execute(update(VirtualUser).where(VirtualUser.id.in_(user_ids)).values(is_enabled=0, status=0))
elif action == "logout":
await db.execute(update(VirtualUser).where(VirtualUser.id.in_(user_ids)).values(status=0, session_token=None))
elif action == "delete":
await db.execute(delete(UserPersonality).where(UserPersonality.user_id.in_(user_ids)))
await db.execute(delete(VirtualUser).where(VirtualUser.id.in_(user_ids)))
await db.commit()
return {"affected": len(user_ids)}
async def generate_personality(self, db: AsyncSession, user_id: int) -> dict:
"""为用户生成/重新生成AI人格"""
user = await self._get_or_404(db, user_id)
# 删除旧人格
await db.execute(delete(UserPersonality).where(UserPersonality.user_id == user_id))
personality = await self._generate_personality(db, user)
await db.commit()
return self._format_personality(personality)
async def update_personality(self, db: AsyncSession, user_id: int, req) -> dict:
p_result = await db.execute(select(UserPersonality).where(UserPersonality.user_id == user_id))
personality = p_result.scalar_one_or_none()
if not personality:
raise HTTPException(status_code=404, detail="人格不存在")
for field, val in req.model_dump(exclude_none=True).items():
setattr(personality, field, val)
# 重新生成提示词
personality.comment_style_prompt = self._build_style_prompt(personality)
await db.commit()
await db.refresh(personality)
return self._format_personality(personality)
async def import_from_excel(self, db: AsyncSession, file_content: bytes) -> dict:
"""Excel批量导入 - 每行独立事务,互不影响"""
try:
df = pd.read_excel(io.BytesIO(file_content), engine='openpyxl')
except Exception:
df = pd.read_excel(io.BytesIO(file_content))
df.columns = [str(c).strip() for c in df.columns]
required_cols = {"新闻平台账号", "登录密码", "昵称"}
if not required_cols.issubset(set(df.columns)):
raise HTTPException(
status_code=400,
detail=f"缺少必填列: {required_cols - set(df.columns)},当前列: {list(df.columns)}"
)
success_count = 0
error_list = []
for idx, row in df.iterrows():
row_num = idx + 2
row_account = ""
try:
# 账号可能是数字类型(手机号),统一转为字符串
account = str(row.get("新闻平台账号", "") or "").strip().split(".")[0] # 去掉 .0 后缀
password = str(row.get("登录密码", "") or "").strip()
nickname = str(row.get("昵称", "") or "").strip()
row_account = account
if account.lower() in ("nan", "none", ""):
error_list.append({"row": row_num, "error": "账号为空"}); continue
if password.lower() in ("nan", "none", ""):
error_list.append({"row": row_num, "account": account, "error": "密码为空"}); continue
if len(password) < 6:
error_list.append({"row": row_num, "account": account, "error": "密码不足6位"}); continue
# 昵称选填为空时自动用账号末4位生成
if nickname.lower() in ("nan", "none", ""):
nickname = f"用户{account[-4:]}"
existing = await db.execute(select(VirtualUser).where(VirtualUser.account == account))
if existing.scalar_one_or_none():
error_list.append({"row": row_num, "account": account, "error": "账号已存在"}); continue
existing_nick = await db.execute(select(VirtualUser).where(VirtualUser.nickname == nickname))
if existing_nick.scalar_one_or_none():
error_list.append({"row": row_num, "account": account, "error": "昵称已被使用"}); continue
avatar = str(row.get("头像链接", "") or "").strip()
remark = str(row.get("备注", "") or "").strip()
user = VirtualUser(
nickname=nickname, account=account,
password_enc=encrypt(password),
avatar_url=avatar if avatar.lower() not in ("nan","none","") else None,
remark=remark if remark.lower() not in ("nan","none","") else None,
status=0, is_enabled=1, activity_level=1,
)
db.add(user)
await db.flush()
await db.commit()
try:
await db.refresh(user)
await self._generate_personality(db, user)
await db.commit()
except Exception as pe:
logger.warning(f"{row_num}行人格生成跳过: {pe}")
await db.rollback()
success_count += 1
except Exception as e:
await db.rollback()
error_list.append({"row": row_num, "account": row_account, "error": str(e)})
logger.warning(f"导入第{row_num}行失败: {e}")
return {"success": success_count, "failed": len(error_list), "errors": error_list}
async def export_to_excel(self, db: AsyncSession) -> bytes:
"""导出全量用户数据(不含密码)"""
result = await db.execute(select(VirtualUser).order_by(VirtualUser.created_at.desc()))
users = result.scalars().all()
rows = []
for u in users:
p_result = await db.execute(select(UserPersonality).where(UserPersonality.user_id == u.id))
p = p_result.scalar_one_or_none()
rows.append({
"ID": u.id, "昵称": u.nickname, "账号": u.account,
"状态": STATUS_LABELS.get(u.status, "未知"),
"活跃度": ACTIVITY_LABELS.get(u.activity_level, ""),
"性格": p.character_type if p else "", "语言风格": p.language_style if p else "",
"兴趣偏好": ",".join(p.interest_tags or []) if p else "",
"互动倾向": p.interact_tendency if p else "",
"累计互动": u.total_interactions, "今日评论": u.today_comment_count,
"今日点赞": u.today_like_count, "最后登录": u.last_login_at,
"最后互动": u.last_interact_at, "备注": u.remark,
"是否启用": "" if u.is_enabled else "", "创建时间": u.created_at,
})
df = pd.DataFrame(rows)
buf = io.BytesIO()
df.to_excel(buf, index=False, sheet_name="虚拟用户")
buf.seek(0)
return buf.read()
async def get_excel_template(self) -> bytes:
"""获取导入模板(账号+密码必填,其他选填)"""
df = pd.DataFrame(columns=["新闻平台账号", "登录密码", "昵称(选填)", "头像链接(选填)", "备注(选填)"])
df.loc[0] = ["13800138000", "password123", "(留空自动生成)", "", ""]
buf = io.BytesIO()
df.to_excel(buf, index=False, sheet_name="导入模板")
buf.seek(0)
return buf.read()
async def _generate_personality(self, db: AsyncSession, user: VirtualUser) -> UserPersonality:
"""调用AI生成人格"""
result = await ai_service.generate_personality(user.nickname, user.account)
personality = UserPersonality(
user_id=user.id,
character_type=result.get("character_type", "温和"),
language_style=result.get("language_style", "幽默"),
interest_tags=result.get("interest_tags", ["科技"]),
interact_tendency=result.get("interact_tendency", "爱评论"),
word_count_min=result.get("word_count_min", 20),
word_count_max=result.get("word_count_max", 80),
personality_desc=result.get("personality_desc", ""),
)
personality.comment_style_prompt = self._build_style_prompt(personality)
db.add(personality)
await db.flush()
return personality
def _build_style_prompt(self, p: UserPersonality) -> str:
interests = "".join(p.interest_tags or []) if p.interest_tags else "综合"
return (
f"你是一个{p.character_type}性格、{p.language_style}语言风格的新闻读者,"
f"主要对{interests}类内容感兴趣,互动倾向是{p.interact_tendency}"
f"评论字数控制在{p.word_count_min}~{p.word_count_max}字。"
f"个人简介:{p.personality_desc}"
)
def _format_user(self, u: VirtualUser, p: Optional[UserPersonality]) -> dict:
return {
"id": u.id, "nickname": u.nickname, "account": u.account,
"avatar_url": u.avatar_url,
"real_name": getattr(u, "real_name", None),
"sex": getattr(u, "sex", 0),
"platform_uid": getattr(u, "platform_uid", None),
"status": u.status,
"status_label": STATUS_LABELS.get(u.status, "未知"),
"activity_level": u.activity_level,
"activity_label": ACTIVITY_LABELS.get(u.activity_level, ""),
"daily_comment_limit": u.daily_comment_limit,
"daily_like_limit": u.daily_like_limit,
"today_comment_count": u.today_comment_count,
"today_like_count": u.today_like_count,
"total_interactions": u.total_interactions,
"last_login_at": _fmt_dt(u.last_login_at),
"last_interact_at": _fmt_dt(u.last_interact_at),
"remark": u.remark, "is_enabled": u.is_enabled,
"created_at": _fmt_dt(u.created_at),
"personality": self._format_personality(p) if p else None,
}
def _format_personality(self, p: Optional[UserPersonality]) -> Optional[dict]:
if not p:
return None
return {
"id": p.id, "user_id": p.user_id,
"character_type": p.character_type, "language_style": p.language_style,
"interest_tags": p.interest_tags or [], "interact_tendency": p.interact_tendency,
"word_count_min": p.word_count_min, "word_count_max": p.word_count_max,
"personality_desc": p.personality_desc,
"updated_at": _fmt_dt(p.updated_at),
}
async def _get_or_404(self, db: AsyncSession, user_id: int) -> VirtualUser:
result = await db.execute(select(VirtualUser).where(VirtualUser.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="用户不存在")
return user
user_service = UserService()