| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- # -*- coding: utf-8 -*-
- import time
- import json
- from typing import Any, Callable, Coroutine
- from fastapi import Request, Response
- from fastapi.routing import APIRoute
- from user_agents import parse
- from app.core.database import async_db_session
- from app.config.setting import settings
- from app.utils.ip_local_util import IpLocalUtil
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.api.v1.module_system.log.schema import OperationLogCreateSchema
- from app.api.v1.module_system.log.service import OperationLogService
- """
- 在 FastAPI 中,route_class 参数用于自定义路由的行为。
- 通过设置 route_class,你可以定义一个自定义的路由类,从而在每个路由处理之前或之后执行特定的操作。
- 这对于日志记录、权限验证、性能监控等场景非常有用。
- """
- class OperationLogRoute(APIRoute):
- """操作日志路由装饰器"""
-
- def get_route_handler(self) -> Callable[[Request], Coroutine[Any, Any, Response]]:
- """
- 自定义路由处理程序,在每个路由处理之前或之后执行特定的操作。
- 参数:
- - request (Request): FastAPI请求对象。
- 返回:
- - Response: FastAPI响应对象。
- """
- original_route_handler = super().get_route_handler()
-
- async def custom_route_handler(request: Request) -> Response:
- """
- 自定义路由处理程序,在每个路由处理之前或之后执行特定的操作。
- 参数:
- - request (Request): FastAPI请求对象。
- 描述:
- - 该方法在每个路由处理之前被调用,用于记录操作日志。
- 返回:
- - Response: FastAPI响应对象。
- """
- start_time = time.time()
- # 请求前的处理
- response: Response = await original_route_handler(request)
-
- # 请求后的处理
- if not settings.OPERATION_LOG_RECORD:
- return response
- if request.method not in settings.OPERATION_RECORD_METHOD:
- return response
- route: APIRoute = request.scope.get("route", None)
- if route.name in settings.IGNORE_OPERATION_FUNCTION:
- return response
-
- user_agent = parse(request.headers.get("user-agent"))
- payload = b"{}"
- req_content_type = request.headers.get("Content-Type", "")
-
- if req_content_type and (
- req_content_type.startswith('multipart/form-data') or req_content_type.startswith('application/x-www-form-urlencoded')
- ):
- form_data = await request.form()
- oper_param = '\n'.join([f'{k}: {v}' for k, v in form_data.items()])
- payload = oper_param # 直接使用字符串格式的参数
- else:
- payload = await request.body()
- path_params = request.path_params
- oper_param = {}
-
- # 处理请求体数据
- if payload:
- try:
- oper_param['body'] = json.loads(payload.decode())
- except (json.JSONDecodeError, UnicodeDecodeError):
- oper_param['body'] = payload.decode('utf-8', errors='ignore')
-
- # 处理路径参数
- if path_params:
- oper_param['path_params'] = dict(path_params)
-
- payload = json.dumps(oper_param, ensure_ascii=False)
- # 日志表请求参数字段长度最大为2000,因此在此处判断长度
- if len(payload) > 2000:
- payload = '请求参数过长'
-
- response_data = response.body if "application/json" in response.headers.get("Content-Type", "") else b"{}"
- process_time = f"{(time.time() - start_time):.2f}s"
- # 获取当前用户ID,如果是登录接口则为空
- log_type = 1 # 1:登录日志 2:操作日志
- current_user_id = None
- # 优化:只在操作日志场景下获取current_user_id
- if "user_id" in request.scope:
- current_user_id = request.scope.get("user_id")
- log_type = 2
-
- request_ip = None
- x_forwarded_for = request.headers.get('X-Forwarded-For')
- if x_forwarded_for:
- # 取第一个 IP 地址,通常为客户端真实 IP
- request_ip = x_forwarded_for.split(',')[0].strip()
- else:
- # 若没有 X-Forwarded-For 头,则使用 request.client.host
- if request.client:
- request_ip = request.client.host
-
- login_location = await IpLocalUtil.get_ip_location(request_ip) if request_ip else None
-
- # 判断请求是否来自api文档
- referer = request.headers.get('referer')
- request_from_swagger = referer and referer.endswith('docs')
- request_from_redoc = referer and referer.endswith('redoc')
-
- if request_from_swagger or request_from_redoc:
- # 如果请求来自api文档,则不记录日志
- pass
- else:
- async with async_db_session() as session:
- async with session.begin():
- auth = AuthSchema(db=session)
- await OperationLogService.create_log_service(data=OperationLogCreateSchema(
- type = log_type,
- request_path = request.url.path,
- request_method = request.method,
- request_payload = payload,
- request_ip = request_ip,
- login_location=login_location,
- request_os = user_agent.os.family,
- request_browser = user_agent.browser.family,
- response_code = response.status_code,
- response_json = response_data.decode() if isinstance(response_data, (bytes, bytearray)) else str(response_data),
- process_time = process_time,
- description = route.summary,
- created_id = current_user_id,
- updated_id = current_user_id,
- ), auth = auth)
-
- return response
- return custom_route_handler
|