service.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573
  1. # -*- coding: utf-8 -*-
  2. import io
  3. import os
  4. from pathlib import Path
  5. import zipfile
  6. from typing import Any
  7. from sqlglot.expressions import Add, Alter, Create, Delete, Drop, Expression, Insert, Table, TruncateTable, Update
  8. from sqlglot import parse as sqlglot_parse
  9. from app.config.path_conf import BASE_DIR
  10. from app.config.setting import settings
  11. from app.core.logger import log
  12. from app.core.exceptions import CustomException
  13. from app.api.v1.module_system.auth.schema import AuthSchema
  14. from .tools.jinja2_template_util import Jinja2TemplateUtil
  15. from .tools.gen_util import GenUtils
  16. from .schema import GenTableSchema, GenTableOutSchema, GenTableColumnSchema, GenTableColumnOutSchema, GenTableQueryParam
  17. from .crud import GenTableColumnCRUD, GenTableCRUD
  18. def handle_service_exception(func):
  19. async def wrapper(*args, **kwargs):
  20. try:
  21. return await func(*args, **kwargs)
  22. except CustomException:
  23. raise
  24. except Exception as e:
  25. raise CustomException(msg=f'{func.__name__}执行失败: {str(e)}')
  26. return wrapper
  27. class GenTableService:
  28. """代码生成业务表服务层"""
  29. @classmethod
  30. @handle_service_exception
  31. async def get_gen_table_detail_service(cls, auth: AuthSchema, table_id: int) -> dict:
  32. """获取业务表详细信息(含字段与其他表列表)。
  33. - 备注:优先解析`options`为`GenTableOptionSchema`,设置`parent_menu_id`等选项;保证`columns`与`tables`结构完整。
  34. """
  35. gen_table = await cls.get_gen_table_by_id_service(auth, table_id)
  36. return GenTableOutSchema.model_validate(gen_table).model_dump()
  37. @classmethod
  38. @handle_service_exception
  39. async def get_gen_table_list_service(cls, auth: AuthSchema, search: GenTableQueryParam) -> list[dict]:
  40. """
  41. 获取代码生成业务表列表信息。
  42. 参数:
  43. - auth (AuthSchema): 认证信息。
  44. - search (GenTableQueryParam): 查询参数模型。
  45. 返回:
  46. - list[dict]: 包含业务表列表信息的字典列表。
  47. """
  48. gen_table_list_result = await GenTableCRUD(auth=auth).get_gen_table_list(search)
  49. return [GenTableOutSchema.model_validate(obj).model_dump() for obj in gen_table_list_result]
  50. @classmethod
  51. @handle_service_exception
  52. async def get_gen_db_table_list_service(cls, auth: AuthSchema, search: GenTableQueryParam) -> list[Any]:
  53. """获取数据库表列表(跨方言)。
  54. - 备注:返回已转换为字典的结构,适用于前端直接展示;排序参数保留扩展位但当前未使用。
  55. """
  56. gen_db_table_list_result = await GenTableCRUD(auth=auth).get_db_table_list(search)
  57. return gen_db_table_list_result
  58. @classmethod
  59. @handle_service_exception
  60. async def get_gen_db_table_list_by_name_service(cls, auth: AuthSchema, table_names: list[str]) -> list[GenTableOutSchema]:
  61. """根据表名称组获取数据库表信息。
  62. - 校验:如有不存在的表名,抛出明确异常;返回统一的`GenTableOutSchema`列表。
  63. """
  64. # 验证输入参数
  65. if not table_names:
  66. raise CustomException(msg="表名列表不能为空")
  67. gen_db_table_list_result = await GenTableCRUD(auth).get_db_table_list_by_names(table_names)
  68. # 修复:将GenDBTableSchema对象转换为字典后再传递给GenTableOutSchema
  69. result = []
  70. for gen_table in gen_db_table_list_result:
  71. # 确保table_name不为None
  72. if gen_table.table_name is not None:
  73. result.append(GenTableOutSchema(**gen_table.model_dump()))
  74. return result
  75. @classmethod
  76. @handle_service_exception
  77. async def import_gen_table_service(cls, auth: AuthSchema, gen_table_list: list[GenTableOutSchema]) -> bool | None:
  78. """导入表结构到生成器(持久化并初始化列)。
  79. - 备注:避免重复导入;为每列调用`GenUtils.init_column_field`填充默认属性,保留语义一致性。
  80. """
  81. # 检查是否有表需要导入
  82. if not gen_table_list:
  83. raise CustomException(msg="导入的表结构不能为空")
  84. try:
  85. for table in gen_table_list:
  86. table_name = table.table_name
  87. # 检查表是否已存在
  88. existing_table = await GenTableCRUD(auth).get_gen_table_by_name(table_name)
  89. if existing_table:
  90. raise CustomException(msg=f"以下表已存在,不能重复导入: {table_name}")
  91. GenUtils.init_table(table)
  92. if not table.columns:
  93. table.columns = []
  94. add_gen_table = await GenTableCRUD(auth).add_gen_table(GenTableSchema.model_validate(table.model_dump()))
  95. gen_table_columns = await GenTableColumnCRUD(auth).get_gen_db_table_columns_by_name(table_name)
  96. if len(gen_table_columns) > 0:
  97. table.id = add_gen_table.id
  98. for column in gen_table_columns:
  99. column_schema = GenTableColumnSchema(
  100. table_id=table.id,
  101. column_name=column.column_name,
  102. column_comment=column.column_comment,
  103. column_type=column.column_type,
  104. column_length=column.column_length,
  105. column_default=column.column_default,
  106. is_pk=column.is_pk,
  107. is_increment=column.is_increment,
  108. is_nullable=column.is_nullable,
  109. is_unique=column.is_unique,
  110. sort=column.sort,
  111. python_type=column.python_type,
  112. python_field=column.python_field,
  113. )
  114. GenUtils.init_column_field(column_schema, table)
  115. await GenTableColumnCRUD(auth).create_gen_table_column_crud(column_schema)
  116. return True
  117. except Exception as e:
  118. raise CustomException(msg=f'导入失败, {str(e)}')
  119. @classmethod
  120. @handle_service_exception
  121. async def create_table_service(cls, auth: AuthSchema, sql: str) -> bool | None:
  122. """创建表结构并导入至代码生成模块。
  123. - 校验:使用`sqlglot`确保仅包含`CREATE TABLE`语句;失败抛出明确异常。
  124. - 唯一性检查:在创建前检查该表是否已存在于数据库中。
  125. """
  126. # 验证SQL非空
  127. if not sql or not sql.strip():
  128. raise CustomException(msg='SQL语句不能为空')
  129. try:
  130. # 解析SQL语句
  131. sql_statements = sqlglot_parse(sql, dialect=settings.DATABASE_TYPE)
  132. if not sql_statements:
  133. raise CustomException(msg='无法解析SQL语句,请检查SQL语法')
  134. # 校验sql语句是否为合法的建表语句
  135. if not cls.__is_valid_create_table(sql_statements):
  136. raise CustomException(msg='sql语句不是合法的建表语句')
  137. # 获取要创建的表名
  138. table_names = cls.__get_table_names(sql_statements)
  139. # 创建CRUD实例
  140. gen_table_crud = GenTableCRUD(auth=auth)
  141. # 检查每个表是否已存在
  142. for table_name in table_names:
  143. # 检查数据库中是否已存在该表
  144. if await gen_table_crud.check_table_exists(table_name):
  145. raise CustomException(msg=f'表 {table_name} 已存在,请检查并修改表名后重试')
  146. # 检查代码生成模块中是否已导入该表
  147. existing_table = await gen_table_crud.get_gen_table_by_name(table_name)
  148. if existing_table:
  149. raise CustomException(msg=f'表 {table_name} 已在代码生成模块中存在,请检查并修改表名后重试')
  150. # 表不存在,执行SQL语句创建表
  151. result = await gen_table_crud.create_table_by_sql(sql_statements)
  152. if not result:
  153. raise CustomException(msg=f'创建表 {table_names} 失败,请检查SQL语句')
  154. # 导入表结构到代码生成模块 - 简化逻辑,移除多余的None检查
  155. gen_table_list = await cls.get_gen_db_table_list_by_name_service(auth, table_names)
  156. import_result = await cls.import_gen_table_service(auth, gen_table_list)
  157. return import_result
  158. except Exception as e:
  159. raise CustomException(msg=f'创建表结构失败: {str(e)}')
  160. @classmethod
  161. @handle_service_exception
  162. async def execute_sql_service(cls, auth: AuthSchema, gen_table: GenTableOutSchema) -> bool:
  163. """
  164. 执行菜单 SQL(INSERT / DO 块)并写入 sys_menu。
  165. - 仅处理菜单 SQL,不再混杂建表逻辑;
  166. - 文件不存在时给出友好提示;
  167. - 统一异常信息,日志与业务提示分离。
  168. """
  169. sql_path = f'{BASE_DIR}/sql/menu/{gen_table.module_name}/{gen_table.business_name}.sql'
  170. # 文件存在性前置检查,避免多余解析开销
  171. if not os.path.isfile(sql_path):
  172. raise CustomException(msg=f'菜单 SQL 文件不存在: {sql_path}')
  173. sql = Path(sql_path).read_text(encoding='utf-8').strip()
  174. if not sql:
  175. raise CustomException(msg='菜单 SQL 文件内容为空')
  176. # 仅做语法校验,不限制关键字;真正的语义安全由数据库权限控制
  177. try:
  178. statements = sqlglot_parse(sql, dialect=settings.DATABASE_TYPE)
  179. if not statements:
  180. raise CustomException(msg='菜单 SQL 语法解析失败,请检查文件内容')
  181. except Exception as e:
  182. log.error(f'菜单 SQL 解析异常: {e}')
  183. raise CustomException(msg='菜单 SQL 语法错误,请检查文件内容')
  184. # 执行 SQL
  185. try:
  186. await GenTableCRUD(auth).execute_sql(sql)
  187. log.info(f'成功执行菜单 SQL: {sql_path}')
  188. return True
  189. except Exception as e:
  190. log.error(f'菜单 SQL 执行失败: {e}')
  191. raise CustomException(msg='菜单 SQL 执行失败,请确认语句及数据库状态')
  192. @classmethod
  193. def __is_valid_create_table(cls, sql_statements: list[Expression | None]) -> bool:
  194. """
  195. 校验SQL语句是否为合法的建表语句。
  196. 参数:
  197. - sql_statements (list[Expression | None]): SQL的AST列表。
  198. 返回:
  199. - bool: 校验结果。
  200. """
  201. validate_create = [isinstance(sql_statement, Create) for sql_statement in sql_statements]
  202. validate_forbidden_keywords = [
  203. isinstance(
  204. sql_statement,
  205. (Add, Alter, Delete, Drop, Insert, TruncateTable, Update),
  206. )
  207. for sql_statement in sql_statements
  208. ]
  209. if not any(validate_create) or any(validate_forbidden_keywords):
  210. return False
  211. return True
  212. @classmethod
  213. def __get_table_names(cls, sql_statements: list[Expression | None]) -> list[str]:
  214. """
  215. 获取SQL语句中所有的建表表名。
  216. 参数:
  217. - sql_statements (list[Expression | None]): SQL的AST列表。
  218. 返回:
  219. - list[str]: 建表表名列表。
  220. """
  221. table_names = []
  222. for sql_statement in sql_statements:
  223. if isinstance(sql_statement, Create):
  224. table = sql_statement.find(Table)
  225. if table and table.name:
  226. table_names.append(table.name)
  227. return list(set(table_names))
  228. @classmethod
  229. @handle_service_exception
  230. async def update_gen_table_service(cls, auth: AuthSchema, data: GenTableSchema, table_id: int) -> dict[str, Any]:
  231. """编辑业务表信息(含选项与字段)。
  232. - 备注:将`params`序列化写入`options`以持久化;仅更新存在`id`的列,避免误创建。
  233. """
  234. # 处理params为None的情况
  235. gen_table_info = await cls.get_gen_table_by_id_service(auth, table_id)
  236. if gen_table_info.id:
  237. try:
  238. # 直接调用edit_gen_table方法,它会在内部处理排除嵌套字段的逻辑
  239. result = await GenTableCRUD(auth).edit_gen_table(table_id, data)
  240. # 处理data.columns为None的情况
  241. if data.columns:
  242. for gen_table_column in data.columns:
  243. # 确保column有id字段
  244. if hasattr(gen_table_column, 'id') and gen_table_column.id:
  245. column_schema = GenTableColumnSchema(**gen_table_column.model_dump())
  246. await GenTableColumnCRUD(auth).update_gen_table_column_crud(gen_table_column.id, column_schema)
  247. return GenTableOutSchema.model_validate(result).model_dump()
  248. except Exception as e:
  249. raise CustomException(msg=str(e))
  250. else:
  251. raise CustomException(msg='业务表不存在')
  252. @classmethod
  253. @handle_service_exception
  254. async def delete_gen_table_service(cls, auth: AuthSchema, ids: list[int]) -> None:
  255. """删除业务表信息(先删字段,再删表)。"""
  256. # 验证ID列表非空
  257. if not ids:
  258. raise CustomException(msg="ID列表不能为空")
  259. try:
  260. # 先删除相关的字段信息
  261. await GenTableColumnCRUD(auth=auth).delete_gen_table_column_by_table_id_crud(ids)
  262. # 再删除表信息
  263. await GenTableCRUD(auth=auth).delete_gen_table(ids)
  264. except Exception as e:
  265. raise CustomException(msg=str(e))
  266. @classmethod
  267. @handle_service_exception
  268. async def get_gen_table_by_id_service(cls, auth: AuthSchema, table_id: int) -> GenTableOutSchema:
  269. """获取需要生成代码的业务表详细信息。
  270. - 备注:去除SQLAlchemy内部状态;将`None`值转为适配前端的默认值;解析`options`补充选项。
  271. """
  272. gen_table = await GenTableCRUD(auth=auth).get_gen_table_by_id(table_id)
  273. if not gen_table:
  274. raise CustomException(msg='业务表不存在')
  275. result = GenTableOutSchema.model_validate(gen_table)
  276. return result
  277. @classmethod
  278. @handle_service_exception
  279. async def get_gen_table_all_service(cls, auth: AuthSchema) -> list[GenTableOutSchema]:
  280. """获取所有业务表信息(列表)。"""
  281. gen_table_all = await GenTableCRUD(auth=auth).get_gen_table_all() or []
  282. result = []
  283. for gen_table in gen_table_all:
  284. try:
  285. table_out = GenTableOutSchema.model_validate(gen_table)
  286. result.append(table_out)
  287. except Exception as e:
  288. log.error(f"转换业务表时出错: {str(e)}")
  289. continue
  290. return result
  291. @classmethod
  292. @handle_service_exception
  293. async def preview_code_service(cls, auth: AuthSchema, table_id: int) -> dict[str, Any]:
  294. """
  295. 预览代码(根据模板渲染内存结果)。
  296. - 备注:构建Jinja2上下文;根据模板类型与前端类型选择模板清单;返回文件名到内容映射。
  297. """
  298. gen_table = GenTableOutSchema.model_validate(
  299. await GenTableCRUD(auth).get_gen_table_by_id(table_id)
  300. )
  301. await cls.set_pk_column(gen_table)
  302. env = Jinja2TemplateUtil.get_env()
  303. context = Jinja2TemplateUtil.prepare_context(gen_table)
  304. template_list = Jinja2TemplateUtil.get_template_list()
  305. preview_code_result = {}
  306. for template in template_list:
  307. try:
  308. render_content = await env.get_template(template).render_async(**context)
  309. preview_code_result[template] = render_content
  310. except Exception as e:
  311. log.error(f"渲染模板 {template} 时出错: {str(e)}")
  312. # 即使某个模板渲染失败,也继续处理其他模板
  313. preview_code_result[template] = f"渲染错误: {str(e)}"
  314. return preview_code_result
  315. @classmethod
  316. @handle_service_exception
  317. async def generate_code_service(cls, auth: AuthSchema, table_name: str) -> bool:
  318. """生成代码至指定路径(安全写入+可跳过覆盖)。
  319. - 安全:限制写入在项目根目录内;越界路径自动回退到项目根目录。
  320. """
  321. # 验证表名非空
  322. if not table_name or not table_name.strip():
  323. raise CustomException(msg='表名不能为空')
  324. env = Jinja2TemplateUtil.get_env()
  325. render_info = await cls.__get_gen_render_info(auth, table_name)
  326. gen_table_schema = render_info[3]
  327. for template in render_info[0]:
  328. try:
  329. render_content = await env.get_template(template).render_async(**render_info[2])
  330. gen_path = cls.__get_gen_path(gen_table_schema, template)
  331. if not gen_path:
  332. raise CustomException(msg='【代码生成】生成路径为空')
  333. # 确保目录存在
  334. os.makedirs(os.path.dirname(gen_path), exist_ok=True)
  335. with open(gen_path, 'w', encoding='utf-8') as f:
  336. f.write(render_content)
  337. except Exception as e:
  338. raise CustomException(msg=f'渲染模板失败,表名:{gen_table_schema.table_name},详细错误信息:{str(e)}')
  339. await cls.execute_sql_service(auth, gen_table_schema)
  340. return True
  341. @classmethod
  342. @handle_service_exception
  343. async def batch_gen_code_service(cls, auth: AuthSchema, table_names: list[str]) -> bytes:
  344. """
  345. 批量生成代码并打包为ZIP。
  346. - 备注:内存生成并压缩,兼容多模板类型;供下载使用。
  347. """
  348. # 验证表名列表非空
  349. if not table_names:
  350. raise CustomException(msg="表名列表不能为空")
  351. zip_buffer = io.BytesIO()
  352. with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
  353. for table_name in table_names:
  354. if not table_name.strip():
  355. continue
  356. try:
  357. env = Jinja2TemplateUtil.get_env()
  358. render_info = await cls.__get_gen_render_info(auth, table_name)
  359. for template_file, output_file in zip(render_info[0], render_info[1]):
  360. render_content = await env.get_template(template_file).render_async(**render_info[2])
  361. zip_file.writestr(output_file, render_content)
  362. except Exception as e:
  363. log.error(f"批量生成代码时处理表 {table_name} 出错: {str(e)}")
  364. # 继续处理其他表,不中断整个过程
  365. continue
  366. zip_data = zip_buffer.getvalue()
  367. zip_buffer.close()
  368. return zip_data
  369. @classmethod
  370. @handle_service_exception
  371. async def sync_db_service(cls, auth: AuthSchema, table_name: str) -> None:
  372. """同步数据库表结构至生成器(保留用户配置)。
  373. - 备注:按数据库实际字段重建或更新生成器字段;保留字典/查询/展示等用户自定义属性;清理已删除字段。
  374. """
  375. # 验证表名非空
  376. if not table_name or not table_name.strip():
  377. raise CustomException(msg='表名不能为空')
  378. gen_table = await GenTableCRUD(auth).get_gen_table_by_name(table_name)
  379. if not gen_table:
  380. raise CustomException(msg='业务表不存在')
  381. table = GenTableOutSchema.model_validate(gen_table)
  382. if not table.id:
  383. raise CustomException(msg='业务表ID不能为空')
  384. table_columns = table.columns or []
  385. table_column_map = {column.column_name: column for column in table_columns}
  386. # 确保db_table_columns始终是列表类型,避免None值
  387. db_table_columns = await GenTableColumnCRUD(auth).get_gen_db_table_columns_by_name(table_name) or []
  388. db_table_columns = [col for col in db_table_columns if col is not None]
  389. db_table_column_names = [column.column_name for column in db_table_columns]
  390. try:
  391. for column in db_table_columns:
  392. # 仅在缺省时初始化默认属性(包含 table_id 关联)
  393. GenUtils.init_column_field(column, table)
  394. # 利用schema层的默认值,移除多余的None检查
  395. if column.column_name in table_column_map:
  396. prev_column = table_column_map[column.column_name]
  397. # 复用旧记录ID,确保执行更新
  398. if hasattr(prev_column, 'id') and prev_column.id:
  399. column.id = prev_column.id
  400. # 保留用户配置的显示与查询属性 - 使用getattr确保安全访问
  401. if hasattr(prev_column, 'dict_type') and prev_column.dict_type:
  402. column.dict_type = prev_column.dict_type
  403. if hasattr(prev_column, 'query_type') and prev_column.query_type:
  404. column.query_type = prev_column.query_type
  405. if hasattr(prev_column, 'html_type') and prev_column.html_type:
  406. column.html_type = prev_column.html_type
  407. # 保留关键用户自定义属性 - 安全处理is_pk
  408. is_pk_bool = False
  409. if hasattr(prev_column, 'is_pk'):
  410. # 处理不同类型的is_pk值
  411. if isinstance(prev_column.is_pk, bool):
  412. is_pk_bool = prev_column.is_pk
  413. else:
  414. is_pk_bool = str(prev_column.is_pk) == '1'
  415. # 安全处理nullable属性
  416. if hasattr(prev_column, 'is_nullable') and not is_pk_bool:
  417. column.is_nullable = prev_column.is_nullable
  418. # 保留其他重要用户设置
  419. if hasattr(prev_column, 'python_field'):
  420. column.python_field = prev_column.python_field or column.python_field
  421. if hasattr(column, 'id') and column.id:
  422. await GenTableColumnCRUD(auth).update_gen_table_column_crud(column.id, column)
  423. else:
  424. await GenTableColumnCRUD(auth).create_gen_table_column_crud(column)
  425. else:
  426. # 设置table_id以确保新字段能正确关联到表
  427. column.table_id = table.id
  428. await GenTableColumnCRUD(auth).create_gen_table_column_crud(column)
  429. del_columns = [column for column in table_columns if column.column_name not in db_table_column_names]
  430. if del_columns:
  431. for column in del_columns:
  432. if hasattr(column, 'id') and column.id:
  433. await GenTableColumnCRUD(auth).delete_gen_table_column_by_column_id_crud([column.id])
  434. except Exception as e:
  435. raise CustomException(msg=f'同步失败: {str(e)}')
  436. @classmethod
  437. async def set_pk_column(cls, gen_table: GenTableOutSchema) -> None:
  438. """设置主键列信息(主表/子表)。
  439. - 备注:同时兼容`pk`布尔与`is_pk == '1'`字符串两种标识。
  440. """
  441. if gen_table.columns:
  442. for column in gen_table.columns:
  443. # 修复:确保正确检查主键标识
  444. if getattr(column, 'pk', False) or getattr(column, 'is_pk', '') == '1':
  445. gen_table.pk_column = column
  446. break
  447. # 如果没有找到主键列且有列存在,使用第一个列作为主键
  448. if gen_table.pk_column is None and gen_table.columns:
  449. gen_table.pk_column = gen_table.columns[0]
  450. @classmethod
  451. async def __get_gen_render_info(cls, auth: AuthSchema, table_name: str) -> list[Any]:
  452. """
  453. 获取生成代码渲染模板相关信息。
  454. 参数:
  455. - auth (AuthSchema): 认证对象。
  456. - table_name (str): 业务表名称。
  457. 返回:
  458. - list[Any]: [模板列表, 输出文件名列表, 渲染上下文, 业务表对象]。
  459. 异常:
  460. - CustomException: 当业务表不存在或数据转换失败时抛出。
  461. """
  462. gen_table_model = await GenTableCRUD(auth=auth).get_gen_table_by_name(table_name)
  463. # 检查表是否存在
  464. if gen_table_model is None:
  465. raise CustomException(msg=f"业务表 {table_name} 不存在")
  466. gen_table = GenTableOutSchema.model_validate(gen_table_model)
  467. await cls.set_pk_column(gen_table)
  468. context = Jinja2TemplateUtil.prepare_context(gen_table)
  469. template_list = Jinja2TemplateUtil.get_template_list()
  470. output_files = [Jinja2TemplateUtil.get_file_name(template, gen_table) for template in template_list]
  471. return [template_list, output_files, context, gen_table]
  472. @classmethod
  473. def __get_gen_path(cls, gen_table: GenTableOutSchema, template: str) -> str | None:
  474. """根据GenTableOutSchema对象和模板名称生成路径。"""
  475. try:
  476. file_name = Jinja2TemplateUtil.get_file_name(template, gen_table)
  477. # 默认写入到项目根目录(backend的上一级)
  478. project_root = str(BASE_DIR.parent)
  479. full_path = os.path.join(project_root, file_name)
  480. # 确保路径在项目根目录内,防止路径遍历攻击
  481. if not os.path.abspath(full_path).startswith(os.path.abspath(project_root)):
  482. log.error(f"路径越界,回退到项目根目录: {file_name}")
  483. # 回退到项目根目录下的generated文件夹
  484. full_path = os.path.join(project_root, "generated", os.path.basename(file_name))
  485. return full_path
  486. except Exception as e:
  487. log.error(f"生成路径时出错: {str(e)}")
  488. return None
  489. class GenTableColumnService:
  490. """代码生成业务表字段服务层"""
  491. @classmethod
  492. @handle_service_exception
  493. async def get_gen_table_column_list_by_table_id_service(cls, auth: AuthSchema, table_id: int) -> list[dict[str, Any]]:
  494. """获取业务表字段列表信息(输出模型)。"""
  495. gen_table_column_list_result = await GenTableColumnCRUD(auth).list_gen_table_column_crud({"table_id": table_id})
  496. result = [GenTableColumnOutSchema.model_validate(gen_table_column).model_dump() for gen_table_column in gen_table_column_list_result]
  497. return result