问题根因: - 一直用 /business/member/square/list(成员广场接口) - 该接口加了 isPlatformShow=true 和 isAdmin=false 过滤 - 导致大量文章(包括今日最新文章)被过滤掉,只剩124篇 - 正确接口 /business/square/list?type=1 有2504篇文章,今日文章可见 修改内容: 1. 接口地址改为 /business/square/list(4处) 2. 去掉 isPlatformShow 和 isAdmin 参数,只保留 type=1 3. _is_today 时间字段优先用 publishTime(新接口主要返回此字段) 4. 排序和热度权重计算也统一用 publishTime 优先
904 lines
38 KiB
Python
Executable File
904 lines
38 KiB
Python
Executable File
"""
|
||
新闻平台对接服务
|
||
登录: POST {auth}/open/login/token (formData)
|
||
签名: 完全对应 sign.js 的实现
|
||
"""
|
||
import uuid
|
||
import hashlib
|
||
import hmac
|
||
from datetime import datetime, timedelta
|
||
from typing import Optional
|
||
import httpx
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
from sqlalchemy import select, update
|
||
|
||
from app.models import VirtualUser, SystemConfig, LoginLog
|
||
from app.utils.crypto import decrypt
|
||
from app.core.redis_client import set_session, get_session, delete_session
|
||
from app.core.logger import logger
|
||
|
||
|
||
class NewsPlatformService:
|
||
|
||
# ─── 配置读取 ──────────────────────────────────────────────
|
||
async def _cfg(self, db: AsyncSession, key: str, default: str = "") -> str:
    """Look up one value in the SystemConfig table.

    Args:
        db: Async SQLAlchemy session used to run the query.
        key: Value matched against ``SystemConfig.config_key``.
        default: Returned when no row matches ``key``.

    Returns:
        The matching row's ``config_value``, or ``default`` when absent.
    """
    stmt = select(SystemConfig).where(SystemConfig.config_key == key)
    record = (await db.execute(stmt)).scalar_one_or_none()
    if record:
        return record.config_value
    return default
|
||
|
||
async def _biz_url(self, db: AsyncSession) -> str:
    """Return the business API base URL (config key ``news_platform_base_url``)."""
    fallback = "http://192.168.1.200:63120"
    base_url = await self._cfg(db, "news_platform_base_url", fallback)
    return base_url
|
||
|
||
# Runtime cache of article IDs already judged unusable (body too short,
# not found, or an error response), kept to avoid repeated API calls.
# It is declared on the class, so one set object is shared by all instances
# and lives as long as the process; nothing visible in this file removes
# entries from it.
_invalid_ids_cache: set = set()
|
||
|
||
async def _auth_url(self, db: AsyncSession) -> str:
    """Return the auth service base URL (config key ``auth_base_url``)."""
    fallback = "http://192.168.1.200:60040"
    base_url = await self._cfg(db, "auth_base_url", fallback)
    return base_url
|
||
|
||
async def _client(self, db: AsyncSession) -> dict:
    """Load the platform client credentials from system config.

    Returns:
        A dict with the keys ``appId``, ``accessId``, ``accessSecret``,
        ``clientCode`` and ``orgId``; each value is an empty string when the
        corresponding config row is missing.
    """
    # (field in the returned dict, SystemConfig key it is read from)
    field_sources = (
        ("appId", "platform_app_id"),
        ("accessId", "platform_access_id"),
        ("accessSecret", "platform_access_secret"),
        ("clientCode", "platform_client_code"),
        ("orgId", "platform_org_id"),
    )
    credentials = {}
    for field, config_key in field_sources:
        credentials[field] = await self._cfg(db, config_key, "")
    return credentials
|
||
|
||
# ─── 签名(完全对应 sign.js 逻辑) ─────────────────────────
|
||
@staticmethod
def _make_sign(params: dict, secret_key: str, sign_type: str = "MD5") -> str:
    """Compute the request signature (written to mirror sign.js).

    Steps:
        1. Sort the parameter names.
        2. Drop ``signature`` / ``accessSecret``, ``None``, empty strings and
           every list value.
        3. Join the rest as ``key=value&...`` and append
           ``&accessSecret=<secret_key>``.
        4. Hash with SHA256 when ``sign_type`` is "SHA256" (case-insensitive),
           otherwise MD5, and return the upper-case hex digest.
    """
    excluded_names = ("signature", "accessSecret")

    pairs = []
    for name in sorted(params):
        if name in excluded_names:
            continue
        value = params.get(name)
        # Lists never take part in the signature, whether empty or not.
        if value is None or value == "" or value == [] or isinstance(value, list):
            continue
        pairs.append(f"{name}={value}")

    payload = ("&".join(pairs) + f"&accessSecret={secret_key}").encode("utf-8")
    hasher = hashlib.sha256 if sign_type.upper() == "SHA256" else hashlib.md5
    return hasher(payload).hexdigest().upper()
|
||
|
||
@staticmethod
def _get_nonce() -> str:
    """Return a random nonce made of 8 to 12 decimal digits.

    Digits are drawn one by one instead of slicing ``str(random.random())``:
    that repr can be in scientific notation (e.g. ``1.2e-05``) or carry fewer
    than eight digits, which would produce nonces containing ``e``/``-`` or
    shorter than the intended length.
    """
    import random
    import string

    length = random.randint(8, 12)
    return "".join(random.choices(string.digits, k=length))
|
||
|
||
@staticmethod
def _get_timestamp() -> str:
    """Return the current local time as a 14-digit ``yyyyMMddhhmmss`` string.

    The hour field deliberately uses the 12-hour clock (``%I``) because
    sign.js formats with lowercase ``hh``.
    """
    return f"{datetime.now():%Y%m%d%I%M%S}"
