controller.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. # -*- coding: utf-8 -*-
  2. from typing import Set
  3. import httpx
  4. from fastapi import APIRouter, Depends, UploadFile, Body, Path, Query
  5. from fastapi.responses import StreamingResponse, JSONResponse
  6. from redis.asyncio.client import Redis
  7. from app.common.response import SuccessResponse, StreamResponse
  8. from app.config.setting import settings
  9. from app.core.dependencies import AuthPermission, redis_getter
  10. from app.api.v1.module_system.auth.schema import AuthSchema
  11. from app.core.base_params import PaginationQueryParam
  12. from app.utils.common_util import bytes2file_response
  13. from app.core.logger import log
  14. from app.core.base_schema import BatchSetAvailable
  15. from .service import BizVarDictService
  16. from .schema import BizVarDictCreateSchema, BizVarDictUpdateSchema, BizVarDictQueryParam
  17. from ..crane.service import BizCraneService
  18. BizVarDictRouter = APIRouter(prefix='/vardict', tags=["变量信息模块"])
  19. @BizVarDictRouter.get("/detail/{id}", summary="获取变量信息详情", description="获取变量信息详情")
  20. async def get_vardict_detail_controller(
  21. id: int = Path(..., description="ID"),
  22. auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
  23. ) -> JSONResponse:
  24. """获取变量信息详情接口"""
  25. result_dict = await BizVarDictService.detail_vardict_service(auth=auth, id=id)
  26. log.info(f"获取变量信息详情成功 {id}")
  27. return SuccessResponse(data=result_dict, msg="获取变量信息详情成功")
  28. @BizVarDictRouter.get("/list", summary="查询变量信息列表", description="查询变量信息列表")
  29. async def get_vardict_list_controller(
  30. page: PaginationQueryParam = Depends(),
  31. search: BizVarDictQueryParam = Depends(),
  32. auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
  33. ) -> JSONResponse:
  34. """查询变量信息列表接口(数据库分页)"""
  35. result_dict = await BizVarDictService.page_vardict_service(
  36. auth=auth,
  37. page_no=page.page_no if page.page_no is not None else 1,
  38. page_size=page.page_size if page.page_size is not None else 10,
  39. search=search,
  40. order_by=page.order_by
  41. )
  42. log.info("查询变量信息列表成功")
  43. return SuccessResponse(data=result_dict, msg="查询变量信息列表成功")
  44. @BizVarDictRouter.post("/create", summary="创建变量信息", description="创建变量信息")
  45. async def create_vardict_controller(
  46. data: BizVarDictCreateSchema,
  47. redis: Redis = Depends(redis_getter),
  48. auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:create"]))
  49. ) -> JSONResponse:
  50. """创建变量信息接口"""
  51. result_dict = await BizVarDictService.create_vardict_service(auth=auth, data=data,redis=redis)
  52. log.info("创建变量信息成功")
  53. return SuccessResponse(data=result_dict, msg="创建变量信息成功")
  54. @BizVarDictRouter.put("/update/{id}", summary="修改变量信息", description="修改变量信息")
  55. async def update_vardict_controller(
  56. data: BizVarDictUpdateSchema,
  57. id: int = Path(..., description="ID"),
  58. redis: Redis = Depends(redis_getter),
  59. auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:update"]))
  60. ) -> JSONResponse:
  61. """修改变量信息接口"""
  62. result_dict = await BizVarDictService.update_vardict_service(auth=auth, id=id, data=data,redis=redis)
  63. log.info("修改变量信息成功")
  64. return SuccessResponse(data=result_dict, msg="修改变量信息成功")
  65. @BizVarDictRouter.delete("/delete", summary="删除变量信息", description="删除变量信息")
  66. async def delete_vardict_controller(
  67. ids: list[int] = Body(..., description="ID列表"),
  68. redis: Redis = Depends(redis_getter),
  69. auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:delete"]))
  70. ) -> JSONResponse:
  71. """删除变量信息接口"""
  72. await BizVarDictService.delete_vardict_service(auth=auth, ids=ids,redis=redis)
  73. log.info(f"删除变量信息成功: {ids}")
  74. return SuccessResponse(msg="删除变量信息成功")
  75. @BizVarDictRouter.patch("/available/setting", summary="批量修改变量信息状态", description="批量修改变量信息状态")
  76. async def batch_set_available_vardict_controller(
  77. data: BatchSetAvailable,
  78. auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:patch"]))
  79. ) -> JSONResponse:
  80. """批量修改变量信息状态接口"""
  81. await BizVarDictService.set_available_vardict_service(auth=auth, data=data)
  82. log.info(f"批量修改变量信息状态成功: {data.ids}")
  83. return SuccessResponse(msg="批量修改变量信息状态成功")
  84. @BizVarDictRouter.post('/export', summary="导出变量信息", description="导出变量信息")
  85. async def export_vardict_list_controller(
  86. search: BizVarDictQueryParam = Depends(),
  87. auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:export"]))
  88. ) -> StreamingResponse:
  89. """导出变量信息接口"""
  90. result_dict_list = await BizVarDictService.list_vardict_service(search=search, auth=auth)
  91. export_result = await BizVarDictService.batch_export_vardict_service(obj_list=result_dict_list)
  92. log.info('导出变量信息成功')
  93. return StreamResponse(
  94. data=bytes2file_response(export_result),
  95. media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
  96. headers={
  97. 'Content-Disposition': 'attachment; filename=biz_var_dict.xlsx'
  98. }
  99. )
  100. @BizVarDictRouter.post('/import', summary="导入变量信息", description="导入变量信息")
  101. async def import_vardict_list_controller(
  102. file: UploadFile,
  103. auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:import"]))
  104. ) -> JSONResponse:
  105. """导入变量信息接口"""
  106. batch_import_result = await BizVarDictService.batch_import_vardict_service(file=file, auth=auth, update_support=True)
  107. log.info("导入变量信息成功")
  108. return SuccessResponse(data=batch_import_result, msg="导入变量信息成功")
  109. @BizVarDictRouter.post('/download/template', summary="获取变量信息导入模板", description="获取变量信息导入模板", dependencies=[Depends(AuthPermission(["module_business:vardict:download"]))])
  110. async def export_vardict_template_controller() -> StreamingResponse:
  111. """获取变量信息导入模板接口"""
  112. import_template_result = await BizVarDictService.import_template_download_vardict_service()
  113. log.info('获取变量信息导入模板成功')
  114. return StreamResponse(
  115. data=bytes2file_response(import_template_result),
  116. media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
  117. headers={'Content-Disposition': 'attachment; filename=biz_var_dict_template.xlsx'}
  118. )
  119. @BizVarDictRouter.get("/list_alarms", summary="查询变量信息列表", description="查询变量信息列表")
  120. async def get_vardict_list_alarms_controller(
  121. search: BizVarDictQueryParam = Depends(),
  122. redis: Redis = Depends(redis_getter),
  123. auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
  124. ) -> JSONResponse:
  125. if search.crane_no: #带crane_no说明是单台车报警直接查数据库即可
  126. result_dict = await BizVarDictService.vardict_alarms_list(auth=auth, search=search)
  127. try:
  128. async with httpx.AsyncClient() as client:
  129. response = await client.get(
  130. url=settings.COLLECT_DATA_FULL,
  131. params={},
  132. timeout=2
  133. )
  134. # 捕获HTTP状态码非200的情况
  135. response.raise_for_status()
  136. # 捕获JSON解析异常
  137. json_data = response.json()
  138. if json_data['code'] == 200 and json_data.get('data'):
  139. for item in result_dict:
  140. item['value'] = False
  141. crane_no = item['crane_no']
  142. alarm = json_data.get('data').get(crane_no, {}).get('data', {}).get('alarm', {}).get(
  143. item['var_code'])
  144. if alarm:
  145. item['value'] = alarm.get('value')
  146. except Exception as e:
  147. log.error(f"调用数据初始化接口获取报警列表时发生未知异常:{str(e)}", exc_info=True)
  148. else: #全部报警点位查缓存
  149. result_dict = await BizVarDictService.get_vardict_alarms_service(auth=auth, redis=redis)
  150. cranes = await BizCraneService.list_crane_service(auth=auth)
  151. valid_crane_nos: Set[str] = set()
  152. for crane_dict in cranes:
  153. if crane_dict.get('status') == '0':
  154. continue
  155. crane_no = crane_dict.get('crane_no')
  156. if crane_no and isinstance(crane_no, str):
  157. valid_crane_nos.add(crane_no)
  158. try:
  159. async with httpx.AsyncClient() as client:
  160. response = await client.get(
  161. url=settings.COLLECT_DATA_FULL,
  162. params={},
  163. timeout=2
  164. )
  165. # 捕获HTTP状态码非200的情况
  166. response.raise_for_status()
  167. # 捕获JSON解析异常
  168. json_data = response.json()
  169. filtered_result = []
  170. for item in result_dict:
  171. item_crane_no = item.get('crane_no')
  172. if item_crane_no not in valid_crane_nos:
  173. continue
  174. item['value'] = False # 默认值
  175. if json_data and json_data['code'] == 200 and json_data.get('data'):
  176. alarm = json_data.get('data').get(item_crane_no, {}).get('data', {}).get('alarm', {}).get(
  177. item.get('var_code'))
  178. if alarm:
  179. item['value'] = alarm.get('value')
  180. filtered_result.append(item)
  181. result_dict = filtered_result
  182. except Exception as e:
  183. log.error(f"调用数据初始化接口获取报警列表时发生未知异常:{str(e)}", exc_info=True)
  184. log.info("查询变量信息列表成功")
  185. return SuccessResponse(data=result_dict, msg="查询变量信息列表成功")
  186. @BizVarDictRouter.get("/list_analog", summary="查询变量信息列表", description="查询变量信息列表")
  187. async def get_vardict_list_analog_controller(
  188. search: BizVarDictQueryParam = Depends(),
  189. auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
  190. ) -> JSONResponse:
  191. """查询变量信息列表接口(数据库分页)"""
  192. result_dict = await BizVarDictService.vardict_analog_list(auth=auth,search=search)
  193. log.info("查询变量信息列表成功")
  194. return SuccessResponse(data=result_dict, msg="查询变量信息列表成功")
  195. @BizVarDictRouter.get("/list_operation_record", summary="查询操作点位变量信息列表", description="查询操作点位变量信息列表")
  196. async def get_vardict_list_operation_record_controller(
  197. search: BizVarDictQueryParam = Depends(),
  198. auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
  199. ) -> JSONResponse:
  200. """查询变量信息列表接口(数据库分页)"""
  201. result_dict = await BizVarDictService.vardict_operation_record_list(auth=auth,search=search)
  202. log.info("查询操作点位变量信息列表成功")
  203. return SuccessResponse(data=result_dict, msg="查询操作点位变量信息列表成功")
  204. @BizVarDictRouter.get("/varDictMecGroup/{crane_no}", summary="获取变量信息分组数据", description="获取变量信息分组数据")
  205. async def get_vardict_mec_group_controller(
  206. crane_no: str = Path(..., description="crane_no"),
  207. redis: Redis = Depends(redis_getter),
  208. auth: AuthSchema = Depends(AuthPermission(["module_business:crane:query"]))
  209. ) -> JSONResponse:
  210. result_dict = await BizVarDictService.get_vardict_group_service(
  211. redis=redis, crane_no=crane_no,auth=auth
  212. )
  213. if not result_dict:
  214. log.info(f"获取变量信息分组数据成功:{result_dict}")
  215. return SuccessResponse(data=result_dict, msg="获取变量分组数据成功")
  216. #请求采集接口获取状态信息
  217. try:
  218. async with httpx.AsyncClient() as client:
  219. response = await client.get(
  220. url=settings.COLLECT_DATA_FULL,
  221. params={},
  222. timeout=2
  223. )
  224. # 捕获HTTP状态码非200的情况
  225. response.raise_for_status()
  226. # 捕获JSON解析异常
  227. json_data = response.json()
  228. if json_data['code'] == 200 and json_data.get('data'):
  229. json_analog = json_data.get('data').get(crane_no).get('data').get('analog')
  230. json_digital = json_data.get('data').get(crane_no).get('data').get('digital')
  231. for var_dict in result_dict:
  232. for key, inner_dict in var_dict.items():
  233. if key != 'mec_type' and key != 'alarm_varList' and key != 'mecVarList_simple':
  234. for item in inner_dict:
  235. if key == 'digital_varList':
  236. if json_digital:
  237. item['value'] = json_digital.get(item.get('var_code')).get('value')
  238. else:
  239. if json_analog:
  240. item['value'] = json_analog.get(item.get('var_code')).get('value')
  241. except Exception as e:
  242. log.error(f"调用采集接口获取分组变量状态时发生未知异常:{str(e)}", exc_info=True)
  243. log.info(f"获取变量信息分组数据成功:{result_dict}")
  244. return SuccessResponse(data=result_dict, msg="获取变量分组数据成功")
  245. @BizVarDictRouter.get("/historyData", summary="查询历史数据列表", description="查询历史数据列表")
  246. async def get_vardict_historyData_controller(
  247. page: PaginationQueryParam = Depends(),
  248. search: BizVarDictQueryParam = Depends(),
  249. auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
  250. ) -> JSONResponse:
  251. search.data_type = ('!=', 1)
  252. result_dict = await BizVarDictService.get_tdengine_data(
  253. search=search,
  254. stable_name='st_analog',
  255. page_no=page.page_no if page.page_no is not None else 1,
  256. page_size=page.page_size if page.page_size is not None else 10,
  257. auth=auth
  258. )
  259. log.info("查询历史数据列表成功")
  260. return SuccessResponse(data=result_dict, msg="查询历史数据列表成功")
  261. @BizVarDictRouter.get("/operationRecord", summary="查询操作记录列表", description="查询操作记录列表")
  262. async def get_vardict_operationRecord_controller(
  263. search: BizVarDictQueryParam = Depends(),
  264. auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
  265. ) -> JSONResponse:
  266. #过滤操作记录
  267. search.switch_type = ('<=',1)
  268. result_dict = await BizVarDictService.get_tdengine_data_operation(
  269. search=search,
  270. stable_name='st_digital',
  271. auth=auth
  272. )
  273. log.info("查询操作记录列表成功")
  274. return SuccessResponse(data=result_dict, msg="查询操作记录列表成功")
  275. @BizVarDictRouter.get("/historyAlarm", summary="查询报警记录列表", description="查询报警记录列表")
  276. async def get_vardict_historyAlarm_controller(
  277. search: BizVarDictQueryParam = Depends(),
  278. auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
  279. ) -> JSONResponse:
  280. result_dict = await BizVarDictService.get_tdengine_data_operation(
  281. search=search,
  282. stable_name='st_alarm',
  283. auth=auth
  284. )
  285. log.info("查询报警记录列表成功")
  286. return SuccessResponse(data=result_dict, msg="查询报警记录列表成功")