init_app.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. # -*- coding: utf-8 -*-
  2. import json
  3. from starlette.responses import HTMLResponse
  4. from typing import Any, AsyncGenerator, Callable
  5. from fastapi import FastAPI, Request, Response, WebSocket
  6. from fastapi.staticfiles import StaticFiles
  7. from fastapi.concurrency import asynccontextmanager
  8. from fastapi.openapi.docs import (
  9. get_redoc_html,
  10. get_swagger_ui_html,
  11. get_swagger_ui_oauth2_redirect_html
  12. )
  13. from fastapi_limiter import FastAPILimiter
  14. from math import ceil
  15. from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
  16. from starlette.types import ASGIApp
  17. import time
  18. from app.config.setting import settings
  19. from app.core.logger import log
  20. from app.core.discover import router
  21. from app.core.exceptions import CustomException, handle_exception
  22. from app.utils.common_util import import_module, import_modules_async
  23. from app.scripts.initialize import InitializeData
  24. from app.api.v1.module_application.job.tools.ap_scheduler import SchedulerUtil
  25. from app.api.v1.module_system.params.service import ParamsService
  26. from app.api.v1.module_system.dict.service import DictDataService
  27. # ✅ 核心修复:手动实现Redis限流计数(适配0.1.6版本,确保限流生效)
  28. class CustomLimiterMiddleware(BaseHTTPMiddleware):
  29. """
  30. 手动实现限流逻辑(直接操作Redis,替代RateLimiter的隐式调用)
  31. 核心:按IP+路径生成唯一key,Redis计数,超过阈值触发限流
  32. """
  33. def __init__(
  34. self,
  35. app: ASGIApp,
  36. times: int = 5, # 限流次数
  37. seconds: int = 10, # 限流时间窗口
  38. prefix: str = settings.REQUEST_LIMITER_REDIS_PREFIX or "fastapi-limiter:" # Redis key前缀
  39. ):
  40. super().__init__(app)
  41. self.times = times
  42. self.seconds = seconds
  43. self.prefix = prefix
  44. # 生成唯一限流key(IP + 请求路径)
  45. def _get_limit_key(self, request: Request) -> str:
  46. client_ip = request.client.host or "unknown"
  47. path = request.url.path
  48. return f"{self.prefix}:{client_ip}:{path}"
  49. # 限流触发回调
  50. async def _limit_callback(self, expire: int):
  51. """返回指定格式的429响应"""
  52. expires = ceil(expire / 30) # 动态计算Retry-After值(你示例中的222是占位,实际是expires)
  53. # 构造严格匹配的响应体
  54. response_body = {
  55. "code": -1,
  56. "msg": "请求过于频繁,请稍后重试",
  57. "data": {
  58. "Retry-After": str(expires) # 动态值,替换示例中的222
  59. },
  60. "status_code": 429,
  61. "success": False
  62. }
  63. # 返回Response对象
  64. return Response(
  65. content=json.dumps(response_body, ensure_ascii=False), # 确保中文正常显示
  66. status_code=429, # HTTP状态码
  67. headers={"Retry-After": str(expires)}, # 响应头也保留(可选)
  68. media_type="application/json" # 声明JSON格式
  69. )
  70. async def dispatch(self, request: Request, call_next: RequestResponseEndpoint):
  71. # 1. WebSocket请求 → 跳过限流
  72. if request.scope.get("type") == "websocket":
  73. return await call_next(request)
  74. # 2. HTTP请求 → 执行手动限流逻辑
  75. try:
  76. # 获取Redis连接(fastapi-limiter已初始化)
  77. redis_client = FastAPILimiter.redis
  78. if not redis_client:
  79. log.warning("Redis未初始化,跳过限流")
  80. return await call_next(request)
  81. # 生成限流key
  82. limit_key = self._get_limit_key(request)
  83. # 当前时间戳
  84. now = int(time.time())
  85. # 时间窗口起始(now - seconds)
  86. window_start = now - self.seconds
  87. # ✅ 核心Redis操作(原子计数,避免并发问题)
  88. async with redis_client.pipeline(transaction=True) as pipe:
  89. # 1. 删除时间窗口外的旧计数
  90. await pipe.zremrangebyscore(limit_key, 0, window_start)
  91. # 2. 添加当前请求时间戳到有序集合
  92. await pipe.zadd(limit_key, {now: now})
  93. # 3. 设置key过期时间(避免内存泄漏)
  94. await pipe.expire(limit_key, self.seconds * 2)
  95. # 4. 获取当前窗口内的请求数
  96. await pipe.zcard(limit_key)
  97. # 执行管道
  98. results = await pipe.execute()
  99. # 提取请求数(第四个操作的结果)
  100. request_count = results[3]
  101. # ✅ 判断是否超过限流阈值
  102. if request_count > self.times:
  103. # 获取过期时间,调用回调返回响应
  104. ttl = await redis_client.ttl(limit_key)
  105. # 关键:返回响应,不是 raise
  106. return await self._limit_callback(ttl or self.seconds)
  107. except CustomException:
  108. # 限流触发 → 抛异常(全局处理器捕获)
  109. raise
  110. except Exception as e:
  111. # 非限流异常 → 日志记录,放行请求(避免阻断业务)
  112. log.error(f"限流中间件执行异常: {str(e)}")
  113. # 3. 执行后续处理
  114. response = await call_next(request)
  115. return response
  116. # ✅ 生命周期函数(仅初始化,无中间件操作)
  117. @asynccontextmanager
  118. async def lifespan(app: FastAPI) -> AsyncGenerator[Any, Any]:
  119. try:
  120. # 数据库初始化
  121. await InitializeData().init_db()
  122. log.info(f"✅ {settings.DATABASE_TYPE}数据库初始化完成")
  123. # 全局事件加载
  124. await import_modules_async(modules=settings.EVENT_LIST, desc="全局事件", app=app, status=True)
  125. log.info("✅ 全局事件模块加载完成")
  126. # Redis配置/字典初始化
  127. await ParamsService().init_config_service(redis=app.state.redis)
  128. log.info("✅ Redis系统配置初始化完成")
  129. await DictDataService().init_dict_service(redis=app.state.redis)
  130. log.info("✅ Redis数据字典初始化完成")
  131. # 定时任务初始化
  132. await SchedulerUtil.init_system_scheduler()
  133. scheduler_jobs_count = len(SchedulerUtil.get_all_jobs())
  134. scheduler_status = SchedulerUtil.get_job_status()
  135. log.info(f"✅ 定时任务调度器初始化完成 ({scheduler_jobs_count} 个任务)")
  136. # ✅ 初始化fastapi-limiter(仅获取Redis连接)
  137. await FastAPILimiter.init(
  138. redis=app.state.redis,
  139. prefix=settings.REQUEST_LIMITER_REDIS_PREFIX,
  140. )
  141. log.info("✅ 请求限制器初始化完成")
  142. # 启动信息面板
  143. from app.utils.console import run as console_run
  144. from app.common.enums import EnvironmentEnum
  145. console_run(
  146. host=settings.SERVER_HOST,
  147. port=settings.SERVER_PORT,
  148. reload=True if settings.ENVIRONMENT == EnvironmentEnum.DEV else False,
  149. redis_ready=True,
  150. scheduler_jobs=scheduler_jobs_count,
  151. scheduler_status=scheduler_status,
  152. )
  153. except Exception as e:
  154. log.error(f"❌ 应用初始化失败: {str(e)}")
  155. raise
  156. yield
  157. # 关闭逻辑
  158. try:
  159. await import_modules_async(modules=settings.EVENT_LIST, desc="全局事件", app=app, status=False)
  160. log.info("✅ 全局事件模块卸载完成")
  161. await SchedulerUtil.close_system_scheduler()
  162. log.info("✅ 定时任务调度器已关闭")
  163. await FastAPILimiter.close()
  164. log.info("✅ 请求限制器已关闭")
  165. except Exception as e:
  166. log.error(f"❌ 应用关闭过程中发生错误: {str(e)}")
  167. # ✅ 中间件注册(添加自定义限流中间件)
  168. def register_middlewares(app: FastAPI) -> None:
  169. # 1. 原有中间件
  170. for middleware in settings.MIDDLEWARE_LIST[::-1]:
  171. if not middleware:
  172. continue
  173. middleware = import_module(middleware, desc="中间件")
  174. app.add_middleware(middleware)
  175. # 2. 限流中间件(核心:5次/10秒)
  176. app.add_middleware(
  177. CustomLimiterMiddleware,
  178. times=5,
  179. seconds=10,
  180. prefix=settings.REQUEST_LIMITER_REDIS_PREFIX or "fastapi-limiter:"
  181. )
  182. log.info("✅ 限流中间件注册完成")
  183. # ✅ 其他函数(不变)
  184. def register_exceptions(app: FastAPI) -> None:
  185. handle_exception(app)
  186. def register_routers(app: FastAPI) -> None:
  187. app.include_router(router=router)
  188. def register_files(app: FastAPI) -> None:
  189. if settings.STATIC_ENABLE:
  190. settings.STATIC_ROOT.mkdir(parents=True, exist_ok=True)
  191. app.mount(path=settings.STATIC_URL, app=StaticFiles(directory=settings.STATIC_ROOT), name=settings.STATIC_DIR)
  192. def reset_api_docs(app: FastAPI) -> None:
  193. @app.get(settings.DOCS_URL, include_in_schema=False)
  194. async def custom_swagger_ui_html() -> HTMLResponse:
  195. return get_swagger_ui_html(
  196. openapi_url=str(app.root_path) + str(app.openapi_url),
  197. title=app.title + " - Swagger UI",
  198. oauth2_redirect_url=app.swagger_ui_oauth2_redirect_url,
  199. swagger_js_url=settings.SWAGGER_JS_URL,
  200. swagger_css_url=settings.SWAGGER_CSS_URL,
  201. swagger_favicon_url=settings.FAVICON_URL,
  202. )
  203. @app.get(str(app.swagger_ui_oauth2_redirect_url), include_in_schema=False)
  204. async def swagger_ui_redirect():
  205. return get_swagger_ui_oauth2_redirect_html()
  206. @app.get(settings.REDOC_URL, include_in_schema=False)
  207. async def custom_redoc_html():
  208. return get_redoc_html(
  209. openapi_url=str(app.root_path) + str(app.openapi_url),
  210. title=app.title + " - ReDoc",
  211. redoc_js_url=settings.REDOC_JS_URL,
  212. redoc_favicon_url=settings.FAVICON_URL,
  213. )