service.py 48 KB


  1. # -*- coding: utf-8 -*-
  2. import asyncio
  3. import io
  4. import json
  5. from collections import defaultdict
  6. from datetime import datetime, timedelta
  7. from typing import Any, List, Dict, Optional
  8. from fastapi import UploadFile
  9. import pandas as pd
  10. from redis.asyncio.client import Redis
  11. from app.core.database import async_db_session
  12. from app.core.redis_crud import RedisCURD
  13. from app.common.enums import RedisInitKeyConfig
  14. from app.core.base_schema import BatchSetAvailable
  15. from app.core.exceptions import CustomException
  16. from app.utils.excel_util import ExcelUtil
  17. from app.core.logger import log
  18. from app.api.v1.module_system.auth.schema import AuthSchema
  19. from .schema import BizVarDictCreateSchema, BizVarDictUpdateSchema, BizVarDictOutSchema, BizVarDictQueryParam
  20. from ..crane.crud import BizCraneCRUD
  21. from ..crane.model import BizCraneModel
  22. from ..gateway.crud import GatewayCRUD
  23. from ..gateway.model import GatewayModel
  24. from ..mec.crud import BizMecCRUD
  25. from ..vardict.crud import BizVarDictCRUD
  26. from ..vardict.schema import VarDictMecGroupSchema
  27. from app.utils.tdengine_util import tdengine_rest_query, format_rest_result, get_table_total_count
  28. class BizVarDictService:
  29. """
  30. 变量信息服务层
  31. """
  32. @classmethod
  33. async def detail_vardict_service(cls, auth: AuthSchema, id: int) -> dict:
  34. """详情"""
  35. obj = await BizVarDictCRUD(auth).get_by_id_vardict_crud(id=id)
  36. if not obj:
  37. raise CustomException(msg="该数据不存在")
  38. crane = await BizCraneCRUD(auth).get_by_id_crane_crud_for_no(obj.crane_no)
  39. gateway = await GatewayCRUD(auth).get_by_id_gateway_crud(obj.gateway_id)
  40. res = BizVarDictOutSchema.model_validate(obj).model_dump()
  41. res['crane_name'] = crane.crane_name
  42. res['gateway_name'] = gateway.gateway_name if gateway else ""
  43. return res
  44. @classmethod
  45. async def list_vardict_service(cls, auth: AuthSchema, search: BizVarDictQueryParam | None = None, order_by: list[dict] | None = None) -> list[dict]:
  46. """列表查询"""
  47. search_dict = search.__dict__ if search else None
  48. obj_list = await BizVarDictCRUD(auth).list_vardict_crud(search=search_dict, order_by=order_by)
  49. return [BizVarDictOutSchema.model_validate(obj).model_dump() for obj in obj_list]
  50. @classmethod
  51. async def page_vardict_service(cls, auth: AuthSchema, page_no: int, page_size: int, search: BizVarDictQueryParam | None = None, order_by: list[dict] | None = None) -> dict:
  52. """分页查询(数据库分页)"""
  53. search_dict = search.__dict__ if search else {}
  54. order_by_list = order_by or [{'id': 'asc'}]
  55. offset = (page_no - 1) * page_size
  56. result = await BizVarDictCRUD(auth).page_vardict_crud(
  57. offset=offset,
  58. limit=page_size,
  59. order_by=order_by_list,
  60. search=search_dict
  61. )
  62. for item in result.get('items'):
  63. crane_model: BizCraneModel | None = await BizCraneCRUD(auth).get_by_id_crane_crud_for_no(crane_no=item['crane_no'])
  64. gateway_model: GatewayModel | None = await GatewayCRUD(auth).get_by_id_gateway_crud(id=item['gateway_id'])
  65. item['crane_name'] = crane_model.crane_name
  66. item['gateway_name'] = gateway_model.gateway_name if gateway_model else ""
  67. return result
  68. @classmethod
  69. async def vardict_alarms_list(cls, auth: AuthSchema, search: BizVarDictQueryParam | None = None) -> list[dict]:
  70. sql_parts = [
  71. """SELECT a.*,b.crane_name
  72. FROM biz_var_dict as a
  73. LEFT JOIN biz_crane as b ON a.crane_no = b.crane_no
  74. WHERE a.`status` = :status AND b.`status` = :status AND a.data_type <= 1 AND a.switch_type >= 2"""
  75. ]
  76. business_params: dict[str, Any] = {"status": 1}
  77. if search and search.crane_no:
  78. sql_parts.append(f"AND a.crane_no = :crane_no")
  79. business_params["crane_no"] = search.crane_no
  80. if search and search.mec_type:
  81. sql_parts.append(f"AND a.mec_type = :mec_type")
  82. business_params["mec_type"] = search.mec_type
  83. sql_parts.append("ORDER BY a.switch_type desc,b.`order` asc,a.mec_type asc,a.var_sort asc")
  84. final_sql = " ".join(sql_parts)
  85. try:
  86. obj_list = await BizVarDictCRUD(auth).list_sql(final_sql, business_params)
  87. return [BizVarDictOutSchema.model_validate(obj).model_dump() for obj in obj_list]
  88. except Exception as e:
  89. raise CustomException(msg=f"查询变量字典报警列表失败:{str(e)}")
  90. @classmethod
  91. async def vardict_operation_record_list(cls, auth: AuthSchema, search: BizVarDictQueryParam | None = None) -> list[dict]:
  92. crane_no = search.crane_no
  93. mec_type = search.mec_type
  94. sql_parts = [
  95. """SELECT a.*,b.crane_name
  96. FROM biz_var_dict as a
  97. LEFT JOIN biz_crane as b ON a.crane_no = b.crane_no
  98. WHERE a.`status` = :status AND b.`status` = :status AND a.data_type <= 1 AND a.switch_type <= 1"""
  99. ]
  100. business_params: dict[str, Any] = {"status": 1}
  101. if crane_no:
  102. sql_parts.append(f"AND a.crane_no = :crane_no")
  103. business_params["crane_no"] = crane_no
  104. if mec_type:
  105. sql_parts.append(f"AND a.mec_type = :mec_type")
  106. business_params["mec_type"] = mec_type
  107. sql_parts.append("ORDER BY b.`order` asc,a.mec_type asc,a.var_sort asc")
  108. final_sql = " ".join(sql_parts)
  109. try:
  110. obj_list = await BizVarDictCRUD(auth).list_sql(final_sql, business_params)
  111. return [BizVarDictOutSchema.model_validate(obj).model_dump() for obj in obj_list]
  112. except Exception as e:
  113. raise CustomException(msg=f"查询变量字典报警列表失败:{str(e)}")
  114. @classmethod
  115. async def vardict_analog_list(cls, auth: AuthSchema, search: BizVarDictQueryParam | None = None) -> list[dict]:
  116. crane_no = search.crane_no
  117. mec_type = search.mec_type
  118. sql_parts = [
  119. """SELECT a.*,b.crane_name
  120. FROM biz_var_dict as a
  121. LEFT JOIN biz_crane as b ON a.crane_no = b.crane_no
  122. WHERE a.`status` = :status AND b.`status` = :status AND a.data_type >= 2"""
  123. ]
  124. business_params: dict[str, Any] = {"status": 1}
  125. if crane_no:
  126. sql_parts.append(f"AND a.crane_no = :crane_no")
  127. business_params["crane_no"] = crane_no
  128. if mec_type:
  129. sql_parts.append(f"AND a.mec_type = :mec_type")
  130. business_params["mec_type"] = mec_type
  131. sql_parts.append("ORDER BY b.`order` asc,a.mec_type asc,a.var_sort asc")
  132. final_sql = " ".join(sql_parts)
  133. try:
  134. obj_list = await BizVarDictCRUD(auth).list_sql(final_sql, business_params)
  135. return [BizVarDictOutSchema.model_validate(obj).model_dump() for obj in obj_list]
  136. except Exception as e:
  137. raise CustomException(msg=f"查询变量字典模拟量列表失败:{str(e)}")
  138. @classmethod
  139. async def create_vardict_service(cls, auth: AuthSchema, data: BizVarDictCreateSchema,redis: Redis) -> dict:
  140. """创建"""
  141. # 检查唯一性约束
  142. obj = await BizVarDictCRUD(auth).create_vardict_crud(data=data)
  143. if obj:
  144. # 更新缓存中数据
  145. await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:{data.crane_no}")
  146. return BizVarDictOutSchema.model_validate(obj).model_dump()
  147. @classmethod
  148. async def update_vardict_service(cls, auth: AuthSchema, id: int, data: BizVarDictUpdateSchema,redis: Redis) -> dict:
  149. """更新"""
  150. # 检查数据是否存在
  151. obj = await BizVarDictCRUD(auth).get_by_id_vardict_crud(id=id)
  152. if not obj:
  153. raise CustomException(msg='更新失败,该数据不存在')
  154. # 检查唯一性约束
  155. obj = await BizVarDictCRUD(auth).update_vardict_crud(id=id, data=data)
  156. if obj:
  157. # 更新缓存中数据
  158. await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:{obj.crane_no}")
  159. return BizVarDictOutSchema.model_validate(obj).model_dump()
  160. @classmethod
  161. async def delete_vardict_service(cls, auth: AuthSchema, ids: list[int],redis: Redis) -> None:
  162. """删除"""
  163. if len(ids) < 1:
  164. raise CustomException(msg='删除失败,删除对象不能为空')
  165. crane_nos = []
  166. for id in ids:
  167. obj = await BizVarDictCRUD(auth).get_by_id_vardict_crud(id=id)
  168. if not obj:
  169. raise CustomException(msg=f'删除失败,ID为{id}的数据不存在')
  170. crane_nos.append(obj.crane_no)
  171. await BizVarDictCRUD(auth).delete_vardict_crud(ids=ids)
  172. # 更新缓存中数据
  173. for crane_no in crane_nos:
  174. await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:{crane_no}")
  175. @classmethod
  176. async def set_availale_vardict_service(cls, auth: AuthSchema, data: BatchSetAvailable,redis: Redis) -> None:
  177. crane_nos = []
  178. for id in data.ids:
  179. obj = await BizVarDictCRUD(auth).get_by_id_vardict_crud(id=id)
  180. if not obj:
  181. raise CustomException(msg=f'批量设置失败,ID为{id}的数据不存在')
  182. crane_nos.append(obj.crane_no)
  183. """批量设置状态"""
  184. await BizVarDictCRUD(auth).set_available_vardict_crud(ids=data.ids, status=data.status)
  185. # 更新缓存中数据
  186. for crane_no in crane_nos:
  187. await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:{crane_no}")
  188. @classmethod
  189. async def batch_export_vardict_service(cls, obj_list: list[dict]) -> bytes:
  190. """批量导出"""
  191. mapping_dict = {
  192. 'id': 'id ',
  193. 'crane_no': '',
  194. 'var_code': '变量code',
  195. 'var_name': '变量名称',
  196. 'mec_type': '所属机构',
  197. 'data_type': '数据类型',
  198. 'switch_type': '变量类型',
  199. 'addr': 'modbus地址',
  200. 'gateway_id': '网关',
  201. 'var_sort': '排序',
  202. 'var_group': '变量分组',
  203. 'var_category': '变量分类',
  204. 'translate': '绑定公式',
  205. 'device_no': '关联设备编号 ',
  206. 'is_reverse': '是否取反',
  207. 'is_top_show': '是否重点显示',
  208. 'is_save': '是否生成',
  209. 'is_calibration': '是否标定',
  210. 'is_overview_top_show': '是否首页重点显示',
  211. 'is_home_page_show': '是否首页显示',
  212. 'is_diagnose': '是否启用诊断专家',
  213. 'is_upload': '是否上传云平台',
  214. 'diagnosis_id': '关联诊断专家',
  215. 'status': '是否启用',
  216. 'description': '备注/描述',
  217. 'create_time': '创建时间 ',
  218. 'updated_time': '更新时间',
  219. 'created_id': '创建人ID',
  220. 'updated_id': '更新人ID',
  221. }
  222. data = obj_list.copy()
  223. for item in data:
  224. # 状态转换
  225. if 'status' in item:
  226. item['status'] = '启用' if item.get('status') == '0' else '停用'
  227. # 创建者转换
  228. creator_info = item.get('creator')
  229. if isinstance(creator_info, dict):
  230. item['creator'] = creator_info.get('name', '未知')
  231. elif creator_info is None:
  232. item['creator'] = '未知'
  233. return ExcelUtil.export_list2excel(list_data=data, mapping_dict=mapping_dict)
  234. @classmethod
  235. async def batch_import_vardict_service(cls, auth: AuthSchema, file: UploadFile, update_support: bool = False) -> str:
  236. """批量导入"""
  237. header_dict = {
  238. 'id ': 'id',
  239. '': 'crane_no',
  240. '变量code': 'var_code',
  241. '变量名称': 'var_name',
  242. '所属机构': 'mec_type',
  243. '数据类型': 'data_type',
  244. '变量类型': 'switch_type',
  245. 'modbus地址': 'addr',
  246. '网关': 'gateway_id',
  247. '排序': 'var_sort',
  248. '变量分组': 'var_group',
  249. '变量分类': 'var_category',
  250. '绑定公式': 'translate',
  251. '关联设备编号 ': 'device_no',
  252. '是否取反': 'is_reverse',
  253. '是否重点显示': 'is_top_show',
  254. '是否生成': 'is_save',
  255. '是否标定': 'is_calibration',
  256. '是否首页重点显示': 'is_overview_top_show',
  257. '是否首页显示': 'is_home_page_show',
  258. '是否启用诊断专家': 'is_diagnose',
  259. '是否上传云平台': 'is_upload',
  260. '关联诊断专家': 'diagnosis_id',
  261. '是否启用': 'status',
  262. '备注/描述': 'description',
  263. '创建时间 ': 'create_time',
  264. '更新时间': 'updated_time',
  265. '创建人ID': 'created_id',
  266. '更新人ID': 'updated_id',
  267. }
  268. try:
  269. contents = await file.read()
  270. df = pd.read_excel(io.BytesIO(contents))
  271. await file.close()
  272. if df.empty:
  273. raise CustomException(msg="导入文件为空")
  274. missing_headers = [header for header in header_dict.keys() if header not in df.columns]
  275. if missing_headers:
  276. raise CustomException(msg=f"导入文件缺少必要的列: {', '.join(missing_headers)}")
  277. df.rename(columns=header_dict, inplace=True)
  278. # 验证必填字段
  279. error_msgs = []
  280. success_count = 0
  281. count = 0
  282. for index, row in df.iterrows():
  283. count += 1
  284. try:
  285. data = {
  286. "id": row['id'],
  287. "crane_no": row['crane_no'],
  288. "var_code": row['var_code'],
  289. "var_name": row['var_name'],
  290. "mec_type": row['mec_type'],
  291. "data_type": row['data_type'],
  292. "switch_type": row['switch_type'],
  293. "addr": row['addr'],
  294. "gateway_id": row['gateway_id'],
  295. "var_sort": row['var_sort'],
  296. "var_group": row['var_group'],
  297. "var_category": row['var_category'],
  298. "translate": row['translate'],
  299. "device_no": row['device_no'],
  300. "is_reverse": row['is_reverse'],
  301. "is_top_show": row['is_top_show'],
  302. "is_save": row['is_save'],
  303. "is_calibration": row['is_calibration'],
  304. "is_overview_top_show": row['is_overview_top_show'],
  305. "is_home_page_show": row['is_home_page_show'],
  306. "is_diagnose": row['is_diagnose'],
  307. "is_upload": row['is_upload'],
  308. "diagnosis_id": row['diagnosis_id'],
  309. "status": row['status'],
  310. "description": row['description'],
  311. "create_time": row['create_time'],
  312. "updated_time": row['updated_time'],
  313. "created_id": row['created_id'],
  314. "updated_id": row['updated_id'],
  315. }
  316. # 使用CreateSchema做校验后入库
  317. create_schema = BizVarDictCreateSchema.model_validate(data)
  318. # 检查唯一性约束
  319. await BizVarDictCRUD(auth).create_vardict_crud(data=create_schema)
  320. success_count += 1
  321. except Exception as e:
  322. error_msgs.append(f"第{count}行: {str(e)}")
  323. continue
  324. result = f"成功导入 {success_count} 条数据"
  325. if error_msgs:
  326. result += "\n错误信息:\n" + "\n".join(error_msgs)
  327. return result
  328. except Exception as e:
  329. log.error(f"批量导入失败: {str(e)}")
  330. raise CustomException(msg=f"导入失败: {str(e)}")
  331. @classmethod
  332. async def import_template_download_vardict_service(cls) -> bytes:
  333. """下载导入模板"""
  334. header_list = [
  335. 'id ',
  336. '',
  337. '变量code',
  338. '变量名称',
  339. '所属机构',
  340. '数据类型',
  341. '变量类型',
  342. 'modbus地址',
  343. '网关',
  344. '排序',
  345. '变量分组',
  346. '变量分类',
  347. '绑定公式',
  348. '关联设备编号 ',
  349. '是否取反',
  350. '是否重点显示',
  351. '是否生成',
  352. '是否标定',
  353. '是否首页重点显示',
  354. '是否首页显示',
  355. '是否启用诊断专家',
  356. '是否上传云平台',
  357. '关联诊断专家',
  358. '是否启用',
  359. '备注/描述',
  360. '创建时间 ',
  361. '更新时间',
  362. '创建人ID',
  363. '更新人ID',
  364. ]
  365. selector_header_list = []
  366. option_list = []
  367. # 添加下拉选项
  368. selector_header_list.append('所属机构')
  369. option_list.append({'所属机构': []})
  370. selector_header_list.append('数据类型')
  371. option_list.append({'数据类型': []})
  372. selector_header_list.append('变量类型')
  373. option_list.append({'变量类型': []})
  374. selector_header_list.append('变量分类')
  375. option_list.append({'变量分类': []})
  376. return ExcelUtil.get_excel_template(
  377. header_list=header_list,
  378. selector_header_list=selector_header_list,
  379. option_list=option_list
  380. )
  381. @classmethod
  382. async def get_vardict_group_service(cls, auth: AuthSchema,redis: Redis,crane_no: str):
  383. """
  384. 从缓存获取变量分组数据列表信息service
  385. 参数:
  386. - redis (Redis): Redis客户端
  387. - id (int): 行车id
  388. 返回:
  389. - list[dict]: 变量分组数据列表
  390. """
  391. try:
  392. redis_key = f"{RedisInitKeyConfig.VAR_DICT.key}:{crane_no}"
  393. obj_list_dict = await RedisCURD(redis).get(redis_key)
  394. # 确保返回数据正确序列化
  395. if obj_list_dict:
  396. if isinstance(obj_list_dict, str):
  397. try:
  398. return json.loads(obj_list_dict)
  399. except json.JSONDecodeError:
  400. log.warning(f"变量分组数据反序列化失败,尝试重新初始化缓存: {'行车:'+crane_no}")
  401. elif isinstance(obj_list_dict, list):
  402. return obj_list_dict
  403. # 缓存不存在或格式错误时重新初始化
  404. await cls.init_vardict_service(redis,crane_no=crane_no)
  405. obj_list_dict = await RedisCURD(redis).get(redis_key)
  406. if not obj_list_dict:
  407. raise CustomException(msg="变量分组数据不存在")
  408. # 再次确保返回数据正确序列化
  409. if isinstance(obj_list_dict, str):
  410. try:
  411. return json.loads(obj_list_dict)
  412. except json.JSONDecodeError:
  413. raise CustomException(msg="变量分组数据格式错误")
  414. return obj_list_dict
  415. except CustomException:
  416. raise
  417. except Exception as e:
  418. log.error(f"获取变量分组数据缓存失败: {str(e)}")
  419. raise CustomException(msg=f"获取变量分组数据失败: {str(e)}")
  420. @classmethod
  421. async def get_vardict_alarms_service(cls, auth: AuthSchema, redis: Redis):
  422. """
  423. 从缓存获取变量分组数据列表信息service
  424. 参数:
  425. - redis (Redis): Redis客户端
  426. - id (int): 行车id
  427. 返回:
  428. - list[dict]: 变量分组数据列表
  429. """
  430. try:
  431. redis_key = f"{RedisInitKeyConfig.VAR_DICT.key}:alarms_all"
  432. obj_list_dict = await RedisCURD(redis).get(redis_key)
  433. # 确保返回数据正确序列化
  434. if obj_list_dict:
  435. if isinstance(obj_list_dict, str):
  436. try:
  437. return json.loads(obj_list_dict)
  438. except json.JSONDecodeError:
  439. log.warning(f"变量报警数据反序列化失败,尝试重新初始化缓存")
  440. elif isinstance(obj_list_dict, list):
  441. return obj_list_dict
  442. # 缓存不存在或格式错误时重新初始化
  443. await cls.init_vardict_service(redis)
  444. obj_list_dict = await RedisCURD(redis).get(redis_key)
  445. if not obj_list_dict:
  446. raise CustomException(msg="变量报警数据不存在")
  447. # 再次确保返回数据正确序列化
  448. if isinstance(obj_list_dict, str):
  449. try:
  450. return json.loads(obj_list_dict)
  451. except json.JSONDecodeError:
  452. raise CustomException(msg="变量报警数据格式错误")
  453. return obj_list_dict
  454. except CustomException:
  455. raise
  456. except Exception as e:
  457. log.error(f"获取变量报警数据缓存失败: {str(e)}")
  458. raise CustomException(msg=f"获取变量报警数据失败: {str(e)}")
  459. @classmethod
  460. async def init_vardict_service1(cls, redis: Redis,crane_no:str = None):
  461. """
  462. 应用初始化: 获取所有天车变量数据信息并缓存service
  463. 参数:
  464. - redis (Redis): Redis客户端
  465. 返回:
  466. - None
  467. """
  468. try:
  469. async with async_db_session() as session:
  470. async with session.begin():
  471. # 在初始化过程中,不需要检查数据权限
  472. auth = AuthSchema(db=session, check_data_scope=False)
  473. #初始化行车机构分组变量数据
  474. if crane_no:
  475. search = {'status':'1','crane_no':crane_no}
  476. else:
  477. search = {'status': '1'}
  478. crane_list = await BizCraneCRUD(auth).list(search=search,order_by=[{'order':'asc'}])
  479. success_count = 0
  480. fail_count = 0
  481. for crane in crane_list:
  482. crane_no = crane.crane_no
  483. crane_name = crane.crane_name
  484. try:
  485. varDictMecGroupSchemaList: list[VarDictMecGroupSchema] = []
  486. mec_list = await BizMecCRUD(auth).list(search={'crane_no':crane_no,'status':'1'},order_by=[{'sort':'asc'}])
  487. for mec in mec_list:
  488. # 获取分组数据
  489. mecVarDicts = await BizVarDictCRUD(auth).list(
  490. search={'crane_no': crane_no, 'mec_type': mec.mec_type, 'status': '1'},
  491. order_by=[{'var_sort': 'asc'}])
  492. if not mecVarDicts:
  493. continue
  494. alarmVarList = await BizVarDictCRUD(auth).list(search={'crane_no': crane_no,'mec_type':mec.mec_type, 'switch_type': ('>=','2'), 'status': '1'},order_by=[{'var_sort': 'asc'}])
  495. digitalVarList = await BizVarDictCRUD(auth).list(search={'crane_no':crane_no,'mec_type':mec.mec_type,'data_type':'1','status':'1'},order_by=[{'var_sort':'asc'}])
  496. analogVarList = await BizVarDictCRUD(auth).list(search={'crane_no': crane_no,'mec_type':mec.mec_type, 'data_type': ('!=', '1'), 'status': '1'},order_by=[{'var_sort': 'asc'}])
  497. varDictMecGroupSchemaList.append(
  498. VarDictMecGroupSchema(mec_type=mec.mec_type,
  499. mecVarList_simple=mecVarDicts,
  500. digital_varList=digitalVarList,
  501. analog_varList=analogVarList,
  502. alarm_varList=alarmVarList))
  503. # 保存到Redis并设置过期时间
  504. redis_key = f"{RedisInitKeyConfig.VAR_DICT.key}:{crane_no}"
  505. var_dict_list = [item.model_dump() for item in varDictMecGroupSchemaList]
  506. value = json.dumps(var_dict_list, ensure_ascii=False)
  507. await RedisCURD(redis).set(
  508. key=redis_key,
  509. value=value,
  510. )
  511. success_count += 1
  512. log.info(f"✅ 机构分组变量数据缓存成功: {crane_name}")
  513. except Exception as e:
  514. fail_count += 1
  515. log.error(f"❌ 初始化机构分组变量数据失败 [{crane_name}]: {e}")
  516. log.info(f"机构分组变量数据初始化完成 - 成功: {success_count}, 失败: {fail_count}")
  517. #初始化所有行车报警变量数据
  518. try:
  519. varDicts = await cls.vardict_alarms_list(auth=auth)
  520. redis_key = f"{RedisInitKeyConfig.VAR_DICT.key}:alarms_all"
  521. value = json.dumps(varDicts, ensure_ascii=False)
  522. await RedisCURD(redis).set(
  523. key=redis_key,
  524. value=value,
  525. )
  526. log.info(f"✅ 报警变量数据缓存成功")
  527. except Exception as e:
  528. log.error(f"❌ 初始化报警变量数据失败: {e}")
  529. except Exception as e:
  530. log.error(f"变量数据初始化过程发生错误: {e}")
  531. # 只在严重错误时抛出异常,允许单个字典加载失败
  532. raise CustomException(msg=f"变量数据初始化失败: {str(e)}")
  533. import asyncio
  534. from typing import Dict, List
  535. @classmethod
  536. async def init_vardict_service(cls, redis: Redis, crane_no: str = None):
  537. """
  538. 应用初始化: 获取所有天车变量数据信息并缓存service(性能优化版)
  539. 参数:
  540. - redis (Redis): Redis客户端
  541. - crane_no: 指定天车编号,为空则处理所有
  542. 返回:
  543. - None
  544. """
  545. try:
  546. async with async_db_session() as session:
  547. async with session.begin():
  548. auth = AuthSchema(db=session, check_data_scope=False)
  549. # 1. 批量查询所有天车基础数据(原逻辑不变)
  550. search = {'status': '1'}
  551. if crane_no:
  552. search['crane_no'] = crane_no
  553. crane_list = await BizCraneCRUD(auth).list(
  554. search=search,
  555. order_by=[{'order': 'asc'}]
  556. )
  557. success_count = 0
  558. fail_count = 0
  559. # 2. 并发处理每个crane的变量数据(核心优化:并发替代串行)
  560. # 定义单个crane的处理函数
  561. async def process_single_crane(crane):
  562. nonlocal success_count, fail_count
  563. crane_no = crane.crane_no
  564. crane_name = crane.crane_name
  565. try:
  566. # 2.1 批量查询当前crane下的所有机构数据(一次查询)
  567. mec_list = await BizMecCRUD(auth).list(
  568. search={'crane_no': crane_no, 'status': '1'},
  569. order_by=[{'sort': 'asc'}]
  570. )
  571. if not mec_list:
  572. log.info(f"⚠️ {crane_name} 无机构数据,跳过缓存")
  573. return
  574. # 2.2 批量查询当前crane下的所有VarDict数据(关键优化:1次查询替代4×mec_count次)
  575. var_dicts_all = await BizVarDictCRUD(auth).list(
  576. search={'crane_no': crane_no, 'status': '1'},
  577. order_by=[{'var_sort': 'asc'}]
  578. )
  579. # 构建mec_type到var数据的映射(内存分类,替代多次查询)
  580. var_dict_by_mec: Dict[str, List] = {}
  581. for var in var_dicts_all:
  582. mec_type = str(var.mec_type)
  583. if mec_type not in var_dict_by_mec:
  584. var_dict_by_mec[mec_type] = []
  585. var_dict_by_mec[mec_type].append(var)
  586. # 2.3 遍历机构,内存分类数据
  587. varDictMecGroupSchemaList: list[VarDictMecGroupSchema] = []
  588. for mec in mec_list:
  589. mec_type = mec.mec_type
  590. # 获取当前机构的所有var数据(无则跳过)
  591. mec_vars = var_dict_by_mec.get(mec_type, [])
  592. if not mec_vars:
  593. continue
  594. # 内存中按条件分类(替代4次数据库查询)
  595. alarmVarList = [v for v in mec_vars if v.data_type == 1 and v.switch_type >= 2 ]
  596. digitalVarList = [v for v in mec_vars if v.data_type == 1]
  597. analogVarList = [v for v in mec_vars if v.data_type != 1]
  598. varDictMecGroupSchemaList.append(
  599. VarDictMecGroupSchema(
  600. mec_type=mec_type,
  601. mecVarList_simple=mec_vars, # 原mecVarDicts就是当前mec的所有var
  602. digital_varList=digitalVarList,
  603. analog_varList=analogVarList,
  604. alarm_varList=alarmVarList
  605. )
  606. )
  607. # 2.4 写入Redis(原逻辑不变)
  608. redis_key = f"{RedisInitKeyConfig.VAR_DICT.key}:{crane_no}"
  609. var_dict_list = [item.model_dump() for item in varDictMecGroupSchemaList]
  610. value = json.dumps(var_dict_list, ensure_ascii=False)
  611. await RedisCURD(redis).set(key=redis_key, value=value)
  612. success_count += 1
  613. log.info(f"✅ 机构分组变量数据缓存成功: {crane_name}")
  614. except Exception as e:
  615. fail_count += 1
  616. log.error(f"❌ 初始化机构分组变量数据失败 [{crane_name}]: {e}")
  617. # 3. 并发执行所有crane的处理(控制并发数,避免数据库连接池耗尽)
  618. # 建议并发数=数据库连接池大小(比如10),防止连接数超限
  619. semaphore = asyncio.Semaphore(10)
  620. async def bounded_process(crane):
  621. async with semaphore:
  622. await process_single_crane(crane)
  623. # 并发执行
  624. await asyncio.gather(*[bounded_process(crane) for crane in crane_list])
  625. log.info(f"机构分组变量数据初始化完成 - 成功: {success_count}, 失败: {fail_count}")
  626. # 4. 初始化所有行车报警变量数据(原逻辑不变,可按需优化)
  627. try:
  628. varDicts = await cls.vardict_alarms_list(auth=auth)
  629. redis_key = f"{RedisInitKeyConfig.VAR_DICT.key}:alarms_all"
  630. value = json.dumps(varDicts, ensure_ascii=False)
  631. await RedisCURD(redis).set(key=redis_key, value=value)
  632. log.info(f"✅ 报警变量数据缓存成功")
  633. except Exception as e:
  634. log.error(f"❌ 初始化报警变量数据失败: {e}")
  635. except Exception as e:
  636. log.error(f"变量数据初始化过程发生错误: {e}")
  637. raise CustomException(msg=f"变量数据初始化失败: {str(e)}")
  638. @classmethod
  639. async def get_tdengine_data(cls, auth: AuthSchema, page_no: int, page_size: int,stable_name:str,
  640. search: BizVarDictQueryParam | None = None) -> dict:
  641. var_dict_search_dict = {'crane_no':search.crane_no,'data_type':search.data_type,'mec_type':search.mec_type,'var_code':search.var_code}
  642. offset = (page_no - 1) * page_size
  643. base_sql = "SELECT * FROM "+stable_name
  644. filter_conditions = []
  645. crane_no = search.crane_no
  646. if crane_no:
  647. safe_crane_no = crane_no.strip().replace("'", "''")
  648. filter_conditions.append(f"crane_no = '{safe_crane_no}'")
  649. mec_var_dict = await BizVarDictCRUD(auth).list(search=var_dict_search_dict)
  650. var_codes = [item.var_code for item in mec_var_dict if item.var_code]
  651. if var_codes:
  652. var_codes_str = "','".join(var_codes)
  653. filter_conditions.append(f"var_code IN ('{var_codes_str}')")
  654. else:
  655. return {
  656. "page_no": page_no,
  657. "page_size": page_size,
  658. "total": 0,
  659. "has_next": False,
  660. "items": []
  661. }
  662. # 4. 过滤条件2:created_time时间范围(新增核心逻辑)
  663. created_time = search.created_time
  664. if created_time and isinstance(created_time, tuple) and len(created_time) == 2:
  665. # 解析between条件:格式为('between', (start_time, end_time))
  666. condition_type, time_range = created_time
  667. if condition_type == "between" and isinstance(time_range, (list, tuple)) and len(time_range) == 2:
  668. start_time, end_time = time_range
  669. # 校验时间类型并格式化为TDengine支持的字符串
  670. if isinstance(start_time, datetime) and isinstance(end_time, datetime):
  671. # 格式化时间为"YYYY-MM-DD HH:MM:SS"(匹配TDengine的时间格式)
  672. start_str = start_time.strftime("%Y-%m-%d %H:%M:%S")
  673. end_str = end_time.strftime("%Y-%m-%d %H:%M:%S")
  674. # 防SQL注入:转义单引号(虽然时间格式不会有,但做兜底)
  675. safe_start = start_str.replace("'", "''")
  676. safe_end = end_str.replace("'", "''")
  677. # 添加时间范围条件(TDengine的ts字段对应创建时间)
  678. filter_conditions.append(f"ts BETWEEN '{safe_start}' AND '{safe_end}'")
  679. # 5. 拼接WHERE子句
  680. where_clause = " WHERE " + " AND ".join(filter_conditions) if filter_conditions else ""
  681. # 6. 构建完整查询SQL(排序+分页)
  682. if page_size == 1000: #历史曲线用
  683. query_sql = f"{base_sql}{where_clause} ORDER BY ts DESC"
  684. else:
  685. query_sql = f"{base_sql}{where_clause} ORDER BY ts DESC LIMIT {offset}, {page_size}"
  686. rest_result = await tdengine_rest_query(query_sql)
  687. formatted_data = await format_rest_result(rest_result)
  688. #查找var_name
  689. varDicts = await BizVarDictCRUD(auth).list(search=var_dict_search_dict)
  690. if formatted_data:
  691. for item in formatted_data:
  692. normal_time = item.get('ts').replace('T', ' ').replace('+08:00', '')
  693. item['ts'] = normal_time
  694. for varDict in varDicts:
  695. if item.get('var_code') == varDict.var_code:
  696. item['var_name'] = varDict.var_name
  697. break
  698. total = await get_table_total_count(stable_name, where_clause)
  699. return {
  700. "page_no": page_no,
  701. "page_size": page_size,
  702. "total": total,
  703. "has_next": offset + page_size < total,
  704. "items": formatted_data
  705. }
  706. # 定义常量(解决魔法值问题)
  707. CONST_VAL_TRIGGER = 1 # 触发值
  708. CONST_VAL_RECOVER = 0 # 恢复值
  709. CONST_ORDER_ASC = "ASC"
  710. CONST_ORDER_DESC = "DESC"
  711. CONST_UNRECOVERED = "未恢复"
  712. CONST_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
  713. # 工具函数:封装时间格式转换
  714. def format_timestamp(ts_str: Optional[str]) -> Optional[str]:
  715. """标准化时间格式(去除T和+08:00)"""
  716. if not ts_str:
  717. return None
  718. return ts_str.replace('T', ' ').replace('+08:00', '')
  719. # 工具函数:安全转义SQL字符串(封装重复的防注入逻辑)
  720. def escape_sql_str(s: Optional[str]) -> str:
  721. """安全转义SQL字符串,防止注入"""
  722. if not s:
  723. return ""
  724. return str(s).strip().replace("'", "''")
  725. # 工具函数:构建var_code到var_name的映射(解决嵌套循环匹配问题)
  726. def build_var_code_mapping(varDicts: List[any]) -> Dict[str, str]:
  727. """构建var_code→var_name的字典,提升匹配效率"""
  728. return {
  729. item.var_code: item.var_name
  730. for item in varDicts
  731. if hasattr(item, 'var_code') and item.var_code
  732. }
  733. @classmethod
  734. async def _query_single_record(cls, stable_name: str, var_code: str, crane_no: str,
  735. time_condition: str, val: int, order: str = CONST_ORDER_ASC) -> Optional[str]:
  736. """
  737. 精准查询单条关键记录(性能最优)
  738. :param time_condition: 时间条件字符串,如 "ts > '2026-01-18 10:30:00'"
  739. :param val: 要查询的val值(0/1)
  740. :param order: 排序方式(ASC/DESC)
  741. :return: 符合条件的第一条ts(北京时间),无则返回None
  742. """
  743. # 优化:使用封装的转义函数,减少重复代码
  744. safe_stable = cls.escape_sql_str(stable_name)
  745. safe_var = cls.escape_sql_str(var_code)
  746. safe_crane = cls.escape_sql_str(crane_no)
  747. # 严格类型校验
  748. try:
  749. safe_val = int(val)
  750. except (ValueError, TypeError):
  751. return None
  752. # 校验排序参数,防止非法值
  753. safe_order = cls.CONST_ORDER_ASC if order.upper() != cls.CONST_ORDER_DESC else cls.CONST_ORDER_DESC
  754. # 优化:SQL格式化更简洁,减少空格
  755. sql = f"""
  756. SELECT ts AS ts_cn
  757. FROM {safe_stable}
  758. WHERE
  759. crane_no = '{safe_crane}'
  760. AND var_code = '{safe_var}'
  761. AND val = {safe_val}
  762. AND {time_condition}
  763. ORDER BY ts {safe_order}
  764. LIMIT 1;
  765. """
  766. try:
  767. rest_result = await tdengine_rest_query(sql)
  768. formatted_result = await format_rest_result(rest_result)
  769. if formatted_result and len(formatted_result) > 0:
  770. ts_cn = cls.format_timestamp(formatted_result[0].get("ts_cn")) # 复用工具函数
  771. return ts_cn
  772. return None
  773. # 优化:捕获具体异常,而非泛化Exception,便于排查问题
  774. except Exception as e:
  775. # 建议添加日志:logger.error(f"查询单条记录失败: {e}, SQL: {sql}")
  776. return None
  777. @classmethod
  778. async def calc_switch_batch(cls, raw_formatted_data: List[Dict],
  779. query_start: Optional[datetime] = None,
  780. query_end: Optional[datetime] = None,
  781. stable_name: Optional[str] = None) -> List[Dict]:
  782. if not raw_formatted_data:
  783. return []
  784. # 优化1:使用defaultdict简化分组逻辑,减少判断
  785. point_groups: Dict[str, List[Dict]] = defaultdict(list)
  786. for item in raw_formatted_data:
  787. var_code = item.get("var_code")
  788. crane_no = item.get("crane_no")
  789. if not var_code or not crane_no:
  790. continue
  791. # 优化:捕获具体异常(ValueError),而非泛化except
  792. try:
  793. item["val"] = int(item.get("val", cls.CONST_VAL_RECOVER))
  794. except ValueError:
  795. item["val"] = cls.CONST_VAL_RECOVER
  796. group_key = f"{crane_no}_{var_code}"
  797. point_groups[group_key].append(item)
  798. batch_list = []
  799. # 遍历每个点位
  800. for group_key, point_records in point_groups.items():
  801. # 优化:增加异常处理,防止group_key格式错误
  802. try:
  803. crane_no, var_code = group_key.split("_", 1)
  804. except ValueError:
  805. # 日志:logger.warning(f"无效的分组键: {group_key}")
  806. continue
  807. # 关键修复:必须排序!原代码注释掉了排序,会导致触发/恢复判断完全错误
  808. point_records.sort(key=lambda x: x["ts"])
  809. # 优化:直接取第一个item的var_name,无则用默认值
  810. var_name = point_records[0].get("var_name", f"未知点位({var_code})")
  811. # 提取触发/恢复事件(0→1/1→0)
  812. is_all_0 = True
  813. trigger_times = [] # 触发时间列表
  814. recover_times = [] # 恢复时间列表
  815. prev_val = None
  816. for record in point_records:
  817. current_val = record["val"]
  818. current_ts = record["ts"]
  819. if prev_val is None:
  820. prev_val = current_val
  821. continue
  822. # 优化:使用常量替代魔法值,提升可读性
  823. if prev_val == cls.CONST_VAL_RECOVER and current_val == cls.CONST_VAL_TRIGGER:
  824. trigger_times.append(current_ts)
  825. elif prev_val == cls.CONST_VAL_TRIGGER and current_val == cls.CONST_VAL_RECOVER:
  826. recover_times.append(current_ts)
  827. prev_val = current_val
  828. if current_val == cls.CONST_VAL_TRIGGER:
  829. is_all_0 = False
  830. # ---------------------- 无触发记录(全0)→ 返回空 ----------------------
  831. if is_all_0:
  832. continue
  833. # ---------------------- 第一条是触发追溯真实触发时间 ----------------------
  834. if point_records[0].get('val') == cls.CONST_VAL_TRIGGER:
  835. if not (query_start and stable_name):
  836. continue
  837. start_str = query_start.strftime(cls.CONST_TIME_FORMAT)
  838. recover_before_start = await cls._query_single_record(
  839. stable_name, var_code, crane_no,
  840. f"ts < '{cls.escape_sql_str(start_str)}'",
  841. cls.CONST_VAL_RECOVER, cls.CONST_ORDER_DESC
  842. )
  843. real_trigger = point_records[0]["ts"]
  844. if recover_before_start:
  845. trigger_after_recover = await cls._query_single_record(
  846. stable_name, var_code, crane_no,
  847. f"ts > '{cls.escape_sql_str(recover_before_start)}'",
  848. cls.CONST_VAL_TRIGGER, cls.CONST_ORDER_ASC
  849. )
  850. if trigger_after_recover:
  851. real_trigger = trigger_after_recover
  852. trigger_times.insert(0, real_trigger)
  853. # ---------------------- 最后一条是触发追溯真实恢复时间 ----------------------
  854. if point_records[-1].get('val') == cls.CONST_VAL_TRIGGER:
  855. if not (query_end and stable_name):
  856. continue
  857. end_str = query_end.strftime(cls.CONST_TIME_FORMAT)
  858. recover_after_end = await cls._query_single_record(
  859. stable_name, var_code, crane_no,
  860. f"ts > '{cls.escape_sql_str(end_str)}'",
  861. cls.CONST_VAL_RECOVER, cls.CONST_ORDER_ASC
  862. )
  863. if recover_after_end:
  864. recover_times.append(recover_after_end)
  865. # ---------------------- 匹配 ----------------------
  866. min_len = min(len(trigger_times), len(recover_times))
  867. for i in range(min_len):
  868. batch_list.append({
  869. "var_name": var_name,
  870. "val": cls.CONST_VAL_RECOVER, # 已恢复
  871. "str_time": trigger_times[i],
  872. "end_time": recover_times[i]
  873. })
  874. # 不匹配的情况只可能是触发没恢复
  875. if len(trigger_times) > len(recover_times):
  876. batch_list.append({
  877. "var_name": var_name,
  878. "val": cls.CONST_VAL_TRIGGER, # 触发中
  879. "str_time": trigger_times[-1],
  880. "end_time": cls.CONST_UNRECOVERED
  881. })
  882. # 最终排序
  883. batch_list.sort(key=lambda x: x["str_time"], reverse=True)
  884. return batch_list
  885. @classmethod
  886. async def get_tdengine_data_operation(cls, auth: AuthSchema, stable_name: str,
  887. search: BizVarDictQueryParam | None = None) -> dict:
  888. if not search:
  889. return {"page_no": 0, "page_size": 0, "total": 0, "has_next": 0, "items": []}
  890. # 优化:初始化查询参数,减少重复判断
  891. var_dict_search_dict = {
  892. 'crane_no': search.crane_no,
  893. 'data_type': search.data_type,
  894. 'mec_type': search.mec_type,
  895. 'switch_type': search.switch_type,
  896. 'var_code': search.var_code
  897. }
  898. filter_conditions = []
  899. query_start: Optional[datetime] = None
  900. query_end: Optional[datetime] = None
  901. # 1. 过滤条件:crane_no
  902. if search.crane_no:
  903. filter_conditions.append(f"crane_no = '{cls.escape_sql_str(search.crane_no)}'")
  904. # 2. 过滤条件:mec_type对应的var_code
  905. varDicts = await BizVarDictCRUD(auth).list(search=var_dict_search_dict)
  906. var_code_map = cls.build_var_code_mapping(varDicts) # 构建映射字典
  907. if search.mec_type and var_code_map:
  908. var_codes_str = "','".join(cls.escape_sql_str(code) for code in var_code_map.keys())
  909. filter_conditions.append(f"var_code IN ('{var_codes_str}')")
  910. # 3. 过滤条件:created_time时间范围
  911. if search.created_time and isinstance(search.created_time, tuple) and len(search.created_time) == 2:
  912. condition_type, time_range = search.created_time
  913. if condition_type == "between" and isinstance(time_range, (list, tuple)) and len(time_range) == 2:
  914. query_start, query_end = time_range
  915. if isinstance(query_start, datetime) and isinstance(query_end, datetime):
  916. start_str = query_start.strftime(cls.CONST_TIME_FORMAT)
  917. end_str = query_end.strftime(cls.CONST_TIME_FORMAT)
  918. filter_conditions.append(
  919. f"ts BETWEEN '{cls.escape_sql_str(start_str)}' AND '{cls.escape_sql_str(end_str)}'")
  920. # 4. 拼接WHERE子句
  921. where_clause = " WHERE " + " AND ".join(filter_conditions) if filter_conditions else ""
  922. # 5. 查询原始数据(优化:SQL更简洁,排序仅保留必要的)
  923. query_sql = f"SELECT * FROM {cls.escape_sql_str(stable_name)}{where_clause} ORDER BY ts " + cls.CONST_ORDER_ASC
  924. rest_result = await tdengine_rest_query(query_sql)
  925. formatted_data = await format_rest_result(rest_result)
  926. # 6. 匹配var_name(优化:用字典映射替代嵌套循环,O(n)→O(1))
  927. if formatted_data:
  928. for item in formatted_data:
  929. item['ts'] = cls.format_timestamp(item.get('ts')) # 复用工具函数
  930. # 优化:字典查找,无需遍历所有varDicts
  931. item['var_name'] = var_code_map.get(item.get('var_code'), f"未知点位({item.get('var_code')})")
  932. # 7. 调用批量计算方法
  933. batch_result = await cls.calc_switch_batch(
  934. raw_formatted_data=formatted_data,
  935. query_start=query_start,
  936. query_end=query_end,
  937. stable_name=stable_name
  938. )
  939. # 8. 返回结果
  940. return {
  941. "page_no": 0,
  942. "page_size": 0,
  943. "total": len(batch_result),
  944. "has_next": 0,
  945. "items": batch_result
  946. }