client.js 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544
  1. 'use strict'
  2. var mqtt = require('..')
  3. var should = require('should')
  4. var fork = require('child_process').fork
  5. var path = require('path')
  6. var abstractClientTests = require('./abstract_client')
  7. var net = require('net')
  8. var eos = require('end-of-stream')
  9. var mqttPacket = require('mqtt-packet')
  10. var Buffer = require('safe-buffer').Buffer
  11. var Duplex = require('readable-stream').Duplex
  12. var Connection = require('mqtt-connection')
  13. var Server = require('./server')
  14. var port = 9876
  15. var server
  16. function connOnlyServer () {
  17. return new Server(function (client) {
  18. client.on('connect', function (packet) {
  19. client.connack({returnCode: 0})
  20. })
  21. })
  22. }
  23. /**
  24. * Test server
  25. */
  26. function buildServer () {
  27. return new Server(function (client) {
  28. client.on('connect', function (packet) {
  29. if (packet.clientId === 'invalid') {
  30. client.connack({returnCode: 2})
  31. } else {
  32. client.connack({returnCode: 0})
  33. }
  34. })
  35. client.on('publish', function (packet) {
  36. setImmediate(function () {
  37. switch (packet.qos) {
  38. case 0:
  39. break
  40. case 1:
  41. client.puback(packet)
  42. break
  43. case 2:
  44. client.pubrec(packet)
  45. break
  46. }
  47. })
  48. })
  49. client.on('pubrel', function (packet) {
  50. client.pubcomp(packet)
  51. })
  52. client.on('pubrec', function (packet) {
  53. client.pubrel(packet)
  54. })
  55. client.on('pubcomp', function () {
  56. // Nothing to be done
  57. })
  58. client.on('subscribe', function (packet) {
  59. client.suback({
  60. messageId: packet.messageId,
  61. granted: packet.subscriptions.map(function (e) {
  62. return e.qos
  63. })
  64. })
  65. })
  66. client.on('unsubscribe', function (packet) {
  67. client.unsuback(packet)
  68. })
  69. client.on('pingreq', function () {
  70. client.pingresp()
  71. })
  72. })
  73. }
