12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129 |
- 'use strict'
- /**
- * Module dependencies
- */
- var events = require('events')
- var Store = require('./store')
- var eos = require('end-of-stream')
- var mqttPacket = require('mqtt-packet')
- var Writable = require('readable-stream').Writable
- var inherits = require('inherits')
- var reInterval = require('reinterval')
- var validations = require('./validations')
- var xtend = require('xtend')
// Use the native setImmediate when the runtime has one; otherwise (node v0.8)
// fall back to process.nextTick, which is only handed the callback itself.
var setImmediate = global.setImmediate
  ? global.setImmediate
  : function (fn) {
    process.nextTick(fn)
  }
// Connection defaults. The MqttClient constructor copies each of these onto
// the client options whenever the caller left that key undefined.
var defaultConnectOptions = {
  // keep-alive interval in seconds (multiplied by 1000 for the ping timer)
  keepalive: 60,
  // push the next ping back every time a packet is sent
  reschedulePings: true,
  protocolId: 'MQTT',
  protocolVersion: 4,
  // milliseconds between reconnection attempts; a value <= 0 disables them
  reconnectPeriod: 1000,
  // milliseconds to wait for a connack before the stream is force-closed
  connectTimeout: 30000,
  clean: true,
  // re-issue known subscriptions after a reconnect
  resubscribe: true
}
/**
 * defaultId - build a pseudo-random client identifier
 *
 * @returns {String} 'mqttjs_' followed by at most 8 hexadecimal characters
 * @api private
 */
function defaultId () {
  var hex = Math.random().toString(16)
  // drop the leading "0." and keep no more than eight digits
  return 'mqttjs_' + hex.slice(2, 10)
}
/**
 * sendPacket - write a packet straight to the client's stream
 *
 * Emits 'packetsend' on the client before writing. The callback runs
 * immediately when the write was accepted, otherwise on the stream's
 * next 'drain' event.
 *
 * @param {MqttClient} client - client owning the stream
 * @param {Object} packet - packet to serialize
 * @param {Function} [cb] - called once the packet has been handed over
 * @api private
 */
function sendPacket (client, packet, cb) {
  client.emit('packetsend', packet)
  var accepted = mqttPacket.writeToStream(packet, client.stream)
  if (!cb) {
    return
  }
  if (accepted) {
    cb()
    return
  }
  client.stream.once('drain', cb)
}
/**
 * flush - fail every pending callback in a messageId -> callback map
 *
 * Each function-valued entry is called with Error('Connection closed') and
 * then removed; entries that are not functions are left untouched.
 *
 * @param {Object} [queue] - map of pending callbacks; ignored when falsy
 * @api private
 */
function flush (queue) {
  if (!queue) {
    return
  }
  var ids = Object.keys(queue)
  for (var i = 0; i < ids.length; i += 1) {
    var messageId = ids[i]
    if (typeof queue[messageId] !== 'function') {
      continue
    }
    queue[messageId](new Error('Connection closed'))
    delete queue[messageId]
  }
}
/**
 * storeAndSend - persist a packet in the outgoing store, then send it
 *
 * When the store reports an error the packet is not sent and the error is
 * passed to the callback (if there is one).
 *
 * @param {MqttClient} client - client owning the store and the stream
 * @param {Object} packet - packet to store and send
 * @param {Function} [cb] - forwarded to sendPacket, or called with the store error
 * @api private
 */
function storeAndSend (client, packet, cb) {
  function storedPacket (err) {
    if (!err) {
      sendPacket(client, packet, cb)
      return
    }
    return cb && cb(err)
  }
  client.outgoingStore.put(packet, storedPacket)
}
/**
 * nop - placeholder used wherever a callback is optional
 *
 * @api private
 */
function nop () {
  // intentionally empty
}
/**
 * MqttClient constructor
 *
 * May be called with or without `new`. Options the caller leaves undefined
 * are filled in from defaultConnectOptions (the passed object itself is
 * updated), the 'connect'/'close' bookkeeping listeners are registered and
 * the first stream is set up.
 *
 * @param {Function} streamBuilder - called with the client; must return the
 *   stream used to talk to the broker
 * @param {Object} [options] - connection options
 *   (see Connection#connect)
 */
