| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312 |
- # -*- coding: utf-8 -*-
- from typing import Set
- import httpx
- from fastapi import APIRouter, Depends, UploadFile, Body, Path, Query
- from fastapi.responses import StreamingResponse, JSONResponse
- from redis.asyncio.client import Redis
- from app.common.response import SuccessResponse, StreamResponse
- from app.config.setting import settings
- from app.core.dependencies import AuthPermission, redis_getter
- from app.api.v1.module_system.auth.schema import AuthSchema
- from app.core.base_params import PaginationQueryParam
- from app.utils.common_util import bytes2file_response
- from app.core.logger import log
- from app.core.base_schema import BatchSetAvailable
- from .service import BizVarDictService
- from .schema import BizVarDictCreateSchema, BizVarDictUpdateSchema, BizVarDictQueryParam
- from ..crane.service import BizCraneService
- BizVarDictRouter = APIRouter(prefix='/vardict', tags=["变量信息模块"])
- @BizVarDictRouter.get("/detail/{id}", summary="获取变量信息详情", description="获取变量信息详情")
- async def get_vardict_detail_controller(
- id: int = Path(..., description="ID"),
- auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
- ) -> JSONResponse:
- """获取变量信息详情接口"""
- result_dict = await BizVarDictService.detail_vardict_service(auth=auth, id=id)
- log.info(f"获取变量信息详情成功 {id}")
- return SuccessResponse(data=result_dict, msg="获取变量信息详情成功")
- @BizVarDictRouter.get("/list", summary="查询变量信息列表", description="查询变量信息列表")
- async def get_vardict_list_controller(
- page: PaginationQueryParam = Depends(),
- search: BizVarDictQueryParam = Depends(),
- auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
- ) -> JSONResponse:
- """查询变量信息列表接口(数据库分页)"""
- result_dict = await BizVarDictService.page_vardict_service(
- auth=auth,
- page_no=page.page_no if page.page_no is not None else 1,
- page_size=page.page_size if page.page_size is not None else 10,
- search=search,
- order_by=page.order_by
- )
- log.info("查询变量信息列表成功")
- return SuccessResponse(data=result_dict, msg="查询变量信息列表成功")
- @BizVarDictRouter.post("/create", summary="创建变量信息", description="创建变量信息")
- async def create_vardict_controller(
- data: BizVarDictCreateSchema,
- redis: Redis = Depends(redis_getter),
- auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:create"]))
- ) -> JSONResponse:
- """创建变量信息接口"""
- result_dict = await BizVarDictService.create_vardict_service(auth=auth, data=data,redis=redis)
- log.info("创建变量信息成功")
- return SuccessResponse(data=result_dict, msg="创建变量信息成功")
- @BizVarDictRouter.put("/update/{id}", summary="修改变量信息", description="修改变量信息")
- async def update_vardict_controller(
- data: BizVarDictUpdateSchema,
- id: int = Path(..., description="ID"),
- redis: Redis = Depends(redis_getter),
- auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:update"]))
- ) -> JSONResponse:
- """修改变量信息接口"""
- result_dict = await BizVarDictService.update_vardict_service(auth=auth, id=id, data=data,redis=redis)
- log.info("修改变量信息成功")
- return SuccessResponse(data=result_dict, msg="修改变量信息成功")
- @BizVarDictRouter.delete("/delete", summary="删除变量信息", description="删除变量信息")
- async def delete_vardict_controller(
- ids: list[int] = Body(..., description="ID列表"),
- redis: Redis = Depends(redis_getter),
- auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:delete"]))
- ) -> JSONResponse:
- """删除变量信息接口"""
- await BizVarDictService.delete_vardict_service(auth=auth, ids=ids,redis=redis)
- log.info(f"删除变量信息成功: {ids}")
- return SuccessResponse(msg="删除变量信息成功")
- @BizVarDictRouter.patch("/available/setting", summary="批量修改变量信息状态", description="批量修改变量信息状态")
- async def batch_set_available_vardict_controller(
- data: BatchSetAvailable,
- auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:patch"]))
- ) -> JSONResponse:
- """批量修改变量信息状态接口"""
- await BizVarDictService.set_available_vardict_service(auth=auth, data=data)
- log.info(f"批量修改变量信息状态成功: {data.ids}")
- return SuccessResponse(msg="批量修改变量信息状态成功")
- @BizVarDictRouter.post('/export', summary="导出变量信息", description="导出变量信息")
- async def export_vardict_list_controller(
- search: BizVarDictQueryParam = Depends(),
- auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:export"]))
- ) -> StreamingResponse:
- """导出变量信息接口"""
- result_dict_list = await BizVarDictService.list_vardict_service(search=search, auth=auth)
- export_result = await BizVarDictService.batch_export_vardict_service(obj_list=result_dict_list)
- log.info('导出变量信息成功')
- return StreamResponse(
- data=bytes2file_response(export_result),
- media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
- headers={
- 'Content-Disposition': 'attachment; filename=biz_var_dict.xlsx'
- }
- )
- @BizVarDictRouter.post('/import', summary="导入变量信息", description="导入变量信息")
- async def import_vardict_list_controller(
- file: UploadFile,
- auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:import"]))
- ) -> JSONResponse:
- """导入变量信息接口"""
- batch_import_result = await BizVarDictService.batch_import_vardict_service(file=file, auth=auth, update_support=True)
- log.info("导入变量信息成功")
-
- return SuccessResponse(data=batch_import_result, msg="导入变量信息成功")
- @BizVarDictRouter.post('/download/template', summary="获取变量信息导入模板", description="获取变量信息导入模板", dependencies=[Depends(AuthPermission(["module_business:vardict:download"]))])
- async def export_vardict_template_controller() -> StreamingResponse:
- """获取变量信息导入模板接口"""
- import_template_result = await BizVarDictService.import_template_download_vardict_service()
- log.info('获取变量信息导入模板成功')
- return StreamResponse(
- data=bytes2file_response(import_template_result),
- media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
- headers={'Content-Disposition': 'attachment; filename=biz_var_dict_template.xlsx'}
- )
- @BizVarDictRouter.get("/list_alarms", summary="查询变量信息列表", description="查询变量信息列表")
- async def get_vardict_list_alarms_controller(
- search: BizVarDictQueryParam = Depends(),
- redis: Redis = Depends(redis_getter),
- auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
- ) -> JSONResponse:
- if search.crane_no: #带crane_no说明是单台车报警直接查数据库即可
- result_dict = await BizVarDictService.vardict_alarms_list(auth=auth, search=search)
- try:
- async with httpx.AsyncClient() as client:
- response = await client.get(
- url=settings.COLLECT_DATA_FULL,
- params={},
- timeout=2
- )
- # 捕获HTTP状态码非200的情况
- response.raise_for_status()
- # 捕获JSON解析异常
- json_data = response.json()
- if json_data['code'] == 200 and json_data.get('data'):
- for item in result_dict:
- item['value'] = False
- crane_no = item['crane_no']
- alarm = json_data.get('data').get(crane_no, {}).get('data', {}).get('alarm', {}).get(
- item['var_code'])
- if alarm:
- item['value'] = alarm.get('value')
- except Exception as e:
- log.error(f"调用数据初始化接口获取报警列表时发生未知异常:{str(e)}", exc_info=True)
- else: #全部报警点位查缓存
- result_dict = await BizVarDictService.get_vardict_alarms_service(auth=auth, redis=redis)
- cranes = await BizCraneService.list_crane_service(auth=auth)
- valid_crane_nos: Set[str] = set()
- for crane_dict in cranes:
- if crane_dict.get('status') == '0':
- continue
- crane_no = crane_dict.get('crane_no')
- if crane_no and isinstance(crane_no, str):
- valid_crane_nos.add(crane_no)
- try:
- async with httpx.AsyncClient() as client:
- response = await client.get(
- url=settings.COLLECT_DATA_FULL,
- params={},
- timeout=2
- )
- # 捕获HTTP状态码非200的情况
- response.raise_for_status()
- # 捕获JSON解析异常
- json_data = response.json()
- filtered_result = []
- for item in result_dict:
- item_crane_no = item.get('crane_no')
- if item_crane_no not in valid_crane_nos:
- continue
- item['value'] = False # 默认值
- if json_data and json_data['code'] == 200 and json_data.get('data'):
- alarm = json_data.get('data').get(item_crane_no, {}).get('data', {}).get('alarm', {}).get(
- item.get('var_code'))
- if alarm:
- item['value'] = alarm.get('value')
- filtered_result.append(item)
- result_dict = filtered_result
- except Exception as e:
- log.error(f"调用数据初始化接口获取报警列表时发生未知异常:{str(e)}", exc_info=True)
- log.info("查询变量信息列表成功")
- return SuccessResponse(data=result_dict, msg="查询变量信息列表成功")
- @BizVarDictRouter.get("/list_analog", summary="查询变量信息列表", description="查询变量信息列表")
- async def get_vardict_list_analog_controller(
- search: BizVarDictQueryParam = Depends(),
- auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
- ) -> JSONResponse:
- """查询变量信息列表接口(数据库分页)"""
- result_dict = await BizVarDictService.vardict_analog_list(auth=auth,search=search)
- log.info("查询变量信息列表成功")
- return SuccessResponse(data=result_dict, msg="查询变量信息列表成功")
- @BizVarDictRouter.get("/list_operation_record", summary="查询操作点位变量信息列表", description="查询操作点位变量信息列表")
- async def get_vardict_list_operation_record_controller(
- search: BizVarDictQueryParam = Depends(),
- auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
- ) -> JSONResponse:
- """查询变量信息列表接口(数据库分页)"""
- result_dict = await BizVarDictService.vardict_operation_record_list(auth=auth,search=search)
- log.info("查询操作点位变量信息列表成功")
- return SuccessResponse(data=result_dict, msg="查询操作点位变量信息列表成功")
- @BizVarDictRouter.get("/varDictMecGroup/{crane_no}", summary="获取变量信息分组数据", description="获取变量信息分组数据")
- async def get_vardict_mec_group_controller(
- crane_no: str = Path(..., description="crane_no"),
- redis: Redis = Depends(redis_getter),
- auth: AuthSchema = Depends(AuthPermission(["module_business:crane:query"]))
- ) -> JSONResponse:
- result_dict = await BizVarDictService.get_vardict_group_service(
- redis=redis, crane_no=crane_no,auth=auth
- )
- if not result_dict:
- log.info(f"获取变量信息分组数据成功:{result_dict}")
- return SuccessResponse(data=result_dict, msg="获取变量分组数据成功")
- #请求采集接口获取状态信息
- try:
- async with httpx.AsyncClient() as client:
- response = await client.get(
- url=settings.COLLECT_DATA_FULL,
- params={},
- timeout=2
- )
- # 捕获HTTP状态码非200的情况
- response.raise_for_status()
- # 捕获JSON解析异常
- json_data = response.json()
- if json_data['code'] == 200 and json_data.get('data'):
- json_analog = json_data.get('data').get(crane_no).get('data').get('analog')
- json_digital = json_data.get('data').get(crane_no).get('data').get('digital')
- for var_dict in result_dict:
- for key, inner_dict in var_dict.items():
- if key != 'mec_type' and key != 'alarm_varList' and key != 'mecVarList_simple':
- for item in inner_dict:
- if key == 'digital_varList':
- if json_digital:
- item['value'] = json_digital.get(item.get('var_code')).get('value')
- else:
- if json_analog:
- item['value'] = json_analog.get(item.get('var_code')).get('value')
- except Exception as e:
- log.error(f"调用采集接口获取分组变量状态时发生未知异常:{str(e)}", exc_info=True)
- log.info(f"获取变量信息分组数据成功:{result_dict}")
- return SuccessResponse(data=result_dict, msg="获取变量分组数据成功")
- @BizVarDictRouter.get("/historyData", summary="查询历史数据列表", description="查询历史数据列表")
- async def get_vardict_historyData_controller(
- page: PaginationQueryParam = Depends(),
- search: BizVarDictQueryParam = Depends(),
- auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
- ) -> JSONResponse:
- search.data_type = ('!=', 1)
- result_dict = await BizVarDictService.get_tdengine_data(
- search=search,
- stable_name='st_analog',
- page_no=page.page_no if page.page_no is not None else 1,
- page_size=page.page_size if page.page_size is not None else 10,
- auth=auth
- )
- log.info("查询历史数据列表成功")
- return SuccessResponse(data=result_dict, msg="查询历史数据列表成功")
- @BizVarDictRouter.get("/operationRecord", summary="查询操作记录列表", description="查询操作记录列表")
- async def get_vardict_operationRecord_controller(
- search: BizVarDictQueryParam = Depends(),
- auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
- ) -> JSONResponse:
- #过滤操作记录
- search.switch_type = ('<=',1)
- result_dict = await BizVarDictService.get_tdengine_data_operation(
- search=search,
- stable_name='st_digital',
- auth=auth
- )
- log.info("查询操作记录列表成功")
- return SuccessResponse(data=result_dict, msg="查询操作记录列表成功")
- @BizVarDictRouter.get("/historyAlarm", summary="查询报警记录列表", description="查询报警记录列表")
- async def get_vardict_historyAlarm_controller(
- search: BizVarDictQueryParam = Depends(),
- auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
- ) -> JSONResponse:
- result_dict = await BizVarDictService.get_tdengine_data_operation(
- search=search,
- stable_name='st_alarm',
- auth=auth
- )
- log.info("查询报警记录列表成功")
- return SuccessResponse(data=result_dict, msg="查询报警记录列表成功")
|