Преглед изворни кода

修改ws使用mqtt工具连接

cuiHe пре 1 недеља
родитељ
комит
8e9d0864e1

+ 22 - 16
backend/app/api/v1/module_application/ai/controller.py

@@ -2,17 +2,17 @@
 
 from fastapi import APIRouter, Depends, Path, Body, WebSocket
 from fastapi.responses import JSONResponse, StreamingResponse
+
 from app.common.response import StreamResponse, SuccessResponse
 from app.common.request import PaginationService
 from app.core.base_params import PaginationQueryParam
 from app.core.dependencies import AuthPermission
 from app.core.logger import log
+
 from app.api.v1.module_system.auth.schema import AuthSchema
 from app.core.router_class import OperationLogRoute
 from .service import McpService
 from .schema import McpCreateSchema, McpUpdateSchema, ChatQuerySchema, McpQueryParam
-import json
-import asyncio
 
 
 AIRouter = APIRouter(route_class=OperationLogRoute, prefix="/ai", tags=["MCP智能助手"])
@@ -25,16 +25,16 @@ async def chat_controller(
 ) -> StreamingResponse:
     """
     智能对话接口
-
+    
     参数:
     - query (ChatQuerySchema): 聊天查询模型
-
+    
     返回:
     - StreamingResponse: 流式响应,每次返回一个聊天响应
     """
     user_name = auth.user.name if auth.user else "未知用户"
     log.info(f"用户 {user_name} 发起智能对话: {query.message[:50]}...")
-
+    
     async def generate_response():
         try:
             async for chunk in McpService.chat_query(query=query):