function MqttClient (streamBuilder, options) {
  if (!(this instanceof MqttClient)) {
    return new MqttClient(streamBuilder, options)
  }

  var client = this
  var name
  // the resubscribe listener must stay quiet on the very first connect
  var firstConnection = true

  this.options = options || {}

  // Defaults
  for (name in defaultConnectOptions) {
    if (typeof this.options[name] !== 'undefined') {
      this.options[name] = options[name]
    } else {
      this.options[name] = defaultConnectOptions[name]
    }
  }

  this.options.clientId = typeof this.options.clientId === 'string'
    ? this.options.clientId
    : defaultId()

  this.streamBuilder = streamBuilder

  // Inflight message storages
  this.outgoingStore = this.options.outgoingStore || new Store()
  this.incomingStore = this.options.incomingStore || new Store()

  // Should QoS zero messages be queued when the connection is broken?
  this.queueQoSZero = this.options.queueQoSZero !== undefined
    ? this.options.queueQoSZero
    : true

  // map of subscribed topics to support reconnection
  this._resubscribeTopics = {}

  // map of a subscribe messageId and a topic
  this.messageIdToTopic = {}

  // Ping timer, setup in _setupPingTimer
  this.pingTimer = null
  // Is the client connected?
  this.connected = false
  // Are we disconnecting?
  this.disconnecting = false
  // Packet queue
  this.queue = []
  // connack timer
  this.connackTimer = null
  // Reconnect timer
  this.reconnectTimer = null

  /**
   * MessageIDs starting with 1
   * ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810
   */
  this.nextId = Math.max(1, Math.floor(Math.random() * 65535))

  // Inflight callbacks
  this.outgoing = {}

  // 'connect': mark the client connected and replay the outgoing store,
  // one packet at a time
  function resendStored () {
    if (client.disconnected) {
      return
    }

    client.connected = true
    var outStore = client.outgoingStore.createStream()

    function releaseStream () {
      outStore.destroy()
      outStore = null
    }

    function deliverNext () {
      // edge case, we wrapped this twice
      if (!outStore) {
        return
      }

      var packet = outStore.read(1)
      if (!packet) {
        // read when data is available in the future
        outStore.once('readable', deliverNext)
        return
      }

      // Avoid unnecessary stream read operations when disconnected
      if (client.disconnecting || client.reconnectTimer) {
        if (outStore.destroy) {
          outStore.destroy()
        }
        return
      }

      var originalCb = client.outgoing[packet.messageId]
      client.outgoing[packet.messageId] = function (err, status) {
        // Ensure that the original callback passed in to publish gets invoked
        if (originalCb) {
          originalCb(err, status)
        }
        deliverNext()
      }
      client._sendPacket(packet)
    }

    client.once('close', releaseStream)
    outStore.on('end', function () {
      client.removeListener('close', releaseStream)
    })
    outStore.on('error', function (err) {
      client.removeListener('close', releaseStream)
      client.emit('error', err)
    })

    // start flowing
    deliverNext()
  }

  // 'close': mark disconnected and stop waiting for a connack
  function markDisconnected () {
    client.connected = false
    clearTimeout(client.connackTimer)
  }

  // 'connect': send the packets queued while offline, in order
  function sendQueued () {
    var queue = client.queue

    function deliver () {
      var entry = queue.shift()
      if (!entry) {
        return
      }
      client._sendPacket(entry.packet, function (err) {
        if (entry.cb) {
          entry.cb(err)
        }
        deliver()
      })
    }

    deliver()
  }

  // 'connect': on every connect after the first, either resubscribe to the
  // remembered topics or forget them, depending on options.resubscribe
  function restoreSubscriptions () {
    var needsRestore = !firstConnection &&
      client.options.clean &&
      Object.keys(client._resubscribeTopics).length > 0

    if (needsRestore) {
      if (client.options.resubscribe) {
        client._resubscribeTopics.resubscribe = true
        client.subscribe(client._resubscribeTopics)
      } else {
        client._resubscribeTopics = {}
      }
    }

    firstConnection = false
  }

  // 'close': stop the ping timer
  function clearPingTimer () {
    if (client.pingTimer !== null) {
      client.pingTimer.clear()
      client.pingTimer = null
    }
  }

  // Listener order matters: handlers for one event run in registration order
  this.on('connect', resendStored)
  this.on('close', markDisconnected)
  this.on('connect', this._setupPingTimer)
  this.on('connect', sendQueued)
  this.on('connect', restoreSubscriptions)
  this.on('close', clearPingTimer)
  // Setup reconnect timer on disconnect
  this.on('close', this._setupReconnect)

  events.EventEmitter.call(this)

  this._setupStream()
}
- inherits(MqttClient, events.EventEmitter)
/**
 * _setupStream - build a fresh stream and wire it to the client
 *
 * Clears any reconnect timer, asks streamBuilder for a stream, pipes it
 * through an mqtt-packet parser, sends the connect packet and arms the
 * connack timeout.
 *
 * @api private
 */