|
||
|
||
def _build_form(self, extra: dict, cfg: dict) -> dict:
    """Build a signed set of request fields.

    Args:
        extra: Request-specific fields; they override the common fields on
            key collision and are included in the signature.
        cfg: Client credentials as returned by ``_client``.

    Returns:
        The common fields (appId, accessId, timestamp, signType, signVersion,
        nonce) merged with ``extra`` plus ``signature``. ``accessSecret`` is
        used for signing only and is never part of the returned dict. The
        signature is an empty string when no secret is configured.
    """
    algorithm = "MD5"
    secret = cfg.get("accessSecret", "")

    form = {
        "appId": cfg.get("appId", ""),
        "accessId": cfg.get("accessId", ""),
        "timestamp": self._get_timestamp(),
        "signType": algorithm,
        "signVersion": "1.0",
        "accessSecret": secret,
        "nonce": self._get_nonce(),
    }
    form.update(extra)

    if secret:
        form["signature"] = self._make_sign(form, secret, algorithm)
    else:
        form["signature"] = ""

    # The secret must not be sent to the server.
    del form["accessSecret"]
    return form
|
||
|
||
# ─── 登录 ──────────────────────────────────────────────────
|
||
async def login(self, db: AsyncSession, user: VirtualUser) -> bool:
    """Log a virtual user in to the platform and cache the resulting session.

    Flow: decrypt the stored password, set the user row to ``status=1``, POST
    the signed form to ``{auth}/open/login/token`` and, when a token comes
    back, store it in the session cache (24 h expiry) and on the user row
    (``status=2``) together with the profile fields found in the response.
    Any other outcome, including exceptions, sets ``status=3`` and writes a
    "fail" login log.

    Args:
        db: Async SQLAlchemy session.
        user: The virtual user to log in.

    Returns:
        True only when a token was obtained and persisted; False otherwise.
    """
    password = decrypt(user.password_enc)
    if not password:
        logger.error(f"[登录] {user.account} 密码解密失败")
        return False

    auth = await self._auth_url(db)
    cfg = await self._client(db)

    await db.execute(update(VirtualUser).where(VirtualUser.id == user.id).values(status=1))
    await db.commit()

    extra = {
        "username": user.account,
        "password": password,
        "loginType": "password",
        "grantType": "password",
        "isRegister": "false",
    }
    if cfg.get("clientCode"):
        extra["clientCode"] = cfg["clientCode"]

    form = self._build_form(extra, cfg)

    exc = None
    try:
        async with httpx.AsyncClient(
            timeout=30,
            follow_redirects=True,  # follow HTTP redirects automatically
        ) as c:
            # The login endpoint sits behind the gateway's /usercenter prefix;
            # auth_base_url is expected to already contain the full prefix,
            # e.g. https://fat-open.99hui.com/api/usercenter
            login_url = f"{auth}/open/login/token"
            resp = await c.post(login_url, data=form)

        # Log the raw response to make failures diagnosable.
        # NOTE(review): on success the first 500 characters of the body may
        # include the access token — consider redacting it.
        logger.info(
            f"[登录] {user.account} 原始响应: "
            f"status={resp.status_code} "
            f"content-type={resp.headers.get('content-type','')} "
            f"body={resp.text[:500]}"
        )

        # Guard against an empty body before attempting to parse it.
        if not resp.text.strip():
            logger.warning(f"[登录] {user.account} 服务器返回空响应体,接口可能不存在或被重定向")
            raise ValueError(f"服务器返回空响应 HTTP={resp.status_code}")

        try:
            body = resp.json()
        except Exception as je:
            logger.warning(f"[登录] {user.account} 响应非JSON: {resp.text[:200]}")
            raise ValueError(f"响应非JSON: {resp.text[:100]}") from je

        if resp.status_code == 200 and body.get("code") in [0, 200]:
            raw = body.get("data")
            access_token, platform_uid = self._extract_token(raw)

            if access_token:
                sid = str(uuid.uuid4())
                # After a successful login, try to discover the user's org.
                org_id = await self._fetch_org_id(db, access_token, platform_uid, cfg)
                if org_id and not cfg.get("orgId"):
                    # No orgId configured system-wide: persist the one found.
                    await self._save_org_id(db, org_id)

                # Profile fields embedded in the login response. A null or
                # non-dict "userInfo" must not abort an otherwise good login.
                user_info = raw.get("userInfo") if isinstance(raw, dict) else None
                if not isinstance(user_info, dict):
                    user_info = {}
                sync_nickname = user_info.get("nickName") or user_info.get("username") or ""
                sync_real_name = user_info.get("realName") or ""
                try:
                    sync_sex = int(user_info.get("sex") or 0)
                except (TypeError, ValueError):
                    # Non-numeric value: treat as "not provided".
                    sync_sex = 0
                sync_avatar = user_info.get("avatar") or ""

                await set_session(user.id, {
                    "token": access_token,
                    "session_id": sid,
                    "platform_uid": platform_uid,
                    "org_id": org_id or cfg.get("orgId", ""),
                    "login_time": datetime.now().isoformat(),
                    # Cached profile fields for later sync use.
                    "nickname": sync_nickname,
                    "real_name": sync_real_name,
                    "sex": sync_sex,
                    "avatar": sync_avatar,
                }, expire=86400)

                # Update the local row; only non-empty profile fields are
                # copied so existing local values are not blanked.
                update_vals = dict(
                    status=2, session_token=access_token,
                    session_expires_at=datetime.now() + timedelta(hours=24),
                    last_login_at=datetime.now(),
                    platform_uid=platform_uid,
                )
                if sync_nickname:
                    update_vals["nickname"] = sync_nickname
                if sync_real_name:
                    update_vals["real_name"] = sync_real_name
                if sync_sex:
                    update_vals["sex"] = sync_sex
                if sync_avatar:
                    update_vals["avatar_url"] = sync_avatar
                await db.execute(update(VirtualUser).where(VirtualUser.id == user.id).values(**update_vals))
                await db.commit()
                await self._write_login_log(db, user, "login", sid)
                logger.info(f"✅ [登录] {user.account} 成功 orgId={org_id}")
                return True

            logger.warning(f"[登录] {user.account} 无token: {body}")
        else:
            logger.warning(f"[登录] {user.account} 失败: HTTP={resp.status_code} {body}")

    except Exception as e:
        exc = e
        logger.error(f"[登录] {user.account} 异常: {e}")

    # Every non-success path ends here.
    await db.execute(update(VirtualUser).where(VirtualUser.id == user.id).values(status=3))
    await db.commit()
    await self._write_login_log(db, user, "fail", error_msg=str(exc or "登录失败"))
    return False