@@ -44,7 +44,7 @@ async def chat_controller(
         except Exception as e:
             log.error(f"流式响应出错: {str(e)}")
             yield f"抱歉,处理您的请求时出现了错误: {str(e)}".encode('utf-8')
-
+    
     return StreamResponse(generate_response(), media_type="text/plain; charset=utf-8")
 
 
@@ -55,10 +55,10 @@ async def detail_controller(
 ) -> JSONResponse:
     """
     获取 MCP 服务器详情接口
-
+    
     参数:
     - id (int): MCP 服务器ID
-
+    
     返回:
     - JSONResponse: 包含 MCP 服务器详情的 JSON 响应
     """
@@ -75,12 +75,12 @@ async def list_controller(
 ) -> JSONResponse:
     """
     查询 MCP 服务器列表接口
-
+    
     参数:
     - page (PaginationQueryParam): 分页查询参数模型
     - search (McpQueryParam): 查询参数模型
     - auth (AuthSchema): 认证信息模型
-
+    
     返回:
     - JSONResponse: 包含 MCP 服务器列表的 JSON 响应
     """
@@ -97,11 +97,11 @@ async def create_controller(
 ) -> JSONResponse:
     """
     创建 MCP 服务器接口
-
+    
     参数:
     - data (McpCreateSchema): 创建 MCP 服务器模型
     - auth (AuthSchema): 认证信息模型
-
+    
     返回:
     - JSONResponse: 包含创建 MCP 服务器结果的 JSON 响应
     """
@@ -118,12 +118,12 @@ async def update_controller(
 ) -> JSONResponse:
     """
     修改 MCP 服务器接口
-
+    
     参数:
     - data (McpUpdateSchema): 修改 MCP 服务器模型
     - id (int): MCP 服务器ID
     - auth (AuthSchema): 认证信息模型
-
+    
     返回:
     - JSONResponse: 包含修改 MCP 服务器结果的 JSON 响应
     """
@@ -139,11 +139,11 @@ async def delete_controller(
 ) -> JSONResponse:
     """
     删除 MCP 服务器接口
-
+    
     参数:
     - ids (list[int]): MCP 服务器ID列表
     - auth (AuthSchema): 认证信息模型
-
+    
     返回:
     - JSONResponse: 包含删除 MCP 服务器结果的 JSON 响应
     """
@@ -151,10 +151,16 @@ async def delete_controller(
     log.info(f"删除 MCP 服务器成功: {ids}")
     return SuccessResponse(msg="删除 MCP 服务器成功")
 
+
 @AIRouter.websocket("/ws/chat", name="WebSocket聊天")
 async def websocket_chat_controller(
     websocket: WebSocket,
 ):
+    """
+    WebSocket聊天接口
+    
+    ws://127.0.0.1:8001/api/v1/ai/mcp/ws/chat
+    """
     await websocket.accept()
     try:
         while True:

+ 3 - 47
backend/app/api/v1/module_business/crane/controller.py

@@ -1,10 +1,8 @@
 # -*- coding: utf-8 -*-
 
-import json
-import asyncio
-from starlette.websockets import WebSocketDisconnect
-from fastapi import APIRouter, Depends, UploadFile, Body, Path, WebSocket
+from fastapi import APIRouter, Depends, UploadFile, Body, Path, Query
 from fastapi.responses import StreamingResponse, JSONResponse
+
 from app.common.response import SuccessResponse, StreamResponse
 from app.core.dependencies import AuthPermission
 from app.api.v1.module_system.auth.schema import AuthSchema
@@ -125,46 +123,4 @@ async def export_crane_template_controller() -> StreamingResponse:
         data=bytes2file_response(import_template_result),
         media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
         headers={'Content-Disposition': 'attachment; filename=biz_crane_template.xlsx'}
-    )
-
-@BizCraneRouter.websocket("/ws/online", name="天车在线状态检测", dependencies=[])
-async def websocket_chat_controller(
-        websocket: WebSocket,
-):
-    await websocket.accept()
-    is_closed = False
-    try:
-        log.info("前端WebSocket连接成功,开始推送测试数据")
-        # 构造测试数据(完全匹配前端预期的格式)
-        test_messages = [
-            # 测试报警消息(gc/alert)
-            {
-                "topic": "gc/crane_status",
-                "data": [
-                    {"crane_no": "11111111", "is_online": True},
-                    {"crane_no": "123456", "is_online": False}
-                ]
-            }
-        ]
-        # 循环推送测试数据(每隔3秒推一次,模拟实时更新)
-        while True:
-            if is_closed:
-                break
-            for msg in test_messages:
-                # 将字典转为JSON字符串(前端能解析)
-                await websocket.send_text(json.dumps(msg))
-                log.info(f"已推送测试数据: {msg['topic']}")
-                await asyncio.sleep(5)  # 每3秒推一条,可调整间隔
-    except WebSocketDisconnect:
-        is_closed = True
-        log.info("前端主动断开WebSocket连接")
-    except Exception as e:
-        is_closed = True
-        log.error(f"WebSocket异常: {str(e)}", exc_info=True)
-    finally:
-        if not is_closed:
-            try:
-                await websocket.close()
-                log.info("关闭WebSocket连接")
-            except RuntimeError:
-                log.debug("WebSocket已关闭,无需重复操作")
+    )

+ 4 - 49
backend/app/api/v1/module_business/vardict/controller.py

@@ -1,10 +1,8 @@
 # -*- coding: utf-8 -*-
 
-import json
-import asyncio
-from starlette.websockets import WebSocketDisconnect
-from fastapi import APIRouter, Depends, UploadFile, Body, Path, WebSocket
+from fastapi import APIRouter, Depends, UploadFile, Body, Path, Query
 from fastapi.responses import StreamingResponse, JSONResponse
+
 from app.common.response import SuccessResponse, StreamResponse
 from app.core.dependencies import AuthPermission
 from app.api.v1.module_system.auth.schema import AuthSchema
@@ -12,6 +10,7 @@ from app.core.base_params import PaginationQueryParam
 from app.utils.common_util import bytes2file_response
 from app.core.logger import log
 from app.core.base_schema import BatchSetAvailable
+
 from .service import BizVarDictService
 from .schema import BizVarDictCreateSchema, BizVarDictUpdateSchema, BizVarDictQueryParam
 
@@ -124,48 +123,4 @@ async def export_vardict_template_controller() -> StreamingResponse:
         data=bytes2file_response(import_template_result),
         media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
         headers={'Content-Disposition': 'attachment; filename=biz_var_dict_template.xlsx'}