MqttClient.prototype._setupStream = function () {
  var client = this
  var writable = new Writable()
  var parser = mqttPacket.parser(this.options)
  // completion callback of the chunk currently being written
  var writeDone = null
  // packets parsed from that chunk and not handled yet
  var pending = []
  var connectPacket

  function scheduleNext () {
    process.nextTick(processNext)
  }

  // Handle one parsed packet; once none are left, complete the write so the
  // stream delivers the next chunk.
  function processNext () {
    var packet = pending.shift()
    var finishWrite = writeDone

    if (!packet) {
      writeDone = null
      finishWrite()
      return
    }
    client._handlePacket(packet, scheduleNext)
  }

  this._clearReconnect()

  this.stream = this.streamBuilder(this)

  parser.on('packet', function (packet) {
    pending.push(packet)
  })

  writable._write = function (buf, enc, done) {
    writeDone = done
    parser.parse(buf)
    processNext()
  }

  this.stream.pipe(writable)

  // Suppress connection errors
  this.stream.on('error', nop)

  // Echo stream close
  eos(this.stream, this.emit.bind(this, 'close'))

  // Send a connect packet; it inherits every option through the prototype
  connectPacket = Object.create(this.options)
  connectPacket.cmd = 'connect'
  // avoid message queue
  sendPacket(this, connectPacket)

  // Echo connection errors
  parser.on('error', this.emit.bind(this, 'error'))

  // many drain listeners are needed for qos 1 callbacks if the connection is intermittent
  this.stream.setMaxListeners(1000)

  // force a clean-up when no connack arrives within connectTimeout
  clearTimeout(this.connackTimer)
  this.connackTimer = setTimeout(function () {
    client._cleanUp(true)
  }, this.options.connectTimeout)
}
/**
 * _handlePacket - dispatch a parsed packet to its handler
 *
 * Emits 'packetreceive' and routes the packet by its `cmd`. `done` has to be
 * invoked for every packet: _setupStream only moves on to the next packet
 * (and only completes the pending stream write) once it has been called.
 *
 * @param {Object} packet - parsed packet
 * @param {Function} done - called when the packet has been dealt with
 * @api private
 */
MqttClient.prototype._handlePacket = function (packet, done) {
  this.emit('packetreceive', packet)

  switch (packet.cmd) {
    case 'publish':
      this._handlePublish(packet, done)
      break
    case 'puback':
    case 'pubrec':
    case 'pubcomp':
    case 'suback':
    case 'unsuback':
      this._handleAck(packet)
      done()
      break
    case 'pubrel':
      this._handlePubrel(packet, done)
      break
    case 'connack':
      this._handleConnack(packet)
      done()
      break
    case 'pingresp':
      this._handlePingresp(packet)
      done()
      break
    default:
      // Packet types the client has no handler for are ignored, but the
      // parser still has to be released; without this call a single
      // unexpected packet would stall all further packet processing.
      done()
      break
  }
}
/**
 * _checkDisconnecting - report (and signal) that the client is disconnecting
 *
 * While disconnecting, Error('client disconnecting') goes to the callback
 * when one is given, otherwise it is emitted as an 'error' event.
 *
 * @param {Function} [callback] - receives the error
 * @returns {Boolean} the current value of this.disconnecting
 * @api private
 */
MqttClient.prototype._checkDisconnecting = function (callback) {
  if (!this.disconnecting) {
    return this.disconnecting
  }

  var err = new Error('client disconnecting')
  if (callback) {
    callback(err)
  } else {
    this.emit('error', err)
  }
  return this.disconnecting
}
/**
 * publish - publish <message> to <topic>
 *
 * @param {String} topic - topic to publish to
 * @param {String, Buffer} message - message to publish
 * @param {Object} [opts] - publish options, includes:
 *    {Number} qos - qos level to publish on (default 0)
 *    {Boolean} retain - whether or not to retain the message (default false)
 *    {Boolean} dup - whether or not mark a message as duplicate (default false)
 * @param {Function} [callback] - function(err){}
 *    called when publish succeeds or fails
 * @returns {MqttClient} this - for chaining
 * @api public
 *
 * @example client.publish('topic', 'message');
 * @example
 *     client.publish('topic', 'message', {qos: 1, retain: true, dup: true});
 * @example client.publish('topic', 'message', console.log);
 */
