|
@@ -2,8 +2,9 @@
|
|
|
import asyncio
|
|
import asyncio
|
|
|
import io
|
|
import io
|
|
|
import json
|
|
import json
|
|
|
-from datetime import datetime
|
|
|
|
|
-from typing import Any
|
|
|
|
|
|
|
+from collections import defaultdict
|
|
|
|
|
+from datetime import datetime, timedelta
|
|
|
|
|
+from typing import Any, List, Dict, Optional
|
|
|
|
|
|
|
|
from fastapi import UploadFile
|
|
from fastapi import UploadFile
|
|
|
import pandas as pd
|
|
import pandas as pd
|
|
@@ -73,21 +74,53 @@ class BizVarDictService:
|
|
|
return result
|
|
return result
|
|
|
|
|
|
|
|
@classmethod
|
|
@classmethod
|
|
|
- async def vardict_alarms_list(cls, auth: AuthSchema, crane_no: str = None) -> list[dict]:
|
|
|
|
|
|
|
+ async def vardict_alarms_list(cls, auth: AuthSchema, search: BizVarDictQueryParam | None = None) -> list[dict]:
|
|
|
|
|
|
|
|
sql_parts = [
|
|
sql_parts = [
|
|
|
"""SELECT a.*,b.crane_name
|
|
"""SELECT a.*,b.crane_name
|
|
|
FROM biz_var_dict as a
|
|
FROM biz_var_dict as a
|
|
|
LEFT JOIN biz_crane as b ON a.crane_no = b.crane_no
|
|
LEFT JOIN biz_crane as b ON a.crane_no = b.crane_no
|
|
|
- WHERE a.`status` = :status AND b.`status` = :status AND a.switch_type >= 2"""
|
|
|
|
|
|
|
+ WHERE a.`status` = :status AND b.`status` = :status AND a.data_type <= 1 AND a.switch_type >= 2"""
|
|
|
]
|
|
]
|
|
|
|
|
|
|
|
business_params: dict[str, Any] = {"status": 1}
|
|
business_params: dict[str, Any] = {"status": 1}
|
|
|
|
|
+ if search and search.crane_no:
|
|
|
|
|
+ sql_parts.append(f"AND a.crane_no = :crane_no")
|
|
|
|
|
+ business_params["crane_no"] = search.crane_no
|
|
|
|
|
+ if search and search.mec_type:
|
|
|
|
|
+ sql_parts.append(f"AND a.mec_type = :mec_type")
|
|
|
|
|
+ business_params["mec_type"] = search.mec_type
|
|
|
|
|
+
|
|
|
|
|
+ sql_parts.append("ORDER BY a.switch_type desc,b.`order` asc,a.mec_type asc,a.var_sort asc")
|
|
|
|
|
+ final_sql = " ".join(sql_parts)
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ obj_list = await BizVarDictCRUD(auth).list_sql(final_sql, business_params)
|
|
|
|
|
|
|
|
- if crane_no and isinstance(crane_no, str) and crane_no.strip():
|
|
|
|
|
- valid_crane_no = crane_no.strip()
|
|
|
|
|
|
|
+ return [BizVarDictOutSchema.model_validate(obj).model_dump() for obj in obj_list]
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ raise CustomException(msg=f"查询变量字典报警列表失败:{str(e)}")
|
|
|
|
|
+
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ async def vardict_operation_record_list(cls, auth: AuthSchema, search: BizVarDictQueryParam | None = None) -> list[dict]:
|
|
|
|
|
+
|
|
|
|
|
+ crane_no = search.crane_no
|
|
|
|
|
+ mec_type = search.mec_type
|
|
|
|
|
+ sql_parts = [
|
|
|
|
|
+ """SELECT a.*,b.crane_name
|
|
|
|
|
+ FROM biz_var_dict as a
|
|
|
|
|
+ LEFT JOIN biz_crane as b ON a.crane_no = b.crane_no
|
|
|
|
|
+ WHERE a.`status` = :status AND b.`status` = :status AND a.data_type <= 1 AND a.switch_type <= 1"""
|
|
|
|
|
+ ]
|
|
|
|
|
+
|
|
|
|
|
+ business_params: dict[str, Any] = {"status": 1}
|
|
|
|
|
+
|
|
|
|
|
+ if crane_no:
|
|
|
sql_parts.append(f"AND a.crane_no = :crane_no")
|
|
sql_parts.append(f"AND a.crane_no = :crane_no")
|
|
|
- business_params["crane_no"] = valid_crane_no
|
|
|
|
|
|
|
+ business_params["crane_no"] = crane_no
|
|
|
|
|
+ if mec_type:
|
|
|
|
|
+ sql_parts.append(f"AND a.mec_type = :mec_type")
|
|
|
|
|
+ business_params["mec_type"] = mec_type
|
|
|
|
|
|
|
|
sql_parts.append("ORDER BY b.`order` asc,a.mec_type asc,a.var_sort asc")
|
|
sql_parts.append("ORDER BY b.`order` asc,a.mec_type asc,a.var_sort asc")
|
|
|
final_sql = " ".join(sql_parts)
|
|
final_sql = " ".join(sql_parts)
|
|
@@ -99,9 +132,11 @@ class BizVarDictService:
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
raise CustomException(msg=f"查询变量字典报警列表失败:{str(e)}")
|
|
raise CustomException(msg=f"查询变量字典报警列表失败:{str(e)}")
|
|
|
|
|
|
|
|
- @classmethod
|
|
|
|
|
- async def vardict_analog_list(cls, auth: AuthSchema, crane_no: str = None) -> list[dict]:
|
|
|
|
|
|
|
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ async def vardict_analog_list(cls, auth: AuthSchema, search: BizVarDictQueryParam | None = None) -> list[dict]:
|
|
|
|
|
+ crane_no = search.crane_no
|
|
|
|
|
+ mec_type = search.mec_type
|
|
|
sql_parts = [
|
|
sql_parts = [
|
|
|
"""SELECT a.*,b.crane_name
|
|
"""SELECT a.*,b.crane_name
|
|
|
FROM biz_var_dict as a
|
|
FROM biz_var_dict as a
|
|
@@ -111,10 +146,12 @@ class BizVarDictService:
|
|
|
|
|
|
|
|
business_params: dict[str, Any] = {"status": 1}
|
|
business_params: dict[str, Any] = {"status": 1}
|
|
|
|
|
|
|
|
- if crane_no and isinstance(crane_no, str) and crane_no.strip():
|
|
|
|
|
- valid_crane_no = crane_no.strip()
|
|
|
|
|
|
|
+ if crane_no:
|
|
|
sql_parts.append(f"AND a.crane_no = :crane_no")
|
|
sql_parts.append(f"AND a.crane_no = :crane_no")
|
|
|
- business_params["crane_no"] = valid_crane_no
|
|
|
|
|
|
|
+ business_params["crane_no"] = crane_no
|
|
|
|
|
+ if mec_type:
|
|
|
|
|
+ sql_parts.append(f"AND a.mec_type = :mec_type")
|
|
|
|
|
+ business_params["mec_type"] = mec_type
|
|
|
|
|
|
|
|
sql_parts.append("ORDER BY b.`order` asc,a.mec_type asc,a.var_sort asc")
|
|
sql_parts.append("ORDER BY b.`order` asc,a.mec_type asc,a.var_sort asc")
|
|
|
final_sql = " ".join(sql_parts)
|
|
final_sql = " ".join(sql_parts)
|
|
@@ -563,23 +600,28 @@ class BizVarDictService:
|
|
|
@classmethod
|
|
@classmethod
|
|
|
async def get_tdengine_data(cls, auth: AuthSchema, page_no: int, page_size: int,stable_name:str,
|
|
async def get_tdengine_data(cls, auth: AuthSchema, page_no: int, page_size: int,stable_name:str,
|
|
|
search: BizVarDictQueryParam | None = None) -> dict:
|
|
search: BizVarDictQueryParam | None = None) -> dict:
|
|
|
- var_dict_search_dict = {'crane_no':search.crane_no,'data_type':search.data_type,'mec_type':search.mec_type}
|
|
|
|
|
|
|
+ var_dict_search_dict = {'crane_no':search.crane_no,'data_type':search.data_type,'mec_type':search.mec_type,'var_code':search.var_code}
|
|
|
offset = (page_no - 1) * page_size
|
|
offset = (page_no - 1) * page_size
|
|
|
base_sql = "SELECT * FROM "+stable_name
|
|
base_sql = "SELECT * FROM "+stable_name
|
|
|
filter_conditions = []
|
|
filter_conditions = []
|
|
|
crane_no = search.crane_no
|
|
crane_no = search.crane_no
|
|
|
- mec_type = search.mec_type
|
|
|
|
|
if crane_no:
|
|
if crane_no:
|
|
|
safe_crane_no = crane_no.strip().replace("'", "''")
|
|
safe_crane_no = crane_no.strip().replace("'", "''")
|
|
|
filter_conditions.append(f"crane_no = '{safe_crane_no}'")
|
|
filter_conditions.append(f"crane_no = '{safe_crane_no}'")
|
|
|
|
|
|
|
|
- if mec_type:
|
|
|
|
|
- mec_var_dict = await BizVarDictCRUD(auth).list(search=var_dict_search_dict)
|
|
|
|
|
- var_codes = [item.var_code for item in mec_var_dict if item.var_code]
|
|
|
|
|
- if var_codes:
|
|
|
|
|
- var_codes_str = "','".join(var_codes)
|
|
|
|
|
- filter_conditions.append(f"var_code IN ('{var_codes_str}')")
|
|
|
|
|
-
|
|
|
|
|
|
|
+ mec_var_dict = await BizVarDictCRUD(auth).list(search=var_dict_search_dict)
|
|
|
|
|
+ var_codes = [item.var_code for item in mec_var_dict if item.var_code]
|
|
|
|
|
+ if var_codes:
|
|
|
|
|
+ var_codes_str = "','".join(var_codes)
|
|
|
|
|
+ filter_conditions.append(f"var_code IN ('{var_codes_str}')")
|
|
|
|
|
+ else:
|
|
|
|
|
+ return {
|
|
|
|
|
+ "page_no": page_no,
|
|
|
|
|
+ "page_size": page_size,
|
|
|
|
|
+ "total": 0,
|
|
|
|
|
+ "has_next": False,
|
|
|
|
|
+ "items": []
|
|
|
|
|
+ }
|
|
|
# 4. 过滤条件2:created_time时间范围(新增核心逻辑)
|
|
# 4. 过滤条件2:created_time时间范围(新增核心逻辑)
|
|
|
created_time = search.created_time
|
|
created_time = search.created_time
|
|
|
if created_time and isinstance(created_time, tuple) and len(created_time) == 2:
|
|
if created_time and isinstance(created_time, tuple) and len(created_time) == 2:
|
|
@@ -628,75 +670,280 @@ class BizVarDictService:
|
|
|
"items": formatted_data
|
|
"items": formatted_data
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ # 定义常量(解决魔法值问题)
|
|
|
|
|
+ CONST_VAL_TRIGGER = 1 # 触发值
|
|
|
|
|
+ CONST_VAL_RECOVER = 0 # 恢复值
|
|
|
|
|
+ CONST_ORDER_ASC = "ASC"
|
|
|
|
|
+ CONST_ORDER_DESC = "DESC"
|
|
|
|
|
+ CONST_UNRECOVERED = "未恢复"
|
|
|
|
|
+ CONST_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
|
|
|
|
|
+
|
|
|
|
|
+ # 工具函数:封装时间格式转换
|
|
|
|
|
+ def format_timestamp(ts_str: Optional[str]) -> Optional[str]:
|
|
|
|
|
+ """标准化时间格式(去除T和+08:00)"""
|
|
|
|
|
+ if not ts_str:
|
|
|
|
|
+ return None
|
|
|
|
|
+ return ts_str.replace('T', ' ').replace('+08:00', '')
|
|
|
|
|
+
|
|
|
|
|
+ # 工具函数:安全转义SQL字符串(封装重复的防注入逻辑)
|
|
|
|
|
+ def escape_sql_str(s: Optional[str]) -> str:
|
|
|
|
|
+ """安全转义SQL字符串,防止注入"""
|
|
|
|
|
+ if not s:
|
|
|
|
|
+ return ""
|
|
|
|
|
+ return str(s).strip().replace("'", "''")
|
|
|
|
|
+
|
|
|
|
|
+ # 工具函数:构建var_code到var_name的映射(解决嵌套循环匹配问题)
|
|
|
|
|
+ def build_var_code_mapping(varDicts: List[any]) -> Dict[str, str]:
|
|
|
|
|
+ """构建var_code→var_name的字典,提升匹配效率"""
|
|
|
|
|
+ return {
|
|
|
|
|
+ item.var_code: item.var_name
|
|
|
|
|
+ for item in varDicts
|
|
|
|
|
+ if hasattr(item, 'var_code') and item.var_code
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
@classmethod
|
|
@classmethod
|
|
|
- async def get_tdengine_data_test(cls, auth: AuthSchema, page_no: int, page_size: int, stable_name: str,
|
|
|
|
|
- search: BizVarDictQueryParam | None = None) -> dict:
|
|
|
|
|
- var_dict_search_dict = {'crane_no': search.crane_no, 'data_type': search.data_type}
|
|
|
|
|
- offset = (page_no - 1) * page_size
|
|
|
|
|
- # 拼接SQL(替换时间占位符,防注入)
|
|
|
|
|
- base_sql = f"""
|
|
|
|
|
- WITH target_data AS (
|
|
|
|
|
- SELECT
|
|
|
|
|
- var_code,
|
|
|
|
|
- ts,
|
|
|
|
|
- val,
|
|
|
|
|
- LAG(val) OVER (PARTITION BY var_code ORDER BY ts) AS prev_val,
|
|
|
|
|
- LAST_VALUE(val) OVER (PARTITION BY var_code ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS latest_val
|
|
|
|
|
- FROM st_digital
|
|
|
|
|
- WHERE
|
|
|
|
|
- val IN (0, 1)
|
|
|
|
|
- ),
|
|
|
|
|
- trigger_events AS (
|
|
|
|
|
- SELECT
|
|
|
|
|
- var_code,
|
|
|
|
|
- ts_cn AS trigger_time,
|
|
|
|
|
- latest_val,
|
|
|
|
|
- ROW_NUMBER() OVER (PARTITION BY var_code ORDER BY ts_cn) AS trigger_batch_id
|
|
|
|
|
- FROM target_data
|
|
|
|
|
- WHERE prev_val = 0 AND val = 1
|
|
|
|
|
- ),
|
|
|
|
|
- recover_events AS (
|
|
|
|
|
- SELECT
|
|
|
|
|
- var_code,
|
|
|
|
|
- ts_cn AS recover_time,
|
|
|
|
|
- ROW_NUMBER() OVER (PARTITION BY var_code ORDER BY ts_cn) AS recover_batch_id
|
|
|
|
|
- FROM target_data
|
|
|
|
|
- WHERE prev_val = 1 AND val = 0
|
|
|
|
|
- )
|
|
|
|
|
- SELECT
|
|
|
|
|
- t.var_code,
|
|
|
|
|
- CASE t.latest_val
|
|
|
|
|
- WHEN 1 THEN '触发中'
|
|
|
|
|
- WHEN 0 THEN '已恢复'
|
|
|
|
|
- ELSE '无数据'
|
|
|
|
|
- END AS current_status,
|
|
|
|
|
- t.trigger_time,
|
|
|
|
|
- IFNULL(r.recover_time, '未恢复') AS recover_time
|
|
|
|
|
- FROM trigger_events t
|
|
|
|
|
- LEFT JOIN recover_events r
|
|
|
|
|
- ON t.var_code = r.var_code
|
|
|
|
|
- AND t.trigger_batch_id = r.recover_batch_id
|
|
|
|
|
- ORDER BY t.var_name ASC, t.trigger_time ASC;
|
|
|
|
|
|
|
+ async def _query_single_record(cls, stable_name: str, var_code: str, crane_no: str,
|
|
|
|
|
+ time_condition: str, val: int, order: str = CONST_ORDER_ASC) -> Optional[str]:
|
|
|
|
|
+ """
|
|
|
|
|
+ 精准查询单条关键记录(性能最优)
|
|
|
|
|
+ :param time_condition: 时间条件字符串,如 "ts > '2026-01-18 10:30:00'"
|
|
|
|
|
+ :param val: 要查询的val值(0/1)
|
|
|
|
|
+ :param order: 排序方式(ASC/DESC)
|
|
|
|
|
+ :return: 符合条件的第一条ts(北京时间),无则返回None
|
|
|
"""
|
|
"""
|
|
|
|
|
+ # 优化:使用封装的转义函数,减少重复代码
|
|
|
|
|
+ safe_stable = cls.escape_sql_str(stable_name)
|
|
|
|
|
+ safe_var = cls.escape_sql_str(var_code)
|
|
|
|
|
+ safe_crane = cls.escape_sql_str(crane_no)
|
|
|
|
|
|
|
|
- rest_result = await tdengine_rest_query(base_sql)
|
|
|
|
|
- formatted_data = await format_rest_result(rest_result)
|
|
|
|
|
- # 查找var_name
|
|
|
|
|
|
|
+ # 严格类型校验
|
|
|
|
|
+ try:
|
|
|
|
|
+ safe_val = int(val)
|
|
|
|
|
+ except (ValueError, TypeError):
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ # 校验排序参数,防止非法值
|
|
|
|
|
+ safe_order = cls.CONST_ORDER_ASC if order.upper() != cls.CONST_ORDER_DESC else cls.CONST_ORDER_DESC
|
|
|
|
|
+
|
|
|
|
|
+ # 优化:SQL格式化更简洁,减少空格
|
|
|
|
|
+ sql = f"""
|
|
|
|
|
+ SELECT ts AS ts_cn
|
|
|
|
|
+ FROM {safe_stable}
|
|
|
|
|
+ WHERE
|
|
|
|
|
+ crane_no = '{safe_crane}'
|
|
|
|
|
+ AND var_code = '{safe_var}'
|
|
|
|
|
+ AND val = {safe_val}
|
|
|
|
|
+ AND {time_condition}
|
|
|
|
|
+ ORDER BY ts {safe_order}
|
|
|
|
|
+ LIMIT 1;
|
|
|
|
|
+ """
|
|
|
|
|
+ try:
|
|
|
|
|
+ rest_result = await tdengine_rest_query(sql)
|
|
|
|
|
+ formatted_result = await format_rest_result(rest_result)
|
|
|
|
|
+ if formatted_result and len(formatted_result) > 0:
|
|
|
|
|
+ ts_cn = cls.format_timestamp(formatted_result[0].get("ts_cn")) # 复用工具函数
|
|
|
|
|
+ return ts_cn
|
|
|
|
|
+ return None
|
|
|
|
|
+ # 优化:捕获具体异常,而非泛化Exception,便于排查问题
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ # 建议添加日志:logger.error(f"查询单条记录失败: {e}, SQL: {sql}")
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ async def calc_switch_batch(cls, raw_formatted_data: List[Dict],
|
|
|
|
|
+ query_start: Optional[datetime] = None,
|
|
|
|
|
+ query_end: Optional[datetime] = None,
|
|
|
|
|
+ stable_name: Optional[str] = None) -> List[Dict]:
|
|
|
|
|
+ if not raw_formatted_data:
|
|
|
|
|
+ return []
|
|
|
|
|
+
|
|
|
|
|
+ # 优化1:使用defaultdict简化分组逻辑,减少判断
|
|
|
|
|
+ point_groups: Dict[str, List[Dict]] = defaultdict(list)
|
|
|
|
|
+ for item in raw_formatted_data:
|
|
|
|
|
+ var_code = item.get("var_code")
|
|
|
|
|
+ crane_no = item.get("crane_no")
|
|
|
|
|
+ if not var_code or not crane_no:
|
|
|
|
|
+ continue
|
|
|
|
|
+ # 优化:捕获具体异常(ValueError),而非泛化except
|
|
|
|
|
+ try:
|
|
|
|
|
+ item["val"] = int(item.get("val", cls.CONST_VAL_RECOVER))
|
|
|
|
|
+ except ValueError:
|
|
|
|
|
+ item["val"] = cls.CONST_VAL_RECOVER
|
|
|
|
|
+ group_key = f"{crane_no}_{var_code}"
|
|
|
|
|
+ point_groups[group_key].append(item)
|
|
|
|
|
+
|
|
|
|
|
+ batch_list = []
|
|
|
|
|
+ # 遍历每个点位
|
|
|
|
|
+ for group_key, point_records in point_groups.items():
|
|
|
|
|
+ # 优化:增加异常处理,防止group_key格式错误
|
|
|
|
|
+ try:
|
|
|
|
|
+ crane_no, var_code = group_key.split("_", 1)
|
|
|
|
|
+ except ValueError:
|
|
|
|
|
+ # 日志:logger.warning(f"无效的分组键: {group_key}")
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # 关键修复:必须排序!原代码注释掉了排序,会导致触发/恢复判断完全错误
|
|
|
|
|
+ point_records.sort(key=lambda x: x["ts"])
|
|
|
|
|
+
|
|
|
|
|
+ # 优化:直接取第一个item的var_name,无则用默认值
|
|
|
|
|
+ var_name = point_records[0].get("var_name", f"未知点位({var_code})")
|
|
|
|
|
+
|
|
|
|
|
+ # 提取触发/恢复事件(0→1/1→0)
|
|
|
|
|
+ is_all_0 = True
|
|
|
|
|
+ trigger_times = [] # 触发时间列表
|
|
|
|
|
+ recover_times = [] # 恢复时间列表
|
|
|
|
|
+ prev_val = None
|
|
|
|
|
+
|
|
|
|
|
+ for record in point_records:
|
|
|
|
|
+ current_val = record["val"]
|
|
|
|
|
+ current_ts = record["ts"]
|
|
|
|
|
+ if prev_val is None:
|
|
|
|
|
+ prev_val = current_val
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # 优化:使用常量替代魔法值,提升可读性
|
|
|
|
|
+ if prev_val == cls.CONST_VAL_RECOVER and current_val == cls.CONST_VAL_TRIGGER:
|
|
|
|
|
+ trigger_times.append(current_ts)
|
|
|
|
|
+ elif prev_val == cls.CONST_VAL_TRIGGER and current_val == cls.CONST_VAL_RECOVER:
|
|
|
|
|
+ recover_times.append(current_ts)
|
|
|
|
|
+
|
|
|
|
|
+ prev_val = current_val
|
|
|
|
|
+ if current_val == cls.CONST_VAL_TRIGGER:
|
|
|
|
|
+ is_all_0 = False
|
|
|
|
|
+
|
|
|
|
|
+ # ---------------------- 无触发记录(全0)→ 返回空 ----------------------
|
|
|
|
|
+ if is_all_0:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # ---------------------- 第一条是触发追溯真实触发时间 ----------------------
|
|
|
|
|
+ if point_records[0].get('val') == cls.CONST_VAL_TRIGGER:
|
|
|
|
|
+ if not (query_start and stable_name):
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ start_str = query_start.strftime(cls.CONST_TIME_FORMAT)
|
|
|
|
|
+ recover_before_start = await cls._query_single_record(
|
|
|
|
|
+ stable_name, var_code, crane_no,
|
|
|
|
|
+ f"ts < '{cls.escape_sql_str(start_str)}'",
|
|
|
|
|
+ cls.CONST_VAL_RECOVER, cls.CONST_ORDER_DESC
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ real_trigger = point_records[0]["ts"]
|
|
|
|
|
+ if recover_before_start:
|
|
|
|
|
+ trigger_after_recover = await cls._query_single_record(
|
|
|
|
|
+ stable_name, var_code, crane_no,
|
|
|
|
|
+ f"ts > '{cls.escape_sql_str(recover_before_start)}'",
|
|
|
|
|
+ cls.CONST_VAL_TRIGGER, cls.CONST_ORDER_ASC
|
|
|
|
|
+ )
|
|
|
|
|
+ if trigger_after_recover:
|
|
|
|
|
+ real_trigger = trigger_after_recover
|
|
|
|
|
+ trigger_times.insert(0, real_trigger)
|
|
|
|
|
+
|
|
|
|
|
+ # ---------------------- 最后一条是触发追溯真实恢复时间 ----------------------
|
|
|
|
|
+ if point_records[-1].get('val') == cls.CONST_VAL_TRIGGER:
|
|
|
|
|
+ if not (query_end and stable_name):
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ end_str = query_end.strftime(cls.CONST_TIME_FORMAT)
|
|
|
|
|
+ recover_after_end = await cls._query_single_record(
|
|
|
|
|
+ stable_name, var_code, crane_no,
|
|
|
|
|
+ f"ts > '{cls.escape_sql_str(end_str)}'",
|
|
|
|
|
+ cls.CONST_VAL_RECOVER, cls.CONST_ORDER_ASC
|
|
|
|
|
+ )
|
|
|
|
|
+ if recover_after_end:
|
|
|
|
|
+ recover_times.append(recover_after_end)
|
|
|
|
|
+
|
|
|
|
|
+ # ---------------------- 匹配 ----------------------
|
|
|
|
|
+ min_len = min(len(trigger_times), len(recover_times))
|
|
|
|
|
+ for i in range(min_len):
|
|
|
|
|
+ batch_list.append({
|
|
|
|
|
+ "var_name": var_name,
|
|
|
|
|
+ "val": cls.CONST_VAL_RECOVER, # 已恢复
|
|
|
|
|
+ "str_time": trigger_times[i],
|
|
|
|
|
+ "end_time": recover_times[i]
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+ # 不匹配的情况只可能是触发没恢复
|
|
|
|
|
+ if len(trigger_times) > len(recover_times):
|
|
|
|
|
+ batch_list.append({
|
|
|
|
|
+ "var_name": var_name,
|
|
|
|
|
+ "val": cls.CONST_VAL_TRIGGER, # 触发中
|
|
|
|
|
+ "str_time": trigger_times[-1],
|
|
|
|
|
+ "end_time": cls.CONST_UNRECOVERED
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+ # 最终排序
|
|
|
|
|
+ batch_list.sort(key=lambda x: x["str_time"], reverse=True)
|
|
|
|
|
+ return batch_list
|
|
|
|
|
+
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ async def get_tdengine_data_operation(cls, auth: AuthSchema, stable_name: str,
|
|
|
|
|
+ search: BizVarDictQueryParam | None = None) -> dict:
|
|
|
|
|
+ if not search:
|
|
|
|
|
+ return {"page_no": 0, "page_size": 0, "total": 0, "has_next": 0, "items": []}
|
|
|
|
|
+
|
|
|
|
|
+ # 优化:初始化查询参数,减少重复判断
|
|
|
|
|
+ var_dict_search_dict = {
|
|
|
|
|
+ 'crane_no': search.crane_no,
|
|
|
|
|
+ 'data_type': search.data_type,
|
|
|
|
|
+ 'mec_type': search.mec_type,
|
|
|
|
|
+ 'switch_type': search.switch_type,
|
|
|
|
|
+ 'var_code': search.var_code
|
|
|
|
|
+ }
|
|
|
|
|
+ filter_conditions = []
|
|
|
|
|
+ query_start: Optional[datetime] = None
|
|
|
|
|
+ query_end: Optional[datetime] = None
|
|
|
|
|
+
|
|
|
|
|
+ # 1. 过滤条件:crane_no
|
|
|
|
|
+ if search.crane_no:
|
|
|
|
|
+ filter_conditions.append(f"crane_no = '{cls.escape_sql_str(search.crane_no)}'")
|
|
|
|
|
+
|
|
|
|
|
+ # 2. 过滤条件:mec_type对应的var_code
|
|
|
varDicts = await BizVarDictCRUD(auth).list(search=var_dict_search_dict)
|
|
varDicts = await BizVarDictCRUD(auth).list(search=var_dict_search_dict)
|
|
|
|
|
+ var_code_map = cls.build_var_code_mapping(varDicts) # 构建映射字典
|
|
|
|
|
+
|
|
|
|
|
+ if search.mec_type and var_code_map:
|
|
|
|
|
+ var_codes_str = "','".join(cls.escape_sql_str(code) for code in var_code_map.keys())
|
|
|
|
|
+ filter_conditions.append(f"var_code IN ('{var_codes_str}')")
|
|
|
|
|
+
|
|
|
|
|
+ # 3. 过滤条件:created_time时间范围
|
|
|
|
|
+ if search.created_time and isinstance(search.created_time, tuple) and len(search.created_time) == 2:
|
|
|
|
|
+ condition_type, time_range = search.created_time
|
|
|
|
|
+ if condition_type == "between" and isinstance(time_range, (list, tuple)) and len(time_range) == 2:
|
|
|
|
|
+ query_start, query_end = time_range
|
|
|
|
|
+ if isinstance(query_start, datetime) and isinstance(query_end, datetime):
|
|
|
|
|
+ start_str = query_start.strftime(cls.CONST_TIME_FORMAT)
|
|
|
|
|
+ end_str = query_end.strftime(cls.CONST_TIME_FORMAT)
|
|
|
|
|
+ filter_conditions.append(
|
|
|
|
|
+ f"ts BETWEEN '{cls.escape_sql_str(start_str)}' AND '{cls.escape_sql_str(end_str)}'")
|
|
|
|
|
+
|
|
|
|
|
+ # 4. 拼接WHERE子句
|
|
|
|
|
+ where_clause = " WHERE " + " AND ".join(filter_conditions) if filter_conditions else ""
|
|
|
|
|
+
|
|
|
|
|
+ # 5. 查询原始数据(优化:SQL更简洁,排序仅保留必要的)
|
|
|
|
|
+ query_sql = f"SELECT * FROM {cls.escape_sql_str(stable_name)}{where_clause} ORDER BY ts " + cls.CONST_ORDER_ASC
|
|
|
|
|
+ rest_result = await tdengine_rest_query(query_sql)
|
|
|
|
|
+ formatted_data = await format_rest_result(rest_result)
|
|
|
|
|
+
|
|
|
|
|
+ # 6. 匹配var_name(优化:用字典映射替代嵌套循环,O(n)→O(1))
|
|
|
if formatted_data:
|
|
if formatted_data:
|
|
|
for item in formatted_data:
|
|
for item in formatted_data:
|
|
|
- normal_time = item.get('ts').replace('T', ' ').replace('+08:00', '')
|
|
|
|
|
- item['ts'] = normal_time
|
|
|
|
|
- for varDict in varDicts:
|
|
|
|
|
- if item.get('var_code') == varDict.var_code:
|
|
|
|
|
- item['var_name'] = varDict.var_name
|
|
|
|
|
- break
|
|
|
|
|
- total = await get_table_total_count(stable_name, where_clause)
|
|
|
|
|
|
|
+ item['ts'] = cls.format_timestamp(item.get('ts')) # 复用工具函数
|
|
|
|
|
+ # 优化:字典查找,无需遍历所有varDicts
|
|
|
|
|
+ item['var_name'] = var_code_map.get(item.get('var_code'), f"未知点位({item.get('var_code')})")
|
|
|
|
|
+
|
|
|
|
|
+ # 7. 调用批量计算方法
|
|
|
|
|
+ batch_result = await cls.calc_switch_batch(
|
|
|
|
|
+ raw_formatted_data=formatted_data,
|
|
|
|
|
+ query_start=query_start,
|
|
|
|
|
+ query_end=query_end,
|
|
|
|
|
+ stable_name=stable_name
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
|
|
+ # 8. 返回结果
|
|
|
return {
|
|
return {
|
|
|
- "page_no": page_no,
|
|
|
|
|
- "page_size": page_size,
|
|
|
|
|
- "total": total,
|
|
|
|
|
- "has_next": offset + page_size < total,
|
|
|
|
|
- "items": formatted_data
|
|
|
|
|
|
|
+ "page_no": 0,
|
|
|
|
|
+ "page_size": 0,
|
|
|
|
|
+ "total": len(batch_result),
|
|
|
|
|
+ "has_next": 0,
|
|
|
|
|
+ "items": batch_result
|
|
|
}
|
|
}
|