# database.py (4.4 KB)
  1. # -*- coding: utf-8 -*-
  2. from redis.asyncio import Redis
  3. from redis import exceptions
  4. from fastapi import FastAPI
  5. from sqlalchemy import create_engine, Engine
  6. from sqlalchemy.orm import sessionmaker
  7. from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession, AsyncEngine
  8. from app.core.logger import log
  9. from app.config.setting import settings
  10. from app.core.exceptions import CustomException
  11. def create_engine_and_session(
  12. db_url: str = settings.DB_URI
  13. ) -> tuple[Engine, sessionmaker]:
  14. """
  15. 创建同步数据库引擎和会话工厂。
  16. 参数:
  17. - db_url (str): 数据库连接URL,默认从配置中获取。
  18. 返回:
  19. - tuple[Engine, sessionmaker]: 同步数据库引擎和会话工厂。
  20. """
  21. try:
  22. if not settings.SQL_DB_ENABLE:
  23. raise CustomException(msg="请先开启数据库连接", data="请启用 app/config/setting.py: SQL_DB_ENABLE")
  24. # 同步数据库引擎
  25. engine: Engine = create_engine(
  26. url=db_url,
  27. echo=settings.DATABASE_ECHO,
  28. pool_pre_ping=settings.POOL_PRE_PING,
  29. pool_recycle=settings.POOL_RECYCLE,
  30. )
  31. except Exception as e:
  32. log.error(f'❌ 数据库连接失败 {e}')
  33. raise
  34. else:
  35. # 同步数据库会话工厂
  36. SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  37. return engine, SessionLocal
  38. def create_async_engine_and_session(
  39. db_url: str = settings.ASYNC_DB_URI
  40. ) -> tuple[AsyncEngine, async_sessionmaker[AsyncSession]]:
  41. """
  42. 获取异步数据库会话连接。
  43. 返回:
  44. - tuple[AsyncEngine, async_sessionmaker[AsyncSession]]: 异步数据库引擎和会话工厂。
  45. """
  46. try:
  47. if not settings.SQL_DB_ENABLE:
  48. raise CustomException(msg="请先开启数据库连接", data="请启用 app/config/setting.py: SQL_DB_ENABLE")
  49. # 异步数据库引擎
  50. async_engine: AsyncEngine = create_async_engine(
  51. url=db_url,
  52. echo=settings.DATABASE_ECHO,
  53. echo_pool=settings.ECHO_POOL,
  54. pool_pre_ping=settings.POOL_PRE_PING,
  55. future=settings.FUTURE,
  56. pool_recycle=settings.POOL_RECYCLE,
  57. pool_size=settings.POOL_SIZE,
  58. max_overflow=settings.MAX_OVERFLOW,
  59. pool_timeout=settings.POOL_TIMEOUT,
  60. pool_use_lifo=settings.POOL_USE_LIFO,
  61. )
  62. except Exception as e:
  63. log.error(f'❌ 数据库连接失败 {e}')
  64. raise
  65. else:
  66. # 异步数据库会话工厂
  67. AsyncSessionLocal = async_sessionmaker(
  68. bind=async_engine,
  69. autocommit=settings.AUTOCOMMIT,
  70. autoflush=settings.AUTOFETCH,
  71. expire_on_commit=settings.EXPIRE_ON_COMMIT,
  72. class_=AsyncSession
  73. )
  74. return async_engine, AsyncSessionLocal
  75. engine, db_session = create_engine_and_session(settings.DB_URI)
  76. async_engine, async_db_session = create_async_engine_and_session(settings.ASYNC_DB_URI)
  77. async def redis_connect(app: FastAPI, status: str) -> Redis | None:
  78. """
  79. 创建或关闭Redis连接。
  80. 参数:
  81. - app (FastAPI): FastAPI应用实例。
  82. - status (bool): 连接状态,True为创建连接,False为关闭连接。
  83. 返回:
  84. - Redis | None: Redis连接实例,如果连接失败则返回None。
  85. """
  86. if not settings.REDIS_ENABLE:
  87. raise CustomException(msg="请先开启Redis连接", data="请启用 app/core/config.py: REDIS_ENABLE")
  88. if status:
  89. try:
  90. rd = await Redis.from_url(
  91. url=settings.REDIS_URI,
  92. encoding='utf-8',
  93. decode_responses=True,
  94. health_check_interval=20,
  95. max_connections=settings.POOL_SIZE,
  96. socket_timeout=settings.POOL_TIMEOUT
  97. )
  98. app.state.redis = rd
  99. if await rd.ping():
  100. log.info("✅️ Redis连接成功...")
  101. return rd
  102. except exceptions.AuthenticationError as e:
  103. log.error(f"❌ 数据库 Redis 认证失败: {e}")
  104. raise
  105. except exceptions.TimeoutError as e:
  106. log.error(f"❌ 数据库 Redis 连接超时: {e}")
  107. raise
  108. except exceptions.RedisError as e:
  109. log.error(f"❌ 数据库 Redis 连接错误: {e}")
  110. raise
  111. else:
  112. await app.state.redis.aclose()
  113. log.info('✅️ Redis连接已关闭')