diff --git a/backend/app/services/scheduler.py b/backend/app/services/scheduler.py index 10b2ec8..ab6d4b8 100755 --- a/backend/app/services/scheduler.py +++ b/backend/app/services/scheduler.py @@ -130,14 +130,13 @@ class SchedulerService: # 获取最大并发 max_concurrent = int(await self._get_config(db, "max_concurrent_users", "5")) - # 获取已登录、启用的用户 - query = select(VirtualUser).where( - VirtualUser.status == 2, - VirtualUser.is_enabled == 1, + # 获取所有已登录、启用的用户(不加 LIMIT,确保所有用户公平参与) + result = await db.execute( + select(VirtualUser).where( + VirtualUser.status == 2, + VirtualUser.is_enabled == 1, + ) ) - if max_concurrent > 0: - query = query.limit(max_concurrent) - result = await db.execute(query) all_users = result.scalars().all() # 没有已登录用户时,尝试登录未登录用户 @@ -146,26 +145,39 @@ class SchedulerService: return # 检查互动间隔:过滤掉最近 min_interval 秒内已互动的用户 - now_utc = datetime.now() + now_dt = datetime.now() eligible = [] for u in all_users: if u.last_interact_at is None: eligible.append(u) else: - elapsed = (now_utc - u.last_interact_at).total_seconds() + elapsed = (now_dt - u.last_interact_at).total_seconds() if elapsed >= min_interval: eligible.append(u) if not eligible: - logger.debug(f"[调度] 所有用户在 {min_interval}s 内已互动,跳过本次") + logger.debug(f"[调度] 所有 {len(all_users)} 个用户在 {min_interval}s 内已互动,跳过本次") return - logger.info(f"[调度] {len(eligible)}/{len(all_users)} 个用户满足间隔要求,开始互动") + # 按最后互动时间升序排序:最久没互动的用户优先 + eligible.sort(key=lambda u: u.last_interact_at or datetime.min) - # 随机选取用户执行互动 - for user in eligible: - if random.random() < 0.6: - asyncio.create_task(self._execute_user_interaction(user.id)) + # 从符合条件的用户中随机选取 max_concurrent 个执行(保证公平轮转) + batch_size = max_concurrent if max_concurrent > 0 else len(eligible) + # 优先选最久未互动的用户(前1/3),其余随机补充 + priority_size = max(1, batch_size // 3) + priority_users = eligible[:priority_size] + rest_users = eligible[priority_size:] + random.shuffle(rest_users) + selected = priority_users + rest_users[:max(0, batch_size - priority_size)] + + logger.info( + f"[调度] 共 {len(all_users)} 个用户,{len(eligible)} 个满足间隔," + f"本轮选取 {len(selected)} 个执行互动" + ) + + for user in selected: + asyncio.create_task(self._execute_user_interaction(user.id)) async def _try_login_users(self, db): """尝试登录未登录的用户"""