security.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. # -*- coding: utf-8 -*-
  2. import jwt
  3. from fastapi import Form, Request
  4. from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
  5. from fastapi.security.utils import get_authorization_scheme_param
  6. from app.core.exceptions import CustomException
  7. from app.config.setting import settings
  8. from app.api.v1.module_system.auth.schema import JWTPayloadSchema
  9. class CustomOAuth2PasswordBearer(OAuth2PasswordBearer):
  10. """自定义OAuth2认证类,继承自OAuth2PasswordBearer"""
  11. def __init__(
  12. self,
  13. token_url: str,
  14. scheme_name: str | None = None,
  15. scopes: dict[str, str] | None = None,
  16. description: str | None = None,
  17. auto_error: bool = True
  18. ) -> None:
  19. super().__init__(
  20. tokenUrl=token_url,
  21. scheme_name=scheme_name,
  22. scopes=scopes,
  23. description=description,
  24. auto_error=auto_error
  25. )
  26. async def __call__(self, request: Request) -> str | None:
  27. """
  28. 重写认证方法,校验token
  29. 参数:
  30. - request (Request): FastAPI请求对象。
  31. 返回:
  32. - str | None: 校验通过的token,如果校验失败则返回None。
  33. 异常:
  34. - CustomException: 认证失败时抛出,状态码为401。
  35. """
  36. authorization = request.headers.get("Authorization")
  37. scheme, token = get_authorization_scheme_param(authorization)
  38. if not authorization or scheme.lower() != settings.TOKEN_TYPE:
  39. if self.auto_error:
  40. raise CustomException(msg="认证失败,请登录后再试", code=10401, status_code=401)
  41. return None
  42. return token
  43. class CustomOAuth2PasswordRequestForm(OAuth2PasswordRequestForm):
  44. """
  45. 自定义登录表单,扩展验证码等字段
  46. 参数:
  47. - grant_type (str | None): 授权类型,默认值为None,正则表达式为'password'。
  48. - scope (str): 作用域,默认值为空字符串。
  49. - client_id (str | None): 客户端ID,默认值为None。
  50. - client_secret (str | None): 客户端密钥,默认值为None。
  51. - username (str): 用户名。
  52. - password (str): 密码。
  53. - captcha_key (str | None): 验证码键,默认值为空字符串。
  54. - captcha (str | None): 验证码值,默认值为空字符串。
  55. - login_type (str | None): 登录类型,默认值为"PC端",描述为"PC端 | 移动端"。
  56. """
  57. def __init__(
  58. self,
  59. grant_type: str | None = Form(default=None, pattern='password'),
  60. scope: str = Form(default=''),
  61. client_id: str | None = Form(default=None),
  62. client_secret: str | None = Form(default=None),
  63. username: str = Form(),
  64. password: str = Form(),
  65. captcha_key: str | None = Form(default=""),
  66. captcha: str | None = Form(default=""),
  67. login_type: str | None = Form(default="PC端", description="PC端 | 移动端")
  68. ):
  69. super().__init__(
  70. grant_type=grant_type,
  71. scope=scope,
  72. client_id=client_id,
  73. client_secret=client_secret,
  74. username=username,
  75. password=password,
  76. )
  77. self.captcha_key = captcha_key
  78. self.captcha = captcha
  79. self.login_type = login_type
  80. # OAuth2认证配置
  81. OAuth2Schema = CustomOAuth2PasswordBearer(
  82. token_url="system/auth/login",
  83. description="认证"
  84. )
  85. def create_access_token(payload: JWTPayloadSchema) -> str:
  86. """
  87. 生成JWT访问令牌
  88. 参数:
  89. - payload (JWTPayloadSchema): JWT有效载荷,包含用户信息等。
  90. 返回:
  91. - str: 生成的JWT访问令牌。
  92. """
  93. payload_dict = payload.model_dump()
  94. return jwt.encode(
  95. payload=payload_dict,
  96. key=settings.SECRET_KEY,
  97. algorithm=settings.ALGORITHM
  98. )
  99. def decode_access_token(token: str) -> JWTPayloadSchema:
  100. """
  101. 解析JWT访问令牌
  102. 参数:
  103. - token (str): JWT访问令牌字符串。
  104. 返回:
  105. - JWTPayloadSchema: 解析后的JWT有效载荷,包含用户信息等。
  106. 异常:
  107. - CustomException: 解析失败时抛出,状态码为401。
  108. """
  109. if not token:
  110. raise CustomException(msg="认证不存在,请重新登录", code=10401, status_code=401)
  111. try:
  112. payload = jwt.decode(
  113. jwt=token,
  114. key=settings.SECRET_KEY,
  115. algorithms=[settings.ALGORITHM]
  116. )
  117. online_user_info = payload.get("sub")
  118. if not online_user_info:
  119. raise CustomException(msg="无效认证,请重新登录", code=10401, status_code=401)
  120. return JWTPayloadSchema(**payload)
  121. except (jwt.InvalidSignatureError, jwt.DecodeError):
  122. raise CustomException(msg="无效认证,请重新登录", code=10401, status_code=401)
  123. except jwt.ExpiredSignatureError:
  124. raise CustomException(msg="认证已过期,请重新登录", code=10401, status_code=401)
  125. except jwt.InvalidTokenError:
  126. raise CustomException(msg="token已失效,请重新登录", code=10401, status_code=401)