service.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428
  1. # -*- coding: utf-8 -*-
  2. import io
  3. import json
  4. from fastapi import UploadFile
  5. import pandas as pd
  6. from redis.asyncio.client import Redis
  7. from app.core.database import async_db_session
  8. from app.core.redis_crud import RedisCURD
  9. from app.common.enums import RedisInitKeyConfig
  10. from app.core.base_schema import BatchSetAvailable
  11. from app.core.exceptions import CustomException
  12. from app.utils.excel_util import ExcelUtil
  13. from app.core.logger import log
  14. from app.api.v1.module_system.auth.schema import AuthSchema
  15. from .schema import BizVarDictCreateSchema, BizVarDictUpdateSchema, BizVarDictOutSchema, BizVarDictQueryParam
  16. from ..crane.crud import BizCraneCRUD
  17. from ..crane.model import BizCraneModel
  18. from ..gateway.crud import GatewayCRUD
  19. from ..gateway.model import GatewayModel
  20. from ..mec.crud import BizMecCRUD
  21. from ..vardict.crud import BizVarDictCRUD
  22. from ..vardict.schema import VarDictMecGroupSchema
  23. class BizVarDictService:
  24. """
  25. 变量信息服务层
  26. """
  27. @classmethod
  28. async def detail_vardict_service(cls, auth: AuthSchema, id: int) -> dict:
  29. """详情"""
  30. obj = await BizVarDictCRUD(auth).get_by_id_vardict_crud(id=id)
  31. if not obj:
  32. raise CustomException(msg="该数据不存在")
  33. crane = await BizCraneCRUD(auth).get_by_id_crane_crud_for_no(obj.crane_no)
  34. gateway = await GatewayCRUD(auth).get_by_id_gateway_crud(obj.gateway_id)
  35. res = BizVarDictOutSchema.model_validate(obj).model_dump()
  36. res['crane_name'] = crane.crane_name
  37. res['gateway_name'] = gateway.gateway_name if gateway else ""
  38. return res
  39. @classmethod
  40. async def list_vardict_service(cls, auth: AuthSchema, search: BizVarDictQueryParam | None = None, order_by: list[dict] | None = None) -> list[dict]:
  41. """列表查询"""
  42. search_dict = search.__dict__ if search else None
  43. obj_list = await BizVarDictCRUD(auth).list_vardict_crud(search=search_dict, order_by=order_by)
  44. return [BizVarDictOutSchema.model_validate(obj).model_dump() for obj in obj_list]
  45. @classmethod
  46. async def page_vardict_service(cls, auth: AuthSchema, page_no: int, page_size: int, search: BizVarDictQueryParam | None = None, order_by: list[dict] | None = None) -> dict:
  47. """分页查询(数据库分页)"""
  48. search_dict = search.__dict__ if search else {}
  49. order_by_list = order_by or [{'id': 'asc'}]
  50. offset = (page_no - 1) * page_size
  51. result = await BizVarDictCRUD(auth).page_vardict_crud(
  52. offset=offset,
  53. limit=page_size,
  54. order_by=order_by_list,
  55. search=search_dict
  56. )
  57. for item in result.get('items'):
  58. crane_model: BizCraneModel | None = await BizCraneCRUD(auth).get_by_id_crane_crud_for_no(crane_no=item['crane_no'])
  59. gateway_model: GatewayModel | None = await GatewayCRUD(auth).get_by_id_gateway_crud(id=item['gateway_id'])
  60. item['crane_name'] = crane_model.crane_name
  61. item['gateway_name'] = gateway_model.gateway_name if gateway_model else ""
  62. return result
  63. @classmethod
  64. async def create_vardict_service(cls, auth: AuthSchema, data: BizVarDictCreateSchema,redis: Redis) -> dict:
  65. """创建"""
  66. # 检查唯一性约束
  67. obj = await BizVarDictCRUD(auth).create_vardict_crud(data=data)
  68. if obj:
  69. # 更新缓存中数据
  70. await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:*")
  71. return BizVarDictOutSchema.model_validate(obj).model_dump()
  72. @classmethod
  73. async def update_vardict_service(cls, auth: AuthSchema, id: int, data: BizVarDictUpdateSchema,redis: Redis) -> dict:
  74. """更新"""
  75. # 检查数据是否存在
  76. obj = await BizVarDictCRUD(auth).get_by_id_vardict_crud(id=id)
  77. if not obj:
  78. raise CustomException(msg='更新失败,该数据不存在')
  79. # 检查唯一性约束
  80. obj = await BizVarDictCRUD(auth).update_vardict_crud(id=id, data=data)
  81. if obj:
  82. # 更新缓存中数据
  83. await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:*")
  84. return BizVarDictOutSchema.model_validate(obj).model_dump()
  85. @classmethod
  86. async def delete_vardict_service(cls, auth: AuthSchema, ids: list[int],redis: Redis) -> None:
  87. """删除"""
  88. if len(ids) < 1:
  89. raise CustomException(msg='删除失败,删除对象不能为空')
  90. for id in ids:
  91. obj = await BizVarDictCRUD(auth).get_by_id_vardict_crud(id=id)
  92. if not obj:
  93. raise CustomException(msg=f'删除失败,ID为{id}的数据不存在')
  94. await BizVarDictCRUD(auth).delete_vardict_crud(ids=ids)
  95. # 更新缓存中数据
  96. await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:*")
  97. @classmethod
  98. async def set_available_vardict_service(cls, auth: AuthSchema, data: BatchSetAvailable,redis: Redis) -> None:
  99. """批量设置状态"""
  100. await BizVarDictCRUD(auth).set_available_vardict_crud(ids=data.ids, status=data.status)
  101. # 更新缓存中数据
  102. await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:*")
  103. @classmethod
  104. async def batch_export_vardict_service(cls, obj_list: list[dict]) -> bytes:
  105. """批量导出"""
  106. mapping_dict = {
  107. 'id': 'id ',
  108. 'crane_no': '',
  109. 'var_code': '变量code',
  110. 'var_name': '变量名称',
  111. 'mec_type': '所属机构',
  112. 'data_type': '数据类型',
  113. 'switch_type': '变量类型',
  114. 'addr': 'modbus地址',
  115. 'gateway_id': '网关',
  116. 'var_sort': '排序',
  117. 'var_group': '变量分组',
  118. 'var_category': '变量分类',
  119. 'translate': '绑定公式',
  120. 'device_no': '关联设备编号 ',
  121. 'is_reverse': '是否取反',
  122. 'is_top_show': '是否重点显示',
  123. 'is_save': '是否生成',
  124. 'is_calibration': '是否标定',
  125. 'is_overview_top_show': '是否首页重点显示',
  126. 'is_home_page_show': '是否首页显示',
  127. 'is_diagnose': '是否启用诊断专家',
  128. 'is_upload': '是否上传云平台',
  129. 'diagnosis_id': '关联诊断专家',
  130. 'status': '是否启用',
  131. 'description': '备注/描述',
  132. 'create_time': '创建时间 ',
  133. 'updated_time': '更新时间',
  134. 'created_id': '创建人ID',
  135. 'updated_id': '更新人ID',
  136. }
  137. data = obj_list.copy()
  138. for item in data:
  139. # 状态转换
  140. if 'status' in item:
  141. item['status'] = '启用' if item.get('status') == '0' else '停用'
  142. # 创建者转换
  143. creator_info = item.get('creator')
  144. if isinstance(creator_info, dict):
  145. item['creator'] = creator_info.get('name', '未知')
  146. elif creator_info is None:
  147. item['creator'] = '未知'
  148. return ExcelUtil.export_list2excel(list_data=data, mapping_dict=mapping_dict)
  149. @classmethod
  150. async def batch_import_vardict_service(cls, auth: AuthSchema, file: UploadFile, update_support: bool = False) -> str:
  151. """批量导入"""
  152. header_dict = {
  153. 'id ': 'id',
  154. '': 'crane_no',
  155. '变量code': 'var_code',
  156. '变量名称': 'var_name',
  157. '所属机构': 'mec_type',
  158. '数据类型': 'data_type',
  159. '变量类型': 'switch_type',
  160. 'modbus地址': 'addr',
  161. '网关': 'gateway_id',
  162. '排序': 'var_sort',
  163. '变量分组': 'var_group',
  164. '变量分类': 'var_category',
  165. '绑定公式': 'translate',
  166. '关联设备编号 ': 'device_no',
  167. '是否取反': 'is_reverse',
  168. '是否重点显示': 'is_top_show',
  169. '是否生成': 'is_save',
  170. '是否标定': 'is_calibration',
  171. '是否首页重点显示': 'is_overview_top_show',
  172. '是否首页显示': 'is_home_page_show',
  173. '是否启用诊断专家': 'is_diagnose',
  174. '是否上传云平台': 'is_upload',
  175. '关联诊断专家': 'diagnosis_id',
  176. '是否启用': 'status',
  177. '备注/描述': 'description',
  178. '创建时间 ': 'create_time',
  179. '更新时间': 'updated_time',
  180. '创建人ID': 'created_id',
  181. '更新人ID': 'updated_id',
  182. }
  183. try:
  184. contents = await file.read()
  185. df = pd.read_excel(io.BytesIO(contents))
  186. await file.close()
  187. if df.empty:
  188. raise CustomException(msg="导入文件为空")
  189. missing_headers = [header for header in header_dict.keys() if header not in df.columns]
  190. if missing_headers:
  191. raise CustomException(msg=f"导入文件缺少必要的列: {', '.join(missing_headers)}")
  192. df.rename(columns=header_dict, inplace=True)
  193. # 验证必填字段
  194. error_msgs = []
  195. success_count = 0
  196. count = 0
  197. for index, row in df.iterrows():
  198. count += 1
  199. try:
  200. data = {
  201. "id": row['id'],
  202. "crane_no": row['crane_no'],
  203. "var_code": row['var_code'],
  204. "var_name": row['var_name'],
  205. "mec_type": row['mec_type'],
  206. "data_type": row['data_type'],
  207. "switch_type": row['switch_type'],
  208. "addr": row['addr'],
  209. "gateway_id": row['gateway_id'],
  210. "var_sort": row['var_sort'],
  211. "var_group": row['var_group'],
  212. "var_category": row['var_category'],
  213. "translate": row['translate'],
  214. "device_no": row['device_no'],
  215. "is_reverse": row['is_reverse'],
  216. "is_top_show": row['is_top_show'],
  217. "is_save": row['is_save'],
  218. "is_calibration": row['is_calibration'],
  219. "is_overview_top_show": row['is_overview_top_show'],
  220. "is_home_page_show": row['is_home_page_show'],
  221. "is_diagnose": row['is_diagnose'],
  222. "is_upload": row['is_upload'],
  223. "diagnosis_id": row['diagnosis_id'],
  224. "status": row['status'],
  225. "description": row['description'],
  226. "create_time": row['create_time'],
  227. "updated_time": row['updated_time'],
  228. "created_id": row['created_id'],
  229. "updated_id": row['updated_id'],
  230. }
  231. # 使用CreateSchema做校验后入库
  232. create_schema = BizVarDictCreateSchema.model_validate(data)
  233. # 检查唯一性约束
  234. await BizVarDictCRUD(auth).create_vardict_crud(data=create_schema)
  235. success_count += 1
  236. except Exception as e:
  237. error_msgs.append(f"第{count}行: {str(e)}")
  238. continue
  239. result = f"成功导入 {success_count} 条数据"
  240. if error_msgs:
  241. result += "\n错误信息:\n" + "\n".join(error_msgs)
  242. return result
  243. except Exception as e:
  244. log.error(f"批量导入失败: {str(e)}")
  245. raise CustomException(msg=f"导入失败: {str(e)}")
  246. @classmethod
  247. async def import_template_download_vardict_service(cls) -> bytes:
  248. """下载导入模板"""
  249. header_list = [
  250. 'id ',
  251. '',
  252. '变量code',
  253. '变量名称',
  254. '所属机构',
  255. '数据类型',
  256. '变量类型',
  257. 'modbus地址',
  258. '网关',
  259. '排序',
  260. '变量分组',
  261. '变量分类',
  262. '绑定公式',
  263. '关联设备编号 ',
  264. '是否取反',
  265. '是否重点显示',
  266. '是否生成',
  267. '是否标定',
  268. '是否首页重点显示',
  269. '是否首页显示',
  270. '是否启用诊断专家',
  271. '是否上传云平台',
  272. '关联诊断专家',
  273. '是否启用',
  274. '备注/描述',
  275. '创建时间 ',
  276. '更新时间',
  277. '创建人ID',
  278. '更新人ID',
  279. ]
  280. selector_header_list = []
  281. option_list = []
  282. # 添加下拉选项
  283. selector_header_list.append('所属机构')
  284. option_list.append({'所属机构': []})
  285. selector_header_list.append('数据类型')
  286. option_list.append({'数据类型': []})
  287. selector_header_list.append('变量类型')
  288. option_list.append({'变量类型': []})
  289. selector_header_list.append('变量分类')
  290. option_list.append({'变量分类': []})
  291. return ExcelUtil.get_excel_template(
  292. header_list=header_list,
  293. selector_header_list=selector_header_list,
  294. option_list=option_list
  295. )
  296. @classmethod
  297. async def get_vardict_group_service(cls,auth: AuthSchema, redis: Redis,id: int):
  298. """
  299. 从缓存获取变量分组数据列表信息service
  300. 参数:
  301. - redis (Redis): Redis客户端
  302. - id (int): 行车id
  303. 返回:
  304. - list[dict]: 变量分组数据列表
  305. """
  306. try:
  307. crane = await BizCraneCRUD(auth).get_by_id_crane_crud(id)
  308. redis_key = f"{RedisInitKeyConfig.VAR_DICT.key}:{crane.crane_no}"
  309. obj_list_dict = await RedisCURD(redis).get(redis_key)
  310. # 确保返回数据正确序列化
  311. if obj_list_dict:
  312. if isinstance(obj_list_dict, str):
  313. try:
  314. return json.loads(obj_list_dict)
  315. except json.JSONDecodeError:
  316. log.warning(f"变量分组数据反序列化失败,尝试重新初始化缓存: {crane.crane_name}")
  317. elif isinstance(obj_list_dict, list):
  318. return obj_list_dict
  319. # 缓存不存在或格式错误时重新初始化
  320. await cls.init_vardict_group_service(redis)
  321. obj_list_dict = await RedisCURD(redis).get(redis_key)
  322. if not obj_list_dict:
  323. raise CustomException(msg="变量分组数据不存在")
  324. # 再次确保返回数据正确序列化
  325. if isinstance(obj_list_dict, str):
  326. try:
  327. return json.loads(obj_list_dict)
  328. except json.JSONDecodeError:
  329. raise CustomException(msg="变量分组数据格式错误")
  330. return obj_list_dict
  331. except CustomException:
  332. raise
  333. except Exception as e:
  334. log.error(f"获取变量分组数据缓存失败: {str(e)}")
  335. raise CustomException(msg=f"获取变量分组数据失败: {str(e)}")
  336. @classmethod
  337. async def init_vardict_group_service(cls, redis: Redis):
  338. """
  339. 应用初始化: 获取所有天车变量数据信息并缓存service
  340. 参数:
  341. - redis (Redis): Redis客户端
  342. 返回:
  343. - None
  344. """
  345. try:
  346. async with async_db_session() as session:
  347. async with session.begin():
  348. # 在初始化过程中,不需要检查数据权限
  349. auth = AuthSchema(db=session, check_data_scope=False)
  350. crane_list = await BizCraneCRUD(auth).list(search={'status':'1'},order_by=[{'order':'asc'}])
  351. if not crane_list:
  352. log.warning("未找到任何天车数据")
  353. return
  354. success_count = 0
  355. fail_count = 0
  356. for crane in crane_list:
  357. crane_no = crane.crane_no
  358. crane_name = crane.crane_name
  359. try:
  360. varDictMecGroupSchema: list[VarDictMecGroupSchema] = []
  361. mec_list = await BizMecCRUD(auth).list(search={'crane_no':crane_no,'status':'1'},order_by=[{'sort':'asc'}])
  362. for mec in mec_list:
  363. # 获取分组数据
  364. varDicts = await BizVarDictCRUD(auth).list(search={'crane_no':crane_no,'mec_type':mec.mec_type,'status':'1'},order_by=[{'var_sort':'asc'}])
  365. if not varDicts:
  366. continue
  367. varDictMecGroupSchema.append(
  368. VarDictMecGroupSchema(mec_type=mec.mec_type, varList_simple=varDicts))
  369. # 保存到Redis并设置过期时间
  370. redis_key = f"{RedisInitKeyConfig.VAR_DICT.key}:{crane_no}"
  371. var_dict_list = [item.model_dump() for item in varDictMecGroupSchema]
  372. value = json.dumps(var_dict_list, ensure_ascii=False)
  373. await RedisCURD(redis).set(
  374. key=redis_key,
  375. value=value,
  376. )
  377. success_count += 1
  378. log.info(f"✅ 变量数据缓存成功: {crane_name}")
  379. except Exception as e:
  380. fail_count += 1
  381. log.error(f"❌ 初始化变量数据失败 [{crane_name}]: {e}")
  382. log.info(f"变量数据初始化完成 - 成功: {success_count}, 失败: {fail_count}")
  383. except Exception as e:
  384. log.error(f"变量数据初始化过程发生错误: {e}")
  385. # 只在严重错误时抛出异常,允许单个字典加载失败
  386. raise CustomException(msg=f"变量数据初始化失败: {str(e)}")