client.js 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129
  1. 'use strict'
  2. /**
  3. * Module dependencies
  4. */
  5. var events = require('events')
  6. var Store = require('./store')
  7. var eos = require('end-of-stream')
  8. var mqttPacket = require('mqtt-packet')
  9. var Writable = require('readable-stream').Writable
  10. var inherits = require('inherits')
  11. var reInterval = require('reinterval')
  12. var validations = require('./validations')
  13. var xtend = require('xtend')
  // Prefer the platform's setImmediate. On runtimes without it (node v0.8)
  // fall back to process.nextTick. The fallback forwards any extra arguments
  // to the callback, because callers in this file depend on that
  // (e.g. `setImmediate(callback, new Error(...))` in subscribe).
  var setImmediate = global.setImmediate || function (callback) {
    var args = Array.prototype.slice.call(arguments, 1)
    process.nextTick(function () {
      callback.apply(null, args)
    })
  }
  // Connection options applied by the MqttClient constructor for every key
  // the caller left undefined.
  var defaultConnectOptions = {
    // multiplied by 1000 to get the ping timer interval (see _setupPingTimer)
    keepalive: 60,
    // when true, every sent packet pushes the next ping back (see _shiftPingInterval)
    reschedulePings: true,
    protocolId: 'MQTT',
    protocolVersion: 4,
    // interval passed to setInterval in _setupReconnect; values <= 0 disable reconnecting
    reconnectPeriod: 1000,
    // delay before the connack timer forces a cleanup (see _setupStream)
    connectTimeout: 30 * 1000,
    clean: true,
    // when true, subscribed topics are remembered and re-sent after a reconnect
    resubscribe: true
  }
  /**
   * Build a random default client id: the prefix 'mqttjs_' followed by up to
   * eight hexadecimal digits derived from Math.random().
   *
   * @returns {String} the generated client id
   * @api private
   */
  function defaultId () {
    var randomHex = Math.random().toString(16)
    // skip the leading "0." and keep at most eight digits
    var suffix = randomHex.slice(2, 10)
    return 'mqttjs_' + suffix
  }
  /**
   * Write a packet straight to the client's stream, bypassing the queue.
   *
   * Emits 'packetsend' on the client first. When a callback is given it is
   * invoked immediately if writeToStream reports success, otherwise once the
   * stream emits 'drain'.
   *
   * @param {MqttClient} client - client owning the stream
   * @param {Object} packet - packet to serialise
   * @param {Function} [cb] - called once the packet has been handed over
   * @api private
   */
  function sendPacket (client, packet, cb) {
    client.emit('packetsend', packet)
    var flushed = mqttPacket.writeToStream(packet, client.stream)
    if (!cb) {
      return
    }
    if (flushed) {
      cb()
      return
    }
    client.stream.once('drain', cb)
  }
  /**
   * Fail every pending callback stored in `queue`.
   *
   * Each function-valued entry is invoked with Error('Connection closed') and
   * then removed from the map; non-function entries are left in place.
   *
   * @param {Object} [queue] - map of messageId to callback
   * @api private
   */
  function flush (queue) {
    if (!queue) {
      return
    }
    var ids = Object.keys(queue)
    for (var i = 0; i < ids.length; i += 1) {
      var id = ids[i]
      if (typeof queue[id] !== 'function') {
        continue
      }
      queue[id](new Error('Connection closed'))
      delete queue[id]
    }
  }
  /**
   * Persist a packet in the client's outgoing store, then write it to the
   * stream.
   *
   * If the store reports an error the packet is not sent and the error is
   * passed to `cb` (when one was given).
   *
   * @param {MqttClient} client - client owning the store and stream
   * @param {Object} packet - packet to store and send
   * @param {Function} [cb] - called with the store error, or by sendPacket
   * @api private
   */
  function storeAndSend (client, packet, cb) {
    function onStored (storeError) {
      if (!storeError) {
        sendPacket(client, packet, cb)
        return
      }
      return cb && cb(storeError)
    }
    client.outgoingStore.put(packet, onStored)
  }
  // Shared no-op used wherever an optional callback was not supplied.
  function nop () {}
  /**
   * MqttClient constructor
   *
   * May be called with or without `new`. Fills in every connection option the
   * caller left undefined from defaultConnectOptions (this mutates the
   * `options` object that was passed in), creates the inflight stores,
   * registers the 'connect' / 'close' lifecycle listeners and finally opens
   * the first stream through _setupStream().
   *
   * @param {Function} streamBuilder - called with the client; its return value
   * is used as the connection stream (see _setupStream)
   * @param {Object} [options] - connection options
   * (see Connection#connect)
   */
  function MqttClient (streamBuilder, options) {
    var k
    var that = this
    if (!(this instanceof MqttClient)) {
      return new MqttClient(streamBuilder, options)
    }
    this.options = options || {}
    // Defaults
    for (k in defaultConnectOptions) {
      if (typeof this.options[k] === 'undefined') {
        this.options[k] = defaultConnectOptions[k]
      } else {
        this.options[k] = options[k]
      }
    }
    // any non-string clientId is replaced by a generated one
    this.options.clientId = (typeof this.options.clientId === 'string') ? this.options.clientId : defaultId()
    this.streamBuilder = streamBuilder
    // Inflight message storages
    this.outgoingStore = this.options.outgoingStore || new Store()
    this.incomingStore = this.options.incomingStore || new Store()
    // Should QoS zero messages be queued when the connection is broken?
    this.queueQoSZero = this.options.queueQoSZero === undefined ? true : this.options.queueQoSZero
    // map of subscribed topics to support reconnection
    this._resubscribeTopics = {}
    // map of a subscribe messageId and a topic
    this.messageIdToTopic = {}
    // Ping timer, setup in _setupPingTimer
    this.pingTimer = null
    // Is the client connected?
    this.connected = false
    // Are we disconnecting?
    this.disconnecting = false
    // Packet queue
    this.queue = []
    // connack timer
    this.connackTimer = null
    // Reconnect timer
    this.reconnectTimer = null
    /**
     * MessageIDs starting with 1
     * ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810
     */
    this.nextId = Math.max(1, Math.floor(Math.random() * 65535))
    // Inflight callbacks
    this.outgoing = {}
    // Mark connected on connect
    this.on('connect', function () {
      // `disconnected` is set by end() once it starts closing the stores
      if (this.disconnected) {
        return
      }
      this.connected = true
      // replay the packets still held in the outgoing store, one at a time
      var outStore = this.outgoingStore.createStream()
      this.once('close', remove)
      outStore.on('end', function () {
        that.removeListener('close', remove)
      })
      outStore.on('error', function (err) {
        that.removeListener('close', remove)
        that.emit('error', err)
      })
      // connection closed while replaying: drop the store stream
      function remove () {
        outStore.destroy()
        outStore = null
      }
      function storeDeliver () {
        // edge case, we wrapped this twice
        if (!outStore) {
          return
        }
        var packet = outStore.read(1)
        var cb
        if (!packet) {
          // read when data is available in the future
          outStore.once('readable', storeDeliver)
          return
        }
        // Avoid unnecessary stream read operations when disconnected
        if (!that.disconnecting && !that.reconnectTimer) {
          cb = that.outgoing[packet.messageId]
          // wrap the inflight callback so the next stored packet is only
          // delivered after this one has completed
          that.outgoing[packet.messageId] = function (err, status) {
            // Ensure that the original callback passed in to publish gets invoked
            if (cb) {
              cb(err, status)
            }
            storeDeliver()
          }
          that._sendPacket(packet)
        } else if (outStore.destroy) {
          outStore.destroy()
        }
      }
      // start flowing
      storeDeliver()
    })
    // Mark disconnected on stream close
    this.on('close', function () {
      this.connected = false
      clearTimeout(this.connackTimer)
    })
    // Setup ping timer
    this.on('connect', this._setupPingTimer)
    // Send queued packets
    this.on('connect', function () {
      var queue = this.queue
      // drain the offline queue sequentially: each packet is sent only after
      // the previous one's callback has fired
      function deliver () {
        var entry = queue.shift()
        var packet = null
        if (!entry) {
          return
        }
        packet = entry.packet
        that._sendPacket(
          packet,
          function (err) {
            if (entry.cb) {
              entry.cb(err)
            }
            deliver()
          }
        )
      }
      deliver()
    })
    var firstConnection = true
    // resubscribe
    this.on('connect', function () {
      // only on reconnects of a clean session that still has remembered topics
      if (!firstConnection &&
        this.options.clean &&
        Object.keys(this._resubscribeTopics).length > 0) {
        if (this.options.resubscribe) {
          // marker property read and removed again by subscribe(); it forces
          // every remembered topic to be re-sent
          this._resubscribeTopics.resubscribe = true
          this.subscribe(this._resubscribeTopics)
        } else {
          this._resubscribeTopics = {}
        }
      }
      firstConnection = false
    })
    // Clear ping timer
    this.on('close', function () {
      if (that.pingTimer !== null) {
        that.pingTimer.clear()
        that.pingTimer = null
      }
    })
    // Setup reconnect timer on disconnect
    this.on('close', this._setupReconnect)
    // NOTE(review): the listeners above are registered before the EventEmitter
    // constructor runs on this instance - confirm this ordering is intentional
    events.EventEmitter.call(this)
    this._setupStream()
  }
  // MqttClient is an EventEmitter ('connect', 'close', 'message', 'error', ...)
  inherits(MqttClient, events.EventEmitter)
  /**
   * setup the event handlers in the inner stream.
   *
   * Builds a new stream with streamBuilder, pipes it through an mqtt-packet
   * parser, sends the connect packet directly (bypassing the queue) and arms
   * the connack timer.
   *
   * @api private
   */
  MqttClient.prototype._setupStream = function () {
    var connectPacket
    var that = this
    var writable = new Writable()
    var parser = mqttPacket.parser(this.options)
    // `done` callback of the chunk currently being processed by writable._write
    var completeParse = null
    // packets parsed from the current chunk, waiting to be handled
    var packets = []
    this._clearReconnect()
    this.stream = this.streamBuilder(this)
    parser.on('packet', function (packet) {
      packets.push(packet)
    })
    // passed as the `done` callback of _handlePacket: schedules the next
    // parsed packet on the next tick
    function nextTickWork () {
      process.nextTick(work)
    }
    // handle the next parsed packet, or signal the writable that the current
    // chunk is finished once none are left
    function work () {
      var packet = packets.shift()
      var done = completeParse
      if (packet) {
        that._handlePacket(packet, nextTickWork)
      } else {
        completeParse = null
        done()
      }
    }
    // the next chunk is only accepted after every packet of this one has been
    // handled (backpressure)
    writable._write = function (buf, enc, done) {
      completeParse = done
      parser.parse(buf)
      work()
    }
    this.stream.pipe(writable)
    // Suppress connection errors
    this.stream.on('error', nop)
    // Echo stream close
    eos(this.stream, this.emit.bind(this, 'close'))
    // Send a connect packet
    // the packet inherits every connection option through its prototype
    connectPacket = Object.create(this.options)
    connectPacket.cmd = 'connect'
    // avoid message queue
    sendPacket(this, connectPacket)
    // Echo connection errors
    parser.on('error', this.emit.bind(this, 'error'))
    // many drain listeners are needed for qos 1 callbacks if the connection is intermittent
    this.stream.setMaxListeners(1000)
    // force a cleanup if no connack arrives within connectTimeout
    // (the timer is cleared in _handleConnack and on 'close')
    clearTimeout(this.connackTimer)
    this.connackTimer = setTimeout(function () {
      that._cleanUp(true)
    }, this.options.connectTimeout)
  }
  /**
   * _handlePacket - dispatch a parsed packet to its handler
   *
   * Emits 'packetreceive' and routes the packet by `cmd`. `done` must be
   * called exactly once per packet, because the stream pipeline set up in
   * _setupStream only moves on to the next packet after it fires.
   *
   * @param {Object} packet - parsed packet
   * @param {Function} done - called when the packet has been handled
   * @api private
   */
  MqttClient.prototype._handlePacket = function (packet, done) {
    this.emit('packetreceive', packet)
    switch (packet.cmd) {
      case 'publish':
        this._handlePublish(packet, done)
        break
      case 'puback':
      case 'pubrec':
      case 'pubcomp':
      case 'suback':
      case 'unsuback':
        this._handleAck(packet)
        done()
        break
      case 'pubrel':
        this._handlePubrel(packet, done)
        break
      case 'connack':
        this._handleConnack(packet)
        done()
        break
      case 'pingresp':
        this._handlePingresp(packet)
        done()
        break
      default:
        // Unknown or unexpected packet type: ignore it, but still signal
        // completion. Without this the parse pipeline would wait forever and
        // no further packet would ever be processed.
        done()
        break
    }
  }
  /**
   * _checkDisconnecting - report whether the client is shutting down
   *
   * When it is, Error('client disconnecting') is passed to `callback`, or
   * emitted as an 'error' event if no callback was given.
   *
   * @param {Function} [callback] - receives the error when disconnecting
   * @returns {Boolean} the current value of `this.disconnecting`
   * @api private
   */
  MqttClient.prototype._checkDisconnecting = function (callback) {
    if (!this.disconnecting) {
      return this.disconnecting
    }
    var failure = new Error('client disconnecting')
    if (callback) {
      callback(failure)
    } else {
      this.emit('error', failure)
    }
    return this.disconnecting
  }
  /**
   * publish - publish <message> to <topic>
   *
   * For QoS 1 and 2 the callback is parked in `this.outgoing` under the
   * packet's messageId; for QoS 0 it is handed straight to _sendPacket.
   *
   * @param {String} topic - topic to publish to
   * @param {String, Buffer} message - message to publish
   * @param {Object} [opts] - publish options, includes:
   *    {Number} qos - qos level to publish on (default 0)
   *    {Boolean} retain - whether or not to retain the message (default false)
   *    {Boolean} dup - whether or not mark a message as duplicate (default false)
   * @param {Function} [callback] - function(err){}
   *    called when publish succeeds or fails
   * @returns {MqttClient} this - for chaining
   * @api public
   *
   * @example client.publish('topic', 'message');
   * @example
   *     client.publish('topic', 'message', {qos: 1, retain: true, dup: true});
   * @example client.publish('topic', 'message', console.log);
   */
  MqttClient.prototype.publish = function (topic, message, opts, callback) {
    // .publish(topic, payload, cb);
    if (typeof opts === 'function') {
      callback = opts
      opts = null
    }
    // merge the caller's options over the defaults
    opts = xtend({qos: 0, retain: false, dup: false}, opts)
    if (this._checkDisconnecting(callback)) {
      return this
    }
    var outbound = {
      cmd: 'publish',
      topic: topic,
      payload: message,
      qos: opts.qos,
      retain: opts.retain,
      messageId: this._nextId(),
      dup: opts.dup
    }
    var needsAck = opts.qos === 1 || opts.qos === 2
    if (needsAck) {
      // Add to callbacks
      this.outgoing[outbound.messageId] = callback || nop
      this._sendPacket(outbound)
    } else {
      this._sendPacket(outbound, callback)
    }
    return this
  }
  /**
   * subscribe - subscribe to <topic>
   *
   * Topics that are already remembered in `_resubscribeTopics` with an equal
   * or higher qos are skipped, unless the call comes from the reconnect
   * handler (which marks the topic map with a `resubscribe` property). When
   * nothing is left to subscribe to, the callback is invoked synchronously
   * with an empty grant list and no packet (nor message id) is used.
   *
   * @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos}
   * @param {Object} [opts] - optional subscription options, includes:
   *    {Number} qos - subscribe qos level (default 0)
   * @param {Function} [callback] - function(err, granted){} where:
   *    {Error} err - subscription error (invalid topic, client disconnecting)
   *    {Array} granted - array of {topic: 't', qos: 0}
   * @returns {MqttClient} this - for chaining
   * @api public
   * @example client.subscribe('topic');
   * @example client.subscribe('topic', {qos: 1});
   * @example client.subscribe({'topic': 0, 'topic2': 1}, console.log);
   * @example client.subscribe('topic', console.log);
   */
  MqttClient.prototype.subscribe = function () {
    var packet
    var args = Array.prototype.slice.call(arguments)
    var subs = []
    var obj = args.shift()
    // marker set by the reconnect handler on the remembered topic map
    var resubscribe = obj.resubscribe
    var callback = args.pop() || nop
    var opts = args.pop()
    var invalidTopic
    var that = this
    // borrowed so a topic literally named "hasOwnProperty" cannot shadow it
    var hasOwn = Object.prototype.hasOwnProperty
    // the marker must not be treated as a topic
    delete obj.resubscribe
    if (typeof obj === 'string') {
      obj = [obj]
    }
    // .subscribe(topic, opts) - the last argument was the options object
    if (typeof callback !== 'function') {
      opts = callback
      callback = nop
    }
    invalidTopic = validations.validateTopics(obj)
    if (invalidTopic !== null) {
      setImmediate(callback, new Error('Invalid topic ' + invalidTopic))
      return this
    }
    if (this._checkDisconnecting(callback)) {
      return this
    }
    var defaultOpts = { qos: 0 }
    opts = xtend(defaultOpts, opts)

    // Queue a topic unless it is already remembered with a sufficient qos.
    function addIfNeeded (topic, qos) {
      if (that._resubscribeTopics[topic] < qos ||
        !hasOwn.call(that._resubscribeTopics, topic) ||
        resubscribe) {
        subs.push({
          topic: topic,
          qos: qos
        })
      }
    }

    if (Array.isArray(obj)) {
      obj.forEach(function (topic) {
        addIfNeeded(topic, opts.qos)
      })
    } else {
      Object
        .keys(obj)
        .forEach(function (k) {
          addIfNeeded(k, obj[k])
        })
    }
    if (!subs.length) {
      // nothing to send: keep the chaining contract and do not burn a
      // message id for a packet that never goes out
      callback(null, [])
      return this
    }
    packet = {
      cmd: 'subscribe',
      subscriptions: subs,
      qos: 1,
      retain: false,
      dup: false,
      messageId: this._nextId()
    }
    // subscriptions to resubscribe to in case of disconnect
    if (this.options.resubscribe) {
      var topics = []
      subs.forEach(function (sub) {
        if (that.options.reconnectPeriod > 0) {
          that._resubscribeTopics[sub.topic] = sub.qos
          topics.push(sub.topic)
        }
      })
      that.messageIdToTopic[packet.messageId] = topics
    }
    // invoked by _handleAck with the suback packet
    this.outgoing[packet.messageId] = function (err, packet) {
      if (!err) {
        // report the qos the broker actually granted
        var granted = packet.granted
        for (var i = 0; i < granted.length; i += 1) {
          subs[i].qos = granted[i]
        }
      }
      callback(err, subs)
    }
    this._sendPacket(packet)
    return this
  }
  /**
   * unsubscribe - unsubscribe from topic(s)
   *
   * The topics are also dropped from the remembered resubscribe map when the
   * `resubscribe` option is on. The callback is parked in `this.outgoing`
   * under the packet's messageId.
   *
   * @param {String, Array} topic - topics to unsubscribe from
   * @param {Function} [callback] - callback fired on unsuback
   * @returns {MqttClient} this - for chaining
   * @api public
   * @example client.unsubscribe('topic');
   * @example client.unsubscribe('topic', console.log);
   */
  MqttClient.prototype.unsubscribe = function (topic, callback) {
    var client = this
    var request = {
      cmd: 'unsubscribe',
      qos: 1,
      messageId: this._nextId()
    }
    callback = callback || nop
    if (this._checkDisconnecting(callback)) {
      return this
    }
    if (typeof topic === 'string') {
      request.unsubscriptions = [topic]
    } else if (typeof topic === 'object' && topic.length) {
      request.unsubscriptions = topic
    }
    if (this.options.resubscribe) {
      // forget the topics so they are not re-subscribed after a reconnect
      request.unsubscriptions.forEach(function (name) {
        delete client._resubscribeTopics[name]
      })
    }
    this.outgoing[request.messageId] = callback
    this._sendPacket(request)
    return this
  }
  /**
   * end - close connection
   *
   * Does nothing if the client is already disconnecting. Otherwise cancels
   * any pending reconnect, marks the client as disconnecting and cleans up
   * the stream - immediately when `force` is set or nothing is in flight,
   * else 10ms after the 'outgoingEmpty' event. Afterwards both stores are
   * closed, `cb` is called and 'end' is emitted.
   *
   * @returns {MqttClient} this - for chaining
   * @param {Boolean} force - do not wait for all in-flight messages to be acked
   * @param {Function} cb - called when the client has been closed
   *
   * @api public
   */
  MqttClient.prototype.end = function (force, cb) {
    var client = this
    // .end(cb)
    if (typeof force === 'function') {
      cb = force
      force = false
    }
    if (this.disconnecting) {
      return this
    }

    // Close the incoming store, then the outgoing store, then notify.
    function shutdownStores () {
      client.disconnected = true
      client.incomingStore.close(function () {
        client.outgoingStore.close(function () {
          if (cb) {
            cb.apply(null, arguments)
          }
          client.emit('end')
        })
      })
      // a reconnect() requested while disconnecting runs now
      if (client._deferredReconnect) {
        client._deferredReconnect()
      }
    }

    function teardown () {
      // defer closesStores of an I/O cycle,
      // just to make sure things are
      // ok for websockets
      client._cleanUp(force, setImmediate.bind(null, shutdownStores))
    }

    this._clearReconnect()
    this.disconnecting = true
    if (force || Object.keys(this.outgoing).length === 0) {
      teardown()
    } else {
      // wait 10ms, just to be sure we received all of it
      this.once('outgoingEmpty', setTimeout.bind(null, teardown, 10))
    }
    return this
  }
  /**
   * removeOutgoingMessage - remove a message in outgoing store
   * the outgoing callback will be called with Error('Message removed') if the message is removed
   *
   * If no callback is registered for `mid` (unknown or already acknowledged
   * id) the store entry is still deleted and nothing else happens.
   *
   * @param {Number} mid - messageId to remove message
   * @returns {MqttClient} this - for chaining
   * @api public
   *
   * @example client.removeOutgoingMessage(client.getLastMessageId());
   */
  MqttClient.prototype.removeOutgoingMessage = function (mid) {
    var cb = this.outgoing[mid]
    delete this.outgoing[mid]
    this.outgoingStore.del({messageId: mid}, function () {
      // guard: calling an undefined callback here would throw inside the
      // store's callback
      if (typeof cb === 'function') {
        cb(new Error('Message removed'))
      }
    })
    return this
  }
  /**
   * reconnect - connect again using the same options as connect()
   *
   * If the client is in the middle of end() (disconnecting but not yet
   * disconnected) the reconnect is deferred until end() has closed the
   * stores; otherwise it runs immediately.
   *
   * @param {Object} [opts] - optional reconnect options, includes:
   *    {Store} incomingStore - a store for the incoming packets
   *    {Store} outgoingStore - a store for the outgoing packets
   *    stores that are not supplied are replaced by fresh Store instances
   * @returns {MqttClient} this - for chaining
   *
   * @api public
   */
  MqttClient.prototype.reconnect = function (opts) {
    var client = this

    // Swap in the stores, reset the lifecycle flags and reconnect.
    function restart () {
      client.options.incomingStore = opts ? opts.incomingStore : null
      client.options.outgoingStore = opts ? opts.outgoingStore : null
      client.incomingStore = client.options.incomingStore || new Store()
      client.outgoingStore = client.options.outgoingStore || new Store()
      client.disconnecting = false
      client.disconnected = false
      client._deferredReconnect = null
      client._reconnect()
    }

    var midShutdown = this.disconnecting && !this.disconnected
    if (!midShutdown) {
      restart()
    } else {
      this._deferredReconnect = restart
    }
    return this
  }
  /**
   * _reconnect - implement reconnection
   *
   * Emits 'reconnect' and opens a fresh stream via _setupStream().
   * @api privateish
   */
  MqttClient.prototype._reconnect = function () {
    this.emit('reconnect')
    this._setupStream()
  }
  /**
   * _setupReconnect - setup reconnect timer
   *
   * Does nothing while the client is disconnecting, while a reconnect timer
   * is already running, or when reconnectPeriod is not positive. Emits
   * 'offline' once per outage (tracked through `this.reconnecting`).
   *
   * @api private
   */
  MqttClient.prototype._setupReconnect = function () {
    var client = this
    var period = this.options.reconnectPeriod
    if (this.disconnecting || this.reconnectTimer || !(period > 0)) {
      return
    }
    if (!this.reconnecting) {
      this.emit('offline')
      this.reconnecting = true
    }
    this.reconnectTimer = setInterval(function () {
      client._reconnect()
    }, this.options.reconnectPeriod)
  }
  /**
   * _clearReconnect - clear the reconnect timer
   *
   * Stops the interval started by _setupReconnect, if any, and resets
   * `this.reconnectTimer` to null.
   *
   * @api private
   */
  MqttClient.prototype._clearReconnect = function () {
    if (!this.reconnectTimer) {
      return
    }
    clearInterval(this.reconnectTimer)
    this.reconnectTimer = null
  }
  /**
   * _cleanUp - clean up on connection end
   *
   * @param {Boolean} forced - destroy the stream right away instead of
   * sending a disconnect packet and ending it
   * @param {Function} [done] - called when the stream has closed, or
   * immediately if the client is not connected
   * @api private
   */
  MqttClient.prototype._cleanUp = function (forced, done) {
    if (done) {
      this.stream.on('close', done)
    }
    if (forced) {
      // with reconnects disabled on a clean session the inflight callbacks
      // are failed with 'Connection closed' (see flush)
      if ((this.options.reconnectPeriod === 0) && this.options.clean) {
        flush(this.outgoing)
      }
      this.stream.destroy()
    } else {
      // send the disconnect packet, then end the stream on the next
      // setImmediate tick after it has been handed over
      this._sendPacket(
        { cmd: 'disconnect' },
        setImmediate.bind(
          null,
          this.stream.end.bind(this.stream)
        )
      )
    }
    // not a user-requested end(): restart the reconnect timer
    if (!this.disconnecting) {
      this._clearReconnect()
      this._setupReconnect()
    }
    if (this.pingTimer !== null) {
      this.pingTimer.clear()
      this.pingTimer = null
    }
    // not connected: call done right away and unhook it from 'close' so it
    // cannot run a second time
    if (done && !this.connected) {
      this.stream.removeListener('close', done)
      done()
    }
  }
  /**
   * _sendPacket - send or queue a packet
   *
   * While disconnected: non-publish packets, and QoS 0 publishes when
   * `queueQoSZero` is on, are queued in memory; QoS > 0 publishes are put in
   * the outgoing store; any other publish fails with
   * Error('No connection to broker').
   * While connected: pubrel and QoS 1/2 publishes are stored before sending,
   * everything else is written directly.
   *
   * @param {Object} packet - packet to send
   * @param {Function} cb - callback when the packet is sent
   * @api private
   */
  MqttClient.prototype._sendPacket = function (packet, cb) {
    if (!this.connected) {
      var isZeroQos = (packet.qos || 0) === 0
      if (packet.cmd !== 'publish' || (isZeroQos && this.queueQoSZero)) {
        this.queue.push({ packet: packet, cb: cb })
      } else if (packet.qos > 0) {
        // the inflight callback registered by publish() receives store errors
        cb = this.outgoing[packet.messageId]
        this.outgoingStore.put(packet, function (storeError) {
          if (storeError) {
            return cb && cb(storeError)
          }
        })
      } else if (cb) {
        cb(new Error('No connection to broker'))
      }
      return
    }

    // When sending a packet, reschedule the ping timer
    this._shiftPingInterval()

    if (packet.cmd === 'pubrel') {
      storeAndSend(this, packet, cb)
      return
    }
    var isAckedPublish = packet.cmd === 'publish' &&
      (packet.qos === 1 || packet.qos === 2)
    if (isAckedPublish) {
      storeAndSend(this, packet, cb)
      return
    }
    sendPacket(this, packet, cb)
  }
  /**
   * _setupPingTimer - setup the ping timer
   *
   * Starts a reschedulable interval that runs _checkPing every `keepalive`
   * seconds, unless a timer already exists or keepalive is falsy.
   *
   * @api private
   */
  MqttClient.prototype._setupPingTimer = function () {
    var client = this
    if (this.pingTimer || !this.options.keepalive) {
      return
    }
    // pretend the last ping was answered so the first check sends a pingreq
    this.pingResp = true
    this.pingTimer = reInterval(function () {
      client._checkPing()
    }, this.options.keepalive * 1000)
  }
  /**
   * _shiftPingInterval - reschedule the ping interval
   *
   * Restarts the ping timer with the keepalive interval, provided a timer
   * exists, keepalive is set and the `reschedulePings` option is on.
   *
   * @api private
   */
  MqttClient.prototype._shiftPingInterval = function () {
    var shouldShift = this.pingTimer &&
      this.options.keepalive &&
      this.options.reschedulePings
    if (!shouldShift) {
      return
    }
    this.pingTimer.reschedule(this.options.keepalive * 1000)
  }
  /**
   * _checkPing - check if a pingresp has come back, and ping the server again
   *
   * If the previous ping went unanswered the connection is torn down with a
   * forced cleanup.
   *
   * @api private
   */
  MqttClient.prototype._checkPing = function () {
    if (!this.pingResp) {
      // do a forced cleanup since socket will be in bad shape
      this._cleanUp(true)
      return
    }
    this.pingResp = false
    this._sendPacket({ cmd: 'pingreq' })
  }
  /**
   * _handlePingresp - handle a pingresp
   *
   * Records that the last ping was answered, so the next _checkPing sends a
   * new pingreq instead of forcing a cleanup.
   *
   * @api private
   */
  MqttClient.prototype._handlePingresp = function () {
    this.pingResp = true
  }
  /**
   * _handleConnack
   *
   * Clears the connack timer. On return code 0 the client emits 'connect';
   * on any positive return code it emits an 'error' whose `code` is the
   * return code and whose message names the refusal reason (or the raw code
   * when it is not one of the known reasons).
   *
   * @param {Object} packet
   * @api private
   */
  MqttClient.prototype._handleConnack = function (packet) {
    var rc = packet.returnCode
    // refusal reasons, indexed by connack return code
    var errors = [
      '',
      'Unacceptable protocol version',
      'Identifier rejected',
      'Server unavailable',
      'Bad username or password',
      'Not authorized'
    ]
    clearTimeout(this.connackTimer)
    if (rc === 0) {
      this.reconnecting = false
      this.emit('connect', packet)
    } else if (rc > 0) {
      // codes beyond the table would otherwise read "Connection refused: undefined"
      var reason = errors[rc] || ('Unknown return code ' + rc)
      var err = new Error('Connection refused: ' + reason)
      err.code = rc
      this.emit('error', err)
    }
  }
  /**
   * _handlePublish
   *
   * QoS 2: the packet is stored and a pubrec is sent; the message itself is
   * only delivered once the matching pubrel arrives (see _handlePubrel).
   * QoS 1: 'message' is emitted and a puback is sent after handleMessage
   * completed without error.
   * QoS 0: 'message' is emitted and handleMessage is run.
   * Any other QoS value is ignored.
   *
   * `done` is invoked exactly once in every branch; the stream pipeline set
   * up in _setupStream waits for it before handling the next packet.
   *
   * @param {Object} packet
   * @param {Function} [done] - called when the packet has been handled
   * @api private
   */
  MqttClient.prototype._handlePublish = function (packet, done) {
    done = typeof done !== 'undefined' ? done : nop
    var topic = packet.topic.toString()
    var message = packet.payload
    var qos = packet.qos
    var mid = packet.messageId
    var that = this
    switch (qos) {
      case 2:
        this.incomingStore.put(packet, function (err) {
          if (err) {
            return done(err)
          }
          that._sendPacket({cmd: 'pubrec', messageId: mid}, done)
        })
        break
      case 1:
        // emit the message event
        this.emit('message', topic, message, packet)
        this.handleMessage(packet, function (err) {
          if (err) {
            return done(err)
          }
          // send 'puback' if the above 'handleMessage' method executed
          // successfully.
          that._sendPacket({cmd: 'puback', messageId: mid}, done)
        })
        break
      case 0:
        // emit the message event
        this.emit('message', topic, message, packet)
        this.handleMessage(packet, done)
        break
      default:
        // unknown qos: drop the packet, but still signal completion so the
        // parse pipeline does not stall on it
        done()
        break
    }
  }
  /**
   * Handle messages with backpressure support, one at a time.
   * Override at will.
   *
   * The default implementation just calls `callback`. The callers in this
   * file treat a truthy first callback argument as an error and then skip
   * the acknowledgement (puback / pubcomp).
   *
   * @param Packet packet the packet
   * @param Function callback call when finished
   * @api public
   */
  MqttClient.prototype.handleMessage = function (packet, callback) {
    callback()
  }
  /**
   * _handleAck
   *
   * Completes the inflight operation matching the ack's messageId. Acks with
   * no registered callback are ignored.
   *
   * - puback / pubcomp: the packet is deleted from the outgoing store and
   *   the callback is handed to the store as its completion callback
   * - pubrec: answered with a pubrel; the callback stays registered until
   *   the pubcomp arrives
   * - suback: topics the broker refused (0x80 bit set in the grant) are
   *   removed from the resubscribe map, then the callback receives the packet
   * - unsuback: the callback is called without arguments of interest
   *
   * Emits 'outgoingEmpty' when the client is disconnecting and no inflight
   * callback is left (end() waits for this).
   *
   * @param {Object} packet
   * @api private
   */
  MqttClient.prototype._handleAck = function (packet) {
    /* eslint no-fallthrough: "off" */
    var mid = packet.messageId
    var type = packet.cmd
    var response = null
    var cb = this.outgoing[mid]
    var that = this
    if (!cb) {
      // Server sent an ack in error, ignore it.
      return
    }
    // Process
    switch (type) {
      case 'pubcomp':
        // same thing as puback for QoS 2
      case 'puback':
        // Callback - we're done
        delete this.outgoing[mid]
        this.outgoingStore.del(packet, cb)
        break
      case 'pubrec':
        response = {
          cmd: 'pubrel',
          qos: 2,
          messageId: mid
        }
        this._sendPacket(response)
        break
      case 'suback':
        delete this.outgoing[mid]
        // topics recorded by subscribe() for this message id, in the same
        // order as the subscriptions (and therefore as the grants)
        var topics = this.messageIdToTopic[mid]
        // the mapping is only needed for this one ack; dropping it keeps the
        // map from growing with every subscribe call
        delete this.messageIdToTopic[mid]
        if (topics) {
          packet.granted.forEach(function (grant, index) {
            // suback with Failure status
            if ((grant & 0x80) !== 0 && index < topics.length) {
              delete that._resubscribeTopics[topics[index]]
            }
          })
        }
        cb(null, packet)
        break
      case 'unsuback':
        delete this.outgoing[mid]
        cb(null)
        break
      default:
        that.emit('error', new Error('unrecognized packet type'))
    }
    if (this.disconnecting &&
      Object.keys(this.outgoing).length === 0) {
      this.emit('outgoingEmpty')
    }
  }
  /**
   * _handlePubrel
   *
   * Second half of an incoming QoS 2 exchange. Looks up the packet stored
   * under the pubrel's messageId: if it is still the original publish, the
   * message is emitted, the store entry is overwritten with the pubrel,
   * handleMessage is run and a pubcomp is sent. If the lookup fails or the
   * entry is already a pubrel, only the pubcomp is sent.
   *
   * @param {Object} packet
   * @param {Function} [callback] - called with an error, or by _sendPacket
   * @api private
   */
  MqttClient.prototype._handlePubrel = function (packet, callback) {
    var client = this
    var pubcomp = {cmd: 'pubcomp', messageId: packet.messageId}
    if (typeof callback === 'undefined') {
      callback = nop
    }

    function acknowledge () {
      client._sendPacket(pubcomp, callback)
    }

    this.incomingStore.get(packet, function (lookupError, stored) {
      if (lookupError || stored.cmd === 'pubrel') {
        acknowledge()
        return
      }
      client.emit('message', stored.topic, stored.payload, stored)
      client.incomingStore.put(packet, function (putError) {
        if (putError) {
          return callback(putError)
        }
        client.handleMessage(stored, function (handlerError) {
          if (handlerError) {
            return callback(handlerError)
          }
          acknowledge()
        })
      })
    })
  }
  /**
   * _nextId
   *
   * Hands out the current message id and advances the counter, wrapping
   * from 65535 back to 1 (0 is never issued).
   *
   * @return unsigned int
   */
  MqttClient.prototype._nextId = function () {
    // the issued id is the counter's value before the increment
    var issued = this.nextId++
    // keep the counter within 16 bit unsigned range (max 65535)
    var overflowed = this.nextId === 65536
    if (overflowed) {
      this.nextId = 1
    }
    return issued
  }
  /**
   * getLastMessageId
   *
   * Returns the id most recently handed out by _nextId, accounting for the
   * wrap-around from 65535 to 1.
   *
   * @return unsigned int
   */
  MqttClient.prototype.getLastMessageId = function () {
    if (this.nextId === 1) {
      // the counter just wrapped, so the previous id was the maximum
      return 65535
    }
    return this.nextId - 1
  }
  // The constructor is the module's only export.
  module.exports = MqttClient