Skip to content

Commit

Permalink
feat: connection error handler (mqttjs#1053)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yoseph Maguire authored and Cédric von Allmen committed Nov 27, 2020
1 parent 3811def commit 1e949c8
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 15 deletions.
18 changes: 14 additions & 4 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
/**
* Module dependencies
*/
var events = require('events')
var EventEmitter = require('events').EventEmitter
var Store = require('./store')
var mqttPacket = require('mqtt-packet')
var Writable = require('readable-stream').Writable
Expand Down Expand Up @@ -257,12 +257,12 @@ function MqttClient (streamBuilder, options) {
// Setup reconnect timer on disconnect
this.on('close', this._setupReconnect)

events.EventEmitter.call(this)
EventEmitter.call(this)

debug('MqttClient: call _setupStream')
this._setupStream()
}
inherits(MqttClient, events.EventEmitter)
inherits(MqttClient, EventEmitter)

/**
* setup the event handlers in the inner stream.
Expand Down Expand Up @@ -320,11 +320,21 @@ MqttClient.prototype._setupStream = function () {
work()
}

function streamErrorHandler (error) {
debug('stream error')
if (error.code === 'ECONNREFUSED') {
// handle error
that.emit('error', error)
} else {
nop(error)
}
}

debug('_setupStream: piping stream to writable')
this.stream.pipe(writable)

// Suppress connection errors
this.stream.on('error', nop)
this.stream.on('error', streamErrorHandler)

// Echo stream close
this.stream.on('close', function () {
Expand Down
4 changes: 2 additions & 2 deletions lib/connect/tcp.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ var net = require('net')
variables port and host can be removed since
you have all required information in opts object
*/
function buildBuilder (client, opts) {
function streamBuilder (client, opts) {
var port, host
opts.port = opts.port || 1883
opts.hostname = opts.hostname || opts.host || 'localhost'
Expand All @@ -16,4 +16,4 @@ function buildBuilder (client, opts) {
return net.createConnection(port, host)
}

module.exports = buildBuilder
module.exports = streamBuilder
8 changes: 5 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
"contributors": [
"Adam Rudd <[email protected]>",
"Matteo Collina <[email protected]> (https://github.com/mcollina)",
"Siarhei Buntsevich <[email protected]> (https://github.com/scarry1992)"
"Siarhei Buntsevich <[email protected]> (https://github.com/scarry1992)",
"Yoseph Maguire <[email protected]> (https://github.com/YoDaMa)"
],
"keywords": [
"mqtt",
Expand Down Expand Up @@ -83,6 +84,7 @@
"@types/node": "^10.0.0",
"airtap": "^3.0.0",
"browserify": "^16.2.2",
"chai": "^4.2.0",
"codecov": "^3.0.4",
"global": "^4.3.2",
"istanbul": "^0.4.5",
Expand All @@ -93,14 +95,14 @@
"rimraf": "^3.0.2",
"safe-buffer": "^5.1.2",
"should": "^13.2.1",
"sinon": "~1.17.7",
"sinon": "^9.0.0",
"snazzy": "^8.0.0",
"standard": "^11.0.1",
"through2": "^3.0.0",
"tslint": "^5.11.0",
"tslint-config-standard": "^8.0.1",
"typescript": "^3.2.2",
"uglify-js": "^3.4.5",
"uglify-js": "^3.8.0",
"ws": "^3.3.3"
},
"standard": {
Expand Down
20 changes: 17 additions & 3 deletions test/abstract_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ var mqtt = require('../')
var xtend = require('xtend')
var Server = require('./server')
var Store = require('./../lib/store')
var assert = require('chai').assert
var port = 9876

module.exports = function (server, config) {
Expand Down Expand Up @@ -336,7 +337,7 @@ module.exports = function (server, config) {
})
})

it('should emit error', function (done) {
it('should emit error on invalid clientId', function (done) {
var client = connect({clientId: 'invalid'})
client.once('connect', function () {
done(new Error('Should not emit connect'))
Expand All @@ -349,6 +350,17 @@ module.exports = function (server, config) {
})
})

it('should emit error event if the socket refuses the connection', function (done) {
// fake a port
var client = connect({ port: 4557 })

client.on('error', function (e) {
assert.equal(e.code, 'ECONNREFUSED')
client.end()
done()
})
})

it('should have different client ids', function (done) {
var client1 = connect()
var client2 = connect()
Expand All @@ -361,7 +373,7 @@ module.exports = function (server, config) {
})

describe('handling offline states', function () {
it('should emit offline events once when the client transitions from connected states to disconnected ones', function (done) {
it('should emit offline event once when the client transitions from connected states to disconnected ones', function (done) {
var client = connect({reconnectPeriod: 20})

client.on('connect', function () {
Expand All @@ -373,10 +385,12 @@ module.exports = function (server, config) {
})
})

it('should emit offline events once when the client (at first) can NOT connect to servers', function (done) {
it('should emit offline event once when the client (at first) can NOT connect to servers', function (done) {
// fake a port
var client = connect({ reconnectPeriod: 20, port: 4557 })

client.on('error', function () {})

client.on('offline', function () {
client.end(true, done)
})
Expand Down
3 changes: 1 addition & 2 deletions test/secure_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ describe('MqttSecureClient', function () {
server.setupConnection(tlsSocket)
})


hostname = 'localhost'
client = mqtt.connect({
protocol: 'mqtts',
Expand All @@ -172,7 +171,7 @@ describe('MqttSecureClient', function () {
rejectUnauthorized: true,
host: hostname
})

client.on('error', function (err) {
done(err)
})
Expand Down
1 change: 0 additions & 1 deletion test/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,3 @@ MqttSecureServer = module.exports.SecureServer =
}
inherits(MqttSecureServer, tls.Server)
MqttSecureServer.prototype.setupConnection = setupConnection

0 comments on commit 1e949c8

Please sign in to comment.