| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- # -*- coding: utf-8 -*-
- import json
- from starlette.responses import HTMLResponse
- from typing import Any, AsyncGenerator, Callable
- from fastapi import FastAPI, Request, Response, WebSocket
- from fastapi.staticfiles import StaticFiles
- from fastapi.concurrency import asynccontextmanager
- from fastapi.openapi.docs import (
- get_redoc_html,
- get_swagger_ui_html,
- get_swagger_ui_oauth2_redirect_html
- )
- from fastapi_limiter import FastAPILimiter
- from math import ceil
- from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
- from starlette.types import ASGIApp
- import time
- from app.config.setting import settings
- from app.core.logger import log
- from app.core.discover import router
- from app.core.exceptions import CustomException, handle_exception
- from app.utils.common_util import import_module, import_modules_async
- from app.scripts.initialize import InitializeData
- from app.api.v1.module_application.job.tools.ap_scheduler import SchedulerUtil
- from app.api.v1.module_system.params.service import ParamsService
- from app.api.v1.module_system.dict.service import DictDataService
- # ✅ 核心修复:手动实现Redis限流计数(适配0.1.6版本,确保限流生效)
- class CustomLimiterMiddleware(BaseHTTPMiddleware):
- """
- 手动实现限流逻辑(直接操作Redis,替代RateLimiter的隐式调用)
- 核心:按IP+路径生成唯一key,Redis计数,超过阈值触发限流
- """
- def __init__(
- self,
- app: ASGIApp,
- times: int = 5, # 限流次数
- seconds: int = 10, # 限流时间窗口
- prefix: str = settings.REQUEST_LIMITER_REDIS_PREFIX or "fastapi-limiter:" # Redis key前缀
- ):
- super().__init__(app)
- self.times = times
- self.seconds = seconds
- self.prefix = prefix
- # 生成唯一限流key(IP + 请求路径)
- def _get_limit_key(self, request: Request) -> str:
- client_ip = request.client.host or "unknown"
- path = request.url.path
- return f"{self.prefix}:{client_ip}:{path}"
- # 限流触发回调
- async def _limit_callback(self, expire: int):
- """返回指定格式的429响应"""
- expires = ceil(expire / 30) # 动态计算Retry-After值(你示例中的222是占位,实际是expires)
- # 构造严格匹配的响应体
- response_body = {
- "code": -1,
- "msg": "请求过于频繁,请稍后重试",
- "data": {
- "Retry-After": str(expires) # 动态值,替换示例中的222
- },
- "status_code": 429,
- "success": False
- }
- # 返回Response对象
- return Response(
- content=json.dumps(response_body, ensure_ascii=False), # 确保中文正常显示
- status_code=429, # HTTP状态码
- headers={"Retry-After": str(expires)}, # 响应头也保留(可选)
- media_type="application/json" # 声明JSON格式
- )
- async def dispatch(self, request: Request, call_next: RequestResponseEndpoint):
- # 1. WebSocket请求 → 跳过限流
- if request.scope.get("type") == "websocket":
- return await call_next(request)
- # 2. HTTP请求 → 执行手动限流逻辑
- try:
- # 获取Redis连接(fastapi-limiter已初始化)
- redis_client = FastAPILimiter.redis
- if not redis_client:
- log.warning("Redis未初始化,跳过限流")
- return await call_next(request)
- # 生成限流key
- limit_key = self._get_limit_key(request)
- # 当前时间戳
- now = int(time.time())
- # 时间窗口起始(now - seconds)
- window_start = now - self.seconds
- # ✅ 核心Redis操作(原子计数,避免并发问题)
- async with redis_client.pipeline(transaction=True) as pipe:
- # 1. 删除时间窗口外的旧计数
- await pipe.zremrangebyscore(limit_key, 0, window_start)
- # 2. 添加当前请求时间戳到有序集合
- await pipe.zadd(limit_key, {now: now})
- # 3. 设置key过期时间(避免内存泄漏)
- await pipe.expire(limit_key, self.seconds * 2)
- # 4. 获取当前窗口内的请求数
- await pipe.zcard(limit_key)
- # 执行管道
- results = await pipe.execute()
- # 提取请求数(第四个操作的结果)
- request_count = results[3]
- # ✅ 判断是否超过限流阈值
- if request_count > self.times:
- # 获取过期时间,调用回调返回响应
- ttl = await redis_client.ttl(limit_key)
- # 关键:返回响应,不是 raise
- return await self._limit_callback(ttl or self.seconds)
- except CustomException:
- # 限流触发 → 抛异常(全局处理器捕获)
- raise
- except Exception as e:
- # 非限流异常 → 日志记录,放行请求(避免阻断业务)
- log.error(f"限流中间件执行异常: {str(e)}")
- # 3. 执行后续处理
- response = await call_next(request)
- return response
- # ✅ 生命周期函数(仅初始化,无中间件操作)
- @asynccontextmanager
- async def lifespan(app: FastAPI) -> AsyncGenerator[Any, Any]:
- try:
- # 数据库初始化
- await InitializeData().init_db()
- log.info(f"✅ {settings.DATABASE_TYPE}数据库初始化完成")
- # 全局事件加载
- await import_modules_async(modules=settings.EVENT_LIST, desc="全局事件", app=app, status=True)
- log.info("✅ 全局事件模块加载完成")
- # Redis配置/字典初始化
- await ParamsService().init_config_service(redis=app.state.redis)
- log.info("✅ Redis系统配置初始化完成")
- await DictDataService().init_dict_service(redis=app.state.redis)
- log.info("✅ Redis数据字典初始化完成")
- # 定时任务初始化
- await SchedulerUtil.init_system_scheduler()
- scheduler_jobs_count = len(SchedulerUtil.get_all_jobs())
- scheduler_status = SchedulerUtil.get_job_status()
- log.info(f"✅ 定时任务调度器初始化完成 ({scheduler_jobs_count} 个任务)")
- # ✅ 初始化fastapi-limiter(仅获取Redis连接)
- await FastAPILimiter.init(
- redis=app.state.redis,
- prefix=settings.REQUEST_LIMITER_REDIS_PREFIX,
- )
- log.info("✅ 请求限制器初始化完成")
- # 启动信息面板
- from app.utils.console import run as console_run
- from app.common.enums import EnvironmentEnum
- console_run(
- host=settings.SERVER_HOST,
- port=settings.SERVER_PORT,
- reload=True if settings.ENVIRONMENT == EnvironmentEnum.DEV else False,
- redis_ready=True,
- scheduler_jobs=scheduler_jobs_count,
- scheduler_status=scheduler_status,
- )
- except Exception as e:
- log.error(f"❌ 应用初始化失败: {str(e)}")
- raise
- yield
- # 关闭逻辑
- try:
- await import_modules_async(modules=settings.EVENT_LIST, desc="全局事件", app=app, status=False)
- log.info("✅ 全局事件模块卸载完成")
- await SchedulerUtil.close_system_scheduler()
- log.info("✅ 定时任务调度器已关闭")
- await FastAPILimiter.close()
- log.info("✅ 请求限制器已关闭")
- except Exception as e:
- log.error(f"❌ 应用关闭过程中发生错误: {str(e)}")
- # ✅ 中间件注册(添加自定义限流中间件)
- def register_middlewares(app: FastAPI) -> None:
- # 1. 原有中间件
- for middleware in settings.MIDDLEWARE_LIST[::-1]:
- if not middleware:
- continue
- middleware = import_module(middleware, desc="中间件")
- app.add_middleware(middleware)
- # 2. 限流中间件(核心:5次/10秒)
- app.add_middleware(
- CustomLimiterMiddleware,
- times=5,
- seconds=10,
- prefix=settings.REQUEST_LIMITER_REDIS_PREFIX or "fastapi-limiter:"
- )
- log.info("✅ 限流中间件注册完成")
- # ✅ 其他函数(不变)
- def register_exceptions(app: FastAPI) -> None:
- handle_exception(app)
- def register_routers(app: FastAPI) -> None:
- app.include_router(router=router)
- def register_files(app: FastAPI) -> None:
- if settings.STATIC_ENABLE:
- settings.STATIC_ROOT.mkdir(parents=True, exist_ok=True)
- app.mount(path=settings.STATIC_URL, app=StaticFiles(directory=settings.STATIC_ROOT), name=settings.STATIC_DIR)
- def reset_api_docs(app: FastAPI) -> None:
- @app.get(settings.DOCS_URL, include_in_schema=False)
- async def custom_swagger_ui_html() -> HTMLResponse:
- return get_swagger_ui_html(
- openapi_url=str(app.root_path) + str(app.openapi_url),
- title=app.title + " - Swagger UI",
- oauth2_redirect_url=app.swagger_ui_oauth2_redirect_url,
- swagger_js_url=settings.SWAGGER_JS_URL,
- swagger_css_url=settings.SWAGGER_CSS_URL,
- swagger_favicon_url=settings.FAVICON_URL,
- )
- @app.get(str(app.swagger_ui_oauth2_redirect_url), include_in_schema=False)
- async def swagger_ui_redirect():
- return get_swagger_ui_oauth2_redirect_html()
- @app.get(settings.REDOC_URL, include_in_schema=False)
- async def custom_redoc_html():
- return get_redoc_html(
- openapi_url=str(app.root_path) + str(app.openapi_url),
- title=app.title + " - ReDoc",
- redoc_js_url=settings.REDOC_JS_URL,
- redoc_favicon_url=settings.FAVICON_URL,
- )
|