fix: ensure all listeners are properly closed on tcp shutdown #2058

Merged · 14 commits · Oct 1, 2023
Changes from 12 commits
16 changes: 13 additions & 3 deletions packages/libp2p/src/transport-manager.ts
@@ -262,12 +262,22 @@ export class DefaultTransportManager implements TransportManager, Startable {
    * If a transport has any running listeners, they will be closed.
    */
   async remove (key: string): Promise<void> {
-    log('removing %s', key)
+    const listeners = this.listeners.get(key) ?? []
+    log('removing transport %s', key)
 
     // Close any running listeners
-    for (const listener of this.listeners.get(key) ?? []) {
-      await listener.close()
+    const tasks = []
+    log('closing listeners for %s', key)
+    while (listeners.length > 0) {
+      const listener = listeners.pop()
+
+      if (listener == null) {
+        continue
+      }
+
+      tasks.push(listener.close())
     }
+    await Promise.all(tasks)
 
     this.transports.delete(key)
     this.listeners.delete(key)
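This hunk is the heart of the fix on the transport-manager side: listener closes now run concurrently rather than sequentially, so one slow or hanging listener cannot block removal of the rest, and the array is drained so no listener can be closed twice. A minimal standalone sketch of the same pattern (the `Listener` interface here is a simplified stand-in, not the full `@libp2p/interface` type):

```ts
interface Listener {
  close(): Promise<void>
}

// Drain the array so no listener can be picked up twice, then close all
// listeners concurrently; Promise.all surfaces the first failure, if any.
async function closeAll (listeners: Listener[]): Promise<void> {
  const tasks: Array<Promise<void>> = []

  while (listeners.length > 0) {
    const listener = listeners.pop()

    if (listener == null) {
      continue
    }

    tasks.push(listener.close())
  }

  await Promise.all(tasks)
}
```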
114 changes: 76 additions & 38 deletions packages/transport-tcp/src/listener.ts
@@ -1,4 +1,5 @@
 import net from 'net'
+import { CodeError } from '@libp2p/interface/errors'
 import { EventEmitter, CustomEvent } from '@libp2p/interface/events'
 import { logger } from '@libp2p/logger'
 import { CODE_P2P } from './constants.js'
@@ -46,17 +47,25 @@ interface Context extends TCPCreateListenerOptions {
   closeServerOnMaxConnections?: CloseServerOnMaxConnectionsOpts
 }
 
-const SERVER_STATUS_UP = 1
-const SERVER_STATUS_DOWN = 0
-
 export interface TCPListenerMetrics {
   status: MetricGroup
   errors: CounterGroup
   events: CounterGroup
 }
 
-type Status = { started: false } | {
-  started: true
+enum TCPListenerStatusCode {
+  /**
+   * The server object is initialized but the listening address is not known yet,
+   * or the server was stopped manually; it can only be resumed by calling listen()
+   **/
+  INACTIVE = 0,
+  ACTIVE = 1,
+  /* Paused while the connection limit is exceeded */
+  PAUSED = 2,
+}
+
+type Status = { code: TCPListenerStatusCode.INACTIVE } | {
+  code: Exclude<TCPListenerStatusCode, TCPListenerStatusCode.INACTIVE>
   listeningAddr: Multiaddr
   peerId: string | null
   netConfig: NetConfig
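Replacing the boolean `started` flag with a `code` discriminant keeps the compiler honest: `listeningAddr`, `peerId` and `netConfig` are only accessible once TypeScript has ruled out `INACTIVE`. A simplified sketch of how the narrowing works (field types reduced for brevity):

```ts
enum TCPListenerStatusCode { INACTIVE = 0, ACTIVE = 1, PAUSED = 2 }

type Status =
  | { code: TCPListenerStatusCode.INACTIVE }
  | { code: Exclude<TCPListenerStatusCode, TCPListenerStatusCode.INACTIVE>, listeningAddr: string }

function describe (status: Status): string {
  if (status.code === TCPListenerStatusCode.INACTIVE) {
    // accessing status.listeningAddr here would be a compile-time error
    return 'not listening'
  }

  // narrowed to ACTIVE | PAUSED, so listeningAddr is guaranteed to exist
  return `listening on ${status.listeningAddr}`
}
```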
@@ -66,7 +75,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listener
   private readonly server: net.Server
   /** Keep track of open connections to destroy in case of timeout */
   private readonly connections = new Set<MultiaddrConnection>()
-  private status: Status = { started: false }
+  private status: Status = { code: TCPListenerStatusCode.INACTIVE }
   private metrics?: TCPListenerMetrics
   private addr: string
@@ -88,7 +97,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listener
     if (context.closeServerOnMaxConnections != null) {
       // Sanity check options
       if (context.closeServerOnMaxConnections.closeAbove < context.closeServerOnMaxConnections.listenBelow) {
-        throw Error('closeAbove must be >= listenBelow')
+        throw new CodeError('closeAbove must be >= listenBelow', 'ERROR_CONNECTION_LIMITS')
       }
     }

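Throwing `CodeError` instead of a bare `Error` gives callers a stable `code` to branch on rather than a message string. A hedged sketch of consumer-side handling (the handling logic is illustrative, not from the PR):

```ts
import { CodeError } from '@libp2p/interface/errors'

function validateLimits (closeAbove: number, listenBelow: number): void {
  if (closeAbove < listenBelow) {
    throw new CodeError('closeAbove must be >= listenBelow', 'ERROR_CONNECTION_LIMITS')
  }
}

try {
  validateLimits(100, 200)
} catch (err) {
  if (err instanceof CodeError && err.code === 'ERROR_CONNECTION_LIMITS') {
    // configuration bug: fix the options instead of retrying
  } else {
    throw err
  }
}
```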
@@ -133,7 +142,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listener
     }
 
     this.metrics?.status.update({
-      [this.addr]: SERVER_STATUS_UP
+      [this.addr]: TCPListenerStatusCode.ACTIVE
     })
   }

@@ -145,13 +154,22 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listener
       })
       .on('close', () => {
         this.metrics?.status.update({
-          [this.addr]: SERVER_STATUS_DOWN
+          [this.addr]: this.status.code
         })
-        this.dispatchEvent(new CustomEvent('close'))
+
+        // If this event is emitted, the transport manager will remove the listener from its cache.
+        // In the meantime, if connections are dropped the listener will start listening again
+        // and the transport manager will not be able to close the server
+        if (this.status.code !== TCPListenerStatusCode.PAUSED) {
+          this.dispatchEvent(new CustomEvent('close'))
+        }
       })
   }
 
   private onSocket (socket: net.Socket): void {
+    if (this.status.code !== TCPListenerStatusCode.ACTIVE) {
+      throw new CodeError('Server is not listening yet', 'ERR_SERVER_NOT_RUNNING')
+    }
     // Avoid uncaught errors caused by unstable connections
     socket.on('error', err => {
       log('socket error', err)
@@ -161,7 +179,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listener
     let maConn: MultiaddrConnection
     try {
       maConn = toMultiaddrConnection(socket, {
-        listeningAddr: this.status.started ? this.status.listeningAddr : undefined,
+        listeningAddr: this.status.listeningAddr,
         socketInactivityTimeout: this.context.socketInactivityTimeout,
        socketCloseTimeout: this.context.socketCloseTimeout,
         metrics: this.metrics?.events,
@@ -189,9 +207,9 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listener
       ) {
         // The most likely cause of an error here is that the port taken by this application
         // was bound by another process while the server was closed. In that case there's not much
-        // we can do. netListen() will be called again every time a connection is dropped, which
+        // we can do. resume() will be called again every time a connection is dropped, which
         // acts as an eventual retry mechanism. onListenError allows the consumer to act on this.
-        this.netListen().catch(e => {
+        this.resume().catch(e => {
           log.error('error attempting to listen server once connection count under limit', e)
           this.context.closeServerOnMaxConnections?.onListenError?.(e as Error)
         })
@@ -206,7 +224,9 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listener
         this.context.closeServerOnMaxConnections != null &&
         this.connections.size >= this.context.closeServerOnMaxConnections.closeAbove
       ) {
-        this.netClose()
+        this.pause(false).catch(e => {
+          log.error('error attempting to close server once connection count over limit', e)
+        })
       }
 
       this.dispatchEvent(new CustomEvent<Connection>('connection', { detail: conn }))
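Together with the earlier `resume()` hunk this gives the listener hysteresis: it pauses (stops accepting) once open connections reach `closeAbove`, and every dropped connection gives it a chance to resume once the count is back under `listenBelow`, with `onListenError` as the hook for a failed re-bind. A sketch of the options shape, as implied by the `Context` interface in this file (the values are illustrative):

```ts
const closeServerOnMaxConnections = {
  // pause the server once this many connections are open
  closeAbove: 200,
  // resume listening once the connection count drops below this
  listenBelow: 150,
  // invoked when resume() fails, e.g. the port was re-bound by another process
  onListenError: (err: Error): void => {
    console.error('failed to resume TCP listener', err)
  }
}
```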
@@ -232,7 +252,7 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listener
   }
 
   getAddrs (): Multiaddr[] {
-    if (!this.status.started) {
+    if (this.status.code === TCPListenerStatusCode.INACTIVE) {
       return []
     }

@@ -264,35 +284,44 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listener
   }
 
   async listen (ma: Multiaddr): Promise<void> {
-    if (this.status.started) {
-      throw Error('server is already listening')
+    if (this.status.code === TCPListenerStatusCode.ACTIVE || this.status.code === TCPListenerStatusCode.PAUSED) {
+      throw new CodeError('server is already listening', 'ERR_SERVER_ALREADY_LISTENING')
     }
 
     const peerId = ma.getPeerId()
     const listeningAddr = peerId == null ? ma.decapsulateCode(CODE_P2P) : ma
     const { backlog } = this.context
 
-    this.status = {
-      started: true,
-      listeningAddr,
-      peerId,
-      netConfig: multiaddrToNetConfig(listeningAddr, { backlog })
-    }
+    try {
+      this.status = {
+        code: TCPListenerStatusCode.ACTIVE,
+        listeningAddr,
+        peerId,
+        netConfig: multiaddrToNetConfig(listeningAddr, { backlog })
+      }
 
-    await this.netListen()
+      await this.resume()
+    } catch (err) {
+      this.status = { code: TCPListenerStatusCode.INACTIVE }
+      throw err
+    }
   }
 
   async close (): Promise<void> {
-    await Promise.all(
-      Array.from(this.connections.values()).map(async maConn => { await attemptClose(maConn) })
-    )
-
-    // netClose already checks if server.listening
-    this.netClose()
+    // Close the connections and the server at the same time to avoid any race conditions
+    await Promise.all([
+      Promise.all(Array.from(this.connections.values()).map(async maConn => attemptClose(maConn))),
+      this.pause(true).catch(e => {
+        log.error('error attempting to close server once connection count over limit', e)
+      })
+    ])
  }
 
-  private async netListen (): Promise<void> {
-    if (!this.status.started || this.server.listening) {
+  /**
+   * Resumes a stopped server, or starts an inert one
+   */
+  private async resume (): Promise<void> {
+    if (this.server.listening || this.status.code === TCPListenerStatusCode.INACTIVE) {
       return
     }

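The `try`/`catch` added to `listen()` is easy to miss but important: the status is set optimistically before `resume()` binds the socket, and rolled back to `INACTIVE` if binding fails, so a failed `listen()` leaves the listener in a state from which it can be retried. The same pattern in isolation (a generic sketch, not the PR's code):

```ts
// Apply an optimistic state transition, undoing it if the async action fails
// so the caller can safely retry later.
async function transition<S> (
  set: (state: S) => void,
  optimistic: S,
  fallback: S,
  action: () => Promise<void>
): Promise<void> {
  set(optimistic)

  try {
    await action()
  } catch (err) {
    set(fallback)
    throw err
  }
}
```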
@@ -304,11 +333,17 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listener
       this.server.listen(netConfig, resolve)
     })
 
+    this.status = { ...this.status, code: TCPListenerStatusCode.ACTIVE }
     log('Listening on %s', this.server.address())
   }
 
-  private netClose (): void {
-    if (!this.status.started || !this.server.listening) {
+  private async pause (permanent: boolean): Promise<void> {
+    if (!this.server.listening && this.status.code === TCPListenerStatusCode.PAUSED && permanent) {
+      this.status = { code: TCPListenerStatusCode.INACTIVE }
+      return
+    }
+
+    if (!this.server.listening || this.status.code !== TCPListenerStatusCode.ACTIVE) {
       return
     }

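Taken together, `listen()`, `resume()` and `pause()` now form a small state machine. My reading of the transitions this diff implements:

- INACTIVE → ACTIVE: `listen()` binds the socket and starts accepting connections
- ACTIVE → PAUSED: `pause(false)` once open connections reach `closeAbove`
- PAUSED → ACTIVE: `resume()` once a dropped connection brings the count back under the limit
- ACTIVE or PAUSED → INACTIVE: `pause(true)`, the permanent shutdown used by `close()`
- calling `listen()` while ACTIVE or PAUSED throws `ERR_SERVER_ALREADY_LISTENING`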
@@ -326,9 +361,12 @@ export class TCPListener extends EventEmitter<ListenerEvents> implements Listener
     // Stops the server from accepting new connections and keeps existing connections.
     // The 'close' event is emitted only when all connections have ended.
     // The optional callback will be called once the 'close' event occurs.
-    //
-    // NOTE: Since we want to keep existing connections and have checked `!this.server.listening` it's not necessary
-    // to pass a callback to close.
-    this.server.close()
+
+    // We need to set this status before closing the server, so other procedures are aware
+    // of it while the server is closing
+    this.status = permanent ? { code: TCPListenerStatusCode.INACTIVE } : { ...this.status, code: TCPListenerStatusCode.PAUSED }
+    await new Promise<void>((resolve, reject) => {
+      this.server.close(err => { (err != null) ? reject(err) : resolve() })
+    })
   }
 }
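Finally, replacing the fire-and-forget `this.server.close()` with an awaited, promisified call is what makes the `Promise.all` in `TransportManager.remove()` above actually wait for shutdown. The promisification itself is plain Node.js and worth seeing on its own:

```ts
import net from 'net'

// net.Server.close() stops accepting new connections; its callback runs once
// the 'close' event fires (i.e. after all existing connections have ended).
async function closeServer (server: net.Server): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    server.close(err => { (err != null) ? reject(err) : resolve() })
  })
}
```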