service.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826
  1. # -*- coding: utf-8 -*-
  2. import os
  3. import shutil
  4. from datetime import datetime
  5. from pathlib import Path
  6. from urllib.parse import urlparse
  7. from fastapi import UploadFile
  8. from app.core.exceptions import CustomException
  9. from app.core.logger import log
  10. from app.utils.excel_util import ExcelUtil
  11. from app.config.setting import settings
  12. from .schema import (
  13. ResourceItemSchema,
  14. ResourceDirectorySchema,
  15. ResourceUploadSchema,
  16. ResourceMoveSchema,
  17. ResourceCopySchema,
  18. ResourceRenameSchema,
  19. ResourceCreateDirSchema,
  20. ResourceSearchQueryParam
  21. )
  22. class ResourceService:
  23. """
  24. 资源管理模块服务层 - 管理系统静态文件目录
  25. """
  26. # 配置常量
  27. MAX_UPLOAD_SIZE = 100 * 1024 * 1024 # 100MB
  28. MAX_SEARCH_RESULTS = 1000 # 最大搜索结果数
  29. MAX_PATH_DEPTH = 20 # 最大路径深度
  30. @classmethod
  31. def _get_resource_root(cls) -> str:
  32. """
  33. 获取资源管理根目录
  34. 返回:
  35. - str: 资源管理根目录路径。
  36. """
  37. if not settings.STATIC_ENABLE:
  38. raise CustomException(msg='静态文件服务未启用')
  39. return str(settings.STATIC_ROOT)
  40. @classmethod
  41. def _get_safe_path(cls, path: str | None = None) -> str:
  42. """
  43. 获取安全的文件路径
  44. 参数:
  45. - path (str | None): 原始文件路径。
  46. 返回:
  47. - str: 安全的文件路径。
  48. """
  49. resource_root = cls._get_resource_root()
  50. if not path:
  51. return resource_root
  52. # 支持前端传递的完整URL或以STATIC_URL/ROOT_PATH+STATIC_URL开头的URL路径,转换为相对资源路径
  53. if isinstance(path, str):
  54. static_prefix = settings.STATIC_URL.rstrip('/')
  55. root_prefix = settings.ROOT_PATH.rstrip('/') if getattr(settings, 'ROOT_PATH', '') else ''
  56. root_static_prefix = f"{root_prefix}{static_prefix}" if root_prefix else static_prefix
  57. def strip_prefix(p: str) -> str:
  58. if p.startswith(root_static_prefix):
  59. return p[len(root_static_prefix):].lstrip('/')
  60. if p.startswith(static_prefix):
  61. return p[len(static_prefix):].lstrip('/')
  62. return p
  63. if path.startswith('http://') or path.startswith('https://'):
  64. parsed = urlparse(path)
  65. url_path = parsed.path or ''
  66. path = strip_prefix(url_path)
  67. else:
  68. path = strip_prefix(path)
  69. # 清理路径,移除危险字符
  70. path = path.strip().replace('..', '').replace('//', '/')
  71. # 规范化路径
  72. if os.path.isabs(path):
  73. safe_path = os.path.normpath(path)
  74. else:
  75. safe_path = os.path.normpath(os.path.join(resource_root, path))
  76. # 检查路径是否在允许的范围内
  77. resource_root_abs = os.path.normpath(os.path.abspath(resource_root))
  78. safe_path_abs = os.path.normpath(os.path.abspath(safe_path))
  79. if not safe_path_abs.startswith(resource_root_abs):
  80. raise CustomException(msg=f'访问路径不在允许范围内: {path}')
  81. # 防止路径遍历攻击
  82. if '..' in safe_path or safe_path.count('/') > cls.MAX_PATH_DEPTH:
  83. raise CustomException(msg=f'不安全的路径格式: {path}')
  84. return safe_path
  85. @classmethod
  86. def _path_exists(cls, path: str) -> bool:
  87. """
  88. 检查路径是否存在
  89. 参数:
  90. - path (str): 要检查的路径。
  91. 返回:
  92. - bool: 如果路径存在则返回True,否则返回False。
  93. """
  94. try:
  95. safe_path = cls._get_safe_path(path)
  96. return os.path.exists(safe_path)
  97. except:
  98. return False
  99. @classmethod
  100. def _generate_http_url(cls, file_path: str, base_url: str | None = None) -> str:
  101. """
  102. 生成文件的HTTP URL
  103. 参数:
  104. - file_path (str): 文件的绝对路径。
  105. - base_url (str | None): 基础URL,用于生成完整URL。
  106. 返回:
  107. - str: 文件的HTTP URL。
  108. """
  109. resource_root = cls._get_resource_root()
  110. try:
  111. relative_path = os.path.relpath(file_path, resource_root)
  112. # 确保路径使用正斜杠(URL格式)
  113. url_path = relative_path.replace(os.sep, '/')
  114. except ValueError:
  115. # 如果无法计算相对路径,使用文件名
  116. url_path = os.path.basename(file_path)
  117. # 如果提供了base_url,使用它生成完整URL,否则使用settings.STATIC_URL
  118. if base_url:
  119. # 修复URL生成逻辑
  120. base_part = base_url.rstrip('/')
  121. static_part = settings.STATIC_URL.lstrip('/')
  122. file_part = url_path.lstrip('/')
  123. if base_part.endswith(':') or (len(base_part) > 0 and base_part[-1] not in ['/', ':']):
  124. base_part += '/'
  125. http_url = f"{base_part}{static_part}/{file_part}".replace('//', '/').replace(':/', '://')
  126. else:
  127. http_url = f"{settings.STATIC_URL}/{url_path}".replace('//', '/')
  128. return http_url
  129. @classmethod
  130. def _get_file_info(cls, file_path: str, base_url: str | None = None) -> dict:
  131. """
  132. 获取文件或目录的详细信息,如名称、大小、创建时间、修改时间、路径、深度、HTTP URL、是否隐藏、是否为目录等。
  133. 参数:
  134. - file_path (str): 文件或目录的路径。
  135. - base_url (str | None): 基础URL,用于生成完整URL。
  136. 返回:
  137. - dict: 文件或目录的详细信息字典。
  138. """
  139. try:
  140. safe_path = cls._get_safe_path(file_path)
  141. if not os.path.exists(safe_path):
  142. return {}
  143. stat = os.stat(safe_path)
  144. path_obj = Path(safe_path)
  145. resource_root = cls._get_resource_root()
  146. # 计算相对路径
  147. try:
  148. relative_path = os.path.relpath(safe_path, resource_root)
  149. except ValueError:
  150. relative_path = os.path.basename(safe_path)
  151. # 计算深度
  152. try:
  153. depth = len(Path(safe_path).relative_to(resource_root).parts)
  154. except ValueError:
  155. depth = 0
  156. # 生成HTTP URL路径而不是文件系统路径
  157. http_url = cls._generate_http_url(safe_path, base_url)
  158. # 检查是否为隐藏文件(文件名以点开头)
  159. is_hidden = path_obj.name.startswith('.')
  160. # 对于目录,设置is_directory字段(兼容前端)
  161. is_directory = os.path.isdir(safe_path)
  162. # 将datetime对象转换为ISO格式的字符串,确保JSON序列化成功
  163. created_time = datetime.fromtimestamp(stat.st_ctime).isoformat()
  164. modified_time = datetime.fromtimestamp(stat.st_mtime).isoformat()
  165. return {
  166. 'name': path_obj.name,
  167. 'file_url': http_url, # 统一使用file_url字段
  168. 'relative_path': relative_path,
  169. 'is_file': os.path.isfile(safe_path),
  170. 'is_dir': is_directory,
  171. 'size': stat.st_size if os.path.isfile(safe_path) else None,
  172. 'created_time': created_time,
  173. 'modified_time': modified_time,
  174. 'is_hidden': is_hidden
  175. }
  176. except Exception as e:
  177. log.error(f'获取文件信息失败: {str(e)}')
  178. return {}
  179. @classmethod
  180. async def get_directory_list_service(cls, path: str | None = None, include_hidden: bool = False, base_url: str | None = None) -> dict:
  181. """
  182. 获取目录列表
  183. 参数:
  184. - path (str | None): 目录路径。如果未指定,将使用静态文件根目录。
  185. - include_hidden (bool): 是否包含隐藏文件。
  186. - base_url (str | None): 基础URL,用于生成完整URL。
  187. 返回:
  188. - dict: 包含目录列表和统计信息的字典。
  189. """
  190. try:
  191. # 如果没有指定路径,使用静态文件根目录
  192. if path is None:
  193. safe_path = cls._get_resource_root()
  194. display_path = cls._generate_http_url(safe_path, base_url)
  195. else:
  196. safe_path = cls._get_safe_path(path)
  197. display_path = cls._generate_http_url(safe_path, base_url)
  198. if not os.path.exists(safe_path):
  199. raise CustomException(msg='目录不存在')
  200. if not os.path.isdir(safe_path):
  201. raise CustomException(msg='路径不是目录')
  202. items = []
  203. total_files = 0
  204. total_dirs = 0
  205. total_size = 0
  206. try:
  207. for item_name in os.listdir(safe_path):
  208. # 跳过隐藏文件
  209. if not include_hidden and item_name.startswith('.'):
  210. continue
  211. item_path = os.path.join(safe_path, item_name)
  212. file_info = cls._get_file_info(item_path, base_url)
  213. if file_info:
  214. items.append(ResourceItemSchema(**file_info))
  215. if file_info['is_file']:
  216. total_files += 1
  217. total_size += file_info.get('size', 0) or 0
  218. elif file_info['is_dir']:
  219. total_dirs += 1
  220. except PermissionError:
  221. raise CustomException(msg='没有权限访问此目录')
  222. return ResourceDirectorySchema(
  223. path=display_path, # 返回HTTP URL路径而不是文件系统路径
  224. name=os.path.basename(safe_path),
  225. items=items,
  226. total_files=total_files,
  227. total_dirs=total_dirs,
  228. total_size=total_size
  229. ).model_dump()
  230. except CustomException:
  231. raise
  232. except Exception as e:
  233. log.error(f'获取目录列表失败: {str(e)}')
  234. raise CustomException(msg=f'获取目录列表失败: {str(e)}')
  235. @classmethod
  236. async def get_resources_list_service(cls, search: ResourceSearchQueryParam | None = None, order_by: str | None = None, base_url: str | None = None) -> list[dict]:
  237. """
  238. 搜索资源列表(用于分页和导出)
  239. 参数:
  240. - search (ResourceSearchQueryParam | None): 查询参数模型。
  241. - order_by (str | None): 排序参数。
  242. - base_url (str | None): 基础URL,用于生成完整URL。
  243. 返回:
  244. - list[dict]: 资源详情字典列表。
  245. """
  246. try:
  247. # 确定搜索路径
  248. if search and hasattr(search, 'path') and search.path:
  249. resource_root = cls._get_safe_path(search.path)
  250. else:
  251. resource_root = cls._get_resource_root()
  252. # 检查路径是否存在
  253. if not os.path.exists(resource_root):
  254. raise CustomException(msg='目录不存在')
  255. if not os.path.isdir(resource_root):
  256. raise CustomException(msg='路径不是目录')
  257. # 收集资源
  258. all_resources = []
  259. try:
  260. for item_name in os.listdir(resource_root):
  261. # 跳过隐藏文件
  262. if item_name.startswith('.'):
  263. continue
  264. item_path = os.path.join(resource_root, item_name)
  265. file_info = cls._get_file_info(item_path, base_url)
  266. if file_info:
  267. # 应用名称过滤
  268. if search and hasattr(search, 'name') and search.name and search.name[1]:
  269. search_keyword = search.name[1].lower()
  270. if search_keyword not in file_info.get('name', '').lower():
  271. continue
  272. all_resources.append(file_info)
  273. except PermissionError:
  274. raise CustomException(msg='没有权限访问此目录')
  275. # 应用排序
  276. sorted_resources = cls._sort_results(all_resources, order_by)
  277. # 限制最大结果数
  278. if len(sorted_resources) > cls.MAX_SEARCH_RESULTS:
  279. sorted_resources = sorted_resources[:cls.MAX_SEARCH_RESULTS]
  280. return sorted_resources
  281. except Exception as e:
  282. log.error(f'搜索资源失败: {str(e)}')
  283. raise CustomException(msg=f'搜索资源失败: {str(e)}')
  284. @classmethod
  285. async def export_resource_service(cls, data_list: list[dict]) -> bytes:
  286. """
  287. 导出资源列表
  288. 参数:
  289. - data_list (list[dict]): 资源详情字典列表。
  290. 返回:
  291. - bytes: Excel文件的二进制数据。
  292. """
  293. mapping_dict = {
  294. 'name': '文件名',
  295. 'path': '文件路径',
  296. 'size': '文件大小',
  297. 'created_time': '创建时间',
  298. 'modified_time': '修改时间',
  299. 'parent_path': '父目录'
  300. }
  301. # 复制数据并转换状态
  302. export_data = data_list.copy()
  303. # 格式化文件大小
  304. for item in export_data:
  305. if item.get('size'):
  306. item['size'] = cls._format_file_size(item['size'])
  307. return ExcelUtil.export_list2excel(list_data=export_data, mapping_dict=mapping_dict)
  308. @classmethod
  309. async def _get_directory_stats(cls, path: str, include_hidden: bool = False) -> dict[str, int]:
  310. """
  311. 递归获取目录统计信息
  312. 参数:
  313. - path (str): 目录路径。
  314. - include_hidden (bool): 是否包含隐藏文件。
  315. 返回:
  316. - dict[str, int]: 包含文件数、目录数和总大小的字典。
  317. """
  318. stats = {'files': 0, 'dirs': 0, 'size': 0}
  319. try:
  320. for root, dirs, files in os.walk(path):
  321. # 过滤隐藏目录
  322. if not include_hidden:
  323. dirs[:] = [d for d in dirs if not d.startswith('.')]
  324. files = [f for f in files if not f.startswith('.')]
  325. stats['dirs'] += len(dirs)
  326. stats['files'] += len(files)
  327. for file in files:
  328. file_path = os.path.join(root, file)
  329. try:
  330. stats['size'] += os.path.getsize(file_path)
  331. except (OSError, IOError):
  332. continue
  333. except Exception:
  334. pass
  335. return stats
  336. @classmethod
  337. def _sort_results(cls, results: list[dict], order_by: str | None = None) -> list[dict]:
  338. """
  339. 排序搜索结果
  340. 参数:
  341. - results (list[dict]): 资源详情字典列表。
  342. - order_by (str | None): 排序参数。
  343. 返回:
  344. - list[dict]: 排序后的资源详情字典列表。
  345. """
  346. try:
  347. # 默认按名称升序排序
  348. if not order_by:
  349. return sorted(results, key=lambda x: x.get('name', ''), reverse=False)
  350. # 解析order_by参数,格式: [{'field':'asc/desc'}]
  351. try:
  352. sort_conditions = eval(order_by)
  353. if isinstance(sort_conditions, list):
  354. # 构建排序键函数
  355. def sort_key(item):
  356. keys = []
  357. for cond in sort_conditions:
  358. field = cond.get('field', 'name')
  359. direction = cond.get('direction', 'asc')
  360. # 获取字段值,默认为空字符串
  361. value = item.get(field, '')
  362. # 如果是日期字段,转换为可比较的格式
  363. if field in ['created_time', 'modified_time', 'accessed_time'] and value:
  364. value = datetime.fromisoformat(value)
  365. keys.append(value)
  366. return keys
  367. # 确定排序方向(这里只支持单一方向,多个条件时使用第一个条件的方向)
  368. reverse = False
  369. if sort_conditions and isinstance(sort_conditions[0], dict):
  370. direction = sort_conditions[0].get('direction', '').lower()
  371. reverse = direction == 'desc'
  372. return sorted(results, key=sort_key, reverse=reverse)
  373. except:
  374. # 如果解析失败,使用默认排序
  375. pass
  376. return sorted(results, key=lambda x: x.get('name', ''), reverse=False)
  377. except:
  378. return results
  379. @classmethod
  380. async def upload_file_service(cls, file: UploadFile, target_path: str | None = None, base_url: str | None = None) -> dict:
  381. """
  382. 上传文件到指定目录
  383. 参数:
  384. - file (UploadFile): 上传的文件对象。
  385. - target_path (str | None): 目标目录路径。
  386. - base_url (str | None): 基础URL,用于生成完整URL。
  387. 返回:
  388. - dict: 包含文件信息的字典。
  389. """
  390. if not file or not file.filename:
  391. raise CustomException(msg="请选择要上传的文件")
  392. # 文件名安全检查
  393. if '..' in file.filename or '/' in file.filename or '\\' in file.filename:
  394. raise CustomException(msg="文件名包含不安全字符")
  395. try:
  396. # 检查文件大小
  397. content = await file.read()
  398. if len(content) > cls.MAX_UPLOAD_SIZE:
  399. raise CustomException(msg=f"文件太大,最大支持{cls.MAX_UPLOAD_SIZE // (1024*1024)}MB")
  400. # 确定上传目录,如果没有指定目标路径,使用静态文件根目录
  401. if target_path is None:
  402. safe_dir = cls._get_resource_root()
  403. else:
  404. safe_dir = cls._get_safe_path(target_path)
  405. # 创建目录(如果不存在)
  406. os.makedirs(safe_dir, exist_ok=True)
  407. # 生成文件路径
  408. filename = file.filename
  409. file_path = os.path.join(safe_dir, filename)
  410. # 检查文件是否已存在
  411. if os.path.exists(file_path):
  412. # 生成唯一文件名
  413. base_name, ext = os.path.splitext(filename)
  414. counter = 1
  415. while os.path.exists(file_path):
  416. new_filename = f"{base_name}_{counter}{ext}"
  417. file_path = os.path.join(safe_dir, new_filename)
  418. counter += 1
  419. filename = os.path.basename(file_path)
  420. # 保存文件(使用已读取的内容)
  421. with open(file_path, 'wb') as f:
  422. f.write(content)
  423. # 获取文件信息
  424. file_info = cls._get_file_info(file_path, base_url)
  425. # 生成文件URL
  426. file_url = cls._generate_http_url(file_path, base_url)
  427. log.info(f"文件上传成功: {filename}")
  428. return ResourceUploadSchema(
  429. filename=filename,
  430. file_url=file_url,
  431. file_size=file_info.get('size', 0),
  432. upload_time=datetime.now()
  433. ).model_dump(mode='json')
  434. except Exception as e:
  435. log.error(f"文件上传失败: {str(e)}")
  436. raise CustomException(msg=f"文件上传失败: {str(e)}")
  437. @classmethod
  438. async def download_file_service(cls, file_path: str, base_url: str | None = None) -> str:
  439. """
  440. 下载文件(返回本地文件系统路径)
  441. 参数:
  442. - file_path (str): 文件路径(可为相对路径、绝对路径或完整URL)。
  443. - base_url (str | None): 基础URL,用于生成完整URL(不再直接返回URL)。
  444. 返回:
  445. - str: 本地文件系统路径。
  446. """
  447. try:
  448. safe_path = cls._get_safe_path(file_path)
  449. if not os.path.exists(safe_path):
  450. raise CustomException(msg='文件不存在')
  451. if not os.path.isfile(safe_path):
  452. raise CustomException(msg='路径不是文件')
  453. # 返回本地文件路径给 FileResponse 使用
  454. log.info(f"定位文件路径: {safe_path}")
  455. return safe_path
  456. except CustomException:
  457. raise
  458. except Exception as e:
  459. log.error(f"下载文件失败: {str(e)}")
  460. raise CustomException(msg=f"下载文件失败: {str(e)}")
  461. @classmethod
  462. async def delete_file_service(cls, paths: list[str]) -> None:
  463. """
  464. 删除文件或目录
  465. 参数:
  466. - paths (list[str]): 文件或目录路径列表。
  467. 返回:
  468. - None
  469. """
  470. if not paths:
  471. raise CustomException(msg='删除失败,删除路径不能为空')
  472. for path in paths:
  473. try:
  474. safe_path = cls._get_safe_path(path)
  475. if not os.path.exists(safe_path):
  476. log.error(f"路径不存在,跳过: {path}")
  477. continue
  478. if os.path.isfile(safe_path):
  479. os.remove(safe_path)
  480. log.info(f"删除文件成功: {safe_path}")
  481. elif os.path.isdir(safe_path):
  482. shutil.rmtree(safe_path)
  483. log.info(f"删除目录成功: {safe_path}")
  484. except Exception as e:
  485. log.error(f"删除失败 {path}: {str(e)}")
  486. raise CustomException(msg=f"删除失败 {path}: {str(e)}")
  487. @classmethod
  488. async def batch_delete_service(cls, paths: list[str]) -> dict[str, list[str]]:
  489. """
  490. 批量删除文件或目录
  491. 参数:
  492. - paths (List[str]): 文件或目录路径列表。
  493. 返回:
  494. - Dict[str, List[str]]: 包含成功删除路径和失败删除路径的字典。
  495. """
  496. if not paths:
  497. raise CustomException(msg='删除失败,删除路径不能为空')
  498. success_paths = []
  499. failed_paths = []
  500. for path in paths:
  501. try:
  502. safe_path = cls._get_safe_path(path)
  503. if not os.path.exists(safe_path):
  504. failed_paths.append(path)
  505. continue
  506. if os.path.isfile(safe_path):
  507. os.remove(safe_path)
  508. success_paths.append(path)
  509. log.info(f"删除文件成功: {safe_path}")
  510. elif os.path.isdir(safe_path):
  511. shutil.rmtree(safe_path)
  512. success_paths.append(path)
  513. log.info(f"删除目录成功: {safe_path}")
  514. except Exception as e:
  515. log.error(f"删除失败 {path}: {str(e)}")
  516. failed_paths.append(path)
  517. return {
  518. "success": success_paths,
  519. "failed": failed_paths
  520. }
  521. @classmethod
  522. async def move_file_service(cls, data: ResourceMoveSchema) -> None:
  523. """
  524. 移动文件或目录
  525. 参数:
  526. - data (ResourceMoveSchema): 包含源路径和目标路径的模型。
  527. 返回:
  528. - None
  529. """
  530. try:
  531. source_path = cls._get_safe_path(data.source_path)
  532. target_path = cls._get_safe_path(data.target_path)
  533. if not os.path.exists(source_path):
  534. raise CustomException(msg='源路径不存在')
  535. # 检查目标路径是否已存在
  536. if os.path.exists(target_path):
  537. if not data.overwrite:
  538. raise CustomException(msg='目标路径已存在')
  539. else:
  540. # 删除目标路径
  541. if os.path.isfile(target_path):
  542. os.remove(target_path)
  543. else:
  544. shutil.rmtree(target_path)
  545. # 确保目标目录存在
  546. target_dir = os.path.dirname(target_path)
  547. os.makedirs(target_dir, exist_ok=True)
  548. # 移动文件
  549. shutil.move(source_path, target_path)
  550. log.info(f"移动成功: {source_path} -> {target_path}")
  551. except CustomException:
  552. raise
  553. except Exception as e:
  554. log.error(f"移动失败: {str(e)}")
  555. raise CustomException(msg=f"移动失败: {str(e)}")
  556. @classmethod
  557. async def copy_file_service(cls, data: ResourceCopySchema) -> None:
  558. """
  559. 复制文件或目录
  560. 参数:
  561. - data (ResourceCopySchema): 包含源路径和目标路径的模型。
  562. 返回:
  563. - None
  564. """
  565. try:
  566. source_path = cls._get_safe_path(data.source_path)
  567. target_path = cls._get_safe_path(data.target_path)
  568. if not os.path.exists(source_path):
  569. raise CustomException(msg='源路径不存在')
  570. # 检查目标路径是否已存在
  571. if os.path.exists(target_path) and not data.overwrite:
  572. raise CustomException(msg='目标路径已存在')
  573. # 确保目标目录存在
  574. target_dir = os.path.dirname(target_path)
  575. os.makedirs(target_dir, exist_ok=True)
  576. # 复制文件或目录
  577. if os.path.isfile(source_path):
  578. shutil.copy2(source_path, target_path)
  579. else:
  580. shutil.copytree(source_path, target_path, dirs_exist_ok=data.overwrite)
  581. log.info(f"复制成功: {source_path} -> {target_path}")
  582. except CustomException:
  583. raise
  584. except Exception as e:
  585. log.error(f"复制失败: {str(e)}")
  586. raise CustomException(msg=f"复制失败: {str(e)}")
  587. @classmethod
  588. async def rename_file_service(cls, data: ResourceRenameSchema) -> None:
  589. """
  590. 重命名文件或目录
  591. 参数:
  592. - data (ResourceRenameSchema): 包含旧路径和新名称的模型。
  593. 返回:
  594. - None
  595. """
  596. try:
  597. old_path = cls._get_safe_path(data.old_path)
  598. if not os.path.exists(old_path):
  599. raise CustomException(msg='文件或目录不存在')
  600. # 生成新路径
  601. parent_dir = os.path.dirname(old_path)
  602. new_path = os.path.join(parent_dir, data.new_name)
  603. if os.path.exists(new_path):
  604. raise CustomException(msg='目标名称已存在')
  605. # 重命名
  606. os.rename(old_path, new_path)
  607. log.info(f"重命名成功: {old_path} -> {new_path}")
  608. except CustomException:
  609. raise
  610. except Exception as e:
  611. log.error(f"重命名失败: {str(e)}")
  612. raise CustomException(msg=f"重命名失败: {str(e)}")
  613. @classmethod
  614. async def create_directory_service(cls, data: ResourceCreateDirSchema) -> None:
  615. """
  616. 创建目录
  617. 参数:
  618. - data (ResourceCreateDirSchema): 包含父目录路径和目录名称的模型。
  619. 返回:
  620. - None
  621. """
  622. try:
  623. parent_path = cls._get_safe_path(data.parent_path)
  624. if not os.path.exists(parent_path):
  625. raise CustomException(msg='父目录不存在')
  626. if not os.path.isdir(parent_path):
  627. raise CustomException(msg='父路径不是目录')
  628. # 生成新目录路径
  629. new_dir_path = os.path.join(parent_path, data.dir_name)
  630. # 安全检查:确保新目录名称不包含路径遍历字符
  631. if '..' in data.dir_name or '/' in data.dir_name or '\\' in data.dir_name:
  632. raise CustomException(msg='目录名称包含不安全字符')
  633. if os.path.exists(new_dir_path):
  634. raise CustomException(msg='目录已存在')
  635. # 创建目录
  636. os.makedirs(new_dir_path)
  637. log.info(f"创建目录成功: {new_dir_path}")
  638. except CustomException:
  639. raise
  640. except Exception as e:
  641. log.error(f"创建目录失败: {str(e)}")
  642. raise CustomException(msg=f"创建目录失败: {str(e)}")
  643. @classmethod
  644. def _format_file_size(cls, size_bytes: int) -> str:
  645. """
  646. 格式化文件大小
  647. 参数:
  648. - size_bytes (int): 文件大小(字节)
  649. 返回:
  650. - str: 格式化后的文件大小字符串(例如:"123.45MB")
  651. """
  652. if size_bytes == 0:
  653. return "0B"
  654. size_names = ["B", "KB", "MB", "GB", "TB"]
  655. i = 0
  656. while size_bytes >= 1024 and i < len(size_names) - 1:
  657. size_bytes = int(size_bytes / 1024)
  658. i += 1
  659. return f"{size_bytes:.2f}{size_names[i]}"