From 4f242f47bc8568299f04bade8aa4d1d11b939912 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Tue, 30 Apr 2024 15:49:34 +0200 Subject: [PATCH] fix: keepalive issues (#1855) * fix: keepalive issues * fix: typo * fix: add missing shift * fix: avoid race conditions * fix: improve tests --- example.ts | 36 ++++++++------- src/lib/client.ts | 58 ++++++++++++++---------- src/lib/handlers/index.ts | 10 ++++- test/abstract_client.ts | 94 +++++++++++++++++++++++++++++---------- 4 files changed, 133 insertions(+), 65 deletions(-) diff --git a/example.ts b/example.ts index 1f26a4b6a..363b979f7 100644 --- a/example.ts +++ b/example.ts @@ -1,24 +1,25 @@ -import mqtt from '.' +import mqtt from './src/index' -const client = mqtt.connect('mqtt://test.mosquitto.org', { +const client = mqtt.connect('mqtts://test.mosquitto.org', { keepalive: 10, + port: 8883, reconnectPeriod: 15000, + rejectUnauthorized: false, }) -const testTopic = 'presence' +const randomNumber = Math.floor(Math.random() * 1000) + +const testTopic = `presence_${randomNumber.toString()}` function publish() { - client.publish( - testTopic, - `Hello mqtt ${new Date().toISOString()}`, - (err2) => { - if (!err2) { - console.log('message published') - } else { - console.error(err2) - } - }, - ) + const msg = `Hello mqtt ${new Date().toISOString()}` + client.publish(testTopic, msg, { qos: 1 }, (err2) => { + if (!err2) { + console.log('message published') + } else { + console.error(err2) + } + }) } client.subscribe(testTopic, (err) => { @@ -31,11 +32,12 @@ client.subscribe(testTopic, (err) => { client.on('message', (topic, message) => { console.log('received message "%s" from topic "%s"', message, topic) - setTimeout(() => { - publish() - }, 2000) }) +setInterval(() => { + publish() +}, 2000) + client.on('error', (err) => { console.error(err) }) diff --git a/src/lib/client.ts b/src/lib/client.ts index 5c73bb3d5..153b4a5c5 100644 --- a/src/lib/client.ts +++ b/src/lib/client.ts @@ -424,8 +424,6 @@ export default class MqttClient extends TypedEventEmitter void } @@ -435,6 +433,9 @@ export default class MqttClient extends TypedEventEmitter void + /** Timestamp of last received control packet */ + public pingResp: number + public pingTimer: PingTimer /** @@ -659,11 +660,7 @@ export default class MqttClient extends TypedEventEmitter { @@ -2092,10 +2082,20 @@ export default class MqttClient extends TypedEventEmitter { }) return client } + + // keep track of last time we received a packet (for keepalive mechanism) + client.pingResp = Date.now() + + // do not shift on pingresp otherwise we would skip the pingreq sending + if (packet.cmd !== 'pingresp') { + client['_shiftPingInterval']() + } + client.log('_handlePacket :: emitting packetreceive') client.emit('packetreceive', packet) @@ -49,7 +58,6 @@ const handle: PacketHandler = (client, packet, done) => { break case 'pingresp': // this will be checked in _checkPing client method every keepalive interval - client.pingResp = true done() break case 'disconnect': diff --git a/test/abstract_client.ts b/test/abstract_client.ts index eedf07802..efaad272a 100644 --- a/test/abstract_client.ts +++ b/test/abstract_client.ts @@ -2,7 +2,7 @@ * Testing dependencies */ import { assert } from 'chai' -import sinon from 'sinon' +import sinon, { SinonSpy } from 'sinon' import fs from 'fs' import levelStore from 'mqtt-level-store' import Store from '../src/lib/store' @@ -93,11 +93,13 @@ export default function abstractTest(server, config, ports) { client.once('close', () => { assert.notExists(client.pingTimer) + client.end(true, (err) => done(err)) }) client.once('connect', () => { assert.exists(client.pingTimer) + client.stream.end() }) }) @@ -1980,6 +1982,12 @@ export default function abstractTest(server, config, ports) { const spy = sinon.spy() client['_checkPing'] = spy + client.on('error', (err) => { + client.end(true, () => { + done(err) + }) + }) + client.once('connect', () => { clock.tick(interval * 1000) assert.strictEqual(spy.callCount, 1) @@ -1994,7 +2002,7 @@ export default function abstractTest(server, config, ports) { }) }) - it('should not checkPing if publishing at a higher rate than keepalive', function _test(t, done) { + it('should not shift ping on publish', function _test(t, done) { const intervalMs = 3000 const client = connect({ keepalive: intervalMs / 1000 }) @@ -2003,35 +2011,70 @@ export default function abstractTest(server, config, ports) { client.once('connect', () => { client.publish('foo', 'bar') - clock.tick(intervalMs - 1) + clock.tick(intervalMs) client.publish('foo', 'bar') - clock.tick(2) + clock.tick(intervalMs) - assert.strictEqual(spy.callCount, 0) + assert.strictEqual(spy.callCount, 2) client.end(true, done) }) }) - it('should checkPing if publishing at a higher rate than keepalive and reschedulePings===false', function _test(t, done) { - const intervalMs = 3000 - const client = connect({ - keepalive: intervalMs / 1000, - reschedulePings: false, - }) + const checkPing = (reschedulePings: boolean) => { + it(`should checkPing if publishing at a higher rate than keepalive and reschedulePings===${reschedulePings}`, function _test(t, done) { + const intervalMs = 3000 + const client = connect({ + keepalive: intervalMs / 1000, + reschedulePings, + }) - const spy = sinon.spy() - client['_checkPing'] = spy + const spyReschedule = sinon.spy( + client, + '_reschedulePing' as any, + ) - client.once('connect', () => { - client.publish('foo', 'bar') - clock.tick(intervalMs - 1) - client.publish('foo', 'bar') - clock.tick(2) + let received = 0 - assert.strictEqual(spy.callCount, 1) - client.end(true, done) + client.on('packetreceive', (packet) => { + if (packet.cmd === 'puback') { + clock.tick(intervalMs) + + received++ + + if (reschedulePings) { + assert.strictEqual( + spyReschedule.callCount, + received, + ) + } else { + assert.strictEqual(spyReschedule.callCount, 0) + } + + if (received === 2) { + client.end(true, done) + } + } + }) + + server.once('client', (serverClient) => { + serverClient.on('publish', () => { + // needed to trigger the setImmediate inside server publish listener and send suback + clock.tick(1) + }) + }) + + client.once('connect', () => { + // reset call count (it's called also on connack) + spyReschedule.resetHistory() + // use qos1 so the puback is received (to reschedule ping) + client.publish('foo', 'bar', { qos: 1 }) + client.publish('foo', 'bar', { qos: 1 }) + }) }) - }) + } + + checkPing(true) + checkPing(false) }) describe('pinging', () => { @@ -2067,13 +2110,16 @@ export default function abstractTest(server, config, ports) { } }) - let client = connect({ + const options: IClientOptions = { keepalive: 60, reconnectPeriod: 5000, - }) + } + + let client = connect() client.once('connect', () => { - client.pingResp = false + // when using fake timers Date.now() counts from 0: https://sinonjs.org/releases/latest/fake-timers/ + client.pingResp = -options.keepalive * 1000 client.once('error', (err) => { assert.equal(err.message, 'Keepalive timeout')