问题根因: 1. SQL查询加了 .limit(max_concurrent),导致只有数据库前5条用户参与互动 2. 额外的 random.random() < 0.6 过滤进一步减少了执行用户数 修复方案: - 查询所有已登录用户(去掉 SQL LIMIT) - 按最后互动时间升序排序,最久未互动的用户优先 - 前1/3名额给最久未互动用户(优先权),其余随机补充 - 每轮最多执行 max_concurrent 个用户,保证公平轮转
490 lines
24 KiB
Python
Executable File
490 lines
24 KiB
Python
Executable File
"""调度服务 - 定时自动互动、会话校验"""
|
||
import random
|
||
import asyncio
|
||
from datetime import datetime, date
|
||
from typing import Optional
|
||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||
from apscheduler.triggers.interval import IntervalTrigger
|
||
from sqlalchemy import select, update, func
|
||
|
||
from app.core.database import AsyncSessionLocal
|
||
from app.core.logger import logger
|
||
from app.models import VirtualUser, UserPersonality, InteractionRecord, SystemConfig
|
||
|
||
|
||
class SchedulerService:
|
||
def __init__(self):
|
||
self.scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")
|
||
self._running = False
|
||
|
||
async def run_once_now(self, db=None):
|
||
"""立即执行一次互动,不受时间段限制"""
|
||
from sqlalchemy import select
|
||
from app.core.database import AsyncSessionLocal
|
||
logger.info("⚡ 立即触发互动任务")
|
||
async with AsyncSessionLocal() as session:
|
||
result_r = await session.execute(
|
||
select(VirtualUser).where(
|
||
VirtualUser.status == 2,
|
||
VirtualUser.is_enabled == 1,
|
||
)
|
||
)
|
||
users = result_r.scalars().all()
|
||
if not users:
|
||
return {"message": "没有已登录的用户", "triggered": 0}
|
||
import random
|
||
selected = random.sample(users, min(5, len(users)))
|
||
import asyncio
|
||
tasks = [self._execute_user_interaction(u.id) for u in selected]
|
||
await asyncio.gather(*tasks, return_exceptions=True)
|
||
return {"triggered": len(selected), "users": [u.account for u in selected]}
|
||
|
||
async def start(self):
|
||
if self._running:
|
||
return
|
||
# 会话校验:每10分钟
|
||
self.scheduler.add_job(
|
||
self._check_sessions, IntervalTrigger(minutes=10),
|
||
id="check_sessions", replace_existing=True
|
||
)
|
||
# 互动任务:每5分钟检查一次(内部判断是否在活跃时间段)
|
||
self.scheduler.add_job(
|
||
self._run_interactions, IntervalTrigger(minutes=5),
|
||
id="run_interactions", replace_existing=True
|
||
)
|
||
# 每日零点重置计数
|
||
self.scheduler.add_job(
|
||
self._daily_reset, "cron", hour=16, minute=0, # 北京时间 00:00 = UTC 16:00
|
||
id="daily_reset", replace_existing=True
|
||
)
|
||
self.scheduler.start()
|
||
self._running = True
|
||
logger.info("调度器已启动")
|
||
# 记录启动时间
|
||
async with AsyncSessionLocal() as db:
|
||
await self._set_config(db, "system_start_time", datetime.now().isoformat())
|
||
|
||
async def stop(self):
|
||
if self.scheduler.running:
|
||
self.scheduler.shutdown(wait=False)
|
||
self._running = False
|
||
|
||
async def _get_config(self, db, key: str, default=None):
|
||
result = await db.execute(select(SystemConfig).where(SystemConfig.config_key == key))
|
||
cfg = result.scalar_one_or_none()
|
||
return cfg.config_value if cfg else default
|
||
|
||
async def _set_config(self, db, key: str, value: str):
|
||
result = await db.execute(select(SystemConfig).where(SystemConfig.config_key == key))
|
||
cfg = result.scalar_one_or_none()
|
||
if cfg:
|
||
cfg.config_value = value
|
||
else:
|
||
db.add(SystemConfig(config_key=key, config_value=value))
|
||
await db.commit()
|
||
|
||
async def _check_sessions(self):
|
||
"""定时校验登录状态"""
|
||
from app.services.news_service import news_service
|
||
async with AsyncSessionLocal() as db:
|
||
result = await db.execute(
|
||
select(VirtualUser).where(VirtualUser.status == 2, VirtualUser.is_enabled == 1)
|
||
)
|
||
users = result.scalars().all()
|
||
for user in users:
|
||
try:
|
||
valid = await news_service.check_session(db, user)
|
||
if not valid:
|
||
logger.warning(f"用户 {user.account} 会话失效,尝试重登")
|
||
await news_service.login(db, user)
|
||
except Exception as e:
|
||
logger.error(f"会话校验异常 {user.account}: {e}")
|
||
|
||
async def _run_interactions(self):
|
||
"""执行互动任务"""
|
||
async with AsyncSessionLocal() as db:
|
||
# 检查调度器开关
|
||
enabled = await self._get_config(db, "scheduler_enabled", "true")
|
||
if enabled != "true":
|
||
return
|
||
|
||
# 检查Token限额
|
||
token_limited = await self._get_config(db, "token_limit_reached", "false")
|
||
if token_limited == "true":
|
||
return
|
||
|
||
# 检查互动时间段(北京时间 UTC+8)
|
||
from datetime import timezone, timedelta
|
||
tz_beijing = timezone(timedelta(hours=8))
|
||
now_bj = datetime.now(tz_beijing)
|
||
now_time = now_bj.strftime("%H:%M")
|
||
start_str = await self._get_config(db, "interact_time_start", "08:00")
|
||
end_str = await self._get_config(db, "interact_time_end", "22:00")
|
||
if not (start_str <= now_time <= end_str):
|
||
logger.debug(f"[调度] 当前北京时间 {now_time} 不在互动时段 {start_str}-{end_str}")
|
||
return
|
||
|
||
# 获取最小互动间隔(秒)
|
||
min_interval = int(await self._get_config(db, "interact_min_interval", "300"))
|
||
|
||
# 获取最大并发
|
||
max_concurrent = int(await self._get_config(db, "max_concurrent_users", "5"))
|
||
|
||
# 获取所有已登录、启用的用户(不加 LIMIT,确保所有用户公平参与)
|
||
result = await db.execute(
|
||
select(VirtualUser).where(
|
||
VirtualUser.status == 2,
|
||
VirtualUser.is_enabled == 1,
|
||
)
|
||
)
|
||
all_users = result.scalars().all()
|
||
|
||
# 没有已登录用户时,尝试登录未登录用户
|
||
if not all_users:
|
||
await self._try_login_users(db)
|
||
return
|
||
|
||
# 检查互动间隔:过滤掉最近 min_interval 秒内已互动的用户
|
||
now_dt = datetime.now()
|
||
eligible = []
|
||
for u in all_users:
|
||
if u.last_interact_at is None:
|
||
eligible.append(u)
|
||
else:
|
||
elapsed = (now_dt - u.last_interact_at).total_seconds()
|
||
if elapsed >= min_interval:
|
||
eligible.append(u)
|
||
|
||
if not eligible:
|
||
logger.debug(f"[调度] 所有 {len(all_users)} 个用户在 {min_interval}s 内已互动,跳过本次")
|
||
return
|
||
|
||
# 按最后互动时间升序排序:最久没互动的用户优先
|
||
eligible.sort(key=lambda u: u.last_interact_at or datetime.min)
|
||
|
||
# 从符合条件的用户中随机选取 max_concurrent 个执行(保证公平轮转)
|
||
batch_size = max_concurrent if max_concurrent > 0 else len(eligible)
|
||
# 优先选最久未互动的用户(前1/3),其余随机补充
|
||
priority_size = max(1, batch_size // 3)
|
||
priority_users = eligible[:priority_size]
|
||
rest_users = eligible[priority_size:]
|
||
random.shuffle(rest_users)
|
||
selected = priority_users + rest_users[:max(0, batch_size - priority_size)]
|
||
|
||
logger.info(
|
||
f"[调度] 共 {len(all_users)} 个用户,{len(eligible)} 个满足间隔,"
|
||
f"本轮选取 {len(selected)} 个执行互动"
|
||
)
|
||
|
||
for user in selected:
|
||
asyncio.create_task(self._execute_user_interaction(user.id))
|
||
|
||
async def _try_login_users(self, db):
|
||
"""尝试登录未登录的用户"""
|
||
from app.services.news_service import news_service
|
||
result = await db.execute(
|
||
select(VirtualUser).where(
|
||
VirtualUser.status.in_([0, 3]),
|
||
VirtualUser.is_enabled == 1
|
||
).limit(3)
|
||
)
|
||
users = result.scalars().all()
|
||
for user in users:
|
||
try:
|
||
await news_service.login(db, user)
|
||
await asyncio.sleep(2)
|
||
except Exception as e:
|
||
logger.error(f"自动登录失败 {user.account}: {e}")
|
||
|
||
async def _execute_user_interaction(self, user_id: int):
|
||
"""执行单用户互动 - 基于真实接口"""
|
||
from app.services.news_service import news_service
|
||
from app.services.ai_service import ai_service
|
||
|
||
async with AsyncSessionLocal() as db:
|
||
try:
|
||
user_result = await db.execute(select(VirtualUser).where(VirtualUser.id == user_id))
|
||
user = user_result.scalar_one_or_none()
|
||
if not user or user.status != 2:
|
||
return
|
||
|
||
# 检查今日评论限额
|
||
can_comment = True
|
||
if user.today_comment_count >= user.daily_comment_limit:
|
||
can_comment = False
|
||
logger.info(f'用户 ' + user.account + ' 今日评论已达上限,仍执行点赞/收藏/转发')
|
||
|
||
# 获取人格
|
||
from app.models import UserPersonality
|
||
p_result = await db.execute(
|
||
select(UserPersonality).where(UserPersonality.user_id == user_id)
|
||
)
|
||
personality = p_result.scalar_one_or_none()
|
||
interest_tags = personality.interest_tags if personality else []
|
||
|
||
# 获取新闻列表(基于接口 GET /news/list)
|
||
articles = await news_service.get_news_list(
|
||
db, user, count=5, interest_tags=interest_tags
|
||
)
|
||
if not articles:
|
||
# 尝试从 session 获取 org_id 再试一次
|
||
from app.core.redis_client import get_session as _get_sess
|
||
sess = await _get_sess(user.id)
|
||
org_from_sess = sess.get("org_id", "") if sess else ""
|
||
if org_from_sess:
|
||
articles = await news_service.get_news_list(
|
||
db, user, count=5, interest_tags=interest_tags
|
||
)
|
||
if not articles:
|
||
logger.warning(
|
||
f"用户 {user.account} 获取新闻列表为空 "
|
||
f"(orgId={await news_service._cfg(db, 'platform_org_id', '')})"
|
||
)
|
||
return
|
||
|
||
# ── 文章去重 + 热度加权选取 ─────────────────────────────────
|
||
# 查询今日已评论过的文章ID(避免重复评论同一篇)
|
||
from sqlalchemy import func as _func
|
||
from datetime import date as _date
|
||
today_str = datetime.now().date()
|
||
dup_result = await db.execute(
|
||
select(InteractionRecord.article_id).where(
|
||
InteractionRecord.user_id == user_id,
|
||
InteractionRecord.interact_type == "comment",
|
||
InteractionRecord.status == 1,
|
||
_func.date(InteractionRecord.executed_at) == today_str,
|
||
)
|
||
)
|
||
already_commented = {r[0] for r in dup_result.all()}
|
||
|
||
# 按热度加权:commentNum + praiseNum + readNum 越高权重越大
|
||
# 同时优先未评论过的文章
|
||
def _article_weight(a):
|
||
aid = str(a.get("recordId") or a.get("id", ""))
|
||
base = (
|
||
int(a.get("commentNum") or 0) * 3 +
|
||
int(a.get("praiseNum") or 0) * 2 +
|
||
int(a.get("readNum") or 0)
|
||
)
|
||
# 已评论的文章权重大幅降低(但不为0,还可以点赞/收藏)
|
||
penalty = 0.1 if aid in already_commented else 1.0
|
||
return max(1, base) * penalty
|
||
|
||
weights = [_article_weight(a) for a in articles]
|
||
article = random.choices(articles, weights=weights, k=1)[0]
|
||
|
||
# 判断是否已评论此文章(用于后续逻辑)
|
||
news_id = str(article.get("recordId") or article.get("id", ""))
|
||
already_commented_this = news_id in already_commented
|
||
|
||
# 接口返回字段: id/newsTitle/content/digest/createUser
|
||
# 广场接口字段:recordId=新闻实际ID, id=广场记录ID, title=标题
|
||
news_title = article.get("title") or article.get("newsTitle") or "未知文章"
|
||
news_content = article.get("content") or article.get("digest") or news_title
|
||
news_author = str(article.get("createUser") or "")
|
||
# 从广场数据中顺带获取 orgId
|
||
article_org_id = str(article.get("orgId") or "")
|
||
|
||
if not news_id:
|
||
return
|
||
|
||
# 读取互动概率
|
||
comment_prob = float(await self._get_config_from_db(db, "comment_probability", "0.4"))
|
||
reply_prob = float(await self._get_config_from_db(db, "reply_probability", "0.2"))
|
||
like_prob = float(await self._get_config_from_db(db, "like_probability", "0.6"))
|
||
collect_prob = float(await self._get_config_from_db(db, "collect_probability", "0.3"))
|
||
forward_prob = float(await self._get_config_from_db(db, "forward_probability", "0.15"))
|
||
|
||
interactions_done = []
|
||
|
||
# ① 先记录阅读(每次必做,模拟真实用户打开文章)
|
||
await news_service.read_news(db, user, news_id)
|
||
|
||
# ② 点赞
|
||
if random.random() < like_prob:
|
||
success, err = await news_service.like_news(db, user, news_id, org_id=article_org_id, to_user_id=news_author, title=news_title)
|
||
await self._save_record(db, user, news_id, news_title, "like", None, 0, success, err)
|
||
if success:
|
||
interactions_done.append("like")
|
||
await self._incr_total(db, user_id)
|
||
|
||
# ③ 收藏(阅读+点赞组合模拟)
|
||
if random.random() < collect_prob:
|
||
success, err = await news_service.collect_news(db, user, news_id, org_id=article_org_id, to_user_id=news_author, title=news_title)
|
||
await self._save_record(db, user, news_id, news_title, "collect", None, 0, success, err)
|
||
if success:
|
||
interactions_done.append("collect")
|
||
|
||
# ④ 转发(调用 /points/forward/news/{orgId})
|
||
if random.random() < forward_prob:
|
||
success, err = await news_service.forward_news(db, user, news_id)
|
||
await self._save_record(db, user, news_id, news_title, "forward", None, 0, success, err)
|
||
if success:
|
||
interactions_done.append("forward")
|
||
await self._incr_total(db, user_id)
|
||
|
||
# ⑤ 评论/回复逻辑(去重:已评论过的文章改为回复他人评论)
|
||
if can_comment and personality:
|
||
style_prompt = personality.comment_style_prompt or ""
|
||
safe_word_max = min(personality.word_count_max, 80)
|
||
|
||
if already_commented_this:
|
||
# 已评论过此文章 → 改为回复其他用户的评论(虚拟用户互动)
|
||
if random.random() < reply_prob:
|
||
existing = await news_service.get_comments(db, user, news_id)
|
||
if existing:
|
||
# 优先回复虚拟用户的评论(促进互动),过滤掉自己的评论
|
||
from app.core.redis_client import get_session as _gs
|
||
my_sess = await _gs(user.id)
|
||
my_uid = my_sess.get("platform_uid", "") if my_sess else ""
|
||
others = [c for c in existing
|
||
if str(c.get("userId") or c.get("createUser") or "") != my_uid]
|
||
if others:
|
||
target = random.choice(others)
|
||
cid = str(target.get("id") or target.get("commentId") or "")
|
||
parent_content = target.get("content") or ""
|
||
if cid:
|
||
reply_text, r_tokens = await ai_service.generate_reply(
|
||
db, news_title, parent_content,
|
||
style_prompt,
|
||
personality.word_count_min,
|
||
safe_word_max
|
||
)
|
||
if reply_text:
|
||
r_ok, r_err = await news_service.post_reply(
|
||
db, user, news_id, cid, reply_text
|
||
)
|
||
await self._save_record(
|
||
db, user, news_id, news_title, "reply",
|
||
reply_text, r_tokens, r_ok, r_err,
|
||
parent_comment_id=cid
|
||
)
|
||
if r_ok:
|
||
interactions_done.append("reply")
|
||
logger.info(f"💬 {user.account} 回复了已评论文章的评论(去重逻辑)")
|
||
else:
|
||
# 未评论过此文章 → 正常发评论
|
||
if random.random() < comment_prob:
|
||
comment_text, tokens = await ai_service.generate_comment(
|
||
db, news_title, news_content,
|
||
style_prompt, personality.word_count_min, safe_word_max
|
||
)
|
||
if comment_text:
|
||
success, err = await news_service.post_comment(
|
||
db, user, news_id, news_title, comment_text,
|
||
news_author_id=news_author, org_id=article_org_id
|
||
)
|
||
await self._save_record(
|
||
db, user, news_id, news_title, "comment",
|
||
comment_text, tokens, success, err
|
||
)
|
||
if success:
|
||
interactions_done.append("comment")
|
||
await db.execute(
|
||
update(VirtualUser).where(VirtualUser.id == user_id).values(
|
||
today_comment_count=VirtualUser.today_comment_count + 1,
|
||
total_interactions=VirtualUser.total_interactions + 1,
|
||
last_interact_at=datetime.now()
|
||
)
|
||
)
|
||
|
||
# ⑥ 评论成功后,随机回复其他用户的评论(互动链)
|
||
if random.random() < reply_prob:
|
||
existing = await news_service.get_comments(db, user, news_id)
|
||
if existing:
|
||
from app.core.redis_client import get_session as _gs2
|
||
my_sess2 = await _gs2(user.id)
|
||
my_uid2 = my_sess2.get("platform_uid", "") if my_sess2 else ""
|
||
others2 = [c for c in existing
|
||
if str(c.get("userId") or c.get("createUser") or "") != my_uid2]
|
||
if others2:
|
||
target2 = random.choice(others2)
|
||
cid2 = str(target2.get("id") or target2.get("commentId") or "")
|
||
parent_content2 = target2.get("content") or ""
|
||
if cid2:
|
||
reply_text2, r_tokens2 = await ai_service.generate_reply(
|
||
db, news_title, parent_content2,
|
||
style_prompt,
|
||
personality.word_count_min,
|
||
safe_word_max
|
||
)
|
||
if reply_text2:
|
||
r_ok2, r_err2 = await news_service.post_reply(
|
||
db, user, news_id, cid2, reply_text2
|
||
)
|
||
await self._save_record(
|
||
db, user, news_id, news_title, "reply",
|
||
reply_text2, r_tokens2, r_ok2, r_err2,
|
||
parent_comment_id=cid2
|
||
)
|
||
if r_ok2:
|
||
interactions_done.append("reply")
|
||
|
||
await db.commit()
|
||
logger.info(f"👤 {user.account} 互动完成: {interactions_done} [新闻: {news_title[:20]}]")
|
||
|
||
except Exception as e:
|
||
logger.error(f"用户 {user_id} 互动异常: {e}")
|
||
|
||
async def _incr_total(self, db, user_id: int):
|
||
await db.execute(
|
||
update(VirtualUser).where(VirtualUser.id == user_id).values(
|
||
total_interactions=VirtualUser.total_interactions + 1,
|
||
last_interact_at=datetime.now()
|
||
)
|
||
)
|
||
|
||
async def _save_record(
|
||
self, db, user: VirtualUser, article_id: str, article_title: str,
|
||
interact_type: str, content: Optional[str], tokens: int,
|
||
success: bool, error_msg: str, parent_comment_id: str = None,
|
||
platform_record_id: str = None
|
||
):
|
||
from app.core.redis_client import get_session
|
||
session = await get_session(user.id)
|
||
session_id = session.get("session_id") if session else None
|
||
|
||
record = InteractionRecord(
|
||
user_id=user.id,
|
||
user_nickname=user.nickname,
|
||
user_account=user.account,
|
||
article_id=article_id,
|
||
article_title=article_title,
|
||
interact_type=interact_type,
|
||
content=content,
|
||
parent_comment_id=parent_comment_id,
|
||
platform_record_id=platform_record_id,
|
||
session_id=session_id,
|
||
token_consumed=tokens,
|
||
status=1 if success else 2,
|
||
error_msg=error_msg or None,
|
||
executed_at=datetime.now(),
|
||
)
|
||
db.add(record)
|
||
|
||
async def _get_config_from_db(self, db, key: str, default: str = "") -> str:
|
||
result = await db.execute(select(SystemConfig).where(SystemConfig.config_key == key))
|
||
cfg = result.scalar_one_or_none()
|
||
return cfg.config_value if cfg else default
|
||
|
||
async def _daily_reset(self):
|
||
"""每日零点重置计数"""
|
||
async with AsyncSessionLocal() as db:
|
||
await db.execute(
|
||
update(VirtualUser).values(
|
||
today_comment_count=0,
|
||
today_like_count=0
|
||
)
|
||
)
|
||
# 重置Token限额标志
|
||
result = await db.execute(
|
||
select(SystemConfig).where(SystemConfig.config_key == "token_limit_reached")
|
||
)
|
||
cfg = result.scalar_one_or_none()
|
||
if cfg:
|
||
cfg.config_value = "false"
|
||
await db.commit()
|
||
logger.info("每日计数重置完成")
|
||
|
||
|
||
scheduler_service = SchedulerService() |