Files
huihuiSquare/backend/app/services/scheduler.py
stefanfeng 7203f04be6 feat: 评论去重 + 热度/新鲜度加权选文
评论去重逻辑:
- 查询今日已评论的文章ID,选文时已评论的文章权重降为10%
- 若选中已评论文章:改为回复其他用户的评论(虚拟用户互动链)
- 若选中未评论文章:正常发新评论,评论成功后随机回复他人评论

热度+新鲜度加权选文规则:
- 热度分 = commentNum×3 + praiseNum×2 + readNum×1
- 新鲜度 = 72小时内的新文章获得最高3倍加成,随时间线性衰减
- 综合权重 = (热度分+1) × 新鲜度,确保真实用户互动多的新文章优先被虚拟用户关注
2026-04-02 17:33:07 +08:00

478 lines
24 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""调度服务 - 定时自动互动、会话校验"""
import random
import asyncio
from datetime import datetime, date
from typing import Optional
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy import select, update, func
from app.core.database import AsyncSessionLocal
from app.core.logger import logger
from app.models import VirtualUser, UserPersonality, InteractionRecord, SystemConfig
class SchedulerService:
    """Scheduling service: periodic automatic interactions and session checks.

    Three APScheduler jobs are registered by :meth:`start`:

    * ``check_sessions``   - every 10 minutes, re-validate logged-in users.
    * ``run_interactions`` - every 5 minutes, maybe run interactions for
      eligible users (subject to switches, time window and per-user interval).
    * ``daily_reset``      - once a day at midnight Beijing time, reset the
      per-day counters and the token-limit flag.

    :meth:`run_once_now` triggers an interaction round on demand.
    """

    def __init__(self):
        # Every trigger added to this scheduler is evaluated in Beijing time
        # unless the trigger itself specifies another timezone.
        self.scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")
        self._running = False
        # Strong references to fire-and-forget interaction tasks. The event
        # loop only keeps weak references, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._background_tasks = set()

    # ------------------------------------------------------------------
    # Pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _in_time_window(now_hm: str, start_hm: str, end_hm: str) -> bool:
        """Return True when ``now_hm`` lies inside ``[start_hm, end_hm]``.

        All arguments are zero-padded ``"HH:MM"`` strings, so lexicographic
        comparison matches chronological order. Both bounds are inclusive.
        A window whose start is later than its end (e.g. ``22:00``-``06:00``)
        is treated as wrapping over midnight.
        """
        if start_hm <= end_hm:
            return start_hm <= now_hm <= end_hm
        return now_hm >= start_hm or now_hm <= end_hm

    @staticmethod
    def _article_id(article) -> str:
        """Return the article id as a string (``recordId`` first, then ``id``)."""
        return str(article.get("recordId") or article.get("id", ""))

    @staticmethod
    def _article_weight(article, already_commented) -> float:
        """Return the random-selection weight of ``article``.

        Heat score = ``commentNum * 3 + praiseNum * 2 + readNum`` with a floor
        of 1. Articles whose id is in ``already_commented`` keep only 10% of
        their weight: they stay selectable (for like/collect/forward) but are
        much less likely to be picked.
        """
        heat = (
            int(article.get("commentNum") or 0) * 3
            + int(article.get("praiseNum") or 0) * 2
            + int(article.get("readNum") or 0)
        )
        article_id = SchedulerService._article_id(article)
        penalty = 0.1 if article_id in already_commented else 1.0
        return max(1, heat) * penalty

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    async def run_once_now(self, db=None):
        """Run one interaction round immediately, ignoring the time window.

        Up to 5 users with ``status == 2`` and ``is_enabled == 1`` are sampled
        at random and their interactions are awaited concurrently.

        Args:
            db: Unused; kept for interface compatibility. A fresh session is
                always opened.

        Returns:
            ``{"triggered": n, "users": [account, ...]}``, or a dict with a
            ``"message"`` and ``"triggered": 0`` when no user qualifies.
        """
        logger.info("⚡ 立即触发互动任务")
        async with AsyncSessionLocal() as session:
            rows = await session.execute(
                select(VirtualUser).where(
                    VirtualUser.status == 2,
                    VirtualUser.is_enabled == 1,
                )
            )
            users = rows.scalars().all()
            if not users:
                return {"message": "没有已登录的用户", "triggered": 0}
            selected = random.sample(users, min(5, len(users)))
            # return_exceptions=True: one failing user must not cancel the rest.
            await asyncio.gather(
                *(self._execute_user_interaction(u.id) for u in selected),
                return_exceptions=True,
            )
            return {"triggered": len(selected), "users": [u.account for u in selected]}

    async def start(self):
        """Register all jobs, start the scheduler and record the start time.

        Calling it again while already running is a no-op.
        """
        if self._running:
            return
        # Session validation every 10 minutes.
        self.scheduler.add_job(
            self._check_sessions, IntervalTrigger(minutes=10),
            id="check_sessions", replace_existing=True
        )
        # Interaction tick every 5 minutes; the job itself decides whether the
        # current time is inside the active window.
        self.scheduler.add_job(
            self._run_interactions, IntervalTrigger(minutes=5),
            id="run_interactions", replace_existing=True
        )
        # Daily counter reset. The cron trigger inherits the scheduler's
        # Asia/Shanghai timezone, so midnight Beijing time is hour=0 here
        # (hour=16 would fire at 16:00 Beijing time).
        self.scheduler.add_job(
            self._daily_reset, "cron", hour=0, minute=0,
            id="daily_reset", replace_existing=True
        )
        self.scheduler.start()
        self._running = True
        logger.info("调度器已启动")
        # Persist the start time in the system config table.
        async with AsyncSessionLocal() as db:
            await self._set_config(db, "system_start_time", datetime.now().isoformat())

    async def stop(self):
        """Shut the scheduler down without waiting for running jobs."""
        if self.scheduler.running:
            self.scheduler.shutdown(wait=False)
        self._running = False

    # ------------------------------------------------------------------
    # Config helpers
    # ------------------------------------------------------------------
    async def _get_config(self, db, key: str, default=None):
        """Return the ``config_value`` stored under ``key``, or ``default``."""
        result = await db.execute(select(SystemConfig).where(SystemConfig.config_key == key))
        cfg = result.scalar_one_or_none()
        return cfg.config_value if cfg else default

    async def _set_config(self, db, key: str, value: str):
        """Insert or update the config row for ``key`` and commit."""
        result = await db.execute(select(SystemConfig).where(SystemConfig.config_key == key))
        cfg = result.scalar_one_or_none()
        if cfg:
            cfg.config_value = value
        else:
            db.add(SystemConfig(config_key=key, config_value=value))
        await db.commit()

    async def _get_config_from_db(self, db, key: str, default: str = "") -> str:
        """Same lookup as :meth:`_get_config`, with ``""`` as the default."""
        return await self._get_config(db, key, default)

    # ------------------------------------------------------------------
    # Scheduled jobs
    # ------------------------------------------------------------------
    async def _check_sessions(self):
        """Periodically validate login state and re-login users whose session expired."""
        from app.services.news_service import news_service
        async with AsyncSessionLocal() as db:
            result = await db.execute(
                select(VirtualUser).where(VirtualUser.status == 2, VirtualUser.is_enabled == 1)
            )
            for user in result.scalars().all():
                # Errors are isolated per user so one failure does not stop the sweep.
                try:
                    if not await news_service.check_session(db, user):
                        logger.warning(f"用户 {user.account} 会话失效,尝试重登")
                        await news_service.login(db, user)
                except Exception as e:
                    logger.error(f"会话校验异常 {user.account}: {e}")

    async def _run_interactions(self):
        """Scheduler tick: start interaction tasks for eligible users.

        The tick is skipped when the scheduler switch is off, the token limit
        flag is set, or the current Beijing time is outside the configured
        window. Each eligible user is then started with probability 0.6.
        """
        from datetime import timezone, timedelta
        async with AsyncSessionLocal() as db:
            # Global on/off switch.
            if await self._get_config(db, "scheduler_enabled", "true") != "true":
                return
            # Token quota flag.
            if await self._get_config(db, "token_limit_reached", "false") == "true":
                return
            # Active window, evaluated in Beijing time (UTC+8).
            now_time = datetime.now(timezone(timedelta(hours=8))).strftime("%H:%M")
            start_str = await self._get_config(db, "interact_time_start", "08:00")
            end_str = await self._get_config(db, "interact_time_end", "22:00")
            if not self._in_time_window(now_time, start_str, end_str):
                logger.debug(f"[调度] 当前北京时间 {now_time} 不在互动时段 {start_str}-{end_str}")
                return
            # Minimum seconds between two interactions of the same user.
            min_interval = int(await self._get_config(db, "interact_min_interval", "300"))
            # Upper bound on the number of users considered per tick.
            max_concurrent = int(await self._get_config(db, "max_concurrent_users", "5"))
            query = select(VirtualUser).where(
                VirtualUser.status == 2,
                VirtualUser.is_enabled == 1,
            )
            if max_concurrent > 0:
                # NOTE(review): LIMIT without ORDER BY - the database may keep
                # returning the same rows, so users beyond the first
                # `max_concurrent` might never be picked. Confirm intent.
                query = query.limit(max_concurrent)
            all_users = (await db.execute(query)).scalars().all()
            # Nobody qualifies: use this tick to try logging users in instead.
            if not all_users:
                await self._try_login_users(db)
                return
            # Naive local time, matching how last_interact_at is written below.
            now_local = datetime.now()
            eligible = [
                u for u in all_users
                if u.last_interact_at is None
                or (now_local - u.last_interact_at).total_seconds() >= min_interval
            ]
            if not eligible:
                logger.debug(f"[调度] 所有用户在 {min_interval}s 内已互动,跳过本次")
                return
            logger.info(f"[调度] {len(eligible)}/{len(all_users)} 个用户满足间隔要求,开始互动")
            for user in eligible:
                if random.random() < 0.6:
                    task = asyncio.create_task(self._execute_user_interaction(user.id))
                    # Hold a reference until the task completes.
                    self._background_tasks.add(task)
                    task.add_done_callback(self._background_tasks.discard)

    async def _try_login_users(self, db):
        """Try to log in up to 3 enabled users whose status is 0 or 3."""
        from app.services.news_service import news_service
        result = await db.execute(
            select(VirtualUser).where(
                VirtualUser.status.in_([0, 3]),
                VirtualUser.is_enabled == 1
            ).limit(3)
        )
        for user in result.scalars().all():
            try:
                await news_service.login(db, user)
                # Space the login attempts 2 seconds apart.
                await asyncio.sleep(2)
            except Exception as e:
                logger.error(f"自动登录失败 {user.account}: {e}")

    # ------------------------------------------------------------------
    # Single-user interaction
    # ------------------------------------------------------------------
    async def _reply_to_random_comment(
        self, db, user, personality, news_id, news_title, style_prompt, safe_word_max
    ) -> bool:
        """Reply to a random comment on ``news_id`` written by someone else.

        Fetches the article's comments, drops the user's own, generates a
        reply with the AI service, posts it and saves an interaction record.

        Returns:
            True only when a reply was posted successfully.
        """
        from app.services.ai_service import ai_service
        from app.services.news_service import news_service

        existing = await news_service.get_comments(db, user, news_id)
        if not existing:
            return False
        from app.core.redis_client import get_session
        session = await get_session(user.id)
        # Normalised to str because the comment author id is stringified too.
        my_uid = str(session.get("platform_uid") or "") if session else ""
        others = [
            c for c in existing
            if str(c.get("userId") or c.get("createUser") or "") != my_uid
        ]
        if not others:
            return False
        target = random.choice(others)
        comment_id = str(target.get("id") or target.get("commentId") or "")
        if not comment_id:
            return False
        parent_content = target.get("content") or ""
        reply_text, tokens = await ai_service.generate_reply(
            db, news_title, parent_content,
            style_prompt,
            personality.word_count_min,
            safe_word_max
        )
        if not reply_text:
            return False
        ok, err = await news_service.post_reply(db, user, news_id, comment_id, reply_text)
        await self._save_record(
            db, user, news_id, news_title, "reply",
            reply_text, tokens, ok, err,
            parent_comment_id=comment_id
        )
        return bool(ok)

    async def _execute_user_interaction(self, user_id: int):
        """Run one interaction round for a single user.

        Steps: load the user and personality, fetch a news list, pick one
        article by heat-weighted random choice (articles already commented on
        today are down-weighted), record a read, then probabilistically like /
        collect / forward, and finally comment or reply:

        * article already commented on today -> maybe reply to someone else's
          comment instead of commenting again;
        * otherwise -> maybe post a new comment and, if that succeeds, maybe
          reply to someone else's comment as well.

        All exceptions are caught and logged; nothing is raised to the caller.
        """
        from app.services.news_service import news_service
        from app.services.ai_service import ai_service
        async with AsyncSessionLocal() as db:
            try:
                user_result = await db.execute(select(VirtualUser).where(VirtualUser.id == user_id))
                user = user_result.scalar_one_or_none()
                if not user or user.status != 2:
                    return
                # Daily comment quota: once reached, only non-comment actions run.
                can_comment = user.today_comment_count < user.daily_comment_limit
                if not can_comment:
                    logger.info(f"用户 {user.account} 今日评论已达上限,仍执行点赞/收藏/转发")
                # Personality drives interest tags and comment style.
                p_result = await db.execute(
                    select(UserPersonality).where(UserPersonality.user_id == user_id)
                )
                personality = p_result.scalar_one_or_none()
                interest_tags = personality.interest_tags if personality else []
                # Fetch candidate articles.
                articles = await news_service.get_news_list(
                    db, user, count=5, interest_tags=interest_tags
                )
                if not articles:
                    # Retry once when the cached session carries an org_id.
                    # NOTE(review): the retry passes exactly the same arguments
                    # as the first call and org_from_sess is never forwarded -
                    # confirm whether get_news_list should receive it.
                    from app.core.redis_client import get_session
                    sess = await get_session(user.id)
                    org_from_sess = sess.get("org_id", "") if sess else ""
                    if org_from_sess:
                        articles = await news_service.get_news_list(
                            db, user, count=5, interest_tags=interest_tags
                        )
                if not articles:
                    org_id_cfg = await news_service._cfg(db, 'platform_org_id', '')
                    logger.warning(
                        f"用户 {user.account} 获取新闻列表为空 "
                        f"(orgId={org_id_cfg})"
                    )
                    return
                # -- De-duplication + heat-weighted selection ----------------
                # Article ids this user already commented on successfully today.
                today = datetime.now().date()
                dup_result = await db.execute(
                    select(InteractionRecord.article_id).where(
                        InteractionRecord.user_id == user_id,
                        InteractionRecord.interact_type == "comment",
                        InteractionRecord.status == 1,
                        func.date(InteractionRecord.executed_at) == today,
                    )
                )
                # Stringified so membership tests match _article_id() output.
                already_commented = {str(row[0]) for row in dup_result.all()}
                weights = [self._article_weight(a, already_commented) for a in articles]
                article = random.choices(articles, weights=weights, k=1)[0]
                news_id = self._article_id(article)
                already_commented_this = news_id in already_commented
                # Field names differ between endpoints: title/newsTitle,
                # content/digest; recordId is preferred over id as the news id.
                news_title = article.get("title") or article.get("newsTitle") or "未知文章"
                news_content = article.get("content") or article.get("digest") or news_title
                news_author = str(article.get("createUser") or "")
                article_org_id = str(article.get("orgId") or "")
                if not news_id:
                    return
                # Per-action probabilities from the config table.
                comment_prob = float(await self._get_config_from_db(db, "comment_probability", "0.4"))
                reply_prob = float(await self._get_config_from_db(db, "reply_probability", "0.2"))
                like_prob = float(await self._get_config_from_db(db, "like_probability", "0.6"))
                collect_prob = float(await self._get_config_from_db(db, "collect_probability", "0.3"))
                forward_prob = float(await self._get_config_from_db(db, "forward_probability", "0.15"))
                interactions_done = []
                # (1) Always record a read first.
                await news_service.read_news(db, user, news_id)
                # (2) Like.
                if random.random() < like_prob:
                    success, err = await news_service.like_news(
                        db, user, news_id, org_id=article_org_id,
                        to_user_id=news_author, title=news_title
                    )
                    await self._save_record(db, user, news_id, news_title, "like", None, 0, success, err)
                    if success:
                        interactions_done.append("like")
                        await self._incr_total(db, user_id)
                # (3) Collect. The total counter is not incremented for this action.
                if random.random() < collect_prob:
                    success, err = await news_service.collect_news(
                        db, user, news_id, org_id=article_org_id,
                        to_user_id=news_author, title=news_title
                    )
                    await self._save_record(db, user, news_id, news_title, "collect", None, 0, success, err)
                    if success:
                        interactions_done.append("collect")
                # (4) Forward.
                if random.random() < forward_prob:
                    success, err = await news_service.forward_news(db, user, news_id)
                    await self._save_record(db, user, news_id, news_title, "forward", None, 0, success, err)
                    if success:
                        interactions_done.append("forward")
                        await self._incr_total(db, user_id)
                # (5) Comment / reply; requires remaining quota and a personality.
                if can_comment and personality:
                    style_prompt = personality.comment_style_prompt or ""
                    # Generated text is capped at 80 characters.
                    safe_word_max = min(personality.word_count_max, 80)
                    if already_commented_this:
                        # Already commented today: reply to someone else instead.
                        if random.random() < reply_prob:
                            if await self._reply_to_random_comment(
                                db, user, personality, news_id, news_title,
                                style_prompt, safe_word_max
                            ):
                                interactions_done.append("reply")
                                logger.info(f"💬 {user.account} 回复了已评论文章的评论(去重逻辑)")
                    elif random.random() < comment_prob:
                        # Not commented yet: post a fresh comment.
                        comment_text, tokens = await ai_service.generate_comment(
                            db, news_title, news_content,
                            style_prompt, personality.word_count_min, safe_word_max
                        )
                        if comment_text:
                            success, err = await news_service.post_comment(
                                db, user, news_id, news_title, comment_text,
                                news_author_id=news_author, org_id=article_org_id
                            )
                            await self._save_record(
                                db, user, news_id, news_title, "comment",
                                comment_text, tokens, success, err
                            )
                            if success:
                                interactions_done.append("comment")
                                await db.execute(
                                    update(VirtualUser).where(VirtualUser.id == user_id).values(
                                        today_comment_count=VirtualUser.today_comment_count + 1,
                                        total_interactions=VirtualUser.total_interactions + 1,
                                        last_interact_at=datetime.now()
                                    )
                                )
                                # (6) After a successful comment, maybe also
                                # reply to another user's comment.
                                if random.random() < reply_prob:
                                    if await self._reply_to_random_comment(
                                        db, user, personality, news_id, news_title,
                                        style_prompt, safe_word_max
                                    ):
                                        interactions_done.append("reply")
                # Records and counter updates are persisted in a single commit.
                await db.commit()
                logger.info(f"👤 {user.account} 互动完成: {interactions_done} [新闻: {news_title[:20]}]")
            except Exception as e:
                logger.error(f"用户 {user_id} 互动异常: {e}")

    async def _incr_total(self, db, user_id: int):
        """Increment ``total_interactions`` and stamp ``last_interact_at`` (no commit)."""
        await db.execute(
            update(VirtualUser).where(VirtualUser.id == user_id).values(
                total_interactions=VirtualUser.total_interactions + 1,
                last_interact_at=datetime.now()
            )
        )

    async def _save_record(
        self, db, user: "VirtualUser", article_id: str, article_title: str,
        interact_type: str, content: Optional[str], tokens: int,
        success: bool, error_msg: str, parent_comment_id: Optional[str] = None,
        platform_record_id: Optional[str] = None
    ):
        """Add an ``InteractionRecord`` to the session (the caller commits).

        ``status`` is stored as 1 on success and 2 on failure; an empty
        ``error_msg`` is stored as NULL. The session id is read from the
        user's cached session, when one exists.
        """
        from app.core.redis_client import get_session
        session = await get_session(user.id)
        session_id = session.get("session_id") if session else None
        db.add(InteractionRecord(
            user_id=user.id,
            user_nickname=user.nickname,
            user_account=user.account,
            article_id=article_id,
            article_title=article_title,
            interact_type=interact_type,
            content=content,
            parent_comment_id=parent_comment_id,
            platform_record_id=platform_record_id,
            session_id=session_id,
            token_consumed=tokens,
            status=1 if success else 2,
            error_msg=error_msg or None,
            executed_at=datetime.now(),
        ))

    async def _daily_reset(self):
        """Reset the per-day counters of all users and clear the token-limit flag."""
        async with AsyncSessionLocal() as db:
            await db.execute(
                update(VirtualUser).values(
                    today_comment_count=0,
                    today_like_count=0
                )
            )
            # Clear the token-limit flag only when the row already exists.
            result = await db.execute(
                select(SystemConfig).where(SystemConfig.config_key == "token_limit_reached")
            )
            cfg = result.scalar_one_or_none()
            if cfg:
                cfg.config_value = "false"
            await db.commit()
            logger.info("每日计数重置完成")
# Module-level SchedulerService instance, created when this module is imported.
# Construction only builds the AsyncIOScheduler; jobs are registered in start().
scheduler_service = SchedulerService()