service.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. # -*- coding: utf-8 -*-
  2. import io
  3. from fastapi import UploadFile
  4. import pandas as pd
  5. from app.core.base_schema import BatchSetAvailable
  6. from app.core.exceptions import CustomException
  7. from app.utils.excel_util import ExcelUtil
  8. from app.core.logger import log
  9. from app.api.v1.module_system.auth.schema import AuthSchema
  10. from .schema import GatewayCreateSchema, GatewayUpdateSchema, GatewayOutSchema, GatewayQueryParam
  11. from .crud import GatewayCRUD
  12. from ..crane.crud import BizCraneCRUD
  13. from ..crane.model import BizCraneModel
  14. class GatewayService:
  15. """
  16. 网关信息服务层
  17. """
  18. @classmethod
  19. async def detail_gateway_service(cls, auth: AuthSchema, id: int) -> dict:
  20. """详情"""
  21. obj = await GatewayCRUD(auth).get_by_id_gateway_crud(id=id)
  22. if not obj:
  23. raise CustomException(msg="该数据不存在")
  24. crane = await BizCraneCRUD(auth).get_by_id_crane_crud_for_no(obj.crane_no)
  25. res = GatewayOutSchema.model_validate(obj).model_dump()
  26. res['crane_name'] = crane.crane_name
  27. return res
  28. @classmethod
  29. async def list_gateway_service(cls, auth: AuthSchema, search: GatewayQueryParam | None = None, order_by: list[dict] | None = None) -> list[dict]:
  30. """列表查询"""
  31. search_dict = search.__dict__ if search else None
  32. obj_list = await GatewayCRUD(auth).list_gateway_crud(search=search_dict, order_by=order_by)
  33. return [GatewayOutSchema.model_validate(obj).model_dump() for obj in obj_list]
  34. @classmethod
  35. async def page_gateway_service(cls, auth: AuthSchema, page_no: int, page_size: int, search: GatewayQueryParam | None = None, order_by: list[dict] | None = None) -> dict:
  36. """分页查询(数据库分页)"""
  37. search_dict = search.__dict__ if search else {}
  38. order_by_list = order_by or [{'id': 'asc'}]
  39. offset = (page_no - 1) * page_size
  40. result = await GatewayCRUD(auth).page_gateway_crud(
  41. offset=offset,
  42. limit=page_size,
  43. order_by=order_by_list,
  44. search=search_dict
  45. )
  46. for item in result.get('items'):
  47. crane_model: BizCraneModel | None = await BizCraneCRUD(auth).get_by_id_crane_crud_for_no(crane_no=item['crane_no'])
  48. item['crane_name'] = crane_model.crane_name
  49. return result
  50. @classmethod
  51. async def create_gateway_service(cls, auth: AuthSchema, data: GatewayCreateSchema) -> dict:
  52. """创建"""
  53. # 检查唯一性约束
  54. obj = await GatewayCRUD(auth).create_gateway_crud(data=data)
  55. return GatewayOutSchema.model_validate(obj).model_dump()
  56. @classmethod
  57. async def update_gateway_service(cls, auth: AuthSchema, id: int, data: GatewayUpdateSchema) -> dict:
  58. """更新"""
  59. # 检查数据是否存在
  60. obj = await GatewayCRUD(auth).get_by_id_gateway_crud(id=id)
  61. if not obj:
  62. raise CustomException(msg='更新失败,该数据不存在')
  63. # 检查唯一性约束
  64. obj = await GatewayCRUD(auth).update_gateway_crud(id=id, data=data)
  65. return GatewayOutSchema.model_validate(obj).model_dump()
  66. @classmethod
  67. async def delete_gateway_service(cls, auth: AuthSchema, ids: list[int]) -> None:
  68. """删除"""
  69. if len(ids) < 1:
  70. raise CustomException(msg='删除失败,删除对象不能为空')
  71. for id in ids:
  72. obj = await GatewayCRUD(auth).get_by_id_gateway_crud(id=id)
  73. if not obj:
  74. raise CustomException(msg=f'删除失败,ID为{id}的数据不存在')
  75. await GatewayCRUD(auth).delete_gateway_crud(ids=ids)
  76. @classmethod
  77. async def set_available_gateway_service(cls, auth: AuthSchema, data: BatchSetAvailable) -> None:
  78. """批量设置状态"""
  79. await GatewayCRUD(auth).set_available_gateway_crud(ids=data.ids, status=data.status)
  80. @classmethod
  81. async def batch_export_gateway_service(cls, obj_list: list[dict]) -> bytes:
  82. """批量导出"""
  83. mapping_dict = {
  84. 'id': '编号 编号 自增',
  85. 'crane_no': '行车编号',
  86. 'gateway_name': '网关名称',
  87. 'gateway_type': '网关类型',
  88. 'gateway_ipaddress': '网关IP地址 ',
  89. 'gateway_port': '网关端口 ',
  90. 'plc_brand': 'PLC品牌',
  91. 'plc_model': 'PLC型号 ',
  92. 'serial_port_name': '端口号',
  93. 'serial_baud_rate': '波特率 ',
  94. 'serial_data_bits': '数据位 5678',
  95. 'serial_stop_bits': '停止位 ',
  96. 'serial_parity': '检验位 ',
  97. 'uuid': 'UUID全局唯一标识',
  98. 'status': '是否启用(0:启用 1:禁用)',
  99. 'description': '备注/描述',
  100. 'created_time': '创建时间',
  101. 'updated_time': '更新时间',
  102. 'updated_id': '更新者ID',
  103. }
  104. data = obj_list.copy()
  105. for item in data:
  106. # 状态转换
  107. if 'status' in item:
  108. item['status'] = '启用' if item.get('status') == '0' else '停用'
  109. # 创建者转换
  110. creator_info = item.get('creator')
  111. if isinstance(creator_info, dict):
  112. item['creator'] = creator_info.get('name', '未知')
  113. elif creator_info is None:
  114. item['creator'] = '未知'
  115. return ExcelUtil.export_list2excel(list_data=data, mapping_dict=mapping_dict)
  116. @classmethod
  117. async def batch_import_gateway_service(cls, auth: AuthSchema, file: UploadFile, update_support: bool = False) -> str:
  118. """批量导入"""
  119. header_dict = {
  120. '编号 编号 自增': 'id',
  121. '行车编号': 'crane_no',
  122. '网关名称': 'gateway_name',
  123. '网关类型': 'gateway_type',
  124. '网关IP地址 ': 'gateway_ipaddress',
  125. '网关端口 ': 'gateway_port',
  126. 'PLC品牌': 'plc_brand',
  127. 'PLC型号 ': 'plc_model',
  128. '端口号': 'serial_port_name',
  129. '波特率 ': 'serial_baud_rate',
  130. '数据位 5678': 'serial_data_bits',
  131. '停止位 ': 'serial_stop_bits',
  132. '检验位 ': 'serial_parity',
  133. 'UUID全局唯一标识': 'uuid',
  134. '是否启用(0:启用 1:禁用)': 'status',
  135. '备注/描述': 'description',
  136. '创建时间': 'created_time',
  137. '更新时间': 'updated_time',
  138. }
  139. try:
  140. contents = await file.read()
  141. df = pd.read_excel(io.BytesIO(contents))
  142. await file.close()
  143. if df.empty:
  144. raise CustomException(msg="导入文件为空")
  145. missing_headers = [header for header in header_dict.keys() if header not in df.columns]
  146. if missing_headers:
  147. raise CustomException(msg=f"导入文件缺少必要的列: {', '.join(missing_headers)}")
  148. df.rename(columns=header_dict, inplace=True)
  149. # 验证必填字段
  150. error_msgs = []
  151. success_count = 0
  152. count = 0
  153. for index, row in df.iterrows():
  154. count += 1
  155. try:
  156. data = {
  157. "id": row['id'],
  158. "crane_no": row['crane_no'],
  159. "gateway_name": row['gateway_name'],
  160. "gateway_type": row['gateway_type'],
  161. "gateway_ipaddress": row['gateway_ipaddress'],
  162. "gateway_port": row['gateway_port'],
  163. "plc_brand": row['plc_brand'],
  164. "plc_model": row['plc_model'],
  165. "serial_port_name": row['serial_port_name'],
  166. "serial_baud_rate": row['serial_baud_rate'],
  167. "serial_data_bits": row['serial_data_bits'],
  168. "serial_stop_bits": row['serial_stop_bits'],
  169. "serial_parity": row['serial_parity'],
  170. "uuid": row['uuid'],
  171. "status": row['status'],
  172. "description": row['description'],
  173. "created_time": row['created_time'],
  174. "updated_time": row['updated_time'],
  175. }
  176. # 使用CreateSchema做校验后入库
  177. create_schema = GatewayCreateSchema.model_validate(data)
  178. # 检查唯一性约束
  179. await GatewayCRUD(auth).create_gateway_crud(data=create_schema)
  180. success_count += 1
  181. except Exception as e:
  182. error_msgs.append(f"第{count}行: {str(e)}")
  183. continue
  184. result = f"成功导入 {success_count} 条数据"
  185. if error_msgs:
  186. result += "\n错误信息:\n" + "\n".join(error_msgs)
  187. return result
  188. except Exception as e:
  189. log.error(f"批量导入失败: {str(e)}")
  190. raise CustomException(msg=f"导入失败: {str(e)}")
  191. @classmethod
  192. async def import_template_download_gateway_service(cls) -> bytes:
  193. """下载导入模板"""
  194. header_list = [
  195. '编号 编号 自增',
  196. '行车编号',
  197. '网关名称',
  198. '网关类型',
  199. '网关IP地址 ',
  200. '网关端口 ',
  201. 'PLC品牌',
  202. 'PLC型号 ',
  203. '端口号',
  204. '波特率 ',
  205. '数据位 5678',
  206. '停止位 ',
  207. '检验位 ',
  208. 'UUID全局唯一标识',
  209. '是否启用(0:启用 1:禁用)',
  210. '备注/描述',
  211. '创建时间',
  212. '更新时间',
  213. ]
  214. selector_header_list = []
  215. option_list = []
  216. # 添加下拉选项
  217. return ExcelUtil.get_excel_template(
  218. header_list=header_list,
  219. selector_header_list=selector_header_list,
  220. option_list=option_list
  221. )