"""Scheduling service: periodic automatic interactions and session validation."""
import asyncio
import random
from datetime import date, datetime, timedelta, timezone
from typing import Optional

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy import select, update, func

from app.core.database import AsyncSessionLocal
from app.core.logger import logger
from app.models import VirtualUser, UserPersonality, InteractionRecord, SystemConfig


class SchedulerService:
    """Owns the APScheduler instance and the jobs that drive virtual users.

    Three jobs are registered by :meth:`start`: a session check every 10
    minutes, an interaction run every 5 minutes, and a daily counter reset.
    """

    # Timezone used by the scheduler and, explicitly, by the cron trigger.
    _TIMEZONE = "Asia/Shanghai"
    # VirtualUser.status value this file treats as "logged in".
    _STATUS_LOGGED_IN = 2
    # VirtualUser.status values for which an automatic login is attempted.
    _LOGIN_RETRY_STATUSES = (0, 3)

    def __init__(self):
        self.scheduler = AsyncIOScheduler(timezone=self._TIMEZONE)
        self._running = False
        # Strong references to fire-and-forget tasks. The event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._background_tasks = set()

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------
    def _active_users_query(self):
        """Return a SELECT for users that are logged in and enabled."""
        return select(VirtualUser).where(
            VirtualUser.status == self._STATUS_LOGGED_IN,
            VirtualUser.is_enabled == 1,
        )

    def _spawn(self, coro):
        """Schedule ``coro`` as a background task and keep it referenced until done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    @staticmethod
    def _in_time_window(now_hm: str, start_hm: str, end_hm: str) -> bool:
        """Return True when ``now_hm`` lies inside ``[start_hm, end_hm]``.

        All three values are ``"HH:MM"`` strings. They are compared as minutes
        since midnight, so unpadded hours (``"8:00"``) work, and a window whose
        start is later than its end (``"22:00"``-``"02:00"``) is treated as
        crossing midnight. If any value cannot be parsed, the plain string
        comparison is used instead.
        """

        def to_minutes(value: str) -> int:
            hours, minutes = value.strip().split(":")[:2]
            return int(hours) * 60 + int(minutes)

        try:
            now_m = to_minutes(now_hm)
            start_m = to_minutes(start_hm)
            end_m = to_minutes(end_hm)
        except (AttributeError, ValueError):
            return start_hm <= now_hm <= end_hm

        if start_m <= end_m:
            return start_m <= now_m <= end_m
        # Overnight window: after the start today, or before the end tomorrow.
        return now_m >= start_m or now_m <= end_m

    @staticmethod
    def _seconds_since(moment: datetime) -> float:
        """Return the seconds elapsed since ``moment``.

        Uses a clock with the same tz-awareness as ``moment`` so that naive and
        aware datetimes are never mixed in the subtraction.
        """
        if moment.tzinfo is not None:
            now = datetime.now(moment.tzinfo)
        else:
            now = datetime.now()
        return (now - moment).total_seconds()

    async def _get_int_config(self, db, key: str, default: int) -> int:
        """Read an integer config value; fall back to ``default`` if missing or invalid."""
        raw = await self._get_config(db, key, None)
        if raw is None:
            return default
        try:
            return int(raw)
        except (TypeError, ValueError):
            logger.warning(f"配置 {key} 的值 {raw!r} 无效,使用默认值 {default}")
            return default

    async def _get_float_config(self, db, key: str, default: float) -> float:
        """Read a float config value; fall back to ``default`` if missing or invalid."""
        raw = await self._get_config(db, key, None)
        if raw is None:
            return default
        try:
            return float(raw)
        except (TypeError, ValueError):
            logger.warning(f"配置 {key} 的值 {raw!r} 无效,使用默认值 {default}")
            return default

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    async def run_once_now(self, db=None):
        """Run one interaction round immediately, ignoring the active time window.

        Args:
            db: Unused; kept so existing callers that pass it keep working.

        Returns:
            ``{"message": ..., "triggered": 0}`` when no logged-in user exists,
            otherwise ``{"triggered": <count>, "users": [<account>, ...]}``.
        """
        logger.info("⚡ 立即触发互动任务")
        async with AsyncSessionLocal() as session:
            result = await session.execute(self._active_users_query())
            users = list(result.scalars().all())
            if not users:
                return {"message": "没有已登录的用户", "triggered": 0}
            selected = random.sample(users, min(5, len(users)))
            # Copy what is needed so the session can be released before the
            # (potentially slow) interactions run with their own sessions.
            picked = [(u.id, u.account) for u in selected]

        outcomes = await asyncio.gather(
            *(self._execute_user_interaction(uid) for uid, _ in picked),
            return_exceptions=True,
        )
        for (uid, _), outcome in zip(picked, outcomes):
            if isinstance(outcome, BaseException):
                logger.error(f"用户 {uid} 互动异常: {outcome}")
        return {"triggered": len(picked), "users": [account for _, account in picked]}

    async def start(self):
        """Register all jobs, start the scheduler and record the start time.

        Does nothing when the service is already running.
        """
        if self._running:
            return

        # Session validation: every 10 minutes.
        self.scheduler.add_job(
            self._check_sessions,
            IntervalTrigger(minutes=10),
            id="check_sessions",
            replace_existing=True,
        )
        # Interaction run: checked every 5 minutes; the job itself decides
        # whether the current time is inside the active window.
        self.scheduler.add_job(
            self._run_interactions,
            IntervalTrigger(minutes=5),
            id="run_interactions",
            replace_existing=True,
        )
        # Daily reset at 00:00 Beijing time. The cron fields are interpreted in
        # the trigger's timezone (Asia/Shanghai here), so the hour must be 0;
        # the previous hour=16 fired at 16:00 Beijing time.
        self.scheduler.add_job(
            self._daily_reset,
            "cron",
            hour=0,
            minute=0,
            timezone=self._TIMEZONE,
            id="daily_reset",
            replace_existing=True,
        )
        self.scheduler.start()
        self._running = True
        logger.info("调度器已启动")

        # Record the start time.
        async with AsyncSessionLocal() as db:
            await self._set_config(db, "system_start_time", datetime.now().isoformat())

    async def stop(self):
        """Shut the scheduler down without waiting for running jobs."""
        if self.scheduler.running:
            self.scheduler.shutdown(wait=False)
        self._running = False

    # ------------------------------------------------------------------
    # Config access
    # ------------------------------------------------------------------
    async def _get_config(self, db, key: str, default=None):
        """Return the stored value for ``key``, or ``default`` when no row exists."""
        result = await db.execute(select(SystemConfig).where(SystemConfig.config_key == key))
        cfg = result.scalar_one_or_none()
        return cfg.config_value if cfg else default

    async def _set_config(self, db, key: str, value: str):
        """Insert or update the config row for ``key`` and commit."""
        result = await db.execute(select(SystemConfig).where(SystemConfig.config_key == key))
        cfg = result.scalar_one_or_none()
        if cfg:
            cfg.config_value = value
        else:
            db.add(SystemConfig(config_key=key, config_value=value))
        await db.commit()

    async def _get_config_from_db(self, db, key: str, default: str = "") -> str:
        """Same lookup as :meth:`_get_config`, with an empty-string default."""
        return await self._get_config(db, key, default)

    # ------------------------------------------------------------------
    # Scheduled jobs
    # ------------------------------------------------------------------
    async def _check_sessions(self):
        """Validate each logged-in user's session and log in again when it is invalid."""
        # Imported lazily, as in the original (presumably to avoid an import cycle).
        from app.services.news_service import news_service

        async with AsyncSessionLocal() as db:
            result = await db.execute(self._active_users_query())
            users = result.scalars().all()
            for user in users:
                try:
                    valid = await news_service.check_session(db, user)
                    if not valid:
                        logger.warning(f"用户 {user.account} 会话失效,尝试重登")
                        await news_service.login(db, user)
                except Exception as e:
                    # One failing user must not stop the check for the others.
                    logger.error(f"会话校验异常 {user.account}: {e}")

    async def _run_interactions(self):
        """Pick eligible users and start their interactions in the background.

        The run is skipped when the scheduler switch is off, the token limit
        flag is set, or the current Beijing time is outside the configured
        window. When no user is logged in, a login attempt is made instead.
        """
        async with AsyncSessionLocal() as db:
            # Scheduler switch.
            if await self._get_config(db, "scheduler_enabled", "true") != "true":
                return

            # Token quota flag.
            if await self._get_config(db, "token_limit_reached", "false") == "true":
                return

            # Active time window, evaluated in Beijing time (UTC+8).
            now_time = datetime.now(timezone(timedelta(hours=8))).strftime("%H:%M")
            start_str = await self._get_config(db, "interact_time_start", "08:00")
            end_str = await self._get_config(db, "interact_time_end", "22:00")
            if not self._in_time_window(now_time, start_str, end_str):
                logger.debug(f"[调度] 当前北京时间 {now_time} 不在互动时段 {start_str}-{end_str}")
                return

            # Minimum seconds between two interactions of the same user.
            min_interval = await self._get_int_config(db, "interact_min_interval", 300)
            # Upper bound on users started per run (0 or less disables the cap).
            max_concurrent = await self._get_int_config(db, "max_concurrent_users", 5)

            result = await db.execute(self._active_users_query())
            all_users = result.scalars().all()

            # Nobody is logged in: try to log some users in and stop here.
            if not all_users:
                await self._try_login_users(db)
                return

            # Drop users that interacted within the last min_interval seconds.
            eligible = [
                u
                for u in all_users
                if u.last_interact_at is None
                or self._seconds_since(u.last_interact_at) >= min_interval
            ]
            if not eligible:
                logger.debug(f"[调度] 所有用户在 {min_interval}s 内已互动,跳过本次")
                return

            logger.info(f"[调度] {len(eligible)}/{len(all_users)} 个用户满足间隔要求,开始互动")

            # Apply the cap after filtering and pick at random. A LIMIT on the
            # query (without ORDER BY) kept returning the same first rows, so
            # other users never got a turn.
            if 0 < max_concurrent < len(eligible):
                eligible = random.sample(eligible, max_concurrent)

            # Each remaining user takes part with 60% probability.
            chosen_ids = [u.id for u in eligible if random.random() < 0.6]

        for uid in chosen_ids:
            self._spawn(self._execute_user_interaction(uid))

    async def _try_login_users(self, db):
        """Attempt to log in up to three enabled users that are not logged in."""
        from app.services.news_service import news_service

        result = await db.execute(
            select(VirtualUser).where(
                VirtualUser.status.in_(self._LOGIN_RETRY_STATUSES),
                VirtualUser.is_enabled == 1,
            ).limit(3)
        )
        users = result.scalars().all()
        for user in users:
            try:
                await news_service.login(db, user)
                # Pause between logins.
                await asyncio.sleep(2)
            except Exception as e:
                logger.error(f"自动登录失败 {user.account}: {e}")

    async def _daily_reset(self):
        """Reset the per-day counters of all users and clear the token limit flag."""
        async with AsyncSessionLocal() as db:
            await db.execute(
                update(VirtualUser).values(
                    today_comment_count=0,
                    today_like_count=0,
                )
            )
            # Clear the token limit flag if the row exists.
            result = await db.execute(
                select(SystemConfig).where(SystemConfig.config_key == "token_limit_reached")
            )
            cfg = result.scalar_one_or_none()
            if cfg:
                cfg.config_value = "false"
            await db.commit()
            logger.info("每日计数重置完成")

    # ------------------------------------------------------------------
    # Single-user interaction
    # ------------------------------------------------------------------
    async def _execute_user_interaction(self, user_id: int):
        """Run one interaction round (read/like/collect/forward/comment/reply) for a user.

        Opens its own DB session. Any exception is logged and swallowed so that
        one user's failure does not affect other users' tasks.
        """
        from app.services.news_service import news_service

        async with AsyncSessionLocal() as db:
            try:
                user_result = await db.execute(
                    select(VirtualUser).where(VirtualUser.id == user_id)
                )
                user = user_result.scalar_one_or_none()
                if not user or user.status != self._STATUS_LOGGED_IN:
                    return

                # Daily comment quota: when exhausted, only commenting is skipped.
                can_comment = user.today_comment_count < user.daily_comment_limit
                if not can_comment:
                    logger.info(f"用户 {user.account} 今日评论已达上限,仍执行点赞/收藏/转发")

                # Personality drives interest tags and comment style.
                p_result = await db.execute(
                    select(UserPersonality).where(UserPersonality.user_id == user_id)
                )
                personality = p_result.scalar_one_or_none()
                interest_tags = personality.interest_tags if personality else []

                articles = await self._fetch_articles(db, user, interest_tags)
                if not articles:
                    return

                article = random.choice(articles)
                # Two response shapes are handled: id/newsTitle/content/digest/
                # createUser, and the "square" shape where recordId is the real
                # news id, id is the square record id and title is the headline.
                news = {
                    "id": str(article.get("recordId") or article.get("id", "")),
                    "title": article.get("title") or article.get("newsTitle") or "未知文章",
                    "author": str(article.get("createUser") or ""),
                    "org_id": str(article.get("orgId") or ""),
                }
                news["content"] = (
                    article.get("content") or article.get("digest") or news["title"]
                )
                if not news["id"]:
                    return

                # Interaction probabilities.
                comment_prob = await self._get_float_config(db, "comment_probability", 0.4)
                reply_prob = await self._get_float_config(db, "reply_probability", 0.2)
                like_prob = await self._get_float_config(db, "like_probability", 0.6)
                collect_prob = await self._get_float_config(db, "collect_probability", 0.3)
                forward_prob = await self._get_float_config(db, "forward_probability", 0.15)

                interactions_done = []

                # 1) Always record a read first.
                await news_service.read_news(db, user, news["id"])

                # 2) Like.
                if random.random() < like_prob:
                    success, err = await news_service.like_news(
                        db, user, news["id"],
                        org_id=news["org_id"], to_user_id=news["author"], title=news["title"],
                    )
                    await self._save_record(
                        db, user, news["id"], news["title"], "like", None, 0, success, err
                    )
                    if success:
                        interactions_done.append("like")
                        await self._incr_total(db, user_id)

                # 3) Collect. No total/last-interaction update here, as before.
                if random.random() < collect_prob:
                    success, err = await news_service.collect_news(
                        db, user, news["id"],
                        org_id=news["org_id"], to_user_id=news["author"], title=news["title"],
                    )
                    await self._save_record(
                        db, user, news["id"], news["title"], "collect", None, 0, success, err
                    )
                    if success:
                        interactions_done.append("collect")

                # 4) Forward.
                if random.random() < forward_prob:
                    success, err = await news_service.forward_news(db, user, news["id"])
                    await self._save_record(
                        db, user, news["id"], news["title"], "forward", None, 0, success, err
                    )
                    if success:
                        interactions_done.append("forward")
                        await self._incr_total(db, user_id)

                # 5) Comment, and 6) possibly a reply after a successful comment.
                if can_comment and random.random() < comment_prob and personality:
                    await self._comment_and_reply(
                        db, user, personality, news, reply_prob, interactions_done
                    )

                await db.commit()
                logger.info(
                    f"👤 {user.account} 互动完成: {interactions_done} [新闻: {news['title'][:20]}]"
                )
            except Exception as e:
                logger.error(f"用户 {user_id} 互动异常: {e}")

    async def _fetch_articles(self, db, user, interest_tags):
        """Fetch the news list for ``user``; retry once, log a warning if still empty."""
        from app.services.news_service import news_service

        articles = await news_service.get_news_list(
            db, user, count=5, interest_tags=interest_tags
        )
        if not articles:
            from app.core.redis_client import get_session as _get_sess

            sess = await _get_sess(user.id)
            org_from_sess = sess.get("org_id", "") if sess else ""
            # NOTE(review): the retry is gated on the session having an org_id,
            # but that value is not passed to get_news_list, so the retry is the
            # same call as the first one — confirm the intended parameter.
            if org_from_sess:
                articles = await news_service.get_news_list(
                    db, user, count=5, interest_tags=interest_tags
                )
        if not articles:
            org_id = await news_service._cfg(db, 'platform_org_id', '')
            logger.warning(f"用户 {user.account} 获取新闻列表为空 (orgId={org_id})")
        return articles

    async def _comment_and_reply(self, db, user, personality, news, reply_prob, interactions_done):
        """Post an AI-generated comment and, if it succeeded, maybe reply to another comment."""
        from app.services.news_service import news_service
        from app.services.ai_service import ai_service

        style_prompt = personality.comment_style_prompt or ""
        # Cap the requested length at 80 so the output is not cut off by max_tokens.
        safe_word_max = min(personality.word_count_max, 80)
        comment_text, tokens = await ai_service.generate_comment(
            db, news["title"], news["content"], style_prompt,
            personality.word_count_min, safe_word_max,
        )
        if not comment_text:
            return

        success, err = await news_service.post_comment(
            db, user, news["id"], news["title"], comment_text,
            news_author_id=news["author"], org_id=news["org_id"],
        )
        await self._save_record(
            db, user, news["id"], news["title"], "comment",
            comment_text, tokens, success, err,
        )
        if not success:
            return

        interactions_done.append("comment")
        await db.execute(
            update(VirtualUser).where(VirtualUser.id == user.id).values(
                today_comment_count=VirtualUser.today_comment_count + 1,
                total_interactions=VirtualUser.total_interactions + 1,
                last_interact_at=datetime.now(),
            )
        )

        # Reply to a randomly chosen existing comment.
        if random.random() >= reply_prob:
            return
        existing = await news_service.get_comments(db, user, news["id"])
        if not existing:
            return
        target = random.choice(existing)
        cid = str(target.get("id") or target.get("commentId") or "")
        if not cid:
            return
        parent_content = target.get("content") or ""
        reply_text, r_tokens = await ai_service.generate_reply(
            db, news["title"], parent_content, style_prompt,
            personality.word_count_min, personality.word_count_max,
        )
        if not reply_text:
            return
        r_ok, r_err = await news_service.post_reply(db, user, news["id"], cid, reply_text)
        await self._save_record(
            db, user, news["id"], news["title"], "reply",
            reply_text, r_tokens, r_ok, r_err, parent_comment_id=cid,
        )
        if r_ok:
            interactions_done.append("reply")

    async def _incr_total(self, db, user_id: int):
        """Increment the user's total interaction count and stamp the interaction time."""
        await db.execute(
            update(VirtualUser).where(VirtualUser.id == user_id).values(
                total_interactions=VirtualUser.total_interactions + 1,
                last_interact_at=datetime.now(),
            )
        )

    async def _save_record(
        self,
        db,
        user: VirtualUser,
        article_id: str,
        article_title: str,
        interact_type: str,
        content: Optional[str],
        tokens: int,
        success: bool,
        error_msg: Optional[str],
        parent_comment_id: Optional[str] = None,
        platform_record_id: Optional[str] = None,
    ):
        """Add an InteractionRecord for one action to the session (no commit).

        ``status`` is stored as 1 on success and 2 on failure; an empty
        ``error_msg`` is stored as NULL.
        """
        from app.core.redis_client import get_session

        session = await get_session(user.id)
        session_id = session.get("session_id") if session else None
        db.add(
            InteractionRecord(
                user_id=user.id,
                user_nickname=user.nickname,
                user_account=user.account,
                article_id=article_id,
                article_title=article_title,
                interact_type=interact_type,
                content=content,
                parent_comment_id=parent_comment_id,
                platform_record_id=platform_record_id,
                session_id=session_id,
                token_consumed=tokens,
                status=1 if success else 2,
                error_msg=error_msg or None,
                executed_at=datetime.now(),
            )
        )


scheduler_service = SchedulerService()