Files
huihuiSquare/backend/app/services/scheduler.py
stefanfeng c944fbb0ea fix: 今日文章配额控制,避免全部虚拟用户集中互动同一篇
问题:今日只有1篇文章时,所有虚拟用户全部互动该文章,历史文章无人问津

修复方案(配额制):
- 新增 count_today_articles():轻量统计今日广场文章数
- 配额规则:每篇今日文章最多吸引3个虚拟用户(可调)
  - 今日1篇 → 最多3人互动今日,其余全走历史
  - 今日5篇 → 最多15人互动今日,其余走历史
  - 今日10篇以上 → 批次内所有人均可互动今日文章
- get_news_list() 新增 force_history 参数,强制走 Phase 2
- 调度器在分发任务前计算配额,超出配额的用户透传 force_history=True

效果:新文章获得合理曝光,历史文章持续被互动,分布更自然
2026-04-08 11:47:36 +08:00

514 lines
26 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""调度服务 - 定时自动互动、会话校验"""
import asyncio
import random
from datetime import date, datetime, timedelta, timezone
from typing import Optional

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy import func, select, update

from app.core.database import AsyncSessionLocal
from app.core.logger import logger
from app.models import InteractionRecord, SystemConfig, UserPersonality, VirtualUser
class SchedulerService:
    """Scheduler for automated virtual-user activity.

    Owns an ``AsyncIOScheduler`` (timezone Asia/Shanghai) that periodically
    re-validates login sessions, runs interaction rounds for logged-in
    virtual users, and resets per-day counters at Beijing midnight.
    """

    # Fixed UTC+8 offset used to evaluate the configured interaction window.
    _TZ_BEIJING = timezone(timedelta(hours=8))
    # Maximum number of users per round that may be steered towards each of
    # today's articles; users beyond this quota get force_history=True.
    TODAY_ARTICLE_QUOTA_PER_ARTICLE = 3

    def __init__(self):
        self.scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")
        self._running = False
        # Strong references to fire-and-forget interaction tasks. The event
        # loop keeps only weak references to tasks, so an unreferenced task
        # may be garbage-collected before it finishes.
        self._background_tasks: set = set()

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    async def run_once_now(self, db=None):
        """Run one interaction round immediately, ignoring the time window.

        Picks up to 5 random users with status == 2 and is_enabled == 1 and
        runs their interactions concurrently, waiting for all of them.

        Args:
            db: Unused; kept for backward compatibility with existing callers.

        Returns:
            ``{"message": ..., "triggered": 0}`` when no user qualifies,
            otherwise ``{"triggered": n, "users": [account, ...]}``.
        """
        logger.info("⚡ 立即触发互动任务")
        async with AsyncSessionLocal() as session:
            result = await session.execute(
                select(VirtualUser).where(
                    VirtualUser.status == 2,
                    VirtualUser.is_enabled == 1,
                )
            )
            users = result.scalars().all()
            if not users:
                return {"message": "没有已登录的用户", "triggered": 0}
            selected = random.sample(users, min(5, len(users)))
            # Capture plain values so the session (and its pooled connection)
            # is released before the long-running interactions start.
            picked = [(u.id, u.account) for u in selected]
        await asyncio.gather(
            *(self._execute_user_interaction(uid) for uid, _ in picked),
            return_exceptions=True,
        )
        return {"triggered": len(picked), "users": [account for _, account in picked]}

    async def start(self):
        """Register the periodic jobs and start the scheduler (no-op if running).

        Jobs:
            * ``check_sessions``   - every 10 minutes.
            * ``run_interactions`` - every 5 minutes; the job itself checks
              the configured active time window.
            * ``daily_reset``      - every day at 00:00 Beijing time.

        Also stores the start time under the ``system_start_time`` config key.
        """
        if self._running:
            return
        self.scheduler.add_job(
            self._check_sessions, IntervalTrigger(minutes=10),
            id="check_sessions", replace_existing=True,
        )
        self.scheduler.add_job(
            self._run_interactions, IntervalTrigger(minutes=5),
            id="run_interactions", replace_existing=True,
        )
        # Cron triggers inherit the scheduler's timezone (Asia/Shanghai), so
        # hour=0 is Beijing midnight. The former hour=16 assumed UTC and
        # actually fired at 16:00 Beijing time.
        self.scheduler.add_job(
            self._daily_reset, "cron", hour=0, minute=0,
            id="daily_reset", replace_existing=True,
        )
        self.scheduler.start()
        self._running = True
        logger.info("调度器已启动")
        async with AsyncSessionLocal() as db:
            await self._set_config(db, "system_start_time", datetime.now().isoformat())

    async def stop(self):
        """Shut the scheduler down without waiting for running jobs."""
        if self.scheduler.running:
            self.scheduler.shutdown(wait=False)
        self._running = False

    # ------------------------------------------------------------------
    # Config helpers
    # ------------------------------------------------------------------
    async def _get_config(self, db, key: str, default=None):
        """Return the ``SystemConfig`` value for ``key``, or ``default`` if absent."""
        result = await db.execute(select(SystemConfig).where(SystemConfig.config_key == key))
        cfg = result.scalar_one_or_none()
        return cfg.config_value if cfg else default

    async def _set_config(self, db, key: str, value: str):
        """Create or update the ``SystemConfig`` row for ``key`` and commit."""
        result = await db.execute(select(SystemConfig).where(SystemConfig.config_key == key))
        cfg = result.scalar_one_or_none()
        if cfg:
            cfg.config_value = value
        else:
            db.add(SystemConfig(config_key=key, config_value=value))
        await db.commit()

    async def _get_config_from_db(self, db, key: str, default: str = "") -> str:
        """Alias of :meth:`_get_config` with a string default (kept for callers)."""
        return await self._get_config(db, key, default)

    @staticmethod
    def _to_int(value, default: int) -> int:
        """Parse ``value`` as an int, returning ``default`` if it is malformed or None."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _hm_to_minutes(hm) -> int:
        """Convert ``"H:MM"`` / ``"HH:MM"`` (extra ``:SS`` ignored) to minutes since midnight.

        Raises:
            ValueError: if the hour or minute part is not an integer.
        """
        parts = str(hm).strip().split(":")
        return int(parts[0]) * 60 + (int(parts[1]) if len(parts) > 1 else 0)

    @classmethod
    def _in_time_window(cls, now_hm, start_hm, end_hm) -> bool:
        """Return True if ``now_hm`` lies in the inclusive window [start, end].

        Times are compared numerically, so non-zero-padded values such as
        ``"8:00"`` work. A window whose start is later than its end
        (e.g. 22:00-06:00) is treated as spanning midnight.

        Raises:
            ValueError: if any of the three values cannot be parsed.
        """
        now = cls._hm_to_minutes(now_hm)
        start = cls._hm_to_minutes(start_hm)
        end = cls._hm_to_minutes(end_hm)
        if start <= end:
            return start <= now <= end
        return now >= start or now <= end

    @staticmethod
    def _pick_batch(eligible: list, max_concurrent: int) -> list:
        """Choose this round's users from ``eligible``.

        Users are ordered by ``last_interact_at`` (never-interacted first).
        The first third of the batch (at least one user) is taken in that
        order, and the remaining slots are filled at random from the rest.
        A non-positive ``max_concurrent`` means every eligible user.
        """
        ordered = sorted(eligible, key=lambda u: u.last_interact_at or datetime.min)
        batch_size = max_concurrent if max_concurrent > 0 else len(ordered)
        priority_size = max(1, batch_size // 3)
        rest = ordered[priority_size:]
        random.shuffle(rest)
        return ordered[:priority_size] + rest[:max(0, batch_size - priority_size)]

    def _spawn(self, coro) -> "asyncio.Task":
        """Schedule ``coro`` as a background task and keep it referenced until done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    @staticmethod
    def _article_id(article: dict) -> str:
        """Return the news ID of a square-list item: ``recordId``, else ``id``, else "".

        A present-but-null ``id`` yields "" (not the string "None"), so the
        caller's empty-ID guard still works.
        """
        raw = article.get("recordId") or article.get("id")
        return "" if raw is None else str(raw)

    @classmethod
    def _article_weight(cls, article: dict, already_commented: set) -> float:
        """Popularity weight: 3*commentNum + 2*praiseNum + readNum (minimum 1).

        Articles the user already commented on today keep 10% of their
        weight, so they can still be picked for likes/collects.
        """
        base = (
            int(article.get("commentNum") or 0) * 3
            + int(article.get("praiseNum") or 0) * 2
            + int(article.get("readNum") or 0)
        )
        penalty = 0.1 if cls._article_id(article) in already_commented else 1.0
        return max(1, base) * penalty

    # ------------------------------------------------------------------
    # Jobs
    # ------------------------------------------------------------------
    async def _check_sessions(self):
        """Check every logged-in, enabled user's session and re-login invalid ones.

        Errors are logged per user and do not stop the loop.
        """
        from app.services.news_service import news_service
        async with AsyncSessionLocal() as db:
            result = await db.execute(
                select(VirtualUser).where(VirtualUser.status == 2, VirtualUser.is_enabled == 1)
            )
            for user in result.scalars().all():
                try:
                    if not await news_service.check_session(db, user):
                        logger.warning(f"用户 {user.account} 会话失效,尝试重登")
                        await news_service.login(db, user)
                except Exception as e:
                    logger.error(f"会话校验异常 {user.account}: {e}")

    async def _run_interactions(self):
        """Run one scheduled interaction round.

        Returns early in three cases: the ``scheduler_enabled`` switch is off,
        ``token_limit_reached`` is set, or the current Beijing time is outside
        the configured window. Otherwise it keeps users outside the
        ``interact_min_interval`` cool-down and picks a batch with
        :meth:`_pick_batch`. It then applies the today-article quota and
        launches one background task per user without awaiting them. If no
        user is logged in, it tries to log some in instead.
        """
        async with AsyncSessionLocal() as db:
            # Global on/off switch.
            if await self._get_config(db, "scheduler_enabled", "true") != "true":
                return
            # Token budget exhausted (the flag is cleared by _daily_reset).
            if await self._get_config(db, "token_limit_reached", "false") == "true":
                return

            # Active time window, evaluated in Beijing time (UTC+8).
            now_time = datetime.now(self._TZ_BEIJING).strftime("%H:%M")
            start_str = await self._get_config(db, "interact_time_start", "08:00")
            end_str = await self._get_config(db, "interact_time_end", "22:00")
            try:
                in_window = self._in_time_window(now_time, start_str, end_str)
            except ValueError:
                logger.warning(f"[调度] 互动时段配置无法解析 {start_str}-{end_str},跳过本次")
                return
            if not in_window:
                logger.debug(f"[调度] 当前北京时间 {now_time} 不在互动时段 {start_str}-{end_str}")
                return

            # Per-user cool-down in seconds, and the batch size.
            min_interval = self._to_int(
                await self._get_config(db, "interact_min_interval", "300"), 300
            )
            max_concurrent = self._to_int(
                await self._get_config(db, "max_concurrent_users", "5"), 5
            )

            # All logged-in, enabled users (no LIMIT, so everyone takes part fairly).
            result = await db.execute(
                select(VirtualUser).where(
                    VirtualUser.status == 2,
                    VirtualUser.is_enabled == 1,
                )
            )
            all_users = result.scalars().all()
            if not all_users:
                await self._try_login_users(db)
                return

            now_dt = datetime.now()
            eligible = [
                u for u in all_users
                if u.last_interact_at is None
                or (now_dt - u.last_interact_at).total_seconds() >= min_interval
            ]
            if not eligible:
                logger.debug(f"[调度] 所有 {len(all_users)} 个用户在 {min_interval}s 内已互动,跳过本次")
                return

            selected = self._pick_batch(eligible, max_concurrent)

            # Today-article quota: at most TODAY_ARTICLE_QUOTA_PER_ARTICLE users
            # per article published today. The rest are forced onto history.
            today_count = 0
            try:
                from app.services.news_service import news_service as _ns
                today_count = await _ns.count_today_articles(db, selected[0] if selected else None)
            except Exception as e:
                # Best effort: a count of 0 sends every selected user to history.
                logger.warning(f"[调度] 统计今日文章数失败,按 0 篇处理: {e}")
            today_quota = min(today_count * self.TODAY_ARTICLE_QUOTA_PER_ARTICLE, len(selected))
            logger.info(
                f"[调度] 共 {len(all_users)} 个用户,{len(eligible)} 个满足间隔,"
                f"本轮选取 {len(selected)} 个,今日文章 {today_count} 篇,"
                f"配额 {today_quota} 人互动今日/{len(selected)-today_quota} 人走历史"
            )
            for i, user in enumerate(selected):
                self._spawn(
                    self._execute_user_interaction(user.id, force_history=i >= today_quota)
                )

    async def _try_login_users(self, db):
        """Try to log in up to 3 enabled users with status 0 or 3, 2 s apart.

        Failures are logged per user and do not stop the loop.
        """
        from app.services.news_service import news_service
        result = await db.execute(
            select(VirtualUser).where(
                VirtualUser.status.in_([0, 3]),
                VirtualUser.is_enabled == 1
            ).limit(3)
        )
        for user in result.scalars().all():
            try:
                await news_service.login(db, user)
                await asyncio.sleep(2)
            except Exception as e:
                logger.error(f"自动登录失败 {user.account}: {e}")

    async def _execute_user_interaction(self, user_id: int, force_history: bool = False):
        """Run one interaction round for a single virtual user.

        Steps:
            1. Load the user and skip them unless status == 2.
            2. Fetch up to 5 articles and pick one weighted by popularity.
               Articles already commented on today are down-weighted.
            3. Always record a read.
            4. Like, collect and forward with their configured probabilities,
               each at most once per article per day.
            5. If the user is under the daily comment limit and has a
               personality: on an article already commented on today,
               possibly reply to another user's comment; otherwise possibly
               comment, and after a successful comment possibly reply as well.

        Every interaction is saved as an ``InteractionRecord``. All errors are
        logged, not raised.

        Args:
            user_id: ``VirtualUser`` primary key.
            force_history: Forwarded to ``get_news_list`` so the user is
                steered to historical articles (set when the today-article
                quota is exhausted).
        """
        from app.services.news_service import news_service
        from app.services.ai_service import ai_service

        async with AsyncSessionLocal() as db:
            try:
                user_result = await db.execute(select(VirtualUser).where(VirtualUser.id == user_id))
                user = user_result.scalar_one_or_none()
                if not user or user.status != 2:
                    return

                # Once the daily comment limit is hit, likes/collects/forwards still run.
                can_comment = True
                if user.today_comment_count >= user.daily_comment_limit:
                    can_comment = False
                    logger.info(f"用户 {user.account} 今日评论已达上限,仍执行点赞/收藏/转发")

                p_result = await db.execute(
                    select(UserPersonality).where(UserPersonality.user_id == user_id)
                )
                personality = p_result.scalar_one_or_none()
                interest_tags = personality.interest_tags if personality else []

                # Candidate articles (GET /news/list).
                articles = await news_service.get_news_list(
                    db, user, count=5, interest_tags=interest_tags, force_history=force_history
                )
                if not articles:
                    # Retry once when the Redis session carries an org_id. The
                    # retry does not pass force_history, so get_news_list's
                    # default selection applies.
                    from app.core.redis_client import get_session
                    sess = await get_session(user.id)
                    if sess and sess.get("org_id", ""):
                        articles = await news_service.get_news_list(
                            db, user, count=5, interest_tags=interest_tags
                        )
                if not articles:
                    logger.warning(
                        f"用户 {user.account} 获取新闻列表为空 "
                        f"(orgId={await news_service._cfg(db, 'platform_org_id', '')})"
                    )
                    return

                # Successful interactions this user made today, grouped by
                # article: {article_id: {interact_type, ...}}.
                dup_result = await db.execute(
                    select(
                        InteractionRecord.article_id,
                        InteractionRecord.interact_type,
                    ).where(
                        InteractionRecord.user_id == user_id,
                        InteractionRecord.status == 1,
                        func.date(InteractionRecord.executed_at) == datetime.now().date(),
                    )
                )
                today_done: dict = {}
                for article_id, interact_type in dup_result.all():
                    today_done.setdefault(article_id, set()).add(interact_type)
                already_commented = {aid for aid, types in today_done.items() if "comment" in types}

                weights = [self._article_weight(a, already_commented) for a in articles]
                article = random.choices(articles, weights=weights, k=1)[0]

                # Square-list fields: recordId = real news ID, id = square
                # record ID, title = headline; newsTitle/digest are fallbacks.
                news_id = self._article_id(article)
                if not news_id:
                    return
                news_title = article.get("title") or article.get("newsTitle") or "未知文章"
                news_content = article.get("content") or article.get("digest") or news_title
                news_author = str(article.get("createUser") or "")
                article_org_id = str(article.get("orgId") or "")

                comment_prob = float(await self._get_config_from_db(db, "comment_probability", "0.4"))
                reply_prob = float(await self._get_config_from_db(db, "reply_probability", "0.2"))
                like_prob = float(await self._get_config_from_db(db, "like_probability", "0.6"))
                collect_prob = float(await self._get_config_from_db(db, "collect_probability", "0.3"))
                forward_prob = float(await self._get_config_from_db(db, "forward_probability", "0.15"))

                interactions_done = []

                # (1) Always record a read, as a real user opening the article would.
                await news_service.read_news(db, user, news_id)

                done_on_this = today_done.get(news_id, set())
                already_commented_this = "comment" in done_on_this

                # (2) Like: at most once per article per user per day.
                if "like" not in done_on_this and random.random() < like_prob:
                    success, err = await news_service.like_news(
                        db, user, news_id, org_id=article_org_id,
                        to_user_id=news_author, title=news_title,
                    )
                    await self._save_record(db, user, news_id, news_title, "like", None, 0, success, err)
                    if success:
                        interactions_done.append("like")
                        await self._incr_total(db, user_id)

                # (3) Collect: at most once per article per user per day.
                # NOTE(review): unlike like/forward, collect does not call
                # _incr_total; confirm this is intended.
                if "collect" not in done_on_this and random.random() < collect_prob:
                    success, err = await news_service.collect_news(
                        db, user, news_id, org_id=article_org_id,
                        to_user_id=news_author, title=news_title,
                    )
                    await self._save_record(db, user, news_id, news_title, "collect", None, 0, success, err)
                    if success:
                        interactions_done.append("collect")

                # (4) Forward: at most once per article per user per day.
                if "forward" not in done_on_this and random.random() < forward_prob:
                    success, err = await news_service.forward_news(db, user, news_id)
                    await self._save_record(db, user, news_id, news_title, "forward", None, 0, success, err)
                    if success:
                        interactions_done.append("forward")
                        await self._incr_total(db, user_id)

                # (5) Comment / reply. An article already commented on today
                # gets a reply to someone else's comment instead.
                if can_comment and personality:
                    style_prompt = personality.comment_style_prompt or ""
                    safe_word_max = min(personality.word_count_max, 80)
                    if already_commented_this:
                        if random.random() < reply_prob and await self._reply_to_other_comment(
                            db, user, news_id, news_title, style_prompt,
                            personality.word_count_min, safe_word_max,
                        ):
                            interactions_done.append("reply")
                            logger.info(f"💬 {user.account} 回复了已评论文章的评论(去重逻辑)")
                    elif random.random() < comment_prob:
                        comment_text, tokens = await ai_service.generate_comment(
                            db, news_title, news_content,
                            style_prompt, personality.word_count_min, safe_word_max
                        )
                        if comment_text:
                            success, err = await news_service.post_comment(
                                db, user, news_id, news_title, comment_text,
                                news_author_id=news_author, org_id=article_org_id
                            )
                            await self._save_record(
                                db, user, news_id, news_title, "comment",
                                comment_text, tokens, success, err
                            )
                            if success:
                                interactions_done.append("comment")
                                await db.execute(
                                    update(VirtualUser).where(VirtualUser.id == user_id).values(
                                        today_comment_count=VirtualUser.today_comment_count + 1,
                                        total_interactions=VirtualUser.total_interactions + 1,
                                        last_interact_at=datetime.now()
                                    )
                                )
                                # (6) After a successful comment, possibly reply
                                # to another user's comment (interaction chain).
                                if random.random() < reply_prob and await self._reply_to_other_comment(
                                    db, user, news_id, news_title, style_prompt,
                                    personality.word_count_min, safe_word_max,
                                ):
                                    interactions_done.append("reply")

                await db.commit()
                logger.info(f"👤 {user.account} 互动完成: {interactions_done} [新闻: {news_title[:20]}]")
            except Exception as e:
                logger.error(f"用户 {user_id} 互动异常: {e}")

    async def _reply_to_other_comment(
        self, db, user, news_id: str, news_title: str,
        style_prompt: str, word_min: int, word_max: int,
    ) -> bool:
        """Reply to a random comment on ``news_id`` that ``user`` did not write.

        Fetches the article's comments. It drops those whose ``userId`` (or
        ``createUser``) equals the user's ``platform_uid`` from the Redis
        session. It then generates a reply to a random remaining comment,
        posts it, and saves a "reply" ``InteractionRecord``.

        Returns:
            True only if the reply was posted successfully.
        """
        from app.services.news_service import news_service
        from app.services.ai_service import ai_service
        from app.core.redis_client import get_session

        existing = await news_service.get_comments(db, user, news_id)
        if not existing:
            return False
        sess = await get_session(user.id)
        my_uid = sess.get("platform_uid", "") if sess else ""
        others = [
            c for c in existing
            if str(c.get("userId") or c.get("createUser") or "") != my_uid
        ]
        if not others:
            return False
        target = random.choice(others)
        cid = str(target.get("id") or target.get("commentId") or "")
        if not cid:
            return False
        reply_text, tokens = await ai_service.generate_reply(
            db, news_title, target.get("content") or "",
            style_prompt, word_min, word_max,
        )
        if not reply_text:
            return False
        ok, err = await news_service.post_reply(db, user, news_id, cid, reply_text)
        await self._save_record(
            db, user, news_id, news_title, "reply",
            reply_text, tokens, ok, err,
            parent_comment_id=cid,
        )
        return bool(ok)

    async def _incr_total(self, db, user_id: int):
        """Increment ``total_interactions`` and stamp ``last_interact_at`` (no commit)."""
        await db.execute(
            update(VirtualUser).where(VirtualUser.id == user_id).values(
                total_interactions=VirtualUser.total_interactions + 1,
                last_interact_at=datetime.now()
            )
        )

    async def _save_record(
        self, db, user: "VirtualUser", article_id: str, article_title: str,
        interact_type: str, content: Optional[str], tokens: int,
        success: bool, error_msg: str, parent_comment_id: Optional[str] = None,
        platform_record_id: Optional[str] = None
    ):
        """Add (without committing) an ``InteractionRecord`` for one interaction.

        ``status`` is 1 on success and 2 on failure. ``session_id`` is read
        from the user's Redis session when one exists.
        """
        from app.core.redis_client import get_session
        session = await get_session(user.id)
        session_id = session.get("session_id") if session else None
        db.add(InteractionRecord(
            user_id=user.id,
            user_nickname=user.nickname,
            user_account=user.account,
            article_id=article_id,
            article_title=article_title,
            interact_type=interact_type,
            content=content,
            parent_comment_id=parent_comment_id,
            platform_record_id=platform_record_id,
            session_id=session_id,
            token_consumed=tokens,
            status=1 if success else 2,
            error_msg=error_msg or None,
            executed_at=datetime.now(),
        ))

    async def _daily_reset(self):
        """Zero every user's daily counters and clear the token-limit flag.

        Only an existing ``token_limit_reached`` row is reset; none is created.
        """
        async with AsyncSessionLocal() as db:
            await db.execute(
                update(VirtualUser).values(
                    today_comment_count=0,
                    today_like_count=0
                )
            )
            result = await db.execute(
                select(SystemConfig).where(SystemConfig.config_key == "token_limit_reached")
            )
            cfg = result.scalar_one_or_none()
            if cfg:
                cfg.config_value = "false"
            await db.commit()
        logger.info("每日计数重置完成")
# Module-level SchedulerService instance; presumably the shared one the
# application starts/stops via start()/stop() — confirm at call sites.
scheduler_service = SchedulerService()