MqttClient.prototype.publish = function (topic, message, opts, callback) {
  // .publish(topic, payload, cb);
  if (typeof opts === 'function') {
    callback = opts
    opts = null
  }

  // default opts
  opts = xtend({qos: 0, retain: false, dup: false}, opts)

  if (this._checkDisconnecting(callback)) {
    return this
  }

  var packet = {
    cmd: 'publish',
    topic: topic,
    payload: message,
    qos: opts.qos,
    retain: opts.retain,
    messageId: this._nextId(),
    dup: opts.dup
  }

  if (opts.qos === 1 || opts.qos === 2) {
    // Acknowledged delivery: the callback waits in `outgoing` for the ack
    this.outgoing[packet.messageId] = callback || nop
    this._sendPacket(packet)
  } else {
    this._sendPacket(packet, callback)
  }

  return this
}
/**
 * subscribe - subscribe to <topic>
 *
 * Topics already recorded for resubscription at an equal or higher qos are
 * skipped; when nothing is left to subscribe to, the callback is invoked
 * with an empty granted list and no packet is sent.
 *
 * @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos}
 * @param {Object} [opts] - optional subscription options, includes:
 *    {Number} qos - subscribe qos level
 * @param {Function} [callback] - function(err, granted){} where:
 *    {Error} err - subscription error (invalid topic or client disconnecting)
 *    {Array} granted - array of {topic: 't', qos: 0}
 * @returns {MqttClient} this - for chaining
 * @api public
 * @example client.subscribe('topic');
 * @example client.subscribe('topic', {qos: 1});
 * @example client.subscribe({'topic': 0, 'topic2': 1}, console.log);
 * @example client.subscribe('topic', console.log);
 */
MqttClient.prototype.subscribe = function () {
  var packet
  var args = Array.prototype.slice.call(arguments)
  var subs = []
  var obj = args.shift()
  // set by the reconnect handler to force re-sending known topics
  var resubscribe = obj.resubscribe
  var callback = args.pop() || nop
  var opts = args.pop()
  var invalidTopic
  var that = this

  // the marker must not be mistaken for a topic below
  delete obj.resubscribe

  if (typeof obj === 'string') {
    obj = [obj]
  }

  // .subscribe(topic, opts) - the last argument was the options object
  if (typeof callback !== 'function') {
    opts = callback
    callback = nop
  }

  invalidTopic = validations.validateTopics(obj)
  if (invalidTopic !== null) {
    setImmediate(callback, new Error('Invalid topic ' + invalidTopic))
    return this
  }

  if (this._checkDisconnecting(callback)) {
    return this
  }

  var defaultOpts = { qos: 0 }
  opts = xtend(defaultOpts, opts)

  if (Array.isArray(obj)) {
    obj.forEach(function (topic) {
      if (that._resubscribeTopics[topic] < opts.qos ||
          !that._resubscribeTopics.hasOwnProperty(topic) ||
          resubscribe) {
        subs.push({
          topic: topic,
          qos: opts.qos
        })
      }
    })
  } else {
    Object
      .keys(obj)
      .forEach(function (k) {
        if (that._resubscribeTopics[k] < obj[k] ||
            !that._resubscribeTopics.hasOwnProperty(k) ||
            resubscribe) {
          subs.push({
            topic: k,
            qos: obj[k]
          })
        }
      })
  }

  packet = {
    cmd: 'subscribe',
    subscriptions: subs,
    qos: 1,
    retain: false,
    dup: false,
    messageId: this._nextId()
  }

  if (!subs.length) {
    callback(null, [])
    // keep the documented chaining contract on this early exit as well
    return this
  }

  // subscriptions to resubscribe to in case of disconnect
  if (this.options.resubscribe) {
    var topics = []
    subs.forEach(function (sub) {
      if (that.options.reconnectPeriod > 0) {
        that._resubscribeTopics[sub.topic] = sub.qos
        topics.push(sub.topic)
      }
    })
    that.messageIdToTopic[packet.messageId] = topics
  }

  // invoked with the suback: report the qos the broker actually granted
  this.outgoing[packet.messageId] = function (err, packet) {
    if (!err) {
      var granted = packet.granted
      for (var i = 0; i < granted.length; i += 1) {
        subs[i].qos = granted[i]
      }
    }

    callback(err, subs)
  }

  this._sendPacket(packet)

  return this
}
/**
 * unsubscribe - unsubscribe from topic(s)
 *
 * The topics are also dropped from the resubscribe map when
 * options.resubscribe is set.
 *
 * @param {String, Array} topic - topics to unsubscribe from
 * @param {Function} [callback] - callback fired on unsuback
 * @returns {MqttClient} this - for chaining
 * @api public
 * @example client.unsubscribe('topic');
 * @example client.unsubscribe('topic', console.log);
 */
