service.py 31 KB


  1. # -*- coding: utf-8 -*-
  2. import asyncio
  3. import io
  4. import json
  5. from datetime import datetime
  6. from typing import Any
  7. from fastapi import UploadFile
  8. import pandas as pd
  9. from redis.asyncio.client import Redis
  10. from app.core.database import async_db_session
  11. from app.core.redis_crud import RedisCURD
  12. from app.common.enums import RedisInitKeyConfig
  13. from app.core.base_schema import BatchSetAvailable
  14. from app.core.exceptions import CustomException
  15. from app.utils.excel_util import ExcelUtil
  16. from app.core.logger import log
  17. from app.api.v1.module_system.auth.schema import AuthSchema
  18. from .schema import BizVarDictCreateSchema, BizVarDictUpdateSchema, BizVarDictOutSchema, BizVarDictQueryParam
  19. from ..crane.crud import BizCraneCRUD
  20. from ..crane.model import BizCraneModel
  21. from ..gateway.crud import GatewayCRUD
  22. from ..gateway.model import GatewayModel
  23. from ..mec.crud import BizMecCRUD
  24. from ..vardict.crud import BizVarDictCRUD
  25. from ..vardict.schema import VarDictMecGroupSchema
  26. from app.utils.tdengine_util import tdengine_rest_query, format_rest_result, get_table_total_count
  27. class BizVarDictService:
  28. """
  29. 变量信息服务层
  30. """
  31. @classmethod
  32. async def detail_vardict_service(cls, auth: AuthSchema, id: int) -> dict:
  33. """详情"""
  34. obj = await BizVarDictCRUD(auth).get_by_id_vardict_crud(id=id)
  35. if not obj:
  36. raise CustomException(msg="该数据不存在")
  37. crane = await BizCraneCRUD(auth).get_by_id_crane_crud_for_no(obj.crane_no)
  38. gateway = await GatewayCRUD(auth).get_by_id_gateway_crud(obj.gateway_id)
  39. res = BizVarDictOutSchema.model_validate(obj).model_dump()
  40. res['crane_name'] = crane.crane_name
  41. res['gateway_name'] = gateway.gateway_name if gateway else ""
  42. return res
  43. @classmethod
  44. async def list_vardict_service(cls, auth: AuthSchema, search: BizVarDictQueryParam | None = None, order_by: list[dict] | None = None) -> list[dict]:
  45. """列表查询"""
  46. search_dict = search.__dict__ if search else None
  47. obj_list = await BizVarDictCRUD(auth).list_vardict_crud(search=search_dict, order_by=order_by)
  48. return [BizVarDictOutSchema.model_validate(obj).model_dump() for obj in obj_list]
  49. @classmethod
  50. async def page_vardict_service(cls, auth: AuthSchema, page_no: int, page_size: int, search: BizVarDictQueryParam | None = None, order_by: list[dict] | None = None) -> dict:
  51. """分页查询(数据库分页)"""
  52. search_dict = search.__dict__ if search else {}
  53. order_by_list = order_by or [{'id': 'asc'}]
  54. offset = (page_no - 1) * page_size
  55. result = await BizVarDictCRUD(auth).page_vardict_crud(
  56. offset=offset,
  57. limit=page_size,
  58. order_by=order_by_list,
  59. search=search_dict
  60. )
  61. for item in result.get('items'):
  62. crane_model: BizCraneModel | None = await BizCraneCRUD(auth).get_by_id_crane_crud_for_no(crane_no=item['crane_no'])
  63. gateway_model: GatewayModel | None = await GatewayCRUD(auth).get_by_id_gateway_crud(id=item['gateway_id'])
  64. item['crane_name'] = crane_model.crane_name
  65. item['gateway_name'] = gateway_model.gateway_name if gateway_model else ""
  66. return result
  67. @classmethod
  68. async def vardict_alarms_list(cls, auth: AuthSchema, crane_no: str = None) -> list[dict]:
  69. sql_parts = [
  70. """SELECT a.*,b.crane_name
  71. FROM biz_var_dict as a
  72. LEFT JOIN biz_crane as b ON a.crane_no = b.crane_no
  73. WHERE a.`status` = :status AND b.`status` = :status AND a.switch_type >= 2"""
  74. ]
  75. business_params: dict[str, Any] = {"status": 1}
  76. if crane_no and isinstance(crane_no, str) and crane_no.strip():
  77. valid_crane_no = crane_no.strip()
  78. sql_parts.append(f"AND a.crane_no = :crane_no")
  79. business_params["crane_no"] = valid_crane_no
  80. sql_parts.append("ORDER BY b.`order` asc,a.mec_type asc,a.var_sort asc")
  81. final_sql = " ".join(sql_parts)
  82. try:
  83. obj_list = await BizVarDictCRUD(auth).list_sql(final_sql, business_params)
  84. return [BizVarDictOutSchema.model_validate(obj).model_dump() for obj in obj_list]
  85. except Exception as e:
  86. raise CustomException(msg=f"查询变量字典报警列表失败:{str(e)}")
  87. @classmethod
  88. async def vardict_analog_list(cls, auth: AuthSchema, crane_no: str = None) -> list[dict]:
  89. sql_parts = [
  90. """SELECT a.*,b.crane_name
  91. FROM biz_var_dict as a
  92. LEFT JOIN biz_crane as b ON a.crane_no = b.crane_no
  93. WHERE a.`status` = :status AND b.`status` = :status AND a.data_type >= 2"""
  94. ]
  95. business_params: dict[str, Any] = {"status": 1}
  96. if crane_no and isinstance(crane_no, str) and crane_no.strip():
  97. valid_crane_no = crane_no.strip()
  98. sql_parts.append(f"AND a.crane_no = :crane_no")
  99. business_params["crane_no"] = valid_crane_no
  100. sql_parts.append("ORDER BY b.`order` asc,a.mec_type asc,a.var_sort asc")
  101. final_sql = " ".join(sql_parts)
  102. try:
  103. obj_list = await BizVarDictCRUD(auth).list_sql(final_sql, business_params)
  104. return [BizVarDictOutSchema.model_validate(obj).model_dump() for obj in obj_list]
  105. except Exception as e:
  106. raise CustomException(msg=f"查询变量字典模拟量列表失败:{str(e)}")
  107. @classmethod
  108. async def create_vardict_service(cls, auth: AuthSchema, data: BizVarDictCreateSchema,redis: Redis) -> dict:
  109. """创建"""
  110. # 检查唯一性约束
  111. obj = await BizVarDictCRUD(auth).create_vardict_crud(data=data)
  112. if obj:
  113. # 更新缓存中数据
  114. await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:{data.crane_no}")
  115. return BizVarDictOutSchema.model_validate(obj).model_dump()
  116. @classmethod
  117. async def update_vardict_service(cls, auth: AuthSchema, id: int, data: BizVarDictUpdateSchema,redis: Redis) -> dict:
  118. """更新"""
  119. # 检查数据是否存在
  120. obj = await BizVarDictCRUD(auth).get_by_id_vardict_crud(id=id)
  121. if not obj:
  122. raise CustomException(msg='更新失败,该数据不存在')
  123. # 检查唯一性约束
  124. obj = await BizVarDictCRUD(auth).update_vardict_crud(id=id, data=data)
  125. if obj:
  126. # 更新缓存中数据
  127. await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:{obj.crane_no}")
  128. return BizVarDictOutSchema.model_validate(obj).model_dump()
  129. @classmethod
  130. async def delete_vardict_service(cls, auth: AuthSchema, ids: list[int],redis: Redis) -> None:
  131. """删除"""
  132. if len(ids) < 1:
  133. raise CustomException(msg='删除失败,删除对象不能为空')
  134. crane_nos = []
  135. for id in ids:
  136. obj = await BizVarDictCRUD(auth).get_by_id_vardict_crud(id=id)
  137. if not obj:
  138. raise CustomException(msg=f'删除失败,ID为{id}的数据不存在')
  139. crane_nos.append(obj.crane_no)
  140. await BizVarDictCRUD(auth).delete_vardict_crud(ids=ids)
  141. # 更新缓存中数据
  142. for crane_no in crane_nos:
  143. await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:{crane_no}")
  144. @classmethod
  145. async def set_availale_vardict_service(cls, auth: AuthSchema, data: BatchSetAvailable,redis: Redis) -> None:
  146. crane_nos = []
  147. for id in data.ids:
  148. obj = await BizVarDictCRUD(auth).get_by_id_vardict_crud(id=id)
  149. if not obj:
  150. raise CustomException(msg=f'批量设置失败,ID为{id}的数据不存在')
  151. crane_nos.append(obj.crane_no)
  152. """批量设置状态"""
  153. await BizVarDictCRUD(auth).set_available_vardict_crud(ids=data.ids, status=data.status)
  154. # 更新缓存中数据
  155. for crane_no in crane_nos:
  156. await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:{crane_no}")
  157. @classmethod
  158. async def batch_export_vardict_service(cls, obj_list: list[dict]) -> bytes:
  159. """批量导出"""
  160. mapping_dict = {
  161. 'id': 'id ',
  162. 'crane_no': '',
  163. 'var_code': '变量code',
  164. 'var_name': '变量名称',
  165. 'mec_type': '所属机构',
  166. 'data_type': '数据类型',
  167. 'switch_type': '变量类型',
  168. 'addr': 'modbus地址',
  169. 'gateway_id': '网关',
  170. 'var_sort': '排序',
  171. 'var_group': '变量分组',
  172. 'var_category': '变量分类',
  173. 'translate': '绑定公式',
  174. 'device_no': '关联设备编号 ',
  175. 'is_reverse': '是否取反',
  176. 'is_top_show': '是否重点显示',
  177. 'is_save': '是否生成',
  178. 'is_calibration': '是否标定',
  179. 'is_overview_top_show': '是否首页重点显示',
  180. 'is_home_page_show': '是否首页显示',
  181. 'is_diagnose': '是否启用诊断专家',
  182. 'is_upload': '是否上传云平台',
  183. 'diagnosis_id': '关联诊断专家',
  184. 'status': '是否启用',
  185. 'description': '备注/描述',
  186. 'create_time': '创建时间 ',
  187. 'updated_time': '更新时间',
  188. 'created_id': '创建人ID',
  189. 'updated_id': '更新人ID',
  190. }
  191. data = obj_list.copy()
  192. for item in data:
  193. # 状态转换
  194. if 'status' in item:
  195. item['status'] = '启用' if item.get('status') == '0' else '停用'
  196. # 创建者转换
  197. creator_info = item.get('creator')
  198. if isinstance(creator_info, dict):
  199. item['creator'] = creator_info.get('name', '未知')
  200. elif creator_info is None:
  201. item['creator'] = '未知'
  202. return ExcelUtil.export_list2excel(list_data=data, mapping_dict=mapping_dict)
  203. @classmethod
  204. async def batch_import_vardict_service(cls, auth: AuthSchema, file: UploadFile, update_support: bool = False) -> str:
  205. """批量导入"""
  206. header_dict = {
  207. 'id ': 'id',
  208. '': 'crane_no',
  209. '变量code': 'var_code',
  210. '变量名称': 'var_name',
  211. '所属机构': 'mec_type',
  212. '数据类型': 'data_type',
  213. '变量类型': 'switch_type',
  214. 'modbus地址': 'addr',
  215. '网关': 'gateway_id',
  216. '排序': 'var_sort',
  217. '变量分组': 'var_group',
  218. '变量分类': 'var_category',
  219. '绑定公式': 'translate',
  220. '关联设备编号 ': 'device_no',
  221. '是否取反': 'is_reverse',
  222. '是否重点显示': 'is_top_show',
  223. '是否生成': 'is_save',
  224. '是否标定': 'is_calibration',
  225. '是否首页重点显示': 'is_overview_top_show',
  226. '是否首页显示': 'is_home_page_show',
  227. '是否启用诊断专家': 'is_diagnose',
  228. '是否上传云平台': 'is_upload',
  229. '关联诊断专家': 'diagnosis_id',
  230. '是否启用': 'status',
  231. '备注/描述': 'description',
  232. '创建时间 ': 'create_time',
  233. '更新时间': 'updated_time',
  234. '创建人ID': 'created_id',
  235. '更新人ID': 'updated_id',
  236. }
  237. try:
  238. contents = await file.read()
  239. df = pd.read_excel(io.BytesIO(contents))
  240. await file.close()
  241. if df.empty:
  242. raise CustomException(msg="导入文件为空")
  243. missing_headers = [header for header in header_dict.keys() if header not in df.columns]
  244. if missing_headers:
  245. raise CustomException(msg=f"导入文件缺少必要的列: {', '.join(missing_headers)}")
  246. df.rename(columns=header_dict, inplace=True)
  247. # 验证必填字段
  248. error_msgs = []
  249. success_count = 0
  250. count = 0
  251. for index, row in df.iterrows():
  252. count += 1
  253. try:
  254. data = {
  255. "id": row['id'],
  256. "crane_no": row['crane_no'],
  257. "var_code": row['var_code'],
  258. "var_name": row['var_name'],
  259. "mec_type": row['mec_type'],
  260. "data_type": row['data_type'],
  261. "switch_type": row['switch_type'],
  262. "addr": row['addr'],
  263. "gateway_id": row['gateway_id'],
  264. "var_sort": row['var_sort'],
  265. "var_group": row['var_group'],
  266. "var_category": row['var_category'],
  267. "translate": row['translate'],
  268. "device_no": row['device_no'],
  269. "is_reverse": row['is_reverse'],
  270. "is_top_show": row['is_top_show'],
  271. "is_save": row['is_save'],
  272. "is_calibration": row['is_calibration'],
  273. "is_overview_top_show": row['is_overview_top_show'],
  274. "is_home_page_show": row['is_home_page_show'],
  275. "is_diagnose": row['is_diagnose'],
  276. "is_upload": row['is_upload'],
  277. "diagnosis_id": row['diagnosis_id'],
  278. "status": row['status'],
  279. "description": row['description'],
  280. "create_time": row['create_time'],
  281. "updated_time": row['updated_time'],
  282. "created_id": row['created_id'],
  283. "updated_id": row['updated_id'],
  284. }
  285. # 使用CreateSchema做校验后入库
  286. create_schema = BizVarDictCreateSchema.model_validate(data)
  287. # 检查唯一性约束
  288. await BizVarDictCRUD(auth).create_vardict_crud(data=create_schema)
  289. success_count += 1
  290. except Exception as e:
  291. error_msgs.append(f"第{count}行: {str(e)}")
  292. continue
  293. result = f"成功导入 {success_count} 条数据"
  294. if error_msgs:
  295. result += "\n错误信息:\n" + "\n".join(error_msgs)
  296. return result
  297. except Exception as e:
  298. log.error(f"批量导入失败: {str(e)}")
  299. raise CustomException(msg=f"导入失败: {str(e)}")
  300. @classmethod
  301. async def import_template_download_vardict_service(cls) -> bytes:
  302. """下载导入模板"""
  303. header_list = [
  304. 'id ',
  305. '',
  306. '变量code',
  307. '变量名称',
  308. '所属机构',
  309. '数据类型',
  310. '变量类型',
  311. 'modbus地址',
  312. '网关',
  313. '排序',
  314. '变量分组',
  315. '变量分类',
  316. '绑定公式',
  317. '关联设备编号 ',
  318. '是否取反',
  319. '是否重点显示',
  320. '是否生成',
  321. '是否标定',
  322. '是否首页重点显示',
  323. '是否首页显示',
  324. '是否启用诊断专家',
  325. '是否上传云平台',
  326. '关联诊断专家',
  327. '是否启用',
  328. '备注/描述',
  329. '创建时间 ',
  330. '更新时间',
  331. '创建人ID',
  332. '更新人ID',
  333. ]
  334. selector_header_list = []
  335. option_list = []
  336. # 添加下拉选项
  337. selector_header_list.append('所属机构')
  338. option_list.append({'所属机构': []})
  339. selector_header_list.append('数据类型')
  340. option_list.append({'数据类型': []})
  341. selector_header_list.append('变量类型')
  342. option_list.append({'变量类型': []})
  343. selector_header_list.append('变量分类')
  344. option_list.append({'变量分类': []})
  345. return ExcelUtil.get_excel_template(
  346. header_list=header_list,
  347. selector_header_list=selector_header_list,
  348. option_list=option_list
  349. )
  350. @classmethod
  351. async def get_vardict_group_service(cls, auth: AuthSchema,redis: Redis,crane_no: str):
  352. """
  353. 从缓存获取变量分组数据列表信息service
  354. 参数:
  355. - redis (Redis): Redis客户端
  356. - id (int): 行车id
  357. 返回:
  358. - list[dict]: 变量分组数据列表
  359. """
  360. try:
  361. redis_key = f"{RedisInitKeyConfig.VAR_DICT.key}:{crane_no}"
  362. obj_list_dict = await RedisCURD(redis).get(redis_key)
  363. # 确保返回数据正确序列化
  364. if obj_list_dict:
  365. if isinstance(obj_list_dict, str):
  366. try:
  367. return json.loads(obj_list_dict)
  368. except json.JSONDecodeError:
  369. log.warning(f"变量分组数据反序列化失败,尝试重新初始化缓存: {'行车:'+crane_no}")
  370. elif isinstance(obj_list_dict, list):
  371. return obj_list_dict
  372. # 缓存不存在或格式错误时重新初始化
  373. await cls.init_vardict_service(redis,crane_no=crane_no)
  374. obj_list_dict = await RedisCURD(redis).get(redis_key)
  375. if not obj_list_dict:
  376. raise CustomException(msg="变量分组数据不存在")
  377. # 再次确保返回数据正确序列化
  378. if isinstance(obj_list_dict, str):
  379. try:
  380. return json.loads(obj_list_dict)
  381. except json.JSONDecodeError:
  382. raise CustomException(msg="变量分组数据格式错误")
  383. return obj_list_dict
  384. except CustomException:
  385. raise
  386. except Exception as e:
  387. log.error(f"获取变量分组数据缓存失败: {str(e)}")
  388. raise CustomException(msg=f"获取变量分组数据失败: {str(e)}")
  389. @classmethod
  390. async def get_vardict_alarms_service(cls, auth: AuthSchema, redis: Redis):
  391. """
  392. 从缓存获取变量分组数据列表信息service
  393. 参数:
  394. - redis (Redis): Redis客户端
  395. - id (int): 行车id
  396. 返回:
  397. - list[dict]: 变量分组数据列表
  398. """
  399. try:
  400. redis_key = f"{RedisInitKeyConfig.VAR_DICT.key}:'alarms_all'"
  401. obj_list_dict = await RedisCURD(redis).get(redis_key)
  402. # 确保返回数据正确序列化
  403. if obj_list_dict:
  404. if isinstance(obj_list_dict, str):
  405. try:
  406. return json.loads(obj_list_dict)
  407. except json.JSONDecodeError:
  408. log.warning(f"变量报警数据反序列化失败,尝试重新初始化缓存")
  409. elif isinstance(obj_list_dict, list):
  410. return obj_list_dict
  411. # 缓存不存在或格式错误时重新初始化
  412. await cls.init_vardict_service(redis)
  413. obj_list_dict = await RedisCURD(redis).get(redis_key)
  414. if not obj_list_dict:
  415. raise CustomException(msg="变量报警数据不存在")
  416. # 再次确保返回数据正确序列化
  417. if isinstance(obj_list_dict, str):
  418. try:
  419. return json.loads(obj_list_dict)
  420. except json.JSONDecodeError:
  421. raise CustomException(msg="变量报警数据格式错误")
  422. return obj_list_dict
  423. except CustomException:
  424. raise
  425. except Exception as e:
  426. log.error(f"获取变量报警数据缓存失败: {str(e)}")
  427. raise CustomException(msg=f"获取变量报警数据失败: {str(e)}")
  428. @classmethod
  429. async def init_vardict_service(cls, redis: Redis,crane_no:str = None):
  430. """
  431. 应用初始化: 获取所有天车变量数据信息并缓存service
  432. 参数:
  433. - redis (Redis): Redis客户端
  434. 返回:
  435. - None
  436. """
  437. try:
  438. async with async_db_session() as session:
  439. async with session.begin():
  440. # 在初始化过程中,不需要检查数据权限
  441. auth = AuthSchema(db=session, check_data_scope=False)
  442. #初始化行车机构分组变量数据
  443. if crane_no:
  444. search = {'status':'1','crane_no':crane_no}
  445. else:
  446. search = {'status': '1'}
  447. crane_list = await BizCraneCRUD(auth).list(search=search,order_by=[{'order':'asc'}])
  448. success_count = 0
  449. fail_count = 0
  450. for crane in crane_list:
  451. crane_no = crane.crane_no
  452. crane_name = crane.crane_name
  453. try:
  454. varDictMecGroupSchemaList: list[VarDictMecGroupSchema] = []
  455. mec_list = await BizMecCRUD(auth).list(search={'crane_no':crane_no,'status':'1'},order_by=[{'sort':'asc'}])
  456. for mec in mec_list:
  457. # 获取分组数据
  458. mecVarDicts = await BizVarDictCRUD(auth).list(
  459. search={'crane_no': crane_no, 'mec_type': mec.mec_type, 'status': '1'},
  460. order_by=[{'var_sort': 'asc'}])
  461. if not mecVarDicts:
  462. continue
  463. alarmVarList = await BizVarDictCRUD(auth).list(search={'crane_no': crane_no,'mec_type':mec.mec_type, 'switch_type': ('>=','2'), 'status': '1'},order_by=[{'var_sort': 'asc'}])
  464. digitalVarList = await BizVarDictCRUD(auth).list(search={'crane_no':crane_no,'mec_type':mec.mec_type,'data_type':'1','status':'1'},order_by=[{'var_sort':'asc'}])
  465. analogVarList = await BizVarDictCRUD(auth).list(search={'crane_no': crane_no,'mec_type':mec.mec_type, 'data_type': ('!=', '1'), 'status': '1'},order_by=[{'var_sort': 'asc'}])
  466. varDictMecGroupSchemaList.append(
  467. VarDictMecGroupSchema(mec_type=mec.mec_type,
  468. mecVarList_simple=mecVarDicts,
  469. digital_varList=digitalVarList,
  470. analog_varList=analogVarList,
  471. alarm_varList=alarmVarList))
  472. # 保存到Redis并设置过期时间
  473. redis_key = f"{RedisInitKeyConfig.VAR_DICT.key}:{crane_no}"
  474. var_dict_list = [item.model_dump() for item in varDictMecGroupSchemaList]
  475. value = json.dumps(var_dict_list, ensure_ascii=False)
  476. await RedisCURD(redis).set(
  477. key=redis_key,
  478. value=value,
  479. )
  480. success_count += 1
  481. log.info(f"✅ 机构分组变量数据缓存成功: {crane_name}")
  482. except Exception as e:
  483. fail_count += 1
  484. log.error(f"❌ 初始化机构分组变量数据失败 [{crane_name}]: {e}")
  485. log.info(f"机构分组变量数据初始化完成 - 成功: {success_count}, 失败: {fail_count}")
  486. #初始化所有行车报警变量数据
  487. try:
  488. varDicts = await cls.vardict_alarms_list(auth=auth)
  489. redis_key = f"{RedisInitKeyConfig.VAR_DICT.key}:'alarms_all'"
  490. value = json.dumps(varDicts, ensure_ascii=False)
  491. await RedisCURD(redis).set(
  492. key=redis_key,
  493. value=value,
  494. )
  495. log.info(f"✅ 报警变量数据缓存成功")
  496. except Exception as e:
  497. log.error(f"❌ 初始化报警变量数据失败: {e}")
  498. except Exception as e:
  499. log.error(f"变量数据初始化过程发生错误: {e}")
  500. # 只在严重错误时抛出异常,允许单个字典加载失败
  501. raise CustomException(msg=f"变量数据初始化失败: {str(e)}")
  502. @classmethod
  503. async def get_tdengine_data(cls, auth: AuthSchema, page_no: int, page_size: int,stable_name:str,
  504. search: BizVarDictQueryParam | None = None) -> dict:
  505. var_dict_search_dict = {'crane_no':search.crane_no,'data_type':search.data_type,'mec_type':search.mec_type}
  506. offset = (page_no - 1) * page_size
  507. base_sql = "SELECT * FROM "+stable_name
  508. filter_conditions = []
  509. crane_no = search.crane_no
  510. mec_type = search.mec_type
  511. if crane_no:
  512. safe_crane_no = crane_no.strip().replace("'", "''")
  513. filter_conditions.append(f"crane_no = '{safe_crane_no}'")
  514. if mec_type:
  515. mec_var_dict = await BizVarDictCRUD(auth).list(search=var_dict_search_dict)
  516. var_codes = [item.var_code for item in mec_var_dict if item.var_code]
  517. if var_codes:
  518. var_codes_str = "','".join(var_codes)
  519. filter_conditions.append(f"var_code IN ('{var_codes_str}')")
  520. # 4. 过滤条件2:created_time时间范围(新增核心逻辑)
  521. created_time = search.created_time
  522. if created_time and isinstance(created_time, tuple) and len(created_time) == 2:
  523. # 解析between条件:格式为('between', (start_time, end_time))
  524. condition_type, time_range = created_time
  525. if condition_type == "between" and isinstance(time_range, (list, tuple)) and len(time_range) == 2:
  526. start_time, end_time = time_range
  527. # 校验时间类型并格式化为TDengine支持的字符串
  528. if isinstance(start_time, datetime) and isinstance(end_time, datetime):
  529. # 格式化时间为"YYYY-MM-DD HH:MM:SS"(匹配TDengine的时间格式)
  530. start_str = start_time.strftime("%Y-%m-%d %H:%M:%S")
  531. end_str = end_time.strftime("%Y-%m-%d %H:%M:%S")
  532. # 防SQL注入:转义单引号(虽然时间格式不会有,但做兜底)
  533. safe_start = start_str.replace("'", "''")
  534. safe_end = end_str.replace("'", "''")
  535. # 添加时间范围条件(TDengine的ts字段对应创建时间)
  536. filter_conditions.append(f"ts BETWEEN '{safe_start}' AND '{safe_end}'")
  537. # 5. 拼接WHERE子句
  538. where_clause = " WHERE " + " AND ".join(filter_conditions) if filter_conditions else ""
  539. # 6. 构建完整查询SQL(排序+分页)
  540. if page_size == 1000: #历史曲线用
  541. query_sql = f"{base_sql}{where_clause} ORDER BY ts DESC"
  542. else:
  543. query_sql = f"{base_sql}{where_clause} ORDER BY ts DESC LIMIT {offset}, {page_size}"
  544. rest_result = await tdengine_rest_query(query_sql)
  545. formatted_data = await format_rest_result(rest_result)
  546. #查找var_name
  547. varDicts = await BizVarDictCRUD(auth).list(search=var_dict_search_dict)
  548. if formatted_data:
  549. for item in formatted_data:
  550. normal_time = item.get('ts').replace('T', ' ').replace('+08:00', '')
  551. item['ts'] = normal_time
  552. for varDict in varDicts:
  553. if item.get('var_code') == varDict.var_code:
  554. item['var_name'] = varDict.var_name
  555. break
  556. total = await get_table_total_count(stable_name, where_clause)
  557. return {
  558. "page_no": page_no,
  559. "page_size": page_size,
  560. "total": total,
  561. "has_next": offset + page_size < total,
  562. "items": formatted_data
  563. }
  564. @classmethod
  565. async def get_tdengine_data_test(cls, auth: AuthSchema, page_no: int, page_size: int, stable_name: str,
  566. search: BizVarDictQueryParam | None = None) -> dict:
  567. var_dict_search_dict = {'crane_no': search.crane_no, 'data_type': search.data_type}
  568. offset = (page_no - 1) * page_size
  569. # 拼接SQL(替换时间占位符,防注入)
  570. base_sql = f"""
  571. WITH target_data AS (
  572. SELECT
  573. var_code,
  574. ts,
  575. val,
  576. LAG(val) OVER (PARTITION BY var_code ORDER BY ts) AS prev_val,
  577. LAST_VALUE(val) OVER (PARTITION BY var_code ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS latest_val
  578. FROM st_digital
  579. WHERE
  580. val IN (0, 1)
  581. ),
  582. trigger_events AS (
  583. SELECT
  584. var_code,
  585. ts_cn AS trigger_time,
  586. latest_val,
  587. ROW_NUMBER() OVER (PARTITION BY var_code ORDER BY ts_cn) AS trigger_batch_id
  588. FROM target_data
  589. WHERE prev_val = 0 AND val = 1
  590. ),
  591. recover_events AS (
  592. SELECT
  593. var_code,
  594. ts_cn AS recover_time,
  595. ROW_NUMBER() OVER (PARTITION BY var_code ORDER BY ts_cn) AS recover_batch_id
  596. FROM target_data
  597. WHERE prev_val = 1 AND val = 0
  598. )
  599. SELECT
  600. t.var_code,
  601. CASE t.latest_val
  602. WHEN 1 THEN '触发中'
  603. WHEN 0 THEN '已恢复'
  604. ELSE '无数据'
  605. END AS current_status,
  606. t.trigger_time,
  607. IFNULL(r.recover_time, '未恢复') AS recover_time
  608. FROM trigger_events t
  609. LEFT JOIN recover_events r
  610. ON t.var_code = r.var_code
  611. AND t.trigger_batch_id = r.recover_batch_id
  612. ORDER BY t.var_name ASC, t.trigger_time ASC;
  613. """
  614. rest_result = await tdengine_rest_query(base_sql)
  615. formatted_data = await format_rest_result(rest_result)
  616. # 查找var_name
  617. varDicts = await BizVarDictCRUD(auth).list(search=var_dict_search_dict)
  618. if formatted_data:
  619. for item in formatted_data:
  620. normal_time = item.get('ts').replace('T', ' ').replace('+08:00', '')
  621. item['ts'] = normal_time
  622. for varDict in varDicts:
  623. if item.get('var_code') == varDict.var_code:
  624. item['var_name'] = varDict.var_name
  625. break
  626. total = await get_table_total_count(stable_name, where_clause)
  627. return {
  628. "page_no": page_no,
  629. "page_size": page_size,
  630. "total": total,
  631. "has_next": offset + page_size < total,
  632. "items": formatted_data
  633. }