|
|
@@ -519,7 +519,7 @@ class BizVarDictService:
|
|
|
log.error(f"获取变量报警数据缓存失败: {str(e)}")
|
|
|
raise CustomException(msg=f"获取变量报警数据失败: {str(e)}")
|
|
|
@classmethod
|
|
|
- async def init_vardict_service(cls, redis: Redis,crane_no:str = None):
|
|
|
+ async def init_vardict_service1(cls, redis: Redis,crane_no:str = None):
|
|
|
"""
|
|
|
应用初始化: 获取所有天车变量数据信息并缓存service
|
|
|
|
|
|
@@ -597,6 +597,128 @@ class BizVarDictService:
|
|
|
# 只在严重错误时抛出异常,允许单个字典加载失败
|
|
|
raise CustomException(msg=f"变量数据初始化失败: {str(e)}")
|
|
|
|
|
|
+ import asyncio
|
|
|
+ from typing import Dict, List
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ async def init_vardict_service(cls, redis: Redis, crane_no: str = None):
|
|
|
+ """
|
|
|
+ 应用初始化: 获取所有天车变量数据信息并缓存service(性能优化版)
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ - redis (Redis): Redis客户端
|
|
|
+ - crane_no: 指定天车编号,为空则处理所有
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ - None
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ async with async_db_session() as session:
|
|
|
+ async with session.begin():
|
|
|
+ auth = AuthSchema(db=session, check_data_scope=False)
|
|
|
+ # 1. 批量查询所有天车基础数据(原逻辑不变)
|
|
|
+ search = {'status': '1'}
|
|
|
+ if crane_no:
|
|
|
+ search['crane_no'] = crane_no
|
|
|
+ crane_list = await BizCraneCRUD(auth).list(
|
|
|
+ search=search,
|
|
|
+ order_by=[{'order': 'asc'}]
|
|
|
+ )
|
|
|
+ success_count = 0
|
|
|
+ fail_count = 0
|
|
|
+
|
|
|
+ # 2. 并发处理每个crane的变量数据(核心优化:并发替代串行)
|
|
|
+ # 定义单个crane的处理函数
|
|
|
+ async def process_single_crane(crane):
|
|
|
+ nonlocal success_count, fail_count
|
|
|
+ crane_no = crane.crane_no
|
|
|
+ crane_name = crane.crane_name
|
|
|
+ try:
|
|
|
+ # 2.1 批量查询当前crane下的所有机构数据(一次查询)
|
|
|
+ mec_list = await BizMecCRUD(auth).list(
|
|
|
+ search={'crane_no': crane_no, 'status': '1'},
|
|
|
+ order_by=[{'sort': 'asc'}]
|
|
|
+ )
|
|
|
+ if not mec_list:
|
|
|
+ log.info(f"⚠️ {crane_name} 无机构数据,跳过缓存")
|
|
|
+ return
|
|
|
+
|
|
|
+ # 2.2 批量查询当前crane下的所有VarDict数据(关键优化:1次查询替代4×mec_count次)
|
|
|
+ var_dicts_all = await BizVarDictCRUD(auth).list(
|
|
|
+ search={'crane_no': crane_no, 'status': '1'},
|
|
|
+ order_by=[{'var_sort': 'asc'}]
|
|
|
+ )
|
|
|
+ # 构建mec_type到var数据的映射(内存分类,替代多次查询)
|
|
|
+ var_dict_by_mec: Dict[str, List] = {}
|
|
|
+ for var in var_dicts_all:
|
|
|
+ mec_type = str(var.mec_type)
|
|
|
+ if mec_type not in var_dict_by_mec:
|
|
|
+ var_dict_by_mec[mec_type] = []
|
|
|
+ var_dict_by_mec[mec_type].append(var)
|
|
|
+
|
|
|
+ # 2.3 遍历机构,内存分类数据
|
|
|
+ varDictMecGroupSchemaList: list[VarDictMecGroupSchema] = []
|
|
|
+ for mec in mec_list:
|
|
|
+ mec_type = mec.mec_type
|
|
|
+ # 获取当前机构的所有var数据(无则跳过)
|
|
|
+ mec_vars = var_dict_by_mec.get(mec_type, [])
|
|
|
+ if not mec_vars:
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 内存中按条件分类(替代4次数据库查询)
|
|
|
+ alarmVarList = [v for v in mec_vars if v.data_type == 1 and v.switch_type >= 2 ]
|
|
|
+ digitalVarList = [v for v in mec_vars if v.data_type == 1]
|
|
|
+ analogVarList = [v for v in mec_vars if v.data_type != 1]
|
|
|
+
|
|
|
+ varDictMecGroupSchemaList.append(
|
|
|
+ VarDictMecGroupSchema(
|
|
|
+ mec_type=mec_type,
|
|
|
+ mecVarList_simple=mec_vars, # 原mecVarDicts就是当前mec的所有var
|
|
|
+ digital_varList=digitalVarList,
|
|
|
+ analog_varList=analogVarList,
|
|
|
+ alarm_varList=alarmVarList
|
|
|
+ )
|
|
|
+ )
|
|
|
+
|
|
|
+ # 2.4 写入Redis(原逻辑不变)
|
|
|
+ redis_key = f"{RedisInitKeyConfig.VAR_DICT.key}:{crane_no}"
|
|
|
+ var_dict_list = [item.model_dump() for item in varDictMecGroupSchemaList]
|
|
|
+ value = json.dumps(var_dict_list, ensure_ascii=False)
|
|
|
+ await RedisCURD(redis).set(key=redis_key, value=value)
|
|
|
+
|
|
|
+ success_count += 1
|
|
|
+ log.info(f"✅ 机构分组变量数据缓存成功: {crane_name}")
|
|
|
+ except Exception as e:
|
|
|
+ fail_count += 1
|
|
|
+ log.error(f"❌ 初始化机构分组变量数据失败 [{crane_name}]: {e}")
|
|
|
+
|
|
|
+ # 3. 并发执行所有crane的处理(控制并发数,避免数据库连接池耗尽)
|
|
|
+ # 建议并发数=数据库连接池大小(比如10),防止连接数超限
|
|
|
+ semaphore = asyncio.Semaphore(10)
|
|
|
+
|
|
|
+ async def bounded_process(crane):
|
|
|
+ async with semaphore:
|
|
|
+ await process_single_crane(crane)
|
|
|
+
|
|
|
+ # 并发执行
|
|
|
+ await asyncio.gather(*[bounded_process(crane) for crane in crane_list])
|
|
|
+
|
|
|
+ log.info(f"机构分组变量数据初始化完成 - 成功: {success_count}, 失败: {fail_count}")
|
|
|
+
|
|
|
+ # 4. 初始化所有行车报警变量数据(原逻辑不变,可按需优化)
|
|
|
+ try:
|
|
|
+ varDicts = await cls.vardict_alarms_list(auth=auth)
|
|
|
+ redis_key = f"{RedisInitKeyConfig.VAR_DICT.key}:alarms_all"
|
|
|
+ value = json.dumps(varDicts, ensure_ascii=False)
|
|
|
+ await RedisCURD(redis).set(key=redis_key, value=value)
|
|
|
+ log.info(f"✅ 报警变量数据缓存成功")
|
|
|
+ except Exception as e:
|
|
|
+ log.error(f"❌ 初始化报警变量数据失败: {e}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ log.error(f"变量数据初始化过程发生错误: {e}")
|
|
|
+ raise CustomException(msg=f"变量数据初始化失败: {str(e)}")
|
|
|
+
|
|
|
@classmethod
|
|
|
async def get_tdengine_data(cls, auth: AuthSchema, page_no: int, page_size: int,stable_name:str,
|
|
|
search: BizVarDictQueryParam | None = None) -> dict:
|