-    )
-
-@BizVarDictRouter.websocket("/ws/alert", name="变量报警信息", dependencies=[])
-async def websocket_chat_controller(
-        websocket: WebSocket,
-):
-    await websocket.accept()
-    is_closed = False
-    try:
-
-        log.info("前端WebSocket连接成功,开始推送测试数据")
-        # 构造测试数据(完全匹配前端预期的格式)
-        test_messages = [
-            # 测试报警消息(gc/alert)
-            {
-                "topic": "gc/alert",
-                "data": [
-                    {"switch_type": "2", "crane_name": "塔吊001", "msg": "轻度过载报警"},
-                    {"switch_type": "3", "crane_name": "塔吊002", "msg": "中度倾斜报警"},
-                    {"switch_type": "4", "crane_name": "塔吊003", "msg": "重度碰撞报警"}
-                ]
-            }
-        ]
-        # 循环推送测试数据(每隔3秒推一次,模拟实时更新)
-        while True:
-            if is_closed:
-                break
-            for msg in test_messages:
-                # 将字典转为JSON字符串(前端能解析)
-                await websocket.send_text(json.dumps(msg))
-                log.info(f"已推送测试数据: {msg['topic']}")
-                await asyncio.sleep(5)  # 每3秒推一条,可调整间隔
-    except WebSocketDisconnect:
-        is_closed = True
-        log.info("前端主动断开WebSocket连接")
-    except Exception as e:
-        is_closed = True
-        log.error(f"WebSocket异常: {str(e)}", exc_info=True)
-    finally:
-        if not is_closed:
-            try:
-                await websocket.close()
-                log.info("关闭WebSocket连接")
-            except RuntimeError:
-                log.debug("WebSocket已关闭,无需重复操作")
+    )

+ 84 - 137
backend/app/plugin/init_app.py

@@ -1,9 +1,8 @@
 # -*- coding: utf-8 -*-
 
-import json
 from starlette.responses import HTMLResponse
