router_class.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. # -*- coding: utf-8 -*-
  2. import time
  3. import json
  4. from typing import Any, Callable, Coroutine
  5. from fastapi import Request, Response
  6. from fastapi.routing import APIRoute
  7. from user_agents import parse
  8. from app.core.database import async_db_session
  9. from app.config.setting import settings
  10. from app.utils.ip_local_util import IpLocalUtil
  11. from app.api.v1.module_system.auth.schema import AuthSchema
  12. from app.api.v1.module_system.log.schema import OperationLogCreateSchema
  13. from app.api.v1.module_system.log.service import OperationLogService
  14. """
  15. 在 FastAPI 中,route_class 参数用于自定义路由的行为。
  16. 通过设置 route_class,你可以定义一个自定义的路由类,从而在每个路由处理之前或之后执行特定的操作。
  17. 这对于日志记录、权限验证、性能监控等场景非常有用。
  18. """
  19. class OperationLogRoute(APIRoute):
  20. """操作日志路由装饰器"""
  21. def get_route_handler(self) -> Callable[[Request], Coroutine[Any, Any, Response]]:
  22. """
  23. 自定义路由处理程序,在每个路由处理之前或之后执行特定的操作。
  24. 参数:
  25. - request (Request): FastAPI请求对象。
  26. 返回:
  27. - Response: FastAPI响应对象。
  28. """
  29. original_route_handler = super().get_route_handler()
  30. async def custom_route_handler(request: Request) -> Response:
  31. """
  32. 自定义路由处理程序,在每个路由处理之前或之后执行特定的操作。
  33. 参数:
  34. - request (Request): FastAPI请求对象。
  35. 描述:
  36. - 该方法在每个路由处理之前被调用,用于记录操作日志。
  37. 返回:
  38. - Response: FastAPI响应对象。
  39. """
  40. start_time = time.time()
  41. # 请求前的处理
  42. response: Response = await original_route_handler(request)
  43. # 请求后的处理
  44. if not settings.OPERATION_LOG_RECORD:
  45. return response
  46. if request.method not in settings.OPERATION_RECORD_METHOD:
  47. return response
  48. route: APIRoute = request.scope.get("route", None)
  49. if route.name in settings.IGNORE_OPERATION_FUNCTION:
  50. return response
  51. user_agent = parse(request.headers.get("user-agent"))
  52. payload = b"{}"
  53. req_content_type = request.headers.get("Content-Type", "")
  54. if req_content_type and (
  55. req_content_type.startswith('multipart/form-data') or req_content_type.startswith('application/x-www-form-urlencoded')
  56. ):
  57. form_data = await request.form()
  58. oper_param = '\n'.join([f'{k}: {v}' for k, v in form_data.items()])
  59. payload = oper_param # 直接使用字符串格式的参数
  60. else:
  61. payload = await request.body()
  62. path_params = request.path_params
  63. oper_param = {}
  64. # 处理请求体数据
  65. if payload:
  66. try:
  67. oper_param['body'] = json.loads(payload.decode())
  68. except (json.JSONDecodeError, UnicodeDecodeError):
  69. oper_param['body'] = payload.decode('utf-8', errors='ignore')
  70. # 处理路径参数
  71. if path_params:
  72. oper_param['path_params'] = dict(path_params)
  73. payload = json.dumps(oper_param, ensure_ascii=False)
  74. # 日志表请求参数字段长度最大为2000,因此在此处判断长度
  75. if len(payload) > 2000:
  76. payload = '请求参数过长'
  77. response_data = response.body if "application/json" in response.headers.get("Content-Type", "") else b"{}"
  78. process_time = f"{(time.time() - start_time):.2f}s"
  79. # 获取当前用户ID,如果是登录接口则为空
  80. log_type = 1 # 1:登录日志 2:操作日志
  81. current_user_id = None
  82. # 优化:只在操作日志场景下获取current_user_id
  83. if "user_id" in request.scope:
  84. current_user_id = request.scope.get("user_id")
  85. log_type = 2
  86. request_ip = None
  87. x_forwarded_for = request.headers.get('X-Forwarded-For')
  88. if x_forwarded_for:
  89. # 取第一个 IP 地址,通常为客户端真实 IP
  90. request_ip = x_forwarded_for.split(',')[0].strip()
  91. else:
  92. # 若没有 X-Forwarded-For 头,则使用 request.client.host
  93. if request.client:
  94. request_ip = request.client.host
  95. login_location = await IpLocalUtil.get_ip_location(request_ip) if request_ip else None
  96. # 判断请求是否来自api文档
  97. referer = request.headers.get('referer')
  98. request_from_swagger = referer and referer.endswith('docs')
  99. request_from_redoc = referer and referer.endswith('redoc')
  100. if request_from_swagger or request_from_redoc:
  101. # 如果请求来自api文档,则不记录日志
  102. pass
  103. else:
  104. async with async_db_session() as session:
  105. async with session.begin():
  106. auth = AuthSchema(db=session)
  107. await OperationLogService.create_log_service(data=OperationLogCreateSchema(
  108. type = log_type,
  109. request_path = request.url.path,
  110. request_method = request.method,
  111. request_payload = payload,
  112. request_ip = request_ip,
  113. login_location=login_location,
  114. request_os = user_agent.os.family,
  115. request_browser = user_agent.browser.family,
  116. response_code = response.status_code,
  117. response_json = response_data.decode() if isinstance(response_data, (bytes, bytearray)) else str(response_data),
  118. process_time = process_time,
  119. description = route.summary,
  120. created_id = current_user_id,
  121. updated_id = current_user_id,
  122. ), auth = auth)
  123. return response
  124. return custom_route_handler