| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- # -*- coding: utf-8 -*-
- from starlette.responses import HTMLResponse
- from typing import Any, AsyncGenerator
- from fastapi import Depends, FastAPI, Request, Response
- from fastapi.staticfiles import StaticFiles
- from fastapi.concurrency import asynccontextmanager
- from fastapi.openapi.docs import (
- get_redoc_html,
- get_swagger_ui_html,
- get_swagger_ui_oauth2_redirect_html
- )
- from fastapi_limiter import FastAPILimiter
- from fastapi_limiter.depends import RateLimiter
- from math import ceil
- from app.api.v1.module_business.vardict.service import BizVarDictService
- from app.config.setting import settings
- from app.core.logger import log
- from app.core.discover import router
- from app.core.exceptions import CustomException, handle_exception
- from app.utils.common_util import import_module, import_modules_async
- from app.scripts.initialize import InitializeData
- from app.api.v1.module_application.job.tools.ap_scheduler import SchedulerUtil
- from app.api.v1.module_system.params.service import ParamsService
- from app.api.v1.module_system.dict.service import DictDataService
- from app.api.v1.module_business.crane.service import BizCraneService
- @asynccontextmanager
- async def lifespan(app: FastAPI) -> AsyncGenerator[Any, Any]:
- """
- 自定义 FastAPI 应用生命周期。
-
- 参数:
- - app (FastAPI): FastAPI 应用实例。
-
- 返回:
- - AsyncGenerator[Any, Any]: 生命周期上下文生成器。
- """
- try:
- await InitializeData().init_db()
- log.info(f"✅ {settings.DATABASE_TYPE}数据库初始化完成")
- await import_modules_async(modules=settings.EVENT_LIST, desc="全局事件", app=app, status=True)
- log.info("✅ 全局事件模块加载完成")
- await ParamsService().init_config_service(redis=app.state.redis)
- log.info("✅ Redis系统配置初始化完成")
- await DictDataService().init_dict_service(redis=app.state.redis)
- log.info("✅ Redis数据字典初始化完成")
- await BizVarDictService().init_vardict_group_service(redis=app.state.redis)
- log.info("✅ Redis天车数据初始化完成")
- await SchedulerUtil.init_system_scheduler()
- scheduler_jobs_count = len(SchedulerUtil.get_all_jobs())
- scheduler_status = SchedulerUtil.get_job_status()
- log.info(f"✅ 定时任务调度器初始化完成 ({scheduler_jobs_count} 个任务)")
- # 6. 初始化请求限制器
- await FastAPILimiter.init(
- redis=app.state.redis,
- prefix=settings.REQUEST_LIMITER_REDIS_PREFIX,
- http_callback=http_limit_callback,
- )
- log.info("✅ 请求限制器初始化完成")
-
- # 导入并显示最终的启动信息面板
- from app.utils.console import run as console_run
- from app.common.enums import EnvironmentEnum
- console_run(
- host=settings.SERVER_HOST,
- port=settings.SERVER_PORT,
- reload=True if settings.ENVIRONMENT == EnvironmentEnum.DEV else False,
- redis_ready=True,
- scheduler_jobs=scheduler_jobs_count,
- scheduler_status=scheduler_status,
- )
-
- except Exception as e:
- log.error(f"❌ 应用初始化失败: {str(e)}")
- raise
- yield
-
- try:
- await import_modules_async(modules=settings.EVENT_LIST, desc="全局事件", app=app, status=False)
- log.info("✅ 全局事件模块卸载完成")
- await SchedulerUtil.close_system_scheduler()
- log.info("✅ 定时任务调度器已关闭")
- await FastAPILimiter.close()
- log.info("✅ 请求限制器已关闭")
- except Exception as e:
- log.error(f"❌ 应用关闭过程中发生错误: {str(e)}")
-
- def register_middlewares(app: FastAPI) -> None:
- """
- 注册全局中间件。
- 参数:
- - app (FastAPI): FastAPI 应用实例。
- 返回:
- - None
- """
- for middleware in settings.MIDDLEWARE_LIST[::-1]:
- if not middleware:
- continue
- middleware = import_module(middleware, desc="中间件")
- app.add_middleware(middleware)
- def register_exceptions(app: FastAPI) -> None:
- """
- 统一注册异常处理器。
- 参数:
- - app (FastAPI): FastAPI 应用实例。
- 返回:
- - None
- """
- handle_exception(app)
- def register_routers(app: FastAPI) -> None:
- """
- 注册根路由。
- 参数:
- - app (FastAPI): FastAPI 应用实例。
- 返回:
- - None
- """
- app.include_router(router=router, dependencies=[Depends(RateLimiter(times=5, seconds=10))])
- def register_files(app: FastAPI) -> None:
- """
- 注册静态资源挂载和文件相关配置。
- 参数:
- - app (FastAPI): FastAPI 应用实例。
- 返回:
- - None
- """
- # 挂载静态文件目录
- if settings.STATIC_ENABLE:
- # 确保日志目录存在
- settings.STATIC_ROOT.mkdir(parents=True, exist_ok=True)
- app.mount(path=settings.STATIC_URL, app=StaticFiles(directory=settings.STATIC_ROOT), name=settings.STATIC_DIR)
- def reset_api_docs(app: FastAPI) -> None:
- """
- 使用本地静态资源自定义 API 文档页面(Swagger UI 与 ReDoc)。
- 参数:
- - app (FastAPI): FastAPI 应用实例。
- 返回:
- - None
- """
- @app.get(settings.DOCS_URL, include_in_schema=False)
- async def custom_swagger_ui_html() -> HTMLResponse:
- return get_swagger_ui_html(
- openapi_url=str(app.root_path) + str(app.openapi_url),
- title=app.title + " - Swagger UI",
- oauth2_redirect_url=app.swagger_ui_oauth2_redirect_url,
- swagger_js_url=settings.SWAGGER_JS_URL,
- swagger_css_url=settings.SWAGGER_CSS_URL,
- swagger_favicon_url=settings.FAVICON_URL,
- )
- @app.get(str(app.swagger_ui_oauth2_redirect_url), include_in_schema=False)
- async def swagger_ui_redirect():
- return get_swagger_ui_oauth2_redirect_html()
- @app.get(settings.REDOC_URL, include_in_schema=False)
- async def custom_redoc_html():
- return get_redoc_html(
- openapi_url=str(app.root_path) + str(app.openapi_url),
- title=app.title + " - ReDoc",
- redoc_js_url=settings.REDOC_JS_URL,
- redoc_favicon_url=settings.FAVICON_URL,
- )
- async def http_limit_callback(request: Request, response: Response, expire: int):
- """
- 请求限制时的默认回调函数
- :param request: FastAPI 请求对象
- :param response: FastAPI 响应对象
- :param expire: 剩余毫秒数
- :return:
- """
- expires = ceil(expire / 30)
- raise CustomException(
- status_code=429,
- msg='请求过于频繁,请稍后重试',
- data={'Retry-After': str(expires)},
- )
|