| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- # -*- coding: utf-8 -*-
- import random
- import mimetypes
- from datetime import datetime
- import aiofiles
- from fastapi import UploadFile
- from pathlib import Path
- from urllib.parse import urljoin
- from app.config.setting import settings
- from app.core.exceptions import CustomException
- from app.core.logger import log
- class UploadUtil:
- """
- 上传工具类
- """
- @staticmethod
- def generate_random_number() -> str:
- """
- 生成3位随机数字字符串。
-
- 返回:
- - str: 三位随机数字字符串。
- """
- return f'{random.randint(1, 999):03}'
- @staticmethod
- def check_file_exists(filepath: str) -> bool:
- """
- 检查文件是否存在。
-
- 参数:
- - filepath (str): 文件路径。
-
- 返回:
- - bool: 文件是否存在。
- """
- return Path(filepath).exists()
- @staticmethod
- def check_file_extension(file: UploadFile) -> bool:
- """
- 检查文件后缀是否合法。
-
- 参数:
- - file (UploadFile): 上传的文件对象。
-
- 返回:
- - bool: 文件后缀是否合法。
-
- 异常:
- - CustomException: 文件类型不支持时抛出。
- """
- if file.content_type:
- file_extension = mimetypes.guess_extension(file.content_type)
- if file_extension and file_extension in settings.ALLOWED_EXTENSIONS:
- return True
- else:
- raise CustomException(msg="文件类型不支持")
- else:
- raise CustomException(msg="文件类型不支持")
- @staticmethod
- def check_file_timestamp(filename: str) -> bool:
- """
- 校验文件时间戳是否合法。
-
- 参数:
- - filename (str): 文件名(包含时间戳片段)。
-
- 返回:
- - bool: 时间戳是否合法。
- """
- try:
- name_parts = filename.rsplit('.', 1)[0].split('_')
- timestamp = name_parts[-1].split(settings.UPLOAD_MACHINE)[0]
- datetime.strptime(timestamp, '%Y%m%d%H%M%S')
- return True
- except (ValueError, IndexError):
- return False
- @staticmethod
- def check_file_machine(filename: str) -> bool:
- """
- 校验文件机器码是否合法。
-
- 参数:
- - filename (str): 文件名。
-
- 返回:
- - bool: 机器码是否合法。
- """
- try:
- name_without_ext = filename.rsplit('.', 1)[0]
- return len(name_without_ext) >= 4 and name_without_ext[-4] == settings.UPLOAD_MACHINE
- except IndexError:
- return False
- @staticmethod
- def check_file_random_code(filename: str) -> bool:
- """
- 校验文件随机码是否合法。
-
- 参数:
- - filename (str): 文件名。
-
- 返回:
- - bool: 随机码是否合法(000–999)。
- """
- try:
- code = filename.rsplit('.', 1)[0][-3:]
- return code.isdigit() and 1 <= int(code) <= 999
- except IndexError:
- return False
- @staticmethod
- def check_file_size(file: UploadFile) -> bool:
- """
- 校验文件大小是否合法。
-
- 参数:
- - file (UploadFile): 上传的文件对象。
-
- 返回:
- - bool: 文件大小是否合法(未提供 size 返回 False)。
- """
- if file.size:
- return file.size <= settings.MAX_FILE_SIZE
- else:
- return False
- @classmethod
- def generate_file_name(cls, filename: str) -> str:
- """
- 生成文件名称。
-
- 参数:
- - filename (str): 原始文件名(包含拓展名)。
-
- 返回:
- - str: 生成的文件名(包含时间戳、机器码、随机码)。
- """
- name, ext = filename.rsplit(".", 1)
- timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
- return f'{name}_{timestamp}{settings.UPLOAD_MACHINE}{cls.generate_random_number()}.{ext}'
-
- @staticmethod
- def generate_file(filepath: Path, chunk_size: int = 8192):
- """
- 根据文件生成二进制数据迭代器。
-
- 参数:
- - filepath (Path): 文件路径。
- - chunk_size (int): 分块大小,默认 8192 字节。
-
- 返回:
- - Iterator[bytes]: 文件二进制数据分块迭代器。
- """
- with filepath.open('rb') as f:
- while chunk := f.read(chunk_size):
- yield chunk
- @staticmethod
- def delete_file(filepath: Path) -> bool:
- """
- 删除文件。
-
- 参数:
- - filepath (Path): 文件路径。
-
- 返回:
- - bool: 删除是否成功。
- """
- try:
- filepath.unlink(missing_ok=True)
- return True
- except OSError:
- return False
-
- @classmethod
- async def upload_file(cls, file: UploadFile, base_url: str) -> tuple[str, Path, str]:
- """
- 文件上传。
-
- 参数:
- - file (UploadFile): 上传的文件对象。
- - base_url (str): 基础 URL。
-
- 返回:
- - tuple[str, Path, str]: (文件名, 文件路径, 文件 URL)。
-
- 异常:
- - CustomException: 当文件类型不支持或大小超限时抛出。
- """
- # 文件校验
- if not all([cls.check_file_extension(file), cls.check_file_size(file)]):
- raise CustomException(msg='文件类型或大小不合法')
-
- try:
- # 构建完整的目录路径
- dir_path = settings.UPLOAD_FILE_PATH.joinpath(datetime.now().strftime("%Y/%m/%d"))
- dir_path.mkdir(parents=True, exist_ok=True)
-
- filename = ""
- # 生成文件名并保存
- if file.filename:
- filename = cls.generate_file_name(file.filename)
- filepath = dir_path.joinpath(filename)
- file_url = urljoin(base_url, str(filepath))
- # filepath.mkdir(parents=True, exist_ok=True)
- # 分块写入文件
- chunk_size = 8 * 1024 * 1024 # 8MB chunks
- async with aiofiles.open(filepath, 'wb') as f:
- while chunk := await file.read(chunk_size):
- await f.write(chunk)
- # 返回相对路径
- return filename, filepath, file_url
-
- except Exception as e:
- log.error(f"文件上传失败: {e}")
- raise CustomException(msg='文件上传失败')
- @staticmethod
- def get_file_tree(file_path: str) -> list[dict]:
- """
- 获取文件树结构。
-
- 参数:
- - file_path (str): 文件路径。
-
- 返回:
- - list[dict]: 文件树列表。
- """
- return [{"name": item.name, "is_dir": item.is_dir()} for item in Path(file_path).iterdir()]
- @classmethod
- async def download_file(cls, file_path: str) -> str:
- """
- 下载文件,生成新的文件名。
-
- 参数:
- - file_path (str): 文件路径。
-
- 返回:
- - str: 文件下载信息。
- """
- filename = cls.generate_file(Path(file_path))
- return str(filename)
-
|