"""Log management endpoints: login-log queries and log-file listing/tail/download."""
import os
from collections import deque
from typing import Optional

from fastapi import APIRouter, Depends, Query, HTTPException
from fastapi.responses import FileResponse
from sqlalchemy import select, func, and_

from app.core.database import get_db
from app.schemas import ApiResponse
from app.models import LoginLog
from app.core.config import settings

router = APIRouter()


def _resolve_log_path(filename: str) -> str:
    """Validate a client-supplied log file name and return its path in LOG_DIR.

    Raises:
        HTTPException: 400 if the name contains a traversal sequence or any
            path component, 404 if it does not name a regular file.
    """
    # The filename comes straight from the URL, so treat it as untrusted.
    # The basename comparison additionally rejects platform-specific
    # separators and drive prefixes (e.g. "\\" or "C:" on Windows) that the
    # plain "/" check would let through.
    if (
        ".." in filename
        or "/" in filename
        or os.path.basename(filename) != filename
    ):
        raise HTTPException(status_code=400, detail="非法文件名")

    fpath = os.path.join(settings.LOG_DIR, filename)
    # isfile (not exists): a directory must not reach open()/FileResponse,
    # where it would surface as an unhandled 500 instead of a 404.
    if not os.path.isfile(fpath):
        raise HTTPException(status_code=404, detail="文件不存在")
    return fpath


@router.get("/login")
async def get_login_logs(
    page: int = Query(default=1, ge=1),
    page_size: int = Query(default=50, ge=1, le=200),
    user_id: Optional[int] = None,
    action: Optional[str] = None,
    db=Depends(get_db),
):
    """Return one page of login-log records, newest first.

    Args:
        page: 1-based page number.
        page_size: Number of records per page (1-200).
        user_id: When given, only records with this user id are returned.
        action: When non-empty, only records with this action are returned.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        ApiResponse whose data holds ``total``, ``page``, ``page_size`` and
        the serialized ``items`` for the requested page.
    """
    query = select(LoginLog)

    conditions = []
    # Compare against None so that a legitimate id of 0 is still filtered on.
    if user_id is not None:
        conditions.append(LoginLog.user_id == user_id)
    if action:
        conditions.append(LoginLog.action == action)
    if conditions:
        query = query.where(and_(*conditions))

    # Count the filtered set before ordering/pagination are applied.
    count_stmt = select(func.count()).select_from(query.subquery())
    total = (await db.execute(count_stmt)).scalar()

    # The id tie-breaker keeps page boundaries stable when several rows share
    # the same created_at value.
    query = (
        query.order_by(LoginLog.created_at.desc(), LoginLog.id.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    result = await db.execute(query)
    logs = result.scalars().all()

    items = [
        {
            "id": log.id,
            "user_id": log.user_id,
            "user_account": log.user_account,
            "action": log.action,
            "session_id": log.session_id,
            "error_msg": log.error_msg,
            # Guard against rows without a timestamp instead of raising.
            "created_at": log.created_at.isoformat() if log.created_at else None,
        }
        for log in logs
    ]
    return ApiResponse(
        data={"total": total, "page": page, "page_size": page_size, "items": items}
    )


@router.get("/files")
async def list_log_files():
    """List the ``*.log`` files in the configured log directory.

    Returns:
        ApiResponse whose data is a list of ``{"name", "size", "size_kb"}``
        entries sorted by name in descending order; empty when the log
        directory does not exist.
    """
    log_dir = settings.LOG_DIR
    files = []
    if os.path.isdir(log_dir):
        for fname in sorted(os.listdir(log_dir), reverse=True):
            if not fname.endswith(".log"):
                continue
            fpath = os.path.join(log_dir, fname)
            if not os.path.isfile(fpath):
                continue
            try:
                size = os.path.getsize(fpath)
            except OSError:
                # The file vanished (e.g. was rotated) between listdir() and
                # the stat call; skip it rather than fail the whole listing.
                continue
            files.append(
                {"name": fname, "size": size, "size_kb": round(size / 1024, 1)}
            )
    return ApiResponse(data=files)


@router.get("/files/{filename}/tail")
async def tail_log_file(
    filename: str, lines: int = Query(default=100, ge=10, le=1000)
):
    """Read the last ``lines`` lines of a log file.

    Args:
        filename: Bare file name inside the log directory.
        lines: Number of trailing lines to return (10-1000).

    Returns:
        ApiResponse whose data holds ``filename``, the trailing ``lines`` and
        the file's ``total_lines`` count.

    Raises:
        HTTPException: 400 for an illegal file name, 404 if the file is missing.
    """
    fpath = _resolve_log_path(filename)

    # Stream the file through a bounded deque so only the requested tail is
    # kept in memory, instead of materializing every line of the log.
    # NOTE(review): this is blocking file I/O inside an async handler.
    tail = deque(maxlen=lines)
    total_lines = 0
    with open(fpath, "r", encoding="utf-8", errors="replace") as f:
        for line in f:
            tail.append(line)
            total_lines += 1

    return ApiResponse(
        data={"filename": filename, "lines": list(tail), "total_lines": total_lines}
    )


@router.get("/files/{filename}/download")
async def download_log_file(filename: str):
    """Download a log file as a plain-text attachment.

    Args:
        filename: Bare file name inside the log directory.

    Raises:
        HTTPException: 400 for an illegal file name, 404 if the file is missing.
    """
    fpath = _resolve_log_path(filename)
    return FileResponse(fpath, filename=filename, media_type="text/plain")