MqttClient.prototype.unsubscribe = function (topic, callback) {
  var client = this
  var packet = {
    cmd: 'unsubscribe',
    qos: 1,
    messageId: this._nextId()
  }

  callback = callback || nop

  if (this._checkDisconnecting(callback)) {
    return this
  }

  if (typeof topic === 'string') {
    packet.unsubscriptions = [topic]
  } else if (typeof topic === 'object' && topic.length) {
    packet.unsubscriptions = topic
  }

  if (this.options.resubscribe) {
    packet.unsubscriptions.forEach(function (name) {
      delete client._resubscribeTopics[name]
    })
  }

  // the callback waits in `outgoing` until the unsuback arrives
  this.outgoing[packet.messageId] = callback
  this._sendPacket(packet)

  return this
}
/**
 * end - close connection
 *
 * Does nothing when the client is already disconnecting. Unless forced, the
 * clean-up waits for the 'outgoingEmpty' event while callbacks are still
 * registered in `outgoing`.
 *
 * @returns {MqttClient} this - for chaining
 * @param {Boolean} force - do not wait for all in-flight messages to be acked
 * @param {Function} cb - called when the client has been closed
 *
 * @api public
 */
MqttClient.prototype.end = function (force, cb) {
  var client = this

  // .end(cb)
  if (typeof force === 'function') {
    cb = force
    force = false
  }

  // Close both stores, then report completion and run a deferred reconnect
  function closeStores () {
    client.disconnected = true
    client.incomingStore.close(function () {
      client.outgoingStore.close(function () {
        if (cb) {
          cb.apply(null, arguments)
        }
        client.emit('end')
      })
    })
    if (client._deferredReconnect) {
      client._deferredReconnect()
    }
  }

  function finish () {
    // defer closesStores of an I/O cycle,
    // just to make sure things are
    // ok for websockets
    client._cleanUp(force, setImmediate.bind(null, closeStores))
  }

  if (this.disconnecting) {
    return this
  }

  this._clearReconnect()

  this.disconnecting = true

  var mustWait = !force && Object.keys(this.outgoing).length > 0
  if (mustWait) {
    // wait 10ms, just to be sure we received all of it
    this.once('outgoingEmpty', setTimeout.bind(null, finish, 10))
  } else {
    finish()
  }

  return this
}
/**
 * removeOutgoingMessage - remove a message in outgoing store
 * the outgoing callback will be called with Error('Message removed') if the message is removed
 *
 * When no callback is registered for the given id, the store entry is still
 * deleted and nothing is called.
 *
 * @param {Number} mid - messageId to remove message
 * @returns {MqttClient} this - for chaining
 * @api public
 *
 * @example client.removeOutgoingMessage(client.getLastMessageId());
 */
MqttClient.prototype.removeOutgoingMessage = function (mid) {
  var cb = this.outgoing[mid]
  delete this.outgoing[mid]
  this.outgoingStore.del({messageId: mid}, function () {
    // an unknown or already completed id has no callback to notify
    if (typeof cb === 'function') {
      cb(new Error('Message removed'))
    }
  })
  return this
}
/**
 * reconnect - connect again using the same options as connect()
 *
 * While an end() is still in progress (disconnecting but not yet
 * disconnected) the reconnection is parked in _deferredReconnect, which
 * end() runs once the stores are being closed.
 *
 * @param {Object} [opts] - optional reconnect options, includes:
 *    {Store} incomingStore - a store for the incoming packets
 *    {Store} outgoingStore - a store for the outgoing packets
 *    if opts is not given, new stores are created
 * @returns {MqttClient} this - for chaining
 *
 * @api public
 */
