From c441230993307985f973c0f300a88f17a2118b82 Mon Sep 17 00:00:00 2001 From: tabcat Date: Fri, 11 Oct 2024 18:50:10 +0700 Subject: [PATCH 1/4] fix(@libp2p/tcp): race condition in onSocket This fixes a race condition in the onSocket listener method. onSocket gets a net.Socket parameter that needs to be closed later before the tcp server can close. The net.Socket is used to create a MultiaddrConnection but the connection is not tracked with this.connections until after it has been upgraded. If the tcp listener.close is called while a MultiaddrConnection is waiting to be upgraded, then the MultiaddrConnection socket cannot be closed as it does not exist in this.connections. Instead of adding the MultiaddrConnection to this.connections on creation I decided to create a separate property named this.maConnections. I decided to track the non-upgraded connections separately because I was unsure how this.connections must be used with other things like metrics. --- packages/transport-tcp/src/listener.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/packages/transport-tcp/src/listener.ts b/packages/transport-tcp/src/listener.ts index b83b35d843..8df45f5138 100644 --- a/packages/transport-tcp/src/listener.ts +++ b/packages/transport-tcp/src/listener.ts @@ -69,6 +69,7 @@ export class TCPListener extends TypedEventEmitter implements Li private readonly server: net.Server /** Keep track of open connections to destroy in case of timeout */ private readonly connections = new Set() + private readonly maConnections = new Set() private status: Status = { code: TCPListenerStatusCode.INACTIVE } private metrics?: TCPListenerMetrics private addr: string @@ -195,11 +196,13 @@ export class TCPListener extends TypedEventEmitter implements Li } this.log('new inbound connection %s', maConn.remoteAddr) + this.maConnections.add(maConn) this.context.upgrader.upgradeInbound(maConn) .then((conn) => { this.log('inbound connection upgraded %s', maConn.remoteAddr) 
this.connections.add(maConn) + this.maConnections.delete(maConn) socket.once('close', () => { this.connections.delete(maConn) @@ -307,6 +310,11 @@ export class TCPListener extends TypedEventEmitter implements Li conn.abort(err) }) + // cleanup connections that have not been upgraded + this.maConnections.forEach(conn => { + conn.abort(err) + }) + // shut down the server socket, permanently await this.pause(true) } From f3cf0196df603794d51bd0bda8ec80560b0f0fa0 Mon Sep 17 00:00:00 2001 From: tabcat Date: Fri, 11 Oct 2024 23:49:56 +0700 Subject: [PATCH 2/4] remove maConn from this.maConnections on upgrade fail --- packages/transport-tcp/src/listener.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/transport-tcp/src/listener.ts b/packages/transport-tcp/src/listener.ts index 8df45f5138..5a168ec540 100644 --- a/packages/transport-tcp/src/listener.ts +++ b/packages/transport-tcp/src/listener.ts @@ -242,6 +242,7 @@ export class TCPListener extends TypedEventEmitter implements Li .catch(async err => { this.log.error('inbound connection upgrade failed', err) this.metrics?.errors.increment({ [`${this.addr} inbound_upgrade`]: true }) + this.maConnections.delete(maConn) maConn.abort(err) }) } From 00f9233664ea6d0cb5baef5b6e89c6a7ed613aa0 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 23 Oct 2024 10:49:00 +0100 Subject: [PATCH 3/4] chore: add tests, abort upgrade on shutdown --- packages/transport-tcp/package.json | 1 + packages/transport-tcp/src/listener.ts | 79 ++++++++-------- .../test/connection-limits.spec.ts | 59 ++++++------ .../transport-tcp/test/listen-dial.spec.ts | 89 +++++++++++++++++++ packages/transport-tcp/test/utils.ts | 12 +++ 5 files changed, 170 insertions(+), 70 deletions(-) create mode 100644 packages/transport-tcp/test/utils.ts diff --git a/packages/transport-tcp/package.json b/packages/transport-tcp/package.json index 6efe16bb30..aed2b8cca9 100644 --- a/packages/transport-tcp/package.json +++ b/packages/transport-tcp/package.json @@ 
-66,6 +66,7 @@ "@multiformats/multiaddr": "^12.2.3", "@types/sinon": "^17.0.3", "p-defer": "^4.0.1", + "p-event": "^6.0.1", "progress-events": "^1.0.0", "race-event": "^1.3.0", "stream-to-it": "^1.0.1" diff --git a/packages/transport-tcp/src/listener.ts b/packages/transport-tcp/src/listener.ts index 5a168ec540..f9b2c9da92 100644 --- a/packages/transport-tcp/src/listener.ts +++ b/packages/transport-tcp/src/listener.ts @@ -1,5 +1,6 @@ import net from 'net' -import { AbortError, AlreadyStartedError, InvalidParametersError, NotStartedError, TypedEventEmitter } from '@libp2p/interface' +import { AlreadyStartedError, InvalidParametersError, NotStartedError, TypedEventEmitter, setMaxListeners } from '@libp2p/interface' +import { pEvent } from 'p-event' import { CODE_P2P } from './constants.js' import { toMultiaddrConnection } from './socket-to-conn.js' import { @@ -67,13 +68,13 @@ type Status = { code: TCPListenerStatusCode.INACTIVE } | { export class TCPListener extends TypedEventEmitter implements Listener { private readonly server: net.Server - /** Keep track of open connections to destroy in case of timeout */ - private readonly connections = new Set() - private readonly maConnections = new Set() + /** Keep track of open sockets to destroy in case of timeout */ + private readonly sockets = new Set() private status: Status = { code: TCPListenerStatusCode.INACTIVE } private metrics?: TCPListenerMetrics private addr: string private readonly log: Logger + private readonly shutdownController: AbortController constructor (private readonly context: Context) { super() @@ -81,6 +82,9 @@ export class TCPListener extends TypedEventEmitter implements Li context.keepAlive = context.keepAlive ?? true context.noDelay = context.noDelay ?? 
true + this.shutdownController = new AbortController() + setMaxListeners(Infinity, this.shutdownController.signal) + this.log = context.logger.forComponent('libp2p:tcp:listener') this.addr = 'unknown' this.server = net.createServer(context, this.onSocket.bind(this)) @@ -120,7 +124,7 @@ export class TCPListener extends TypedEventEmitter implements Li help: 'Current active connections in TCP listener', calculate: () => { return { - [this.addr]: this.connections.size + [this.addr]: this.sockets.size } } }) @@ -196,20 +200,20 @@ export class TCPListener extends TypedEventEmitter implements Li } this.log('new inbound connection %s', maConn.remoteAddr) - this.maConnections.add(maConn) + this.sockets.add(socket) - this.context.upgrader.upgradeInbound(maConn) + this.context.upgrader.upgradeInbound(maConn, { + signal: this.shutdownController.signal + }) .then((conn) => { this.log('inbound connection upgraded %s', maConn.remoteAddr) - this.connections.add(maConn) - this.maConnections.delete(maConn) socket.once('close', () => { - this.connections.delete(maConn) + this.sockets.delete(socket) if ( this.context.closeServerOnMaxConnections != null && - this.connections.size < this.context.closeServerOnMaxConnections.listenBelow + this.sockets.size < this.context.closeServerOnMaxConnections.listenBelow ) { // The most likely case of error is if the port taken by this // application is bound by another process during the time the @@ -230,11 +234,9 @@ export class TCPListener extends TypedEventEmitter implements Li if ( this.context.closeServerOnMaxConnections != null && - this.connections.size >= this.context.closeServerOnMaxConnections.closeAbove + this.sockets.size >= this.context.closeServerOnMaxConnections.closeAbove ) { - this.pause(false).catch(e => { - this.log.error('error attempting to close server once connection count over limit', e) - }) + this.pause() } this.safeDispatchEvent('connection', { detail: conn }) @@ -242,7 +244,7 @@ export class TCPListener extends 
TypedEventEmitter implements Li .catch(async err => { this.log.error('inbound connection upgrade failed', err) this.metrics?.errors.increment({ [`${this.addr} inbound_upgrade`]: true }) - this.maConnections.delete(maConn) + this.sockets.delete(socket) maConn.abort(err) }) } @@ -304,20 +306,28 @@ export class TCPListener extends TypedEventEmitter implements Li } async close (): Promise { - const err = new AbortError('Listener is closing') + const events: Array> = [] - // synchronously close each connection - this.connections.forEach(conn => { - conn.abort(err) - }) + if (this.server.listening) { + events.push(pEvent(this.server, 'close')) + } + + // shut down the server socket, permanently + this.pause(true) + + // stop any in-progress connection upgrades + this.shutdownController.abort() - // cleanup connections that have not been upgraded - this.maConnections.forEach(conn => { - conn.abort(err) + // synchronously close any open connections - should be done after closing + // the server socket in case new sockets are opened during the shutdown + this.sockets.forEach(socket => { + if (socket.readable) { + events.push(pEvent(socket, 'close')) + socket.destroy() + } }) - // shut down the server socket, permanently - await this.pause(true) + await Promise.all(events) } /** @@ -341,7 +351,7 @@ export class TCPListener extends TypedEventEmitter implements Li this.log('listening on %s', this.server.address()) } - private async pause (permanent: boolean): Promise { + private pause (permanent: boolean = false): void { if (!this.server.listening && this.status.code === TCPListenerStatusCode.PAUSED && permanent) { this.status = { code: TCPListenerStatusCode.INACTIVE } return @@ -370,15 +380,10 @@ export class TCPListener extends TypedEventEmitter implements Li // during the time the server is closing this.status = permanent ? 
{ code: TCPListenerStatusCode.INACTIVE } : { ...this.status, code: TCPListenerStatusCode.PAUSED } - await new Promise((resolve, reject) => { - this.server.close(err => { - if (err != null) { - reject(err) - return - } - - resolve() - }) - }) + // stop accepting incoming connections - existing connections are maintained + // - any callback passed here would be invoked after existing connections + // close, we want to maintain them so no callback is passed otherwise this + // method will never return + this.server.close() } } diff --git a/packages/transport-tcp/test/connection-limits.spec.ts b/packages/transport-tcp/test/connection-limits.spec.ts index a889c7cbe4..b43163dade 100644 --- a/packages/transport-tcp/test/connection-limits.spec.ts +++ b/packages/transport-tcp/test/connection-limits.spec.ts @@ -1,6 +1,5 @@ import net from 'node:net' import { promisify } from 'util' -import { TypedEventEmitter } from '@libp2p/interface' import { mockUpgrader } from '@libp2p/interface-compliance-tests/mocks' import { defaultLogger } from '@libp2p/logger' import { multiaddr } from '@multiformats/multiaddr' @@ -64,21 +63,25 @@ async function assertServerConnections (listener: TCPListener, connections: numb // Expect server connections but allow time for sockets to connect or disconnect for (let i = 0; i < 100; i++) { // eslint-disable-next-line @typescript-eslint/dot-notation - if (listener['connections'].size === connections) { + if (listener['sockets'].size === connections) { return } else { await promisify(setTimeout)(10) } } // eslint-disable-next-line @typescript-eslint/dot-notation - expect(listener['connections'].size).equals(connections, 'invalid amount of server connections') + expect(listener['sockets'].size).equals(connections, 'invalid amount of server connections') } describe('closeAbove/listenBelow', () => { - const afterEachCallbacks: Array<() => Promise | any> = [] + let afterEachCallbacks: Array<() => Promise | any> = [] + + beforeEach(() => { + 
afterEachCallbacks = [] + }) + afterEach(async () => { await Promise.all(afterEachCallbacks.map(fn => fn())) - afterEachCallbacks.length = 0 }) it('reject dial of connection above closeAbove', async () => { @@ -86,16 +89,14 @@ describe('closeAbove/listenBelow', () => { const closeAbove = 3 const port = 9900 - const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({ + const transport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({ logger: defaultLogger() }) - const upgrader = mockUpgrader({ - events: new TypedEventEmitter() - }) - const listener = trasnport.createListener({ upgrader }) as TCPListener - // eslint-disable-next-line @typescript-eslint/promise-function-async - afterEachCallbacks.push(() => listener.close()) + const upgrader = mockUpgrader() + const listener = transport.createListener({ upgrader }) as TCPListener + afterEachCallbacks.push(async () => listener.close()) + await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`)) const { assertConnectedSocket, assertRefusedSocket } = buildSocketAssertions(port, afterEachCallbacks) @@ -115,16 +116,14 @@ describe('closeAbove/listenBelow', () => { const closeAbove = 3 const port = 9900 - const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({ + const transport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({ logger: defaultLogger() }) - const upgrader = mockUpgrader({ - events: new TypedEventEmitter() - }) - const listener = trasnport.createListener({ upgrader }) as TCPListener - // eslint-disable-next-line @typescript-eslint/promise-function-async - afterEachCallbacks.push(() => listener.close()) + const upgrader = mockUpgrader() + const listener = transport.createListener({ upgrader }) as TCPListener + afterEachCallbacks.push(async () => listener.close()) + await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`)) const { assertConnectedSocket } = buildSocketAssertions(port, afterEachCallbacks) @@ 
-152,16 +151,13 @@ describe('closeAbove/listenBelow', () => { const closeAbove = 3 const port = 9900 - const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({ + const transport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({ logger: defaultLogger() }) - const upgrader = mockUpgrader({ - events: new TypedEventEmitter() - }) - const listener = trasnport.createListener({ upgrader }) as TCPListener - // eslint-disable-next-line @typescript-eslint/promise-function-async - afterEachCallbacks.push(() => listener.close()) + const upgrader = mockUpgrader() + const listener = transport.createListener({ upgrader }) as TCPListener + afterEachCallbacks.push(async () => listener.close()) let closeEventCallCount = 0 listener.addEventListener('close', () => { @@ -185,16 +181,13 @@ describe('closeAbove/listenBelow', () => { const closeAbove = 3 const port = 9900 - const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({ + const transport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({ logger: defaultLogger() }) - const upgrader = mockUpgrader({ - events: new TypedEventEmitter() - }) - const listener = trasnport.createListener({ upgrader }) as TCPListener - // eslint-disable-next-line @typescript-eslint/promise-function-async - afterEachCallbacks.push(() => listener.close()) + const upgrader = mockUpgrader() + const listener = transport.createListener({ upgrader }) as TCPListener + afterEachCallbacks.push(async () => listener.close()) let listeningEventCallCount = 0 listener.addEventListener('listening', () => { diff --git a/packages/transport-tcp/test/listen-dial.spec.ts b/packages/transport-tcp/test/listen-dial.spec.ts index fe90b4eecd..778de8427f 100644 --- a/packages/transport-tcp/test/listen-dial.spec.ts +++ b/packages/transport-tcp/test/listen-dial.spec.ts @@ -10,6 +10,7 @@ import { pipe } from 'it-pipe' import pDefer from 'p-defer' import { fromString as uint8ArrayFromString 
} from 'uint8arrays/from-string' import { tcp } from '../src/index.js' +import { delay } from './utils.js' import type { MultiaddrConnection, Transport, Upgrader } from '@libp2p/interface' const isCI = process.env.CI @@ -394,4 +395,92 @@ describe('dial', () => { await listener.close() }) + + it('should close before connection upgrade is completed', async () => { + // create a Promise that resolves when the upgrade starts + const upgradeStarted = pDefer() + + // create a listener with the handler + const listener = transport.createListener({ + upgrader: { + async upgradeInbound () { + upgradeStarted.resolve() + + // make the upgrade stall - delay for longer than the test timeout + await delay(120000) + + throw new Error('Upgrade failed') + }, + async upgradeOutbound () { + throw new Error('Not implemented') + } + } + }) + + // listen on a multiaddr + await listener.listen(multiaddr('/ip4/127.0.0.1/tcp/0')) + + const localAddrs = listener.getAddrs() + expect(localAddrs.length).to.equal(1) + + // dial the listener address + transport.dial(localAddrs[0], { + upgrader + }).catch(() => {}) + + // wait for the upgrade to start + await upgradeStarted.promise + + // close the listener, process should exit normally + await listener.close() + }) + + it('should abort inbound upgrade on close', async () => { + // create a Promise that resolves when the upgrade starts + const upgradeStarted = pDefer() + const abortedUpgrade = pDefer() + + // create a listener with the handler + const listener = transport.createListener({ + upgrader: { + async upgradeInbound (maConn, opts) { + upgradeStarted.resolve() + + opts?.signal?.addEventListener('abort', () => { + abortedUpgrade.resolve() + }, { + once: true + }) + + // make the upgrade stall - delay for longer than the test timeout + await delay(120000) + + throw new Error('Upgrade failed') + }, + async upgradeOutbound () { + throw new Error('Not implemented') + } + } + }) + + // listen on a multiaddr + await 
listener.listen(multiaddr('/ip4/127.0.0.1/tcp/0')) + + const localAddrs = listener.getAddrs() + expect(localAddrs.length).to.equal(1) + + // dial the listener address + transport.dial(localAddrs[0], { + upgrader + }).catch(() => {}) + + // wait for the upgrade to start + await upgradeStarted.promise + + // close the listener + await listener.close() + + // should abort the upgrade + await abortedUpgrade.promise + }) }) diff --git a/packages/transport-tcp/test/utils.ts b/packages/transport-tcp/test/utils.ts new file mode 100644 index 0000000000..5f10b5f6ba --- /dev/null +++ b/packages/transport-tcp/test/utils.ts @@ -0,0 +1,12 @@ +/** + * A delayed promise that doesn't keep the node process running + */ +export async function delay (ms: number): Promise { + await new Promise(resolve => { + AbortSignal.timeout(ms).addEventListener('abort', () => { + resolve() + }, { + once: true + }) + }) +} From f023585686727399f888f537912890435a135a6c Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 23 Oct 2024 11:02:13 +0100 Subject: [PATCH 4/4] chore: simplify test --- packages/transport-tcp/test/listen-dial.spec.ts | 15 ++++----------- packages/transport-tcp/test/utils.ts | 12 ------------ 2 files changed, 4 insertions(+), 23 deletions(-) delete mode 100644 packages/transport-tcp/test/utils.ts diff --git a/packages/transport-tcp/test/listen-dial.spec.ts b/packages/transport-tcp/test/listen-dial.spec.ts index 778de8427f..81e47a58dc 100644 --- a/packages/transport-tcp/test/listen-dial.spec.ts +++ b/packages/transport-tcp/test/listen-dial.spec.ts @@ -10,7 +10,6 @@ import { pipe } from 'it-pipe' import pDefer from 'p-defer' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { tcp } from '../src/index.js' -import { delay } from './utils.js' import type { MultiaddrConnection, Transport, Upgrader } from '@libp2p/interface' const isCI = process.env.CI @@ -406,13 +405,10 @@ describe('dial', () => { async upgradeInbound () { upgradeStarted.resolve() - 
// make the upgrade stall - delay for longer than the test timeout - await delay(120000) - - throw new Error('Upgrade failed') + return new Promise(() => {}) }, async upgradeOutbound () { - throw new Error('Not implemented') + return new Promise(() => {}) } } }) @@ -452,13 +448,10 @@ describe('dial', () => { once: true }) - // make the upgrade stall - delay for longer than the test timeout - await delay(120000) - - throw new Error('Upgrade failed') + return new Promise(() => {}) }, async upgradeOutbound () { - throw new Error('Not implemented') + return new Promise(() => {}) } } }) diff --git a/packages/transport-tcp/test/utils.ts b/packages/transport-tcp/test/utils.ts deleted file mode 100644 index 5f10b5f6ba..0000000000 --- a/packages/transport-tcp/test/utils.ts +++ /dev/null @@ -1,12 +0,0 @@ -/** - * A delayed promise that doesn't keep the node process running - */ -export async function delay (ms: number): Promise { - await new Promise(resolve => { - AbortSignal.timeout(ms).addEventListener('abort', () => { - resolve() - }, { - once: true - }) - }) -}