|
||
|
||
async def _fetch_org_id(
    self, db: AsyncSession, token: str, platform_uid: str, cfg: dict
) -> str:
    """Try to discover the logged-in user's organisation id.

    Probes a few candidate user-info endpoints on the business base URL and
    returns the first non-empty ``orgId``, ``defaultOrgId`` or ``org.id``
    found in the response's ``data`` object.

    Args:
        db: Async SQLAlchemy session (used to read the base URL).
        token: Bearer token of the logged-in user.
        platform_uid: Not used by the lookup; kept for interface
            compatibility.
        cfg: Client credentials used to sign each request.

    Returns:
        The organisation id as a string, or "" when no endpoint yields one.
        Errors on individual endpoints are logged at debug level and skipped.
    """
    biz = await self._biz_url(db)
    headers = self._bearer(token)
    # Candidate user-info endpoints, tried in order.
    endpoints = [
        "/app/user/info",
        "/open/user/info",
        "/user/info",
    ]
    for ep in endpoints:
        try:
            form = self._build_form({}, cfg)
            async with httpx.AsyncClient(timeout=8) as c:
                r = await c.get(
                    f"{biz}{ep}",
                    headers=headers,
                    params={k: v for k, v in form.items() if k not in ("username", "password")},
                )
            if r.status_code != 200:
                continue
            data = r.json().get("data") or {}
            org = data.get("org")
            # The nested lookup is parenthesised on purpose: a conditional
            # expression binds more loosely than ``or``, so without the
            # parentheses a missing "org" dict would discard orgId and
            # defaultOrgId as well.
            org_id = (
                data.get("orgId")
                or data.get("defaultOrgId")
                or (org.get("id") if isinstance(org, dict) else None)
            )
            if org_id:
                return str(org_id)
        except Exception as e:
            logger.debug(f"获取orgId失败({ep}): {e}")
    return ""
|
||
|
||
async def _save_org_id(self, db: AsyncSession, org_id: str):
    """Store ``org_id`` under the ``platform_org_id`` system-config key.

    Updates the existing row when there is one, otherwise inserts a new row,
    then commits and logs the saved value.
    """
    stmt = select(SystemConfig).where(SystemConfig.config_key == "platform_org_id")
    existing = (await db.execute(stmt)).scalar_one_or_none()
    if not existing:
        db.add(SystemConfig(
            config_key="platform_org_id",
            config_value=org_id,
            description="平台组织Id(自动获取)",
        ))
    else:
        existing.config_value = org_id
    await db.commit()
    logger.info(f"✅ 自动获取并保存 orgId={org_id}")
|
||
|
||
@staticmethod
def _extract_token(raw) -> tuple[str, str]:
    """Pull ``(token, platform_user_id)`` out of a login response's ``data``.

    * A non-empty string is taken to be the token itself; the uid is "".
    * For a dict, the token is the first truthy value among ``access_token``,
      ``accessToken`` and ``token``; the uid is the first truthy value among
      ``openid``, ``userId``, ``user_id``, ``id`` and ``userInfo.id``,
      converted to ``str``.
    * Anything else yields ``("", "")``.
    """
    if isinstance(raw, dict):
        token = ""
        for key in ("access_token", "accessToken", "token"):
            if raw.get(key):
                token = raw[key]
                break

        # Per the original note, "openid" is the platform user id
        # (data.openid equals data.userInfo.id in the login response).
        for key in ("openid", "userId", "user_id", "id"):
            if raw.get(key):
                uid = raw[key]
                break
        else:
            uid = (raw.get("userInfo") or {}).get("id") or ""
        return token, str(uid)

    if isinstance(raw, str) and raw:
        return raw, ""
    return "", ""
|
||
|
||
async def logout(self, db: AsyncSession, user_id: int):
    """Drop a user's cached session and reset the session columns locally.

    When the user row exists, a "logout" login-log entry is written first
    (carrying the cached session id if there is one). The cached session is
    then deleted and the row is set to ``status=0`` with the token and expiry
    cleared.
    """
    lookup = await db.execute(select(VirtualUser).where(VirtualUser.id == user_id))
    account = lookup.scalar_one_or_none()
    if account:
        session = await get_session(user_id)
        session_id = session.get("session_id") if session else None
        await self._write_login_log(db, account, "logout", session_id)

    await delete_session(user_id)
    reset_stmt = (
        update(VirtualUser)
        .where(VirtualUser.id == user_id)
        .values(status=0, session_token=None, session_expires_at=None)
    )
    await db.execute(reset_stmt)
    await db.commit()
