"""
News platform integration service.

Login:   POST {auth}/open/login/token (form data)
Signing: mirrors the implementation in sign.js
"""
import hashlib
import hmac
import math
import mimetypes
import random
import re
import string
import uuid
from datetime import datetime, timedelta
from typing import Optional

import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update

from app.models import VirtualUser, SystemConfig, LoginLog
from app.utils.crypto import decrypt
from app.core.redis_client import set_session, get_session, delete_session
from app.core.logger import logger

# Matches any HTML/XML tag; used to strip markup before counting characters.
_HTML_TAG_RE = re.compile(r"<[^>]+>")


def _safe_int(value, default: int = 0) -> int:
    """Convert *value* to int, returning *default* for None/empty/invalid input."""
    try:
        return int(value or default)
    except (TypeError, ValueError):
        return default


class NewsPlatformService:
    """Async client wrapping the news platform's login, feed and interaction APIs.

    Configuration (base URLs, app credentials) is read from ``SystemConfig``
    rows; per-user sessions (token, platform uid, org id) are stored through
    the ``set_session`` / ``get_session`` / ``delete_session`` helpers.
    """

    # Runtime cache of invalid article IDs (too short / not found / error).
    # Lives for the lifetime of the process and is shared by all instances;
    # it avoids repeated detail API calls. It is never pruned.
    _invalid_ids_cache: set = set()

    # Statically known invalid article IDs (filtered without any API check).
    _STATIC_INVALID_IDS = frozenset({
        "1965670408480907266", "2029092495693975554", "1960652956793597953",
        "1960651987045347330", "1960596408620838914", "1960596083193180161",
        "1960595664341594113", "1952296583257133058",
    })

    # Page size used for the square (feed) list queries.
    _SQUARE_PAGE_SIZE = 50

    # ─── Configuration ─────────────────────────────────────────

    async def _cfg(self, db: AsyncSession, key: str, default: str = "") -> str:
        """Return the ``config_value`` stored under *key*, or *default* if no row exists."""
        result = await db.execute(
            select(SystemConfig).where(SystemConfig.config_key == key)
        )
        row = result.scalar_one_or_none()
        return row.config_value if row else default

    async def _biz_url(self, db: AsyncSession) -> str:
        """Return the business API base URL (config key ``news_platform_base_url``)."""
        return await self._cfg(db, "news_platform_base_url", "http://192.168.1.200:63120")

    async def _auth_url(self, db: AsyncSession) -> str:
        """Return the auth API base URL (config key ``auth_base_url``)."""
        return await self._cfg(db, "auth_base_url", "http://192.168.1.200:60040")

    async def _client(self, db: AsyncSession) -> dict:
        """Load the platform client credentials used for request signing."""
        return {
            "appId": await self._cfg(db, "platform_app_id", ""),
            "accessId": await self._cfg(db, "platform_access_id", ""),
            "accessSecret": await self._cfg(db, "platform_access_secret", ""),
            "clientCode": await self._cfg(db, "platform_client_code", ""),
            "orgId": await self._cfg(db, "platform_org_id", ""),
        }

    # ─── Signing (mirrors sign.js) ─────────────────────────────

    @staticmethod
    def _make_sign(params: dict, secret_key: str, sign_type: str = "MD5") -> str:
        """Compute the request signature.

        Steps (as described for sign.js):
          1. Sort all parameter keys.
          2. Skip ``signature``, ``accessSecret``, ``None``, empty strings and
             every list value (empty or not).
          3. Join ``key=value`` pairs with ``&`` and append
             ``&accessSecret=<secret_key>``.
          4. Hash with SHA256 when *sign_type* is "SHA256" (case-insensitive),
             otherwise MD5, and return the upper-case hex digest.
        """
        sign_field = "signature"
        secret_field = "accessSecret"

        parts = []
        for key in sorted(params.keys()):
            if key in (sign_field, secret_field):
                continue
            value = params.get(key)
            if value is None or value == "" or isinstance(value, list):
                continue
            parts.append(f"{key}={value}")

        sign_str = "&".join(parts) + f"&{secret_field}={secret_key}"
        payload = sign_str.encode("utf-8")
        if sign_type.upper() == "SHA256":
            return hashlib.sha256(payload).hexdigest().upper()
        return hashlib.md5(payload).hexdigest().upper()

    @staticmethod
    def _get_nonce() -> str:
        """Return a random string of 8-12 decimal digits.

        Built digit by digit so the result never contains exponent notation
        and is never shorter than 8 characters.
        """
        length = random.randint(8, 12)
        return "".join(random.choices(string.digits, k=length))

    @staticmethod
    def _get_timestamp() -> str:
        """Return the local time as ``yyyyMMddhhmmss``.

        The hour is deliberately 12-hour (``%I``): per the original author's
        note, sign.js uses a lower-case ``hh`` pattern.
        """
        return datetime.now().strftime("%Y%m%d%I%M%S")

    def _build_form(self, extra: dict, cfg: dict) -> dict:
        """Build signed request parameters.

        Merges the common signing fields with *extra*, computes the signature
        over the merged dict, and returns it without ``accessSecret`` (the
        secret is never sent). The signature is an empty string when no
        secret is configured.
        """
        sign_type = "MD5"
        sign_version = "1.0"
        secret_key = cfg.get("accessSecret", "")

        base = {
            "appId": cfg.get("appId", ""),
            "accessId": cfg.get("accessId", ""),
            "timestamp": self._get_timestamp(),
            "signType": sign_type,
            "signVersion": sign_version,
            "accessSecret": secret_key,
            "nonce": self._get_nonce(),
        }
        base.update(extra)

        base["signature"] = (
            self._make_sign(base, secret_key, sign_type) if secret_key else ""
        )
        # Do not send the secret to the server.
        base.pop("accessSecret", None)
        return base

    # ─── Login ─────────────────────────────────────────────────

    async def login(self, db: AsyncSession, user: VirtualUser) -> bool:
        """Log *user* in on the platform and persist the resulting session.

        Sets the user's status to 1 before the request, 2 on success and 3 on
        failure, and writes a login log entry for both outcomes.

        Returns:
            True when a token was obtained and stored, otherwise False.
        """
        password = decrypt(user.password_enc)
        if not password:
            logger.error(f"[登录] {user.account} 密码解密失败")
            return False

        auth = await self._auth_url(db)
        cfg = await self._client(db)

        await db.execute(
            update(VirtualUser).where(VirtualUser.id == user.id).values(status=1)
        )
        await db.commit()

        extra = {
            "username": user.account,
            "password": password,
            "loginType": "password",
            "grantType": "password",
            "isRegister": "false",
        }
        if cfg.get("clientCode"):
            extra["clientCode"] = cfg["clientCode"]
        form = self._build_form(extra, cfg)

        exc = None
        try:
            async with httpx.AsyncClient(
                timeout=30,
                follow_redirects=True,  # follow HTTP redirects automatically
            ) as c:
                # auth_base_url is expected to hold the full gateway prefix,
                # e.g. https://fat-open.99hui.com/api/usercenter
                login_url = f"{auth}/open/login/token"
                resp = await c.post(login_url, data=form)

            # Log the raw response to ease troubleshooting.
            # NOTE(review): on success this body likely contains the access
            # token, which therefore ends up in the logs — consider masking.
            logger.info(
                f"[登录] {user.account} 原始响应: "
                f"status={resp.status_code} "
                f"content-type={resp.headers.get('content-type','')} "
                f"body={resp.text[:500]}"
            )

            # Guard against an empty response body.
            if not resp.text.strip():
                logger.warning(f"[登录] {user.account} 服务器返回空响应体,接口可能不存在或被重定向")
                raise ValueError(f"服务器返回空响应 HTTP={resp.status_code}")

            try:
                body = resp.json()
            except Exception as json_err:
                logger.warning(f"[登录] {user.account} 响应非JSON: {resp.text[:200]}")
                raise ValueError(f"响应非JSON: {resp.text[:100]}") from json_err

            if resp.status_code == 200 and body.get("code") in [0, 200]:
                raw = body.get("data")
                access_token, platform_uid = self._extract_token(raw)
                if access_token:
                    sid = str(uuid.uuid4())

                    # Try to discover the organisation the user belongs to.
                    org_id = await self._fetch_org_id(db, access_token, platform_uid, cfg)
                    if org_id and not cfg.get("orgId"):
                        # No orgId configured yet: remember the discovered one.
                        await self._save_org_id(db, org_id)

                    # Extract profile data from the login response. "userInfo"
                    # may be missing, null, or not a dict.
                    user_info = raw.get("userInfo") if isinstance(raw, dict) else None
                    if not isinstance(user_info, dict):
                        user_info = {}
                    sync_nickname = user_info.get("nickName") or user_info.get("username") or ""
                    sync_real_name = user_info.get("realName") or ""
                    sync_sex = _safe_int(user_info.get("sex"))
                    sync_avatar = user_info.get("avatar") or ""

                    await set_session(user.id, {
                        "token": access_token,
                        "session_id": sid,
                        "platform_uid": platform_uid,
                        "org_id": org_id or cfg.get("orgId", ""),
                        "login_time": datetime.now().isoformat(),
                        # Cached profile data for later sync use.
                        "nickname": sync_nickname,
                        "real_name": sync_real_name,
                        "sex": sync_sex,
                        "avatar": sync_avatar,
                    }, expire=86400)

                    # Update the local row; only overwrite profile fields the
                    # platform actually returned.
                    update_vals = dict(
                        status=2,
                        session_token=access_token,
                        session_expires_at=datetime.now() + timedelta(hours=24),
                        last_login_at=datetime.now(),
                        platform_uid=platform_uid,
                    )
                    if sync_nickname:
                        update_vals["nickname"] = sync_nickname
                    if sync_real_name:
                        update_vals["real_name"] = sync_real_name
                    if sync_sex:
                        update_vals["sex"] = sync_sex
                    if sync_avatar:
                        update_vals["avatar_url"] = sync_avatar

                    await db.execute(
                        update(VirtualUser)
                        .where(VirtualUser.id == user.id)
                        .values(**update_vals)
                    )
                    await db.commit()
                    await self._write_login_log(db, user, "login", sid)
                    logger.info(f"✅ [登录] {user.account} 成功 orgId={org_id}")
                    return True

                logger.warning(f"[登录] {user.account} 无token: {body}")
            else:
                logger.warning(f"[登录] {user.account} 失败: HTTP={resp.status_code} {body}")
        except Exception as e:
            exc = e
            logger.error(f"[登录] {user.account} 异常: {e}")

        await db.execute(
            update(VirtualUser).where(VirtualUser.id == user.id).values(status=3)
        )
        await db.commit()
        await self._write_login_log(db, user, "fail", error_msg=str(exc or "登录失败"))
        return False

    async def _fetch_org_id(
        self, db: AsyncSession, token: str, platform_uid: str, cfg: dict
    ) -> str:
        """Probe several user-info endpoints and return the user's org id.

        Returns the first non-empty value among ``data.orgId``,
        ``data.defaultOrgId`` and ``data.org.id``; returns "" when no endpoint
        yields one. Per-endpoint failures are logged at debug level only.
        *platform_uid* is accepted for interface compatibility and not used.
        """
        biz = await self._biz_url(db)
        headers = self._bearer(token)

        # Candidate user-info endpoints, tried in order.
        endpoints = ["/app/user/info", "/open/user/info", "/user/info"]

        for ep in endpoints:
            try:
                form = self._build_form({}, cfg)
                async with httpx.AsyncClient(timeout=8) as c:
                    r = await c.get(
                        f"{biz}{ep}",
                        headers=headers,
                        params={
                            k: v for k, v in form.items()
                            if k not in ("username", "password")
                        },
                    )
                if r.status_code == 200:
                    data = r.json().get("data") or {}
                    org = data.get("org")
                    # The nested lookup is parenthesised on purpose: without
                    # the parentheses the conditional would apply to the whole
                    # `or` chain and drop orgId/defaultOrgId.
                    org_id = (
                        data.get("orgId")
                        or data.get("defaultOrgId")
                        or (org.get("id") if isinstance(org, dict) else None)
                    )
                    if org_id:
                        return str(org_id)
            except Exception as e:
                logger.debug(f"获取orgId失败({ep}): {e}")
        return ""

    async def _save_org_id(self, db: AsyncSession, org_id: str):
        """Upsert *org_id* into the ``platform_org_id`` system config row."""
        result = await db.execute(
            select(SystemConfig).where(SystemConfig.config_key == "platform_org_id")
        )
        row = result.scalar_one_or_none()
        if row:
            row.config_value = org_id
        else:
            db.add(SystemConfig(
                config_key="platform_org_id",
                config_value=org_id,
                description="平台组织Id(自动获取)",
            ))
        await db.commit()
        logger.info(f"✅ 自动获取并保存 orgId={org_id}")

    @staticmethod
    def _extract_token(raw) -> tuple[str, str]:
        """Extract ``(access_token, platform_uid)`` from a login ``data`` payload.

        A non-empty string is treated as the token itself (uid ""). For a
        dict, several key spellings are tried for both values. Anything else
        yields ``("", "")``.
        """
        if isinstance(raw, str) and raw:
            return raw, ""
        if isinstance(raw, dict):
            token = (
                raw.get("access_token")
                or raw.get("accessToken")
                or raw.get("token")
                or ""
            )
            # Per the original author's note, openid is the platform user id
            # (data.openid == data.userInfo.id in the login response).
            uid = str(
                raw.get("openid")
                or raw.get("userId")
                or raw.get("user_id")
                or raw.get("id")
                or (raw.get("userInfo") or {}).get("id")
                or ""
            )
            return token, uid
        return "", ""

    async def logout(self, db: AsyncSession, user_id: int):
        """Drop the cached session and reset the user's local login state."""
        user_r = await db.execute(select(VirtualUser).where(VirtualUser.id == user_id))
        user = user_r.scalar_one_or_none()
        if user:
            sess = await get_session(user_id)
            await self._write_login_log(
                db, user, "logout", sess.get("session_id") if sess else None
            )
        await delete_session(user_id)
        await db.execute(
            update(VirtualUser).where(VirtualUser.id == user_id).values(
                status=0, session_token=None, session_expires_at=None
            )
        )
        await db.commit()

    async def check_session(self, db: AsyncSession, user: VirtualUser) -> bool:
        """Check that the cached token is still accepted by the platform.

        Issues a one-item ``/news/list`` query. A non-success answer logs the
        user out and returns False; a request exception returns False without
        logging out.
        """
        sess = await get_session(user.id)
        if not sess:
            return False
        biz = await self._biz_url(db)
        cfg = await self._client(db)
        try:
            params = self._build_form({}, cfg)
            # NOTE(review): these query fields are added after signing, so
            # they are not covered by the signature — confirm this is what
            # the server expects.
            params.update({
                "orgId": cfg["orgId"] or "1",
                "pageNum": 1,
                "pageSize": 1,
                "status": "approved",
            })
            async with httpx.AsyncClient(timeout=10) as c:
                r = await c.get(
                    f"{biz}/news/list",
                    headers=self._bearer(sess["token"]),
                    params=params,
                )
            if r.status_code == 200 and r.json().get("code") in [0, 200]:
                return True
            await self.logout(db, user.id)
            return False
        except Exception:
            return False

    # ─── News list ─────────────────────────────────────────────

    async def validate_article(self, db, user, article_id: str) -> bool:
        """Return True when the article can be used for interaction.

        An article is usable when ``GET /news/{id}`` answers HTTP 200 with
        code 0/200 and its text (``content``, else ``digest``, else
        ``newsTitle``) is at least 100 characters long after HTML tags are
        stripped. Definitive failures are remembered in the process-wide
        invalid-ID cache; request exceptions are not cached.
        """
        if not article_id:
            return False
        # Cache hit: known to be invalid, skip the API call.
        if article_id in self._invalid_ids_cache:
            return False

        sess = await get_session(user.id)
        if not sess:
            return False
        biz = await self._biz_url(db)
        token = sess.get("token", "")
        try:
            async with httpx.AsyncClient(timeout=8) as c:
                r = await c.get(
                    f"{biz}/news/{article_id}",
                    headers=self._bearer(token),
                )
            if r.status_code != 200:
                self._invalid_ids_cache.add(article_id)
                return False

            d = r.json()
            if d.get("code") not in [0, 200]:
                logger.info(f"[文章校验] {article_id} 无效: code={d.get('code')} {d.get('message','')}")
                self._invalid_ids_cache.add(article_id)
                return False

            data = d.get("data") or {}
            # Take the body text, strip HTML tags, count characters.
            raw = data.get("content") or data.get("digest") or data.get("newsTitle") or ""
            text = _HTML_TAG_RE.sub("", raw).strip()
            if len(text) < 100:
                logger.info(f"[文章校验] {article_id} 正文过短({len(text)}字),跳过")
                self._invalid_ids_cache.add(article_id)
                return False
            return True
        except Exception as e:
            logger.warning(f"[文章校验] {article_id} 请求异常: {e}")
            return False

    def _square_params(self, cfg: dict, page_num: int, org_id: str) -> dict:
        """Build signed query parameters for one page of the square news list."""
        params = self._build_form({
            "pageNum": page_num,
            "pageSize": self._SQUARE_PAGE_SIZE,
            "type": "1",
            "isPlatformShow": "true",
            "isAdmin": "false",
        }, cfg)
        if org_id:
            # Optional organisation filter.
            # NOTE(review): added after signing, so not covered by the
            # signature — confirm against the server's verification rules.
            params["orgId"] = org_id
        return params

    @staticmethod
    def _hot_weight(article: dict, now: Optional[datetime] = None) -> float:
        """Return the sampling weight of *article* (always >= 1.0).

        weight = (comments*3 + likes*2 + reads + 1) * freshness, where
        freshness decays linearly from 3.0 (just published) to 1.0 (72 hours
        or older) and is clamped to [1.0, 3.0]. Articles without a parseable
        ``publishTime``/``createTime`` get freshness 1.0.
        """
        hot_score = (
            _safe_int(article.get("commentNum")) * 3
            + _safe_int(article.get("praiseNum")) * 2
            + _safe_int(article.get("readNum"))
        )

        freshness = 1.0
        pub_time = article.get("publishTime") or article.get("createTime") or ""
        if isinstance(pub_time, str) and pub_time:
            reference = now or datetime.now()
            for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
                try:
                    pub_dt = datetime.strptime(pub_time[:19], fmt)
                except ValueError:
                    continue
                hours_old = (reference - pub_dt).total_seconds() / 3600
                # Upper clamp keeps future-dated articles (clock skew) from
                # exceeding the intended 3x maximum.
                freshness = min(3.0, max(1.0, 3.0 - hours_old / 36.0))
                break

        return max(1.0, (hot_score + 1) * freshness)

    @staticmethod
    def _weighted_sample(items: list, weights: list, count: int) -> tuple[list, list]:
        """Draw up to *count* items without replacement, weighted by *weights*.

        Returns ``(picked, remaining)``; *remaining* keeps the original order.
        """
        pool = list(range(len(items)))
        w_pool = list(weights)
        picked = []
        for _ in range(min(count, len(items))):
            if not pool:
                break
            pos = random.choices(range(len(pool)), weights=w_pool, k=1)[0]
            picked.append(items[pool.pop(pos)])
            w_pool.pop(pos)
        return picked, [items[i] for i in pool]

    async def get_news_list(self, db, user, count=5, interest_tags=None) -> list:
        """Fetch candidate news articles for *user* from the square feed.

        GET /business/member/square/list with type=1 (news). ``orgId`` is
        optional; when absent the query is not restricted to an organisation.
        Item fields used here: ``id``, ``recordId``, ``createUser``,
        ``commentNum``, ``praiseNum``, ``readNum``, ``publishTime`` /
        ``createTime``.

        A random page among the first ten is fetched, the user's own and
        known-invalid articles are dropped, then up to *count* articles are
        chosen by popularity/freshness-weighted sampling and validated with
        ``validate_article``. *interest_tags* is currently unused.

        Returns:
            A list of article dicts; empty when not logged in or on error.
        """
        sess = await get_session(user.id)
        if not sess:
            return []
        biz = await self._biz_url(db)
        cfg = await self._client(db)
        org_id = sess.get("org_id") or cfg.get("orgId") or ""
        list_url = f"{biz}/business/member/square/list"
        headers = self._bearer(sess["token"])

        # First query only determines the page count, so that a random page
        # can be picked instead of always returning page 1.
        total_pages = 1
        try:
            async with httpx.AsyncClient(timeout=10) as c:
                first = await c.get(
                    list_url, headers=headers,
                    params=self._square_params(cfg, 1, org_id),
                )
            first_body = first.json()
            if first_body.get("code") in [0, 200]:
                total_size = (first_body.get("data") or {}).get("totalSize", 0)
                total_pages = max(1, math.ceil(total_size / self._SQUARE_PAGE_SIZE))
        except Exception as e:
            # Best effort: fall back to a single page.
            logger.debug(f"[square news] page-count query failed: {e}")

        # Pick a random page among at most the first ten.
        rand_page = random.randint(1, min(total_pages, 10))

        try:
            async with httpx.AsyncClient(timeout=15) as c:
                r = await c.get(
                    list_url, headers=headers,
                    params=self._square_params(cfg, rand_page, org_id),
                )
            if r.status_code == 200:
                d = r.json()
                if d.get("code") in [0, 200]:
                    nd = d.get("data") or {}
                    items = nd.get("data") or nd.get("list") or nd.get("records") or []

                    # Drop articles published by this user.
                    platform_uid = sess.get("platform_uid", "")
                    if platform_uid:
                        items = [x for x in items if x.get("createUser") != platform_uid]

                    # Drop known-invalid articles (static list + runtime
                    # cache). IDs are compared as strings because the cache
                    # stores string IDs.
                    invalid_ids = self._STATIC_INVALID_IDS | self._invalid_ids_cache
                    items = [
                        x for x in items
                        if str(x.get("recordId") or x.get("id") or "") not in invalid_ids
                    ]
                    logger.info(f"[广场新闻] {user.account} 获取到 {len(items)} 条(已过滤本人+无效文章)")

                    # NOTE(review): when there are not more items than
                    # requested, they are returned without validation.
                    if len(items) <= count:
                        return items

                    # Popularity + freshness weighted sampling.
                    weights = [self._hot_weight(a) for a in items]
                    selected, remaining = self._weighted_sample(items, weights, count)

                    # Validate the sample (drops unopenable / too-short articles).
                    valid = []
                    for article in selected:
                        aid = str(article.get("recordId") or article.get("id", ""))
                        if await self.validate_article(db, user, aid):
                            valid.append(article)

                    # Top up from the remaining candidates in random order.
                    if len(valid) < count and remaining:
                        random.shuffle(remaining)
                        for article in remaining:
                            if len(valid) >= count:
                                break
                            aid = str(article.get("recordId") or article.get("id", ""))
                            if await self.validate_article(db, user, aid):
                                valid.append(article)

                    if not valid:
                        logger.warning(f"[广场新闻] {user.account} 校验后无可用文章")
                    return valid

                logger.warning(f"[广场新闻] {user.account} code={d.get('code')} msg={d.get('message')}")
        except Exception as e:
            logger.error(f"[广场新闻] {user.account}: {e}")
        return []

    # ─── Interactions ──────────────────────────────────────────

    async def read_news(self, db, user, news_id: str) -> bool:
        """Mark *news_id* as read (PATCH /news/read/{id}); True on HTTP 200."""
        sess = await get_session(user.id)
        if not sess:
            return False
        biz = await self._biz_url(db)
        cfg = await self._client(db)
        try:
            async with httpx.AsyncClient(timeout=10) as c:
                r = await c.patch(
                    f"{biz}/news/read/{news_id}",
                    headers=self._bearer(sess["token"]),
                    data=self._build_form({}, cfg),
                )
            return r.status_code == 200
        except Exception:
            return False

    async def post_comment(self, db, user, news_id, news_title, content,
                           news_author_id="", org_id="") -> tuple[bool, str]:
        """Post a comment on a news article (POST /message/comment).

        Returns ``(ok, error_message)``.
        """
        sess = await get_session(user.id)
        if not sess:
            return False, "未登录"
        biz = await self._biz_url(db)
        cfg = await self._client(db)
        uid = sess.get("platform_uid", "")
        # Prefer the org id carried by the article, then session, then config.
        final_org_id = org_id or sess.get("org_id") or cfg.get("orgId") or ""
        body = {
            "module": "news",
            "topicId": news_id,
            "title": news_title,
            "content": content,
            "orgId": final_org_id,
            "toUserId": news_author_id or uid,
            "userId": uid,
            "userName": user.nickname,
            "avatar": user.avatar_url or "",
        }
        return await self._json_post(
            f"{biz}/message/comment", self._bearer(sess["token"]), body
        )

    async def post_reply(self, db, user, news_id, comment_id, content) -> tuple[bool, str]:
        """Reply to a comment (POST /message/comment/reply).

        Returns ``(ok, error_message)``.
        """
        sess = await get_session(user.id)
        if not sess:
            return False, "未登录"
        biz = await self._biz_url(db)
        uid = sess.get("platform_uid", "")
        body = {
            "module": "news",
            "topicId": news_id,
            "commentId": comment_id,
            "commentUserId": uid,
            "content": content,
            "fromUserName": user.nickname,
            "avatar": user.avatar_url or "",
        }
        return await self._json_post(
            f"{biz}/message/comment/reply", self._bearer(sess["token"]), body
        )

    async def get_comments(self, db, user, news_id) -> list:
        """Return the first page (20 items) of comments for *news_id*; [] on any failure."""
        sess = await get_session(user.id)
        if not sess:
            return []
        biz = await self._biz_url(db)
        cfg = await self._client(db)
        try:
            params = self._build_form(
                {"module": "news", "topicId": news_id, "pageNum": 1, "pageSize": 20},
                cfg,
            )
            async with httpx.AsyncClient(timeout=10) as c:
                r = await c.get(
                    f"{biz}/message/comment",
                    headers=self._bearer(sess["token"]),
                    params=params,
                )
            if r.status_code == 200:
                return r.json().get("data", {}).get("data") or []
        except Exception:
            pass
        return []

    async def like_news(self, db, user, news_id, org_id="", to_user_id="",
                        title="") -> tuple[bool, str]:
        """Like a news article (POST /message/praise).

        Returns ``(ok, error_message)``.
        """
        sess = await get_session(user.id)
        if not sess:
            return False, "未登录"
        biz = await self._biz_url(db)
        uid = sess.get("platform_uid", "")
        body = {
            "module": "news",
            "topicId": news_id,
            "userId": uid,
            "toUserId": to_user_id or uid,
            "orgId": org_id or sess.get("org_id", "") or "",
            "title": title,
        }
        return await self._json_post(
            f"{biz}/message/praise", self._bearer(sess["token"]), body
        )

    async def collect_news(self, db, user, news_id, org_id="", to_user_id="",
                           title="") -> tuple[bool, str]:
        """Bookmark a news article by delegating to ``like_news`` (same endpoint)."""
        return await self.like_news(
            db, user, news_id, org_id=org_id, to_user_id=to_user_id, title=title
        )

    async def forward_news(self, db, user, news_id) -> tuple[bool, str]:
        """Forward (share) a news article.

        First calls the share endpoint on a best-effort basis, then posts to
        ``/points/forward/news/{org_id}``; the result reflects only the
        second call.
        """
        sess = await get_session(user.id)
        if not sess:
            return False, "未登录"
        biz = await self._biz_url(db)
        cfg = await self._client(db)
        org_id = sess.get("org_id") or cfg.get("orgId") or "1"
        headers = self._bearer(sess["token"])

        try:
            async with httpx.AsyncClient(timeout=8) as c:
                await c.get(
                    f"{biz}/news/share/wechat/{news_id}",
                    headers=headers,
                    params=self._build_form({}, cfg),
                )
        except Exception as e:
            # Best effort: a failed share call must not block the forward.
            logger.debug(f"[forward] share call failed for {news_id}: {e}")

        try:
            async with httpx.AsyncClient(timeout=15) as c:
                r = await c.post(
                    f"{biz}/points/forward/news/{org_id}",
                    headers=headers,
                    data=self._build_form({}, cfg),
                )
            return self._ok(r)
        except Exception as e:
            return False, str(e)

    # ─── Helpers ───────────────────────────────────────────────

    @staticmethod
    def _bearer(token: str) -> dict:
        """Return Authorization + JSON Content-Type headers for *token*.

        NOTE(review): callers that send form data (``data=``) also use these
        headers, so those requests declare a JSON content type — confirm the
        server tolerates this.
        """
        return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

    @staticmethod
    def _ok(resp: httpx.Response) -> tuple[bool, str]:
        """Map an HTTP response to ``(ok, error_message)``.

        HTTP 200/201 with business code 0/200 is success; a 200/201 whose
        body cannot be parsed is also treated as success.
        """
        if resp.status_code in [200, 201]:
            try:
                d = resp.json()
                if d.get("code") in [0, 200]:
                    return True, ""
                return False, d.get("message") or "业务失败"
            except Exception:
                return True, ""
        return False, f"HTTP {resp.status_code}"

    async def _json_post(self, url, headers, body) -> tuple[bool, str]:
        """POST *body* as JSON and map the response through ``_ok``."""
        try:
            async with httpx.AsyncClient(timeout=15) as c:
                r = await c.post(url, json=body, headers=headers)
            return self._ok(r)
        except Exception as e:
            return False, str(e)

    async def _write_login_log(self, db, user, action, session_id=None, error_msg=None):
        """Persist a ``LoginLog`` row; failures are logged, never raised."""
        try:
            log = LoginLog(
                user_id=user.id,
                user_account=user.account,
                action=action,
                session_id=session_id,
                error_msg=error_msg,
            )
            db.add(log)
            await db.commit()
        except Exception as e:
            logger.warning(f"[login log] write failed for {user.account}: {e}")
            # A failed commit leaves the session unusable until rolled back.
            try:
                await db.rollback()
            except Exception:
                pass

    # ─── Undo interactions ─────────────────────────────────────

    async def cancel_like(self, db, user, news_id: str, org_id: str = "",
                          to_user_id: str = "", title: str = "") -> tuple[bool, str]:
        """Remove a like (DELETE /message/praise/cancel with a JSON body).

        Returns ``(ok, error_message)``.
        """
        sess = await get_session(user.id)
        if not sess:
            return False, "未登录"
        biz = await self._biz_url(db)
        uid = sess.get("platform_uid", "")
        body = {
            "module": "news",
            "topicId": news_id,
            "userId": uid,
            "toUserId": to_user_id or uid,
            "orgId": org_id or sess.get("org_id", "") or "",
            "title": title,
        }
        try:
            async with httpx.AsyncClient(timeout=10) as c:
                # httpx's .delete() takes no body arguments; .request() is
                # required to send JSON with a DELETE.
                r = await c.request(
                    "DELETE",
                    f"{biz}/message/praise/cancel",
                    json=body,
                    headers=self._bearer(sess["token"]),
                )
            d = r.json()
            if d.get("code") in [0, 200]:
                return True, ""
            return False, d.get("message", "取消点赞失败")
        except Exception as e:
            return False, str(e)

    async def cancel_comment(self, db, user, news_id: str,
                             comment_id: str) -> tuple[bool, str]:
        """Delete a comment (DELETE /message/comment/{topicId}/{id}).

        The signed parameters go in the query string; the topic and comment
        ids are part of the path. Returns ``(ok, error_message)``.
        """
        sess = await get_session(user.id)
        if not sess:
            return False, "未登录"
        biz = await self._biz_url(db)
        cfg = await self._client(db)
        params = self._build_form({}, cfg)
        try:
            async with httpx.AsyncClient(timeout=10) as c:
                r = await c.delete(
                    f"{biz}/message/comment/{news_id}/{comment_id}",
                    headers=self._bearer(sess["token"]),
                    params=params,
                )
            d = r.json()
            if d.get("code") in [0, 200]:
                return True, ""
            return False, d.get("message", "删除评论失败")
        except Exception as e:
            return False, str(e)

    async def cancel_collect(self, db, user, news_id: str, org_id: str = "",
                             to_user_id: str = "", title: str = "") -> tuple[bool, str]:
        """Remove a bookmark by delegating to ``cancel_like`` (same endpoint)."""
        return await self.cancel_like(
            db, user, news_id, org_id=org_id, to_user_id=to_user_id, title=title
        )

    # ─── Remote user profile ───────────────────────────────────

    async def update_user_profile(
        self,
        db: AsyncSession,
        user: VirtualUser,
        nick_name: Optional[str] = None,
        real_name: Optional[str] = None,
        sex: Optional[int] = None,
        avatar: Optional[str] = None,
        description: Optional[str] = None,
        email: Optional[str] = None,
    ) -> tuple[bool, str]:
        """Update the user's profile on the platform and mirror it locally.

        Sends ``PATCH {auth}/v2/users/current``. ``nickName``, ``realName``
        and ``sex`` are always sent (falling back to the local values)
        because, per the original author's note, the platform requires a
        non-empty SET clause; ``avatar``, ``description`` and ``email`` are
        sent only when given. On success the explicitly passed nickname,
        real name, sex and avatar are written to the local row.

        Returns:
            ``(ok, error_message)``.
        """
        sess = await get_session(user.id)
        if not sess:
            return False, "用户未登录,请先登录"

        auth = await self._auth_url(db)
        token = sess.get("token", "")
        platform_uid = sess.get("platform_uid", "")
        if not platform_uid:
            return False, "缺少平台用户ID,请重新登录"

        body = {
            "id": platform_uid,
            "nickName": nick_name if nick_name is not None else (user.nickname or ""),
            "realName": real_name if real_name is not None else (user.real_name or ""),
            "sex": sex if sex is not None else (user.sex or 0),
        }
        if avatar is not None:
            body["avatar"] = avatar
        if description is not None:
            body["description"] = description
        if email is not None:
            body["email"] = email

        # _bearer already sets the JSON Content-Type header.
        headers = self._bearer(token)
        try:
            async with httpx.AsyncClient(timeout=15) as c:
                r = await c.patch(
                    f"{auth}/v2/users/current",
                    json=body,
                    headers=headers,
                )
            d = r.json()
            if d.get("code") in [0, 200]:
                # Mirror the explicitly provided fields into the local row.
                local_vals = {}
                if nick_name is not None:
                    local_vals["nickname"] = nick_name
                if real_name is not None:
                    local_vals["real_name"] = real_name
                if sex is not None:
                    local_vals["sex"] = sex
                if avatar is not None:
                    local_vals["avatar_url"] = avatar
                if local_vals:
                    await db.execute(
                        update(VirtualUser)
                        .where(VirtualUser.id == user.id)
                        .values(**local_vals)
                    )
                    await db.commit()
                logger.info(f"✅ 用户 {user.account} 信息已同步到目标系统")
                return True, ""
            err = d.get("message") or f"code={d.get('code')}"
            logger.warning(f"[修改用户信息] {user.account} 失败: {err} body={r.text[:200]}")
            return False, err
        except Exception as e:
            logger.warning(f"[修改用户信息] {user.account} 异常: {e}")
            return False, str(e)

    async def upload_avatar(
        self, db: AsyncSession, user: VirtualUser, file_bytes: bytes, filename: str
    ) -> tuple[bool, str]:
        """Upload an avatar image and return ``(ok, url_or_error)``.

        POSTs multipart form data to ``{filecenter}/fileUpload``, where the
        filecenter base is the business base URL with ``/huihuibusiness``
        replaced by ``/filecenter``. The MIME type is guessed from
        *filename*, defaulting to ``image/jpeg``.
        """
        sess = await get_session(user.id)
        if not sess:
            return False, "用户未登录"

        cfg = await self._client(db)
        token = sess.get("token", "")

        # Per the original author's note, filecenter shares the gateway with
        # the business service; only the service name in the path differs.
        biz_base = await self._biz_url(db)
        filecenter_url = biz_base.replace("/huihuibusiness", "/filecenter")

        sign_params = self._build_form({"module": "userInfo", "service": "kccloud"}, cfg)
        # No Content-Type here: httpx must set the multipart boundary itself.
        headers = {"Authorization": f"Bearer {token}"}

        try:
            mime = mimetypes.guess_type(filename)[0] or "image/jpeg"
            files = {"file": (filename, file_bytes, mime)}
            async with httpx.AsyncClient(timeout=30) as c:
                r = await c.post(
                    f"{filecenter_url}/fileUpload",
                    files=files,
                    data=sign_params,
                    headers=headers,
                )
            d = r.json()
            if d.get("code") in [0, 200]:
                url = d.get("data") or d.get("url") or ""
                if isinstance(url, dict):
                    url = url.get("url") or url.get("path") or ""
                logger.info(f"✅ 头像上传成功: {url}")
                return True, url
            return False, d.get("message") or "上传失败"
        except Exception as e:
            return False, str(e)


news_service = NewsPlatformService()