// Shared server for the suite: starts listening on `port` when this file is
// loaded and is reused by the tests below that reference `server`.
server = buildServer().listen(port)
  75. describe('MqttClient', function () {
  76. describe('creating', function () {
  77. it('should allow instantiation of MqttClient without the \'new\' operator', function (done) {
  78. should(function () {
  79. var client
  80. try {
  81. client = mqtt.MqttClient(function () {
  82. throw Error('break')
  83. }, {})
  84. client.end()
  85. } catch (err) {
  86. if (err.message !== 'break') {
  87. throw err
  88. }
  89. done()
  90. }
  91. }).not.throw('Object #<Object> has no method \'_setupStream\'')
  92. })
  93. })
// Connection options reused by the tests below; they are also handed,
// together with the shared server, to the imported abstract client tests.
var config = { protocol: 'mqtt', port: port }
abstractClientTests(server, config)
  96. describe('message ids', function () {
  97. it('should increment the message id', function () {
  98. var client = mqtt.connect(config)
  99. var currentId = client._nextId()
  100. client._nextId().should.equal(currentId + 1)
  101. client.end()
  102. })
  103. it('should return 1 once the internal counter reached limit', function () {
  104. var client = mqtt.connect(config)
  105. client.nextId = 65535
  106. client._nextId().should.equal(65535)
  107. client._nextId().should.equal(1)
  108. client.end()
  109. })
  110. it('should return 65535 for last message id once the internal counter reached limit', function () {
  111. var client = mqtt.connect(config)
  112. client.nextId = 65535
  113. client._nextId().should.equal(65535)
  114. client.getLastMessageId().should.equal(65535)
  115. client._nextId().should.equal(1)
  116. client.getLastMessageId().should.equal(1)
  117. client.end()
  118. })
  119. it('should not throw an error if packet\'s messageId is not found when receiving a pubrel packet', function (done) {
  120. var server2 = new Server(function (c) {
  121. c.on('connect', function (packet) {
  122. c.connack({returnCode: 0})
  123. c.pubrel({ messageId: Math.floor(Math.random() * 9000) + 1000 })
  124. })
  125. })
  126. server2.listen(port + 49, function () {
  127. var client = mqtt.connect({
  128. port: port + 49,
  129. host: 'localhost'
  130. })
  131. client.on('packetsend', function (packet) {
  132. if (packet.cmd === 'pubcomp') {
  133. client.end()
  134. server2.close()
  135. done()
  136. }
  137. })
  138. })
  139. })
  140. it('should not go overflow if the TCP frame contains a lot of PUBLISH packets', function (done) {
  141. var parser = mqttPacket.parser()
  142. var count = 0
  143. var max = 1000
  144. var duplex = new Duplex({
  145. read: function (n) {},
  146. write: function (chunk, enc, cb) {
  147. parser.parse(chunk)
  148. cb() // nothing to do
  149. }
  150. })
  151. var client = new mqtt.MqttClient(function () {
  152. return duplex
  153. }, {})
  154. client.on('message', function (t, p, packet) {
  155. if (++count === max) {
  156. done()
  157. }
  158. })
  159. parser.on('packet', function (packet) {
  160. var packets = []
  161. if (packet.cmd === 'connect') {
  162. duplex.push(mqttPacket.generate({
  163. cmd: 'connack',
  164. sessionPresent: false,
  165. returnCode: 0
  166. }))
  167. for (var i = 0; i < max; i++) {
  168. packets.push(mqttPacket.generate({
  169. cmd: 'publish',
  170. topic: Buffer.from('hello'),
  171. payload: Buffer.from('world'),
  172. retain: false,
  173. dup: false,
  174. messageId: i + 1,
  175. qos: 1
  176. }))
  177. }
  178. duplex.push(Buffer.concat(packets))
  179. }
  180. })
  181. })
  182. })
  183. describe('flushing', function () {
  184. it('should attempt to complete pending unsub and send on ping timeout', function (done) {
  185. this.timeout(10000)
  186. var server3 = connOnlyServer().listen(port + 72)
  187. var pubCallbackCalled = false
  188. var unsubscribeCallbackCalled = false
  189. var client = mqtt.connect({
  190. port: port + 72,
  191. host: 'localhost',
  192. keepalive: 1,
  193. connectTimeout: 350,
  194. reconnectPeriod: 0
  195. })
  196. client.once('connect', () => {
  197. client.publish('fakeTopic', 'fakeMessage', {qos: 1}, (err, result) => {
  198. should.exist(err)
  199. pubCallbackCalled = true
  200. })
  201. client.unsubscribe('fakeTopic', (err, result) => {
  202. should.exist(err)
  203. unsubscribeCallbackCalled = true
  204. })
  205. setTimeout(() => {
  206. client.end(() => {
  207. should.equal(pubCallbackCalled && unsubscribeCallbackCalled, true, 'callbacks not invoked')
  208. server3.close()
  209. done()
  210. })
  211. }, 5000)
  212. })
  213. })
  214. })
  215. describe('reconnecting', function () {
  216. it('should attempt to reconnect once server is down', function (done) {
  217. this.timeout(15000)
  218. var innerServer = fork(path.join(__dirname, 'helpers', 'server_process.js'))
  219. var client = mqtt.connect({ port: 3000, host: 'localhost', keepalive: 1 })
  220. client.once('connect', function () {
  221. innerServer.kill('SIGINT') // mocks server shutdown
  222. client.once('close', function () {
  223. should.exist(client.reconnectTimer)
  224. client.end()
  225. done()
  226. })
  227. })
  228. })
  229. it('should reconnect to multiple host-ports-protocol combinations if servers is passed', function (done) {
  230. this.timeout(15000)
  231. var server2 = buildServer().listen(port + 42)
  232. server2.on('listening', function () {
  233. var client = mqtt.connect({
  234. protocol: 'wss',
  235. servers: [
  236. { port: port + 42, host: 'localhost', protocol: 'ws' },
  237. { port: port, host: 'localhost' }
  238. ],
  239. keepalive: 50
  240. })
  241. server2.on('client', function (c) {
  242. should.equal(client.stream.socket.url, 'ws://localhost:9918/', 'Protocol for first connection should use ws.')
  243. c.stream.destroy()
  244. server2.close()
  245. })
  246. server.once('client', function () {
  247. should.equal(client.stream.socket.url, 'wss://localhost:9876/', 'Protocol for second client should use the default protocol: wss, on port: port + 42.')
  248. client.end()
  249. done()
  250. })
  251. client.once('connect', function () {
  252. client.stream.destroy()
  253. })
  254. })
  255. })
  256. it('should reconnect if a connack is not received in an interval', function (done) {
  257. this.timeout(2000)
  258. var server2 = net.createServer().listen(port + 43)
  259. server2.on('connection', function (c) {
  260. eos(c, function () {
  261. server2.close()
  262. })
  263. })
  264. server2.on('listening', function () {
  265. var client = mqtt.connect({
  266. servers: [
  267. { port: port + 43, host: 'localhost_fake' },
  268. { port: port, host: 'localhost' }
  269. ],
  270. connectTimeout: 500
  271. })
  272. server.once('client', function () {
  273. client.end()
  274. done()
  275. })
  276. client.once('connect', function () {
  277. client.stream.destroy()
  278. })
  279. })
  280. })
  281. it('should not be cleared by the connack timer', function (done) {
  282. this.timeout(4000)
  283. var server2 = net.createServer().listen(port + 44)
  284. server2.on('connection', function (c) {
  285. c.destroy()
  286. })
  287. server2.once('listening', function () {
  288. var reconnects = 0
  289. var connectTimeout = 1000
  290. var reconnectPeriod = 100
  291. var expectedReconnects = Math.floor(connectTimeout / reconnectPeriod)
  292. var client = mqtt.connect({
  293. port: port + 44,
  294. host: 'localhost',
  295. connectTimeout: connectTimeout,
  296. reconnectPeriod: reconnectPeriod
  297. })
  298. client.on('reconnect', function () {
  299. reconnects++
  300. if (reconnects >= expectedReconnects) {
  301. client.end()
  302. done()
  303. }
  304. })
  305. })
  306. })
  307. it('should not keep requeueing the first message when offline', function (done) {
  308. this.timeout(2500)
  309. var server2 = buildServer().listen(port + 45)
  310. var client = mqtt.connect({
  311. port: port + 45,
  312. host: 'localhost',
  313. connectTimeout: 350,
  314. reconnectPeriod: 300
  315. })
  316. server2.on('client', function (c) {
  317. client.publish('hello', 'world', { qos: 1 }, function () {
  318. c.destroy()
  319. server2.close()
  320. client.publish('hello', 'world', { qos: 1 })
  321. })
  322. })
  323. setTimeout(function () {
  324. if (client.queue.length === 0) {
  325. client.end(true)
  326. done()
  327. } else {
  328. client.end(true)
  329. }
  330. }, 2000)
  331. })
  332. it('should not send the same subscribe multiple times on a flaky connection', function (done) {
  333. this.timeout(3500)
  334. var KILL_COUNT = 4
  335. var killedConnections = 0
  336. var subIds = {}
  337. var client = mqtt.connect({
  338. port: port + 46,
  339. host: 'localhost',
  340. connectTimeout: 350,
  341. reconnectPeriod: 300
  342. })
  343. var server2 = new Server(function (client) {
  344. client.on('error', function () {})
  345. client.on('connect', function (packet) {
  346. if (packet.clientId === 'invalid') {
  347. client.connack({returnCode: 2})
  348. } else {
  349. client.connack({returnCode: 0})
  350. }
  351. })
  352. }).listen(port + 46)
  353. server2.on('client', function (c) {
  354. client.subscribe('topic', function () {
  355. done()
  356. client.end()
  357. c.destroy()
  358. server2.close()
  359. })
  360. c.on('subscribe', function (packet) {
  361. if (killedConnections < KILL_COUNT) {
  362. // Kill the first few sub attempts to simulate a flaky connection
  363. killedConnections++
  364. c.destroy()
  365. } else {
  366. // Keep track of acks
  367. if (!subIds[packet.messageId]) {
  368. subIds[packet.messageId] = 0
  369. }
  370. subIds[packet.messageId]++
  371. if (subIds[packet.messageId] > 1) {
  372. done(new Error('Multiple duplicate acked subscriptions received for messageId ' + packet.messageId))
  373. client.end(true)
  374. c.destroy()
  375. server2.destroy()
  376. }
  377. c.suback({
  378. messageId: packet.messageId,
  379. granted: packet.subscriptions.map(function (e) {
  380. return e.qos
  381. })
  382. })
  383. }
  384. })
  385. })
  386. })
  387. it('should not fill the queue of subscribes if it cannot connect', function (done) {
  388. this.timeout(2500)
  389. var port2 = port + 48
  390. var server2 = net.createServer(function (stream) {
  391. var client = new Connection(stream)
  392. client.on('error', function () {})
  393. client.on('connect', function (packet) {
  394. client.connack({returnCode: 0})
  395. client.destroy()
  396. })
  397. })
  398. server2.listen(port2, function () {
  399. var client = mqtt.connect({
  400. port: port2,
  401. host: 'localhost',
  402. connectTimeout: 350,
  403. reconnectPeriod: 300
  404. })
  405. client.subscribe('hello')
  406. setTimeout(function () {
  407. client.queue.length.should.equal(1)
  408. client.end()
  409. done()
  410. }, 1000)
  411. })
  412. })
  413. it('should not send the same publish multiple times on a flaky connection', function (done) {
  414. this.timeout(3500)
  415. var KILL_COUNT = 4
  416. var killedConnections = 0
  417. var pubIds = {}
  418. var client = mqtt.connect({
  419. port: port + 47,
  420. host: 'localhost',
  421. connectTimeout: 350,
  422. reconnectPeriod: 300
  423. })
  424. var server2 = net.createServer(function (stream) {
  425. var client = new Connection(stream)
  426. client.on('error', function () {})
  427. client.on('connect', function (packet) {
  428. if (packet.clientId === 'invalid') {
  429. client.connack({returnCode: 2})
  430. } else {
  431. client.connack({returnCode: 0})
  432. }
  433. })
  434. this.emit('client', client)
  435. }).listen(port + 47)
  436. server2.on('client', function (c) {
  437. client.publish('topic', 'data', { qos: 1 }, function () {
  438. done()
  439. client.end()
  440. c.destroy()
  441. server2.destroy()
  442. })
  443. c.on('publish', function onPublish (packet) {
  444. if (killedConnections < KILL_COUNT) {
  445. // Kill the first few pub attempts to simulate a flaky connection
  446. killedConnections++
  447. c.destroy()
  448. // to avoid receiving inflight messages
  449. c.removeListener('publish', onPublish)
  450. } else {
  451. // Keep track of acks
  452. if (!pubIds[packet.messageId]) {
  453. pubIds[packet.messageId] = 0
  454. }
  455. pubIds[packet.messageId]++
  456. if (pubIds[packet.messageId] > 1) {
  457. done(new Error('Multiple duplicate acked publishes received for messageId ' + packet.messageId))
  458. client.end(true)
  459. c.destroy()
  460. server2.destroy()
  461. }
  462. c.puback(packet)
  463. }
  464. })
  465. })
  466. })
  467. })
  468. })