|
||
|
||
async def check_session(self, db: AsyncSession, user: VirtualUser) -> bool:
    """Check whether the user's cached token is still accepted.

    Sends a one-item ``/news/list`` request with the cached token. A 200
    response whose ``code`` is 0 or 200 counts as valid. Any other response
    logs the user out. A missing session or an exception returns False
    without logging out.
    """
    session = await get_session(user.id)
    if not session:
        return False

    base_url = await self._biz_url(db)
    credentials = await self._client(db)
    try:
        query = self._build_form({}, credentials)
        # These fields are added after signing, so the signature does not
        # cover them.
        query.update({
            "orgId": credentials["orgId"] or "1",
            "pageNum": 1,
            "pageSize": 1,
            "status": "approved",
        })
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.get(
                f"{base_url}/news/list",
                headers=self._bearer(session["token"]),
                params=query,
            )
            still_valid = resp.status_code == 200 and resp.json().get("code") in [0, 200]
        if still_valid:
            return True
        await self.logout(db, user.id)
        return False
    except Exception:
        return False
|
||
|
||
# ─── 新闻列表 ──────────────────────────────────────────────
|
||
async def validate_article(self, db, user, article_id: str) -> bool:
    """Decide whether an article is usable.

    An article is usable when ``GET {biz}/news/{article_id}`` returns HTTP
    200 with ``code`` 0 or 200 and its text (``content``, else ``digest``,
    else ``newsTitle``) is at least 100 characters long once HTML tags are
    stripped.

    Rejected ids are remembered in the class-level ``_invalid_ids_cache`` so
    they are not requested again. Responses that describe the request rather
    than the article (401/403/408/429 and 5xx) and request exceptions return
    False without caching, so one expired token or a server hiccup does not
    blacklist the article for every user.

    Returns:
        True when the article may be used, False when it should be skipped.
    """
    import re as _re

    if not article_id:
        return False
    # The set lives on the class, so accessing it through ``self`` reaches
    # the same shared object without depending on a module-level instance.
    invalid_ids = self._invalid_ids_cache
    if article_id in invalid_ids:
        return False

    sess = await get_session(user.id)
    if not sess:
        return False
    biz = await self._biz_url(db)
    token = sess.get("token", "")
    try:
        async with httpx.AsyncClient(timeout=8) as c:
            r = await c.get(
                f"{biz}/news/{article_id}",
                headers=self._bearer(token),
            )
        if r.status_code != 200:
            transient = r.status_code in (401, 403, 408, 429) or r.status_code >= 500
            if transient:
                logger.info(f"[文章校验] {article_id} 暂时无法校验: HTTP={r.status_code}")
            else:
                invalid_ids.add(article_id)
            return False

        d = r.json()
        if d.get("code") not in [0, 200]:
            logger.info(f"[文章校验] {article_id} 无效: code={d.get('code')} {d.get('message','')}")
            invalid_ids.add(article_id)
            return False

        data = d.get("data") or {}
        # Take the body text, strip HTML tags and measure what is left.
        raw = data.get("content") or data.get("digest") or data.get("newsTitle") or ""
        text = _re.sub(r"<[^>]+>", "", raw).strip()
        if len(text) < 100:
            logger.info(f"[文章校验] {article_id} 正文过短({len(text)}字),跳过")
            invalid_ids.add(article_id)
            return False
        return True
    except Exception as e:
        logger.warning(f"[文章校验] {article_id} 请求异常: {e}")
        return False
