service.py 9.8 KB


  1. # -*- coding: utf-8 -*-
  2. import io
  3. from fastapi import UploadFile
  4. import pandas as pd
  5. from redis.asyncio import Redis
  6. from app.common.enums import RedisInitKeyConfig
  7. from app.core.base_schema import BatchSetAvailable
  8. from app.core.exceptions import CustomException
  9. from app.core.redis_crud import RedisCURD
  10. from app.utils.excel_util import ExcelUtil
  11. from app.core.logger import log
  12. from app.api.v1.module_system.auth.schema import AuthSchema
  13. from .schema import BizCraneCreateSchema, BizCraneUpdateSchema, BizCraneOutSchema, BizCraneQueryParam
  14. from .crud import BizCraneCRUD
  15. class BizCraneService:
  16. """
  17. 行车信息服务层
  18. """
  19. @classmethod
  20. async def detail_crane_service(cls, auth: AuthSchema, id: int) -> dict:
  21. """详情"""
  22. obj = await BizCraneCRUD(auth).get_by_id_crane_crud(id=id)
  23. if not obj:
  24. raise CustomException(msg="该数据不存在")
  25. return BizCraneOutSchema.model_validate(obj).model_dump()
  26. @classmethod
  27. async def detail_crane_service_for_no(cls, auth: AuthSchema, crane_no: str) -> dict:
  28. """详情"""
  29. obj = await BizCraneCRUD(auth).get_by_id_crane_crud_for_no(crane_no=crane_no)
  30. if not obj:
  31. raise CustomException(msg="该数据不存在")
  32. return BizCraneOutSchema.model_validate(obj).model_dump()
  33. @classmethod
  34. async def list_crane_service(cls, auth: AuthSchema, search: BizCraneQueryParam | None = None, order_by: list[dict] | None = None) -> list[dict]:
  35. """列表查询"""
  36. search_dict = search.__dict__ if search else None
  37. obj_list = await BizCraneCRUD(auth).list_crane_crud(search=search_dict, order_by=order_by)
  38. return [BizCraneOutSchema.model_validate(obj).model_dump() for obj in obj_list]
  39. @classmethod
  40. async def page_crane_service(cls, auth: AuthSchema, page_no: int, page_size: int, search: BizCraneQueryParam | None = None, order_by: list[dict] | None = None) -> dict:
  41. """分页查询(数据库分页)"""
  42. search_dict = search.__dict__ if search else {}
  43. order_by_list = order_by or [{'id': 'asc'}]
  44. offset = (page_no - 1) * page_size
  45. result = await BizCraneCRUD(auth).page_crane_crud(
  46. offset=offset,
  47. limit=page_size,
  48. order_by=order_by_list,
  49. search=search_dict
  50. )
  51. return result
  52. @classmethod
  53. async def create_crane_service(cls, auth: AuthSchema, data: BizCraneCreateSchema,redis: Redis) -> dict:
  54. """创建"""
  55. # 检查唯一性约束
  56. obj = await BizCraneCRUD(auth).create_crane_crud(data=data)
  57. if obj:
  58. # 更新缓存中数据
  59. await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:*")
  60. return BizCraneOutSchema.model_validate(obj).model_dump()
  61. @classmethod
  62. async def update_crane_service(cls, auth: AuthSchema, id: int, data: BizCraneUpdateSchema,redis: Redis) -> dict:
  63. """更新"""
  64. # 检查数据是否存在
  65. obj = await BizCraneCRUD(auth).get_by_id_crane_crud(id=id)
  66. if not obj:
  67. raise CustomException(msg='更新失败,该数据不存在')
  68. # 检查唯一性约束
  69. obj = await BizCraneCRUD(auth).update_crane_crud(id=id, data=data)
  70. if obj:
  71. # 更新缓存中数据
  72. await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:*")
  73. return BizCraneOutSchema.model_validate(obj).model_dump()
  74. @classmethod
  75. async def delete_crane_service(cls, auth: AuthSchema, ids: list[int],redis: Redis) -> None:
  76. """删除"""
  77. if len(ids) < 1:
  78. raise CustomException(msg='删除失败,删除对象不能为空')
  79. for id in ids:
  80. obj = await BizCraneCRUD(auth).get_by_id_crane_crud(id=id)
  81. if not obj:
  82. raise CustomException(msg=f'删除失败,ID为{id}的数据不存在')
  83. await BizCraneCRUD(auth).delete_crane_crud(ids=ids)
  84. # 更新缓存中数据
  85. await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:*")
  86. @classmethod
  87. async def set_available_crane_service(cls, auth: AuthSchema, data: BatchSetAvailable,redis: Redis) -> None:
  88. """批量设置状态"""
  89. await BizCraneCRUD(auth).set_available_crane_crud(ids=data.ids, status=data.status)
  90. # 更新缓存中数据
  91. await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:*")
  92. @classmethod
  93. async def batch_export_crane_service(cls, obj_list: list[dict]) -> bytes:
  94. """批量导出"""
  95. mapping_dict = {
  96. 'crane_name': '行车名称',
  97. 'crane_no': '行车编号',
  98. 'crane_model': '行车型号',
  99. 'work_span': '工作跨度',
  100. 'work_height': '工作高度',
  101. 'ip_address': 'ip地址',
  102. 'order': '排序',
  103. 'id': '主键ID',
  104. 'uuid': 'UUID全局唯一标识',
  105. 'status': '是否启用(0:启用 1:禁用)',
  106. 'description': '备注/描述',
  107. 'created_time': '创建时间',
  108. 'updated_time': '更新时间',
  109. 'created_id': '创建人ID',
  110. 'updated_id': '更新人ID',
  111. }
  112. data = obj_list.copy()
  113. for item in data:
  114. # 状态转换
  115. if 'status' in item:
  116. item['status'] = '启用' if item.get('status') == '0' else '停用'
  117. # 创建者转换
  118. creator_info = item.get('creator')
  119. if isinstance(creator_info, dict):
  120. item['creator'] = creator_info.get('name', '未知')
  121. elif creator_info is None:
  122. item['creator'] = '未知'
  123. return ExcelUtil.export_list2excel(list_data=data, mapping_dict=mapping_dict)
  124. @classmethod
  125. async def batch_import_crane_service(cls, auth: AuthSchema, file: UploadFile, update_support: bool = False) -> str:
  126. """批量导入"""
  127. header_dict = {
  128. '行车名称': 'crane_name',
  129. '行车编号': 'crane_no',
  130. '行车型号': 'crane_model',
  131. '工作跨度': 'work_span',
  132. '工作高度': 'work_height',
  133. 'ip地址': 'ip_address',
  134. '排序': 'order',
  135. '主键ID': 'id',
  136. 'UUID全局唯一标识': 'uuid',
  137. '是否启用(0:启用 1:禁用)': 'status',
  138. '备注/描述': 'description',
  139. '创建时间': 'created_time',
  140. '更新时间': 'updated_time',
  141. '创建人ID': 'created_id',
  142. '更新人ID': 'updated_id',
  143. }
  144. try:
  145. contents = await file.read()
  146. df = pd.read_excel(io.BytesIO(contents))
  147. await file.close()
  148. if df.empty:
  149. raise CustomException(msg="导入文件为空")
  150. missing_headers = [header for header in header_dict.keys() if header not in df.columns]
  151. if missing_headers:
  152. raise CustomException(msg=f"导入文件缺少必要的列: {', '.join(missing_headers)}")
  153. df.rename(columns=header_dict, inplace=True)
  154. # 验证必填字段
  155. error_msgs = []
  156. success_count = 0
  157. count = 0
  158. for index, row in df.iterrows():
  159. count += 1
  160. try:
  161. data = {
  162. "crane_name": row['crane_name'],
  163. "crane_no": row['crane_no'],
  164. "crane_model": row['crane_model'],
  165. "work_span": row['work_span'],
  166. "work_height": row['work_height'],
  167. "ip_address": row['ip_address'],
  168. "order": row['order'],
  169. "id": row['id'],
  170. "uuid": row['uuid'],
  171. "status": row['status'],
  172. "description": row['description'],
  173. "created_time": row['created_time'],
  174. "updated_time": row['updated_time'],
  175. "created_id": row['created_id'],
  176. "updated_id": row['updated_id'],
  177. }
  178. # 使用CreateSchema做校验后入库
  179. create_schema = BizCraneCreateSchema.model_validate(data)
  180. # 检查唯一性约束
  181. await BizCraneCRUD(auth).create_crane_crud(data=create_schema)
  182. success_count += 1
  183. except Exception as e:
  184. error_msgs.append(f"第{count}行: {str(e)}")
  185. continue
  186. result = f"成功导入 {success_count} 条数据"
  187. if error_msgs:
  188. result += "\n错误信息:\n" + "\n".join(error_msgs)
  189. return result
  190. except Exception as e:
  191. log.error(f"批量导入失败: {str(e)}")
  192. raise CustomException(msg=f"导入失败: {str(e)}")
  193. @classmethod
  194. async def import_template_download_crane_service(cls) -> bytes:
  195. """下载导入模板"""
  196. header_list = [
  197. '行车名称',
  198. '行车编号',
  199. '行车型号',
  200. '工作跨度',
  201. '工作高度',
  202. 'ip地址',
  203. '排序',
  204. '主键ID',
  205. 'UUID全局唯一标识',
  206. '是否启用(0:启用 1:禁用)',
  207. '备注/描述',
  208. '创建时间',
  209. '更新时间',
  210. '创建人ID',
  211. '更新人ID',
  212. ]
  213. selector_header_list = []
  214. option_list = []
  215. # 添加下拉选项
  216. return ExcelUtil.get_excel_template(
  217. header_list=header_list,
  218. selector_header_list=selector_header_list,
  219. option_list=option_list
  220. )