Files
huihuiSquare/backend/app/api/endpoints/logs.py
stefanfeng 0cfc9bf9c8 feat: AI虚拟用户新闻互动系统 v1.3.0 初始提交
- 虚拟用户管理(昵称/头像/性别/简介/邮箱同步到目标平台)
- AI互动调度(点赞/收藏/评论/转发)
- 日志时间改为北京时间
- 评论达上限后继续执行点赞收藏转发
- 一键登出全部功能
- 浅色主题UI
2026-03-31 10:20:57 +08:00

84 lines
3.1 KiB
Python

"""日志管理接口"""
import os
from collections import deque
from typing import Optional

from fastapi import APIRouter, Depends, Query, HTTPException
from fastapi.responses import FileResponse
from sqlalchemy import select, func, and_

from app.core.config import settings
from app.core.database import get_db
from app.models import LoginLog
from app.schemas import ApiResponse
# Router on which the log endpoints defined in this module are registered.
router = APIRouter()
@router.get("/login")
async def get_login_logs(
    page: int = Query(default=1, ge=1),
    page_size: int = Query(default=50, ge=1, le=200),
    user_id: Optional[int] = None,
    action: Optional[str] = None,
    db=Depends(get_db),
):
    """Return one page of login-log records, newest first.

    Args:
        page: 1-based page number.
        page_size: Number of records per page (1-200).
        user_id: When given, only logs of this user are returned.
        action: When given and non-empty, only logs with this action are
            returned.
        db: Database handle supplied by the ``get_db`` dependency.

    Returns:
        ``ApiResponse`` whose ``data`` contains ``total`` (number of rows
        matching the filters), ``page``, ``page_size`` and ``items`` (the
        serialized log rows of the requested page).
    """
    conditions = []
    # Compare against None so an explicit ``user_id=0`` is still applied
    # as a filter instead of being silently ignored.
    if user_id is not None:
        conditions.append(LoginLog.user_id == user_id)
    if action:
        conditions.append(LoginLog.action == action)

    query = select(LoginLog)
    if conditions:
        query = query.where(and_(*conditions))

    # Count the filtered rows before ordering and pagination are applied.
    count_query = select(func.count()).select_from(query.subquery())
    total = (await db.execute(count_query)).scalar()

    # The id tie-breaker keeps offset pagination stable when several rows
    # share the same created_at value.
    page_query = (
        query.order_by(LoginLog.created_at.desc(), LoginLog.id.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    logs = (await db.execute(page_query)).scalars().all()

    items = [
        {
            "id": log.id,
            "user_id": log.user_id,
            "user_account": log.user_account,
            "action": log.action,
            "session_id": log.session_id,
            "error_msg": log.error_msg,
            # A row without a timestamp must not break the whole page.
            "created_at": (
                log.created_at.isoformat()
                if log.created_at is not None
                else None
            ),
        }
        for log in logs
    ]
    return ApiResponse(
        data={
            "total": total,
            "page": page,
            "page_size": page_size,
            "items": items,
        }
    )
@router.get("/files")
async def list_log_files():
    """List the ``.log`` files found in ``settings.LOG_DIR``.

    Returns:
        ``ApiResponse`` whose ``data`` is a list of dicts with ``name``,
        ``size`` (bytes) and ``size_kb`` (rounded to one decimal), sorted by
        file name in descending order. The list is empty when the log
        directory does not exist or is not a directory.
    """
    log_dir = settings.LOG_DIR
    try:
        names = sorted(os.listdir(log_dir), reverse=True)
    except (FileNotFoundError, NotADirectoryError):
        names = []

    files = []
    for fname in names:
        if not fname.endswith(".log"):
            continue
        fpath = os.path.join(log_dir, fname)
        # Skip directories and dangling symlinks that merely end in ".log".
        if not os.path.isfile(fpath):
            continue
        try:
            size = os.path.getsize(fpath)
        except OSError:
            # The file disappeared (e.g. was rotated) between the directory
            # listing and the size lookup; leave it out instead of failing.
            continue
        files.append(
            {"name": fname, "size": size, "size_kb": round(size / 1024, 1)}
        )
    return ApiResponse(data=files)
@router.get("/files/{filename}/tail")
async def tail_log_file(filename: str, lines: int = Query(default=100, ge=10, le=1000)):
    """Return the last ``lines`` lines of a file in ``settings.LOG_DIR``.

    Args:
        filename: Bare name of the log file (no directory components).
        lines: Maximum number of trailing lines to return (10-1000).

    Returns:
        ``ApiResponse`` whose ``data`` contains ``filename``, ``lines`` (the
        trailing lines, each keeping its line ending) and ``total_lines``
        (number of lines in the whole file).

    Raises:
        HTTPException: 400 when ``filename`` is not a bare file name,
            404 when no regular file with that name exists.
    """
    # Accept only a bare file name: no parent references, no path separator
    # of either flavour, and no drive/absolute prefix that would make
    # os.path.join discard LOG_DIR.
    if (
        ".." in filename
        or "/" in filename
        or "\\" in filename
        or os.path.basename(filename) != filename
    ):
        raise HTTPException(status_code=400, detail="非法文件名")

    fpath = os.path.join(settings.LOG_DIR, filename)
    # isfile (not exists) so that a directory yields 404 instead of an
    # unhandled error from open().
    if not os.path.isfile(fpath):
        raise HTTPException(status_code=404, detail="文件不存在")

    # Stream the file and keep only the last `lines` lines in memory rather
    # than loading the whole log with readlines().
    tail = deque(maxlen=lines)
    total_lines = 0
    try:
        with open(fpath, "r", encoding="utf-8", errors="replace") as f:
            for total_lines, line in enumerate(f, start=1):
                tail.append(line)
    except FileNotFoundError:
        # The file was removed between the check above and open().
        raise HTTPException(status_code=404, detail="文件不存在") from None

    return ApiResponse(
        data={
            "filename": filename,
            "lines": list(tail),
            "total_lines": total_lines,
        }
    )
@router.get("/files/{filename}/download")
async def download_log_file(filename: str):
    """Send a file from ``settings.LOG_DIR`` as a plain-text download.

    Args:
        filename: Bare name of the log file (no directory components).

    Returns:
        ``FileResponse`` for the requested file with media type
        ``text/plain`` and ``filename`` as the download name.

    Raises:
        HTTPException: 400 when ``filename`` is not a bare file name,
            404 when no regular file with that name exists.
    """
    # Accept only a bare file name: no parent references, no path separator
    # of either flavour, and no drive/absolute prefix that would make
    # os.path.join discard LOG_DIR.
    if (
        ".." in filename
        or "/" in filename
        or "\\" in filename
        or os.path.basename(filename) != filename
    ):
        raise HTTPException(status_code=400, detail="非法文件名")

    fpath = os.path.join(settings.LOG_DIR, filename)
    # isfile (not exists) so that a directory yields 404 instead of a
    # failure while the response is being sent.
    if not os.path.isfile(fpath):
        raise HTTPException(status_code=404, detail="文件不存在")

    return FileResponse(fpath, filename=filename, media_type="text/plain")