|
||
|
||
async def get_news_list(self, db, user, count=5, interest_tags=None) -> list:
    """Return up to ``count`` validated articles from ``/business/square/list``.

    Phase 1 scans at most the first three pages for articles dated today
    (``publishTime``, falling back to ``createTime`` / ``pushTime``) and
    returns the valid ones newest first.

    Phase 2 runs when phase 1 yields nothing usable. It picks one history
    page from the current hour (``hour % max_pages + 1``, capped at 10
    pages), falls back to the other pages when that one is empty, shuffles
    the page and validates articles until ``count`` are found.

    Args:
        db: Async SQLAlchemy session.
        user: The virtual user whose cached session/token is used.
        count: Maximum number of articles to return.
        interest_tags: Accepted for interface compatibility; not used.

    Returns:
        A list of raw article dicts as returned by the API; empty when the
        user has no session or nothing usable is found.
    """
    import math
    import random as _rand
    from datetime import datetime as _dt

    PAGE_SIZE = 50
    LIST_PATH = "/business/square/list"

    sess = await get_session(user.id)
    if not sess:
        return []
    biz = await self._biz_url(db)
    cfg = await self._client(db)
    org_id = sess.get("org_id") or cfg.get("orgId") or ""
    platform_uid = sess.get("platform_uid", "")
    token = sess["token"]

    # Known-bad ids (filtered without an API call) plus everything rejected
    # at runtime so far. This is a snapshot taken at call time.
    invalid_ids = {
        "1965670408480907266", "2029092495693975554", "1960652956793597953",
        "1960651987045347330", "1960596408620838914", "1960596083193180161",
        "1960595664341594113", "1952296583257133058",
    } | self._invalid_ids_cache

    def _article_id(item):
        """Article id as a string (``recordId``, else ``id``)."""
        return str(item.get("recordId") or item.get("id", ""))

    def _published_at(item):
        """Timestamp string used for the today-check and for sorting."""
        return item.get("publishTime") or item.get("createTime") or item.get("pushTime") or ""

    def _build(page, size=PAGE_SIZE):
        # Use /business/square/list, not /business/member/square/list: the
        # member endpoint adds isPlatformShow/isAdmin filters that hide most
        # articles. Only the type=1 filter is sent.
        p = self._build_form({
            "pageNum": page, "pageSize": size,
            "type": "1",
        }, cfg)
        if org_id:
            # Added after signing, so not covered by the signature.
            p["orgId"] = org_id
        return p

    def _filter(items):
        """Drop empty rows, the user's own articles and known-invalid ids."""
        kept = []
        for x in items:
            # Rows without recordId/title or with type != "1" are junk data.
            if not (x.get("recordId") and x.get("title") and x.get("type") == "1"):
                continue
            if platform_uid and x.get("createUser") == platform_uid:
                continue
            # Normalise to str so the lookup matches the string ids stored
            # by validate_article.
            if _article_id(x) in invalid_ids:
                continue
            kept.append(x)
        return kept

    def _page_items(payload):
        """Filtered article rows from one list response."""
        nd = payload.get("data", {})
        return _filter(nd.get("data") or nd.get("list") or nd.get("records") or [])

    def _is_today(item):
        t = _published_at(item)
        if not t:
            return False
        try:
            pub = _dt.strptime(t[:10], "%Y-%m-%d")
            return pub.date() == _dt.now().date()
        except Exception:
            return False

    async def _take_valid(pool):
        """Validate ``pool`` in order until ``count`` articles pass."""
        picked = []
        for art in pool:
            if len(picked) >= count:
                break
            if await self.validate_article(db, user, _article_id(art)):
                picked.append(art)
        return picked

    # ── Phase 1: articles published today (newest first, up to 3 pages) ──
    today_articles = []
    try:
        async with httpx.AsyncClient(timeout=12) as c:
            for page in range(1, 4):  # page 1 is the newest
                r = await c.get(
                    f"{biz}{LIST_PATH}",
                    headers=self._bearer(token),
                    params=_build(page),
                )
                if r.status_code != 200:
                    break
                d = r.json()
                if d.get("code") not in [0, 200]:
                    break
                items = _page_items(d)

                today_articles.extend(x for x in items if _is_today(x))

                # A page that already contains older articles means all of
                # today's articles have been seen.
                has_older = any(not _is_today(x) for x in items)
                if has_older or not items:
                    break
    except Exception as e:
        logger.warning(f"[广场新闻-今日] {user.account} 请求异常: {e}")

    if today_articles:
        # Newest first.
        today_articles.sort(key=_published_at, reverse=True)
        # De-duplicate by article id, keeping the first occurrence.
        seen, unique = set(), []
        for a in today_articles:
            aid = _article_id(a)
            if aid and aid not in seen:
                seen.add(aid)
                unique.append(a)
        logger.info(
            f"[广场新闻] {user.account} 今日新文章 {len(unique)} 篇,"
            f"顺序轮询(最多取 {count} 篇)"
        )
        # Candidate pool slightly larger than count to absorb rejections.
        candidates = unique[:max(count * 2, 20)]

        valid = await _take_valid(candidates)
        if valid:
            return valid
        logger.info(f"[广场新闻] {user.account} 今日文章校验后全部无效,转历史")

    # ── Phase 2: no usable article from today → walk history pages ──
    # The target page advances by one every hour so that different times of
    # day reach different depths of the list.
    logger.info(f"[广场新闻] {user.account} 无今日新文章,从最新向旧顺序翻页")

    # Page 1 supplies both the total size and a first batch of data.
    total_pages = 1
    page1_items = []
    try:
        async with httpx.AsyncClient(timeout=10) as _c:
            _r = await _c.get(
                f"{biz}{LIST_PATH}",
                headers=self._bearer(token),
                params=_build(1),
            )
            _d = _r.json()
            if _d.get("code") in [0, 200]:
                total_size = _d.get("data", {}).get("totalSize", 0)
                total_pages = max(1, math.ceil(total_size / PAGE_SIZE))
                page1_items = _page_items(_d)
    except Exception as e:
        # Best effort: continue with the single-page defaults, but leave a
        # trace instead of swallowing the error silently.
        logger.warning(f"[广场新闻-历史] {user.account} 第1页请求异常: {e}")

    max_pages = min(total_pages, 10)
    # 1-based target page in 1..max_pages; hour 0 maps to page 1.
    hour_page = (_dt.now().hour % max_pages) + 1

    items = []
    tried_page = 1

    if hour_page == 1:
        # The target is page 1, which has already been fetched.
        items = page1_items
        tried_page = 1
    else:
        # Target page first, then the newer pages, then the older ones.
        pages_to_try = [hour_page] + list(range(1, hour_page)) + list(range(hour_page + 1, max_pages + 1))
        for page in pages_to_try:
            if page == 1 and page1_items:
                items = page1_items
                tried_page = 1
                break
            try:
                async with httpx.AsyncClient(timeout=15) as c:
                    r = await c.get(
                        f"{biz}{LIST_PATH}",
                        headers=self._bearer(token),
                        params=_build(page),
                    )
                if r.status_code == 200:
                    d = r.json()
                    if d.get("code") in [0, 200]:
                        _items = _page_items(d)
                        if _items:
                            items = _items
                            tried_page = page
                            break
            except Exception as e:
                logger.error(f"[广场新闻-历史] {user.account} page={page}: {e}")

    logger.info(f"[广场新闻] {user.account} 历史第{tried_page}/{max_pages}页获取到 {len(items)} 条(时段对应第{hour_page}页)")

    if not items:
        return []

    # Randomise the order within the chosen page, then validate in that
    # order until enough articles pass.
    _rand.shuffle(items)
    valid = await _take_valid(items)

    if not valid:
        logger.warning(f"[广场新闻] {user.account} 校验后无可用文章")
    return valid
