service.py 23 KB


  1. # -*- coding: utf-8 -*-
  2. import json
  3. from redis.asyncio.client import Redis
  4. from app.common.enums import RedisInitKeyConfig
  5. from app.utils.excel_util import ExcelUtil
  6. from app.core.database import async_db_session
  7. from app.core.base_schema import BatchSetAvailable
  8. from app.core.redis_crud import RedisCURD
  9. from app.core.exceptions import CustomException
  10. from app.core.logger import log
  11. from app.api.v1.module_system.auth.schema import AuthSchema
  12. from .schema import (
  13. DictDataCreateSchema,
  14. DictDataOutSchema,
  15. DictDataUpdateSchema,
  16. DictTypeCreateSchema,
  17. DictTypeOutSchema,
  18. DictTypeUpdateSchema,
  19. DictDataQueryParam,
  20. DictTypeQueryParam
  21. )
  22. from .crud import DictDataCRUD, DictTypeCRUD
  23. class DictTypeService:
  24. """
  25. 字典类型管理模块服务层
  26. """
  27. @classmethod
  28. async def get_obj_detail_service(cls, auth: AuthSchema, id: int) -> dict:
  29. """
  30. 获取数据字典类型详情
  31. 参数:
  32. - auth (AuthSchema): 认证信息模型
  33. - id (int): 数据字典类型ID
  34. 返回:
  35. - dict: 数据字典类型详情字典
  36. """
  37. obj = await DictTypeCRUD(auth).get_obj_by_id_crud(id=id)
  38. return DictTypeOutSchema.model_validate(obj).model_dump()
  39. @classmethod
  40. async def get_obj_list_service(cls, auth: AuthSchema, search: DictTypeQueryParam | None = None, order_by: list[dict] | None = None) -> list[dict]:
  41. """
  42. 获取数据字典类型列表
  43. 参数:
  44. - auth (AuthSchema): 认证信息模型
  45. - search (DictTypeQueryParam | None): 搜索条件模型
  46. - order_by (list[dict] | None): 排序字段列表
  47. 返回:
  48. - list[dict]: 数据字典类型详情字典列表
  49. """
  50. obj_list = await DictTypeCRUD(auth).get_obj_list_crud(search=search.__dict__, order_by=order_by)
  51. return [DictTypeOutSchema.model_validate(obj).model_dump() for obj in obj_list]
  52. @classmethod
  53. async def create_obj_service(cls, auth: AuthSchema, redis: Redis, data: DictTypeCreateSchema) -> dict:
  54. """
  55. 创建数据字典类型
  56. 参数:
  57. - auth (AuthSchema): 认证信息模型
  58. - redis (Redis): Redis客户端
  59. - data (DictTypeCreateSchema): 数据字典类型创建模型
  60. 返回:
  61. - dict: 数据字典类型详情字典
  62. """
  63. exist_obj = await DictTypeCRUD(auth).get(dict_name=data.dict_name)
  64. if exist_obj:
  65. raise CustomException(msg='创建失败,该数据字典类型已存在')
  66. obj = await DictTypeCRUD(auth).create_obj_crud(data=data)
  67. new_obj_dict = DictTypeOutSchema.model_validate(obj).model_dump()
  68. redis_key = f"{RedisInitKeyConfig.SYSTEM_DICT.key}:{data.dict_type}"
  69. try:
  70. await RedisCURD(redis).set(
  71. key=redis_key,
  72. value="",
  73. )
  74. log.info(f"创建字典类型成功: {new_obj_dict}")
  75. except Exception as e:
  76. log.error(f"创建字典类型失败: {e}")
  77. raise CustomException(msg=f"创建字典类型失败 {e}")
  78. return new_obj_dict
  79. @classmethod
  80. async def update_obj_service(cls, auth: AuthSchema, redis: Redis, id:int, data: DictTypeUpdateSchema) -> dict:
  81. """
  82. 更新数据字典类型
  83. 参数:
  84. - auth (AuthSchema): 认证信息模型
  85. - redis (Redis): Redis客户端
  86. - id (int): 数据字典类型ID
  87. - data (DictTypeUpdateSchema): 数据字典类型更新模型
  88. 返回:
  89. - dict: 数据字典类型详情字典
  90. """
  91. exist_obj = await DictTypeCRUD(auth).get_obj_by_id_crud(id=id)
  92. if not exist_obj:
  93. raise CustomException(msg='更新失败,该数据字典类型不存在')
  94. if exist_obj.dict_name != data.dict_name:
  95. raise CustomException(msg='更新失败,数据字典类型名称不可以修改')
  96. dict_data_list = []
  97. # 如果字典类型修改或状态变更,则修改对应字典数据的类型和状态,并更新Redis缓存
  98. if exist_obj.dict_type != data.dict_type or exist_obj.status != data.status:
  99. # 检查字典数据类型是否被修改
  100. exist_obj_type_list = await DictDataCRUD(auth).list(search={'dict_type': exist_obj.dict_type})
  101. if exist_obj_type_list:
  102. for item in exist_obj_type_list:
  103. item.dict_type = data.dict_type
  104. dict_data = DictDataUpdateSchema(
  105. dict_sort=item.dict_sort,
  106. dict_label=item.dict_label,
  107. dict_value=item.dict_value,
  108. dict_type=data.dict_type,
  109. dict_type_id=item.dict_type_id,
  110. css_class=item.css_class,
  111. list_class=item.list_class,
  112. is_default=item.is_default,
  113. status=data.status,
  114. description=item.description
  115. )
  116. obj = await DictDataCRUD(auth).update_obj_crud(id=item.id, data=dict_data)
  117. dict_data_list.append(DictDataOutSchema.model_validate(obj).model_dump())
  118. obj = await DictTypeCRUD(auth).update_obj_crud(id=id, data=data)
  119. new_obj_dict = DictTypeOutSchema.model_validate(obj).model_dump()
  120. redis_key = f"{RedisInitKeyConfig.SYSTEM_DICT.key}:{data.dict_type}"
  121. try:
  122. # 获取当前字典类型的所有字典数据,确保包含最新状态
  123. dict_data_list = await DictDataCRUD(auth).get_obj_list_crud(search={'dict_type': data.dict_type})
  124. dict_data = [DictDataOutSchema.model_validate(row).model_dump() for row in dict_data_list if row]
  125. value = json.dumps(dict_data, ensure_ascii=False)
  126. await RedisCURD(redis).set(
  127. key=redis_key,
  128. value=value,
  129. )
  130. log.info(f"更新字典类型成功并刷新缓存: {new_obj_dict}")
  131. except Exception as e:
  132. log.error(f"更新字典类型缓存失败: {e}")
  133. raise CustomException(msg=f"更新字典类型缓存失败 {e}")
  134. return new_obj_dict
  135. @classmethod
  136. async def delete_obj_service(cls, auth: AuthSchema, redis: Redis, ids: list[int]) -> None:
  137. """
  138. 删除数据字典类型
  139. 参数:
  140. - auth (AuthSchema): 认证信息模型
  141. - redis (Redis): Redis客户端
  142. - ids (list[int]): 数据字典类型ID列表
  143. 返回:
  144. - None
  145. """
  146. if len(ids) < 1:
  147. raise CustomException(msg='删除失败,删除对象不能为空')
  148. for id in ids:
  149. exist_obj = await DictTypeCRUD(auth).get_obj_by_id_crud(id=id)
  150. if not exist_obj:
  151. raise CustomException(msg='删除失败,该数据字典类型不存在')
  152. # 检查是否有字典数据
  153. exist_obj_type_list = await DictDataCRUD(auth).list(search={'dict_type': id})
  154. if len(exist_obj_type_list) > 0:
  155. # 如果有字典数据,不能删除
  156. raise CustomException(msg='删除失败,该数据字典类型下存在字典数据')
  157. # 删除Redis缓存
  158. redis_key = f"{RedisInitKeyConfig.SYSTEM_DICT.key}:{exist_obj.dict_type}"
  159. try:
  160. await RedisCURD(redis).delete(redis_key)
  161. log.info(f"删除字典类型成功: {id}")
  162. except Exception as e:
  163. log.error(f"删除字典类型失败: {e}")
  164. raise CustomException(msg=f"删除字典类型失败")
  165. await DictTypeCRUD(auth).delete_obj_crud(ids=ids)
  166. @classmethod
  167. async def set_obj_available_service(cls, auth: AuthSchema, data: BatchSetAvailable) -> None:
  168. """
  169. 设置数据字典类型状态
  170. 参数:
  171. - auth (AuthSchema): 认证信息模型
  172. - data (BatchSetAvailable): 批量设置状态模型
  173. 返回:
  174. - None
  175. """
  176. await DictTypeCRUD(auth).set_obj_available_crud(ids=data.ids, status=data.status)
  177. @classmethod
  178. async def export_obj_service(cls, data_list: list[dict]) -> bytes:
  179. """
  180. 导出数据字典类型列表
  181. 参数:
  182. - data_list (list[dict]): 数据字典类型列表
  183. 返回:
  184. - bytes: Excel文件字节流
  185. """
  186. mapping_dict = {
  187. 'id': '编号',
  188. 'dict_name': '字典名称',
  189. 'dict_type': '字典类型',
  190. 'status': '状态',
  191. 'description': '备注',
  192. 'created_time': '创建时间',
  193. 'updated_time': '更新时间',
  194. 'created_id': '创建者ID',
  195. 'updated_id': '更新者ID',
  196. }
  197. # 复制数据并转换状态
  198. data = data_list.copy()
  199. for item in data:
  200. # 处理状态
  201. item['status'] = '启用' if item.get('status') == '0' else '停用'
  202. item['creator'] = item.get('creator', {}).get('name', '未知') if isinstance(item.get('creator'), dict) else '未知'
  203. return ExcelUtil.export_list2excel(list_data=data, mapping_dict=mapping_dict)
  204. class DictDataService:
  205. """
  206. 字典数据管理模块服务层
  207. """
  208. @classmethod
  209. async def get_obj_detail_service(cls, auth: AuthSchema, id: int) -> dict:
  210. """
  211. 获取数据字典数据详情
  212. 参数:
  213. - auth (AuthSchema): 认证信息模型
  214. - id (int): 数据字典数据ID
  215. 返回:
  216. - dict: 数据字典数据详情字典
  217. """
  218. obj = await DictDataCRUD(auth).get_obj_by_id_crud(id=id)
  219. return DictDataOutSchema.model_validate(obj).model_dump()
  220. @classmethod
  221. async def get_obj_list_service(cls, auth: AuthSchema, search: DictDataQueryParam | None = None, order_by: list[dict] | None = None) -> list[dict]:
  222. """
  223. 获取数据字典数据列表
  224. 参数:
  225. - auth (AuthSchema): 认证信息模型
  226. - search (DictDataQueryParam | None): 搜索条件模型
  227. - order_by (list[dict] | None): 排序字段列表
  228. 返回:
  229. - list[dict]: 数据字典数据详情字典列表
  230. """
  231. obj_list = await DictDataCRUD(auth).get_obj_list_crud(search=search.__dict__, order_by=order_by)
  232. return [DictDataOutSchema.model_validate(obj).model_dump() for obj in obj_list]
  233. @classmethod
  234. async def init_dict_service(cls, redis: Redis):
  235. """
  236. 应用初始化: 获取所有字典类型对应的字典数据信息并缓存service
  237. 参数:
  238. - redis (Redis): Redis客户端
  239. 返回:
  240. - None
  241. """
  242. try:
  243. async with async_db_session() as session:
  244. async with session.begin():
  245. # 在初始化过程中,不需要检查数据权限
  246. auth = AuthSchema(db=session, check_data_scope=False)
  247. obj_list = await DictTypeCRUD(auth).get_obj_list_crud()
  248. if not obj_list:
  249. log.warning("未找到任何字典类型数据")
  250. return
  251. success_count = 0
  252. fail_count = 0
  253. for obj in obj_list:
  254. dict_type = obj.dict_type
  255. try:
  256. dict_data_list = await DictDataCRUD(auth).get_obj_list_crud(search={'dict_type': dict_type})
  257. dict_data = [DictDataOutSchema.model_validate(row).model_dump() for row in dict_data_list if row]
  258. # 保存到Redis并设置过期时间
  259. redis_key = f"{RedisInitKeyConfig.SYSTEM_DICT.key}:{dict_type}"
  260. value = json.dumps(dict_data, ensure_ascii=False)
  261. await RedisCURD(redis).set(
  262. key=redis_key,
  263. value=value,
  264. )
  265. success_count += 1
  266. log.info(f"✅ 字典数据缓存成功: {dict_type}")
  267. except Exception as e:
  268. fail_count += 1
  269. log.error(f"❌ 初始化字典数据失败 [{dict_type}]: {e}")
  270. # 继续处理其他字典类型,不中断整个初始化过程
  271. log.info(f"字典数据初始化完成 - 成功: {success_count}, 失败: {fail_count}")
  272. except Exception as e:
  273. log.error(f"字典初始化过程发生错误: {e}")
  274. # 只在严重错误时抛出异常,允许单个字典加载失败
  275. raise CustomException(msg=f"字典数据初始化失败: {str(e)}")
  276. @classmethod
  277. async def get_init_dict_service(cls, redis: Redis, dict_type: str)->list[dict]:
  278. """
  279. 从缓存获取字典数据列表信息service
  280. 参数:
  281. - redis (Redis): Redis客户端
  282. - dict_type (str): 字典类型
  283. 返回:
  284. - list[dict]: 字典数据列表
  285. """
  286. try:
  287. redis_key = f"{RedisInitKeyConfig.SYSTEM_DICT.key}:{dict_type}"
  288. obj_list_dict = await RedisCURD(redis).get(redis_key)
  289. # 确保返回数据正确序列化
  290. if obj_list_dict:
  291. if isinstance(obj_list_dict, str):
  292. try:
  293. return json.loads(obj_list_dict)
  294. except json.JSONDecodeError:
  295. log.warning(f"字典数据反序列化失败,尝试重新初始化缓存: {dict_type}")
  296. elif isinstance(obj_list_dict, list):
  297. return obj_list_dict
  298. # 缓存不存在或格式错误时重新初始化
  299. await cls.init_dict_service(redis)
  300. obj_list_dict = await RedisCURD(redis).get(redis_key)
  301. if not obj_list_dict:
  302. raise CustomException(msg="数据字典不存在")
  303. # 再次确保返回数据正确序列化
  304. if isinstance(obj_list_dict, str):
  305. try:
  306. return json.loads(obj_list_dict)
  307. except json.JSONDecodeError:
  308. raise CustomException(msg="字典数据格式错误")
  309. return obj_list_dict
  310. except CustomException:
  311. raise
  312. except Exception as e:
  313. log.error(f"获取字典缓存失败: {str(e)}")
  314. raise CustomException(msg=f"获取字典数据失败: {str(e)}")
  315. @classmethod
  316. async def create_obj_service(cls, auth: AuthSchema, redis: Redis, data: DictDataCreateSchema) -> dict:
  317. """
  318. 创建数据字典数据
  319. 参数:
  320. - auth (AuthSchema): 认证信息模型
  321. - redis (Redis): Redis客户端
  322. - data (DictDataCreateSchema): 数据字典数据创建模型
  323. 返回:
  324. - dict: 数据字典数据详情字典
  325. """
  326. exist_obj = await DictDataCRUD(auth).get(dict_label=data.dict_label)
  327. if exist_obj:
  328. raise CustomException(msg='创建失败,该字典数据已存在')
  329. obj = await DictDataCRUD(auth).create_obj_crud(data=data)
  330. redis_key = f"{RedisInitKeyConfig.SYSTEM_DICT.key}:{data.dict_type}"
  331. try:
  332. # 获取当前字典类型的所有字典数据
  333. dict_data_list = await DictDataCRUD(auth).get_obj_list_crud(search={'dict_type': data.dict_type})
  334. dict_data = [DictDataOutSchema.model_validate(row).model_dump() for row in dict_data_list if row]
  335. value = json.dumps(dict_data, ensure_ascii=False)
  336. await RedisCURD(redis).set(
  337. key=redis_key,
  338. value=value,
  339. )
  340. log.info(f"创建字典数据写入缓存成功: {obj}")
  341. except Exception as e:
  342. log.error(f"创建字典数据写入缓存失败: {e}")
  343. raise CustomException(msg=f"创建字典数据失败 {e}")
  344. return DictDataOutSchema.model_validate(obj).model_dump()
  345. @classmethod
  346. async def update_obj_service(cls, auth: AuthSchema, redis: Redis, id:int, data: DictDataUpdateSchema) -> dict:
  347. """
  348. 更新数据字典数据
  349. 参数:
  350. - auth (AuthSchema): 认证信息模型
  351. - redis (Redis): Redis客户端
  352. - id (int): 数据字典数据ID
  353. - data (DictDataUpdateSchema): 数据字典数据更新模型
  354. 返回:
  355. - Dict: 数据字典数据详情字典
  356. """
  357. exist_obj = await DictDataCRUD(auth).get_obj_by_id_crud(id=id)
  358. if not exist_obj:
  359. raise CustomException(msg='更新失败,该字典数据不存在')
  360. if exist_obj.id != id:
  361. raise CustomException(msg='更新失败,数据字典数据重复')
  362. # 如果字典类型变更,仅刷新旧类型缓存,不联动字典类型状态
  363. if exist_obj.dict_type != data.dict_type:
  364. dict_type = await DictTypeCRUD(auth).get(dict_type=exist_obj.dict_type)
  365. if dict_type:
  366. redis_key = f"{RedisInitKeyConfig.SYSTEM_DICT.key}:{dict_type.dict_type}"
  367. try:
  368. dict_data_list = await DictDataCRUD(auth).get_obj_list_crud(search={'dict_type': dict_type.dict_type})
  369. dict_data = [DictDataOutSchema.model_validate(row).model_dump() for row in dict_data_list if row]
  370. value = json.dumps(dict_data, ensure_ascii=False)
  371. await RedisCURD(redis).set(
  372. key=redis_key,
  373. value=value,
  374. )
  375. except Exception as e:
  376. log.error(f"更新字典数据类型变更时刷新旧缓存失败: {e}")
  377. obj = await DictDataCRUD(auth).update_obj_crud(id=id, data=data)
  378. redis_key = f"{RedisInitKeyConfig.SYSTEM_DICT.key}:{data.dict_type}"
  379. try:
  380. # 获取当前字典类型的所有字典数据
  381. dict_data_list = await DictDataCRUD(auth).get_obj_list_crud(search={'dict_type': data.dict_type})
  382. dict_data = [DictDataOutSchema.model_validate(row).model_dump() for row in dict_data_list if row]
  383. value = json.dumps(dict_data, ensure_ascii=False)
  384. await RedisCURD(redis).set(
  385. key=redis_key,
  386. value=value,
  387. )
  388. log.info(f"更新字典数据写入缓存成功: {obj}")
  389. except Exception as e:
  390. log.error(f"更新字典数据写入缓存失败: {e}")
  391. raise CustomException(msg=f"更新字典数据失败 {e}")
  392. return DictDataOutSchema.model_validate(obj).model_dump()
  393. @classmethod
  394. async def delete_obj_service(cls, auth: AuthSchema, redis: Redis, ids: list[int]) -> None:
  395. """
  396. 删除数据字典数据
  397. 参数:
  398. - auth (AuthSchema): 认证信息模型
  399. - redis (Redis): Redis客户端
  400. - ids (list[int]): 数据字典数据ID列表
  401. 返回:
  402. - None
  403. """
  404. try:
  405. if len(ids) < 1:
  406. raise CustomException(msg='删除失败,删除对象不能为空')
  407. # 首先检查是否包含系统默认数据
  408. for id in ids:
  409. exist_obj = await DictDataCRUD(auth).get_obj_by_id_crud(id=id)
  410. if not exist_obj:
  411. raise CustomException(msg=f'{id} 删除失败,该字典数据不存在')
  412. # 系统默认字典数据不允许删除
  413. if exist_obj.is_default:
  414. raise CustomException(msg=f'删除失败,ID为{id}的系统默认字典数据不允许删除')
  415. # 获取所有需要清除的缓存键
  416. dict_types_to_clear = set()
  417. for id in ids:
  418. exist_obj = await DictDataCRUD(auth).get_obj_by_id_crud(id=id)
  419. if exist_obj:
  420. dict_types_to_clear.add(exist_obj.dict_type)
  421. # 执行删除操作
  422. await DictDataCRUD(auth).delete_obj_crud(ids=ids)
  423. # 清除缓存
  424. for dict_type in dict_types_to_clear:
  425. try:
  426. redis_key = f"{RedisInitKeyConfig.SYSTEM_DICT.key}:{dict_type}"
  427. await RedisCURD(redis).delete(redis_key)
  428. log.info(f"清除字典缓存成功: {dict_type}")
  429. except Exception as e:
  430. log.warning(f"清除字典缓存失败: {e}")
  431. # 缓存清除失败不影响删除操作
  432. log.info(f"删除字典数据成功,ID列表: {ids}")
  433. except CustomException:
  434. raise
  435. except Exception as e:
  436. log.error(f"删除字典数据失败: {str(e)}")
  437. raise CustomException(msg=f"删除字典数据失败: {str(e)}")
  438. @classmethod
  439. async def set_obj_available_service(cls, auth: AuthSchema, data: BatchSetAvailable) -> None:
  440. """
  441. 批量修改数据字典数据状态
  442. 参数:
  443. - auth (AuthSchema): 认证信息模型
  444. - data (BatchSetAvailable): 批量修改数据字典数据状态负载模型
  445. 返回:
  446. - None
  447. """
  448. await DictDataCRUD(auth).set_obj_available_crud(ids=data.ids, status=data.status)
  449. @classmethod
  450. async def export_obj_service(cls, data_list: list[dict]) -> bytes:
  451. """
  452. 导出数据字典数据列表
  453. 参数:
  454. - data_list (list[dict]): 数据字典数据列表
  455. 返回:
  456. - bytes: Excel文件字节流
  457. """
  458. mapping_dict = {
  459. 'id': '编号',
  460. 'dict_sort': '字典排序',
  461. 'dict_label': '字典标签',
  462. 'dict_value': '字典键值',
  463. 'dict_type': '字典类型',
  464. 'css_class': '样式属性',
  465. 'list_class': '表格回显样式',
  466. 'is_default': '是否默认',
  467. 'status': '状态',
  468. 'description': '备注',
  469. 'created_time': '创建时间',
  470. 'updated_time': '更新时间',
  471. 'created_id': '创建者ID',
  472. 'updated_id': '更新者ID',
  473. }
  474. # 复制数据并转换状态
  475. data = data_list.copy()
  476. for item in data:
  477. # 处理状态
  478. item['status'] = '启用' if item.get('status') == '0' else '停用'
  479. # 处理是否默认
  480. item['is_default'] = '是' if item.get('is_default') else '否'
  481. item['creator'] = item.get('creator', {}).get('name', '未知') if isinstance(item.get('creator'), dict) else '未知'
  482. return ExcelUtil.export_list2excel(list_data=data, mapping_dict=mapping_dict)