"""调度服务 - 定时自动互动、会话校验""" import random import asyncio from datetime import datetime, date from typing import Optional from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger from sqlalchemy import select, update, func from app.core.database import AsyncSessionLocal from app.core.logger import logger from app.models import VirtualUser, UserPersonality, InteractionRecord, SystemConfig class SchedulerService: def __init__(self): self.scheduler = AsyncIOScheduler(timezone="Asia/Shanghai") self._running = False async def run_once_now(self, db=None): """立即执行一次互动,不受时间段限制""" from sqlalchemy import select from app.core.database import AsyncSessionLocal logger.info("⚡ 立即触发互动任务") async with AsyncSessionLocal() as session: result_r = await session.execute( select(VirtualUser).where( VirtualUser.status == 2, VirtualUser.is_enabled == 1, ) ) users = result_r.scalars().all() if not users: return {"message": "没有已登录的用户", "triggered": 0} import random selected = random.sample(users, min(5, len(users))) import asyncio tasks = [self._execute_user_interaction(u.id) for u in selected] await asyncio.gather(*tasks, return_exceptions=True) return {"triggered": len(selected), "users": [u.account for u in selected]} async def start(self): if self._running: return # 会话校验:每10分钟 self.scheduler.add_job( self._check_sessions, IntervalTrigger(minutes=10), id="check_sessions", replace_existing=True ) # 互动任务:每5分钟检查一次(内部判断是否在活跃时间段) self.scheduler.add_job( self._run_interactions, IntervalTrigger(minutes=5), id="run_interactions", replace_existing=True ) # 每日零点重置计数 self.scheduler.add_job( self._daily_reset, "cron", hour=16, minute=0, # 北京时间 00:00 = UTC 16:00 id="daily_reset", replace_existing=True ) self.scheduler.start() self._running = True logger.info("调度器已启动") # 记录启动时间 async with AsyncSessionLocal() as db: await self._set_config(db, "system_start_time", datetime.now().isoformat()) async def stop(self): if self.scheduler.running: self.scheduler.shutdown(wait=False) self._running = False async def _get_config(self, db, key: str, default=None): result = await db.execute(select(SystemConfig).where(SystemConfig.config_key == key)) cfg = result.scalar_one_or_none() return cfg.config_value if cfg else default async def _set_config(self, db, key: str, value: str): result = await db.execute(select(SystemConfig).where(SystemConfig.config_key == key)) cfg = result.scalar_one_or_none() if cfg: cfg.config_value = value else: db.add(SystemConfig(config_key=key, config_value=value)) await db.commit() async def _check_sessions(self): """定时校验登录状态""" from app.services.news_service import news_service async with AsyncSessionLocal() as db: result = await db.execute( select(VirtualUser).where(VirtualUser.status == 2, VirtualUser.is_enabled == 1) ) users = result.scalars().all() for user in users: try: valid = await news_service.check_session(db, user) if not valid: logger.warning(f"用户 {user.account} 会话失效,尝试重登") await news_service.login(db, user) except Exception as e: logger.error(f"会话校验异常 {user.account}: {e}") async def _run_interactions(self): """执行互动任务""" async with AsyncSessionLocal() as db: # 检查调度器开关 enabled = await self._get_config(db, "scheduler_enabled", "true") if enabled != "true": return # 检查Token限额 token_limited = await self._get_config(db, "token_limit_reached", "false") if token_limited == "true": return # 检查互动时间段(北京时间 UTC+8) from datetime import timezone, timedelta tz_beijing = timezone(timedelta(hours=8)) now_bj = datetime.now(tz_beijing) now_time = now_bj.strftime("%H:%M") start_str = await self._get_config(db, "interact_time_start", "08:00") end_str = await self._get_config(db, "interact_time_end", "22:00") if not (start_str <= now_time <= end_str): logger.debug(f"[调度] 当前北京时间 {now_time} 不在互动时段 {start_str}-{end_str}") return # 获取最小互动间隔(秒) min_interval = int(await self._get_config(db, "interact_min_interval", "300")) # 获取最大并发 max_concurrent = int(await self._get_config(db, "max_concurrent_users", "5")) # 获取所有已登录、启用的用户(不加 LIMIT,确保所有用户公平参与) result = await db.execute( select(VirtualUser).where( VirtualUser.status == 2, VirtualUser.is_enabled == 1, ) ) all_users = result.scalars().all() # 没有已登录用户时,尝试登录未登录用户 if not all_users: await self._try_login_users(db) return # 检查互动间隔:过滤掉最近 min_interval 秒内已互动的用户 now_dt = datetime.now() eligible = [] for u in all_users: if u.last_interact_at is None: eligible.append(u) else: elapsed = (now_dt - u.last_interact_at).total_seconds() if elapsed >= min_interval: eligible.append(u) if not eligible: logger.debug(f"[调度] 所有 {len(all_users)} 个用户在 {min_interval}s 内已互动,跳过本次") return # 按最后互动时间升序排序:最久没互动的用户优先 eligible.sort(key=lambda u: u.last_interact_at or datetime.min) # 从符合条件的用户中随机选取 max_concurrent 个执行(保证公平轮转) batch_size = max_concurrent if max_concurrent > 0 else len(eligible) # 优先选最久未互动的用户(前1/3),其余随机补充 priority_size = max(1, batch_size // 3) priority_users = eligible[:priority_size] rest_users = eligible[priority_size:] random.shuffle(rest_users) selected = priority_users + rest_users[:max(0, batch_size - priority_size)] logger.info( f"[调度] 共 {len(all_users)} 个用户,{len(eligible)} 个满足间隔," f"本轮选取 {len(selected)} 个执行互动" ) for user in selected: asyncio.create_task(self._execute_user_interaction(user.id)) async def _try_login_users(self, db): """尝试登录未登录的用户""" from app.services.news_service import news_service result = await db.execute( select(VirtualUser).where( VirtualUser.status.in_([0, 3]), VirtualUser.is_enabled == 1 ).limit(3) ) users = result.scalars().all() for user in users: try: await news_service.login(db, user) await asyncio.sleep(2) except Exception as e: logger.error(f"自动登录失败 {user.account}: {e}") async def _execute_user_interaction(self, user_id: int): """执行单用户互动 - 基于真实接口""" from app.services.news_service import news_service from app.services.ai_service import ai_service async with AsyncSessionLocal() as db: try: user_result = await db.execute(select(VirtualUser).where(VirtualUser.id == user_id)) user = user_result.scalar_one_or_none() if not user or user.status != 2: return # 检查今日评论限额 can_comment = True if user.today_comment_count >= user.daily_comment_limit: can_comment = False logger.info(f'用户 ' + user.account + ' 今日评论已达上限,仍执行点赞/收藏/转发') # 获取人格 from app.models import UserPersonality p_result = await db.execute( select(UserPersonality).where(UserPersonality.user_id == user_id) ) personality = p_result.scalar_one_or_none() interest_tags = personality.interest_tags if personality else [] # 获取新闻列表(基于接口 GET /news/list) articles = await news_service.get_news_list( db, user, count=5, interest_tags=interest_tags ) if not articles: # 尝试从 session 获取 org_id 再试一次 from app.core.redis_client import get_session as _get_sess sess = await _get_sess(user.id) org_from_sess = sess.get("org_id", "") if sess else "" if org_from_sess: articles = await news_service.get_news_list( db, user, count=5, interest_tags=interest_tags ) if not articles: logger.warning( f"用户 {user.account} 获取新闻列表为空 " f"(orgId={await news_service._cfg(db, 'platform_org_id', '')})" ) return # ── 文章去重 + 热度加权选取 ───────────────────────────────── # 查询今日已互动过的文章(所有类型),避免重复互动同一篇 from sqlalchemy import func as _func from datetime import date as _date today_str = datetime.now().date() dup_result = await db.execute( select( InteractionRecord.article_id, InteractionRecord.interact_type, ).where( InteractionRecord.user_id == user_id, InteractionRecord.status == 1, _func.date(InteractionRecord.executed_at) == today_str, ) ) # {article_id: set of interact_types already done today} today_done: dict = {} for r in dup_result.all(): today_done.setdefault(r[0], set()).add(r[1]) already_commented = {aid for aid, types in today_done.items() if "comment" in types} # 按热度加权:commentNum + praiseNum + readNum 越高权重越大 # 同时优先未评论过的文章 def _article_weight(a): aid = str(a.get("recordId") or a.get("id", "")) base = ( int(a.get("commentNum") or 0) * 3 + int(a.get("praiseNum") or 0) * 2 + int(a.get("readNum") or 0) ) # 已评论的文章权重大幅降低(但不为0,还可以点赞/收藏) penalty = 0.1 if aid in already_commented else 1.0 return max(1, base) * penalty weights = [_article_weight(a) for a in articles] article = random.choices(articles, weights=weights, k=1)[0] # 判断是否已评论此文章(用于后续逻辑) news_id = str(article.get("recordId") or article.get("id", "")) already_commented_this = "comment" in today_done.get(news_id, set()) # 接口返回字段: id/newsTitle/content/digest/createUser # 广场接口字段:recordId=新闻实际ID, id=广场记录ID, title=标题 news_title = article.get("title") or article.get("newsTitle") or "未知文章" news_content = article.get("content") or article.get("digest") or news_title news_author = str(article.get("createUser") or "") # 从广场数据中顺带获取 orgId article_org_id = str(article.get("orgId") or "") if not news_id: return # 读取互动概率 comment_prob = float(await self._get_config_from_db(db, "comment_probability", "0.4")) reply_prob = float(await self._get_config_from_db(db, "reply_probability", "0.2")) like_prob = float(await self._get_config_from_db(db, "like_probability", "0.6")) collect_prob = float(await self._get_config_from_db(db, "collect_probability", "0.3")) forward_prob = float(await self._get_config_from_db(db, "forward_probability", "0.15")) interactions_done = [] # ① 先记录阅读(每次必做,模拟真实用户打开文章) await news_service.read_news(db, user, news_id) # 今日已对此文章做过的互动类型 done_on_this = today_done.get(news_id, set()) # ② 点赞(每篇文章每用户每天只点赞一次) if "like" not in done_on_this and random.random() < like_prob: success, err = await news_service.like_news(db, user, news_id, org_id=article_org_id, to_user_id=news_author, title=news_title) await self._save_record(db, user, news_id, news_title, "like", None, 0, success, err) if success: interactions_done.append("like") await self._incr_total(db, user_id) # ③ 收藏(每篇文章每用户每天只收藏一次) if "collect" not in done_on_this and random.random() < collect_prob: success, err = await news_service.collect_news(db, user, news_id, org_id=article_org_id, to_user_id=news_author, title=news_title) await self._save_record(db, user, news_id, news_title, "collect", None, 0, success, err) if success: interactions_done.append("collect") # ④ 转发(每篇文章每用户每天只转发一次) if "forward" not in done_on_this and random.random() < forward_prob: success, err = await news_service.forward_news(db, user, news_id) await self._save_record(db, user, news_id, news_title, "forward", None, 0, success, err) if success: interactions_done.append("forward") await self._incr_total(db, user_id) # ⑤ 评论/回复逻辑(去重:已评论过的文章改为回复他人评论) if can_comment and personality: style_prompt = personality.comment_style_prompt or "" safe_word_max = min(personality.word_count_max, 80) if already_commented_this: # 已评论过此文章 → 改为回复其他用户的评论(虚拟用户互动) if random.random() < reply_prob: existing = await news_service.get_comments(db, user, news_id) if existing: # 优先回复虚拟用户的评论(促进互动),过滤掉自己的评论 from app.core.redis_client import get_session as _gs my_sess = await _gs(user.id) my_uid = my_sess.get("platform_uid", "") if my_sess else "" others = [c for c in existing if str(c.get("userId") or c.get("createUser") or "") != my_uid] if others: target = random.choice(others) cid = str(target.get("id") or target.get("commentId") or "") parent_content = target.get("content") or "" if cid: reply_text, r_tokens = await ai_service.generate_reply( db, news_title, parent_content, style_prompt, personality.word_count_min, safe_word_max ) if reply_text: r_ok, r_err = await news_service.post_reply( db, user, news_id, cid, reply_text ) await self._save_record( db, user, news_id, news_title, "reply", reply_text, r_tokens, r_ok, r_err, parent_comment_id=cid ) if r_ok: interactions_done.append("reply") logger.info(f"💬 {user.account} 回复了已评论文章的评论(去重逻辑)") else: # 未评论过此文章 → 正常发评论 if random.random() < comment_prob: comment_text, tokens = await ai_service.generate_comment( db, news_title, news_content, style_prompt, personality.word_count_min, safe_word_max ) if comment_text: success, err = await news_service.post_comment( db, user, news_id, news_title, comment_text, news_author_id=news_author, org_id=article_org_id ) await self._save_record( db, user, news_id, news_title, "comment", comment_text, tokens, success, err ) if success: interactions_done.append("comment") await db.execute( update(VirtualUser).where(VirtualUser.id == user_id).values( today_comment_count=VirtualUser.today_comment_count + 1, total_interactions=VirtualUser.total_interactions + 1, last_interact_at=datetime.now() ) ) # ⑥ 评论成功后,随机回复其他用户的评论(互动链) if random.random() < reply_prob: existing = await news_service.get_comments(db, user, news_id) if existing: from app.core.redis_client import get_session as _gs2 my_sess2 = await _gs2(user.id) my_uid2 = my_sess2.get("platform_uid", "") if my_sess2 else "" others2 = [c for c in existing if str(c.get("userId") or c.get("createUser") or "") != my_uid2] if others2: target2 = random.choice(others2) cid2 = str(target2.get("id") or target2.get("commentId") or "") parent_content2 = target2.get("content") or "" if cid2: reply_text2, r_tokens2 = await ai_service.generate_reply( db, news_title, parent_content2, style_prompt, personality.word_count_min, safe_word_max ) if reply_text2: r_ok2, r_err2 = await news_service.post_reply( db, user, news_id, cid2, reply_text2 ) await self._save_record( db, user, news_id, news_title, "reply", reply_text2, r_tokens2, r_ok2, r_err2, parent_comment_id=cid2 ) if r_ok2: interactions_done.append("reply") await db.commit() logger.info(f"👤 {user.account} 互动完成: {interactions_done} [新闻: {news_title[:20]}]") except Exception as e: logger.error(f"用户 {user_id} 互动异常: {e}") async def _incr_total(self, db, user_id: int): await db.execute( update(VirtualUser).where(VirtualUser.id == user_id).values( total_interactions=VirtualUser.total_interactions + 1, last_interact_at=datetime.now() ) ) async def _save_record( self, db, user: VirtualUser, article_id: str, article_title: str, interact_type: str, content: Optional[str], tokens: int, success: bool, error_msg: str, parent_comment_id: str = None, platform_record_id: str = None ): from app.core.redis_client import get_session session = await get_session(user.id) session_id = session.get("session_id") if session else None record = InteractionRecord( user_id=user.id, user_nickname=user.nickname, user_account=user.account, article_id=article_id, article_title=article_title, interact_type=interact_type, content=content, parent_comment_id=parent_comment_id, platform_record_id=platform_record_id, session_id=session_id, token_consumed=tokens, status=1 if success else 2, error_msg=error_msg or None, executed_at=datetime.now(), ) db.add(record) async def _get_config_from_db(self, db, key: str, default: str = "") -> str: result = await db.execute(select(SystemConfig).where(SystemConfig.config_key == key)) cfg = result.scalar_one_or_none() return cfg.config_value if cfg else default async def _daily_reset(self): """每日零点重置计数""" async with AsyncSessionLocal() as db: await db.execute( update(VirtualUser).values( today_comment_count=0, today_like_count=0 ) ) # 重置Token限额标志 result = await db.execute( select(SystemConfig).where(SystemConfig.config_key == "token_limit_reached") ) cfg = result.scalar_one_or_none() if cfg: cfg.config_value = "false" await db.commit() logger.info("每日计数重置完成") scheduler_service = SchedulerService()