|
||
|
||
|
||
async def read_news(self, db, user, news_id: str) -> bool:
    """Mark an article as read via ``PATCH {biz}/news/read/{news_id}``.

    Returns:
        True when the platform answers HTTP 200; False when the user has no
        session, on any other status, or when the request raises.
    """
    session = await get_session(user.id)
    if not session:
        return False

    base_url = await self._biz_url(db)
    credentials = await self._client(db)
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.patch(
                f"{base_url}/news/read/{news_id}",
                headers=self._bearer(session["token"]),
                data=self._build_form({}, credentials),
            )
    except Exception:
        return False
    return resp.status_code == 200
|
||
|
||
async def post_comment(self, db, user, news_id, news_title, content, news_author_id="", org_id="") -> tuple[bool, str]:
    """Post a comment on an article via ``POST {biz}/message/comment``.

    The organisation id is taken from ``org_id`` when given, otherwise from
    the session, then from system config. ``toUserId`` falls back to the
    commenting user's own platform id when no author id is supplied.

    Returns:
        ``(ok, error_message)``; ``(False, "未登录")`` when no session exists.
    """
    session = await get_session(user.id)
    if not session:
        return False, "未登录"

    base_url = await self._biz_url(db)
    credentials = await self._client(db)
    my_uid = session.get("platform_uid", "")
    # Prefer the org attached to the article, then session, then config.
    target_org = org_id or session.get("org_id") or credentials.get("orgId") or ""

    payload = dict(
        module="news",
        topicId=news_id,
        title=news_title,
        content=content,
        orgId=target_org,
        toUserId=news_author_id or my_uid,
        userId=my_uid,
        userName=user.nickname,
        avatar=user.avatar_url or "",
    )
    endpoint = f"{base_url}/message/comment"
    return await self._json_post(endpoint, self._bearer(session["token"]), payload)
|
||
|
||
async def post_reply(self, db, user, news_id, comment_id, content) -> tuple[bool, str]:
    """Reply to a comment via ``POST {biz}/message/comment/reply``.

    Returns:
        ``(ok, error_message)``; ``(False, "未登录")`` when no session exists.
    """
    session = await get_session(user.id)
    if not session:
        return False, "未登录"

    base_url = await self._biz_url(db)
    my_uid = session.get("platform_uid", "")
    payload = dict(
        module="news",
        topicId=news_id,
        commentId=comment_id,
        commentUserId=my_uid,
        content=content,
        fromUserName=user.nickname,
        avatar=user.avatar_url or "",
    )
    endpoint = f"{base_url}/message/comment/reply"
    return await self._json_post(endpoint, self._bearer(session["token"]), payload)
|
||
|
||
async def get_comments(self, db, user, news_id) -> list:
    """Fetch the first page (20 items) of an article's comments.

    Returns:
        The list found at ``data.data`` in the response, or an empty list
        when the user has no session, the status is not 200, the list is
        missing, or the request raises.
    """
    session = await get_session(user.id)
    if not session:
        return []

    base_url = await self._biz_url(db)
    credentials = await self._client(db)
    try:
        query = self._build_form(
            {"module": "news", "topicId": news_id, "pageNum": 1, "pageSize": 20},
            credentials,
        )
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.get(
                f"{base_url}/message/comment",
                headers=self._bearer(session["token"]),
                params=query,
            )
        if resp.status_code != 200:
            return []
        return resp.json().get("data", {}).get("data") or []
    except Exception:
        return []
|
||
|
||
async def like_news(self, db, user, news_id, org_id="", to_user_id="", title="") -> tuple[bool, str]:
    """Like an article via ``POST {biz}/message/praise``.

    ``toUserId`` falls back to the acting user's own platform id, and
    ``orgId`` falls back to the session's org id.

    Returns:
        ``(ok, error_message)``; ``(False, "未登录")`` when no session exists.
    """
    session = await get_session(user.id)
    if not session:
        return False, "未登录"

    base_url = await self._biz_url(db)
    my_uid = session.get("platform_uid", "")
    payload = dict(
        module="news",
        topicId=news_id,
        userId=my_uid,
        toUserId=to_user_id or my_uid,
        orgId=org_id or session.get("org_id", "") or "",
        title=title,
    )
    endpoint = f"{base_url}/message/praise"
    return await self._json_post(endpoint, self._bearer(session["token"]), payload)
|
||
|
||
async def collect_news(self, db, user, news_id, org_id="", to_user_id="", title="") -> tuple[bool, str]:
    """Favourite an article by delegating to ``like_news``.

    Per the original note, the platform serves "collect" through the same
    endpoint as "like".
    """
    outcome = await self.like_news(
        db, user, news_id,
        org_id=org_id, to_user_id=to_user_id, title=title,
    )
    return outcome
|
||
|
||
async def forward_news(self, db, user, news_id) -> tuple[bool, str]:
    """Record a share/forward of an article.

    Two requests are made: a best-effort ``GET /news/share/wechat/{news_id}``
    whose failure is ignored, then ``POST /points/forward/news/{org_id}``
    whose outcome is returned. The org id comes from the session, then from
    config, defaulting to "1".

    Returns:
        ``(ok, error_message)``; ``(False, "未登录")`` when no session exists.
    """
    session = await get_session(user.id)
    if not session:
        return False, "未登录"

    base_url = await self._biz_url(db)
    credentials = await self._client(db)
    target_org = session.get("org_id") or credentials.get("orgId") or "1"
    auth_headers = self._bearer(session["token"])

    # Step 1: share notification; errors here must not block step 2.
    try:
        async with httpx.AsyncClient(timeout=8) as client:
            await client.get(
                f"{base_url}/news/share/wechat/{news_id}",
                headers=auth_headers,
                params=self._build_form({}, credentials),
            )
    except Exception:
        pass

    # Step 2: the forward request whose result is reported to the caller.
    try:
        async with httpx.AsyncClient(timeout=15) as client:
            resp = await client.post(
                f"{base_url}/points/forward/news/{target_org}",
                headers=auth_headers,
                data=self._build_form({}, credentials),
            )
        return self._ok(resp)
    except Exception as err:
        return False, str(err)
