fix: 今日文章配额控制,避免全部虚拟用户集中互动同一篇

问题:今日只有1篇文章时,所有虚拟用户全部互动该文章,历史文章无人问津

修复方案(配额制):
- 新增 count_today_articles():轻量统计今日广场文章数
- 配额规则:每篇今日文章最多吸引3个虚拟用户(可调)
  - 今日1篇 → 最多3人互动今日,其余全走历史
  - 今日5篇 → 最多15人互动今日,其余走历史
  - 今日10篇以上 → 批次内所有人均可互动今日文章
- get_news_list() 新增 force_history 参数,强制走 Phase 2
- 调度器在分发任务前计算配额,超出配额的用户透传 force_history=True

效果:新文章获得合理曝光,历史文章持续被互动,分布更自然
This commit is contained in:
stefanfeng
2026-04-08 11:47:36 +08:00
parent b43ee777fc
commit c944fbb0ea
2 changed files with 68 additions and 8 deletions

View File

@@ -330,6 +330,50 @@ class NewsPlatformService:
return False return False
# ─── 新闻列表 ────────────────────────────────────────────── # ─── 新闻列表 ──────────────────────────────────────────────
async def count_today_articles(self, db, user) -> int:
"""
获取今日广场新文章数量(用于调度器计算配额)。
使用轻量请求只取第1页统计今日文章数。
无会话时返回 0。
"""
if user is None:
return 0
from datetime import datetime as _dt
sess = await get_session(user.id)
if not sess:
return 0
biz = await self._biz_url(db)
cfg = await self._client(db)
token = sess["token"]
try:
params = self._build_form({"pageNum": 1, "pageSize": 50, "type": "1"}, cfg)
async with httpx.AsyncClient(timeout=8) as c:
r = await c.get(
f"{biz}/business/square/list",
headers=self._bearer(token),
params=params,
)
if r.status_code != 200:
return 0
d = r.json()
if d.get("code") not in [0, 200]:
return 0
nd = d.get("data", {})
items = nd.get("data") or nd.get("list") or nd.get("records") or []
today = 0
for a in items:
t = a.get("publishTime") or a.get("createTime") or ""
if not t:
continue
try:
if _dt.strptime(t[:10], "%Y-%m-%d").date() == _dt.now().date():
today += 1
except Exception:
pass
return today
except Exception:
return 0
async def validate_article(self, db, user, article_id: str) -> bool: async def validate_article(self, db, user, article_id: str) -> bool:
""" """
验证文章是否可用: 验证文章是否可用:
@@ -376,7 +420,7 @@ class NewsPlatformService:
logger.warning(f"[文章校验] {article_id} 请求异常: {e}") logger.warning(f"[文章校验] {article_id} 请求异常: {e}")
return False return False
async def get_news_list(self, db, user, count=5, interest_tags=None) -> list: async def get_news_list(self, db, user, count=5, interest_tags=None, force_history=False) -> list:
""" """
获取文章列表,优先返回今日新发布的文章(从新到旧轮询), 获取文章列表,优先返回今日新发布的文章(从新到旧轮询),
无今日新文章时才随机翻历史页。 无今日新文章时才随机翻历史页。
@@ -490,9 +534,10 @@ class NewsPlatformService:
aid = str(a.get("recordId") or a.get("id", "")) aid = str(a.get("recordId") or a.get("id", ""))
if await self.validate_article(db, user, aid): if await self.validate_article(db, user, aid):
valid.append(a) valid.append(a)
if valid: if valid and not force_history:
return valid return valid
logger.info(f"[广场新闻] {user.account} 今日文章校验后全部无效,转历史") if not valid:
logger.info(f"[广场新闻] {user.account} 今日文章校验后全部无效,转历史")
# ── Phase 2: 无今日新文章 → 从最新(第1页)开始往旧顺序遍历 ──── # ── Phase 2: 无今日新文章 → 从最新(第1页)开始往旧顺序遍历 ────
# 规则始终从第1页最新开始按页顺序 1→2→3...往旧方向走 # 规则始终从第1页最新开始按页顺序 1→2→3...往旧方向走

View File

@@ -171,13 +171,28 @@ class SchedulerService:
random.shuffle(rest_users) random.shuffle(rest_users)
selected = priority_users + rest_users[:max(0, batch_size - priority_size)] selected = priority_users + rest_users[:max(0, batch_size - priority_size)]
# ── 今日文章配额计算 ──────────────────────────────────────
# 获取今日文章数量,决定本轮有多少用户应互动今日文章
today_count = 0
try:
from app.services.news_service import news_service as _ns
today_count = await _ns.count_today_articles(db, selected[0] if selected else None)
except Exception:
pass
# 配额规则:每篇今日文章最多吸引 3 个虚拟用户,超出部分走历史
today_quota = min(today_count * 3, len(selected))
logger.info( logger.info(
f"[调度] 共 {len(all_users)} 个用户,{len(eligible)} 个满足间隔," f"[调度] 共 {len(all_users)} 个用户,{len(eligible)} 个满足间隔,"
f"本轮选取 {len(selected)}执行互动" f"本轮选取 {len(selected)},今日文章 {today_count} 篇,"
f"配额 {today_quota} 人互动今日/{len(selected)-today_quota} 人走历史"
) )
for user in selected: for i, user in enumerate(selected):
asyncio.create_task(self._execute_user_interaction(user.id)) # 超出今日配额的用户强制走历史文章
force_history = (i >= today_quota)
asyncio.create_task(self._execute_user_interaction(user.id, force_history=force_history))
async def _try_login_users(self, db): async def _try_login_users(self, db):
"""尝试登录未登录的用户""" """尝试登录未登录的用户"""
@@ -196,7 +211,7 @@ class SchedulerService:
except Exception as e: except Exception as e:
logger.error(f"自动登录失败 {user.account}: {e}") logger.error(f"自动登录失败 {user.account}: {e}")
async def _execute_user_interaction(self, user_id: int): async def _execute_user_interaction(self, user_id: int, force_history: bool = False):
"""执行单用户互动 - 基于真实接口""" """执行单用户互动 - 基于真实接口"""
from app.services.news_service import news_service from app.services.news_service import news_service
from app.services.ai_service import ai_service from app.services.ai_service import ai_service
@@ -224,7 +239,7 @@ class SchedulerService:
# 获取新闻列表(基于接口 GET /news/list # 获取新闻列表(基于接口 GET /news/list
articles = await news_service.get_news_list( articles = await news_service.get_news_list(
db, user, count=5, interest_tags=interest_tags db, user, count=5, interest_tags=interest_tags, force_history=force_history
) )
if not articles: if not articles:
# 尝试从 session 获取 org_id 再试一次 # 尝试从 session 获取 org_id 再试一次