| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949 |
- # -*- coding: utf-8 -*-
- import asyncio
- import io
- import json
- from collections import defaultdict
- from datetime import datetime, timedelta
- from typing import Any, List, Dict, Optional
- from fastapi import UploadFile
- import pandas as pd
- from redis.asyncio.client import Redis
- from app.core.database import async_db_session
- from app.core.redis_crud import RedisCURD
- from app.common.enums import RedisInitKeyConfig
- from app.core.base_schema import BatchSetAvailable
- from app.core.exceptions import CustomException
- from app.utils.excel_util import ExcelUtil
- from app.core.logger import log
- from app.api.v1.module_system.auth.schema import AuthSchema
- from .schema import BizVarDictCreateSchema, BizVarDictUpdateSchema, BizVarDictOutSchema, BizVarDictQueryParam
- from ..crane.crud import BizCraneCRUD
- from ..crane.model import BizCraneModel
- from ..gateway.crud import GatewayCRUD
- from ..gateway.model import GatewayModel
- from ..mec.crud import BizMecCRUD
- from ..vardict.crud import BizVarDictCRUD
- from ..vardict.schema import VarDictMecGroupSchema
- from app.utils.tdengine_util import tdengine_rest_query, format_rest_result, get_table_total_count
- class BizVarDictService:
- """
- 变量信息服务层
- """
-
- @classmethod
- async def detail_vardict_service(cls, auth: AuthSchema, id: int) -> dict:
- """详情"""
- obj = await BizVarDictCRUD(auth).get_by_id_vardict_crud(id=id)
- if not obj:
- raise CustomException(msg="该数据不存在")
- crane = await BizCraneCRUD(auth).get_by_id_crane_crud_for_no(obj.crane_no)
- gateway = await GatewayCRUD(auth).get_by_id_gateway_crud(obj.gateway_id)
- res = BizVarDictOutSchema.model_validate(obj).model_dump()
- res['crane_name'] = crane.crane_name
- res['gateway_name'] = gateway.gateway_name if gateway else ""
- return res
-
- @classmethod
- async def list_vardict_service(cls, auth: AuthSchema, search: BizVarDictQueryParam | None = None, order_by: list[dict] | None = None) -> list[dict]:
- """列表查询"""
- search_dict = search.__dict__ if search else None
- obj_list = await BizVarDictCRUD(auth).list_vardict_crud(search=search_dict, order_by=order_by)
- return [BizVarDictOutSchema.model_validate(obj).model_dump() for obj in obj_list]
- @classmethod
- async def page_vardict_service(cls, auth: AuthSchema, page_no: int, page_size: int, search: BizVarDictQueryParam | None = None, order_by: list[dict] | None = None) -> dict:
- """分页查询(数据库分页)"""
- search_dict = search.__dict__ if search else {}
- order_by_list = order_by or [{'id': 'asc'}]
- offset = (page_no - 1) * page_size
- result = await BizVarDictCRUD(auth).page_vardict_crud(
- offset=offset,
- limit=page_size,
- order_by=order_by_list,
- search=search_dict
- )
- for item in result.get('items'):
- crane_model: BizCraneModel | None = await BizCraneCRUD(auth).get_by_id_crane_crud_for_no(crane_no=item['crane_no'])
- gateway_model: GatewayModel | None = await GatewayCRUD(auth).get_by_id_gateway_crud(id=item['gateway_id'])
- item['crane_name'] = crane_model.crane_name
- item['gateway_name'] = gateway_model.gateway_name if gateway_model else ""
- return result
- @classmethod
- async def vardict_alarms_list(cls, auth: AuthSchema, search: BizVarDictQueryParam | None = None) -> list[dict]:
- sql_parts = [
- """SELECT a.*,b.crane_name
- FROM biz_var_dict as a
- LEFT JOIN biz_crane as b ON a.crane_no = b.crane_no
- WHERE a.`status` = :status AND b.`status` = :status AND a.data_type <= 1 AND a.switch_type >= 2"""
- ]
- business_params: dict[str, Any] = {"status": 1}
- if search and search.crane_no:
- sql_parts.append(f"AND a.crane_no = :crane_no")
- business_params["crane_no"] = search.crane_no
- if search and search.mec_type:
- sql_parts.append(f"AND a.mec_type = :mec_type")
- business_params["mec_type"] = search.mec_type
- sql_parts.append("ORDER BY a.switch_type desc,b.`order` asc,a.mec_type asc,a.var_sort asc")
- final_sql = " ".join(sql_parts)
- try:
- obj_list = await BizVarDictCRUD(auth).list_sql(final_sql, business_params)
- return [BizVarDictOutSchema.model_validate(obj).model_dump() for obj in obj_list]
- except Exception as e:
- raise CustomException(msg=f"查询变量字典报警列表失败:{str(e)}")
- @classmethod
- async def vardict_operation_record_list(cls, auth: AuthSchema, search: BizVarDictQueryParam | None = None) -> list[dict]:
- crane_no = search.crane_no
- mec_type = search.mec_type
- sql_parts = [
- """SELECT a.*,b.crane_name
- FROM biz_var_dict as a
- LEFT JOIN biz_crane as b ON a.crane_no = b.crane_no
- WHERE a.`status` = :status AND b.`status` = :status AND a.data_type <= 1 AND a.switch_type <= 1"""
- ]
- business_params: dict[str, Any] = {"status": 1}
- if crane_no:
- sql_parts.append(f"AND a.crane_no = :crane_no")
- business_params["crane_no"] = crane_no
- if mec_type:
- sql_parts.append(f"AND a.mec_type = :mec_type")
- business_params["mec_type"] = mec_type
- sql_parts.append("ORDER BY b.`order` asc,a.mec_type asc,a.var_sort asc")
- final_sql = " ".join(sql_parts)
- try:
- obj_list = await BizVarDictCRUD(auth).list_sql(final_sql, business_params)
- return [BizVarDictOutSchema.model_validate(obj).model_dump() for obj in obj_list]
- except Exception as e:
- raise CustomException(msg=f"查询变量字典报警列表失败:{str(e)}")
- @classmethod
- async def vardict_analog_list(cls, auth: AuthSchema, search: BizVarDictQueryParam | None = None) -> list[dict]:
- crane_no = search.crane_no
- mec_type = search.mec_type
- sql_parts = [
- """SELECT a.*,b.crane_name
- FROM biz_var_dict as a
- LEFT JOIN biz_crane as b ON a.crane_no = b.crane_no
- WHERE a.`status` = :status AND b.`status` = :status AND a.data_type >= 2"""
- ]
- business_params: dict[str, Any] = {"status": 1}
- if crane_no:
- sql_parts.append(f"AND a.crane_no = :crane_no")
- business_params["crane_no"] = crane_no
- if mec_type:
- sql_parts.append(f"AND a.mec_type = :mec_type")
- business_params["mec_type"] = mec_type
- sql_parts.append("ORDER BY b.`order` asc,a.mec_type asc,a.var_sort asc")
- final_sql = " ".join(sql_parts)
- try:
- obj_list = await BizVarDictCRUD(auth).list_sql(final_sql, business_params)
- return [BizVarDictOutSchema.model_validate(obj).model_dump() for obj in obj_list]
- except Exception as e:
- raise CustomException(msg=f"查询变量字典模拟量列表失败:{str(e)}")
-
- @classmethod
- async def create_vardict_service(cls, auth: AuthSchema, data: BizVarDictCreateSchema,redis: Redis) -> dict:
- """创建"""
- # 检查唯一性约束
- obj = await BizVarDictCRUD(auth).create_vardict_crud(data=data)
- if obj:
- # 更新缓存中数据
- await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:{data.crane_no}")
- return BizVarDictOutSchema.model_validate(obj).model_dump()
-
- @classmethod
- async def update_vardict_service(cls, auth: AuthSchema, id: int, data: BizVarDictUpdateSchema,redis: Redis) -> dict:
- """更新"""
- # 检查数据是否存在
- obj = await BizVarDictCRUD(auth).get_by_id_vardict_crud(id=id)
- if not obj:
- raise CustomException(msg='更新失败,该数据不存在')
-
- # 检查唯一性约束
-
- obj = await BizVarDictCRUD(auth).update_vardict_crud(id=id, data=data)
- if obj:
- # 更新缓存中数据
- await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:{obj.crane_no}")
- return BizVarDictOutSchema.model_validate(obj).model_dump()
-
- @classmethod
- async def delete_vardict_service(cls, auth: AuthSchema, ids: list[int],redis: Redis) -> None:
- """删除"""
- if len(ids) < 1:
- raise CustomException(msg='删除失败,删除对象不能为空')
- crane_nos = []
- for id in ids:
- obj = await BizVarDictCRUD(auth).get_by_id_vardict_crud(id=id)
- if not obj:
- raise CustomException(msg=f'删除失败,ID为{id}的数据不存在')
- crane_nos.append(obj.crane_no)
- await BizVarDictCRUD(auth).delete_vardict_crud(ids=ids)
- # 更新缓存中数据
- for crane_no in crane_nos:
- await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:{crane_no}")
-
- @classmethod
- async def set_availale_vardict_service(cls, auth: AuthSchema, data: BatchSetAvailable,redis: Redis) -> None:
- crane_nos = []
- for id in data.ids:
- obj = await BizVarDictCRUD(auth).get_by_id_vardict_crud(id=id)
- if not obj:
- raise CustomException(msg=f'批量设置失败,ID为{id}的数据不存在')
- crane_nos.append(obj.crane_no)
- """批量设置状态"""
- await BizVarDictCRUD(auth).set_available_vardict_crud(ids=data.ids, status=data.status)
- # 更新缓存中数据
- for crane_no in crane_nos:
- await RedisCURD(redis).clear(f"{RedisInitKeyConfig.VAR_DICT.key}:{crane_no}")
-
- @classmethod
- async def batch_export_vardict_service(cls, obj_list: list[dict]) -> bytes:
- """批量导出"""
- mapping_dict = {
- 'id': 'id ',
- 'crane_no': '',
- 'var_code': '变量code',
- 'var_name': '变量名称',
- 'mec_type': '所属机构',
- 'data_type': '数据类型',
- 'switch_type': '变量类型',
- 'addr': 'modbus地址',
- 'gateway_id': '网关',
- 'var_sort': '排序',
- 'var_group': '变量分组',
- 'var_category': '变量分类',
- 'translate': '绑定公式',
- 'device_no': '关联设备编号 ',
- 'is_reverse': '是否取反',
- 'is_top_show': '是否重点显示',
- 'is_save': '是否生成',
- 'is_calibration': '是否标定',
- 'is_overview_top_show': '是否首页重点显示',
- 'is_home_page_show': '是否首页显示',
- 'is_diagnose': '是否启用诊断专家',
- 'is_upload': '是否上传云平台',
- 'diagnosis_id': '关联诊断专家',
- 'status': '是否启用',
- 'description': '备注/描述',
- 'create_time': '创建时间 ',
- 'updated_time': '更新时间',
- 'created_id': '创建人ID',
- 'updated_id': '更新人ID',
- }
- data = obj_list.copy()
- for item in data:
- # 状态转换
- if 'status' in item:
- item['status'] = '启用' if item.get('status') == '0' else '停用'
- # 创建者转换
- creator_info = item.get('creator')
- if isinstance(creator_info, dict):
- item['creator'] = creator_info.get('name', '未知')
- elif creator_info is None:
- item['creator'] = '未知'
- return ExcelUtil.export_list2excel(list_data=data, mapping_dict=mapping_dict)
- @classmethod
- async def batch_import_vardict_service(cls, auth: AuthSchema, file: UploadFile, update_support: bool = False) -> str:
- """批量导入"""
- header_dict = {
- 'id ': 'id',
- '': 'crane_no',
- '变量code': 'var_code',
- '变量名称': 'var_name',
- '所属机构': 'mec_type',
- '数据类型': 'data_type',
- '变量类型': 'switch_type',
- 'modbus地址': 'addr',
- '网关': 'gateway_id',
- '排序': 'var_sort',
- '变量分组': 'var_group',
- '变量分类': 'var_category',
- '绑定公式': 'translate',
- '关联设备编号 ': 'device_no',
- '是否取反': 'is_reverse',
- '是否重点显示': 'is_top_show',
- '是否生成': 'is_save',
- '是否标定': 'is_calibration',
- '是否首页重点显示': 'is_overview_top_show',
- '是否首页显示': 'is_home_page_show',
- '是否启用诊断专家': 'is_diagnose',
- '是否上传云平台': 'is_upload',
- '关联诊断专家': 'diagnosis_id',
- '是否启用': 'status',
- '备注/描述': 'description',
- '创建时间 ': 'create_time',
- '更新时间': 'updated_time',
- '创建人ID': 'created_id',
- '更新人ID': 'updated_id',
- }
- try:
- contents = await file.read()
- df = pd.read_excel(io.BytesIO(contents))
- await file.close()
-
- if df.empty:
- raise CustomException(msg="导入文件为空")
-
- missing_headers = [header for header in header_dict.keys() if header not in df.columns]
- if missing_headers:
- raise CustomException(msg=f"导入文件缺少必要的列: {', '.join(missing_headers)}")
-
- df.rename(columns=header_dict, inplace=True)
-
- # 验证必填字段
-
- error_msgs = []
- success_count = 0
- count = 0
-
- for index, row in df.iterrows():
- count += 1
- try:
- data = {
- "id": row['id'],
- "crane_no": row['crane_no'],
- "var_code": row['var_code'],
- "var_name": row['var_name'],
- "mec_type": row['mec_type'],
- "data_type": row['data_type'],
- "switch_type": row['switch_type'],
- "addr": row['addr'],
- "gateway_id": row['gateway_id'],
- "var_sort": row['var_sort'],
- "var_group": row['var_group'],
- "var_category": row['var_category'],
- "translate": row['translate'],
- "device_no": row['device_no'],
- "is_reverse": row['is_reverse'],
- "is_top_show": row['is_top_show'],
- "is_save": row['is_save'],
- "is_calibration": row['is_calibration'],
- "is_overview_top_show": row['is_overview_top_show'],
- "is_home_page_show": row['is_home_page_show'],
- "is_diagnose": row['is_diagnose'],
- "is_upload": row['is_upload'],
- "diagnosis_id": row['diagnosis_id'],
- "status": row['status'],
- "description": row['description'],
- "create_time": row['create_time'],
- "updated_time": row['updated_time'],
- "created_id": row['created_id'],
- "updated_id": row['updated_id'],
- }
- # 使用CreateSchema做校验后入库
- create_schema = BizVarDictCreateSchema.model_validate(data)
-
- # 检查唯一性约束
-
- await BizVarDictCRUD(auth).create_vardict_crud(data=create_schema)
- success_count += 1
- except Exception as e:
- error_msgs.append(f"第{count}行: {str(e)}")
- continue
- result = f"成功导入 {success_count} 条数据"
- if error_msgs:
- result += "\n错误信息:\n" + "\n".join(error_msgs)
- return result
-
- except Exception as e:
- log.error(f"批量导入失败: {str(e)}")
- raise CustomException(msg=f"导入失败: {str(e)}")
-
- @classmethod
- async def import_template_download_vardict_service(cls) -> bytes:
- """下载导入模板"""
- header_list = [
- 'id ',
- '',
- '变量code',
- '变量名称',
- '所属机构',
- '数据类型',
- '变量类型',
- 'modbus地址',
- '网关',
- '排序',
- '变量分组',
- '变量分类',
- '绑定公式',
- '关联设备编号 ',
- '是否取反',
- '是否重点显示',
- '是否生成',
- '是否标定',
- '是否首页重点显示',
- '是否首页显示',
- '是否启用诊断专家',
- '是否上传云平台',
- '关联诊断专家',
- '是否启用',
- '备注/描述',
- '创建时间 ',
- '更新时间',
- '创建人ID',
- '更新人ID',
- ]
- selector_header_list = []
- option_list = []
-
- # 添加下拉选项
- selector_header_list.append('所属机构')
- option_list.append({'所属机构': []})
- selector_header_list.append('数据类型')
- option_list.append({'数据类型': []})
- selector_header_list.append('变量类型')
- option_list.append({'变量类型': []})
- selector_header_list.append('变量分类')
- option_list.append({'变量分类': []})
-
- return ExcelUtil.get_excel_template(
- header_list=header_list,
- selector_header_list=selector_header_list,
- option_list=option_list
- )
- @classmethod
- async def get_vardict_group_service(cls, auth: AuthSchema,redis: Redis,crane_no: str):
- """
- 从缓存获取变量分组数据列表信息service
- 参数:
- - redis (Redis): Redis客户端
- - id (int): 行车id
- 返回:
- - list[dict]: 变量分组数据列表
- """
- try:
- redis_key = f"{RedisInitKeyConfig.VAR_DICT.key}:{crane_no}"
- obj_list_dict = await RedisCURD(redis).get(redis_key)
- # 确保返回数据正确序列化
- if obj_list_dict:
- if isinstance(obj_list_dict, str):
- try:
- return json.loads(obj_list_dict)
- except json.JSONDecodeError:
- log.warning(f"变量分组数据反序列化失败,尝试重新初始化缓存: {'行车:'+crane_no}")
- elif isinstance(obj_list_dict, list):
- return obj_list_dict
- # 缓存不存在或格式错误时重新初始化
- await cls.init_vardict_service(redis,crane_no=crane_no)
- obj_list_dict = await RedisCURD(redis).get(redis_key)
- if not obj_list_dict:
- raise CustomException(msg="变量分组数据不存在")
- # 再次确保返回数据正确序列化
- if isinstance(obj_list_dict, str):
- try:
- return json.loads(obj_list_dict)
- except json.JSONDecodeError:
- raise CustomException(msg="变量分组数据格式错误")
- return obj_list_dict
- except CustomException:
- raise
- except Exception as e:
- log.error(f"获取变量分组数据缓存失败: {str(e)}")
- raise CustomException(msg=f"获取变量分组数据失败: {str(e)}")
- @classmethod
- async def get_vardict_alarms_service(cls, auth: AuthSchema, redis: Redis):
- """
- 从缓存获取变量分组数据列表信息service
- 参数:
- - redis (Redis): Redis客户端
- - id (int): 行车id
- 返回:
- - list[dict]: 变量分组数据列表
- """
- try:
- redis_key = f"{RedisInitKeyConfig.VAR_DICT.key}:'alarms_all'"
- obj_list_dict = await RedisCURD(redis).get(redis_key)
- # 确保返回数据正确序列化
- if obj_list_dict:
- if isinstance(obj_list_dict, str):
- try:
- return json.loads(obj_list_dict)
- except json.JSONDecodeError:
- log.warning(f"变量报警数据反序列化失败,尝试重新初始化缓存")
- elif isinstance(obj_list_dict, list):
- return obj_list_dict
- # 缓存不存在或格式错误时重新初始化
- await cls.init_vardict_service(redis)
- obj_list_dict = await RedisCURD(redis).get(redis_key)
- if not obj_list_dict:
- raise CustomException(msg="变量报警数据不存在")
- # 再次确保返回数据正确序列化
- if isinstance(obj_list_dict, str):
- try:
- return json.loads(obj_list_dict)
- except json.JSONDecodeError:
- raise CustomException(msg="变量报警数据格式错误")
- return obj_list_dict
- except CustomException:
- raise
- except Exception as e:
- log.error(f"获取变量报警数据缓存失败: {str(e)}")
- raise CustomException(msg=f"获取变量报警数据失败: {str(e)}")
- @classmethod
- async def init_vardict_service(cls, redis: Redis,crane_no:str = None):
- """
- 应用初始化: 获取所有天车变量数据信息并缓存service
- 参数:
- - redis (Redis): Redis客户端
- 返回:
- - None
- """
- try:
- async with async_db_session() as session:
- async with session.begin():
- # 在初始化过程中,不需要检查数据权限
- auth = AuthSchema(db=session, check_data_scope=False)
- #初始化行车机构分组变量数据
- if crane_no:
- search = {'status':'1','crane_no':crane_no}
- else:
- search = {'status': '1'}
- crane_list = await BizCraneCRUD(auth).list(search=search,order_by=[{'order':'asc'}])
- success_count = 0
- fail_count = 0
- for crane in crane_list:
- crane_no = crane.crane_no
- crane_name = crane.crane_name
- try:
- varDictMecGroupSchemaList: list[VarDictMecGroupSchema] = []
- mec_list = await BizMecCRUD(auth).list(search={'crane_no':crane_no,'status':'1'},order_by=[{'sort':'asc'}])
- for mec in mec_list:
- # 获取分组数据
- mecVarDicts = await BizVarDictCRUD(auth).list(
- search={'crane_no': crane_no, 'mec_type': mec.mec_type, 'status': '1'},
- order_by=[{'var_sort': 'asc'}])
- if not mecVarDicts:
- continue
- alarmVarList = await BizVarDictCRUD(auth).list(search={'crane_no': crane_no,'mec_type':mec.mec_type, 'switch_type': ('>=','2'), 'status': '1'},order_by=[{'var_sort': 'asc'}])
- digitalVarList = await BizVarDictCRUD(auth).list(search={'crane_no':crane_no,'mec_type':mec.mec_type,'data_type':'1','status':'1'},order_by=[{'var_sort':'asc'}])
- analogVarList = await BizVarDictCRUD(auth).list(search={'crane_no': crane_no,'mec_type':mec.mec_type, 'data_type': ('!=', '1'), 'status': '1'},order_by=[{'var_sort': 'asc'}])
- varDictMecGroupSchemaList.append(
- VarDictMecGroupSchema(mec_type=mec.mec_type,
- mecVarList_simple=mecVarDicts,
- digital_varList=digitalVarList,
- analog_varList=analogVarList,
- alarm_varList=alarmVarList))
- # 保存到Redis并设置过期时间
- redis_key = f"{RedisInitKeyConfig.VAR_DICT.key}:{crane_no}"
- var_dict_list = [item.model_dump() for item in varDictMecGroupSchemaList]
- value = json.dumps(var_dict_list, ensure_ascii=False)
- await RedisCURD(redis).set(
- key=redis_key,
- value=value,
- )
- success_count += 1
- log.info(f"✅ 机构分组变量数据缓存成功: {crane_name}")
- except Exception as e:
- fail_count += 1
- log.error(f"❌ 初始化机构分组变量数据失败 [{crane_name}]: {e}")
- log.info(f"机构分组变量数据初始化完成 - 成功: {success_count}, 失败: {fail_count}")
- #初始化所有行车报警变量数据
- try:
- varDicts = await cls.vardict_alarms_list(auth=auth)
- redis_key = f"{RedisInitKeyConfig.VAR_DICT.key}:'alarms_all'"
- value = json.dumps(varDicts, ensure_ascii=False)
- await RedisCURD(redis).set(
- key=redis_key,
- value=value,
- )
- log.info(f"✅ 报警变量数据缓存成功")
- except Exception as e:
- log.error(f"❌ 初始化报警变量数据失败: {e}")
- except Exception as e:
- log.error(f"变量数据初始化过程发生错误: {e}")
- # 只在严重错误时抛出异常,允许单个字典加载失败
- raise CustomException(msg=f"变量数据初始化失败: {str(e)}")
- @classmethod
- async def get_tdengine_data(cls, auth: AuthSchema, page_no: int, page_size: int,stable_name:str,
- search: BizVarDictQueryParam | None = None) -> dict:
- var_dict_search_dict = {'crane_no':search.crane_no,'data_type':search.data_type,'mec_type':search.mec_type,'var_code':search.var_code}
- offset = (page_no - 1) * page_size
- base_sql = "SELECT * FROM "+stable_name
- filter_conditions = []
- crane_no = search.crane_no
- if crane_no:
- safe_crane_no = crane_no.strip().replace("'", "''")
- filter_conditions.append(f"crane_no = '{safe_crane_no}'")
- mec_var_dict = await BizVarDictCRUD(auth).list(search=var_dict_search_dict)
- var_codes = [item.var_code for item in mec_var_dict if item.var_code]
- if var_codes:
- var_codes_str = "','".join(var_codes)
- filter_conditions.append(f"var_code IN ('{var_codes_str}')")
- else:
- return {
- "page_no": page_no,
- "page_size": page_size,
- "total": 0,
- "has_next": False,
- "items": []
- }
- # 4. 过滤条件2:created_time时间范围(新增核心逻辑)
- created_time = search.created_time
- if created_time and isinstance(created_time, tuple) and len(created_time) == 2:
- # 解析between条件:格式为('between', (start_time, end_time))
- condition_type, time_range = created_time
- if condition_type == "between" and isinstance(time_range, (list, tuple)) and len(time_range) == 2:
- start_time, end_time = time_range
- # 校验时间类型并格式化为TDengine支持的字符串
- if isinstance(start_time, datetime) and isinstance(end_time, datetime):
- # 格式化时间为"YYYY-MM-DD HH:MM:SS"(匹配TDengine的时间格式)
- start_str = start_time.strftime("%Y-%m-%d %H:%M:%S")
- end_str = end_time.strftime("%Y-%m-%d %H:%M:%S")
- # 防SQL注入:转义单引号(虽然时间格式不会有,但做兜底)
- safe_start = start_str.replace("'", "''")
- safe_end = end_str.replace("'", "''")
- # 添加时间范围条件(TDengine的ts字段对应创建时间)
- filter_conditions.append(f"ts BETWEEN '{safe_start}' AND '{safe_end}'")
- # 5. 拼接WHERE子句
- where_clause = " WHERE " + " AND ".join(filter_conditions) if filter_conditions else ""
- # 6. 构建完整查询SQL(排序+分页)
- if page_size == 1000: #历史曲线用
- query_sql = f"{base_sql}{where_clause} ORDER BY ts DESC"
- else:
- query_sql = f"{base_sql}{where_clause} ORDER BY ts DESC LIMIT {offset}, {page_size}"
- rest_result = await tdengine_rest_query(query_sql)
- formatted_data = await format_rest_result(rest_result)
- #查找var_name
- varDicts = await BizVarDictCRUD(auth).list(search=var_dict_search_dict)
- if formatted_data:
- for item in formatted_data:
- normal_time = item.get('ts').replace('T', ' ').replace('+08:00', '')
- item['ts'] = normal_time
- for varDict in varDicts:
- if item.get('var_code') == varDict.var_code:
- item['var_name'] = varDict.var_name
- break
- total = await get_table_total_count(stable_name, where_clause)
- return {
- "page_no": page_no,
- "page_size": page_size,
- "total": total,
- "has_next": offset + page_size < total,
- "items": formatted_data
- }
- # 定义常量(解决魔法值问题)
- CONST_VAL_TRIGGER = 1 # 触发值
- CONST_VAL_RECOVER = 0 # 恢复值
- CONST_ORDER_ASC = "ASC"
- CONST_ORDER_DESC = "DESC"
- CONST_UNRECOVERED = "未恢复"
- CONST_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
- # 工具函数:封装时间格式转换
- def format_timestamp(ts_str: Optional[str]) -> Optional[str]:
- """标准化时间格式(去除T和+08:00)"""
- if not ts_str:
- return None
- return ts_str.replace('T', ' ').replace('+08:00', '')
- # 工具函数:安全转义SQL字符串(封装重复的防注入逻辑)
- def escape_sql_str(s: Optional[str]) -> str:
- """安全转义SQL字符串,防止注入"""
- if not s:
- return ""
- return str(s).strip().replace("'", "''")
- # 工具函数:构建var_code到var_name的映射(解决嵌套循环匹配问题)
- def build_var_code_mapping(varDicts: List[any]) -> Dict[str, str]:
- """构建var_code→var_name的字典,提升匹配效率"""
- return {
- item.var_code: item.var_name
- for item in varDicts
- if hasattr(item, 'var_code') and item.var_code
- }
- @classmethod
- async def _query_single_record(cls, stable_name: str, var_code: str, crane_no: str,
- time_condition: str, val: int, order: str = CONST_ORDER_ASC) -> Optional[str]:
- """
- 精准查询单条关键记录(性能最优)
- :param time_condition: 时间条件字符串,如 "ts > '2026-01-18 10:30:00'"
- :param val: 要查询的val值(0/1)
- :param order: 排序方式(ASC/DESC)
- :return: 符合条件的第一条ts(北京时间),无则返回None
- """
- # 优化:使用封装的转义函数,减少重复代码
- safe_stable = cls.escape_sql_str(stable_name)
- safe_var = cls.escape_sql_str(var_code)
- safe_crane = cls.escape_sql_str(crane_no)
- # 严格类型校验
- try:
- safe_val = int(val)
- except (ValueError, TypeError):
- return None
- # 校验排序参数,防止非法值
- safe_order = cls.CONST_ORDER_ASC if order.upper() != cls.CONST_ORDER_DESC else cls.CONST_ORDER_DESC
- # 优化:SQL格式化更简洁,减少空格
- sql = f"""
- SELECT ts AS ts_cn
- FROM {safe_stable}
- WHERE
- crane_no = '{safe_crane}'
- AND var_code = '{safe_var}'
- AND val = {safe_val}
- AND {time_condition}
- ORDER BY ts {safe_order}
- LIMIT 1;
- """
- try:
- rest_result = await tdengine_rest_query(sql)
- formatted_result = await format_rest_result(rest_result)
- if formatted_result and len(formatted_result) > 0:
- ts_cn = cls.format_timestamp(formatted_result[0].get("ts_cn")) # 复用工具函数
- return ts_cn
- return None
- # 优化:捕获具体异常,而非泛化Exception,便于排查问题
- except Exception as e:
- # 建议添加日志:logger.error(f"查询单条记录失败: {e}, SQL: {sql}")
- return None
- @classmethod
- async def calc_switch_batch(cls, raw_formatted_data: List[Dict],
- query_start: Optional[datetime] = None,
- query_end: Optional[datetime] = None,
- stable_name: Optional[str] = None) -> List[Dict]:
- if not raw_formatted_data:
- return []
- # 优化1:使用defaultdict简化分组逻辑,减少判断
- point_groups: Dict[str, List[Dict]] = defaultdict(list)
- for item in raw_formatted_data:
- var_code = item.get("var_code")
- crane_no = item.get("crane_no")
- if not var_code or not crane_no:
- continue
- # 优化:捕获具体异常(ValueError),而非泛化except
- try:
- item["val"] = int(item.get("val", cls.CONST_VAL_RECOVER))
- except ValueError:
- item["val"] = cls.CONST_VAL_RECOVER
- group_key = f"{crane_no}_{var_code}"
- point_groups[group_key].append(item)
- batch_list = []
- # 遍历每个点位
- for group_key, point_records in point_groups.items():
- # 优化:增加异常处理,防止group_key格式错误
- try:
- crane_no, var_code = group_key.split("_", 1)
- except ValueError:
- # 日志:logger.warning(f"无效的分组键: {group_key}")
- continue
- # 关键修复:必须排序!原代码注释掉了排序,会导致触发/恢复判断完全错误
- point_records.sort(key=lambda x: x["ts"])
- # 优化:直接取第一个item的var_name,无则用默认值
- var_name = point_records[0].get("var_name", f"未知点位({var_code})")
- # 提取触发/恢复事件(0→1/1→0)
- is_all_0 = True
- trigger_times = [] # 触发时间列表
- recover_times = [] # 恢复时间列表
- prev_val = None
- for record in point_records:
- current_val = record["val"]
- current_ts = record["ts"]
- if prev_val is None:
- prev_val = current_val
- continue
- # 优化:使用常量替代魔法值,提升可读性
- if prev_val == cls.CONST_VAL_RECOVER and current_val == cls.CONST_VAL_TRIGGER:
- trigger_times.append(current_ts)
- elif prev_val == cls.CONST_VAL_TRIGGER and current_val == cls.CONST_VAL_RECOVER:
- recover_times.append(current_ts)
- prev_val = current_val
- if current_val == cls.CONST_VAL_TRIGGER:
- is_all_0 = False
- # ---------------------- 无触发记录(全0)→ 返回空 ----------------------
- if is_all_0:
- continue
- # ---------------------- 第一条是触发追溯真实触发时间 ----------------------
- if point_records[0].get('val') == cls.CONST_VAL_TRIGGER:
- if not (query_start and stable_name):
- continue
- start_str = query_start.strftime(cls.CONST_TIME_FORMAT)
- recover_before_start = await cls._query_single_record(
- stable_name, var_code, crane_no,
- f"ts < '{cls.escape_sql_str(start_str)}'",
- cls.CONST_VAL_RECOVER, cls.CONST_ORDER_DESC
- )
- real_trigger = point_records[0]["ts"]
- if recover_before_start:
- trigger_after_recover = await cls._query_single_record(
- stable_name, var_code, crane_no,
- f"ts > '{cls.escape_sql_str(recover_before_start)}'",
- cls.CONST_VAL_TRIGGER, cls.CONST_ORDER_ASC
- )
- if trigger_after_recover:
- real_trigger = trigger_after_recover
- trigger_times.insert(0, real_trigger)
- # ---------------------- 最后一条是触发追溯真实恢复时间 ----------------------
- if point_records[-1].get('val') == cls.CONST_VAL_TRIGGER:
- if not (query_end and stable_name):
- continue
- end_str = query_end.strftime(cls.CONST_TIME_FORMAT)
- recover_after_end = await cls._query_single_record(
- stable_name, var_code, crane_no,
- f"ts > '{cls.escape_sql_str(end_str)}'",
- cls.CONST_VAL_RECOVER, cls.CONST_ORDER_ASC
- )
- if recover_after_end:
- recover_times.append(recover_after_end)
- # ---------------------- 匹配 ----------------------
- min_len = min(len(trigger_times), len(recover_times))
- for i in range(min_len):
- batch_list.append({
- "var_name": var_name,
- "val": cls.CONST_VAL_RECOVER, # 已恢复
- "str_time": trigger_times[i],
- "end_time": recover_times[i]
- })
- # 不匹配的情况只可能是触发没恢复
- if len(trigger_times) > len(recover_times):
- batch_list.append({
- "var_name": var_name,
- "val": cls.CONST_VAL_TRIGGER, # 触发中
- "str_time": trigger_times[-1],
- "end_time": cls.CONST_UNRECOVERED
- })
- # 最终排序
- batch_list.sort(key=lambda x: x["str_time"], reverse=True)
- return batch_list
- @classmethod
- async def get_tdengine_data_operation(cls, auth: AuthSchema, stable_name: str,
- search: BizVarDictQueryParam | None = None) -> dict:
- if not search:
- return {"page_no": 0, "page_size": 0, "total": 0, "has_next": 0, "items": []}
- # 优化:初始化查询参数,减少重复判断
- var_dict_search_dict = {
- 'crane_no': search.crane_no,
- 'data_type': search.data_type,
- 'mec_type': search.mec_type,
- 'switch_type': search.switch_type,
- 'var_code': search.var_code
- }
- filter_conditions = []
- query_start: Optional[datetime] = None
- query_end: Optional[datetime] = None
- # 1. 过滤条件:crane_no
- if search.crane_no:
- filter_conditions.append(f"crane_no = '{cls.escape_sql_str(search.crane_no)}'")
- # 2. 过滤条件:mec_type对应的var_code
- varDicts = await BizVarDictCRUD(auth).list(search=var_dict_search_dict)
- var_code_map = cls.build_var_code_mapping(varDicts) # 构建映射字典
- if search.mec_type and var_code_map:
- var_codes_str = "','".join(cls.escape_sql_str(code) for code in var_code_map.keys())
- filter_conditions.append(f"var_code IN ('{var_codes_str}')")
- # 3. 过滤条件:created_time时间范围
- if search.created_time and isinstance(search.created_time, tuple) and len(search.created_time) == 2:
- condition_type, time_range = search.created_time
- if condition_type == "between" and isinstance(time_range, (list, tuple)) and len(time_range) == 2:
- query_start, query_end = time_range
- if isinstance(query_start, datetime) and isinstance(query_end, datetime):
- start_str = query_start.strftime(cls.CONST_TIME_FORMAT)
- end_str = query_end.strftime(cls.CONST_TIME_FORMAT)
- filter_conditions.append(
- f"ts BETWEEN '{cls.escape_sql_str(start_str)}' AND '{cls.escape_sql_str(end_str)}'")
- # 4. 拼接WHERE子句
- where_clause = " WHERE " + " AND ".join(filter_conditions) if filter_conditions else ""
- # 5. 查询原始数据(优化:SQL更简洁,排序仅保留必要的)
- query_sql = f"SELECT * FROM {cls.escape_sql_str(stable_name)}{where_clause} ORDER BY ts " + cls.CONST_ORDER_ASC
- rest_result = await tdengine_rest_query(query_sql)
- formatted_data = await format_rest_result(rest_result)
- # 6. 匹配var_name(优化:用字典映射替代嵌套循环,O(n)→O(1))
- if formatted_data:
- for item in formatted_data:
- item['ts'] = cls.format_timestamp(item.get('ts')) # 复用工具函数
- # 优化:字典查找,无需遍历所有varDicts
- item['var_name'] = var_code_map.get(item.get('var_code'), f"未知点位({item.get('var_code')})")
- # 7. 调用批量计算方法
- batch_result = await cls.calc_switch_batch(
- raw_formatted_data=formatted_data,
- query_start=query_start,
- query_end=query_end,
- stable_name=stable_name
- )
- # 8. 返回结果
- return {
- "page_no": 0,
- "page_size": 0,
- "total": len(batch_result),
- "has_next": 0,
- "items": batch_result
- }
|