crud.py 23 KB


  1. # -*- coding: utf-8 -*-
  2. from sqlalchemy.engine.row import Row
  3. from sqlalchemy import and_, select, text
  4. from typing import Sequence
  5. from sqlglot.expressions import Expression
  6. from app.core.logger import log
  7. from app.config.setting import settings
  8. from app.core.base_crud import CRUDBase
  9. from app.api.v1.module_system.auth.schema import AuthSchema
  10. from .model import GenTableModel, GenTableColumnModel
  11. from .schema import (
  12. GenTableSchema,
  13. GenTableColumnSchema,
  14. GenTableColumnOutSchema,
  15. GenDBTableSchema,
  16. GenTableQueryParam
  17. )
  18. class GenTableCRUD(CRUDBase[GenTableModel, GenTableSchema, GenTableSchema]):
  19. """代码生成业务表模块数据库操作层"""
  20. def __init__(self, auth: AuthSchema) -> None:
  21. """
  22. 初始化CRUD操作层
  23. 参数:
  24. - auth (AuthSchema): 认证信息模型
  25. """
  26. super().__init__(model=GenTableModel, auth=auth)
  27. async def get_gen_table_by_id(self, table_id: int, preload: list | None = None) -> GenTableModel | None:
  28. """
  29. 根据业务表ID获取需要生成的业务表信息。
  30. 参数:
  31. - table_id (int): 业务表ID。
  32. - preload (list | None): 预加载关系,未提供时使用模型默认项
  33. 返回:
  34. - GenTableModel | None: 业务表信息对象。
  35. """
  36. return await self.get(id=table_id, preload=preload)
  37. async def get_gen_table_by_name(self, table_name: str, preload: list | None = None) -> GenTableModel | None:
  38. """
  39. 根据业务表名称获取需要生成的业务表信息。
  40. 参数:
  41. - table_name (str): 业务表名称。
  42. - preload (list | None): 预加载关系,未提供时使用模型默认项
  43. 返回:
  44. - GenTableModel | None: 业务表信息对象。
  45. """
  46. return await self.get(table_name=table_name, preload=preload)
  47. async def get_gen_table_all(self, preload: list | None = None) -> Sequence[GenTableModel]:
  48. """
  49. 获取所有业务表信息。
  50. 参数:
  51. - preload (list | None): 预加载关系,未提供时使用模型默认项
  52. 返回:
  53. - Sequence[GenTableModel]: 所有业务表信息列表。
  54. """
  55. return await self.list(preload=preload)
  56. async def get_gen_table_list(self, search: GenTableQueryParam | None = None, preload: list | None = None) -> Sequence[GenTableModel]:
  57. """
  58. 根据查询参数获取代码生成业务表列表信息。
  59. 参数:
  60. - search (GenTableQueryParam | None): 查询参数对象。
  61. - preload (list | None): 预加载关系,未提供时使用模型默认项
  62. 返回:
  63. - Sequence[GenTableModel]: 业务表列表信息。
  64. """
  65. return await self.list(search=search.__dict__, order_by=[{"created_time": "desc"}], preload=preload)
  66. async def add_gen_table(self, add_model: GenTableSchema) -> GenTableModel:
  67. """
  68. 新增业务表信息。
  69. 参数:
  70. - add_model (GenTableSchema): 新增业务表信息模型。
  71. 返回:
  72. - GenTableModel: 新增的业务表信息对象。
  73. """
  74. return await self.create(data=add_model)
  75. async def edit_gen_table(self, table_id: int, edit_model: GenTableSchema) -> GenTableModel:
  76. """
  77. 修改业务表信息。
  78. 参数:
  79. - table_id (int): 业务表ID。
  80. - edit_model (GenTableSchema): 修改业务表信息模型。
  81. 返回:
  82. - GenTableSchema: 修改后的业务表信息模型。
  83. """
  84. # 排除嵌套对象字段,避免SQLAlchemy尝试直接将字典设置到模型实例上
  85. return await self.update(id=table_id, data=edit_model.model_dump(exclude_unset=True, exclude={"columns"}))
  86. async def delete_gen_table(self, ids: list[int]) -> None:
  87. """
  88. 删除业务表信息。除了系统表。
  89. 参数:
  90. - ids (list[int]): 业务表ID列表。
  91. """
  92. await self.delete(ids=ids)
  93. async def get_db_table_list(self, search: GenTableQueryParam | None = None) -> list[dict]:
  94. """
  95. 根据查询参数获取数据库表列表信息。
  96. 参数:
  97. - search (GenTableQueryParam | None): 查询参数对象。
  98. 返回:
  99. - list[dict]: 数据库表列表信息(已转为可序列化字典)。
  100. """
  101. # 使用更健壮的方式检测数据库方言
  102. if settings.DATABASE_TYPE == "postgres":
  103. query_sql = (
  104. select(
  105. text("t.table_catalog as database_name"),
  106. text("t.table_name as table_name"),
  107. text("t.table_type as table_type"),
  108. text("pd.description as table_comment"),
  109. )
  110. .select_from(text(
  111. "information_schema.tables t \n"
  112. "LEFT JOIN pg_catalog.pg_class c ON c.relname = t.table_name \n"
  113. "LEFT JOIN pg_catalog.pg_namespace n ON n.nspname = t.table_schema AND c.relnamespace = n.oid \n"
  114. "LEFT JOIN pg_catalog.pg_description pd ON pd.objoid = c.oid AND pd.objsubid = 0"
  115. ))
  116. .where(
  117. and_(
  118. text("t.table_catalog = (select current_database())"),
  119. text("t.is_insertable_into = 'YES'"),
  120. text("t.table_schema = 'public'"),
  121. )
  122. )
  123. )
  124. else:
  125. query_sql = (
  126. select(
  127. text("table_schema as database_name"),
  128. text("table_name as table_name"),
  129. text("table_type as table_type"),
  130. text("table_comment as table_comment"),
  131. )
  132. .select_from(text("information_schema.tables"))
  133. .where(
  134. and_(
  135. text("table_schema = (select database())"),
  136. )
  137. )
  138. )
  139. # 动态条件构造
  140. params = {}
  141. if search and search.table_name:
  142. query_sql = query_sql.where(
  143. text("lower(table_name) like lower(:table_name)")
  144. )
  145. params['table_name'] = f"%{search.table_name}%"
  146. if search and search.table_comment:
  147. # 对于PostgreSQL,表注释字段是pd.description,而不是table_comment
  148. if settings.DATABASE_TYPE == "postgres":
  149. query_sql = query_sql.where(
  150. text("lower(pd.description) like lower(:table_comment)")
  151. )
  152. else:
  153. query_sql = query_sql.where(
  154. text("lower(table_comment) like lower(:table_comment)")
  155. )
  156. params['table_comment'] = f"%{search.table_comment}%"
  157. # 执行查询并绑定参数
  158. all_data = (await self.auth.db.execute(query_sql, params)).fetchall()
  159. # 将Row对象转换为字典列表,解决JSON序列化问题
  160. dict_data = []
  161. for row in all_data:
  162. # 检查row是否为Row对象
  163. if isinstance(row, Row):
  164. # 使用._mapping获取字典
  165. dict_row = GenDBTableSchema(**dict(row._mapping)).model_dump()
  166. dict_data.append(dict_row)
  167. else:
  168. dict_row = GenDBTableSchema(**dict(row)).model_dump()
  169. dict_data.append(dict_row)
  170. return dict_data
  171. async def get_db_table_list_by_names(self, table_names: list[str]) -> list[GenDBTableSchema]:
  172. """
  173. 根据业务表名称列表获取数据库表信息。
  174. 参数:
  175. - table_names (list[str]): 业务表名称列表。
  176. 返回:
  177. - list[GenDBTableSchema]: 数据库表信息对象列表。
  178. """
  179. # 处理空列表情况
  180. if not table_names:
  181. return []
  182. # 使用更健壮的方式检测数据库方言
  183. if settings.DATABASE_TYPE == "postgres":
  184. # PostgreSQL使用ANY操作符和正确的参数绑定
  185. query_sql = """
  186. SELECT
  187. t.table_catalog as database_name,
  188. t.table_name as table_name,
  189. t.table_type as table_type,
  190. pd.description as table_comment
  191. FROM
  192. information_schema.tables t
  193. LEFT JOIN pg_catalog.pg_class c ON c.relname = t.table_name
  194. LEFT JOIN pg_catalog.pg_namespace n ON n.nspname = t.table_schema AND c.relnamespace = n.oid
  195. LEFT JOIN pg_catalog.pg_description pd ON pd.objoid = c.oid AND pd.objsubid = 0
  196. WHERE
  197. t.table_catalog = (select current_database())
  198. AND t.is_insertable_into = 'YES'
  199. AND t.table_schema = 'public'
  200. AND t.table_name = ANY(:table_names)
  201. """
  202. else:
  203. query_sql = """
  204. SELECT
  205. table_schema as database_name,
  206. table_name as table_name,
  207. table_type as table_type,
  208. table_comment as table_comment
  209. FROM
  210. information_schema.tables
  211. WHERE
  212. table_schema = (select database())
  213. AND table_name IN :table_names
  214. """
  215. # 创建新的数据库会话上下文来执行查询,避免受外部事务状态影响
  216. try:
  217. # 去重表名列表,避免重复查询
  218. unique_table_names = list(set(table_names))
  219. # 使用只读事务执行查询,不影响主事务
  220. if settings.DATABASE_TYPE == "postgres":
  221. gen_db_table_list = (await self.auth.db.execute(text(query_sql), {"table_names": unique_table_names})).fetchall()
  222. else:
  223. gen_db_table_list = (await self.auth.db.execute(text(query_sql), {"table_names": tuple(unique_table_names)})).fetchall()
  224. except Exception as e:
  225. log.error(f"查询表信息时发生错误: {e}")
  226. # 查询错误时直接抛出,不需要事务处理
  227. raise
  228. # 将Row对象转换为字典列表,解决JSON序列化问题
  229. dict_data = []
  230. for row in gen_db_table_list:
  231. # 检查row是否为Row对象
  232. if isinstance(row, Row):
  233. # 使用._mapping获取字典
  234. dict_row = GenDBTableSchema(**dict(row._mapping))
  235. dict_data.append(dict_row)
  236. else:
  237. dict_row = GenDBTableSchema(**dict(row))
  238. dict_data.append(dict_row)
  239. return dict_data
  240. async def check_table_exists(self, table_name: str) -> bool:
  241. """
  242. 检查数据库中是否已存在指定表名的表。
  243. 参数:
  244. - table_name (str): 要检查的表名。
  245. 返回:
  246. - bool: 如果表存在返回True,否则返回False。
  247. """
  248. try:
  249. # 根据不同数据库类型使用不同的查询方式
  250. if settings.DATABASE_TYPE.lower() == 'mysql':
  251. query = text("SELECT 1 FROM information_schema.tables WHERE table_schema = DATABASE() AND table_name = :table_name")
  252. else:
  253. query = text("SELECT 1 FROM pg_tables WHERE tablename = :table_name")
  254. result = await self.auth.db.execute(query, {"table_name": table_name})
  255. return result.scalar() is not None
  256. except Exception as e:
  257. log.error(f"检查表格存在性时发生错误: {e}")
  258. # 出错时返回False,避免误报表已存在
  259. return False
  260. async def create_table_by_sql(self, sql_statements: list[Expression | None]) -> bool:
  261. """
  262. 根据SQL语句创建表结构。
  263. 参数:
  264. - sql (str): 创建表的SQL语句。
  265. 返回:
  266. - bool: 是否创建成功。
  267. """
  268. try:
  269. # 执行SQL但不手动提交事务,由框架管理事务生命周期
  270. for sql_statement in sql_statements:
  271. if not sql_statement:
  272. continue
  273. sql = sql_statement.sql(dialect=settings.DATABASE_TYPE)
  274. await self.auth.db.execute(text(sql))
  275. return True
  276. except Exception as e:
  277. log.error(f"创建表时发生错误: {e}")
  278. return False
  279. async def execute_sql(self, sql: str) -> bool:
  280. """
  281. 执行SQL语句。
  282. 参数:
  283. - sql (str): 要执行的SQL语句。
  284. 返回:
  285. - bool: 是否执行成功。
  286. """
  287. try:
  288. # 执行SQL但不手动提交事务,由框架管理事务生命周期
  289. await self.auth.db.execute(text(sql))
  290. return True
  291. except Exception as e:
  292. log.error(f"执行SQL时发生错误: {e}")
  293. return False
  294. class GenTableColumnCRUD(CRUDBase[GenTableColumnModel, GenTableColumnSchema, GenTableColumnSchema]):
  295. """代码生成业务表字段模块数据库操作层"""
  296. def __init__(self, auth: AuthSchema) -> None:
  297. """
  298. 初始化CRUD操作层
  299. 参数:
  300. - auth (AuthSchema): 认证信息模型
  301. """
  302. super().__init__(model=GenTableColumnModel, auth=auth)
  303. async def get_gen_table_column_by_id(self, id: int, preload: list | None = None) -> GenTableColumnModel | None:
  304. """根据业务表字段ID获取业务表字段信息。
  305. 参数:
  306. - id (int): 业务表字段ID。
  307. - preload (list | None): 预加载关系,未提供时使用模型默认项
  308. 返回:
  309. - GenTableColumnModel | None: 业务表字段信息对象。
  310. """
  311. return await self.get(id=id, preload=preload)
  312. async def get_gen_table_column_list_by_table_id(self, table_id: int, preload: list | None = None) -> GenTableColumnModel | None:
  313. """根据业务表ID获取业务表字段列表信息。
  314. 参数:
  315. - table_id (int): 业务表ID。
  316. - preload (list | None): 预加载关系,未提供时使用模型默认项
  317. 返回:
  318. - GenTableColumnModel | None: 业务表字段列表信息对象。
  319. """
  320. return await self.get(table_id=table_id, preload=preload)
  321. async def list_gen_table_column_crud_by_table_id(self, table_id: int, order_by: list | None = None, preload: list | None = None) -> Sequence[GenTableColumnModel]:
  322. """根据业务表ID查询业务表字段列表。
  323. 参数:
  324. - table_id (int): 业务表ID。
  325. - order_by (list | None): 排序字段列表,每个元素为{"field": "字段名", "order": "asc" | "desc"}。
  326. - preload (list | None): 预加载关系,未提供时使用模型默认项
  327. 返回:
  328. - Sequence[GenTableColumnModel]: 业务表字段列表信息对象序列。
  329. """
  330. return await self.list(search={"table_id": table_id}, order_by=order_by, preload=preload)
  331. async def get_gen_db_table_columns_by_name(self, table_name: str | None) -> list[GenTableColumnOutSchema]:
  332. """
  333. 根据业务表名称获取业务表字段列表信息。
  334. 参数:
  335. - table_name (str | None): 业务表名称。
  336. 返回:
  337. - list[GenTableColumnOutSchema]: 业务表字段列表信息对象。
  338. """
  339. # 检查表名是否为空
  340. if not table_name:
  341. raise ValueError("数据表名称不能为空")
  342. try:
  343. if settings.DATABASE_TYPE == "mysql":
  344. query_sql = """
  345. SELECT
  346. c.column_name AS column_name,
  347. c.column_comment AS column_comment,
  348. c.column_type AS column_type,
  349. c.character_maximum_length AS column_length,
  350. c.column_default AS column_default,
  351. c.ordinal_position AS sort,
  352. (CASE WHEN c.column_key = 'PRI' THEN 1 ELSE 0 END) AS is_pk,
  353. (CASE WHEN c.extra = 'auto_increment' THEN 1 ELSE 0 END) AS is_increment,
  354. (CASE WHEN (c.is_nullable = 'NO' AND c.column_key != 'PRI') THEN 1 ELSE 0 END) AS is_nullable,
  355. (CASE
  356. WHEN c.column_name IN (
  357. SELECT k.column_name
  358. FROM information_schema.key_column_usage k
  359. JOIN information_schema.table_constraints t
  360. ON k.constraint_name = t.constraint_name
  361. WHERE k.table_schema = c.table_schema
  362. AND k.table_name = c.table_name
  363. AND t.constraint_type = 'UNIQUE'
  364. ) THEN 1 ELSE 0
  365. END) AS is_unique
  366. FROM
  367. information_schema.columns c
  368. WHERE c.table_schema = (SELECT DATABASE())
  369. AND c.table_name = :table_name
  370. ORDER BY
  371. c.ordinal_position
  372. """
  373. else:
  374. query_sql = """
  375. SELECT
  376. c.column_name AS column_name,
  377. COALESCE(pgd.description, '') AS column_comment,
  378. c.udt_name AS column_type,
  379. c.character_maximum_length AS column_length,
  380. c.column_default AS column_default,
  381. c.ordinal_position AS sort,
  382. (CASE WHEN EXISTS (
  383. SELECT 1 FROM information_schema.table_constraints tc
  384. JOIN information_schema.constraint_column_usage ccu ON tc.constraint_name = ccu.constraint_name
  385. WHERE tc.table_name = c.table_name
  386. AND tc.constraint_type = 'PRIMARY KEY'
  387. AND ccu.column_name = c.column_name
  388. ) THEN 1 ELSE 0 END) AS is_pk,
  389. (CASE WHEN c.column_default LIKE 'nextval%' THEN 1 ELSE 0 END) AS is_increment,
  390. (CASE WHEN c.is_nullable = 'NO' THEN 1 ELSE 0 END) AS is_nullable,
  391. (CASE WHEN EXISTS (
  392. SELECT 1 FROM information_schema.table_constraints tc
  393. JOIN information_schema.constraint_column_usage ccu ON tc.constraint_name = ccu.constraint_name
  394. WHERE tc.table_name = c.table_name
  395. AND tc.constraint_type = 'UNIQUE'
  396. AND ccu.column_name = c.column_name
  397. ) THEN 1 ELSE 0 END) AS is_unique
  398. FROM
  399. information_schema.columns c
  400. LEFT JOIN pg_catalog.pg_description pgd ON
  401. pgd.objoid = (SELECT oid FROM pg_class WHERE relname = c.table_name)
  402. AND pgd.objsubid = c.ordinal_position
  403. WHERE c.table_catalog = current_database()
  404. AND c.table_schema = 'public'
  405. AND c.table_name = :table_name
  406. ORDER BY
  407. c.ordinal_position
  408. """
  409. query = text(query_sql).bindparams(table_name=table_name)
  410. result = await self.auth.db.execute(query)
  411. rows = result.fetchall() if result else []
  412. # 确保rows是可迭代对象
  413. if not rows:
  414. return []
  415. columns_list = []
  416. for row in rows:
  417. # 防御性编程:检查row是否有足够的元素
  418. if len(row) >= 10:
  419. columns_list.append(
  420. GenTableColumnOutSchema(
  421. column_name=row[0],
  422. column_comment=row[1],
  423. column_type=row[2],
  424. column_length=str(row[3]) if row[3] is not None else '',
  425. column_default=str(row[4]) if row[4] is not None else '',
  426. sort=row[5],
  427. is_pk=row[6],
  428. is_increment=row[7],
  429. is_nullable=row[8],
  430. is_unique=row[9],
  431. )
  432. )
  433. return columns_list
  434. except Exception as e:
  435. log.error(f"获取表{table_name}的字段列表时出错: {str(e)}")
  436. # 确保即使出错也返回空列表而不是None
  437. raise
  438. async def list_gen_table_column_crud(self, search: dict | None = None, order_by: list | None = None, preload: list | None = None) -> Sequence[GenTableColumnModel]:
  439. """根据业务表字段查询业务表字段列表。
  440. 参数:
  441. - search (dict | None): 查询参数,例如{"table_id": 1}。
  442. - order_by (list | None): 排序字段列表,每个元素为{"field": "字段名", "order": "asc" | "desc"}。
  443. - preload (list | None): 预加载关系,未提供时使用模型默认项
  444. 返回:
  445. - Sequence[GenTableColumnModel]: 业务表字段列表信息对象序列。
  446. """
  447. return await self.list(search=search, order_by=order_by, preload=preload)
  448. async def create_gen_table_column_crud(self, data: GenTableColumnSchema) -> GenTableColumnModel | None:
  449. """创建业务表字段。
  450. 参数:
  451. - data (GenTableColumnSchema): 业务表字段模型。
  452. 返回:
  453. - GenTableColumnModel | None: 业务表字段列表信息对象。
  454. """
  455. return await self.create(data=data)
  456. async def update_gen_table_column_crud(self, id: int, data: GenTableColumnSchema) -> GenTableColumnModel | None:
  457. """更新业务表字段。
  458. 参数:
  459. - id (int): 业务表字段ID。
  460. - data (GenTableColumnSchema): 业务表字段模型。
  461. 返回:
  462. - GenTableColumnModel | None: 业务表字段列表信息对象。
  463. """
  464. # 将对象转换为字典,避免SQLAlchemy直接操作对象时出现的状态问题
  465. data_dict = data.model_dump(exclude_unset=True)
  466. return await self.update(id=id, data=data_dict)
  467. async def delete_gen_table_column_by_table_id_crud(self, table_ids: list[int]) -> None:
  468. """根据业务表ID批量删除业务表字段。
  469. 参数:
  470. - table_ids (list[int]): 业务表ID列表。
  471. 返回:
  472. - None
  473. """
  474. # 先查询出这些表ID对应的所有字段ID
  475. query = select(GenTableColumnModel.id).where(GenTableColumnModel.table_id.in_(table_ids))
  476. result = await self.auth.db.execute(query)
  477. column_ids = [row[0] for row in result.fetchall()]
  478. # 如果有字段ID,则删除这些字段
  479. if column_ids:
  480. await self.delete(ids=column_ids)
  481. async def delete_gen_table_column_by_column_id_crud(self, column_ids: list[int]) -> None:
  482. """根据业务表字段ID批量删除业务表字段。
  483. 参数:
  484. - column_ids (list[int]): 业务表字段ID列表。
  485. 返回:
  486. - None
  487. """
  488. return await self.delete(ids=column_ids)