MqttClient.prototype.reconnect = function (opts) {
  var client = this

  function restart () {
    client.options.incomingStore = opts ? opts.incomingStore : null
    client.options.outgoingStore = opts ? opts.outgoingStore : null
    // a store that was not supplied is replaced by a fresh default one
    client.incomingStore = client.options.incomingStore || new Store()
    client.outgoingStore = client.options.outgoingStore || new Store()
    client.disconnecting = false
    client.disconnected = false
    client._deferredReconnect = null
    client._reconnect()
  }

  var endInProgress = this.disconnecting && !this.disconnected
  if (endInProgress) {
    this._deferredReconnect = restart
  } else {
    restart()
  }

  return this
}
/**
 * _reconnect - implement reconnection
 *
 * Announces the attempt with a 'reconnect' event, then builds a new stream.
 *
 * @api privateish
 */
MqttClient.prototype._reconnect = function () {
  var client = this
  client.emit('reconnect')
  client._setupStream()
}
/**
 * _setupReconnect - setup reconnect timer
 *
 * Does nothing while disconnecting, when a timer is already running or when
 * options.reconnectPeriod is not positive. 'offline' is emitted once per
 * outage, guarded by the `reconnecting` flag.
 *
 * @api private
 */
MqttClient.prototype._setupReconnect = function () {
  var client = this

  if (client.disconnecting ||
      client.reconnectTimer ||
      !(client.options.reconnectPeriod > 0)) {
    return
  }

  if (!client.reconnecting) {
    client.emit('offline')
    client.reconnecting = true
  }

  client.reconnectTimer = setInterval(function () {
    client._reconnect()
  }, client.options.reconnectPeriod)
}
/**
 * _clearReconnect - clear the reconnect timer
 *
 * @api private
 */
MqttClient.prototype._clearReconnect = function () {
  var timer = this.reconnectTimer
  if (!timer) {
    return
  }
  clearInterval(timer)
  this.reconnectTimer = null
}
/**
 * _cleanUp - clean up on connection end
 *
 * @param {Boolean} forced - destroy the stream right away instead of sending
 *   a disconnect packet and ending the stream
 * @param {Function} [done] - called on the stream's 'close' event, or
 *   immediately when the client is not connected
 * @api private
 */
MqttClient.prototype._cleanUp = function (forced, done) {
  if (done) {
    this.stream.on('close', done)
  }

  if (!forced) {
    // graceful path: send a disconnect, then end the stream via setImmediate
    this._sendPacket(
      { cmd: 'disconnect' },
      setImmediate.bind(
        null,
        this.stream.end.bind(this.stream)
      )
    )
  } else {
    // without reconnection and with a clean session the pending callbacks
    // are failed right away
    if (this.options.reconnectPeriod === 0 && this.options.clean) {
      flush(this.outgoing)
    }
    this.stream.destroy()
  }

  if (!this.disconnecting) {
    this._clearReconnect()
    this._setupReconnect()
  }

  if (this.pingTimer !== null) {
    this.pingTimer.clear()
    this.pingTimer = null
  }

  // no 'close' event is awaited while not connected: report completion now
  if (done && !this.connected) {
    this.stream.removeListener('close', done)
    done()
  }
}
/**
 * _sendPacket - send or queue a packet
 *
 * While offline: QoS 0 publishes (if queueQoSZero is set) and every
 * non-publish packet are queued, QoS > 0 publishes are written to the
 * outgoing store, and anything else fails with 'No connection to broker'.
 * While online: pubrel packets and QoS 1/2 publishes are stored before being
 * sent, all other packets are sent directly.
 *
 * @param {Object} packet - packet options
 * @param {Function} cb - callback when the packet is sent
 * @api private
 */
MqttClient.prototype._sendPacket = function (packet, cb) {
  if (!this.connected) {
    var isQoSZero = (packet.qos || 0) === 0

    if ((isQoSZero && this.queueQoSZero) || packet.cmd !== 'publish') {
      this.queue.push({ packet: packet, cb: cb })
    } else if (packet.qos > 0) {
      // store errors are reported to the callback registered in `outgoing`
      cb = this.outgoing[packet.messageId]
      this.outgoingStore.put(packet, function (err) {
        if (err) {
          return cb && cb(err)
        }
      })
    } else if (cb) {
      cb(new Error('No connection to broker'))
    }

    return
  }

  // When sending a packet, reschedule the ping timer
  this._shiftPingInterval()

  var isAckedPublish = packet.cmd === 'publish' &&
    (packet.qos === 2 || packet.qos === 1)

  if (packet.cmd === 'pubrel' || isAckedPublish) {
    storeAndSend(this, packet, cb)
  } else {
    sendPacket(this, packet, cb)
  }
}
/**
 * _setupPingTimer - setup the ping timer
 *
 * Starts a repeating timer that calls _checkPing every `keepalive` seconds,
 * unless a timer already exists or keepalive is falsy.
 *
 * @api private
 */
