Files
huihuiSquare/backend/app/services/news_service.py
stefanfeng 79d57da769 fix: 今日文章不足时混入历史文章,避免所有用户扎堆同一篇
问题:今日只有1篇文章时,所有虚拟用户都只互动这同一篇
原因:Phase 1 找到今日文章后直接返回,不管数量多少

修复逻辑:
- 今日有效文章 → 每用户最多取 1 篇(count//3,最少1篇)
- 剩余名额(count - today_quota)从历史文章补充
- 历史文章:按当前小时对应页拉取(与Phase 2相同),随机打散
- 历史文章排除今日文章ID和当天发布的文章,保证内容不重复
- 最终返回:今日N篇 + 历史M篇,总量接近 count

效果:今日1篇文章时 → 用户取1篇今日 + 4篇历史,互动多样性恢复
2026-04-08 11:49:57 +08:00

1018 lines
43 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
新闻平台对接服务
登录: POST {auth}/open/login/token (formData)
签名: 完全对应 sign.js 的实现
"""
import uuid
import hashlib
import hmac
from datetime import datetime, timedelta
from typing import Optional
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update
from app.models import VirtualUser, SystemConfig, LoginLog
from app.utils.crypto import decrypt
from app.core.redis_client import set_session, get_session, delete_session
from app.core.logger import logger
class NewsPlatformService:
# ─── 配置读取 ──────────────────────────────────────────────
async def _cfg(self, db: AsyncSession, key: str, default: str = "") -> str:
result = await db.execute(select(SystemConfig).where(SystemConfig.config_key == key))
row = result.scalar_one_or_none()
return row.config_value if row else default
async def _biz_url(self, db: AsyncSession) -> str:
return await self._cfg(db, "news_platform_base_url", "http://192.168.1.200:63120")
# Runtime cache of invalid article IDs (too short / not found / error)
# Persists for the lifetime of the process — avoids repeated API calls
_invalid_ids_cache: set = set()
async def _auth_url(self, db: AsyncSession) -> str:
return await self._cfg(db, "auth_base_url", "http://192.168.1.200:60040")
async def _client(self, db: AsyncSession) -> dict:
return {
"appId": await self._cfg(db, "platform_app_id", ""),
"accessId": await self._cfg(db, "platform_access_id", ""),
"accessSecret": await self._cfg(db, "platform_access_secret", ""),
"clientCode": await self._cfg(db, "platform_client_code", ""),
"orgId": await self._cfg(db, "platform_org_id", ""),
}
# ─── 签名(完全对应 sign.js 逻辑) ─────────────────────────
@staticmethod
def _make_sign(params: dict, secret_key: str, sign_type: str = "MD5") -> str:
"""
完全对应 sign.js:
1. 所有参数 key 排序
2. 过滤掉 signature / accessSecret / 空值 / 空数组
3. 拼接 key=value& ... accessSecret=secretKey
4. MD5/SHA256 大写
"""
SIGN_KEY = "signature"
SECRET_KEY = "accessSecret"
keys = sorted(params.keys())
str_parts = []
for k in keys:
if k in (SIGN_KEY, SECRET_KEY):
continue
v = params.get(k)
if v is None or v == "" or v == []:
continue
if isinstance(v, list):
continue
str_parts.append(f"{k}={v}")
sign_str = "&".join(str_parts) + f"&{SECRET_KEY}={secret_key}"
if sign_type.upper() == "SHA256":
return hashlib.sha256(sign_str.encode("utf-8")).hexdigest().upper()
else:
return hashlib.md5(sign_str.encode("utf-8")).hexdigest().upper()
@staticmethod
def _get_nonce() -> str:
import random, math
return str(random.random())[2:][: random.randint(8, 12)]
@staticmethod
def _get_timestamp() -> str:
"""yyyyMMddhhmmss — 注意 sign.js 用小写 hh(12小时制)"""
now = datetime.now()
# sign.js 用 yyyyMMddhhmmss (12小时制小写hh)
return now.strftime("%Y%m%d%I%M%S")
def _build_form(self, extra: dict, cfg: dict) -> dict:
"""构建带签名的 formData"""
sign_type = "MD5"
sign_version = "1.0"
secret_key = cfg.get("accessSecret", "")
base = {
"appId": cfg.get("appId", ""),
"accessId": cfg.get("accessId", ""),
"timestamp": self._get_timestamp(),
"signType": sign_type,
"signVersion": sign_version,
"accessSecret": secret_key,
"nonce": self._get_nonce(),
}
base.update(extra)
# 计算签名
signature = self._make_sign(base, secret_key, sign_type) if secret_key else ""
base["signature"] = signature
# 移除 accessSecret不发送到服务器
base.pop("accessSecret", None)
return base
# ─── 登录 ──────────────────────────────────────────────────
async def login(self, db: AsyncSession, user: VirtualUser) -> bool:
password = decrypt(user.password_enc)
if not password:
logger.error(f"[登录] {user.account} 密码解密失败")
return False
auth = await self._auth_url(db)
cfg = await self._client(db)
await db.execute(update(VirtualUser).where(VirtualUser.id == user.id).values(status=1))
await db.commit()
extra = {
"username": user.account,
"password": password,
"loginType": "password",
"grantType": "password",
"isRegister": "false",
}
if cfg.get("clientCode"):
extra["clientCode"] = cfg["clientCode"]
form = self._build_form(extra, cfg)
exc = None
try:
async with httpx.AsyncClient(
timeout=30,
follow_redirects=True, # 自动跟随 HTTP 重定向
) as c:
# 登录接口路径:需要加 /usercenter 前缀(通过网关路由)
# auth_base_url 配置为完整前缀,如 https://fat-open.99hui.com/api/usercenter
login_url = f"{auth}/open/login/token"
resp = await c.post(login_url, data=form)
# 详细记录原始响应,便于排查
logger.info(
f"[登录] {user.account} 原始响应: "
f"status={resp.status_code} "
f"content-type={resp.headers.get('content-type','')} "
f"body={resp.text[:500]}"
)
# 防止空响应体崩溃
if not resp.text.strip():
logger.warning(f"[登录] {user.account} 服务器返回空响应体,接口可能不存在或被重定向")
raise ValueError(f"服务器返回空响应 HTTP={resp.status_code}")
# 尝试解析 JSON
try:
body = resp.json()
except Exception as je:
logger.warning(f"[登录] {user.account} 响应非JSON: {resp.text[:200]}")
raise ValueError(f"响应非JSON: {resp.text[:100]}")
if resp.status_code == 200 and body.get("code") in [0, 200]:
raw = body.get("data")
access_token, platform_uid = self._extract_token(raw)
if access_token:
sid = str(uuid.uuid4())
# 登录成功后尝试获取用户组织信息
org_id = await self._fetch_org_id(db, access_token, platform_uid, cfg)
if org_id and not cfg.get("orgId"):
# 如果系统没有配置 orgId则自动保存用户所属组织
await self._save_org_id(db, org_id)
# 从登录响应中提取用户信息
user_info = raw.get("userInfo", {}) if isinstance(raw, dict) else {}
sync_nickname = user_info.get("nickName") or user_info.get("username") or ""
sync_real_name = user_info.get("realName") or ""
sync_sex = int(user_info.get("sex") or 0)
sync_avatar = user_info.get("avatar") or ""
await set_session(user.id, {
"token": access_token,
"session_id": sid,
"platform_uid": platform_uid,
"org_id": org_id or cfg.get("orgId", ""),
"login_time": datetime.now().isoformat(),
# 缓存用户信息供 sync 使用
"nickname": sync_nickname,
"real_name": sync_real_name,
"sex": sync_sex,
"avatar": sync_avatar,
}, expire=86400)
# 更新本地数据库,同步平台用户信息
update_vals = dict(
status=2, session_token=access_token,
session_expires_at=datetime.now() + timedelta(hours=24),
last_login_at=datetime.now(),
platform_uid=platform_uid,
)
if sync_nickname: update_vals["nickname"] = sync_nickname
if sync_real_name: update_vals["real_name"] = sync_real_name
if sync_sex: update_vals["sex"] = sync_sex
if sync_avatar: update_vals["avatar_url"] = sync_avatar
await db.execute(update(VirtualUser).where(VirtualUser.id == user.id).values(**update_vals))
await db.commit()
await self._write_login_log(db, user, "login", sid)
logger.info(f"✅ [登录] {user.account} 成功 orgId={org_id}")
return True
logger.warning(f"[登录] {user.account} 无token: {body}")
else:
logger.warning(f"[登录] {user.account} 失败: HTTP={resp.status_code} {body}")
except Exception as e:
exc = e
logger.error(f"[登录] {user.account} 异常: {e}")
await db.execute(update(VirtualUser).where(VirtualUser.id == user.id).values(status=3))
await db.commit()
await self._write_login_log(db, user, "fail", error_msg=str(exc or "登录失败"))
return False
async def _fetch_org_id(
self, db: AsyncSession, token: str, platform_uid: str, cfg: dict
) -> str:
"""登录成功后,调用接口获取用户所属组织 orgId"""
biz = await self._biz_url(db)
headers = self._bearer(token)
# 尝试常见的用户信息接口
endpoints = [
f"/app/user/info",
f"/open/user/info",
f"/user/info",
]
for ep in endpoints:
try:
form = self._build_form({}, cfg)
async with httpx.AsyncClient(timeout=8) as c:
r = await c.get(
f"{biz}{ep}",
headers=headers,
params={k: v for k, v in form.items() if k not in ("username","password")}
)
if r.status_code == 200:
d = r.json()
data = d.get("data") or {}
# 从各种可能的字段中提取 orgId
org_id = (
data.get("orgId") or data.get("defaultOrgId") or
data.get("org", {}).get("id") if isinstance(data.get("org"), dict) else None
)
if org_id:
return str(org_id)
except Exception as e:
logger.debug(f"获取orgId失败({ep}): {e}")
return ""
async def _save_org_id(self, db: AsyncSession, org_id: str):
"""自动保存获取到的 orgId 到系统配置"""
result = await db.execute(
select(SystemConfig).where(SystemConfig.config_key == "platform_org_id")
)
cfg = result.scalar_one_or_none()
if cfg:
cfg.config_value = org_id
else:
db.add(SystemConfig(
config_key="platform_org_id",
config_value=org_id,
description="平台组织Id自动获取"
))
await db.commit()
logger.info(f"✅ 自动获取并保存 orgId={org_id}")
@staticmethod
def _extract_token(raw) -> tuple[str, str]:
if isinstance(raw, str) and raw:
return raw, ""
if isinstance(raw, dict):
token = (raw.get("access_token") or raw.get("accessToken") or raw.get("token") or "")
# openid 是平台用户ID登录响应里 data.openid = data.userInfo.id
uid = str(
raw.get("openid") or
raw.get("userId") or raw.get("user_id") or raw.get("id") or
(raw.get("userInfo") or {}).get("id") or ""
)
return token, uid
return "", ""
async def logout(self, db: AsyncSession, user_id: int):
user_r = await db.execute(select(VirtualUser).where(VirtualUser.id == user_id))
user = user_r.scalar_one_or_none()
if user:
sess = await get_session(user_id)
await self._write_login_log(db, user, "logout",
sess.get("session_id") if sess else None)
await delete_session(user_id)
await db.execute(update(VirtualUser).where(VirtualUser.id == user_id).values(
status=0, session_token=None, session_expires_at=None))
await db.commit()
async def check_session(self, db: AsyncSession, user: VirtualUser) -> bool:
sess = await get_session(user.id)
if not sess:
return False
biz = await self._biz_url(db)
cfg = await self._client(db)
try:
params = self._build_form({}, cfg)
params.update({"orgId": cfg["orgId"] or "1", "pageNum": 1, "pageSize": 1, "status": "approved"})
async with httpx.AsyncClient(timeout=10) as c:
r = await c.get(f"{biz}/news/list", headers=self._bearer(sess["token"]), params=params)
if r.status_code == 200 and r.json().get("code") in [0, 200]:
return True
await self.logout(db, user.id)
return False
except Exception:
return False
# ─── 新闻列表 ──────────────────────────────────────────────
async def count_today_articles(self, db, user) -> int:
"""
获取今日广场新文章数量(用于调度器计算配额)。
使用轻量请求只取第1页统计今日文章数。
无会话时返回 0。
"""
if user is None:
return 0
from datetime import datetime as _dt
sess = await get_session(user.id)
if not sess:
return 0
biz = await self._biz_url(db)
cfg = await self._client(db)
token = sess["token"]
try:
params = self._build_form({"pageNum": 1, "pageSize": 50, "type": "1"}, cfg)
async with httpx.AsyncClient(timeout=8) as c:
r = await c.get(
f"{biz}/business/square/list",
headers=self._bearer(token),
params=params,
)
if r.status_code != 200:
return 0
d = r.json()
if d.get("code") not in [0, 200]:
return 0
nd = d.get("data", {})
items = nd.get("data") or nd.get("list") or nd.get("records") or []
today = 0
for a in items:
t = a.get("publishTime") or a.get("createTime") or ""
if not t:
continue
try:
if _dt.strptime(t[:10], "%Y-%m-%d").date() == _dt.now().date():
today += 1
except Exception:
pass
return today
except Exception:
return 0
async def validate_article(self, db, user, article_id: str) -> bool:
"""
验证文章是否可用:
- 文章存在且接口返回 code=0
- 去除 HTML 标签后正文字数 >= 100 字
返回 True 表示可用False 表示应跳过
"""
if not article_id:
return False
# 命中缓存直接跳过
if article_id in news_service._invalid_ids_cache:
return False
sess = await get_session(user.id)
if not sess:
return False
biz = await self._biz_url(db)
token = sess.get("token", "")
try:
async with httpx.AsyncClient(timeout=8) as c:
r = await c.get(
f"{biz}/news/{article_id}",
headers=self._bearer(token),
)
if r.status_code != 200:
news_service._invalid_ids_cache.add(article_id)
return False
d = r.json()
if d.get("code") not in [0, 200]:
logger.info(f"[文章校验] {article_id} 无效: code={d.get('code')} {d.get('message','')}")
news_service._invalid_ids_cache.add(article_id)
return False
data = d.get("data") or {}
# 取正文,去除 HTML 标签,统计字数
raw = data.get("content") or data.get("digest") or data.get("newsTitle") or ""
import re as _re
text = _re.sub(r"<[^>]+>", "", raw).strip()
if len(text) < 100:
logger.info(f"[文章校验] {article_id} 正文过短({len(text)}字),跳过")
news_service._invalid_ids_cache.add(article_id)
return False
return True
except Exception as e:
logger.warning(f"[文章校验] {article_id} 请求异常: {e}")
return False
async def get_news_list(self, db, user, count=5, interest_tags=None, force_history=False) -> list:
"""
获取文章列表,优先返回今日新发布的文章(从新到旧轮询),
无今日新文章时才随机翻历史页。
"""
import math, random as _rand
from datetime import datetime as _dt
sess = await get_session(user.id)
if not sess:
return []
biz = await self._biz_url(db)
cfg = await self._client(db)
org_id = sess.get("org_id") or cfg.get("orgId") or ""
platform_uid = sess.get("platform_uid", "")
token = sess["token"]
# 已知静态无效ID直接过滤无需 API 校验)
INVALID_IDS = {
"1965670408480907266","2029092495693975554","1960652956793597953",
"1960651987045347330","1960596408620838914","1960596083193180161",
"1960595664341594113","1952296583257133058",
} | news_service._invalid_ids_cache
def _build(page, size=50):
# Use /business/square/list (not /business/square/list)
# Only type=1 filter — isPlatformShow/isAdmin exclude org-less articles
p = self._build_form({
"pageNum": page, "pageSize": size,
"type": "1",
}, cfg)
if org_id:
p["orgId"] = org_id
return p
def _filter(items):
"""过滤本人发布 + 无效ID + 无标题/无recordId的空条目"""
# 过滤空条目recordId/title 为 nulltype!=1 等异常数据)
items = [x for x in items
if x.get("recordId") and x.get("title") and x.get("type") == "1"]
if platform_uid:
items = [x for x in items if x.get("createUser") != platform_uid]
items = [x for x in items
if (x.get("recordId") or x.get("id")) not in INVALID_IDS]
return items
def _is_today(item):
# API returns publishTime (not createTime) for /business/square/list
t = item.get("publishTime") or item.get("createTime") or item.get("pushTime") or ""
if not t:
return False
try:
pub = _dt.strptime(t[:10], "%Y-%m-%d")
return pub.date() == _dt.now().date()
except Exception:
return False
# ── Phase 1: 今日新发布文章从新到旧轮询最多查3页──────────────
today_articles = []
try:
async with httpx.AsyncClient(timeout=12) as c:
for page in range(1, 4): # 第1页最新逐页往前
r = await c.get(
f"{biz}/business/square/list",
headers=self._bearer(token),
params=_build(page),
)
if r.status_code != 200:
break
d = r.json()
if d.get("code") not in [0, 200]:
break
nd = d.get("data", {})
items = nd.get("data") or nd.get("list") or nd.get("records") or []
items = _filter(items)
# 只保留今日发布的文章,按 createTime 降序(接口本就如此)
today_page = [x for x in items if _is_today(x)]
today_articles.extend(today_page)
# 如果该页已经有非今日文章,说明今日文章已全部抓完
has_older = any(not _is_today(x) for x in items)
if has_older or not items:
break
except Exception as e:
logger.warning(f"[广场新闻-今日] {user.account} 请求异常: {e}")
if today_articles:
# 今日文章:从新到旧排序
today_articles.sort(
key=lambda x: x.get("publishTime") or x.get("createTime") or x.get("pushTime") or "",
reverse=True
)
# 去重(按 recordId
seen, unique = set(), []
for a in today_articles:
aid = str(a.get("recordId") or a.get("id", ""))
if aid and aid not in seen:
seen.add(aid)
unique.append(a)
# 有效性校验今日文章
today_valid = []
for a in unique:
aid = str(a.get("recordId") or a.get("id", ""))
if await self.validate_article(db, user, aid):
today_valid.append(a)
if today_valid:
# ── 混合策略:今日文章不足时,补充历史文章,避免所有用户扎堆同一篇 ──
# 每个用户最多分配 1 篇今日文章,其余名额从历史文章随机补充
today_quota = min(len(today_valid), max(1, count // 3))
today_pick = today_valid[:today_quota]
today_ids = {str(a.get("recordId") or a.get("id","")) for a in today_pick}
hist_needed = count - today_quota
logger.info(
f"[广场新闻] {user.account} 今日有效 {len(today_valid)} 篇,"
f"{today_quota} 篇今日 + {hist_needed} 篇历史"
)
hist_valid = []
if hist_needed > 0:
import math as _math2
# 拉历史页(与 Phase 2 相同的小时分页逻辑)
total_pages_h = 1
page1_items_h = []
try:
async with httpx.AsyncClient(timeout=10) as _hc:
_hr = await _hc.get(
f"{biz}/business/square/list",
headers=self._bearer(token),
params=_build(1),
)
_hd = _hr.json()
if _hd.get("code") in [0, 200]:
_hnd = _hd.get("data", {})
total_pages_h = max(1, _math2.ceil(_hnd.get("totalSize", 0) / 50))
page1_items_h = _filter(
_hnd.get("data") or _hnd.get("list") or _hnd.get("records") or []
)
except Exception:
pass
max_pages_h = min(total_pages_h, 10)
hour_page_h = (_dt.now().hour % max_pages_h) + 1
hist_pool = page1_items_h if hour_page_h == 1 else []
if hour_page_h != 1:
try:
async with httpx.AsyncClient(timeout=12) as _hc2:
_hr2 = await _hc2.get(
f"{biz}/business/square/list",
headers=self._bearer(token),
params=_build(hour_page_h),
)
if _hr2.status_code == 200:
_hd2 = _hr2.json()
if _hd2.get("code") in [0, 200]:
_hnd2 = _hd2.get("data", {})
hist_pool = _filter(
_hnd2.get("data") or _hnd2.get("list") or _hnd2.get("records") or []
)
except Exception:
hist_pool = page1_items_h # fallback to page 1
# 排除今日文章,避免重复
hist_pool = [x for x in hist_pool
if (x.get("recordId") or x.get("id","")) not in today_ids
and not _is_today(x)]
_rand.shuffle(hist_pool)
for a in hist_pool:
if len(hist_valid) >= hist_needed:
break
aid = str(a.get("recordId") or a.get("id", ""))
if await self.validate_article(db, user, aid):
hist_valid.append(a)
result = today_pick + hist_valid
logger.info(
f"[广场新闻] {user.account} 返回 {len(result)}"
f"(今日 {len(today_pick)} + 历史 {len(hist_valid)}"
)
return result
logger.info(f"[广场新闻] {user.account} 今日文章校验后全部无效,转历史")
# ── Phase 2: 无今日新文章 → 从最新(第1页)开始往旧顺序遍历 ────
# 规则始终从第1页最新开始按页顺序 1→2→3...往旧方向走
# 每小时推进一页,保证不同时段覆盖不同深度的文章
logger.info(f"[广场新闻] {user.account} 无今日新文章,从最新向旧顺序翻页")
# 获取总页数第1页同时也是数据源
total_pages = 1
page1_items = []
try:
async with httpx.AsyncClient(timeout=10) as _c:
_r = await _c.get(
f"{biz}/business/square/list",
headers=self._bearer(token),
params=_build(1),
)
_d = _r.json()
if _d.get("code") in [0, 200]:
total_size = _d.get("data", {}).get("totalSize", 0)
total_pages = max(1, math.ceil(total_size / 50))
nd = _d.get("data", {})
page1_items = _filter(nd.get("data") or nd.get("list") or nd.get("records") or [])
except Exception:
pass
max_pages = min(total_pages, 10)
# 用小时决定本轮目标页0点→第1页(最新),每小时推进一页,循环往复
hour_page = (_dt.now().hour % max_pages) + 1 # 1-based范围 1~max_pages
items = []
tried_page = 1
if hour_page == 1:
# 当前时段对应第1页直接使用已获取的数据
items = page1_items
tried_page = 1
else:
# 先尝试当前时段对应的目标页
pages_to_try = [hour_page] + list(range(1, hour_page)) + list(range(hour_page + 1, max_pages + 1))
for page in pages_to_try:
if page == 1 and page1_items:
items = page1_items
tried_page = 1
break
try:
async with httpx.AsyncClient(timeout=15) as c:
r = await c.get(
f"{biz}/business/square/list",
headers=self._bearer(token),
params=_build(page),
)
if r.status_code == 200:
d = r.json()
if d.get("code") in [0, 200]:
nd = d.get("data", {})
_items = _filter(nd.get("data") or nd.get("list") or nd.get("records") or [])
if _items:
items = _items
tried_page = page
break
except Exception as e:
logger.error(f"[广场新闻-历史] {user.account} page={page}: {e}")
logger.info(f"[广场新闻] {user.account} 历史第{tried_page}/{max_pages}页获取到 {len(items)} 条(时段对应第{hour_page}页)")
if not items:
return []
# 在该页内随机打散(同一页内不需要排序,保持自然顺序即可)
_rand.shuffle(items)
selected = items[:min(count * 2, len(items))]
# 有效性校验
valid = []
for a in selected:
if len(valid) >= count:
break
aid = str(a.get("recordId") or a.get("id", ""))
if await self.validate_article(db, user, aid):
valid.append(a)
# 不够则从剩余补充
for a in items[len(selected):]:
if len(valid) >= count:
break
aid = str(a.get("recordId") or a.get("id", ""))
if await self.validate_article(db, user, aid):
valid.append(a)
if not valid:
logger.warning(f"[广场新闻] {user.account} 校验后无可用文章")
return valid
async def read_news(self, db, user, news_id: str) -> bool:
sess = await get_session(user.id)
if not sess:
return False
biz = await self._biz_url(db)
cfg = await self._client(db)
try:
async with httpx.AsyncClient(timeout=10) as c:
r = await c.patch(
f"{biz}/news/read/{news_id}",
headers=self._bearer(sess["token"]),
data=self._build_form({}, cfg),
)
return r.status_code == 200
except Exception:
return False
async def post_comment(self, db, user, news_id, news_title, content, news_author_id="", org_id="") -> tuple[bool, str]:
sess = await get_session(user.id)
if not sess:
return False, "未登录"
biz = await self._biz_url(db)
cfg = await self._client(db)
uid = sess.get("platform_uid", "")
# org_id 优先取文章自带的(从广场数据获取),否则取 session/配置
final_org_id = org_id or sess.get("org_id") or cfg.get("orgId") or ""
body = {
"module": "news", "topicId": news_id, "title": news_title,
"content": content, "orgId": final_org_id,
"toUserId": news_author_id or uid, "userId": uid,
"userName": user.nickname, "avatar": user.avatar_url or "",
}
return await self._json_post(f"{biz}/message/comment", self._bearer(sess["token"]), body)
async def post_reply(self, db, user, news_id, comment_id, content) -> tuple[bool, str]:
sess = await get_session(user.id)
if not sess:
return False, "未登录"
biz = await self._biz_url(db)
uid = sess.get("platform_uid", "")
body = {
"module": "news", "topicId": news_id, "commentId": comment_id,
"commentUserId": uid, "content": content,
"fromUserName": user.nickname, "avatar": user.avatar_url or "",
}
return await self._json_post(f"{biz}/message/comment/reply", self._bearer(sess["token"]), body)
async def get_comments(self, db, user, news_id) -> list:
sess = await get_session(user.id)
if not sess:
return []
biz = await self._biz_url(db)
cfg = await self._client(db)
try:
params = self._build_form({"module": "news", "topicId": news_id, "pageNum": 1, "pageSize": 20}, cfg)
async with httpx.AsyncClient(timeout=10) as c:
r = await c.get(f"{biz}/message/comment", headers=self._bearer(sess["token"]), params=params)
if r.status_code == 200:
return r.json().get("data", {}).get("data") or []
except Exception:
pass
return []
async def like_news(self, db, user, news_id, org_id="", to_user_id="", title="") -> tuple[bool, str]:
sess = await get_session(user.id)
if not sess:
return False, "未登录"
biz = await self._biz_url(db)
uid = sess.get("platform_uid", "")
body = {
"module": "news",
"topicId": news_id,
"userId": uid,
"toUserId": to_user_id or uid,
"orgId": org_id or sess.get("org_id", "") or "",
"title": title,
}
return await self._json_post(f"{biz}/message/praise", self._bearer(sess["token"]), body)
async def collect_news(self, db, user, news_id, org_id="", to_user_id="", title="") -> tuple[bool, str]:
"""收藏新闻:复用点赞接口(平台收藏=点赞同一接口)"""
return await self.like_news(db, user, news_id, org_id=org_id, to_user_id=to_user_id, title=title)
async def forward_news(self, db, user, news_id) -> tuple[bool, str]:
sess = await get_session(user.id)
if not sess:
return False, "未登录"
biz = await self._biz_url(db)
cfg = await self._client(db)
org_id = sess.get("org_id") or cfg.get("orgId") or "1"
headers = self._bearer(sess["token"])
try:
async with httpx.AsyncClient(timeout=8) as c:
await c.get(f"{biz}/news/share/wechat/{news_id}", headers=headers,
params=self._build_form({}, cfg))
except Exception:
pass
try:
async with httpx.AsyncClient(timeout=15) as c:
r = await c.post(
f"{biz}/points/forward/news/{org_id}",
headers=headers,
data=self._build_form({}, cfg),
)
return self._ok(r)
except Exception as e:
return False, str(e)
@staticmethod
def _bearer(token: str) -> dict:
return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
@staticmethod
def _ok(resp: httpx.Response) -> tuple[bool, str]:
if resp.status_code in [200, 201]:
try:
d = resp.json()
if d.get("code") in [0, 200]:
return True, ""
return False, d.get("message") or "业务失败"
except Exception:
return True, ""
return False, f"HTTP {resp.status_code}"
async def _json_post(self, url, headers, body) -> tuple[bool, str]:
try:
async with httpx.AsyncClient(timeout=15) as c:
r = await c.post(url, json=body, headers=headers)
return self._ok(r)
except Exception as e:
return False, str(e)
async def _write_login_log(self, db, user, action, session_id=None, error_msg=None):
try:
log = LoginLog(
user_id=user.id, user_account=user.account,
action=action, session_id=session_id, error_msg=error_msg,
)
db.add(log)
await db.commit()
except Exception:
pass
# ─── 取消互动 ────────────────────────────────────────────────────
async def cancel_like(self, db, user, news_id: str, org_id: str = "", to_user_id: str = "", title: str = "") -> tuple[bool, str]:
"""DELETE /message/praise/cancel 取消点赞"""
sess = await get_session(user.id)
if not sess:
return False, "未登录"
biz = await self._biz_url(db)
uid = sess.get("platform_uid", "")
body = {
"module": "news",
"topicId": news_id,
"userId": uid,
"toUserId": to_user_id or uid,
"orgId": org_id or sess.get("org_id", "") or "",
"title": title,
}
try:
async with httpx.AsyncClient(timeout=10) as c:
r = await c.delete(
f"{biz}/message/praise/cancel",
json=body,
headers=self._bearer(sess["token"])
)
d = r.json()
if d.get("code") in [0, 200]:
return True, ""
return False, d.get("message", "取消点赞失败")
except Exception as e:
return False, str(e)
async def cancel_comment(self, db, user, news_id: str, comment_id: str) -> tuple[bool, str]:
"""DELETE /message/comment/{topicId}/{id} 删除评论"""
sess = await get_session(user.id)
if not sess:
return False, "未登录"
biz = await self._biz_url(db)
cfg = await self._client(db)
# 签名参数放 formData路径里是 topicId 和 comment_id
params = self._build_form({}, cfg)
try:
async with httpx.AsyncClient(timeout=10) as c:
r = await c.delete(
f"{biz}/message/comment/{news_id}/{comment_id}",
headers=self._bearer(sess["token"]),
params=params
)
d = r.json()
if d.get("code") in [0, 200]:
return True, ""
return False, d.get("message", "删除评论失败")
except Exception as e:
return False, str(e)
async def cancel_collect(self, db, user, news_id: str, org_id: str = "", to_user_id: str = "", title: str = "") -> tuple[bool, str]:
"""取消收藏(复用取消点赞接口)"""
return await self.cancel_like(db, user, news_id, org_id=org_id, to_user_id=to_user_id, title=title)
# ─── 修改目标系统用户信息 ─────────────────────────────────────
async def update_user_profile(
self, db: AsyncSession, user: VirtualUser,
nick_name: str = None, real_name: str = None,
sex: int = None, avatar: str = None,
description: str = None, email: str = None,
) -> tuple[bool, str]:
"""
调用 POST /usercenter/user/change/userInfo 修改用户信息
支持:昵称、真实姓名、性别、头像、简介、邮箱
同时同步更新本地数据库
"""
sess = await get_session(user.id)
if not sess:
return False, "用户未登录,请先登录"
auth = await self._auth_url(db)
cfg = await self._client(db)
token = sess.get("token", "")
platform_uid = sess.get("platform_uid", "")
if not platform_uid:
return False, "缺少平台用户ID请重新登录"
# 构建请求体(只传有值的字段)
# 构建请求体,确保至少有 nickName 字段(平台 SQL 要求 SET 子句不为空)
body = {
"id": platform_uid,
"nickName": nick_name if nick_name is not None else (user.nickname or ""),
"realName": real_name if real_name is not None else (user.real_name or ""),
"sex": sex if sex is not None else (user.sex or 0),
}
if avatar is not None: body["avatar"] = avatar
if description is not None: body["description"] = description
if email is not None: body["email"] = email
# 使用 PATCH /v2/users/current 接口(支持修改昵称)
headers = dict(self._bearer(token))
headers["Content-Type"] = "application/json"
try:
async with httpx.AsyncClient(timeout=15) as c:
r = await c.patch(
f"{auth}/v2/users/current",
json=body,
headers=headers,
)
d = r.json()
if d.get("code") in [0, 200]:
# 同步到本地数据库
local_vals = {}
if nick_name is not None: local_vals["nickname"] = nick_name
if real_name is not None: local_vals["real_name"] = real_name
if sex is not None: local_vals["sex"] = sex
if avatar is not None: local_vals["avatar_url"] = avatar
if local_vals:
from sqlalchemy import update
await db.execute(update(VirtualUser).where(
VirtualUser.id == user.id).values(**local_vals))
await db.commit()
logger.info(f"✅ 用户 {user.account} 信息已同步到目标系统")
return True, ""
err = d.get("message") or f"code={d.get('code')}"
logger.warning(f"[修改用户信息] {user.account} 失败: {err} body={r.text[:200]}")
return False, err
except Exception as e:
logger.warning(f"[修改用户信息] {user.account} 异常: {e}")
return False, str(e)
async def upload_avatar(
self, db: AsyncSession, user: VirtualUser, file_bytes: bytes, filename: str
) -> tuple[bool, str]:
"""
上传头像图片到平台 filecenter返回图片 URL
POST /filecenter/fileUpload (multipart/form-data)
"""
sess = await get_session(user.id)
if not sess:
return False, "用户未登录"
cfg = await self._client(db)
token = sess.get("token", "")
# filecenter 服务地址
biz_base = await self._biz_url(db)
# filecenter 与 huihuibusiness 同网关,替换服务名
filecenter_url = biz_base.replace("/huihuibusiness", "/filecenter")
sign_params = self._build_form({"module": "userInfo", "service": "kccloud"}, cfg)
headers = {"Authorization": f"Bearer {token}"}
try:
import mimetypes
mime = mimetypes.guess_type(filename)[0] or "image/jpeg"
files = {"file": (filename, file_bytes, mime)}
async with httpx.AsyncClient(timeout=30) as c:
r = await c.post(
f"{filecenter_url}/fileUpload",
files=files,
data=sign_params,
headers=headers,
)
d = r.json()
if d.get("code") in [0, 200]:
url = d.get("data") or d.get("url") or ""
if isinstance(url, dict):
url = url.get("url") or url.get("path") or ""
logger.info(f"✅ 头像上传成功: {url}")
return True, url
return False, d.get("message") or "上传失败"
except Exception as e:
return False, str(e)
news_service = NewsPlatformService()