-from typing import Any, AsyncGenerator, Callable
-from fastapi import FastAPI, Request, Response, WebSocket
+from typing import Any, AsyncGenerator
+from fastapi import Depends, FastAPI, Request, Response
 from fastapi.staticfiles import StaticFiles
 from fastapi.concurrency import asynccontextmanager
 from fastapi.openapi.docs import (
@@ -12,10 +11,8 @@ from fastapi.openapi.docs import (
     get_swagger_ui_oauth2_redirect_html
 )
 from fastapi_limiter import FastAPILimiter
+from fastapi_limiter.depends import RateLimiter
 from math import ceil
-from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
-from starlette.types import ASGIApp
-import time
 
 from app.config.setting import settings
 from app.core.logger import log
@@ -29,138 +26,40 @@ from app.api.v1.module_system.params.service import ParamsService
 from app.api.v1.module_system.dict.service import DictDataService
 
 
-# ✅ 核心修复:手动实现Redis限流计数(适配0.1.6版本,确保限流生效)
-class CustomLimiterMiddleware(BaseHTTPMiddleware):
-    """
-    手动实现限流逻辑(直接操作Redis,替代RateLimiter的隐式调用)
-    核心:按IP+路径生成唯一key,Redis计数,超过阈值触发限流
-    """
-
-    def __init__(
-            self,
-            app: ASGIApp,
-            times: int = 5,  # 限流次数
-            seconds: int = 10,  # 限流时间窗口
-            prefix: str = settings.REQUEST_LIMITER_REDIS_PREFIX or "fastapi-limiter:"  # Redis key前缀
-    ):
-        super().__init__(app)
-        self.times = times
-        self.seconds = seconds
-        self.prefix = prefix
-
-    # 生成唯一限流key(IP + 请求路径)
-    def _get_limit_key(self, request: Request) -> str:
-        client_ip = request.client.host or "unknown"
-        path = request.url.path
-        return f"{self.prefix}:{client_ip}:{path}"
-
-    # 限流触发回调
-    async def _limit_callback(self, expire: int):
-        """返回指定格式的429响应"""
-        expires = ceil(expire / 30)  # 动态计算Retry-After值(你示例中的222是占位,实际是expires)
-        # 构造严格匹配的响应体
-        response_body = {
-            "code": -1,
-            "msg": "请求过于频繁,请稍后重试",
-            "data": {
-                "Retry-After": str(expires)  # 动态值,替换示例中的222
-            },
-            "status_code": 429,
-            "success": False
-        }
-        # 返回Response对象
-        return Response(
-            content=json.dumps(response_body, ensure_ascii=False),  # 确保中文正常显示
-            status_code=429,  # HTTP状态码
-            headers={"Retry-After": str(expires)},  # 响应头也保留(可选)
-            media_type="application/json"  # 声明JSON格式
-        )
-
-    async def dispatch(self, request: Request, call_next: RequestResponseEndpoint):
-        # 1. WebSocket请求 → 跳过限流
-        if request.scope.get("type") == "websocket":
-            return await call_next(request)
-
-        # 2. HTTP请求 → 执行手动限流逻辑
-        try:
-            # 获取Redis连接(fastapi-limiter已初始化)
-            redis_client = FastAPILimiter.redis
-            if not redis_client:
-                log.warning("Redis未初始化,跳过限流")
-                return await call_next(request)
-
-            # 生成限流key
-            limit_key = self._get_limit_key(request)
-            # 当前时间戳
-            now = int(time.time())
-            # 时间窗口起始(now - seconds)
-            window_start = now - self.seconds
-
-            # ✅ 核心Redis操作(原子计数,避免并发问题)
-            async with redis_client.pipeline(transaction=True) as pipe:
-                # 1. 删除时间窗口外的旧计数
-                await pipe.zremrangebyscore(limit_key, 0, window_start)
-                # 2. 添加当前请求时间戳到有序集合
-                await pipe.zadd(limit_key, {now: now})
-                # 3. 设置key过期时间(避免内存泄漏)
-                await pipe.expire(limit_key, self.seconds * 2)
-                # 4. 获取当前窗口内的请求数
-                await pipe.zcard(limit_key)
-                # 执行管道
-                results = await pipe.execute()
-                # 提取请求数(第四个操作的结果)
-                request_count = results[3]
-
-            # ✅ 判断是否超过限流阈值
-            if request_count > self.times:
-                # 获取过期时间,调用回调返回响应
-                ttl = await redis_client.ttl(limit_key)
-                # 关键:返回响应,不是 raise
-                return await self._limit_callback(ttl or self.seconds)
-        except CustomException:
-            # 限流触发 → 抛异常(全局处理器捕获)
-            raise
-        except Exception as e:
-            # 非限流异常 → 日志记录,放行请求(避免阻断业务)
-            log.error(f"限流中间件执行异常: {str(e)}")
-
-        # 3. 执行后续处理
-        response = await call_next(request)
-        return response
-
-
-# ✅ 生命周期函数(仅初始化,无中间件操作)
 @asynccontextmanager
 async def lifespan(app: FastAPI) -> AsyncGenerator[Any, Any]:
+    """
+    自定义 FastAPI 应用生命周期。
+    
+    参数:
+    - app (FastAPI): FastAPI 应用实例。
+    
+    返回:
+    - AsyncGenerator[Any, Any]: 生命周期上下文生成器。
+    """
     try:
-        # 数据库初始化
         await InitializeData().init_db()
         log.info(f"✅ {settings.DATABASE_TYPE}数据库初始化完成")
-
-        # 全局事件加载
         await import_modules_async(modules=settings.EVENT_LIST, desc="全局事件", app=app, status=True)
         log.info("✅ 全局事件模块加载完成")
-
-        # Redis配置/字典初始化
         await ParamsService().init_config_service(redis=app.state.redis)
         log.info("✅ Redis系统配置初始化完成")
         await DictDataService().init_dict_service(redis=app.state.redis)
         log.info("✅ Redis数据字典初始化完成")
-
-        # 定时任务初始化
         await SchedulerUtil.init_system_scheduler()
         scheduler_jobs_count = len(SchedulerUtil.get_all_jobs())
         scheduler_status = SchedulerUtil.get_job_status()
         log.info(f"✅ 定时任务调度器初始化完成 ({scheduler_jobs_count} 个任务)")
 
-        # ✅ 初始化fastapi-limiter(仅获取Redis连接)
+        # 6. 初始化请求限制器
         await FastAPILimiter.init(
             redis=app.state.redis,
             prefix=settings.REQUEST_LIMITER_REDIS_PREFIX,
+            http_callback=http_limit_callback,
         )
         log.info("✅ 请求限制器初始化完成")
-
-        # 启动信息面板
+        
+        # 导入并显示最终的启动信息面板
         from app.utils.console import run as console_run
         from app.common.enums import EnvironmentEnum
         console_run(
@@ -171,14 +70,13 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[Any, Any]:
             scheduler_jobs=scheduler_jobs_count,
             scheduler_status=scheduler_status,
         )
-
+        
     except Exception as e:
         log.error(f"❌ 应用初始化失败: {str(e)}")
         raise
 
     yield
-
-    # 关闭逻辑
+    
     try:
         await import_modules_async(modules=settings.EVENT_LIST, desc="全局事件", app=app, status=False)
         log.info("✅ 全局事件模块卸载完成")
@@ -186,45 +84,78 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[Any, Any]:
         log.info("✅ 定时任务调度器已关闭")
         await FastAPILimiter.close()
         log.info("✅ 请求限制器已关闭")
+
     except Exception as e:
         log.error(f"❌ 应用关闭过程中发生错误: {str(e)}")
+        
 
-
-# ✅ 中间件注册(添加自定义限流中间件)
 def register_middlewares(app: FastAPI) -> None:
-    # 1. 原有中间件
+    """
+    注册全局中间件。
+
+    参数:
+    - app (FastAPI): FastAPI 应用实例。
+
+    返回:
+    - None
+    """
     for middleware in settings.MIDDLEWARE_LIST[::-1]:
         if not middleware:
             continue
         middleware = import_module(middleware, desc="中间件")
         app.add_middleware(middleware)
 
-    # 2. 限流中间件(核心:5次/10秒)
-    app.add_middleware(
-        CustomLimiterMiddleware,
-        times=5,
-        seconds=10,
-        prefix=settings.REQUEST_LIMITER_REDIS_PREFIX or "fastapi-limiter:"
-    )
-    log.info("✅ 限流中间件注册完成")
+def register_exceptions(app: FastAPI) -> None:
+    """
+    统一注册异常处理器。
 
+    参数:
+    - app (FastAPI): FastAPI 应用实例。
 
-# ✅ 其他函数(不变)
-def register_exceptions(app: FastAPI) -> None:
+    返回:
+    - None
+    """
     handle_exception(app)
 
-
 def register_routers(app: FastAPI) -> None:
-    app.include_router(router=router)
+    """
+    注册根路由。
+
+    参数:
+    - app (FastAPI): FastAPI 应用实例。
 
+    返回:
+    - None
+    """
+    app.include_router(router=router, dependencies=[Depends(RateLimiter(times=5, seconds=10))])
 
 def register_files(app: FastAPI) -> None:
+    """
+    注册静态资源挂载和文件相关配置。
+
+    参数:
+    - app (FastAPI): FastAPI 应用实例。
+
+    返回:
+    - None
+    """
+    # 挂载静态文件目录
     if settings.STATIC_ENABLE:
+        # 确保日志目录存在
         settings.STATIC_ROOT.mkdir(parents=True, exist_ok=True)
         app.mount(path=settings.STATIC_URL, app=StaticFiles(directory=settings.STATIC_ROOT), name=settings.STATIC_DIR)
 
-
 def reset_api_docs(app: FastAPI) -> None:
+    """
+    使用本地静态资源自定义 API 文档页面(Swagger UI 与 ReDoc)。
+
+    参数:
+    - app (FastAPI): FastAPI 应用实例。
+
+    返回:
+    - None
+    """
+
     @app.get(settings.DOCS_URL, include_in_schema=False)
     async def custom_swagger_ui_html() -> HTMLResponse:
         return get_swagger_ui_html(
@@ -247,4 +178,20 @@ def reset_api_docs(app: FastAPI) -> None:
             title=app.title + " - ReDoc",
             redoc_js_url=settings.REDOC_JS_URL,
             redoc_favicon_url=settings.FAVICON_URL,
-        )
+        )
+
+async def http_limit_callback(request: Request, response: Response, expire: int):
+    """
+    请求限制时的默认回调函数
+
+    :param request: FastAPI 请求对象
+    :param response: FastAPI 响应对象
+    :param expire: 剩余毫秒数
+    :return:
+    """
+    expires = ceil(expire / 30)
+    raise CustomException(
+        status_code=429,
+        msg='请求过于频繁,请稍后重试',
+        data={'Retry-After': str(expires)},
+    )

