# -*- coding: utf-8 -*-
"""HTTP routes for the variable-dictionary ("vardict") business module.

Each controller delegates to ``BizVarDictService`` and wraps the result in the
project's response helpers.  Three routes additionally merge live readings
fetched from the data-collection endpoint configured in
``settings.COLLECT_DATA_FULL``; that merge is best-effort and never fails the
request.
"""

from typing import Any, Set

import httpx
from fastapi import APIRouter, Depends, UploadFile, Body, Path, Query
from fastapi.responses import StreamingResponse, JSONResponse
from redis.asyncio.client import Redis

from app.common.response import SuccessResponse, StreamResponse
from app.config.setting import settings
from app.core.dependencies import AuthPermission, redis_getter
from app.api.v1.module_system.auth.schema import AuthSchema
from app.core.base_params import PaginationQueryParam
from app.utils.common_util import bytes2file_response
from app.core.logger import log
from app.core.base_schema import BatchSetAvailable
from .service import BizVarDictService
from .schema import BizVarDictCreateSchema, BizVarDictUpdateSchema, BizVarDictQueryParam
from ..crane.service import BizCraneService

BizVarDictRouter = APIRouter(prefix='/vardict', tags=["变量信息模块"])

# Timeout (seconds) passed to httpx for every call to the collection endpoint.
_COLLECT_TIMEOUT_SECONDS = 2

# Keys of a mechanism-group dict that the live-value merge skips; every other
# key is iterated as a list of variable dicts.
_GROUP_META_KEYS = frozenset({'mec_type', 'alarm_varList', 'mecVarList_simple'})