MqttClient.prototype._setupPingTimer = function () {
  var client = this

  if (client.pingTimer || !client.options.keepalive) {
    return
  }

  // start out as if the previous ping had been answered
  client.pingResp = true
  client.pingTimer = reInterval(function () {
    client._checkPing()
  }, client.options.keepalive * 1000)
}
/**
 * _shiftPingInterval - reschedule the ping interval
 *
 * Only acts when a ping timer exists and both the keepalive and
 * reschedulePings options are truthy.
 *
 * @api private
 */
MqttClient.prototype._shiftPingInterval = function () {
  var timer = this.pingTimer
  if (!timer || !this.options.keepalive || !this.options.reschedulePings) {
    return
  }
  timer.reschedule(this.options.keepalive * 1000)
}
/**
 * _checkPing - check if a pingresp has come back, and ping the server again
 *
 * @api private
 */
MqttClient.prototype._checkPing = function () {
  if (!this.pingResp) {
    // do a forced cleanup since socket will be in bad shape
    this._cleanUp(true)
    return
  }

  // cleared here, set again by _handlePingresp
  this.pingResp = false
  this._sendPacket({ cmd: 'pingreq' })
}
/**
 * _handlePingresp - handle a pingresp
 *
 * Records that the last ping was answered, so the next _checkPing sends
 * another pingreq instead of forcing a clean-up.
 *
 * @api private
 */
MqttClient.prototype._handlePingresp = function () {
  var client = this
  client.pingResp = true
}
/**
 * _handleConnack
 *
 * Stops the connack timer. Return code 0 emits 'connect'; a positive return
 * code emits an 'error' whose `code` property is that return code.
 *
 * @param {Object} packet
 * @api private
 */
MqttClient.prototype._handleConnack = function (packet) {
  var rc = packet.returnCode

  clearTimeout(this.connackTimer)

  if (rc === 0) {
    this.reconnecting = false
    this.emit('connect', packet)
    return
  }

  if (rc > 0) {
    // refusal reasons, indexed by return code
    var reasons = [
      '',
      'Unacceptable protocol version',
      'Identifier rejected',
      'Server unavailable',
      'Bad username or password',
      'Not authorized'
    ]
    var err = new Error('Connection refused: ' + reasons[rc])
    err.code = rc
    this.emit('error', err)
  }
}
/**
 * _handlePublish
 *
 * QoS 0: emit 'message' and hand the packet to handleMessage.
 * QoS 1: same, then answer with a puback once handleMessage succeeded.
 * QoS 2: put the packet in the incoming store and answer with a pubrec;
 *        the message itself is delivered when the pubrel arrives.
 * Any other QoS is ignored.
 *
 * @param {Object} packet
 * @param {Function} [done] - called when the packet has been processed
 * @api private
 */
MqttClient.prototype._handlePublish = function (packet, done) {
  var finish = typeof done !== 'undefined' ? done : nop
  var topic = packet.topic.toString()
  var message = packet.payload
  var qos = packet.qos
  var mid = packet.messageId
  var client = this

  if (qos === 2) {
    this.incomingStore.put(packet, function (err) {
      if (err) {
        return finish(err)
      }
      client._sendPacket({cmd: 'pubrec', messageId: mid}, finish)
    })
    return
  }

  if (qos === 1) {
    // emit the message event
    this.emit('message', topic, message, packet)
    this.handleMessage(packet, function (err) {
      if (err) {
        return finish(err)
      }
      // send 'puback' if the above 'handleMessage' method executed
      // successfully.
      client._sendPacket({cmd: 'puback', messageId: mid}, finish)
    })
    return
  }

  if (qos === 0) {
    // emit the message event
    this.emit('message', topic, message, packet)
    this.handleMessage(packet, finish)
  }
}
/**
 * Handle messages with backpressure support, one at a time.
 * Override at will.
 *
 * The default implementation accepts every message immediately.
 *
 * @param Packet packet the packet
 * @param Function callback call when finished
 * @api public
 */
