// src/utils/mqttUtil.ts import mqtt, { MqttClient, IClientOptions } from 'mqtt'; /** * MQTT消息处理回调类型 */ export type MqttMessageCallback = (topic: string, payload: any) => void; /** * 单个MQTT实例配置项 */ export interface MqttInstanceConfig { wsUrl: string; // 当前组件的MQTT WS地址 clientOptions?: IClientOptions; // 客户端配置(用户名、密码等) topics: string[]; // 当前组件要订阅的主题列表 } /** * MQTT工具类(非单例,每个组件可独立实例化) */ class MqttUtil { // 实例私有属性(每个实例独立拥有) private mqttClient: MqttClient | null = null; private config: MqttInstanceConfig; private messageCallback: MqttMessageCallback | null = null; private isConnected: boolean = false; // 当前实例连接状态 /** * 构造函数(每个组件new独立实例) * @param config 当前实例的配置项 */ constructor(config: MqttInstanceConfig) { this.config = config; } /** * 初始化当前组件的MQTT连接(创建连接+订阅主题) * @param callback 当前组件的消息回调 */ public initConnect(callback: MqttMessageCallback): void { // 避免重复初始化 if (this.isConnected && this.mqttClient) { console.log(`[MQTT] 当前组件连接已建立,无需重复初始化`); this.messageCallback = callback; return; } const { wsUrl, clientOptions, topics } = this.config; console.log(`[MQTT] 组件独立连接 Broker: ${wsUrl}`); // 创建独立的MQTT客户端 this.mqttClient = mqtt.connect(wsUrl, clientOptions); this.messageCallback = callback; // 绑定事件 this.bindEvents(topics); } /** * 绑定当前实例的MQTT事件 * @param topics 要订阅的主题 */ private bindEvents(topics: string[]): void { if (!this.mqttClient) return; // 连接成功 this.mqttClient.on('connect', () => { this.isConnected = true; console.log(`[MQTT] 组件独立连接成功`); // 连接成功后订阅当前组件的主题 this.subscribeTopics(topics); }); // 接收消息(组件专属回调) this.mqttClient.on('message', (topic, payload) => { let data: any = payload.toString(); // 组件内单独处理中文转义(按需开启) try { // 还原Unicode中文 + 解析JSON const unicodeReg = /\\u([0-9a-fA-F]{4})/g; const payloadStr = payload.toString().replace(unicodeReg, (_, hex) => { return String.fromCharCode(parseInt(hex, 16)); }); data = JSON.parse(payloadStr); } catch (err) { console.warn('[MQTT] 消息非JSON格式,使用原始字符串:', err); } // 执行当前组件的回调 if (this.messageCallback) { this.messageCallback(topic, data); } }); // 连接错误 this.mqttClient.on('error', (err) => { this.isConnected = false; console.error(`[MQTT] 组件独立连接错误:`, err); }); // 连接断开 this.mqttClient.on('close', () => { this.isConnected = false; console.log(`[MQTT] 组件独立连接已断开`); }); } /** * 订阅主题(当前实例专属) * @param topics 主题列表/单个主题 */ public subscribeTopics(topics: string | string[]): void { if (!this.mqttClient || !this.isConnected) { console.error(`[MQTT] 组件连接未建立,无法订阅主题`); return; } const topicList = Array.isArray(topics) ? topics : [topics]; this.mqttClient.subscribe(topicList, (err) => { if (err) { console.error(`[MQTT] 组件订阅主题失败:`, err); return; } console.log(`[MQTT] 组件订阅主题成功:${topicList.join('、')}`); }); } /** * 取消订阅主题(当前实例专属) * @param topics 主题列表/单个主题 */ public unsubscribeTopics(topics: string | string[]): void { if (!this.mqttClient || !this.isConnected) { console.error(`[MQTT] 组件连接未建立,无法取消订阅`); return; } const topicList = Array.isArray(topics) ? topics : [topics]; this.mqttClient.unsubscribe(topicList, (err) => { if (err) { console.error(`[MQTT] 组件取消订阅主题失败:`, err); return; } console.log(`[MQTT] 组件取消订阅主题成功:${topicList.join('、')}`); }); } /** * 释放当前实例所有资源(断开连接+清空回调+置空实例) */ public releaseResources(): void { // 1. 仅当连接有效时,执行断开操作(无需手动取消订阅) if (this.mqttClient) { try { // end(false):优雅断开(先发送完未发送的消息,再断开),默认false // 无需手动取消订阅,Broker会自动清理该客户端的所有订阅 this.mqttClient.end(false, () => { console.log(`[MQTT] 组件MQTT连接优雅断开`); }); } catch (err) { console.warn(`[MQTT] 组件MQTT连接断开时出现异常:`, err); } this.mqttClient = null; } // 2. 清空回调和连接状态(无论连接是否有效,都要重置) this.messageCallback = null; this.isConnected = false; console.log(`[MQTT] 组件MQTT资源已全部释放`); } /** * 获取当前实例连接状态 */ public getConnectStatus(): boolean { return this.isConnected; } } export default MqttUtil;