From bf95b8e74ed55308ccccd89ea6dd86d160d35033 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 15 Aug 2023 12:18:02 +0100 Subject: [PATCH] fix(@libp2p/webtransport): be more thorough about closing sessions Refactors session closing to happen in one function and call that function when the session has closed or failed to init. Doesn't quite solve the "Too many pending WebTransport Sessions" problem but does slow it down a little bit. Refs: #1896 --- packages/transport-webtransport/src/index.ts | 169 +++++++++++++----- .../transport-webtransport/test/browser.ts | 81 ++++----- 2 files changed, 160 insertions(+), 90 deletions(-) diff --git a/packages/transport-webtransport/src/index.ts b/packages/transport-webtransport/src/index.ts index 322e96ff3e..e7d37dbf43 100644 --- a/packages/transport-webtransport/src/index.ts +++ b/packages/transport-webtransport/src/index.ts @@ -16,6 +16,16 @@ declare global { var WebTransport: any } +// https://www.w3.org/TR/webtransport/#web-transport-close-info +interface WebTransportCloseInfo { + closeCode: number + reason: string +} + +interface WebTransportSessionCleanup { + (closeInfo?: WebTransportCloseInfo): void +} + const log = logger('libp2p:webtransport') export interface WebTransportInit { @@ -42,6 +52,8 @@ class WebTransportTransport implements Transport { readonly [symbol] = true async dial (ma: Multiaddr, options: DialOptions): Promise { + options?.signal?.throwIfAborted() + log('dialing %s', ma) const localPeer = this.components.peerId if (localPeer === undefined) { @@ -52,60 +64,122 @@ class WebTransportTransport implements Transport { const { url, certhashes, remotePeer } = parseMultiaddr(ma) - if (certhashes.length === 0) { - throw new Error('Expected multiaddr to contain certhashes') - } - - const wt = new WebTransport(`${url}/.well-known/libp2p-webtransport?type=noise`, { - serverCertificateHashes: certhashes.map(certhash => ({ - algorithm: 'sha-256', - value: certhash.digest - })) - }) - wt.closed.catch((error: Error) => { - log.error('WebTransport transport closed due to:', error) - }) - await wt.ready - if (remotePeer == null) { throw new Error('Need a target peerid') } - if (!await this.authenticateWebTransport(wt, localPeer, remotePeer, certhashes)) { - throw new Error('Failed to authenticate webtransport') + if (certhashes.length === 0) { + throw new Error('Expected multiaddr to contain certhashes') } - const maConn: MultiaddrConnection = { - close: async (options?: AbortOptions) => { - log('Closing webtransport') - await wt.close() - }, - abort: (err: Error) => { - log('Aborting webtransport with err:', err) - wt.close() - }, - remoteAddr: ma, - timeline: { - open: Date.now() - }, - // This connection is never used directly since webtransport supports native streams. - ...inertDuplex() - } + let abortListener: (() => void) | undefined + let maConn: MultiaddrConnection | undefined - wt.closed.catch((err: Error) => { - log.error('WebTransport connection closed:', err) - // This is how we specify the connection is closed and shouldn't be used. - maConn.timeline.close = Date.now() - }) + let cleanUpWTSession: (closeInfo?: WebTransportCloseInfo) => void = () => {} try { + let closed = false + const wt = new WebTransport(`${url}/.well-known/libp2p-webtransport?type=noise`, { + serverCertificateHashes: certhashes.map(certhash => ({ + algorithm: 'sha-256', + value: certhash.digest + })) + }) + + cleanUpWTSession = (closeInfo?: WebTransportCloseInfo) => { + try { + if (maConn != null) { + if (maConn.timeline.close != null) { + // already closed session + return + } + + // This is how we specify the connection is closed and shouldn't be used. + maConn.timeline.close = Date.now() + } + + if (closed) { + // already closed session + return + } + + wt.close(closeInfo) + } catch (err) { + log.error('error closing wt session', err) + } finally { + closed = true + } + } + + // this promise resolves/throws when the session is closed or failed to init + wt.closed + .then(async () => { + await maConn?.close() + }) + .catch((err: Error) => { + log.error('error on remote wt session close', err) + maConn?.abort(err) + }) + .finally(() => { + // if we never got as far as creating the maConn, just clean up the session + if (maConn == null) { + cleanUpWTSession() + } + }) + + // if the dial is aborted before we are ready, close the WebTransport session + abortListener = () => { + if (abortListener != null) { + options.signal?.removeEventListener('abort', abortListener) + } + + cleanUpWTSession() + } + options.signal?.addEventListener('abort', abortListener) + + await wt.ready + + if (!await this.authenticateWebTransport(wt, localPeer, remotePeer, certhashes)) { + throw new Error('Failed to authenticate webtransport') + } + + maConn = { + close: async (options?: AbortOptions) => { + log('Closing webtransport') + cleanUpWTSession() + }, + abort: (err: Error) => { + log('aborting webtransport due to passed err', err) + cleanUpWTSession({ + closeCode: 0, + reason: err.message + }) + }, + remoteAddr: ma, + timeline: { + open: Date.now() + }, + // This connection is never used directly since webtransport supports native streams. + ...inertDuplex() + } + options?.signal?.throwIfAborted() - } catch (e) { - wt.close() - throw e - } - return options.upgrader.upgradeOutbound(maConn, { skipEncryption: true, muxerFactory: this.webtransportMuxer(wt), skipProtection: true }) + return await options.upgrader.upgradeOutbound(maConn, { skipEncryption: true, muxerFactory: this.webtransportMuxer(wt, cleanUpWTSession), skipProtection: true }) + } catch (err: any) { + log.error('caught wt session err', err) + + cleanUpWTSession({ + closeCode: 0, + reason: err.message + }) + + throw err + } finally { + if (abortListener != null) { + options.signal?.removeEventListener('abort', abortListener) + } + } } async authenticateWebTransport (wt: InstanceType, localPeer: PeerId, remotePeer: PeerId, certhashes: Array>): Promise { @@ -156,7 +230,7 @@ class WebTransportTransport implements Transport { return true } - webtransportMuxer (wt: InstanceType): StreamMuxerFactory { + webtransportMuxer (wt: InstanceType, cleanUpWTSession: WebTransportSessionCleanup): StreamMuxerFactory { let streamIDCounter = 0 const config = this.config return { @@ -217,11 +291,14 @@ class WebTransportTransport implements Transport { */ close: async (options?: AbortOptions) => { log('Closing webtransport muxer') - await wt.close() + cleanUpWTSession() }, abort: (err: Error) => { log('Aborting webtransport muxer with err:', err) - wt.close() + cleanUpWTSession({ + closeCode: 0, + reason: err.message + }) }, // This stream muxer is webtransport native. Therefore it doesn't plug in with any other duplex. ...inertDuplex() diff --git a/packages/transport-webtransport/test/browser.ts b/packages/transport-webtransport/test/browser.ts index 89bfde1b7f..608b5e220e 100644 --- a/packages/transport-webtransport/test/browser.ts +++ b/packages/transport-webtransport/test/browser.ts @@ -4,7 +4,7 @@ import { noise } from '@chainsafe/libp2p-noise' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' -import { createLibp2p } from 'libp2p' +import { createLibp2p, type Libp2p } from 'libp2p' import { webTransport } from '../src/index.js' declare global { @@ -14,22 +14,34 @@ declare global { } describe('libp2p-webtransport', () => { - it('webtransport connects to go-libp2p', async () => { - if (process.env.serverAddr == null) { - throw new Error('serverAddr not found') - } + let node: Libp2p - const maStr: string = process.env.serverAddr - const ma = multiaddr(maStr) - const node = await createLibp2p({ + beforeEach(async () => { + node = await createLibp2p({ transports: [webTransport()], connectionEncryption: [noise()], connectionGater: { denyDialMultiaddr: async () => false } }) + }) - await node.start() + afterEach(async () => { + if (node != null) { + await node.stop() + + const conns = node.getConnections() + expect(conns.length).to.equal(0) + } + }) + + it('webtransport connects to go-libp2p', async () => { + if (process.env.serverAddr == null) { + throw new Error('serverAddr not found') + } + + const maStr: string = process.env.serverAddr + const ma = multiaddr(maStr) // Ping many times for (let index = 0; index < 100; index++) { @@ -70,10 +82,6 @@ describe('libp2p-webtransport', () => { expect(res).to.be.greaterThan(-1) } - - await node.stop() - const conns = node.getConnections() - expect(conns.length).to.equal(0) }) it('fails to connect without certhashes', async () => { @@ -86,19 +94,25 @@ describe('libp2p-webtransport', () => { const maStrP2p = maStr.split('/p2p/')[1] const ma = multiaddr(maStrNoCerthash + '/p2p/' + maStrP2p) - const node = await createLibp2p({ - transports: [webTransport()], - connectionEncryption: [noise()], - connectionGater: { - denyDialMultiaddr: async () => false - } - }) - await node.start() - const err = await expect(node.dial(ma)).to.eventually.be.rejected() expect(err.toString()).to.contain('Expected multiaddr to contain certhashes') + }) - await node.stop() + it('fails to connect due to an aborted signal', async () => { + if (process.env.serverAddr == null) { + throw new Error('serverAddr not found') + } + + const maStr: string = process.env.serverAddr + const ma = multiaddr(maStr) + + const controller = new AbortController() + controller.abort() + + const err = await expect(node.dial(ma, { + signal: controller.signal + })).to.eventually.be.rejected() + expect(err.toString()).to.contain('aborted') }) it('connects to ipv6 addresses', async () => { @@ -107,21 +121,10 @@ describe('libp2p-webtransport', () => { } const ma = multiaddr(process.env.serverAddr6) - const node = await createLibp2p({ - transports: [webTransport()], - connectionEncryption: [noise()], - connectionGater: { - denyDialMultiaddr: async () => false - } - }) - - await node.start() // the address is unreachable but we can parse it correctly const stream = await node.dialProtocol(ma, '/ipfs/ping/1.0.0') await stream.close() - - await node.stop() }) it('closes writes of streams after they have sunk a source', async () => { @@ -132,13 +135,6 @@ describe('libp2p-webtransport', () => { const maStr: string = process.env.serverAddr const ma = multiaddr(maStr) - const node = await createLibp2p({ - transports: [webTransport()], - connectionEncryption: [noise()], - connectionGater: { - denyDialMultiaddr: async () => false - } - }) async function * gen (): AsyncGenerator { yield new Uint8Array([0]) @@ -148,7 +144,6 @@ describe('libp2p-webtransport', () => { yield new Uint8Array([12, 13, 14, 15]) } - await node.start() const stream = await node.dialProtocol(ma, 'echo') await stream.sink(gen()) @@ -165,7 +160,5 @@ describe('libp2p-webtransport', () => { await stream.closeRead() expect(stream.timeline.close).to.be.greaterThan(0) - - await node.stop() }) })