controller.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. # -*- coding: utf-8 -*-
  2. import json
  3. import asyncio
  4. from starlette.websockets import WebSocketDisconnect
  5. from fastapi import APIRouter, Depends, UploadFile, Body, Path, WebSocket
  6. from fastapi.responses import StreamingResponse, JSONResponse
  7. from app.common.response import SuccessResponse, StreamResponse
  8. from app.core.dependencies import AuthPermission
  9. from app.api.v1.module_system.auth.schema import AuthSchema
  10. from app.core.base_params import PaginationQueryParam
  11. from app.utils.common_util import bytes2file_response
  12. from app.core.logger import log
  13. from app.core.base_schema import BatchSetAvailable
  14. from .service import BizCraneService
  15. from .schema import BizCraneCreateSchema, BizCraneUpdateSchema, BizCraneQueryParam
  16. BizCraneRouter = APIRouter(prefix='/crane', tags=["行车信息模块"])
  17. @BizCraneRouter.get("/detail/{id}", summary="获取行车信息详情", description="获取行车信息详情")
  18. async def get_crane_detail_controller(
  19. id: int = Path(..., description="ID"),
  20. auth: AuthSchema = Depends(AuthPermission(["module_business:crane:query"]))
  21. ) -> JSONResponse:
  22. """获取行车信息详情接口"""
  23. result_dict = await BizCraneService.detail_crane_service(auth=auth, id=id)
  24. log.info(f"获取行车信息详情成功 {id}")
  25. return SuccessResponse(data=result_dict, msg="获取行车信息详情成功")
  26. @BizCraneRouter.get("/list", summary="查询行车信息列表", description="查询行车信息列表")
  27. async def get_crane_list_controller(
  28. page: PaginationQueryParam = Depends(),
  29. search: BizCraneQueryParam = Depends(),
  30. auth: AuthSchema = Depends(AuthPermission(["module_business:crane:query"]))
  31. ) -> JSONResponse:
  32. """查询行车信息列表接口(数据库分页)"""
  33. result_dict = await BizCraneService.page_crane_service(
  34. auth=auth,
  35. page_no=page.page_no if page.page_no is not None else 1,
  36. page_size=page.page_size if page.page_size is not None else 10,
  37. search=search,
  38. order_by=page.order_by
  39. )
  40. log.info("查询行车信息列表成功")
  41. return SuccessResponse(data=result_dict, msg="查询行车信息列表成功")
  42. @BizCraneRouter.post("/create", summary="创建行车信息", description="创建行车信息")
  43. async def create_crane_controller(
  44. data: BizCraneCreateSchema,
  45. auth: AuthSchema = Depends(AuthPermission(["module_business:crane:create"]))
  46. ) -> JSONResponse:
  47. """创建行车信息接口"""
  48. result_dict = await BizCraneService.create_crane_service(auth=auth, data=data)
  49. log.info("创建行车信息成功")
  50. return SuccessResponse(data=result_dict, msg="创建行车信息成功")
  51. @BizCraneRouter.put("/update/{id}", summary="修改行车信息", description="修改行车信息")
  52. async def update_crane_controller(
  53. data: BizCraneUpdateSchema,
  54. id: int = Path(..., description="ID"),
  55. auth: AuthSchema = Depends(AuthPermission(["module_business:crane:update"]))
  56. ) -> JSONResponse:
  57. """修改行车信息接口"""
  58. result_dict = await BizCraneService.update_crane_service(auth=auth, id=id, data=data)
  59. log.info("修改行车信息成功")
  60. return SuccessResponse(data=result_dict, msg="修改行车信息成功")
  61. @BizCraneRouter.delete("/delete", summary="删除行车信息", description="删除行车信息")
  62. async def delete_crane_controller(
  63. ids: list[int] = Body(..., description="ID列表"),
  64. auth: AuthSchema = Depends(AuthPermission(["module_business:crane:delete"]))
  65. ) -> JSONResponse:
  66. """删除行车信息接口"""
  67. await BizCraneService.delete_crane_service(auth=auth, ids=ids)
  68. log.info(f"删除行车信息成功: {ids}")
  69. return SuccessResponse(msg="删除行车信息成功")
  70. @BizCraneRouter.patch("/available/setting", summary="批量修改行车信息状态", description="批量修改行车信息状态")
  71. async def batch_set_available_crane_controller(
  72. data: BatchSetAvailable,
  73. auth: AuthSchema = Depends(AuthPermission(["module_business:crane:patch"]))
  74. ) -> JSONResponse:
  75. """批量修改行车信息状态接口"""
  76. await BizCraneService.set_available_crane_service(auth=auth, data=data)
  77. log.info(f"批量修改行车信息状态成功: {data.ids}")
  78. return SuccessResponse(msg="批量修改行车信息状态成功")
  79. @BizCraneRouter.post('/export', summary="导出行车信息", description="导出行车信息")
  80. async def export_crane_list_controller(
  81. search: BizCraneQueryParam = Depends(),
  82. auth: AuthSchema = Depends(AuthPermission(["module_business:crane:export"]))
  83. ) -> StreamingResponse:
  84. """导出行车信息接口"""
  85. result_dict_list = await BizCraneService.list_crane_service(search=search, auth=auth)
  86. export_result = await BizCraneService.batch_export_crane_service(obj_list=result_dict_list)
  87. log.info('导出行车信息成功')
  88. return StreamResponse(
  89. data=bytes2file_response(export_result),
  90. media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
  91. headers={
  92. 'Content-Disposition': 'attachment; filename=biz_crane.xlsx'
  93. }
  94. )
  95. @BizCraneRouter.post('/import', summary="导入行车信息", description="导入行车信息")
  96. async def import_crane_list_controller(
  97. file: UploadFile,
  98. auth: AuthSchema = Depends(AuthPermission(["module_business:crane:import"]))
  99. ) -> JSONResponse:
  100. """导入行车信息接口"""
  101. batch_import_result = await BizCraneService.batch_import_crane_service(file=file, auth=auth, update_support=True)
  102. log.info("导入行车信息成功")
  103. return SuccessResponse(data=batch_import_result, msg="导入行车信息成功")
  104. @BizCraneRouter.post('/download/template', summary="获取行车信息导入模板", description="获取行车信息导入模板", dependencies=[Depends(AuthPermission(["module_business:crane:download"]))])
  105. async def export_crane_template_controller() -> StreamingResponse:
  106. """获取行车信息导入模板接口"""
  107. import_template_result = await BizCraneService.import_template_download_crane_service()
  108. log.info('获取行车信息导入模板成功')
  109. return StreamResponse(
  110. data=bytes2file_response(import_template_result),
  111. media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
  112. headers={'Content-Disposition': 'attachment; filename=biz_crane_template.xlsx'}
  113. )
  114. @BizCraneRouter.websocket("/ws/online", name="天车在线状态检测", dependencies=[])
  115. async def websocket_chat_controller(
  116. websocket: WebSocket,
  117. ):
  118. await websocket.accept()
  119. is_closed = False
  120. try:
  121. log.info("前端WebSocket连接成功,开始推送测试数据")
  122. # 构造测试数据(完全匹配前端预期的格式)
  123. test_messages = [
  124. # 测试报警消息(gc/alert)
  125. {
  126. "topic": "gc/crane_status",
  127. "data": [
  128. {"crane_no": "11111111", "is_online": True},
  129. {"crane_no": "123456", "is_online": False}
  130. ]
  131. }
  132. ]
  133. # 循环推送测试数据(每隔3秒推一次,模拟实时更新)
  134. while True:
  135. if is_closed:
  136. break
  137. for msg in test_messages:
  138. # 将字典转为JSON字符串(前端能解析)
  139. await websocket.send_text(json.dumps(msg))
  140. log.info(f"已推送测试数据: {msg['topic']}")
  141. await asyncio.sleep(5) # 每3秒推一条,可调整间隔
  142. except WebSocketDisconnect:
  143. is_closed = True
  144. log.info("前端主动断开WebSocket连接")
  145. except Exception as e:
  146. is_closed = True
  147. log.error(f"WebSocket异常: {str(e)}", exc_info=True)
  148. finally:
  149. if not is_closed:
  150. try:
  151. await websocket.close()
  152. log.info("关闭WebSocket连接")
  153. except RuntimeError:
  154. log.debug("WebSocket已关闭,无需重复操作")