service.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. # -*- coding: utf-8 -*-
  2. import io
  3. from fastapi import UploadFile
  4. import pandas as pd
  5. from app.core.base_schema import BatchSetAvailable
  6. from app.core.exceptions import CustomException
  7. from app.utils.excel_util import ExcelUtil
  8. from app.core.logger import log
  9. from app.api.v1.module_system.auth.schema import AuthSchema
  10. from .schema import BizCraneCreateSchema, BizCraneUpdateSchema, BizCraneOutSchema, BizCraneQueryParam
  11. from .crud import BizCraneCRUD
  12. class BizCraneService:
  13. """
  14. 行车信息服务层
  15. """
  @classmethod
  async def detail_crane_service(cls, auth: AuthSchema, id: int) -> dict:
      """Fetch one crane record and return it as a plain dict.

      Args:
          auth: Auth context passed through to ``BizCraneCRUD``.
          id: Primary key of the record to look up.

      Returns:
          The record validated by ``BizCraneOutSchema`` and dumped to a dict.

      Raises:
          CustomException: If the CRUD lookup returns a falsy result.
      """
      crud = BizCraneCRUD(auth)
      record = await crud.get_by_id_crane_crud(id=id)
      if record:
          validated = BizCraneOutSchema.model_validate(record)
          return validated.model_dump()
      raise CustomException(msg="该数据不存在")
  @classmethod
  async def list_crane_service(cls, auth: AuthSchema, search: BizCraneQueryParam | None = None, order_by: list[dict] | None = None) -> list[dict]:
      """Query crane records without pagination.

      Args:
          auth: Auth context passed through to ``BizCraneCRUD``.
          search: Optional query object; its ``__dict__`` is forwarded as the
              filter mapping. ``None`` is forwarded when it is missing/falsy.
          order_by: Optional ordering specification, forwarded unchanged.

      Returns:
          One dict per record, each validated by ``BizCraneOutSchema``.
      """
      if search:
          filters = search.__dict__
      else:
          filters = None

      crud = BizCraneCRUD(auth)
      records = await crud.list_crane_crud(search=filters, order_by=order_by)
      return [
          BizCraneOutSchema.model_validate(record).model_dump()
          for record in records
      ]
  @classmethod
  async def page_crane_service(cls, auth: AuthSchema, page_no: int, page_size: int, search: BizCraneQueryParam | None = None, order_by: list[dict] | None = None) -> dict:
      """Query one page of crane records, paginating in the database.

      Args:
          auth: Auth context passed through to ``BizCraneCRUD``.
          page_no: Page number; the offset is ``(page_no - 1) * page_size``,
              so callers are expected to pass 1 for the first page.
          page_size: Number of rows per page, used as the query limit.
          search: Optional query object; its ``__dict__`` is forwarded as the
              filter mapping, or an empty dict when it is missing/falsy.
          order_by: Optional ordering; defaults to ascending ``id``.

      Returns:
          Whatever ``page_crane_crud`` returns, passed through unchanged.
      """
      if search:
          filters = search.__dict__
      else:
          filters = {}

      ordering = order_by if order_by else [{'id': 'asc'}]
      skip = page_size * (page_no - 1)

      crud = BizCraneCRUD(auth)
      return await crud.page_crane_crud(
          offset=skip,
          limit=page_size,
          order_by=ordering,
          search=filters,
      )
  @classmethod
  async def create_crane_service(cls, auth: AuthSchema, data: BizCraneCreateSchema) -> dict:
      """Create a crane record and return the stored row as a dict.

      Args:
          auth: Auth context passed through to ``BizCraneCRUD``.
          data: Validated creation payload.

      Returns:
          The created record validated by ``BizCraneOutSchema`` and dumped
          to a dict.
      """
      # NOTE(review): no uniqueness check is performed in this method;
      # presumably the CRUD layer or the database enforces it - confirm.
      crud = BizCraneCRUD(auth)
      created = await crud.create_crane_crud(data=data)
      validated = BizCraneOutSchema.model_validate(created)
      return validated.model_dump()
  @classmethod
  async def update_crane_service(cls, auth: AuthSchema, id: int, data: BizCraneUpdateSchema) -> dict:
      """Update an existing crane record and return the new state as a dict.

      Args:
          auth: Auth context passed through to ``BizCraneCRUD``.
          id: Primary key of the record to update.
          data: Validated update payload.

      Returns:
          The updated record validated by ``BizCraneOutSchema`` and dumped
          to a dict.

      Raises:
          CustomException: If no record with the given ``id`` is found.
      """
      # Refuse to update a row that does not exist.
      existing = await BizCraneCRUD(auth).get_by_id_crane_crud(id=id)
      if not existing:
          raise CustomException(msg='更新失败,该数据不存在')

      # NOTE(review): no uniqueness check is performed in this method;
      # presumably the CRUD layer or the database enforces it - confirm.
      updated = await BizCraneCRUD(auth).update_crane_crud(id=id, data=data)
      validated = BizCraneOutSchema.model_validate(updated)
      return validated.model_dump()
  @classmethod
  async def delete_crane_service(cls, auth: AuthSchema, ids: list[int]) -> None:
      """Delete the crane records with the given primary keys.

      Every id is checked for existence first; the delete is issued only
      when all of them are found.

      Args:
          auth: Auth context passed through to ``BizCraneCRUD``.
          ids: Primary keys of the records to delete.

      Raises:
          CustomException: If ``ids`` is empty or ``None``, or if any id
              does not match an existing record.
      """
      # Truthiness covers both an empty list and None, so a missing payload
      # yields the domain error rather than a TypeError from len().
      if not ids:
          raise CustomException(msg='删除失败,删除对象不能为空')

      # Loop variable is named crane_id to avoid shadowing the builtin id().
      for crane_id in ids:
          existing = await BizCraneCRUD(auth).get_by_id_crane_crud(id=crane_id)
          if not existing:
              raise CustomException(msg=f'删除失败,ID为{crane_id}的数据不存在')

      await BizCraneCRUD(auth).delete_crane_crud(ids=ids)
  @classmethod
  async def set_available_crane_service(cls, auth: AuthSchema, data: BatchSetAvailable) -> None:
      """Set the status of several crane records in one call.

      Args:
          auth: Auth context passed through to ``BizCraneCRUD``.
          data: Payload whose ``ids`` and ``status`` are forwarded to the
              CRUD layer.
      """
      crud = BizCraneCRUD(auth)
      await crud.set_available_crane_crud(ids=data.ids, status=data.status)
  @classmethod
  async def batch_export_crane_service(cls, obj_list: list[dict]) -> bytes:
      """Export crane records to an Excel workbook.

      Each row is copied before it is adjusted for display, so the dicts in
      ``obj_list`` are left untouched.

      Args:
          obj_list: Records to export, one dict per row.

      Returns:
          The result of ``ExcelUtil.export_list2excel`` for the converted rows.
      """
      # Field name -> column header. The headers match the ones accepted by
      # the import routine so an exported file can be imported again.
      # 'updated_id' used to appear twice; the later entry ('更新者ID')
      # silently won and did not match the import header.
      mapping_dict = {
          'crane_name': '行车名称',
          'crane_no': '行车编号',
          'crane_model': '行车型号',
          'work_span': '工作跨度',
          'work_height': '工作高度',
          'work_weight': '最大载重',
          'ip_address': 'ip地址',
          'modbus_port': 'modbus端口号',
          'order': '排序',
          'id': '主键ID',
          'uuid': 'UUID全局唯一标识',
          'status': '是否启用(0:启用 1:禁用)',
          'description': '备注/描述',
          'created_time': '创建时间',
          'updated_time': '更新时间',
          'created_id': '创建人ID',
          'updated_id': '更新人ID',
      }

      rows = []
      for source in obj_list:
          # Per-row copy: list.copy() alone is shallow and would let the
          # conversions below leak into the caller's dicts.
          item = dict(source)

          # Status code -> display label.
          # NOTE(review): the comparison is against the string '0'; an integer
          # 0 would be labelled '停用' - confirm the type of status.
          if 'status' in item:
              item['status'] = '启用' if item.get('status') == '0' else '停用'

          # Creator object -> display name.
          creator_info = item.get('creator')
          if isinstance(creator_info, dict):
              item['creator'] = creator_info.get('name', '未知')
          elif creator_info is None:
              item['creator'] = '未知'

          rows.append(item)

      return ExcelUtil.export_list2excel(list_data=rows, mapping_dict=mapping_dict)
  @classmethod
  async def batch_import_crane_service(cls, auth: AuthSchema, file: UploadFile, update_support: bool = False) -> str:
      """Import crane records from an uploaded Excel file.

      Rows are validated with ``BizCraneCreateSchema`` and created one at a
      time. A failing row is recorded and skipped; it does not abort the
      import.

      Args:
          auth: Auth context passed through to ``BizCraneCRUD``.
          file: Uploaded workbook; it is always closed after being read.
          update_support: Accepted for interface compatibility; not used by
              this method.

      Returns:
          A summary with the number of imported rows, followed by one line
          per failed row when there are any.

      Raises:
          CustomException: If the file is empty, required columns are
              missing, or reading/parsing the file fails.
      """
      # Excel column header -> schema field name.
      header_dict = {
          '行车名称': 'crane_name',
          '行车编号': 'crane_no',
          '行车型号': 'crane_model',
          '工作跨度': 'work_span',
          '工作高度': 'work_height',
          '最大载重': 'work_weight',
          'ip地址': 'ip_address',
          'modbus端口号': 'modbus_port',
          '排序': 'order',
          '主键ID': 'id',
          'UUID全局唯一标识': 'uuid',
          '是否启用(0:启用 1:禁用)': 'status',
          '备注/描述': 'description',
          '创建时间': 'created_time',
          '更新时间': 'updated_time',
          '创建人ID': 'created_id',
          '更新人ID': 'updated_id',
      }

      try:
          # Close the upload even when reading it fails.
          try:
              contents = await file.read()
          finally:
              await file.close()

          df = pd.read_excel(io.BytesIO(contents))

          if df.empty:
              raise CustomException(msg="导入文件为空")

          missing_headers = [header for header in header_dict if header not in df.columns]
          if missing_headers:
              raise CustomException(msg=f"导入文件缺少必要的列: {', '.join(missing_headers)}")

          df.rename(columns=header_dict, inplace=True)

          field_names = list(header_dict.values())
          error_msgs = []
          success_count = 0

          # count is the 1-based position of the data row (header excluded).
          for count, (_, row) in enumerate(df.iterrows(), start=1):
              try:
                  # pandas represents empty cells as NaN; hand None to the
                  # schema instead so missing values are treated as missing.
                  data = {
                      name: (None if pd.isna(row[name]) else row[name])
                      for name in field_names
                  }
                  # Validate with the create schema before writing.
                  create_schema = BizCraneCreateSchema.model_validate(data)
                  await BizCraneCRUD(auth).create_crane_crud(data=create_schema)
                  success_count += 1
              except Exception as e:
                  # Best effort: record the failure and carry on with the next row.
                  error_msgs.append(f"第{count}行: {str(e)}")

          result = f"成功导入 {success_count} 条数据"
          if error_msgs:
              result += "\n错误信息:\n" + "\n".join(error_msgs)
          return result
      except CustomException:
          # Deliberate validation errors already carry a user-facing message;
          # do not wrap them a second time.
          raise
      except Exception as e:
          log.error(f"批量导入失败: {str(e)}")
          raise CustomException(msg=f"导入失败: {str(e)}") from e
  @classmethod
  async def import_template_download_crane_service(cls) -> bytes:
      """Build the Excel template used for bulk crane import.

      Returns:
          The result of ``ExcelUtil.get_excel_template`` for the crane column
          headers, with no dropdown (selector) columns configured.
      """
      template_headers = [
          '行车名称',
          '行车编号',
          '行车型号',
          '工作跨度',
          '工作高度',
          '最大载重',
          'ip地址',
          'modbus端口号',
          '排序',
          '主键ID',
          'UUID全局唯一标识',
          '是否启用(0:启用 1:禁用)',
          '备注/描述',
          '创建时间',
          '更新时间',
          '创建人ID',
          '更新人ID',
      ]
      # No dropdown columns are defined for this template, so both selector
      # arguments are empty lists.
      dropdown_headers = []
      dropdown_options = []
      return ExcelUtil.get_excel_template(
          header_list=template_headers,
          selector_header_list=dropdown_headers,
          option_list=dropdown_options,
      )