service.py 9.7 KB


  1. # -*- coding: utf-8 -*-
  2. import io
  3. from fastapi import UploadFile
  4. import pandas as pd
  5. from app.core.base_schema import BatchSetAvailable
  6. from app.core.exceptions import CustomException
  7. from app.utils.excel_util import ExcelUtil
  8. from app.core.logger import log
  9. from app.api.v1.module_system.auth.schema import AuthSchema
  10. from .schema import BizVarDictCreateSchema, BizVarDictUpdateSchema, BizVarDictOutSchema, BizVarDictQueryParam
  11. from .crud import BizVarDictCRUD
  12. class BizVarDictService:
  13. """
  14. 变量信息服务层
  15. """
  16. @classmethod
  17. async def detail_vardict_service(cls, auth: AuthSchema, id: int) -> dict:
  18. """详情"""
  19. obj = await BizVarDictCRUD(auth).get_by_id_vardict_crud(id=id)
  20. if not obj:
  21. raise CustomException(msg="该数据不存在")
  22. return BizVarDictOutSchema.model_validate(obj).model_dump()
  23. @classmethod
  24. async def list_vardict_service(cls, auth: AuthSchema, search: BizVarDictQueryParam | None = None, order_by: list[dict] | None = None) -> list[dict]:
  25. """列表查询"""
  26. search_dict = search.__dict__ if search else None
  27. obj_list = await BizVarDictCRUD(auth).list_vardict_crud(search=search_dict, order_by=order_by)
  28. return [BizVarDictOutSchema.model_validate(obj).model_dump() for obj in obj_list]
  29. @classmethod
  30. async def page_vardict_service(cls, auth: AuthSchema, page_no: int, page_size: int, search: BizVarDictQueryParam | None = None, order_by: list[dict] | None = None) -> dict:
  31. """分页查询(数据库分页)"""
  32. search_dict = search.__dict__ if search else {}
  33. order_by_list = order_by or [{'id': 'asc'}]
  34. offset = (page_no - 1) * page_size
  35. result = await BizVarDictCRUD(auth).page_vardict_crud(
  36. offset=offset,
  37. limit=page_size,
  38. order_by=order_by_list,
  39. search=search_dict
  40. )
  41. return result
  42. @classmethod
  43. async def create_vardict_service(cls, auth: AuthSchema, data: BizVarDictCreateSchema) -> dict:
  44. """创建"""
  45. # 检查唯一性约束
  46. obj = await BizVarDictCRUD(auth).create_vardict_crud(data=data)
  47. return BizVarDictOutSchema.model_validate(obj).model_dump()
  48. @classmethod
  49. async def update_vardict_service(cls, auth: AuthSchema, id: int, data: BizVarDictUpdateSchema) -> dict:
  50. """更新"""
  51. # 检查数据是否存在
  52. obj = await BizVarDictCRUD(auth).get_by_id_vardict_crud(id=id)
  53. if not obj:
  54. raise CustomException(msg='更新失败,该数据不存在')
  55. # 检查唯一性约束
  56. obj = await BizVarDictCRUD(auth).update_vardict_crud(id=id, data=data)
  57. return BizVarDictOutSchema.model_validate(obj).model_dump()
  58. @classmethod
  59. async def delete_vardict_service(cls, auth: AuthSchema, ids: list[int]) -> None:
  60. """删除"""
  61. if len(ids) < 1:
  62. raise CustomException(msg='删除失败,删除对象不能为空')
  63. for id in ids:
  64. obj = await BizVarDictCRUD(auth).get_by_id_vardict_crud(id=id)
  65. if not obj:
  66. raise CustomException(msg=f'删除失败,ID为{id}的数据不存在')
  67. await BizVarDictCRUD(auth).delete_vardict_crud(ids=ids)
  68. @classmethod
  69. async def set_available_vardict_service(cls, auth: AuthSchema, data: BatchSetAvailable) -> None:
  70. """批量设置状态"""
  71. await BizVarDictCRUD(auth).set_available_vardict_crud(ids=data.ids, status=data.status)
  72. @classmethod
  73. async def batch_export_vardict_service(cls, obj_list: list[dict]) -> bytes:
  74. """批量导出"""
  75. mapping_dict = {
  76. 'var_code': '变量code',
  77. 'var_name': '变量名',
  78. 'mec_type': '所属机构',
  79. 'data_type': '数据类型',
  80. 'switch_type': '指令灯颜色',
  81. 'is_alert': '是否报警',
  82. 'modbus_address': 'modbus地址',
  83. 'modbus_data_type': 'modubs数据类型',
  84. 'is_reverse': '是否取反',
  85. 'unit': '单位',
  86. 'crane_id': '行车id',
  87. 'order': '排序',
  88. 'id': '主键ID',
  89. 'uuid': 'UUID全局唯一标识',
  90. 'status': '是否启用',
  91. 'description': '备注',
  92. 'created_time': '创建时间',
  93. 'updated_time': '更新时间',
  94. 'created_id': '创建人ID',
  95. 'updated_id': '更新人ID',
  96. 'updated_id': '更新者ID',
  97. }
  98. data = obj_list.copy()
  99. for item in data:
  100. # 状态转换
  101. if 'status' in item:
  102. item['status'] = '启用' if item.get('status') == '0' else '停用'
  103. # 创建者转换
  104. creator_info = item.get('creator')
  105. if isinstance(creator_info, dict):
  106. item['creator'] = creator_info.get('name', '未知')
  107. elif creator_info is None:
  108. item['creator'] = '未知'
  109. return ExcelUtil.export_list2excel(list_data=data, mapping_dict=mapping_dict)
  110. @classmethod
  111. async def batch_import_vardict_service(cls, auth: AuthSchema, file: UploadFile, update_support: bool = False) -> str:
  112. """批量导入"""
  113. header_dict = {
  114. '变量code': 'var_code',
  115. '变量名': 'var_name',
  116. '所属机构': 'mec_type',
  117. '数据类型': 'data_type',
  118. '指令灯颜色': 'switch_type',
  119. '是否报警': 'is_alert',
  120. 'modbus地址': 'modbus_address',
  121. 'modubs数据类型': 'modbus_data_type',
  122. '是否取反': 'is_reverse',
  123. '单位': 'unit',
  124. '行车id': 'crane_id',
  125. '排序': 'order',
  126. '主键ID': 'id',
  127. 'UUID全局唯一标识': 'uuid',
  128. '是否启用': 'status',
  129. '备注': 'description',
  130. '创建时间': 'created_time',
  131. '更新时间': 'updated_time',
  132. '创建人ID': 'created_id',
  133. '更新人ID': 'updated_id',
  134. }
  135. try:
  136. contents = await file.read()
  137. df = pd.read_excel(io.BytesIO(contents))
  138. await file.close()
  139. if df.empty:
  140. raise CustomException(msg="导入文件为空")
  141. missing_headers = [header for header in header_dict.keys() if header not in df.columns]
  142. if missing_headers:
  143. raise CustomException(msg=f"导入文件缺少必要的列: {', '.join(missing_headers)}")
  144. df.rename(columns=header_dict, inplace=True)
  145. # 验证必填字段
  146. error_msgs = []
  147. success_count = 0
  148. count = 0
  149. for index, row in df.iterrows():
  150. count += 1
  151. try:
  152. data = {
  153. "var_code": row['var_code'],
  154. "var_name": row['var_name'],
  155. "mec_type": row['mec_type'],
  156. "data_type": row['data_type'],
  157. "switch_type": row['switch_type'],
  158. "is_alert": row['is_alert'],
  159. "modbus_address": row['modbus_address'],
  160. "modbus_data_type": row['modbus_data_type'],
  161. "is_reverse": row['is_reverse'],
  162. "unit": row['unit'],
  163. "crane_id": row['crane_id'],
  164. "order": row['order'],
  165. "id": row['id'],
  166. "uuid": row['uuid'],
  167. "status": row['status'],
  168. "description": row['description'],
  169. "created_time": row['created_time'],
  170. "updated_time": row['updated_time'],
  171. "created_id": row['created_id'],
  172. "updated_id": row['updated_id'],
  173. }
  174. # 使用CreateSchema做校验后入库
  175. create_schema = BizVarDictCreateSchema.model_validate(data)
  176. # 检查唯一性约束
  177. await BizVarDictCRUD(auth).create_vardict_crud(data=create_schema)
  178. success_count += 1
  179. except Exception as e:
  180. error_msgs.append(f"第{count}行: {str(e)}")
  181. continue
  182. result = f"成功导入 {success_count} 条数据"
  183. if error_msgs:
  184. result += "\n错误信息:\n" + "\n".join(error_msgs)
  185. return result
  186. except Exception as e:
  187. log.error(f"批量导入失败: {str(e)}")
  188. raise CustomException(msg=f"导入失败: {str(e)}")
  189. @classmethod
  190. async def import_template_download_vardict_service(cls) -> bytes:
  191. """下载导入模板"""
  192. header_list = [
  193. '变量code',
  194. '变量名',
  195. '所属机构',
  196. '数据类型',
  197. '指令灯颜色',
  198. '是否报警',
  199. 'modbus地址',
  200. 'modubs数据类型',
  201. '是否取反',
  202. '单位',
  203. '行车id',
  204. '排序',
  205. '主键ID',
  206. 'UUID全局唯一标识',
  207. '是否启用',
  208. '备注',
  209. '创建时间',
  210. '更新时间',
  211. '创建人ID',
  212. '更新人ID',
  213. ]
  214. selector_header_list = []
  215. option_list = []
  216. # 添加下拉选项
  217. return ExcelUtil.get_excel_template(
  218. header_list=header_list,
  219. selector_header_list=selector_header_list,
  220. option_list=option_list
  221. )