MqttClient.prototype.handleMessage = function (packet, callback) {
  // nothing to do by default: report completion straight away
  callback()
}
/**
 * _handleAck
 *
 * Completes the in-flight operation the acknowledgement belongs to. Acks for
 * which no callback is registered in `outgoing` are ignored. While the
 * client is disconnecting, 'outgoingEmpty' is emitted as soon as the last
 * pending callback is gone.
 *
 * @param {Object} packet - puback, pubrec, pubcomp, suback or unsuback packet
 * @api private
 */
MqttClient.prototype._handleAck = function (packet) {
  /* eslint no-fallthrough: "off" */
  var mid = packet.messageId
  var type = packet.cmd
  var response = null
  var cb = this.outgoing[mid]
  var that = this
  var topics
  var i

  if (!cb) {
    // Server sent an ack in error, ignore it.
    return
  }

  // Process
  switch (type) {
    case 'pubcomp':
      // same thing as puback for QoS 2
    case 'puback':
      // Callback - we're done
      delete this.outgoing[mid]
      this.outgoingStore.del(packet, cb)
      break
    case 'pubrec':
      // QoS 2, step two: the callback stays registered until the pubcomp
      response = {
        cmd: 'pubrel',
        qos: 2,
        messageId: mid
      }

      this._sendPacket(response)
      break
    case 'suback':
      delete this.outgoing[mid]
      topics = this.messageIdToTopic[mid]
      // The mapping only serves to interpret this suback; drop it so that
      // entries do not pile up or get matched against a reused message id.
      delete this.messageIdToTopic[mid]
      if (topics) {
        // Return codes come in the order of the requested topics. A code
        // with the 0x80 bit set is a failure: such a topic must not be
        // resubscribed to after a reconnect.
        for (i = 0; i < packet.granted.length && i < topics.length; i += 1) {
          if ((packet.granted[i] & 0x80) !== 0) {
            delete that._resubscribeTopics[topics[i]]
          }
        }
      }
      cb(null, packet)
      break
    case 'unsuback':
      delete this.outgoing[mid]
      cb(null)
      break
    default:
      that.emit('error', new Error('unrecognized packet type'))
  }

  if (this.disconnecting &&
      Object.keys(this.outgoing).length === 0) {
    this.emit('outgoingEmpty')
  }
}
/**
 * _handlePubrel
 *
 * Looks the packet up in the incoming store. A stored publish is emitted as
 * a 'message', the store entry is overwritten with the pubrel packet, and
 * the publish is passed to handleMessage before the pubcomp is sent. When
 * the lookup fails or already yields a pubrel, only the pubcomp is sent.
 *
 * @param {Object} packet
 * @param {Function} [callback] - called when the packet has been processed
 * @api private
 */
MqttClient.prototype._handlePubrel = function (packet, callback) {
  var finish = typeof callback !== 'undefined' ? callback : nop
  var mid = packet.messageId
  var client = this
  var comp = {cmd: 'pubcomp', messageId: mid}

  function sendComp () {
    client._sendPacket(comp, finish)
  }

  client.incomingStore.get(packet, function (err, pub) {
    if (err || pub.cmd === 'pubrel') {
      sendComp()
      return
    }

    client.emit('message', pub.topic, pub.payload, pub)
    client.incomingStore.put(packet, function (err) {
      if (err) {
        return finish(err)
      }
      client.handleMessage(pub, function (err) {
        if (err) {
          return finish(err)
        }
        sendComp()
      })
    })
  })
}
/**
 * _nextId
 *
 * Hands out the current message id and advances the counter, wrapping from
 * 65535 back to 1 so that ids stay 16 bit unsigned and never become 0.
 *
 * @return unsigned int
 */
MqttClient.prototype._nextId = function () {
  var current = this.nextId

  this.nextId += 1
  // 65535 was the last valid id: start over
  if (this.nextId === 65536) {
    this.nextId = 1
  }

  return current
}
/**
 * getLastMessageId
 *
 * The id handed out by the most recent _nextId call, taking the wrap-around
 * from 65535 to 1 into account.
 *
 * @return unsigned int
 */
MqttClient.prototype.getLastMessageId = function () {
  if (this.nextId === 1) {
    return 65535
  }
  return this.nextId - 1
}
- module.exports = MqttClient
|