+ 4 - 12
backend/tests/test_main.py

@@ -7,22 +7,14 @@
 """
 
 import pytest
-import websockets
 from fastapi.testclient import TestClient
 
 
 def test_check_health(test_client: TestClient):
-    # 替换成你的实际 WS 地址(和前端一致)
-    ws_url = "ws://127.0.0.1:8001/api/v1/ai/mcp/ws/chat"
-    try:
-        async with websockets.connect(ws_url) as websocket:
-            print("✅ WS 连接成功!")
-            # 接收后端推送的消息
-            while True:
-                msg = await websocket.recv()
-                print("收到消息:", msg)
-    except Exception as e:
-        print("❌ WS 连接失败:", str(e))
+    """测试健康检查接口"""
+    response = test_client.get("/common/health")
+    assert response.status_code == 200
+    assert response.json() == {"msg": "Healthy"}
 
 
 # 运行所有测试

+ 2 - 0
frontend/package.json

@@ -77,6 +77,7 @@
     "exceljs": "^4.4.0",
     "file-saver": "^2.0.5",
     "js-beautify": "^1.15.4",
+    "mqtt": "^5.14.1",
     "nprogress": "^0.2.0",
     "path-browserify": "^1.0.1",
     "path-to-regexp": "^8.2.0",
@@ -96,6 +97,7 @@
     "@iconify/utils": "^2.3.0",
     "@types/codemirror": "^5.60.16",
     "@types/file-saver": "^2.0.7",
+    "@types/mqtt": "^2.5.0",
     "@types/node": "^22.16.5",
     "@types/nprogress": "^0.2.3",
     "@types/path-browserify": "^1.0.3",

+ 100 - 97
frontend/src/views/web/overview/index.vue

@@ -53,7 +53,9 @@
 import BizCraneAPI, { BizCranePageQuery, BizCraneTable } from '@/api/module_business/crane'
 import emptybgUrl from '@/assets/images/empty-bg.png';
 import { ElMessage } from 'element-plus';
-import { onMounted, onUnmounted, ref, reactive, inject } from 'vue';
+import { onMounted, onUnmounted, ref, reactive, inject, Ref } from 'vue';
+import { useRouter } from 'vue-router'; // 补充导入 useRouter
+import mqtt, { MqttClient } from 'mqtt'; // 导入 mqtt
 
 interface alertData {
   switch_type?: string;
@@ -77,15 +79,15 @@ const tab_loading = ref(true)
 const alertData = ref<alertData[]>([])
 const craneData = ref<BizCraneTable[]>([]);
 
-// WS 配置(两个独立的WS地址)
-const ALERT_WS_URL = import.meta.env.VITE_APP_WS_ENDPOINT + "/api/v1/business/vardict/ws/alert";
-const ONLINE_WS_URL = import.meta.env.VITE_APP_WS_ENDPOINT + "/api/v1/business/crane/ws/online";
+// ======================== MQTT 配置(替代原有两个 WS 地址) ========================
+const MQTT_WS_URL = import.meta.env.VITE_APP_WS_ENDPOINT || 'ws://127.0.0.1:9001';
+// 两个主题(对应原有两个 WS 的消息类型,需与后端发布的主题一致)
+const ALERT_TOPIC = 'gc/alert'; // 报警消息主题
+const ONLINE_STATUS_TOPIC = 'gc/crane_status'; // 在线状态消息主题
 
-// WS 实例 & 状态管理(两个独立实例)
-let alertWs: WebSocket | null = null; // 报警WS
-let onlineWs: WebSocket | null = null; // 在线状态WS
-const alertConnectionStatus = ref<"connected" | "connecting" | "disconnected">("disconnected");
-const onlineConnectionStatus = ref<"connected" | "connecting" | "disconnected">("disconnected");
+// MQTT 客户端实例 & 连接状态
+let mqttClient: MqttClient | null = null;
+const mqttConnectionStatus = ref<"connected" | "connecting" | "disconnected">("disconnected");
 
 // 表格配置
 const tableConfig = ref([
@@ -178,89 +180,94 @@ const getData = () => {
   getCraneListData()
 }
 
-// ======================== 核心改造:WS 连接管理(两个独立WS) ========================
+// ======================== MQTT 核心逻辑(替代原有 WS 连接) ========================
 /**
- * 通用 WS 连接函数
- * @param url WS地址
- * @param name WS名称(用于日志/提示)
- * @param onMessage 消息处理回调
- * @param statusRef 连接状态Ref
- * @returns WS实例
+ * 初始化 MQTT 连接并订阅主题
  */
- const connectWS = (
-  url: string,
-  name: string,
-  onMessage: (data: any) => void,
-  statusRef: Ref<"connected" | "connecting" | "disconnected">
-): WebSocket | null => {
-  // 初始状态设为 connecting
-  statusRef.value = "connecting";
-  console.log(`[${name}] 开始连接WS: ${url}`);
+const initMqttClient = () => {
+  // 避免重复连接
+  if (mqttClient && mqttClient.connected) {
+    console.log('[MQTT] 客户端已连接,无需重复初始化');
+    return;
+  }
 
-  try {
-    // 在此处创建 WebSocket 实例
-    const ws = new WebSocket(url);
-
-    // 连接成功
-    ws.onopen = () => {
-      console.log(`[${name}] WS连接已建立`);
-      statusRef.value = "connected";
-      ElMessage.success(`${name}系统连接成功`);
-    };
-
-    // 接收消息
-    ws.onmessage = (event) => {
-      console.log(`[${name}] 收到WS消息:`, event.data);
-      try {
-        const data = JSON.parse(event.data);
-        onMessage(data);
-      } catch (err) {
-        console.error(`[${name}] 解析WS消息失败:`, err);
-        onMessage({ content: event.data });
+  mqttConnectionStatus.value = "connecting";
+  console.log(`[MQTT] 开始连接 Broker: ${MQTT_WS_URL}`);
+
+  // 创建 MQTT 客户端(配置自动重连、心跳等)
+  mqttClient = mqtt.connect(MQTT_WS_URL, {
+    clientId: `crane_web_${Math.random().toString(16).slice(2, 10)}`, // 唯一客户端ID,避免冲突
+    keepalive: 60, // 心跳间隔(秒),自动维护连接
+    reconnectPeriod: 3000, // 断开后自动重连间隔(毫秒)
+    clean: true, // 清理会话,断开后不保留订阅关系
+    connectTimeout: 10000, // 连接超时时间(毫秒)
+  });
+
+  // 连接成功回调
+  mqttClient.on('connect', () => {
+    mqttConnectionStatus.value = "connected";
+    console.log('[MQTT] 连接成功');
+    ElMessage.success('MQTT 消息系统连接成功');
+
+    // 同时订阅两个主题
+    mqttClient?.subscribe([ALERT_TOPIC, ONLINE_STATUS_TOPIC], (err) => {
+      if (err) {
+        console.error('[MQTT] 订阅主题失败:', err);
+        ElMessage.error('订阅消息主题失败');
+        return;
       }
-    };
-
-    // 连接关闭(自动重连)
-    ws.onclose = (event) => {
-      console.log(`[${name}] WS连接已关闭`, event.code, event.reason);
-      statusRef.value = "disconnected";
-      // 非主动关闭(code!==1000)则3秒后重连
-      if (event.code !== 1000) {
-        setTimeout(() => {
-          console.log(`[${name}] 尝试重连WS...`);
-          connectWS(url, name, onMessage, statusRef);
-        }, 3000);
+      console.log(`[MQTT] 订阅主题成功:${ALERT_TOPIC}、${ONLINE_STATUS_TOPIC}`);
+    });
+  });
+
+  // 接收消息回调(统一处理所有主题消息)
+  mqttClient.on('message', (topic, payload) => {
+    console.log(`[MQTT] 收到主题 ${topic} 的消息:`, payload.toString());
+    try {
+      const data = JSON.parse(payload.toString());
+      // 根据主题区分消息类型,复用原有消息处理逻辑
+      if (topic === ALERT_TOPIC) {
+        handleAlertMessage(data);
+      } else if (topic === ONLINE_STATUS_TOPIC) {
+        handleOnlineMessage(data);
       }
-    };
-
-    // 连接错误
-    ws.onerror = (error) => {
-      console.error(`[${name}] WS连接错误:`, error);
-      statusRef.value = "disconnected";
-      ElMessage.error(`${name}连接失败,请检查服务器状态`);
-    };
-
-    return ws;
-  } catch (err) {
-    console.error(`[${name}] 创建WS连接失败:`, err);
-    statusRef.value = "disconnected";
-    ElMessage.error(`${name}连接初始化失败`);
-    return null;
-  }
+    } catch (err) {
+      console.error('[MQTT] 解析消息失败:', err);
+      ElMessage.warning('收到无效消息格式');
+    }
+  });
+
+  // 连接断开回调
+  mqttClient.on('close', () => {
+    mqttConnectionStatus.value = "disconnected";
+    console.log('[MQTT] 连接已断开');
+    ElMessage.warning('MQTT 消息连接已断开,正在尝试重连...');
+  });
+
+  // 连接错误回调
+  mqttClient.on('error', (err) => {
+    mqttConnectionStatus.value = "disconnected";
+    console.error('[MQTT] 连接错误:', err);
+    ElMessage.error('MQTT 消息连接失败,请检查服务器状态');
+  });
 };
 
-// 报警WS消息处理
+/**
+ * 报警消息处理(复用原有逻辑)
+ */
 const handleAlertMessage = (data: any) => {
   console.log('起重机报警消息:', data);
   alarm_loading.value = false;
-  alertData.value = data.data || []; // 赋值给报警列表
+  alertData.value = data || [];
 };
 
-// 在线状态WS消息处理
+/**
+ * 在线状态消息处理(复用原有逻辑)
+ */
 const handleOnlineMessage = (data: any) => {
   console.log('起重机在线状态消息:', data);
-  if (Array.isArray(data.data)) {
-    data.data.forEach((item: any) => {
+  if (Array.isArray(data)) {
+    data.forEach((item: any) => {
       craneData.value.forEach((craneItem) => {
         if (craneItem.crane_no === item.crane_no) {
           craneItem.online_status = item.is_online ? '在线' : '离线';
@@ -270,37 +277,33 @@ const handleOnlineMessage = (data: any) => {
   }
 };
 
-// 断开所有WS连接
-const disconnectAllWS = () => {
-  // 断开报警WS
-  if (alertWs) {
-    alertWs.close(1000, "用户主动断开");
-    alertWs = null;
-  }
-  // 断开在线状态WS
-  if (onlineWs) {
-    onlineWs.close(1000, "用户主动断开");
-    onlineWs = null;
+/**
+ * 断开 MQTT 连接
+ */
+const disconnectMqtt = () => {
+  if (mqttClient) {
+    mqttClient.end(true, () => {
+      console.log('[MQTT] 主动断开连接');
+      mqttClient = null;
+      mqttConnectionStatus.value = "disconnected";
+    });
   }
-  alertConnectionStatus.value = "disconnected";
-  onlineConnectionStatus.value = "disconnected";
 };
 
 // ======================== 生命周期 ========================
 onMounted(async () => {
   // 初始化业务数据
-  getData()
+  getData();
+  // 初始化 MQTT 连接
+  initMqttClient();
   if (receiveData) {
     receiveData({ craneName: '', isShowHomeButton: false });
   }
-  // 连接两个WS
-  alertWs = connectWS(ALERT_WS_URL, "报警", handleAlertMessage, alertConnectionStatus);
-  onlineWs = connectWS(ONLINE_WS_URL, "在线状态", handleOnlineMessage, onlineConnectionStatus);
 });
 
 onUnmounted(() => {
-  // 页面销毁时断开所有WS
-  disconnectAllWS();
+  // 页面销毁时主动断开 MQTT 连接
+  disconnectMqtt();
 });
 </script>