mqttUtil.ts 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. // src/utils/mqttUtil.ts
  2. import mqtt, { MqttClient, IClientOptions } from 'mqtt';
  3. /**
  4. * MQTT消息处理回调类型
  5. */
  6. export type MqttMessageCallback = (topic: string, payload: any) => void;
  7. /**
  8. * 单个MQTT实例配置项
  9. */
  10. export interface MqttInstanceConfig {
  11. wsUrl: string; // 当前组件的MQTT WS地址
  12. clientOptions?: IClientOptions; // 客户端配置(用户名、密码等)
  13. topics: string[]; // 当前组件要订阅的主题列表
  14. }
  15. /**
  16. * MQTT工具类(非单例,每个组件可独立实例化)
  17. */
  18. class MqttUtil {
  19. // 实例私有属性(每个实例独立拥有)
  20. private mqttClient: MqttClient | null = null;
  21. private config: MqttInstanceConfig;
  22. private messageCallback: MqttMessageCallback | null = null;
  23. private isConnected: boolean = false; // 当前实例连接状态
  24. /**
  25. * 构造函数(每个组件new独立实例)
  26. * @param config 当前实例的配置项
  27. */
  28. constructor(config: MqttInstanceConfig) {
  29. this.config = config;
  30. }
  31. /**
  32. * 初始化当前组件的MQTT连接(创建连接+订阅主题)
  33. * @param callback 当前组件的消息回调
  34. */
  35. public initConnect(callback: MqttMessageCallback): void {
  36. // 避免重复初始化
  37. if (this.isConnected && this.mqttClient) {
  38. console.log(`[MQTT] 当前组件连接已建立,无需重复初始化`);
  39. this.messageCallback = callback;
  40. return;
  41. }
  42. const { wsUrl, clientOptions, topics } = this.config;
  43. console.log(`[MQTT] 组件独立连接 Broker: ${wsUrl}`);
  44. // 创建独立的MQTT客户端
  45. this.mqttClient = mqtt.connect(wsUrl, clientOptions);
  46. this.messageCallback = callback;
  47. // 绑定事件
  48. this.bindEvents(topics);
  49. }
  50. /**
  51. * 绑定当前实例的MQTT事件
  52. * @param topics 要订阅的主题
  53. */
  54. private bindEvents(topics: string[]): void {
  55. if (!this.mqttClient) return;
  56. // 连接成功
  57. this.mqttClient.on('connect', () => {
  58. this.isConnected = true;
  59. console.log(`[MQTT] 组件独立连接成功`);
  60. // 连接成功后订阅当前组件的主题
  61. this.subscribeTopics(topics);
  62. });
  63. // 接收消息(组件专属回调)
  64. this.mqttClient.on('message', (topic, payload) => {
  65. let data: any = payload.toString();
  66. // 组件内单独处理中文转义(按需开启)
  67. try {
  68. // 还原Unicode中文 + 解析JSON
  69. const unicodeReg = /\\u([0-9a-fA-F]{4})/g;
  70. const payloadStr = payload.toString().replace(unicodeReg, (_, hex) => {
  71. return String.fromCharCode(parseInt(hex, 16));
  72. });
  73. data = JSON.parse(payloadStr);
  74. } catch (err) {
  75. console.warn('[MQTT] 消息非JSON格式,使用原始字符串:', err);
  76. }
  77. // 执行当前组件的回调
  78. if (this.messageCallback) {
  79. this.messageCallback(topic, data);
  80. }
  81. });
  82. // 连接错误
  83. this.mqttClient.on('error', (err) => {
  84. this.isConnected = false;
  85. console.error(`[MQTT] 组件独立连接错误:`, err);
  86. });
  87. // 连接断开
  88. this.mqttClient.on('close', () => {
  89. this.isConnected = false;
  90. console.log(`[MQTT] 组件独立连接已断开`);
  91. });
  92. }
  93. /**
  94. * 订阅主题(当前实例专属)
  95. * @param topics 主题列表/单个主题
  96. */
  97. public subscribeTopics(topics: string | string[]): void {
  98. if (!this.mqttClient || !this.isConnected) {
  99. console.error(`[MQTT] 组件连接未建立,无法订阅主题`);
  100. return;
  101. }
  102. const topicList = Array.isArray(topics) ? topics : [topics];
  103. this.mqttClient.subscribe(topicList, (err) => {
  104. if (err) {
  105. console.error(`[MQTT] 组件订阅主题失败:`, err);
  106. return;
  107. }
  108. console.log(`[MQTT] 组件订阅主题成功:${topicList.join('、')}`);
  109. });
  110. }
  111. /**
  112. * 取消订阅主题(当前实例专属)
  113. * @param topics 主题列表/单个主题
  114. */
  115. public unsubscribeTopics(topics: string | string[]): void {
  116. if (!this.mqttClient || !this.isConnected) {
  117. console.error(`[MQTT] 组件连接未建立,无法取消订阅`);
  118. return;
  119. }
  120. const topicList = Array.isArray(topics) ? topics : [topics];
  121. this.mqttClient.unsubscribe(topicList, (err) => {
  122. if (err) {
  123. console.error(`[MQTT] 组件取消订阅主题失败:`, err);
  124. return;
  125. }
  126. console.log(`[MQTT] 组件取消订阅主题成功:${topicList.join('、')}`);
  127. });
  128. }
  129. /**
  130. * 释放当前实例所有资源(断开连接+清空回调+置空实例)
  131. */
  132. public releaseResources(): void {
  133. // 1. 仅当连接有效时,执行断开操作(无需手动取消订阅)
  134. if (this.mqttClient) {
  135. try {
  136. // end(false):优雅断开(先发送完未发送的消息,再断开),默认false
  137. // 无需手动取消订阅,Broker会自动清理该客户端的所有订阅
  138. this.mqttClient.end(false, () => {
  139. console.log(`[MQTT] 组件MQTT连接优雅断开`);
  140. });
  141. } catch (err) {
  142. console.warn(`[MQTT] 组件MQTT连接断开时出现异常:`, err);
  143. }
  144. this.mqttClient = null;
  145. }
  146. // 2. 清空回调和连接状态(无论连接是否有效,都要重置)
  147. this.messageCallback = null;
  148. this.isConnected = false;
  149. console.log(`[MQTT] 组件MQTT资源已全部释放`);
  150. }
  151. /**
  152. * 获取当前实例连接状态
  153. */
  154. public getConnectStatus(): boolean {
  155. return this.isConnected;
  156. }
  157. }
  158. export default MqttUtil;