# -*- coding: utf-8 -*-

import io

from fastapi import UploadFile
import pandas as pd

from app.core.base_schema import BatchSetAvailable
from app.core.exceptions import CustomException
from app.utils.excel_util import ExcelUtil
from app.core.logger import log
from app.api.v1.module_system.auth.schema import AuthSchema
from .schema import GatewayCreateSchema, GatewayUpdateSchema, GatewayOutSchema, GatewayQueryParam
from .crud import GatewayCRUD
from ..crane.crud import BizCraneCRUD
from ..crane.model import BizCraneModel


class GatewayService:
    """
    Service layer for gateway information.

    Every public method is a classmethod that delegates persistence to
    ``GatewayCRUD`` and returns plain dicts / bytes / strings.
    """

    # Single source of truth for the Excel column headers shared by export,
    # import and the import template (model field -> column header, in column
    # order). The header texts are runtime data that must match existing
    # spreadsheets exactly, including the trailing spaces some of them carry.
    _FIELD_HEADERS = {
        'id': '编号 编号 自增',
        'crane_no': '行车编号',
        'gateway_name': '网关名称',
        'gateway_type': '网关类型',
        'gateway_ipaddress': '网关IP地址 ',
        'gateway_port': '网关端口 ',
        'plc_brand': 'PLC品牌',
        'plc_model': 'PLC型号 ',
        'serial_port_name': '端口号',
        'serial_baud_rate': '波特率 ',
        'serial_data_bits': '数据位 5678',
        'serial_stop_bits': '停止位 ',
        'serial_parity': '检验位 ',
        'uuid': 'UUID全局唯一标识',
        'status': '是否启用(0:启用 1:禁用)',
        'description': '备注/描述',
        'created_time': '创建时间',
        'updated_time': '更新时间',
    }

    @staticmethod
    async def _lookup_crane_name(auth: AuthSchema, crane_no) -> str | None:
        """Return the name of the crane identified by ``crane_no``, or None if no crane is found."""
        crane: BizCraneModel | None = await BizCraneCRUD(auth).get_by_id_crane_crud_for_no(crane_no=crane_no)
        return crane.crane_name if crane else None

    @staticmethod
    def _clean_cell(value):
        """Map a missing spreadsheet cell (NaN / NaT / None) to None; return any other value unchanged."""
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return None
        return value

    @classmethod
    async def detail_gateway_service(cls, auth: AuthSchema, id: int) -> dict:
        """
        Return one gateway as a dict, enriched with ``crane_name``.

        Args:
            auth: Auth context handed to the CRUD layer.
            id: Primary key of the gateway.

        Returns:
            The ``GatewayOutSchema`` dump plus a ``crane_name`` key, which is
            None when the referenced crane cannot be found.

        Raises:
            CustomException: If no gateway with this id exists.
        """
        obj = await GatewayCRUD(auth).get_by_id_gateway_crud(id=id)
        if not obj:
            raise CustomException(msg="该数据不存在")
        res = GatewayOutSchema.model_validate(obj).model_dump()
        # A missing crane must not turn the detail request into an AttributeError.
        res['crane_name'] = await cls._lookup_crane_name(auth, obj.crane_no)
        return res

    @classmethod
    async def list_gateway_service(
        cls,
        auth: AuthSchema,
        search: GatewayQueryParam | None = None,
        order_by: list[dict] | None = None,
    ) -> list[dict]:
        """
        Return all gateways matching ``search`` as a list of dicts.

        Args:
            auth: Auth context handed to the CRUD layer.
            search: Optional query parameters; its attribute dict is passed to the CRUD layer.
            order_by: Optional ordering specification passed through unchanged.
        """
        search_dict = search.__dict__ if search else None
        obj_list = await GatewayCRUD(auth).list_gateway_crud(search=search_dict, order_by=order_by)
        return [GatewayOutSchema.model_validate(obj).model_dump() for obj in obj_list]

    @classmethod
    async def page_gateway_service(
        cls,
        auth: AuthSchema,
        page_no: int,
        page_size: int,
        search: GatewayQueryParam | None = None,
        order_by: list[dict] | None = None,
    ) -> dict:
        """
        Return one page of gateways (paging is done by the CRUD layer).

        Args:
            auth: Auth context handed to the CRUD layer.
            page_no: 1-based page number.
            page_size: Number of rows per page.
            search: Optional query parameters.
            order_by: Optional ordering; defaults to ascending ``id``.

        Returns:
            The CRUD paging result, with ``crane_name`` added to every entry of
            ``items`` (None when the crane cannot be found).
        """
        search_dict = search.__dict__ if search else {}
        order_by_list = order_by or [{'id': 'asc'}]
        offset = (page_no - 1) * page_size
        result = await GatewayCRUD(auth).page_gateway_crud(
            offset=offset,
            limit=page_size,
            order_by=order_by_list,
            search=search_dict
        )
        # Several gateways commonly share a crane: look each crane_no up only once.
        crane_names: dict = {}
        for item in result.get('items') or []:
            crane_no = item['crane_no']
            if crane_no not in crane_names:
                crane_names[crane_no] = await cls._lookup_crane_name(auth, crane_no)
            item['crane_name'] = crane_names[crane_no]
        return result

    @classmethod
    async def create_gateway_service(cls, auth: AuthSchema, data: GatewayCreateSchema) -> dict:
        """
        Create a gateway and return it as a dict.

        NOTE(review): no uniqueness check is performed in this method; any such
        constraint is presumably enforced by the CRUD/database layer - confirm.
        """
        obj = await GatewayCRUD(auth).create_gateway_crud(data=data)
        return GatewayOutSchema.model_validate(obj).model_dump()

    @classmethod
    async def update_gateway_service(cls, auth: AuthSchema, id: int, data: GatewayUpdateSchema) -> dict:
        """
        Update an existing gateway and return the updated record as a dict.

        Raises:
            CustomException: If no gateway with this id exists.

        NOTE(review): no uniqueness check is performed in this method; any such
        constraint is presumably enforced by the CRUD/database layer - confirm.
        """
        # Make sure the record exists before attempting the update.
        obj = await GatewayCRUD(auth).get_by_id_gateway_crud(id=id)
        if not obj:
            raise CustomException(msg='更新失败,该数据不存在')
        obj = await GatewayCRUD(auth).update_gateway_crud(id=id, data=data)
        return GatewayOutSchema.model_validate(obj).model_dump()

    @classmethod
    async def delete_gateway_service(cls, auth: AuthSchema, ids: list[int]) -> None:
        """
        Delete the gateways with the given ids.

        Every id is verified first, so nothing is deleted when any id is unknown.

        Raises:
            CustomException: If ``ids`` is empty or one of the ids does not exist.
        """
        if not ids:
            raise CustomException(msg='删除失败,删除对象不能为空')
        for gateway_id in ids:
            obj = await GatewayCRUD(auth).get_by_id_gateway_crud(id=gateway_id)
            if not obj:
                raise CustomException(msg=f'删除失败,ID为{gateway_id}的数据不存在')
        await GatewayCRUD(auth).delete_gateway_crud(ids=ids)

    @classmethod
    async def set_available_gateway_service(cls, auth: AuthSchema, data: BatchSetAvailable) -> None:
        """Set the status of all gateways in ``data.ids`` to ``data.status``."""
        await GatewayCRUD(auth).set_available_gateway_crud(ids=data.ids, status=data.status)

    @classmethod
    async def batch_export_gateway_service(cls, obj_list: list[dict]) -> bytes:
        """
        Export gateway dicts to an Excel workbook.

        Args:
            obj_list: Gateway rows as dicts; the caller's dicts are not modified.

        Returns:
            Whatever ``ExcelUtil.export_list2excel`` produces for the converted rows.
        """
        mapping_dict = {**cls._FIELD_HEADERS, 'updated_id': '更新者ID'}

        # Copy each row: a shallow list copy would still share (and mutate) the caller's dicts.
        data = [dict(item) for item in obj_list]
        for item in data:
            # Translate the status code to a label; compare as text so that
            # both '0' and integer 0 count as enabled.
            if 'status' in item:
                item['status'] = '启用' if str(item.get('status')) == '0' else '停用'
            # Flatten the creator to a display name.
            creator_info = item.get('creator')
            if isinstance(creator_info, dict):
                item['creator'] = creator_info.get('name', '未知')
            elif creator_info is None:
                item['creator'] = '未知'

        return ExcelUtil.export_list2excel(list_data=data, mapping_dict=mapping_dict)

    @classmethod
    async def batch_import_gateway_service(cls, auth: AuthSchema, file: UploadFile, update_support: bool = False) -> str:
        """
        Import gateways from an uploaded Excel file.

        Each row is validated with ``GatewayCreateSchema`` and created
        individually; a failing row is reported and skipped without aborting
        the rest of the import.

        Args:
            auth: Auth context handed to the CRUD layer.
            file: Uploaded workbook; it must contain every template column.
            update_support: Accepted for interface compatibility; currently not
                used - rows are always created, never updated.

        Returns:
            A summary with the number of imported rows followed by per-row error messages, if any.

        Raises:
            CustomException: If the file is empty, lacks required columns, or cannot be read.
        """
        header_dict = {header: field for field, header in cls._FIELD_HEADERS.items()}

        try:
            # Release the upload even when reading fails.
            try:
                contents = await file.read()
            finally:
                await file.close()
            df = pd.read_excel(io.BytesIO(contents))

            if df.empty:
                raise CustomException(msg="导入文件为空")

            missing_headers = [header for header in header_dict if header not in df.columns]
            if missing_headers:
                raise CustomException(msg=f"导入文件缺少必要的列: {', '.join(missing_headers)}")

            df.rename(columns=header_dict, inplace=True)

            error_msgs = []
            success_count = 0
            for row_no, (_, row) in enumerate(df.iterrows(), start=1):
                try:
                    # Blank cells arrive as NaN/NaT; hand them to the schema as None.
                    data = {field: cls._clean_cell(row[field]) for field in cls._FIELD_HEADERS}
                    # Validate with the create schema before persisting.
                    create_schema = GatewayCreateSchema.model_validate(data)
                    await GatewayCRUD(auth).create_gateway_crud(data=create_schema)
                    success_count += 1
                except Exception as e:
                    # Best effort: record the failure and carry on with the next row.
                    error_msgs.append(f"第{row_no}行: {str(e)}")
                    continue

            result = f"成功导入 {success_count} 条数据"
            if error_msgs:
                result += "\n错误信息:\n" + "\n".join(error_msgs)
            return result

        except Exception as e:
            log.error(f"批量导入失败: {str(e)}")
            raise CustomException(msg=f"导入失败: {str(e)}") from e

    @classmethod
    async def import_template_download_gateway_service(cls) -> bytes:
        """Return the Excel import template containing only the header row columns."""
        header_list = list(cls._FIELD_HEADERS.values())
        # No dropdown (selector) columns are configured for this template.
        selector_header_list = []
        option_list = []

        return ExcelUtil.get_excel_template(
            header_list=header_list,
            selector_header_list=selector_header_list,
            option_list=option_list
        )