'use strict' /** * Module dependencies */ var events = require('events') var Store = require('./store') var eos = require('end-of-stream') var mqttPacket = require('mqtt-packet') var Writable = require('readable-stream').Writable var inherits = require('inherits') var reInterval = require('reinterval') var validations = require('./validations') var xtend = require('xtend') var setImmediate = global.setImmediate || function (callback) { // works in node v0.8 process.nextTick(callback) } var defaultConnectOptions = { keepalive: 60, reschedulePings: true, protocolId: 'MQTT', protocolVersion: 4, reconnectPeriod: 1000, connectTimeout: 30 * 1000, clean: true, resubscribe: true } function defaultId () { return 'mqttjs_' + Math.random().toString(16).substr(2, 8) } function sendPacket (client, packet, cb) { client.emit('packetsend', packet) var result = mqttPacket.writeToStream(packet, client.stream) if (!result && cb) { client.stream.once('drain', cb) } else if (cb) { cb() } } function flush (queue) { if (queue) { Object.keys(queue).forEach(function (messageId) { if (typeof queue[messageId] === 'function') { queue[messageId](new Error('Connection closed')) delete queue[messageId] } }) } } function storeAndSend (client, packet, cb) { client.outgoingStore.put(packet, function storedPacket (err) { if (err) { return cb && cb(err) } sendPacket(client, packet, cb) }) } function nop () {} /** * MqttClient constructor * * @param {Stream} stream - stream * @param {Object} [options] - connection options * (see Connection#connect) */ function MqttClient (streamBuilder, options) { var k var that = this if (!(this instanceof MqttClient)) { return new MqttClient(streamBuilder, options) } this.options = options || {} // Defaults for (k in defaultConnectOptions) { if (typeof this.options[k] === 'undefined') { this.options[k] = defaultConnectOptions[k] } else { this.options[k] = options[k] } } this.options.clientId = (typeof this.options.clientId === 'string') ? this.options.clientId : defaultId() this.streamBuilder = streamBuilder // Inflight message storages this.outgoingStore = this.options.outgoingStore || new Store() this.incomingStore = this.options.incomingStore || new Store() // Should QoS zero messages be queued when the connection is broken? this.queueQoSZero = this.options.queueQoSZero === undefined ? true : this.options.queueQoSZero // map of subscribed topics to support reconnection this._resubscribeTopics = {} // map of a subscribe messageId and a topic this.messageIdToTopic = {} // Ping timer, setup in _setupPingTimer this.pingTimer = null // Is the client connected? this.connected = false // Are we disconnecting? this.disconnecting = false // Packet queue this.queue = [] // connack timer this.connackTimer = null // Reconnect timer this.reconnectTimer = null /** * MessageIDs starting with 1 * ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810 */ this.nextId = Math.max(1, Math.floor(Math.random() * 65535)) // Inflight callbacks this.outgoing = {} // Mark connected on connect this.on('connect', function () { if (this.disconnected) { return } this.connected = true var outStore = this.outgoingStore.createStream() this.once('close', remove) outStore.on('end', function () { that.removeListener('close', remove) }) outStore.on('error', function (err) { that.removeListener('close', remove) that.emit('error', err) }) function remove () { outStore.destroy() outStore = null } function storeDeliver () { // edge case, we wrapped this twice if (!outStore) { return } var packet = outStore.read(1) var cb if (!packet) { // read when data is available in the future outStore.once('readable', storeDeliver) return } // Avoid unnecessary stream read operations when disconnected if (!that.disconnecting && !that.reconnectTimer) { cb = that.outgoing[packet.messageId] that.outgoing[packet.messageId] = function (err, status) { // Ensure that the original callback passed in to publish gets invoked if (cb) { cb(err, status) } storeDeliver() } that._sendPacket(packet) } else if (outStore.destroy) { outStore.destroy() } } // start flowing storeDeliver() }) // Mark disconnected on stream close this.on('close', function () { this.connected = false clearTimeout(this.connackTimer) }) // Setup ping timer this.on('connect', this._setupPingTimer) // Send queued packets this.on('connect', function () { var queue = this.queue function deliver () { var entry = queue.shift() var packet = null if (!entry) { return } packet = entry.packet that._sendPacket( packet, function (err) { if (entry.cb) { entry.cb(err) } deliver() } ) } deliver() }) var firstConnection = true // resubscribe this.on('connect', function () { if (!firstConnection && this.options.clean && Object.keys(this._resubscribeTopics).length > 0) { if (this.options.resubscribe) { this._resubscribeTopics.resubscribe = true this.subscribe(this._resubscribeTopics) } else { this._resubscribeTopics = {} } } firstConnection = false }) // Clear ping timer this.on('close', function () { if (that.pingTimer !== null) { that.pingTimer.clear() that.pingTimer = null } }) // Setup reconnect timer on disconnect this.on('close', this._setupReconnect) events.EventEmitter.call(this) this._setupStream() } inherits(MqttClient, events.EventEmitter) /** * setup the event handlers in the inner stream. * * @api private */ MqttClient.prototype._setupStream = function () { var connectPacket var that = this var writable = new Writable() var parser = mqttPacket.parser(this.options) var completeParse = null var packets = [] this._clearReconnect() this.stream = this.streamBuilder(this) parser.on('packet', function (packet) { packets.push(packet) }) function nextTickWork () { process.nextTick(work) } function work () { var packet = packets.shift() var done = completeParse if (packet) { that._handlePacket(packet, nextTickWork) } else { completeParse = null done() } } writable._write = function (buf, enc, done) { completeParse = done parser.parse(buf) work() } this.stream.pipe(writable) // Suppress connection errors this.stream.on('error', nop) // Echo stream close eos(this.stream, this.emit.bind(this, 'close')) // Send a connect packet connectPacket = Object.create(this.options) connectPacket.cmd = 'connect' // avoid message queue sendPacket(this, connectPacket) // Echo connection errors parser.on('error', this.emit.bind(this, 'error')) // many drain listeners are needed for qos 1 callbacks if the connection is intermittent this.stream.setMaxListeners(1000) clearTimeout(this.connackTimer) this.connackTimer = setTimeout(function () { that._cleanUp(true) }, this.options.connectTimeout) } MqttClient.prototype._handlePacket = function (packet, done) { this.emit('packetreceive', packet) switch (packet.cmd) { case 'publish': this._handlePublish(packet, done) break case 'puback': case 'pubrec': case 'pubcomp': case 'suback': case 'unsuback': this._handleAck(packet) done() break case 'pubrel': this._handlePubrel(packet, done) break case 'connack': this._handleConnack(packet) done() break case 'pingresp': this._handlePingresp(packet) done() break default: // do nothing // maybe we should do an error handling // or just log it break } } MqttClient.prototype._checkDisconnecting = function (callback) { if (this.disconnecting) { if (callback) { callback(new Error('client disconnecting')) } else { this.emit('error', new Error('client disconnecting')) } } return this.disconnecting } /** * publish - publish to * * @param {String} topic - topic to publish to * @param {String, Buffer} message - message to publish * @param {Object} [opts] - publish options, includes: * {Number} qos - qos level to publish on * {Boolean} retain - whether or not to retain the message * {Boolean} dup - whether or not mark a message as duplicate * @param {Function} [callback] - function(err){} * called when publish succeeds or fails * @returns {MqttClient} this - for chaining * @api public * * @example client.publish('topic', 'message'); * @example * client.publish('topic', 'message', {qos: 1, retain: true, dup: true}); * @example client.publish('topic', 'message', console.log); */ MqttClient.prototype.publish = function (topic, message, opts, callback) { var packet // .publish(topic, payload, cb); if (typeof opts === 'function') { callback = opts opts = null } // default opts var defaultOpts = {qos: 0, retain: false, dup: false} opts = xtend(defaultOpts, opts) if (this._checkDisconnecting(callback)) { return this } packet = { cmd: 'publish', topic: topic, payload: message, qos: opts.qos, retain: opts.retain, messageId: this._nextId(), dup: opts.dup } switch (opts.qos) { case 1: case 2: // Add to callbacks this.outgoing[packet.messageId] = callback || nop this._sendPacket(packet) break default: this._sendPacket(packet, callback) break } return this } /** * subscribe - subscribe to * * @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos} * @param {Object} [opts] - optional subscription options, includes: * {Number} qos - subscribe qos level * @param {Function} [callback] - function(err, granted){} where: * {Error} err - subscription error (none at the moment!) * {Array} granted - array of {topic: 't', qos: 0} * @returns {MqttClient} this - for chaining * @api public * @example client.subscribe('topic'); * @example client.subscribe('topic', {qos: 1}); * @example client.subscribe({'topic': 0, 'topic2': 1}, console.log); * @example client.subscribe('topic', console.log); */ MqttClient.prototype.subscribe = function () { var packet var args = Array.prototype.slice.call(arguments) var subs = [] var obj = args.shift() var resubscribe = obj.resubscribe var callback = args.pop() || nop var opts = args.pop() var invalidTopic var that = this delete obj.resubscribe if (typeof obj === 'string') { obj = [obj] } if (typeof callback !== 'function') { opts = callback callback = nop } invalidTopic = validations.validateTopics(obj) if (invalidTopic !== null) { setImmediate(callback, new Error('Invalid topic ' + invalidTopic)) return this } if (this._checkDisconnecting(callback)) { return this } var defaultOpts = { qos: 0 } opts = xtend(defaultOpts, opts) if (Array.isArray(obj)) { obj.forEach(function (topic) { if (that._resubscribeTopics[topic] < opts.qos || !that._resubscribeTopics.hasOwnProperty(topic) || resubscribe) { subs.push({ topic: topic, qos: opts.qos }) } }) } else { Object .keys(obj) .forEach(function (k) { if (that._resubscribeTopics[k] < obj[k] || !that._resubscribeTopics.hasOwnProperty(k) || resubscribe) { subs.push({ topic: k, qos: obj[k] }) } }) } packet = { cmd: 'subscribe', subscriptions: subs, qos: 1, retain: false, dup: false, messageId: this._nextId() } if (!subs.length) { callback(null, []) return } // subscriptions to resubscribe to in case of disconnect if (this.options.resubscribe) { var topics = [] subs.forEach(function (sub) { if (that.options.reconnectPeriod > 0) { that._resubscribeTopics[sub.topic] = sub.qos topics.push(sub.topic) } }) that.messageIdToTopic[packet.messageId] = topics } this.outgoing[packet.messageId] = function (err, packet) { if (!err) { var granted = packet.granted for (var i = 0; i < granted.length; i += 1) { subs[i].qos = granted[i] } } callback(err, subs) } this._sendPacket(packet) return this } /** * unsubscribe - unsubscribe from topic(s) * * @param {String, Array} topic - topics to unsubscribe from * @param {Function} [callback] - callback fired on unsuback * @returns {MqttClient} this - for chaining * @api public * @example client.unsubscribe('topic'); * @example client.unsubscribe('topic', console.log); */ MqttClient.prototype.unsubscribe = function (topic, callback) { var packet = { cmd: 'unsubscribe', qos: 1, messageId: this._nextId() } var that = this callback = callback || nop if (this._checkDisconnecting(callback)) { return this } if (typeof topic === 'string') { packet.unsubscriptions = [topic] } else if (typeof topic === 'object' && topic.length) { packet.unsubscriptions = topic } if (this.options.resubscribe) { packet.unsubscriptions.forEach(function (topic) { delete that._resubscribeTopics[topic] }) } this.outgoing[packet.messageId] = callback this._sendPacket(packet) return this } /** * end - close connection * * @returns {MqttClient} this - for chaining * @param {Boolean} force - do not wait for all in-flight messages to be acked * @param {Function} cb - called when the client has been closed * * @api public */ MqttClient.prototype.end = function (force, cb) { var that = this if (typeof force === 'function') { cb = force force = false } function closeStores () { that.disconnected = true that.incomingStore.close(function () { that.outgoingStore.close(function () { if (cb) { cb.apply(null, arguments) } that.emit('end') }) }) if (that._deferredReconnect) { that._deferredReconnect() } } function finish () { // defer closesStores of an I/O cycle, // just to make sure things are // ok for websockets that._cleanUp(force, setImmediate.bind(null, closeStores)) } if (this.disconnecting) { return this } this._clearReconnect() this.disconnecting = true if (!force && Object.keys(this.outgoing).length > 0) { // wait 10ms, just to be sure we received all of it this.once('outgoingEmpty', setTimeout.bind(null, finish, 10)) } else { finish() } return this } /** * removeOutgoingMessage - remove a message in outgoing store * the outgoing callback will be called withe Error('Message removed') if the message is removed * * @param {Number} mid - messageId to remove message * @returns {MqttClient} this - for chaining * @api public * * @example client.removeOutgoingMessage(client.getLastMessageId()); */ MqttClient.prototype.removeOutgoingMessage = function (mid) { var cb = this.outgoing[mid] delete this.outgoing[mid] this.outgoingStore.del({messageId: mid}, function () { cb(new Error('Message removed')) }) return this } /** * reconnect - connect again using the same options as connect() * * @param {Object} [opts] - optional reconnect options, includes: * {Store} incomingStore - a store for the incoming packets * {Store} outgoingStore - a store for the outgoing packets * if opts is not given, current stores are used * @returns {MqttClient} this - for chaining * * @api public */ MqttClient.prototype.reconnect = function (opts) { var that = this var f = function () { if (opts) { that.options.incomingStore = opts.incomingStore that.options.outgoingStore = opts.outgoingStore } else { that.options.incomingStore = null that.options.outgoingStore = null } that.incomingStore = that.options.incomingStore || new Store() that.outgoingStore = that.options.outgoingStore || new Store() that.disconnecting = false that.disconnected = false that._deferredReconnect = null that._reconnect() } if (this.disconnecting && !this.disconnected) { this._deferredReconnect = f } else { f() } return this } /** * _reconnect - implement reconnection * @api privateish */ MqttClient.prototype._reconnect = function () { this.emit('reconnect') this._setupStream() } /** * _setupReconnect - setup reconnect timer */ MqttClient.prototype._setupReconnect = function () { var that = this if (!that.disconnecting && !that.reconnectTimer && (that.options.reconnectPeriod > 0)) { if (!this.reconnecting) { this.emit('offline') this.reconnecting = true } that.reconnectTimer = setInterval(function () { that._reconnect() }, that.options.reconnectPeriod) } } /** * _clearReconnect - clear the reconnect timer */ MqttClient.prototype._clearReconnect = function () { if (this.reconnectTimer) { clearInterval(this.reconnectTimer) this.reconnectTimer = null } } /** * _cleanUp - clean up on connection end * @api private */ MqttClient.prototype._cleanUp = function (forced, done) { if (done) { this.stream.on('close', done) } if (forced) { if ((this.options.reconnectPeriod === 0) && this.options.clean) { flush(this.outgoing) } this.stream.destroy() } else { this._sendPacket( { cmd: 'disconnect' }, setImmediate.bind( null, this.stream.end.bind(this.stream) ) ) } if (!this.disconnecting) { this._clearReconnect() this._setupReconnect() } if (this.pingTimer !== null) { this.pingTimer.clear() this.pingTimer = null } if (done && !this.connected) { this.stream.removeListener('close', done) done() } } /** * _sendPacket - send or queue a packet * @param {String} type - packet type (see `protocol`) * @param {Object} packet - packet options * @param {Function} cb - callback when the packet is sent * @api private */ MqttClient.prototype._sendPacket = function (packet, cb) { if (!this.connected) { if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') { this.queue.push({ packet: packet, cb: cb }) } else if (packet.qos > 0) { cb = this.outgoing[packet.messageId] this.outgoingStore.put(packet, function (err) { if (err) { return cb && cb(err) } }) } else if (cb) { cb(new Error('No connection to broker')) } return } // When sending a packet, reschedule the ping timer this._shiftPingInterval() switch (packet.cmd) { case 'publish': break case 'pubrel': storeAndSend(this, packet, cb) return default: sendPacket(this, packet, cb) return } switch (packet.qos) { case 2: case 1: storeAndSend(this, packet, cb) break /** * no need of case here since it will be caught by default * and jshint comply that before default it must be a break * anyway it will result in -1 evaluation */ case 0: /* falls through */ default: sendPacket(this, packet, cb) break } } /** * _setupPingTimer - setup the ping timer * * @api private */ MqttClient.prototype._setupPingTimer = function () { var that = this if (!this.pingTimer && this.options.keepalive) { this.pingResp = true this.pingTimer = reInterval(function () { that._checkPing() }, this.options.keepalive * 1000) } } /** * _shiftPingInterval - reschedule the ping interval * * @api private */ MqttClient.prototype._shiftPingInterval = function () { if (this.pingTimer && this.options.keepalive && this.options.reschedulePings) { this.pingTimer.reschedule(this.options.keepalive * 1000) } } /** * _checkPing - check if a pingresp has come back, and ping the server again * * @api private */ MqttClient.prototype._checkPing = function () { if (this.pingResp) { this.pingResp = false this._sendPacket({ cmd: 'pingreq' }) } else { // do a forced cleanup since socket will be in bad shape this._cleanUp(true) } } /** * _handlePingresp - handle a pingresp * * @api private */ MqttClient.prototype._handlePingresp = function () { this.pingResp = true } /** * _handleConnack * * @param {Object} packet * @api private */ MqttClient.prototype._handleConnack = function (packet) { var rc = packet.returnCode var errors = [ '', 'Unacceptable protocol version', 'Identifier rejected', 'Server unavailable', 'Bad username or password', 'Not authorized' ] clearTimeout(this.connackTimer) if (rc === 0) { this.reconnecting = false this.emit('connect', packet) } else if (rc > 0) { var err = new Error('Connection refused: ' + errors[rc]) err.code = rc this.emit('error', err) } } /** * _handlePublish * * @param {Object} packet * @api private */ /* those late 2 case should be rewrite to comply with coding style: case 1: case 0: // do not wait sending a puback // no callback passed if (1 === qos) { this._sendPacket({ cmd: 'puback', messageId: mid }); } // emit the message event for both qos 1 and 0 this.emit('message', topic, message, packet); this.handleMessage(packet, done); break; default: // do nothing but every switch mus have a default // log or throw an error about unknown qos break; for now i just suppressed the warnings */ MqttClient.prototype._handlePublish = function (packet, done) { done = typeof done !== 'undefined' ? done : nop var topic = packet.topic.toString() var message = packet.payload var qos = packet.qos var mid = packet.messageId var that = this switch (qos) { case 2: this.incomingStore.put(packet, function (err) { if (err) { return done(err) } that._sendPacket({cmd: 'pubrec', messageId: mid}, done) }) break case 1: // emit the message event this.emit('message', topic, message, packet) this.handleMessage(packet, function (err) { if (err) { return done(err) } // send 'puback' if the above 'handleMessage' method executed // successfully. that._sendPacket({cmd: 'puback', messageId: mid}, done) }) break case 0: // emit the message event this.emit('message', topic, message, packet) this.handleMessage(packet, done) break default: // do nothing // log or throw an error about unknown qos break } } /** * Handle messages with backpressure support, one at a time. * Override at will. * * @param Packet packet the packet * @param Function callback call when finished * @api public */ MqttClient.prototype.handleMessage = function (packet, callback) { callback() } /** * _handleAck * * @param {Object} packet * @api private */ MqttClient.prototype._handleAck = function (packet) { /* eslint no-fallthrough: "off" */ var mid = packet.messageId var type = packet.cmd var response = null var cb = this.outgoing[mid] var that = this if (!cb) { // Server sent an ack in error, ignore it. return } // Process switch (type) { case 'pubcomp': // same thing as puback for QoS 2 case 'puback': // Callback - we're done delete this.outgoing[mid] this.outgoingStore.del(packet, cb) break case 'pubrec': response = { cmd: 'pubrel', qos: 2, messageId: mid } this._sendPacket(response) break case 'suback': delete this.outgoing[mid] if (packet.granted.length === 1 && (packet.granted[0] & 0x80) !== 0) { // suback with Failure status var topics = this.messageIdToTopic[mid] if (topics) { topics.forEach(function (topic) { delete that._resubscribeTopics[topic] }) } } cb(null, packet) break case 'unsuback': delete this.outgoing[mid] cb(null) break default: that.emit('error', new Error('unrecognized packet type')) } if (this.disconnecting && Object.keys(this.outgoing).length === 0) { this.emit('outgoingEmpty') } } /** * _handlePubrel * * @param {Object} packet * @api private */ MqttClient.prototype._handlePubrel = function (packet, callback) { callback = typeof callback !== 'undefined' ? callback : nop var mid = packet.messageId var that = this var comp = {cmd: 'pubcomp', messageId: mid} that.incomingStore.get(packet, function (err, pub) { if (!err && pub.cmd !== 'pubrel') { that.emit('message', pub.topic, pub.payload, pub) that.incomingStore.put(packet, function (err) { if (err) { return callback(err) } that.handleMessage(pub, function (err) { if (err) { return callback(err) } that._sendPacket(comp, callback) }) }) } else { that._sendPacket(comp, callback) } }) } /** * _nextId * @return unsigned int */ MqttClient.prototype._nextId = function () { // id becomes current state of this.nextId and increments afterwards var id = this.nextId++ // Ensure 16 bit unsigned int (max 65535, nextId got one higher) if (this.nextId === 65536) { this.nextId = 1 } return id } /** * getLastMessageId * @return unsigned int */ MqttClient.prototype.getLastMessageId = function () { return (this.nextId === 1) ? 65535 : (this.nextId - 1) } module.exports = MqttClient