async def _fetch_collect_payload() -> Any:
    """Fetch and decode the JSON body of ``settings.COLLECT_DATA_FULL``.

    Raises:
        httpx.HTTPError: on transport failure, timeout, or a non-2xx status.
        ValueError: when the body is not valid JSON.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(
            url=settings.COLLECT_DATA_FULL,
            params={},
            timeout=_COLLECT_TIMEOUT_SECONDS,
        )
        # Turn non-2xx HTTP statuses into exceptions.
        response.raise_for_status()
        # Raises if the body cannot be parsed as JSON.
        return response.json()


def _collect_data_of(payload: Any) -> dict[str, Any]:
    """Return ``payload['data']`` when the payload reports code 200, else ``{}``."""
    if payload and payload.get('code') == 200 and payload.get('data'):
        return payload['data']
    return {}


def _apply_alarm_values(items: list[dict[str, Any]], collect_data: dict[str, Any]) -> None:
    """Set ``item['value']`` on every alarm item, in place.

    The value defaults to ``False`` and is overwritten with the collector's
    reading found at ``collect_data[crane_no]['data']['alarm'][var_code]``
    when such a reading exists.
    """
    for item in items:
        item['value'] = False  # default when the collector has no reading
        alarm = (
            collect_data.get(item.get('crane_no'), {})
            .get('data', {})
            .get('alarm', {})
            .get(item.get('var_code'))
        )
        if alarm:
            item['value'] = alarm.get('value')


def _apply_group_values(groups: list[dict[str, Any]], crane_data: dict[str, Any]) -> None:
    """Copy live readings into the variables of each mechanism group, in place.

    Variables under ``'digital_varList'`` read from ``crane_data['digital']``;
    variables under any other non-meta key read from ``crane_data['analog']``.
    A variable without a matching reading is left untouched, so one missing
    reading no longer aborts the merge for every variable after it.
    """
    analog = crane_data.get('analog')
    digital = crane_data.get('digital')
    for group in groups:
        for key, variables in group.items():
            if key in _GROUP_META_KEYS:
                continue
            readings = digital if key == 'digital_varList' else analog
            if not readings:
                continue
            for variable in variables:
                reading = readings.get(variable.get('var_code'))
                if isinstance(reading, dict):
                    variable['value'] = reading.get('value')


def _selected_crane_nos(cranes: list[dict[str, Any]]) -> Set[str]:
    """Return the non-empty string ``crane_no`` of every crane whose status is not ``'0'``.

    NOTE(review): status ``'0'`` presumably marks a disabled crane - confirm.
    """
    crane_nos: Set[str] = set()
    for crane in cranes:
        if crane.get('status') == '0':
            continue
        crane_no = crane.get('crane_no')
        if crane_no and isinstance(crane_no, str):
            crane_nos.add(crane_no)
    return crane_nos


@BizVarDictRouter.get("/detail/{id}", summary="获取变量信息详情", description="获取变量信息详情")
async def get_vardict_detail_controller(
    id: int = Path(..., description="ID"),
    auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
) -> JSONResponse:
    """Return the detail of one variable record.

    Args:
        id: Primary key taken from the URL path.
        auth: Auth context; requires ``module_business:vardict:query``.
    """
    result_dict = await BizVarDictService.detail_vardict_service(auth=auth, id=id)
    log.info(f"获取变量信息详情成功 {id}")
    return SuccessResponse(data=result_dict, msg="获取变量信息详情成功")


@BizVarDictRouter.get("/list", summary="查询变量信息列表", description="查询变量信息列表")
async def get_vardict_list_controller(
    page: PaginationQueryParam = Depends(),
    search: BizVarDictQueryParam = Depends(),
    auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
) -> JSONResponse:
    """Return one page of variable records.

    ``page_no`` falls back to 1 and ``page_size`` to 10 when the client omits
    them; filtering and ordering are passed through to the service.
    """
    result_dict = await BizVarDictService.page_vardict_service(
        auth=auth,
        page_no=page.page_no if page.page_no is not None else 1,
        page_size=page.page_size if page.page_size is not None else 10,
        search=search,
        order_by=page.order_by
    )
    log.info("查询变量信息列表成功")
    return SuccessResponse(data=result_dict, msg="查询变量信息列表成功")


@BizVarDictRouter.post("/create", summary="创建变量信息", description="创建变量信息")
async def create_vardict_controller(
    data: BizVarDictCreateSchema,
    redis: Redis = Depends(redis_getter),
    auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:create"]))
) -> JSONResponse:
    """Create a variable record from the request body.

    The Redis client is handed to the service together with the payload.
    """
    result_dict = await BizVarDictService.create_vardict_service(auth=auth, data=data, redis=redis)
    log.info("创建变量信息成功")
    return SuccessResponse(data=result_dict, msg="创建变量信息成功")


@BizVarDictRouter.put("/update/{id}", summary="修改变量信息", description="修改变量信息")
async def update_vardict_controller(
    data: BizVarDictUpdateSchema,
    id: int = Path(..., description="ID"),
    redis: Redis = Depends(redis_getter),
    auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:update"]))
) -> JSONResponse:
    """Update the variable record identified by ``id`` with the request body."""
    result_dict = await BizVarDictService.update_vardict_service(auth=auth, id=id, data=data, redis=redis)
    log.info("修改变量信息成功")
    return SuccessResponse(data=result_dict, msg="修改变量信息成功")


@BizVarDictRouter.delete("/delete", summary="删除变量信息", description="删除变量信息")
async def delete_vardict_controller(
    ids: list[int] = Body(..., description="ID列表"),
    redis: Redis = Depends(redis_getter),
    auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:delete"]))
) -> JSONResponse:
    """Delete the variable records whose ids are listed in the request body."""
    await BizVarDictService.delete_vardict_service(auth=auth, ids=ids, redis=redis)
    log.info(f"删除变量信息成功: {ids}")
    return SuccessResponse(msg="删除变量信息成功")


@BizVarDictRouter.patch("/available/setting", summary="批量修改变量信息状态", description="批量修改变量信息状态")
async def batch_set_available_vardict_controller(
    data: BatchSetAvailable,
    auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:patch"]))
) -> JSONResponse:
    """Change the status of several variable records in one call."""
    await BizVarDictService.set_available_vardict_service(auth=auth, data=data)
    log.info(f"批量修改变量信息状态成功: {data.ids}")
    return SuccessResponse(msg="批量修改变量信息状态成功")


@BizVarDictRouter.post('/export', summary="导出变量信息", description="导出变量信息")
async def export_vardict_list_controller(
    search: BizVarDictQueryParam = Depends(),
    auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:export"]))
) -> StreamingResponse:
    """Export the variable records matching ``search`` as ``biz_var_dict.xlsx``."""
    result_dict_list = await BizVarDictService.list_vardict_service(search=search, auth=auth)
    export_result = await BizVarDictService.batch_export_vardict_service(obj_list=result_dict_list)
    log.info('导出变量信息成功')

    return StreamResponse(
        data=bytes2file_response(export_result),
        media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        headers={
            'Content-Disposition': 'attachment; filename=biz_var_dict.xlsx'
        }
    )


@BizVarDictRouter.post('/import', summary="导入变量信息", description="导入变量信息")
async def import_vardict_list_controller(
    file: UploadFile,
    auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:import"]))
) -> JSONResponse:
    """Import variable records from an uploaded file.

    The service is always called with ``update_support=True``.
    """
    batch_import_result = await BizVarDictService.batch_import_vardict_service(
        file=file, auth=auth, update_support=True
    )
    log.info("导入变量信息成功")
    return SuccessResponse(data=batch_import_result, msg="导入变量信息成功")


@BizVarDictRouter.post(
    '/download/template',
    summary="获取变量信息导入模板",
    description="获取变量信息导入模板",
    dependencies=[Depends(AuthPermission(["module_business:vardict:download"]))]
)
async def export_vardict_template_controller() -> StreamingResponse:
    """Download the import template as ``biz_var_dict_template.xlsx``."""
    import_template_result = await BizVarDictService.import_template_download_vardict_service()
    log.info('获取变量信息导入模板成功')

    return StreamResponse(
        data=bytes2file_response(import_template_result),
        media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        headers={'Content-Disposition': 'attachment; filename=biz_var_dict_template.xlsx'}
    )


@BizVarDictRouter.get("/list_alarms", summary="查询变量信息列表", description="查询变量信息列表")
async def get_vardict_list_alarms_controller(
    search: BizVarDictQueryParam = Depends(),
    redis: Redis = Depends(redis_getter),
    auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
) -> JSONResponse:
    """Return alarm variables, annotated with live values when available.

    With ``search.crane_no`` set, the alarms come from
    ``BizVarDictService.vardict_alarms_list``.  Otherwise they come from
    ``get_vardict_alarms_service`` and are restricted to cranes whose status
    is not ``'0'``; that restriction is applied before the collector is
    contacted, so it still holds when the collector is unreachable.

    Fetching live values is best-effort: any failure is logged and the list
    is returned without live values.
    """
    if search.crane_no:
        # A crane_no means alarms for a single crane: query the database directly.
        result_dict = await BizVarDictService.vardict_alarms_list(auth=auth, search=search)
        try:
            collect_data = _collect_data_of(await _fetch_collect_payload())
            # Items are only annotated when the collector returned usable data.
            if collect_data:
                _apply_alarm_values(result_dict, collect_data)
        except Exception as e:
            log.error(f"调用数据初始化接口获取报警列表时发生未知异常:{str(e)}", exc_info=True)
    else:
        # All alarm points: the service is given the Redis client for this lookup.
        all_alarms = await BizVarDictService.get_vardict_alarms_service(auth=auth, redis=redis)
        cranes = await BizCraneService.list_crane_service(auth=auth)
        valid_crane_nos = _selected_crane_nos(cranes)

        # Filter before the remote call so a collector outage cannot leak
        # alarms that belong to excluded cranes.
        result_dict = [
            item for item in (all_alarms or [])
            if item.get('crane_no') in valid_crane_nos
        ]
        try:
            payload = await _fetch_collect_payload()
            # With an unusable payload every item still receives value=False.
            _apply_alarm_values(result_dict, _collect_data_of(payload))
        except Exception as e:
            log.error(f"调用数据初始化接口获取报警列表时发生未知异常:{str(e)}", exc_info=True)

    log.info("查询变量信息列表成功")
    return SuccessResponse(data=result_dict, msg="查询变量信息列表成功")


@BizVarDictRouter.get("/list_analog", summary="查询变量信息列表", description="查询变量信息列表")
async def get_vardict_list_analog_controller(
    search: BizVarDictQueryParam = Depends(),
    auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
) -> JSONResponse:
    """Return the result of ``BizVarDictService.vardict_analog_list`` for ``search``."""
    result_dict = await BizVarDictService.vardict_analog_list(auth=auth, search=search)
    log.info("查询变量信息列表成功")
    return SuccessResponse(data=result_dict, msg="查询变量信息列表成功")


@BizVarDictRouter.get("/list_operation_record", summary="查询操作点位变量信息列表", description="查询操作点位变量信息列表")
async def get_vardict_list_operation_record_controller(
    search: BizVarDictQueryParam = Depends(),
    auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
) -> JSONResponse:
    """Return the result of ``BizVarDictService.vardict_operation_record_list`` for ``search``."""
    result_dict = await BizVarDictService.vardict_operation_record_list(auth=auth, search=search)
    log.info("查询操作点位变量信息列表成功")
    return SuccessResponse(data=result_dict, msg="查询操作点位变量信息列表成功")


@BizVarDictRouter.get("/varDictMecGroup/{crane_no}", summary="获取变量信息分组数据", description="获取变量信息分组数据")
async def get_vardict_mec_group_controller(
    crane_no: str = Path(..., description="crane_no"),
    redis: Redis = Depends(redis_getter),
    auth: AuthSchema = Depends(AuthPermission(["module_business:crane:query"]))
) -> JSONResponse:
    """Return the grouped variables of one crane, merged with live readings.

    An empty group result is returned immediately.  Otherwise the collector
    is queried and each variable that has a reading gets a ``value`` key;
    variables without a reading are left as the service returned them.  A
    collector failure is logged and the groups are returned unmodified.

    Note: this route requires ``module_business:crane:query``, not a vardict
    permission.
    """
    result_dict = await BizVarDictService.get_vardict_group_service(
        redis=redis, crane_no=crane_no, auth=auth
    )
    if not result_dict:
        log.info(f"获取变量信息分组数据成功:{result_dict}")
        return SuccessResponse(data=result_dict, msg="获取变量分组数据成功")

    # Ask the collection endpoint for the current state of the variables.
    try:
        collect_data = _collect_data_of(await _fetch_collect_payload())
        # A crane unknown to the collector yields empty readings, not an error.
        crane_entry = collect_data.get(crane_no) or {}
        _apply_group_values(result_dict, crane_entry.get('data') or {})
    except Exception as e:
        log.error(f"调用采集接口获取分组变量状态时发生未知异常:{str(e)}", exc_info=True)

    log.info(f"获取变量信息分组数据成功:{result_dict}")
    return SuccessResponse(data=result_dict, msg="获取变量分组数据成功")


@BizVarDictRouter.get("/historyData", summary="查询历史数据列表", description="查询历史数据列表")
async def get_vardict_historyData_controller(
    page: PaginationQueryParam = Depends(),
    search: BizVarDictQueryParam = Depends(),
    auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
) -> JSONResponse:
    """Return one page of history data read via ``get_tdengine_data``.

    ``search.data_type`` is overwritten with ``('!=', 1)`` and the service is
    called with ``stable_name='st_analog'``.  ``page_no`` falls back to 1 and
    ``page_size`` to 10.
    """
    search.data_type = ('!=', 1)
    result_dict = await BizVarDictService.get_tdengine_data(
        search=search,
        stable_name='st_analog',
        page_no=page.page_no if page.page_no is not None else 1,
        page_size=page.page_size if page.page_size is not None else 10,
        auth=auth
    )
    log.info("查询历史数据列表成功")
    return SuccessResponse(data=result_dict, msg="查询历史数据列表成功")


@BizVarDictRouter.get("/operationRecord", summary="查询操作记录列表", description="查询操作记录列表")
async def get_vardict_operationRecord_controller(
    search: BizVarDictQueryParam = Depends(),
    auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
) -> JSONResponse:
    """Return operation records read via ``get_tdengine_data_operation``.

    The service is called with ``stable_name='st_digital'``.
    """
    # Restrict the query to operation records by overriding switch_type.
    search.switch_type = ('<=', 1)
    result_dict = await BizVarDictService.get_tdengine_data_operation(
        search=search,
        stable_name='st_digital',
        auth=auth
    )
    log.info("查询操作记录列表成功")
    return SuccessResponse(data=result_dict, msg="查询操作记录列表成功")


@BizVarDictRouter.get("/historyAlarm", summary="查询报警记录列表", description="查询报警记录列表")
async def get_vardict_historyAlarm_controller(
    search: BizVarDictQueryParam = Depends(),
    auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
) -> JSONResponse:
    """Return alarm records read via ``get_tdengine_data_operation``.

    The service is called with ``stable_name='st_alarm'``.
    """
    result_dict = await BizVarDictService.get_tdengine_data_operation(
        search=search,
        stable_name='st_alarm',
        auth=auth
    )
    log.info("查询报警记录列表成功")
    return SuccessResponse(data=result_dict, msg="查询报警记录列表成功")