upload_util.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. # -*- coding: utf-8 -*-
  2. import random
  3. import mimetypes
  4. from datetime import datetime
  5. import aiofiles
  6. from fastapi import UploadFile
  7. from pathlib import Path
  8. from urllib.parse import urljoin
  9. from app.config.setting import settings
  10. from app.core.exceptions import CustomException
  11. from app.core.logger import log
  12. class UploadUtil:
  13. """
  14. 上传工具类
  15. """
  16. @staticmethod
  17. def generate_random_number() -> str:
  18. """
  19. 生成3位随机数字字符串。
  20. 返回:
  21. - str: 三位随机数字字符串。
  22. """
  23. return f'{random.randint(1, 999):03}'
  24. @staticmethod
  25. def check_file_exists(filepath: str) -> bool:
  26. """
  27. 检查文件是否存在。
  28. 参数:
  29. - filepath (str): 文件路径。
  30. 返回:
  31. - bool: 文件是否存在。
  32. """
  33. return Path(filepath).exists()
  34. @staticmethod
  35. def check_file_extension(file: UploadFile) -> bool:
  36. """
  37. 检查文件后缀是否合法。
  38. 参数:
  39. - file (UploadFile): 上传的文件对象。
  40. 返回:
  41. - bool: 文件后缀是否合法。
  42. 异常:
  43. - CustomException: 文件类型不支持时抛出。
  44. """
  45. if file.content_type:
  46. file_extension = mimetypes.guess_extension(file.content_type)
  47. if file_extension and file_extension in settings.ALLOWED_EXTENSIONS:
  48. return True
  49. else:
  50. raise CustomException(msg="文件类型不支持")
  51. else:
  52. raise CustomException(msg="文件类型不支持")
  53. @staticmethod
  54. def check_file_timestamp(filename: str) -> bool:
  55. """
  56. 校验文件时间戳是否合法。
  57. 参数:
  58. - filename (str): 文件名(包含时间戳片段)。
  59. 返回:
  60. - bool: 时间戳是否合法。
  61. """
  62. try:
  63. name_parts = filename.rsplit('.', 1)[0].split('_')
  64. timestamp = name_parts[-1].split(settings.UPLOAD_MACHINE)[0]
  65. datetime.strptime(timestamp, '%Y%m%d%H%M%S')
  66. return True
  67. except (ValueError, IndexError):
  68. return False
  69. @staticmethod
  70. def check_file_machine(filename: str) -> bool:
  71. """
  72. 校验文件机器码是否合法。
  73. 参数:
  74. - filename (str): 文件名。
  75. 返回:
  76. - bool: 机器码是否合法。
  77. """
  78. try:
  79. name_without_ext = filename.rsplit('.', 1)[0]
  80. return len(name_without_ext) >= 4 and name_without_ext[-4] == settings.UPLOAD_MACHINE
  81. except IndexError:
  82. return False
  83. @staticmethod
  84. def check_file_random_code(filename: str) -> bool:
  85. """
  86. 校验文件随机码是否合法。
  87. 参数:
  88. - filename (str): 文件名。
  89. 返回:
  90. - bool: 随机码是否合法(000–999)。
  91. """
  92. try:
  93. code = filename.rsplit('.', 1)[0][-3:]
  94. return code.isdigit() and 1 <= int(code) <= 999
  95. except IndexError:
  96. return False
  97. @staticmethod
  98. def check_file_size(file: UploadFile) -> bool:
  99. """
  100. 校验文件大小是否合法。
  101. 参数:
  102. - file (UploadFile): 上传的文件对象。
  103. 返回:
  104. - bool: 文件大小是否合法(未提供 size 返回 False)。
  105. """
  106. if file.size:
  107. return file.size <= settings.MAX_FILE_SIZE
  108. else:
  109. return False
  110. @classmethod
  111. def generate_file_name(cls, filename: str) -> str:
  112. """
  113. 生成文件名称。
  114. 参数:
  115. - filename (str): 原始文件名(包含拓展名)。
  116. 返回:
  117. - str: 生成的文件名(包含时间戳、机器码、随机码)。
  118. """
  119. name, ext = filename.rsplit(".", 1)
  120. timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
  121. return f'{name}_{timestamp}{settings.UPLOAD_MACHINE}{cls.generate_random_number()}.{ext}'
  122. @staticmethod
  123. def generate_file(filepath: Path, chunk_size: int = 8192):
  124. """
  125. 根据文件生成二进制数据迭代器。
  126. 参数:
  127. - filepath (Path): 文件路径。
  128. - chunk_size (int): 分块大小,默认 8192 字节。
  129. 返回:
  130. - Iterator[bytes]: 文件二进制数据分块迭代器。
  131. """
  132. with filepath.open('rb') as f:
  133. while chunk := f.read(chunk_size):
  134. yield chunk
  135. @staticmethod
  136. def delete_file(filepath: Path) -> bool:
  137. """
  138. 删除文件。
  139. 参数:
  140. - filepath (Path): 文件路径。
  141. 返回:
  142. - bool: 删除是否成功。
  143. """
  144. try:
  145. filepath.unlink(missing_ok=True)
  146. return True
  147. except OSError:
  148. return False
  149. @classmethod
  150. async def upload_file(cls, file: UploadFile, base_url: str) -> tuple[str, Path, str]:
  151. """
  152. 文件上传。
  153. 参数:
  154. - file (UploadFile): 上传的文件对象。
  155. - base_url (str): 基础 URL。
  156. 返回:
  157. - tuple[str, Path, str]: (文件名, 文件路径, 文件 URL)。
  158. 异常:
  159. - CustomException: 当文件类型不支持或大小超限时抛出。
  160. """
  161. # 文件校验
  162. if not all([cls.check_file_extension(file), cls.check_file_size(file)]):
  163. raise CustomException(msg='文件类型或大小不合法')
  164. try:
  165. # 构建完整的目录路径
  166. dir_path = settings.UPLOAD_FILE_PATH.joinpath(datetime.now().strftime("%Y/%m/%d"))
  167. dir_path.mkdir(parents=True, exist_ok=True)
  168. filename = ""
  169. # 生成文件名并保存
  170. if file.filename:
  171. filename = cls.generate_file_name(file.filename)
  172. filepath = dir_path.joinpath(filename)
  173. file_url = urljoin(base_url, str(filepath))
  174. # filepath.mkdir(parents=True, exist_ok=True)
  175. # 分块写入文件
  176. chunk_size = 8 * 1024 * 1024 # 8MB chunks
  177. async with aiofiles.open(filepath, 'wb') as f:
  178. while chunk := await file.read(chunk_size):
  179. await f.write(chunk)
  180. # 返回相对路径
  181. return filename, filepath, file_url
  182. except Exception as e:
  183. log.error(f"文件上传失败: {e}")
  184. raise CustomException(msg='文件上传失败')
  185. @staticmethod
  186. def get_file_tree(file_path: str) -> list[dict]:
  187. """
  188. 获取文件树结构。
  189. 参数:
  190. - file_path (str): 文件路径。
  191. 返回:
  192. - list[dict]: 文件树列表。
  193. """
  194. return [{"name": item.name, "is_dir": item.is_dir()} for item in Path(file_path).iterdir()]
  195. @classmethod
  196. async def download_file(cls, file_path: str) -> str:
  197. """
  198. 下载文件,生成新的文件名。
  199. 参数:
  200. - file_path (str): 文件路径。
  201. 返回:
  202. - str: 文件下载信息。
  203. """
  204. filename = cls.generate_file(Path(file_path))
  205. return str(filename)