| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- # -*- coding: utf-8 -*-
- import jwt
- from fastapi import Form, Request
- from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
- from fastapi.security.utils import get_authorization_scheme_param
- from app.core.exceptions import CustomException
- from app.config.setting import settings
- from app.api.v1.module_system.auth.schema import JWTPayloadSchema
- class CustomOAuth2PasswordBearer(OAuth2PasswordBearer):
- """自定义OAuth2认证类,继承自OAuth2PasswordBearer"""
- def __init__(
- self,
- token_url: str,
- scheme_name: str | None = None,
- scopes: dict[str, str] | None = None,
- description: str | None = None,
- auto_error: bool = True
- ) -> None:
- super().__init__(
- tokenUrl=token_url,
- scheme_name=scheme_name,
- scopes=scopes,
- description=description,
- auto_error=auto_error
- )
- async def __call__(self, request: Request) -> str | None:
- """
- 重写认证方法,校验token
- 参数:
- - request (Request): FastAPI请求对象。
- 返回:
- - str | None: 校验通过的token,如果校验失败则返回None。
- 异常:
- - CustomException: 认证失败时抛出,状态码为401。
- """
- authorization = request.headers.get("Authorization")
- scheme, token = get_authorization_scheme_param(authorization)
- if not authorization or scheme.lower() != settings.TOKEN_TYPE:
- if self.auto_error:
- raise CustomException(msg="认证失败,请登录后再试", code=10401, status_code=401)
- return None
- return token
- class CustomOAuth2PasswordRequestForm(OAuth2PasswordRequestForm):
- """
- 自定义登录表单,扩展验证码等字段
- 参数:
- - grant_type (str | None): 授权类型,默认值为None,正则表达式为'password'。
- - scope (str): 作用域,默认值为空字符串。
- - client_id (str | None): 客户端ID,默认值为None。
- - client_secret (str | None): 客户端密钥,默认值为None。
- - username (str): 用户名。
- - password (str): 密码。
- - captcha_key (str | None): 验证码键,默认值为空字符串。
- - captcha (str | None): 验证码值,默认值为空字符串。
- - login_type (str | None): 登录类型,默认值为"PC端",描述为"PC端 | 移动端"。
- """
- def __init__(
- self,
- grant_type: str | None = Form(default=None, pattern='password'),
- scope: str = Form(default=''),
- client_id: str | None = Form(default=None),
- client_secret: str | None = Form(default=None),
- username: str = Form(),
- password: str = Form(),
- captcha_key: str | None = Form(default=""),
- captcha: str | None = Form(default=""),
- login_type: str | None = Form(default="PC端", description="PC端 | 移动端")
- ):
- super().__init__(
- grant_type=grant_type,
- scope=scope,
- client_id=client_id,
- client_secret=client_secret,
- username=username,
- password=password,
- )
- self.captcha_key = captcha_key
- self.captcha = captcha
- self.login_type = login_type
- # OAuth2认证配置
- OAuth2Schema = CustomOAuth2PasswordBearer(
- token_url="system/auth/login",
- description="认证"
- )
- def create_access_token(payload: JWTPayloadSchema) -> str:
- """
- 生成JWT访问令牌
- 参数:
- - payload (JWTPayloadSchema): JWT有效载荷,包含用户信息等。
- 返回:
- - str: 生成的JWT访问令牌。
- """
- payload_dict = payload.model_dump()
- return jwt.encode(
- payload=payload_dict,
- key=settings.SECRET_KEY,
- algorithm=settings.ALGORITHM
- )
- def decode_access_token(token: str) -> JWTPayloadSchema:
- """
- 解析JWT访问令牌
- 参数:
- - token (str): JWT访问令牌字符串。
- 返回:
- - JWTPayloadSchema: 解析后的JWT有效载荷,包含用户信息等。
- 异常:
- - CustomException: 解析失败时抛出,状态码为401。
- """
- if not token:
- raise CustomException(msg="认证不存在,请重新登录", code=10401, status_code=401)
- try:
- payload = jwt.decode(
- jwt=token,
- key=settings.SECRET_KEY,
- algorithms=[settings.ALGORITHM]
- )
- online_user_info = payload.get("sub")
- if not online_user_info:
- raise CustomException(msg="无效认证,请重新登录", code=10401, status_code=401)
- return JWTPayloadSchema(**payload)
- except (jwt.InvalidSignatureError, jwt.DecodeError):
- raise CustomException(msg="无效认证,请重新登录", code=10401, status_code=401)
- except jwt.ExpiredSignatureError:
- raise CustomException(msg="认证已过期,请重新登录", code=10401, status_code=401)
- except jwt.InvalidTokenError:
- raise CustomException(msg="token已失效,请重新登录", code=10401, status_code=401)
|