| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252 |
- # -*- coding: utf-8 -*-
- import io
- from fastapi import UploadFile
- import pandas as pd
- from app.core.base_schema import BatchSetAvailable
- from app.core.exceptions import CustomException
- from app.utils.excel_util import ExcelUtil
- from app.core.logger import log
- from app.api.v1.module_system.auth.schema import AuthSchema
- from .schema import GatewayCreateSchema, GatewayUpdateSchema, GatewayOutSchema, GatewayQueryParam
- from .crud import GatewayCRUD
- from ..crane.crud import BizCraneCRUD
- from ..crane.model import BizCraneModel
class GatewayService:
    """
    Service layer for gateway information.

    Every method is a classmethod that delegates persistence to
    ``GatewayCRUD`` and, where a crane name has to be shown, to
    ``BizCraneCRUD``. Excel import/export is delegated to ``ExcelUtil``
    and pandas.
    """

    # Excel column header -> model field name. This single table drives the
    # import parser, the import template and (inverted) the export mapping,
    # so the three can never drift apart. The header strings (including
    # their trailing spaces) are runtime data and must stay byte-identical.
    _IMPORT_HEADER_MAP: dict[str, str] = {
        '编号 编号 自增': 'id',
        '行车编号': 'crane_no',
        '网关名称': 'gateway_name',
        '网关类型': 'gateway_type',
        '网关IP地址 ': 'gateway_ipaddress',
        '网关端口 ': 'gateway_port',
        'PLC品牌': 'plc_brand',
        'PLC型号 ': 'plc_model',
        '端口号': 'serial_port_name',
        '波特率 ': 'serial_baud_rate',
        '数据位 5678': 'serial_data_bits',
        '停止位 ': 'serial_stop_bits',
        '检验位 ': 'serial_parity',
        'UUID全局唯一标识': 'uuid',
        '是否启用(0:启用 1:禁用)': 'status',
        '备注/描述': 'description',
        '创建时间': 'created_time',
        '更新时间': 'updated_time',
    }

    @classmethod
    async def detail_gateway_service(cls, auth: AuthSchema, id: int) -> dict:
        """
        Return one gateway as a dict, enriched with its crane name.

        Args:
            auth: Auth context handed to the CRUD layer.
            id: Primary key of the gateway.

        Returns:
            The ``GatewayOutSchema`` dump plus a ``crane_name`` key, which is
            ``None`` when no crane matches the gateway's ``crane_no``.

        Raises:
            CustomException: If no gateway with this id exists.
        """
        obj = await GatewayCRUD(auth).get_by_id_gateway_crud(id=id)
        if not obj:
            raise CustomException(msg="该数据不存在")
        crane = await BizCraneCRUD(auth).get_by_id_crane_crud_for_no(obj.crane_no)
        res = GatewayOutSchema.model_validate(obj).model_dump()
        # The crane lookup may come back empty (dangling crane_no); do not
        # turn that into an AttributeError for the whole detail request.
        res['crane_name'] = crane.crane_name if crane else None
        return res

    @classmethod
    async def list_gateway_service(
        cls,
        auth: AuthSchema,
        search: GatewayQueryParam | None = None,
        order_by: list[dict] | None = None,
    ) -> list[dict]:
        """
        Return all gateways matching the search as a list of dicts.

        Args:
            auth: Auth context handed to the CRUD layer.
            search: Optional query object; its ``__dict__`` is forwarded as
                the search filter (``None`` when no query is given).
            order_by: Optional ordering, forwarded unchanged.

        Returns:
            One ``GatewayOutSchema`` dump per matching row.
        """
        search_dict = search.__dict__ if search else None
        obj_list = await GatewayCRUD(auth).list_gateway_crud(search=search_dict, order_by=order_by)
        return [GatewayOutSchema.model_validate(obj).model_dump() for obj in obj_list]

    @classmethod
    async def page_gateway_service(
        cls,
        auth: AuthSchema,
        page_no: int,
        page_size: int,
        search: GatewayQueryParam | None = None,
        order_by: list[dict] | None = None,
    ) -> dict:
        """
        Return one page of gateways (paginated by the CRUD layer).

        Args:
            auth: Auth context handed to the CRUD layer.
            page_no: 1-based page number.
            page_size: Number of rows per page.
            search: Optional query object; its ``__dict__`` is the filter.
            order_by: Optional ordering; defaults to ascending ``id``.

        Returns:
            The dict produced by ``page_gateway_crud``; every entry under
            ``items`` additionally gets a ``crane_name`` key (``None`` when
            the crane cannot be found).
        """
        search_dict = search.__dict__ if search else {}
        order_by_list = order_by or [{'id': 'asc'}]
        offset = (page_no - 1) * page_size
        result = await GatewayCRUD(auth).page_gateway_crud(
            offset=offset,
            limit=page_size,
            order_by=order_by_list,
            search=search_dict,
        )
        # Several gateways usually share a crane: resolve each crane_no once
        # per page instead of once per row.
        crane_names: dict = {}
        for item in result.get('items') or []:
            crane_no = item['crane_no']
            if crane_no not in crane_names:
                crane_model: BizCraneModel | None = await BizCraneCRUD(auth).get_by_id_crane_crud_for_no(crane_no=crane_no)
                crane_names[crane_no] = crane_model.crane_name if crane_model else None
            item['crane_name'] = crane_names[crane_no]
        return result

    @classmethod
    async def create_gateway_service(cls, auth: AuthSchema, data: GatewayCreateSchema) -> dict:
        """
        Create a gateway and return it as a dict.

        Args:
            auth: Auth context handed to the CRUD layer.
            data: Validated creation payload.

        Returns:
            The ``GatewayOutSchema`` dump of the created row.
        """
        # NOTE(review): no uniqueness check is performed here yet.
        obj = await GatewayCRUD(auth).create_gateway_crud(data=data)
        return GatewayOutSchema.model_validate(obj).model_dump()

    @classmethod
    async def update_gateway_service(cls, auth: AuthSchema, id: int, data: GatewayUpdateSchema) -> dict:
        """
        Update an existing gateway and return it as a dict.

        Args:
            auth: Auth context handed to the CRUD layer.
            id: Primary key of the gateway to update.
            data: Validated update payload.

        Returns:
            The ``GatewayOutSchema`` dump of the updated row.

        Raises:
            CustomException: If no gateway with this id exists.
        """
        obj = await GatewayCRUD(auth).get_by_id_gateway_crud(id=id)
        if not obj:
            raise CustomException(msg='更新失败,该数据不存在')

        # NOTE(review): no uniqueness check is performed here yet.
        obj = await GatewayCRUD(auth).update_gateway_crud(id=id, data=data)
        return GatewayOutSchema.model_validate(obj).model_dump()

    @classmethod
    async def delete_gateway_service(cls, auth: AuthSchema, ids: list[int]) -> None:
        """
        Delete the gateways with the given ids.

        All ids are checked for existence first; nothing is deleted unless
        every one of them exists.

        Args:
            auth: Auth context handed to the CRUD layer.
            ids: Primary keys to delete; must not be empty.

        Raises:
            CustomException: If ``ids`` is empty/None or any id is unknown.
        """
        if not ids:
            raise CustomException(msg='删除失败,删除对象不能为空')
        for gateway_id in ids:
            obj = await GatewayCRUD(auth).get_by_id_gateway_crud(id=gateway_id)
            if not obj:
                raise CustomException(msg=f'删除失败,ID为{gateway_id}的数据不存在')
        await GatewayCRUD(auth).delete_gateway_crud(ids=ids)

    @classmethod
    async def set_available_gateway_service(cls, auth: AuthSchema, data: BatchSetAvailable) -> None:
        """
        Set the status of several gateways at once.

        Args:
            auth: Auth context handed to the CRUD layer.
            data: Carries the target ``ids`` and the ``status`` to apply.
        """
        await GatewayCRUD(auth).set_available_gateway_crud(ids=data.ids, status=data.status)

    @classmethod
    async def batch_export_gateway_service(cls, obj_list: list[dict]) -> bytes:
        """
        Render gateway rows as an Excel file.

        Args:
            obj_list: Rows to export. The dicts are not modified; the
                display conversions are applied to per-row copies.

        Returns:
            Whatever ``ExcelUtil.export_list2excel`` produces for the rows
            (annotated as bytes).
        """
        # field name -> column header: the import table inverted, plus the
        # export-only updater column.
        mapping_dict = {field: header for header, field in cls._IMPORT_HEADER_MAP.items()}
        mapping_dict['updated_id'] = '更新者ID'

        # Copy every row: a plain list.copy() would still share the row
        # dicts with the caller and the rewrites below would leak out.
        data = [dict(item) for item in obj_list]
        for item in data:
            # Status code -> label. Compared as text so that an integer 0 is
            # treated the same as the string '0'.
            if 'status' in item:
                item['status'] = '启用' if str(item.get('status')) == '0' else '停用'
            # Creator dict -> creator name, with a placeholder when absent.
            creator_info = item.get('creator')
            if isinstance(creator_info, dict):
                item['creator'] = creator_info.get('name', '未知')
            elif creator_info is None:
                item['creator'] = '未知'
        return ExcelUtil.export_list2excel(list_data=data, mapping_dict=mapping_dict)

    @classmethod
    async def batch_import_gateway_service(cls, auth: AuthSchema, file: UploadFile, update_support: bool = False) -> str:
        """
        Import gateways from an uploaded Excel file.

        Rows are validated with ``GatewayCreateSchema`` and inserted one by
        one; a failing row is reported and skipped, it does not abort the
        import.

        Args:
            auth: Auth context handed to the CRUD layer.
            file: Uploaded Excel workbook; it is always closed after reading.
            update_support: Accepted for interface compatibility; currently
                not used by this method.

        Returns:
            A summary with the number of imported rows, followed by one
            line per failed row when there are failures.

        Raises:
            CustomException: If the file cannot be read, is empty, or lacks
                any of the expected column headers.
        """
        header_dict = cls._IMPORT_HEADER_MAP
        try:
            try:
                contents = await file.read()
            finally:
                # Release the upload even when reading fails.
                await file.close()
            df = pd.read_excel(io.BytesIO(contents))

            if df.empty:
                raise CustomException(msg="导入文件为空")

            missing_headers = [header for header in header_dict if header not in df.columns]
            if missing_headers:
                raise CustomException(msg=f"导入文件缺少必要的列: {', '.join(missing_headers)}")

            df = df.rename(columns=header_dict)

            fields = list(header_dict.values())
            error_msgs = []
            success_count = 0

            # count is the 1-based position of the data row (header excluded).
            for count, (_, row) in enumerate(df.iterrows(), start=1):
                try:
                    # Blank cells arrive as NaN/NaT; hand them to the schema
                    # as None so optional fields are treated as "not set".
                    data = {
                        field: None if pd.isna(row[field]) else row[field]
                        for field in fields
                    }
                    # Validate through the create schema before persisting.
                    create_schema = GatewayCreateSchema.model_validate(data)

                    # NOTE(review): no uniqueness check is performed here yet.
                    await GatewayCRUD(auth).create_gateway_crud(data=create_schema)
                    success_count += 1
                except Exception as e:
                    # Deliberate best effort: record the row error, keep going.
                    error_msgs.append(f"第{count}行: {str(e)}")

            result = f"成功导入 {success_count} 条数据"
            if error_msgs:
                result += "\n错误信息:\n" + "\n".join(error_msgs)
            return result

        except Exception as e:
            log.error(f"批量导入失败: {str(e)}")
            raise CustomException(msg=f"导入失败: {str(e)}") from e

    @classmethod
    async def import_template_download_gateway_service(cls) -> bytes:
        """
        Build the empty Excel template for the batch import.

        Returns:
            Whatever ``ExcelUtil.get_excel_template`` produces for the
            import column headers (annotated as bytes); no dropdown
            selectors are configured.
        """
        # Same headers, same order, as the import parser expects.
        header_list = list(cls._IMPORT_HEADER_MAP)
        selector_header_list = []
        option_list = []

        return ExcelUtil.get_excel_template(
            header_list=header_list,
            selector_header_list=selector_header_list,
            option_list=option_list,
        )
|