controller.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. # -*- coding: utf-8 -*-
  2. import json
  3. import asyncio
  4. from starlette.websockets import WebSocketDisconnect
  5. from fastapi import APIRouter, Depends, UploadFile, Body, Path, WebSocket
  6. from fastapi.responses import StreamingResponse, JSONResponse
  7. from app.common.response import SuccessResponse, StreamResponse
  8. from app.core.dependencies import AuthPermission
  9. from app.api.v1.module_system.auth.schema import AuthSchema
  10. from app.core.base_params import PaginationQueryParam
  11. from app.utils.common_util import bytes2file_response
  12. from app.core.logger import log
  13. from app.core.base_schema import BatchSetAvailable
  14. from .service import BizVarDictService
  15. from .schema import BizVarDictCreateSchema, BizVarDictUpdateSchema, BizVarDictQueryParam
  16. BizVarDictRouter = APIRouter(prefix='/vardict', tags=["变量信息模块"])
  17. @BizVarDictRouter.get("/detail/{id}", summary="获取变量信息详情", description="获取变量信息详情")
  18. async def get_vardict_detail_controller(
  19. id: int = Path(..., description="ID"),
  20. auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
  21. ) -> JSONResponse:
  22. """获取变量信息详情接口"""
  23. result_dict = await BizVarDictService.detail_vardict_service(auth=auth, id=id)
  24. log.info(f"获取变量信息详情成功 {id}")
  25. return SuccessResponse(data=result_dict, msg="获取变量信息详情成功")
  26. @BizVarDictRouter.get("/list", summary="查询变量信息列表", description="查询变量信息列表")
  27. async def get_vardict_list_controller(
  28. page: PaginationQueryParam = Depends(),
  29. search: BizVarDictQueryParam = Depends(),
  30. auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:query"]))
  31. ) -> JSONResponse:
  32. """查询变量信息列表接口(数据库分页)"""
  33. result_dict = await BizVarDictService.page_vardict_service(
  34. auth=auth,
  35. page_no=page.page_no if page.page_no is not None else 1,
  36. page_size=page.page_size if page.page_size is not None else 10,
  37. search=search,
  38. order_by=page.order_by
  39. )
  40. log.info("查询变量信息列表成功")
  41. return SuccessResponse(data=result_dict, msg="查询变量信息列表成功")
  42. @BizVarDictRouter.post("/create", summary="创建变量信息", description="创建变量信息")
  43. async def create_vardict_controller(
  44. data: BizVarDictCreateSchema,
  45. auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:create"]))
  46. ) -> JSONResponse:
  47. """创建变量信息接口"""
  48. result_dict = await BizVarDictService.create_vardict_service(auth=auth, data=data)
  49. log.info("创建变量信息成功")
  50. return SuccessResponse(data=result_dict, msg="创建变量信息成功")
  51. @BizVarDictRouter.put("/update/{id}", summary="修改变量信息", description="修改变量信息")
  52. async def update_vardict_controller(
  53. data: BizVarDictUpdateSchema,
  54. id: int = Path(..., description="ID"),
  55. auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:update"]))
  56. ) -> JSONResponse:
  57. """修改变量信息接口"""
  58. result_dict = await BizVarDictService.update_vardict_service(auth=auth, id=id, data=data)
  59. log.info("修改变量信息成功")
  60. return SuccessResponse(data=result_dict, msg="修改变量信息成功")
  61. @BizVarDictRouter.delete("/delete", summary="删除变量信息", description="删除变量信息")
  62. async def delete_vardict_controller(
  63. ids: list[int] = Body(..., description="ID列表"),
  64. auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:delete"]))
  65. ) -> JSONResponse:
  66. """删除变量信息接口"""
  67. await BizVarDictService.delete_vardict_service(auth=auth, ids=ids)
  68. log.info(f"删除变量信息成功: {ids}")
  69. return SuccessResponse(msg="删除变量信息成功")
  70. @BizVarDictRouter.patch("/available/setting", summary="批量修改变量信息状态", description="批量修改变量信息状态")
  71. async def batch_set_available_vardict_controller(
  72. data: BatchSetAvailable,
  73. auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:patch"]))
  74. ) -> JSONResponse:
  75. """批量修改变量信息状态接口"""
  76. await BizVarDictService.set_available_vardict_service(auth=auth, data=data)
  77. log.info(f"批量修改变量信息状态成功: {data.ids}")
  78. return SuccessResponse(msg="批量修改变量信息状态成功")
  79. @BizVarDictRouter.post('/export', summary="导出变量信息", description="导出变量信息")
  80. async def export_vardict_list_controller(
  81. search: BizVarDictQueryParam = Depends(),
  82. auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:export"]))
  83. ) -> StreamingResponse:
  84. """导出变量信息接口"""
  85. result_dict_list = await BizVarDictService.list_vardict_service(search=search, auth=auth)
  86. export_result = await BizVarDictService.batch_export_vardict_service(obj_list=result_dict_list)
  87. log.info('导出变量信息成功')
  88. return StreamResponse(
  89. data=bytes2file_response(export_result),
  90. media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
  91. headers={
  92. 'Content-Disposition': 'attachment; filename=biz_var_dict.xlsx'
  93. }
  94. )
  95. @BizVarDictRouter.post('/import', summary="导入变量信息", description="导入变量信息")
  96. async def import_vardict_list_controller(
  97. file: UploadFile,
  98. auth: AuthSchema = Depends(AuthPermission(["module_business:vardict:import"]))
  99. ) -> JSONResponse:
  100. """导入变量信息接口"""
  101. batch_import_result = await BizVarDictService.batch_import_vardict_service(file=file, auth=auth, update_support=True)
  102. log.info("导入变量信息成功")
  103. return SuccessResponse(data=batch_import_result, msg="导入变量信息成功")
  104. @BizVarDictRouter.post('/download/template', summary="获取变量信息导入模板", description="获取变量信息导入模板", dependencies=[Depends(AuthPermission(["module_business:vardict:download"]))])
  105. async def export_vardict_template_controller() -> StreamingResponse:
  106. """获取变量信息导入模板接口"""
  107. import_template_result = await BizVarDictService.import_template_download_vardict_service()
  108. log.info('获取变量信息导入模板成功')
  109. return StreamResponse(
  110. data=bytes2file_response(import_template_result),
  111. media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
  112. headers={'Content-Disposition': 'attachment; filename=biz_var_dict_template.xlsx'}
  113. )
  114. @BizVarDictRouter.websocket("/ws/alert", name="变量报警信息", dependencies=[])
  115. async def websocket_chat_controller(
  116. websocket: WebSocket,
  117. ):
  118. await websocket.accept()
  119. is_closed = False
  120. try:
  121. log.info("前端WebSocket连接成功,开始推送测试数据")
  122. # 构造测试数据(完全匹配前端预期的格式)
  123. test_messages = [
  124. # 测试报警消息(gc/alert)
  125. {
  126. "topic": "gc/alert",
  127. "data": [
  128. {"switch_type": "2", "crane_name": "塔吊001", "msg": "轻度过载报警"},
  129. {"switch_type": "3", "crane_name": "塔吊002", "msg": "中度倾斜报警"},
  130. {"switch_type": "4", "crane_name": "塔吊003", "msg": "重度碰撞报警"}
  131. ]
  132. }
  133. ]
  134. # 循环推送测试数据(每隔3秒推一次,模拟实时更新)
  135. while True:
  136. if is_closed:
  137. break
  138. for msg in test_messages:
  139. # 将字典转为JSON字符串(前端能解析)
  140. await websocket.send_text(json.dumps(msg))
  141. log.info(f"已推送测试数据: {msg['topic']}")
  142. await asyncio.sleep(5) # 每3秒推一条,可调整间隔
  143. except WebSocketDisconnect:
  144. is_closed = True
  145. log.info("前端主动断开WebSocket连接")
  146. except Exception as e:
  147. is_closed = True
  148. log.error(f"WebSocket异常: {str(e)}", exc_info=True)
  149. finally:
  150. if not is_closed:
  151. try:
  152. await websocket.close()
  153. log.info("关闭WebSocket连接")
  154. except RuntimeError:
  155. log.debug("WebSocket已关闭,无需重复操作")