Skip to content

Commit

Permalink
feat: Tcp provider without reconnect on idle (#581)
Browse files Browse the repository at this point in the history
This commit reimplements tcp provider with reconnection logic
implemented with reconnect-core. This adds Fibonacci backoff with
maximum delay of 5 seconds and removes reconnection logic for
idle connections.

Previously tcp provider reconnected idle connections. This is not
needed, because as long as the tcp connection is alive we can assume
that the source is working. Repeated reconnections may be unwanted, as
when instruments are turned off and there is no data available.
OpenPlotter's misconfigured kplex left connections dangling in
CLOSE_WAIT and eventually caused kplex to run out of filedescriptors,
causing error logs to fill storage.
  • Loading branch information
tkurki authored Aug 9, 2018
1 parent bfa5dfc commit 09bef66
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 125 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
"pem": "^1.11.0",
"please-upgrade-node": "^3.0.1",
"primus": "^7.0.0",
"reconnect-core": "^1.3.0",
"serialport": "^6.0.0",
"split": "^1.0.0",
"stat-mode": "^0.2.2",
Expand Down
162 changes: 37 additions & 125 deletions providers/tcp.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2015 Fabian Tollenaar <[email protected]>
* Copyright 2014-2018 Fabian Tollenaar <[email protected]>,Teppo Kurki <[email protected]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,7 +15,7 @@
*/

/* Usage: This is TCP client provider that can connect to a tcp server and pass data from there to the provider pipeline.
* It takes the options "host" and "port" and optionally "reconnect" (default true) and "maxRetries" (default 10)
* It takes the options "host" and "port".
* Example:
{
Expand All @@ -28,141 +28,53 @@
*/

var net = require('net'),
Transform = require('stream').Transform,
debug = require('debug')('signalk-provider-tcp')
const net = require('net')
const Transform = require('stream').Transform

function TcpStream (options) {
if (!(this instanceof TcpStream)) {
return new TcpStream(options)
}
const debug = require('debug')('signalk-provider-tcp')
const debugData = require('debug')('signalk-provider-tcp.data')

function TcpStream (options) {
Transform.call(this, options)

this.options = options
this.reconnect = !(
typeof options.reconnect === 'boolean' && options.reconnect === false
)
this.socket = null
this.retries = 0
this.maxRetries =
typeof options.maxRetries === 'number' && options.maxRetries > 0
? options.maxRetries
: 10

this.__reset = null
this.__timeout = null
this.__last = -1

this.start(true)

this.on('error', function (err) {
debug('Stream: "error". Message: ' + err.message)
})
}

require('util').inherits(TcpStream, Transform)

TcpStream.prototype.handleTimeout = function () {
if (Date.now() - this.__last > 90000 && this.__reset === null) {
debug('Connection timed out. Resetting.')
this.start()
return
}

if (this.__timeout !== null) {
clearTimeout(this.__timeout)
}

this.__timeout = setTimeout(this.handleTimeout.bind(this), 120000)
}

TcpStream.prototype.start = function (force) {
if (this.socket !== null) {
this.socket.unpipe(this)
this.socket.removeAllListeners('error')
this.socket.removeAllListeners('close')
this.socket.removeAllListeners('end')
this.socket.destroy()
this.socket = null
}

if (force !== true && this.reconnect !== true) {
debug('Reconnect is turned off. Game over.', this.reconnect)
return
}

if (this.__timeout !== null) {
clearTimeout(this.__timeout)
}

this.socket = net.connect(this.options)
this.__timeout = setTimeout(this.handleTimeout.bind(this), 30000)

this.socket.on(
'close',
function () {
if (this.__reset === null) {
debug('Socket: "close". Re-starting')
this.start()
TcpStream.prototype.pipe = function (pipeTo) {
const re = require('reconnect-core')(function () {
return net.connect.apply(null, arguments)
})({ maxDelay: 5 * 1000 }, tcpStream => {
tcpStream.on('data', data => {
if (debugData.enabled) {
debugData(data.toString())
}
}.bind(this)
)

this.socket.on('connect', function () {
if (this.__reset !== null) {
clearTimeout(this.__reset)
}

debug('Socket: "connect". Connected!')
this.write(data)
})
})

this.socket.on(
'error',
function (err) {
debug('Socket: "error". Message: ' + err.message)
this.retries++

if (this.retries < this.maxRetries) {
debug(
'Socket: "error". Retrying... ' +
this.retries +
' / ' +
this.maxRetries
)
this.start()
} else {
debug('Socket: "error". Out of retries, retrying in 30 seconds.\n\n')
if (this.__reset === null) {
if (this.__timeout !== null) {
clearTimeout(this.__timeout)
}

this.__reset = setTimeout(
function () {
this.maxRetries = 10
this.retries = 0
this.__reset = null
this.start()
}.bind(this),
30000
)
}
}
}.bind(this)
)

this.socket.pipe(this)
}

TcpStream.prototype._transform = function (chunk, encoding, done) {
this.__last = Date.now()
this.push(chunk)
done()
.on('connect', con => {
debug(`Connect ${this.options.host} ${this.options.port}`)
})
.on('reconnect', (n, delay) => {
debug(
`Reconnect ${this.options.host} ${
this.options.port
} retry ${n} delay ${delay}`
)
})
.on('disconnect', err => {
debug(`Disconnect ${this.options.host} ${this.options.port}`)
})
.on('error', err => {
console.error('TcpProvider:' + err.message)
})
.connect(this.options)

Transform.prototype.pipe.call(this, pipeTo)
}

TcpStream.prototype.end = function () {
console.error('tcp provider stream ended')
TcpStream.prototype._transform = function (data, encoding, callback) {
callback(null, data)
}

module.exports = TcpStream

0 comments on commit 09bef66

Please sign in to comment.