# -*- coding: utf-8 -*- import io from fastapi import UploadFile import pandas as pd from redis.asyncio import Redis from app.common.enums import RedisInitKeyConfig from app.core.base_schema import BatchSetAvailable from app.core.exceptions import CustomException from app.core.redis_crud import RedisCURD from app.utils.excel_util import ExcelUtil from app.core.logger import log from app.api.v1.module_system.auth.schema import AuthSchema from .schema import BizCraneCreateSchema, BizCraneUpdateSchema, BizCraneOutSchema, BizCraneQueryParam from .crud import BizCraneCRUD class BizCraneService: """ 行车信息服务层 """ @classmethod async def detail_crane_service(cls, auth: AuthSchema, id: int) -> dict: """详情""" obj = await BizCraneCRUD(auth).get_by_id_crane_crud(id=id) if not obj: raise CustomException(msg="该数据不存在") return BizCraneOutSchema.model_validate(obj).model_dump() @classmethod async def detail_crane_service_for_no(cls, auth: AuthSchema, crane_no: str) -> dict: """详情""" obj = await BizCraneCRUD(auth).get_by_id_crane_crud_for_no(crane_no=crane_no) if not obj: raise CustomException(msg="该数据不存在") return BizCraneOutSchema.model_validate(obj).model_dump() @classmethod async def list_crane_service(cls, auth: AuthSchema, search: BizCraneQueryParam | None = None, order_by: list[dict] | None = None) -> list[dict]: """列表查询""" search_dict = search.__dict__ if search else None obj_list = await BizCraneCRUD(auth).list_crane_crud(search=search_dict, order_by=order_by) return [BizCraneOutSchema.model_validate(obj).model_dump() for obj in obj_list] @classmethod async def page_crane_service(cls, auth: AuthSchema, page_no: int, page_size: int, search: BizCraneQueryParam | None = None, order_by: list[dict] | None = None) -> dict: """分页查询(数据库分页)""" search_dict = search.__dict__ if search else {} order_by_list = order_by or [{'id': 'asc'}] offset = (page_no - 1) * page_size result = await BizCraneCRUD(auth).page_crane_crud( offset=offset, limit=page_size, order_by=order_by_list, search=search_dict ) return result @classmethod async def create_crane_service(cls, auth: AuthSchema, data: BizCraneCreateSchema,redis: Redis) -> dict: """创建""" # 检查唯一性约束 obj = await BizCraneCRUD(auth).create_crane_crud(data=data) if obj: # 更新缓存中数据 await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:*") return BizCraneOutSchema.model_validate(obj).model_dump() @classmethod async def update_crane_service(cls, auth: AuthSchema, id: int, data: BizCraneUpdateSchema,redis: Redis) -> dict: """更新""" # 检查数据是否存在 obj = await BizCraneCRUD(auth).get_by_id_crane_crud(id=id) if not obj: raise CustomException(msg='更新失败,该数据不存在') # 检查唯一性约束 obj = await BizCraneCRUD(auth).update_crane_crud(id=id, data=data) if obj: # 更新缓存中数据 await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:*") return BizCraneOutSchema.model_validate(obj).model_dump() @classmethod async def delete_crane_service(cls, auth: AuthSchema, ids: list[int],redis: Redis) -> None: """删除""" if len(ids) < 1: raise CustomException(msg='删除失败,删除对象不能为空') for id in ids: obj = await BizCraneCRUD(auth).get_by_id_crane_crud(id=id) if not obj: raise CustomException(msg=f'删除失败,ID为{id}的数据不存在') await BizCraneCRUD(auth).delete_crane_crud(ids=ids) # 更新缓存中数据 await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:*") @classmethod async def set_available_crane_service(cls, auth: AuthSchema, data: BatchSetAvailable,redis: Redis) -> None: """批量设置状态""" await BizCraneCRUD(auth).set_available_crane_crud(ids=data.ids, status=data.status) # 更新缓存中数据 await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:*") @classmethod async def batch_export_crane_service(cls, obj_list: list[dict]) -> bytes: """批量导出""" mapping_dict = { 'crane_name': '行车名称', 'crane_no': '行车编号', 'crane_model': '行车型号', 'work_span': '工作跨度', 'work_height': '工作高度', 'ip_address': 'ip地址', 'order': '排序', 'id': '主键ID', 'uuid': 'UUID全局唯一标识', 'status': '是否启用(0:启用 1:禁用)', 'description': '备注/描述', 'created_time': '创建时间', 'updated_time': '更新时间', 'created_id': '创建人ID', 'updated_id': '更新人ID', } data = obj_list.copy() for item in data: # 状态转换 if 'status' in item: item['status'] = '启用' if item.get('status') == '0' else '停用' # 创建者转换 creator_info = item.get('creator') if isinstance(creator_info, dict): item['creator'] = creator_info.get('name', '未知') elif creator_info is None: item['creator'] = '未知' return ExcelUtil.export_list2excel(list_data=data, mapping_dict=mapping_dict) @classmethod async def batch_import_crane_service(cls, auth: AuthSchema, file: UploadFile, update_support: bool = False) -> str: """批量导入""" header_dict = { '行车名称': 'crane_name', '行车编号': 'crane_no', '行车型号': 'crane_model', '工作跨度': 'work_span', '工作高度': 'work_height', 'ip地址': 'ip_address', '排序': 'order', '主键ID': 'id', 'UUID全局唯一标识': 'uuid', '是否启用(0:启用 1:禁用)': 'status', '备注/描述': 'description', '创建时间': 'created_time', '更新时间': 'updated_time', '创建人ID': 'created_id', '更新人ID': 'updated_id', } try: contents = await file.read() df = pd.read_excel(io.BytesIO(contents)) await file.close() if df.empty: raise CustomException(msg="导入文件为空") missing_headers = [header for header in header_dict.keys() if header not in df.columns] if missing_headers: raise CustomException(msg=f"导入文件缺少必要的列: {', '.join(missing_headers)}") df.rename(columns=header_dict, inplace=True) # 验证必填字段 error_msgs = [] success_count = 0 count = 0 for index, row in df.iterrows(): count += 1 try: data = { "crane_name": row['crane_name'], "crane_no": row['crane_no'], "crane_model": row['crane_model'], "work_span": row['work_span'], "work_height": row['work_height'], "ip_address": row['ip_address'], "order": row['order'], "id": row['id'], "uuid": row['uuid'], "status": row['status'], "description": row['description'], "created_time": row['created_time'], "updated_time": row['updated_time'], "created_id": row['created_id'], "updated_id": row['updated_id'], } # 使用CreateSchema做校验后入库 create_schema = BizCraneCreateSchema.model_validate(data) # 检查唯一性约束 await BizCraneCRUD(auth).create_crane_crud(data=create_schema) success_count += 1 except Exception as e: error_msgs.append(f"第{count}行: {str(e)}") continue result = f"成功导入 {success_count} 条数据" if error_msgs: result += "\n错误信息:\n" + "\n".join(error_msgs) return result except Exception as e: log.error(f"批量导入失败: {str(e)}") raise CustomException(msg=f"导入失败: {str(e)}") @classmethod async def import_template_download_crane_service(cls) -> bytes: """下载导入模板""" header_list = [ '行车名称', '行车编号', '行车型号', '工作跨度', '工作高度', 'ip地址', '排序', '主键ID', 'UUID全局唯一标识', '是否启用(0:启用 1:禁用)', '备注/描述', '创建时间', '更新时间', '创建人ID', '更新人ID', ] selector_header_list = [] option_list = [] # 添加下拉选项 return ExcelUtil.get_excel_template( header_list=header_list, selector_header_list=selector_header_list, option_list=option_list )