# -*- coding: utf-8 -*-

import io

import pandas as pd
from fastapi import UploadFile

from app.api.v1.module_system.auth.schema import AuthSchema
from app.core.base_schema import BatchSetAvailable
from app.core.exceptions import CustomException
from app.core.logger import log
from app.utils.excel_util import ExcelUtil

from .crud import BizVarDictCRUD
from .schema import (
    BizVarDictCreateSchema,
    BizVarDictOutSchema,
    BizVarDictQueryParam,
    BizVarDictUpdateSchema,
)


class BizVarDictService:
    """Service layer for variable-dictionary (BizVarDict) records."""

    # Single source of truth for the Excel layout: model field -> column header.
    # Export, import and the import template all derive their headers from this
    # table so the three can no longer drift apart (the export mapping used to
    # carry a second, overriding 'updated_id' entry with a different header).
    # NOTE: the header strings are runtime data and are kept exactly as they
    # were (including the existing "modubs" spelling) so that spreadsheets
    # produced earlier remain importable.
    _FIELD_HEADERS: dict[str, str] = {
        'var_code': '变量code',
        'var_name': '变量名',
        'mec_type': '所属机构',
        'data_type': '数据类型',
        'switch_type': '指令灯颜色',
        'is_alert': '是否报警',
        'modbus_address': 'modbus地址',
        'modbus_data_type': 'modubs数据类型',
        'is_reverse': '是否取反',
        'unit': '单位',
        'crane_id': '行车id',
        'order': '排序',
        'id': '主键ID',
        'uuid': 'UUID全局唯一标识',
        'status': '是否启用',
        'description': '备注',
        'created_time': '创建时间',
        'updated_time': '更新时间',
        'created_id': '创建人ID',
        'updated_id': '更新人ID',
    }

    @classmethod
    async def detail_vardict_service(cls, auth: AuthSchema, id: int) -> dict:
        """Return a single record as a dict.

        Args:
            auth: Auth context handed to the CRUD layer.
            id: Primary key of the record to fetch.

        Returns:
            The record dumped through ``BizVarDictOutSchema``.

        Raises:
            CustomException: If the CRUD layer returns no record for ``id``.
        """
        obj = await BizVarDictCRUD(auth).get_by_id_vardict_crud(id=id)
        if not obj:
            raise CustomException(msg="该数据不存在")
        return BizVarDictOutSchema.model_validate(obj).model_dump()

    @classmethod
    async def list_vardict_service(
        cls,
        auth: AuthSchema,
        search: BizVarDictQueryParam | None = None,
        order_by: list[dict] | None = None,
    ) -> list[dict]:
        """Return all matching records as a list of dicts.

        Args:
            auth: Auth context handed to the CRUD layer.
            search: Optional query object; its ``__dict__`` is passed to the
                CRUD layer as the filter (``None`` when no query is given).
            order_by: Optional ordering spec, forwarded unchanged.

        Returns:
            One ``BizVarDictOutSchema`` dump per record.
        """
        search_dict = search.__dict__ if search else None
        obj_list = await BizVarDictCRUD(auth).list_vardict_crud(search=search_dict, order_by=order_by)
        return [BizVarDictOutSchema.model_validate(obj).model_dump() for obj in obj_list]

    @classmethod
    async def page_vardict_service(
        cls,
        auth: AuthSchema,
        page_no: int,
        page_size: int,
        search: BizVarDictQueryParam | None = None,
        order_by: list[dict] | None = None,
    ) -> dict:
        """Return one page of records (pagination is delegated to the CRUD layer).

        Args:
            auth: Auth context handed to the CRUD layer.
            page_no: Page number; the offset is ``(page_no - 1) * page_size``.
            page_size: Number of records per page, used as the limit.
            search: Optional query object; its ``__dict__`` is used as the
                filter (an empty dict when no query is given).
            order_by: Optional ordering spec; defaults to ascending ``id``.

        Returns:
            Whatever ``page_vardict_crud`` returns, unchanged.
        """
        search_dict = search.__dict__ if search else {}
        order_by_list = order_by or [{'id': 'asc'}]
        offset = (page_no - 1) * page_size
        result = await BizVarDictCRUD(auth).page_vardict_crud(
            offset=offset,
            limit=page_size,
            order_by=order_by_list,
            search=search_dict,
        )
        return result

    @classmethod
    async def create_vardict_service(cls, auth: AuthSchema, data: BizVarDictCreateSchema) -> dict:
        """Create a record and return it as a dict.

        Args:
            auth: Auth context handed to the CRUD layer.
            data: Validated creation payload.

        Returns:
            The created record dumped through ``BizVarDictOutSchema``.
        """
        # NOTE: no uniqueness check is performed in this service method.
        obj = await BizVarDictCRUD(auth).create_vardict_crud(data=data)
        return BizVarDictOutSchema.model_validate(obj).model_dump()

    @classmethod
    async def update_vardict_service(cls, auth: AuthSchema, id: int, data: BizVarDictUpdateSchema) -> dict:
        """Update an existing record and return it as a dict.

        Args:
            auth: Auth context handed to the CRUD layer.
            id: Primary key of the record to update.
            data: Validated update payload.

        Returns:
            The updated record dumped through ``BizVarDictOutSchema``.

        Raises:
            CustomException: If no record exists for ``id``.
        """
        # Make sure the target exists before attempting the update.
        obj = await BizVarDictCRUD(auth).get_by_id_vardict_crud(id=id)
        if not obj:
            raise CustomException(msg='更新失败,该数据不存在')
        # NOTE: no uniqueness check is performed in this service method.
        obj = await BizVarDictCRUD(auth).update_vardict_crud(id=id, data=data)
        return BizVarDictOutSchema.model_validate(obj).model_dump()

    @classmethod
    async def delete_vardict_service(cls, auth: AuthSchema, ids: list[int]) -> None:
        """Delete the records with the given ids.

        Every id is looked up first; the delete is only issued once all of
        them have been found.

        Args:
            auth: Auth context handed to the CRUD layer.
            ids: Primary keys of the records to delete.

        Raises:
            CustomException: If ``ids`` is empty or any id has no record.
        """
        if not ids:
            raise CustomException(msg='删除失败,删除对象不能为空')
        for item_id in ids:
            obj = await BizVarDictCRUD(auth).get_by_id_vardict_crud(id=item_id)
            if not obj:
                raise CustomException(msg=f'删除失败,ID为{item_id}的数据不存在')
        await BizVarDictCRUD(auth).delete_vardict_crud(ids=ids)

    @classmethod
    async def set_available_vardict_service(cls, auth: AuthSchema, data: BatchSetAvailable) -> None:
        """Set the status of several records at once.

        Args:
            auth: Auth context handed to the CRUD layer.
            data: Carries the target ``ids`` and the ``status`` to apply.
        """
        await BizVarDictCRUD(auth).set_available_vardict_crud(ids=data.ids, status=data.status)

    @classmethod
    async def batch_export_vardict_service(cls, obj_list: list[dict]) -> bytes:
        """Export records to an Excel workbook.

        Args:
            obj_list: Records to export. The dicts are copied before the
                display conversions are applied, so the caller's data is left
                untouched.

        Returns:
            The result of ``ExcelUtil.export_list2excel`` for the converted rows.
        """
        # Copy each row: a plain list.copy() would still share the row dicts
        # with the caller and the conversions below would leak back to them.
        rows = [dict(item) for item in obj_list]
        for row in rows:
            # Status code -> display label ('0' is rendered as enabled).
            if 'status' in row:
                row['status'] = '启用' if row.get('status') == '0' else '停用'
            # Creator -> display name; a missing/None creator becomes '未知'.
            creator_info = row.get('creator')
            if isinstance(creator_info, dict):
                row['creator'] = creator_info.get('name', '未知')
            elif creator_info is None:
                row['creator'] = '未知'
        return ExcelUtil.export_list2excel(list_data=rows, mapping_dict=dict(cls._FIELD_HEADERS))

    @classmethod
    async def batch_import_vardict_service(cls, auth: AuthSchema, file: UploadFile, update_support: bool = False) -> str:
        """Import records from an uploaded Excel file.

        Each row is validated with ``BizVarDictCreateSchema`` and created
        individually; a failing row is reported and does not stop the import.

        Args:
            auth: Auth context handed to the CRUD layer.
            file: Uploaded workbook; it must contain every header of the
                import template.
            update_support: Currently unused by this method.

        Returns:
            A summary with the number of imported rows, followed by the
            per-row error messages when there are any.

        Raises:
            CustomException: If the file cannot be read, is empty, lacks
                required columns, or any other error escapes row processing.
        """
        header_dict = {header: field for field, header in cls._FIELD_HEADERS.items()}

        try:
            # Always release the upload, even when reading it fails.
            try:
                contents = await file.read()
            finally:
                await file.close()
            df = pd.read_excel(io.BytesIO(contents))

            if df.empty:
                raise CustomException(msg="导入文件为空")

            missing_headers = [header for header in header_dict if header not in df.columns]
            if missing_headers:
                raise CustomException(msg=f"导入文件缺少必要的列: {', '.join(missing_headers)}")

            df.rename(columns=header_dict, inplace=True)

            error_msgs = []
            success_count = 0
            # Rows are numbered from 1, counting data rows only.
            for count, (_, row) in enumerate(df.iterrows(), start=1):
                try:
                    # pandas represents blank cells as NaN; hand them to the
                    # schema as None so they are treated as missing values.
                    data = {
                        field: (None if pd.isna(row[field]) else row[field])
                        for field in header_dict.values()
                    }
                    # Validate through the create schema before persisting.
                    create_schema = BizVarDictCreateSchema.model_validate(data)
                    # NOTE: no uniqueness check is performed in this service method.
                    await BizVarDictCRUD(auth).create_vardict_crud(data=create_schema)
                    success_count += 1
                except Exception as e:
                    # Best effort: record the failure and continue with the next row.
                    error_msgs.append(f"第{count}行: {str(e)}")
                    continue

            result = f"成功导入 {success_count} 条数据"
            if error_msgs:
                result += "\n错误信息:\n" + "\n".join(error_msgs)
            return result

        except Exception as e:
            log.error(f"批量导入失败: {str(e)}")
            raise CustomException(msg=f"导入失败: {str(e)}") from e

    @classmethod
    async def import_template_download_vardict_service(cls) -> bytes:
        """Build the Excel template used for batch import.

        Returns:
            The result of ``ExcelUtil.get_excel_template`` for the import
            headers; no drop-down selectors are configured.
        """
        header_list = list(cls._FIELD_HEADERS.values())
        # No drop-down columns/options are defined for this template.
        selector_header_list = []
        option_list = []
        return ExcelUtil.get_excel_template(
            header_list=header_list,
            selector_header_list=selector_header_list,
            option_list=option_list,
        )