common_util.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. # -*- coding: utf-8 -*-
  2. import importlib
  3. import re
  4. import uuid
  5. from pathlib import Path
  6. from typing import Any, Literal, Sequence, Generator
  7. from sqlalchemy.orm import DeclarativeBase
  8. from sqlalchemy.engine.row import Row
  9. from sqlalchemy.orm.collections import InstrumentedList
  10. from sqlalchemy.sql.expression import TextClause, null
  11. from app.config.setting import settings
  12. from app.core.logger import log
  13. from app.core.exceptions import CustomException
  14. def import_module(module: str, desc: str) -> Any:
  15. """
  16. 动态导入模块
  17. 参数:
  18. - module (str): 模块名称。
  19. - desc (str): 模块描述。
  20. 返回:
  21. - Any: 模块对象。
  22. """
  23. try:
  24. module_path, module_class = module.rsplit(".", 1)
  25. module = importlib.import_module(module_path)
  26. return getattr(module, module_class)
  27. except ModuleNotFoundError:
  28. log.error(f"❗️ 导入{desc}失败,未找到模块:{module}")
  29. raise
  30. except AttributeError:
  31. log.error(f"❗ ️导入{desc}失败,未找到模块方法:{module}")
  32. raise
  33. async def import_modules_async(modules: list, desc: str, **kwargs) -> None:
  34. """
  35. 异步导入模块列表
  36. 参数:
  37. - modules (list[str]): 模块列表。
  38. - desc (str): 模块描述。
  39. - kwargs: 额外参数。
  40. 返回:
  41. - None
  42. """
  43. for module in modules:
  44. if not module:
  45. continue
  46. try:
  47. module_path = module[0:module.rindex(".")]
  48. module_name = module[module.rindex(".") + 1:]
  49. module_obj = importlib.import_module(module_path)
  50. await getattr(module_obj, module_name)(**kwargs)
  51. except ModuleNotFoundError:
  52. log.error(f"❌️ 导入{desc}失败,未找到模块:{module}")
  53. raise
  54. except AttributeError:
  55. log.error(f"❌️ 导入{desc}失败,未找到模块方法:{module}")
  56. raise
  57. def get_random_character() -> str:
  58. """
  59. 生成随机字符串
  60. 返回:
  61. - str: 随机字符串。
  62. """
  63. return uuid.uuid4().hex
  64. def uuid4_str() -> str:
  65. """数据库引擎 UUID 类型兼容性解决方案"""
  66. return str(uuid.uuid4())
  67. def get_parent_id_map(model_list: Sequence[DeclarativeBase]) -> dict[int, int]:
  68. """
  69. 获取父级 ID 映射字典
  70. 参数:
  71. - model_list (Sequence[DeclarativeBase]): 模型列表。
  72. 返回:
  73. - Dict[int, int]: {id: parent_id} 映射字典。
  74. """
  75. return {item.id: item.parent_id for item in model_list}
  76. def get_parent_recursion(id: int, id_map: dict[int, int], ids: list[int] | None = None) -> list[int]:
  77. """
  78. 递归获取所有父级 ID
  79. 参数:
  80. - id (int): 当前 ID。
  81. - id_map (dict[int, int]): ID 映射字典。
  82. - ids (list[int] | None): 已收集的 ID 列表。
  83. 返回:
  84. - list[int]: 所有父级 ID 列表。
  85. """
  86. ids = ids or []
  87. if id in ids:
  88. raise CustomException(msg="递归获取父级ID失败,不可以自引用")
  89. ids.append(id)
  90. parent_id = id_map.get(id)
  91. if parent_id:
  92. get_parent_recursion(parent_id, id_map, ids)
  93. return ids
  94. def get_child_id_map(model_list: Sequence[DeclarativeBase]) -> dict[int, list[int]]:
  95. """
  96. 获取子级 ID 映射字典
  97. 参数:
  98. - model_list (Sequence[DeclarativeBase]): 模型列表。
  99. 返回:
  100. - Dict[int, List[int]]: {id: [child_ids]} 映射字典。
  101. """
  102. data_map = {}
  103. for model in model_list:
  104. data_map.setdefault(model.id, [])
  105. if model.parent_id:
  106. data_map.setdefault(model.parent_id, []).append(model.id)
  107. return data_map
  108. def get_child_recursion(id: int, id_map: dict[int, list[int]], ids: list[int] | None= None) -> list[int]:
  109. """
  110. 递归获取所有子级 ID
  111. 参数:
  112. - id (int): 当前 ID。
  113. - id_map (dict[int, list[int]]): ID 映射字典。
  114. - ids (list[int] | None): 已收集的 ID 列表。
  115. 返回:
  116. - list[int]: 所有子级 ID 列表。
  117. """
  118. ids = ids or []
  119. ids.append(id)
  120. for child in id_map.get(id, []):
  121. get_child_recursion(child, id_map, ids)
  122. return ids
  123. def traversal_to_tree(nodes: list[dict[str, Any]]) -> list[dict[str, Any]]:
  124. """
  125. 通过遍历算法构造树形结构
  126. 参数:
  127. - nodes (list[dict[str, Any]]): 树节点列表。
  128. 返回:
  129. - list[dict[str, Any]]: 构造后的树形结构列表。
  130. """
  131. tree: list[dict[str, Any]] = []
  132. node_dict = {node['id']: node for node in nodes}
  133. for node in nodes:
  134. # 确保每个节点都有children字段,即使没有子节点也设置为null
  135. if 'children' not in node:
  136. node['children'] = None
  137. parent_id = node['parent_id']
  138. if parent_id is None:
  139. tree.append(node)
  140. else:
  141. parent_node = node_dict.get(parent_id)
  142. if parent_node is not None:
  143. if 'children' not in parent_node or parent_node['children'] is None:
  144. parent_node['children'] = []
  145. if node not in parent_node['children']:
  146. parent_node['children'].append(node)
  147. else:
  148. if node not in tree:
  149. tree.append(node)
  150. # 确保所有节点都有children字段
  151. for node in tree:
  152. if 'children' not in node:
  153. node['children'] = None
  154. return tree
  155. def recursive_to_tree(nodes: list[dict[str, Any]], *, parent_id: int | None = None) -> list[dict[str, Any]]:
  156. """
  157. 通过递归算法构造树形结构(性能影响较大)
  158. 参数:
  159. - nodes (list[dict[str, Any]]): 树节点列表。
  160. - parent_id (int | None): 父节点 ID,默认为 None 表示根节点。
  161. 返回:
  162. - list[dict[str, Any]]: 构造后的树形结构列表。
  163. """
  164. tree: list[dict[str, Any]] = []
  165. for node in nodes:
  166. if node['parent_id'] == parent_id:
  167. child_nodes = recursive_to_tree(nodes, parent_id=node['id'])
  168. if child_nodes:
  169. node['children'] = child_nodes
  170. tree.append(node)
  171. return tree
  172. def bytes2human(n: int, format_str: str = '%(value).1f%(symbol)s') -> str:
  173. """
  174. 字节数转人类可读格式
  175. Used by various scripts. See:
  176. http://goo.gl/zeJZl
  177. >>> bytes2human(10000)
  178. '9.8K'
  179. >>> bytes2human(100001221)
  180. '95.4M'
  181. 参数:
  182. - n (int): 字节数。
  183. - format_str (str): 格式化字符串,默认 '%(value).1f%(symbol)s'。
  184. 返回:
  185. - str: 可读的字节字符串,如 '1.5MB'。
  186. """
  187. symbols = ('B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB')
  188. prefix = {s: 1 << (i + 1) * 10 for i, s in enumerate(symbols[1:])}
  189. for symbol in reversed(symbols[1:]):
  190. if n >= prefix[symbol]:
  191. value = float(n) / prefix[symbol]
  192. return format_str % locals()
  193. return format_str % dict(symbol=symbols[0], value=n)
  194. def bytes2file_response(bytes_info: bytes) -> Generator[bytes, Any, None]:
  195. """生成文件响应"""
  196. yield bytes_info
  197. def get_filepath_from_url(url: str) -> Path:
  198. """
  199. 工具方法:根据请求参数获取文件路径
  200. 参数:
  201. - url (str): 请求参数中的 url 参数。
  202. 返回:
  203. - Path: 文件路径。
  204. """
  205. file_info = url.split('?')[1].split('&')
  206. task_id = file_info[0].split('=')[1]
  207. file_name = file_info[1].split('=')[1]
  208. task_path = file_info[2].split('=')[1]
  209. filepath = settings.STATIC_ROOT.joinpath(task_path, task_id, file_name)
  210. return filepath
  211. class SqlalchemyUtil:
  212. """
  213. sqlalchemy工具类
  214. """
  215. @classmethod
  216. def base_to_dict(
  217. cls, obj: DeclarativeBase | dict[str, Any], transform_case: Literal['no_case', 'snake_to_camel', 'camel_to_snake'] = 'no_case'
  218. ):
  219. """
  220. 将sqlalchemy模型对象转换为字典
  221. :param obj: sqlalchemy模型对象或普通字典
  222. :param transform_case: 转换得到的结果形式,可选的有'no_case'(不转换)、'snake_to_camel'(下划线转小驼峰)、'camel_to_snake'(小驼峰转下划线),默认为'no_case'
  223. :return: 字典结果
  224. """
  225. if isinstance(obj, DeclarativeBase):
  226. base_dict = obj.__dict__.copy()
  227. base_dict.pop('_sa_instance_state', None)
  228. for name, value in base_dict.items():
  229. if isinstance(value, InstrumentedList):
  230. base_dict[name] = cls.serialize_result(value, 'snake_to_camel')
  231. elif isinstance(obj, dict):
  232. base_dict = obj.copy()
  233. if transform_case == 'snake_to_camel':
  234. return {CamelCaseUtil.snake_to_camel(k): v for k, v in base_dict.items()}
  235. elif transform_case == 'camel_to_snake':
  236. return {SnakeCaseUtil.camel_to_snake(k): v for k, v in base_dict.items()}
  237. return base_dict
  238. @classmethod
  239. def serialize_result(
  240. cls, result: Any, transform_case: Literal['no_case', 'snake_to_camel', 'camel_to_snake'] = 'no_case'
  241. ):
  242. """
  243. 将sqlalchemy查询结果序列化
  244. :param result: sqlalchemy查询结果
  245. :param transform_case: 转换得到的结果形式,可选的有'no_case'(不转换)、'snake_to_camel'(下划线转小驼峰)、'camel_to_snake'(小驼峰转下划线),默认为'no_case'
  246. :return: 序列化结果
  247. """
  248. if isinstance(result, (DeclarativeBase, dict)):
  249. return cls.base_to_dict(result, transform_case)
  250. elif isinstance(result, list):
  251. return [cls.serialize_result(row, transform_case) for row in result]
  252. elif isinstance(result, Row):
  253. if all([isinstance(row, DeclarativeBase) for row in result]):
  254. return [cls.base_to_dict(row, transform_case) for row in result]
  255. elif any([isinstance(row, DeclarativeBase) for row in result]):
  256. return [cls.serialize_result(row, transform_case) for row in result]
  257. else:
  258. result_dict = result._asdict()
  259. if transform_case == 'snake_to_camel':
  260. return {CamelCaseUtil.snake_to_camel(k): v for k, v in result_dict.items()}
  261. elif transform_case == 'camel_to_snake':
  262. return {SnakeCaseUtil.camel_to_snake(k): v for k, v in result_dict.items()}
  263. return result_dict
  264. return result
  265. @classmethod
  266. def get_server_default_null(cls, dialect_name: str, need_explicit_null: bool = True) -> TextClause | None:
  267. """
  268. 根据数据库方言动态返回值为null的server_default
  269. :param dialect_name: 数据库方言名称
  270. :param need_explicit_null: 是否需要显式DEFAULT NULL
  271. :return: 不同数据库方言对应的null_server_default
  272. """
  273. if need_explicit_null and dialect_name == 'postgres':
  274. return null()
  275. return None
  276. class CamelCaseUtil:
  277. """
  278. 下划线形式(snake_case)转小驼峰形式(camelCase)工具方法
  279. """
  280. @classmethod
  281. def snake_to_camel(cls, snake_str: str):
  282. """
  283. 下划线形式字符串(snake_case)转换为小驼峰形式字符串(camelCase)
  284. :param snake_str: 下划线形式字符串
  285. :return: 小驼峰形式字符串
  286. """
  287. # 分割字符串
  288. words = snake_str.split('_')
  289. # 小驼峰命名,第一个词首字母小写,其余词首字母大写
  290. # return words[0] + ''.join(word.capitalize() for word in words[1:])
  291. # 大驼峰命名,所有词首字母大写
  292. return ''.join(word.capitalize() for word in words)
  293. @classmethod
  294. def transform_result(cls, result: Any):
  295. """
  296. 针对不同类型将下划线形式(snake_case)批量转换为小驼峰形式(camelCase)方法
  297. :param result: 输入数据
  298. :return: 小驼峰形式结果
  299. """
  300. return SqlalchemyUtil.serialize_result(result=result, transform_case='snake_to_camel')
  301. class SnakeCaseUtil:
  302. """
  303. 小驼峰形式(camelCase)转下划线形式(snake_case)工具方法
  304. """
  305. @classmethod
  306. def camel_to_snake(cls, camel_str: str):
  307. """
  308. 小驼峰形式字符串(camelCase)转换为下划线形式字符串(snake_case)
  309. :param camel_str: 小驼峰形式字符串
  310. :return: 下划线形式字符串
  311. """
  312. # 在大写字母前添加一个下划线,然后将整个字符串转为小写
  313. words = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', camel_str)
  314. return re.sub('([a-z0-9])([A-Z])', r'\1_\2', words).lower()
  315. @classmethod
  316. def transform_result(cls, result: Any):
  317. """
  318. 针对不同类型将下划线形式(snake_case)批量转换为小驼峰形式(camelCase)方法
  319. :param result: 输入数据
  320. :return: 小驼峰形式结果
  321. """
  322. return SqlalchemyUtil.serialize_result(result=result, transform_case='camel_to_snake')