|
||
|
||
@staticmethod
def _bearer(token: str) -> dict:
    """Return request headers carrying the bearer token and a JSON content type."""
    headers = {"Authorization": "Bearer {}".format(token)}
    headers["Content-Type"] = "application/json"
    return headers
|
||
|
||
@staticmethod
def _ok(resp: httpx.Response) -> tuple[bool, str]:
    """Translate a platform response into ``(ok, error_message)``.

    * Status other than 200/201 → ``(False, "HTTP <status>")``.
    * JSON body with ``code`` 0 or 200 → ``(True, "")``.
    * JSON body with another code → ``(False, message or "业务失败")``.
    * Body that cannot be parsed or inspected → treated as success.
    """
    if resp.status_code not in [200, 201]:
        return False, f"HTTP {resp.status_code}"

    try:
        payload = resp.json()
        if payload.get("code") in [0, 200]:
            return True, ""
        return False, payload.get("message") or "业务失败"
    except Exception:
        return True, ""
|
||
|
||
async def _json_post(self, url, headers, body) -> tuple[bool, str]:
    """POST ``body`` as JSON and return ``_ok``'s verdict on the response.

    Any exception is reported as ``(False, str(exception))``.
    """
    try:
        async with httpx.AsyncClient(timeout=15) as client:
            resp = await client.post(url, json=body, headers=headers)
        outcome = self._ok(resp)
    except Exception as err:
        outcome = (False, str(err))
    return outcome
|
||
|
||
async def _write_login_log(self, db, user, action, session_id=None, error_msg=None):
    """Insert a LoginLog row on a best-effort basis; never raises.

    Args:
        db: Async SQLAlchemy session.
        user: User whose ``id`` and ``account`` are recorded.
        action: Value stored in the log's ``action`` column (callers in this
            file pass "login", "logout" or "fail").
        session_id: Optional session identifier to record.
        error_msg: Optional failure description to record.

    A failed write is logged and the session is rolled back so that the
    caller can keep using ``db`` afterwards.
    """
    try:
        db.add(LoginLog(
            user_id=user.id, user_account=user.account,
            action=action, session_id=session_id, error_msg=error_msg,
        ))
        await db.commit()
    except Exception as e:
        # User attributes are deliberately not read here: after a failed
        # commit they may no longer be loadable.
        logger.warning(f"[登录日志] 写入失败 action={action}: {e}")
        try:
            # Without a rollback the session stays in a failed state and
            # later statements on it would raise.
            await db.rollback()
        except Exception as rollback_err:
            logger.warning(f"[登录日志] 回滚失败: {rollback_err}")
|
||
|
||
|
||
# ─── 取消互动 ────────────────────────────────────────────────────
|
||
async def cancel_like(self, db, user, news_id: str, org_id: str = "", to_user_id: str = "", title: str = "") -> tuple[bool, str]:
    """Cancel a like via ``DELETE {biz}/message/praise/cancel`` with a JSON body.

    ``toUserId`` falls back to the acting user's own platform id, and
    ``orgId`` falls back to the session's org id.

    Returns:
        ``(True, "")`` when the response ``code`` is 0 or 200; otherwise
        ``(False, message)``. ``(False, "未登录")`` when no session exists;
        exceptions are reported as ``(False, str(exception))``.
    """
    sess = await get_session(user.id)
    if not sess:
        return False, "未登录"
    biz = await self._biz_url(db)
    uid = sess.get("platform_uid", "")
    body = {
        "module": "news",
        "topicId": news_id,
        "userId": uid,
        "toUserId": to_user_id or uid,
        "orgId": org_id or sess.get("org_id", "") or "",
        "title": title,
    }
    try:
        async with httpx.AsyncClient(timeout=10) as c:
            # httpx's .delete() shortcut takes no request body (json/data),
            # so a DELETE with a JSON payload must go through .request().
            r = await c.request(
                "DELETE",
                f"{biz}/message/praise/cancel",
                json=body,
                headers=self._bearer(sess["token"]),
            )
        d = r.json()
        if d.get("code") in [0, 200]:
            return True, ""
        return False, d.get("message", "取消点赞失败")
    except Exception as e:
        return False, str(e)
|
||
|
||
async def cancel_comment(self, db, user, news_id: str, comment_id: str) -> tuple[bool, str]:
    """Delete a comment via ``DELETE {biz}/message/comment/{news_id}/{comment_id}``.

    The signed fields travel as query parameters; the article and comment
    ids are part of the path.

    Returns:
        ``(True, "")`` when the response ``code`` is 0 or 200; otherwise
        ``(False, message)``. ``(False, "未登录")`` when no session exists;
        exceptions are reported as ``(False, str(exception))``.
    """
    session = await get_session(user.id)
    if not session:
        return False, "未登录"

    base_url = await self._biz_url(db)
    credentials = await self._client(db)
    signed_query = self._build_form({}, credentials)
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.delete(
                f"{base_url}/message/comment/{news_id}/{comment_id}",
                headers=self._bearer(session["token"]),
                params=signed_query,
            )
        payload = resp.json()
        if payload.get("code") not in [0, 200]:
            return False, payload.get("message", "删除评论失败")
        return True, ""
    except Exception as err:
        return False, str(err)
