| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- // src/utils/mqttUtil.ts
- import mqtt, { MqttClient, IClientOptions } from 'mqtt';
- /**
- * MQTT消息处理回调类型
- */
- export type MqttMessageCallback = (topic: string, payload: any) => void;
- /**
- * 单个MQTT实例配置项
- */
- export interface MqttInstanceConfig {
- wsUrl: string; // 当前组件的MQTT WS地址
- clientOptions?: IClientOptions; // 客户端配置(用户名、密码等)
- topics: string[]; // 当前组件要订阅的主题列表
- }
- /**
- * MQTT工具类(非单例,每个组件可独立实例化)
- */
- class MqttUtil {
- // 实例私有属性(每个实例独立拥有)
- private mqttClient: MqttClient | null = null;
- private config: MqttInstanceConfig;
- private messageCallback: MqttMessageCallback | null = null;
- private isConnected: boolean = false; // 当前实例连接状态
- /**
- * 构造函数(每个组件new独立实例)
- * @param config 当前实例的配置项
- */
- constructor(config: MqttInstanceConfig) {
- this.config = config;
- }
- /**
- * 初始化当前组件的MQTT连接(创建连接+订阅主题)
- * @param callback 当前组件的消息回调
- */
- public initConnect(callback: MqttMessageCallback): void {
- // 避免重复初始化
- if (this.isConnected && this.mqttClient) {
- console.log(`[MQTT] 当前组件连接已建立,无需重复初始化`);
- this.messageCallback = callback;
- return;
- }
- const { wsUrl, clientOptions, topics } = this.config;
- console.log(`[MQTT] 组件独立连接 Broker: ${wsUrl}`);
- // 创建独立的MQTT客户端
- this.mqttClient = mqtt.connect(wsUrl, clientOptions);
- this.messageCallback = callback;
- // 绑定事件
- this.bindEvents(topics);
- }
- /**
- * 绑定当前实例的MQTT事件
- * @param topics 要订阅的主题
- */
- private bindEvents(topics: string[]): void {
- if (!this.mqttClient) return;
- // 连接成功
- this.mqttClient.on('connect', () => {
- this.isConnected = true;
- console.log(`[MQTT] 组件独立连接成功`);
- // 连接成功后订阅当前组件的主题
- this.subscribeTopics(topics);
- });
- // 接收消息(组件专属回调)
- this.mqttClient.on('message', (topic, payload) => {
- let data: any = payload.toString();
- // 组件内单独处理中文转义(按需开启)
- try {
- // 还原Unicode中文 + 解析JSON
- const unicodeReg = /\\u([0-9a-fA-F]{4})/g;
- const payloadStr = payload.toString().replace(unicodeReg, (_, hex) => {
- return String.fromCharCode(parseInt(hex, 16));
- });
- data = JSON.parse(payloadStr);
- } catch (err) {
- console.warn('[MQTT] 消息非JSON格式,使用原始字符串:', err);
- }
- // 执行当前组件的回调
- if (this.messageCallback) {
- this.messageCallback(topic, data);
- }
- });
- // 连接错误
- this.mqttClient.on('error', (err) => {
- this.isConnected = false;
- console.error(`[MQTT] 组件独立连接错误:`, err);
- });
- // 连接断开
- this.mqttClient.on('close', () => {
- this.isConnected = false;
- console.log(`[MQTT] 组件独立连接已断开`);
- });
- }
- /**
- * 订阅主题(当前实例专属)
- * @param topics 主题列表/单个主题
- */
- public subscribeTopics(topics: string | string[]): void {
- if (!this.mqttClient || !this.isConnected) {
- console.error(`[MQTT] 组件连接未建立,无法订阅主题`);
- return;
- }
- const topicList = Array.isArray(topics) ? topics : [topics];
- this.mqttClient.subscribe(topicList, (err) => {
- if (err) {
- console.error(`[MQTT] 组件订阅主题失败:`, err);
- return;
- }
- console.log(`[MQTT] 组件订阅主题成功:${topicList.join('、')}`);
- });
- }
- /**
- * 取消订阅主题(当前实例专属)
- * @param topics 主题列表/单个主题
- */
- public unsubscribeTopics(topics: string | string[]): void {
- if (!this.mqttClient || !this.isConnected) {
- console.error(`[MQTT] 组件连接未建立,无法取消订阅`);
- return;
- }
- const topicList = Array.isArray(topics) ? topics : [topics];
- this.mqttClient.unsubscribe(topicList, (err) => {
- if (err) {
- console.error(`[MQTT] 组件取消订阅主题失败:`, err);
- return;
- }
- console.log(`[MQTT] 组件取消订阅主题成功:${topicList.join('、')}`);
- });
- }
- /**
- * 释放当前实例所有资源(断开连接+清空回调+置空实例)
- */
- public releaseResources(): void {
- // 1. 仅当连接有效时,执行断开操作(无需手动取消订阅)
- if (this.mqttClient) {
- try {
- // end(false):优雅断开(先发送完未发送的消息,再断开),默认false
- // 无需手动取消订阅,Broker会自动清理该客户端的所有订阅
- this.mqttClient.end(false, () => {
- console.log(`[MQTT] 组件MQTT连接优雅断开`);
- });
- } catch (err) {
- console.warn(`[MQTT] 组件MQTT连接断开时出现异常:`, err);
- }
- this.mqttClient = null;
- }
-
- // 2. 清空回调和连接状态(无论连接是否有效,都要重置)
- this.messageCallback = null;
- this.isConnected = false;
- console.log(`[MQTT] 组件MQTT资源已全部释放`);
- }
- /**
- * 获取当前实例连接状态
- */
- public getConnectStatus(): boolean {
- return this.isConnected;
- }
- }
- export default MqttUtil;
|