|
||
|
||
async def cancel_collect(self, db, user, news_id: str, org_id: str = "", to_user_id: str = "", title: str = "") -> tuple[bool, str]:
    """Remove a favourite by delegating to ``cancel_like`` (same endpoint)."""
    outcome = await self.cancel_like(
        db, user, news_id,
        org_id=org_id, to_user_id=to_user_id, title=title,
    )
    return outcome
|
||
|
||
# ─── 修改目标系统用户信息 ─────────────────────────────────────
|
||
async def update_user_profile(
    self, db: AsyncSession, user: VirtualUser,
    nick_name: Optional[str] = None, real_name: Optional[str] = None,
    sex: Optional[int] = None, avatar: Optional[str] = None,
    description: Optional[str] = None, email: Optional[str] = None,
) -> tuple[bool, str]:
    """Update the user's profile on the platform and mirror it locally.

    Sends ``PATCH {auth}/v2/users/current`` with a JSON body. ``id``,
    ``nickName``, ``realName`` and ``sex`` are always sent, falling back to
    the locally stored values when the argument is None. ``avatar``,
    ``description`` and ``email`` are sent only when provided.

    On success the explicitly provided nickname, real name, sex and avatar
    are written to the local ``VirtualUser`` row; ``description`` and
    ``email`` are sent to the platform only.

    Returns:
        ``(True, "")`` on success, otherwise ``(False, error_message)``.
    """
    sess = await get_session(user.id)
    if not sess:
        return False, "用户未登录,请先登录"

    auth = await self._auth_url(db)
    token = sess.get("token", "")
    platform_uid = sess.get("platform_uid", "")

    if not platform_uid:
        return False, "缺少平台用户ID,请重新登录"

    # nickName is always included: per the original note, the platform's
    # SQL update needs a non-empty SET clause.
    body = {
        "id": platform_uid,
        "nickName": nick_name if nick_name is not None else (user.nickname or ""),
        "realName": real_name if real_name is not None else (user.real_name or ""),
        "sex": sex if sex is not None else (user.sex or 0),
    }
    if avatar is not None:
        body["avatar"] = avatar
    if description is not None:
        body["description"] = description
    if email is not None:
        body["email"] = email

    # _bearer already sets the JSON content type.
    headers = self._bearer(token)

    try:
        async with httpx.AsyncClient(timeout=15) as c:
            r = await c.patch(
                f"{auth}/v2/users/current",
                json=body,
                headers=headers,
            )
        d = r.json()
        if d.get("code") in [0, 200]:
            # Mirror only the fields the caller explicitly supplied.
            local_vals = {}
            if nick_name is not None:
                local_vals["nickname"] = nick_name
            if real_name is not None:
                local_vals["real_name"] = real_name
            if sex is not None:
                local_vals["sex"] = sex
            if avatar is not None:
                local_vals["avatar_url"] = avatar
            if local_vals:
                await db.execute(update(VirtualUser).where(
                    VirtualUser.id == user.id).values(**local_vals))
                await db.commit()
            logger.info(f"✅ 用户 {user.account} 信息已同步到目标系统")
            return True, ""
        err = d.get("message") or f"code={d.get('code')}"
        logger.warning(f"[修改用户信息] {user.account} 失败: {err} body={r.text[:200]}")
        return False, err
    except Exception as e:
        logger.warning(f"[修改用户信息] {user.account} 异常: {e}")
        return False, str(e)
|
||
|
||
async def upload_avatar(
    self, db: AsyncSession, user: VirtualUser, file_bytes: bytes, filename: str
) -> tuple[bool, str]:
    """Upload an avatar image to the platform's file centre.

    Sends a multipart ``POST {filecenter}/fileUpload`` where the file-centre
    base URL is the business base URL with ``/huihuibusiness`` replaced by
    ``/filecenter``. The MIME type is guessed from ``filename`` and defaults
    to ``image/jpeg``.

    Returns:
        ``(True, url)`` when the response ``code`` is 0 or 200, where ``url``
        comes from ``data`` (or its ``url`` / ``path`` field when ``data`` is
        a dict) or from ``url``; otherwise ``(False, error_message)``.
    """
    session = await get_session(user.id)
    if not session:
        return False, "用户未登录"

    credentials = await self._client(db)
    token = session.get("token", "")

    # The file centre shares the gateway with the business service; only the
    # service segment of the URL differs.
    gateway = await self._biz_url(db)
    upload_base = gateway.replace("/huihuibusiness", "/filecenter")

    form_fields = self._build_form({"module": "userInfo", "service": "kccloud"}, credentials)
    # No Content-Type here: the multipart boundary header is left to httpx.
    auth_header = {"Authorization": f"Bearer {token}"}

    try:
        import mimetypes

        content_type = mimetypes.guess_type(filename)[0] or "image/jpeg"
        async with httpx.AsyncClient(timeout=30) as client:
            resp = await client.post(
                f"{upload_base}/fileUpload",
                files={"file": (filename, file_bytes, content_type)},
                data=form_fields,
                headers=auth_header,
            )
        payload = resp.json()
        if payload.get("code") not in [0, 200]:
            return False, payload.get("message") or "上传失败"

        location = payload.get("data") or payload.get("url") or ""
        if isinstance(location, dict):
            location = location.get("url") or location.get("path") or ""
        logger.info(f"✅ 头像上传成功: {location}")
        return True, location
    except Exception as err:
        return False, str(err)
|
||
# Module-level instance of NewsPlatformService, created when the module is
# imported.
news_service = NewsPlatformService()
|