Skip to content

Commit

Permalink
fix(@libp2p/webtransport): be more thorough about closing sessions
Browse files Browse the repository at this point in the history
Refactors session closing to happen in one function and call that
function when the session has closed or failed to init.

Doesn't quite solve the "Too many pending WebTransport Sessions"
problem but does slow it down a little bit.

Refs: #1896
  • Loading branch information
achingbrain committed Aug 15, 2023
1 parent 8716555 commit bf95b8e
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 90 deletions.
169 changes: 123 additions & 46 deletions packages/transport-webtransport/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@ declare global {
var WebTransport: any
}

// https://www.w3.org/TR/webtransport/#web-transport-close-info
interface WebTransportCloseInfo {
closeCode: number
reason: string
}

interface WebTransportSessionCleanup {
(closeInfo?: WebTransportCloseInfo): void
}

const log = logger('libp2p:webtransport')

export interface WebTransportInit {
Expand All @@ -42,6 +52,8 @@ class WebTransportTransport implements Transport {
readonly [symbol] = true

async dial (ma: Multiaddr, options: DialOptions): Promise<Connection> {
options?.signal?.throwIfAborted()

log('dialing %s', ma)
const localPeer = this.components.peerId
if (localPeer === undefined) {
Expand All @@ -52,60 +64,122 @@ class WebTransportTransport implements Transport {

const { url, certhashes, remotePeer } = parseMultiaddr(ma)

if (certhashes.length === 0) {
throw new Error('Expected multiaddr to contain certhashes')
}

const wt = new WebTransport(`${url}/.well-known/libp2p-webtransport?type=noise`, {
serverCertificateHashes: certhashes.map(certhash => ({
algorithm: 'sha-256',
value: certhash.digest
}))
})
wt.closed.catch((error: Error) => {
log.error('WebTransport transport closed due to:', error)
})
await wt.ready

if (remotePeer == null) {
throw new Error('Need a target peerid')
}

if (!await this.authenticateWebTransport(wt, localPeer, remotePeer, certhashes)) {
throw new Error('Failed to authenticate webtransport')
if (certhashes.length === 0) {
throw new Error('Expected multiaddr to contain certhashes')
}

const maConn: MultiaddrConnection = {
close: async (options?: AbortOptions) => {
log('Closing webtransport')
await wt.close()
},
abort: (err: Error) => {
log('Aborting webtransport with err:', err)
wt.close()
},
remoteAddr: ma,
timeline: {
open: Date.now()
},
// This connection is never used directly since webtransport supports native streams.
...inertDuplex()
}
let abortListener: (() => void) | undefined
let maConn: MultiaddrConnection | undefined

wt.closed.catch((err: Error) => {
log.error('WebTransport connection closed:', err)
// This is how we specify the connection is closed and shouldn't be used.
maConn.timeline.close = Date.now()
})
let cleanUpWTSession: (closeInfo?: WebTransportCloseInfo) => void = () => {}

try {
let closed = false
const wt = new WebTransport(`${url}/.well-known/libp2p-webtransport?type=noise`, {
serverCertificateHashes: certhashes.map(certhash => ({
algorithm: 'sha-256',
value: certhash.digest
}))
})

cleanUpWTSession = (closeInfo?: WebTransportCloseInfo) => {
try {
if (maConn != null) {
if (maConn.timeline.close != null) {
// already closed session
return
}

// This is how we specify the connection is closed and shouldn't be used.
maConn.timeline.close = Date.now()
}

if (closed) {
// already closed session
return
}

wt.close(closeInfo)
} catch (err) {
log.error('error closing wt session', err)
} finally {
closed = true
}
}

// this promise resolves/throws when the session is closed or failed to init
wt.closed
.then(async () => {
await maConn?.close()
})
.catch((err: Error) => {
log.error('error on remote wt session close', err)
maConn?.abort(err)
})
.finally(() => {
// if we never got as far as creating the maConn, just clean up the session
if (maConn == null) {
cleanUpWTSession()
}
})

// if the dial is aborted before we are ready, close the WebTransport session
abortListener = () => {
if (abortListener != null) {
options.signal?.removeEventListener('abort', abortListener)
}

cleanUpWTSession()
}
options.signal?.addEventListener('abort', abortListener)

await wt.ready

if (!await this.authenticateWebTransport(wt, localPeer, remotePeer, certhashes)) {
throw new Error('Failed to authenticate webtransport')
}

maConn = {
close: async (options?: AbortOptions) => {
log('Closing webtransport')
cleanUpWTSession()
},
abort: (err: Error) => {
log('aborting webtransport due to passed err', err)
cleanUpWTSession({
closeCode: 0,
reason: err.message
})
},
remoteAddr: ma,
timeline: {
open: Date.now()
},
// This connection is never used directly since webtransport supports native streams.
...inertDuplex()
}

options?.signal?.throwIfAborted()
} catch (e) {
wt.close()
throw e
}

return options.upgrader.upgradeOutbound(maConn, { skipEncryption: true, muxerFactory: this.webtransportMuxer(wt), skipProtection: true })
return await options.upgrader.upgradeOutbound(maConn, { skipEncryption: true, muxerFactory: this.webtransportMuxer(wt, cleanUpWTSession), skipProtection: true })
} catch (err: any) {
log.error('caught wt session err', err)

cleanUpWTSession({
closeCode: 0,
reason: err.message
})

throw err
} finally {
if (abortListener != null) {
options.signal?.removeEventListener('abort', abortListener)
}
}
}

async authenticateWebTransport (wt: InstanceType<typeof WebTransport>, localPeer: PeerId, remotePeer: PeerId, certhashes: Array<MultihashDigest<number>>): Promise<boolean> {
Expand Down Expand Up @@ -156,7 +230,7 @@ class WebTransportTransport implements Transport {
return true
}

webtransportMuxer (wt: InstanceType<typeof WebTransport>): StreamMuxerFactory {
webtransportMuxer (wt: InstanceType<typeof WebTransport>, cleanUpWTSession: WebTransportSessionCleanup): StreamMuxerFactory {
let streamIDCounter = 0
const config = this.config
return {
Expand Down Expand Up @@ -217,11 +291,14 @@ class WebTransportTransport implements Transport {
*/
close: async (options?: AbortOptions) => {
log('Closing webtransport muxer')
await wt.close()
cleanUpWTSession()
},
abort: (err: Error) => {
log('Aborting webtransport muxer with err:', err)
wt.close()
cleanUpWTSession({
closeCode: 0,
reason: err.message
})
},
// This stream muxer is webtransport native. Therefore it doesn't plug in with any other duplex.
...inertDuplex()
Expand Down
81 changes: 37 additions & 44 deletions packages/transport-webtransport/test/browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import { noise } from '@chainsafe/libp2p-noise'
import { multiaddr } from '@multiformats/multiaddr'
import { expect } from 'aegir/chai'
import { createLibp2p } from 'libp2p'
import { createLibp2p, type Libp2p } from 'libp2p'
import { webTransport } from '../src/index.js'

declare global {
Expand All @@ -14,22 +14,34 @@ declare global {
}

describe('libp2p-webtransport', () => {
it('webtransport connects to go-libp2p', async () => {
if (process.env.serverAddr == null) {
throw new Error('serverAddr not found')
}
let node: Libp2p

const maStr: string = process.env.serverAddr
const ma = multiaddr(maStr)
const node = await createLibp2p({
beforeEach(async () => {
node = await createLibp2p({
transports: [webTransport()],
connectionEncryption: [noise()],
connectionGater: {
denyDialMultiaddr: async () => false
}
})
})

await node.start()
afterEach(async () => {
if (node != null) {
await node.stop()

const conns = node.getConnections()
expect(conns.length).to.equal(0)
}
})

it('webtransport connects to go-libp2p', async () => {
if (process.env.serverAddr == null) {
throw new Error('serverAddr not found')
}

const maStr: string = process.env.serverAddr
const ma = multiaddr(maStr)

// Ping many times
for (let index = 0; index < 100; index++) {
Expand Down Expand Up @@ -70,10 +82,6 @@ describe('libp2p-webtransport', () => {

expect(res).to.be.greaterThan(-1)
}

await node.stop()
const conns = node.getConnections()
expect(conns.length).to.equal(0)
})

it('fails to connect without certhashes', async () => {
Expand All @@ -86,19 +94,25 @@ describe('libp2p-webtransport', () => {
const maStrP2p = maStr.split('/p2p/')[1]
const ma = multiaddr(maStrNoCerthash + '/p2p/' + maStrP2p)

const node = await createLibp2p({
transports: [webTransport()],
connectionEncryption: [noise()],
connectionGater: {
denyDialMultiaddr: async () => false
}
})
await node.start()

const err = await expect(node.dial(ma)).to.eventually.be.rejected()
expect(err.toString()).to.contain('Expected multiaddr to contain certhashes')
})

await node.stop()
it('fails to connect due to an aborted signal', async () => {
if (process.env.serverAddr == null) {
throw new Error('serverAddr not found')
}

const maStr: string = process.env.serverAddr
const ma = multiaddr(maStr)

const controller = new AbortController()
controller.abort()

const err = await expect(node.dial(ma, {
signal: controller.signal
})).to.eventually.be.rejected()
expect(err.toString()).to.contain('aborted')
})

it('connects to ipv6 addresses', async () => {
Expand All @@ -107,21 +121,10 @@ describe('libp2p-webtransport', () => {
}

const ma = multiaddr(process.env.serverAddr6)
const node = await createLibp2p({
transports: [webTransport()],
connectionEncryption: [noise()],
connectionGater: {
denyDialMultiaddr: async () => false
}
})

await node.start()

// the address is unreachable but we can parse it correctly
const stream = await node.dialProtocol(ma, '/ipfs/ping/1.0.0')
await stream.close()

await node.stop()
})

it('closes writes of streams after they have sunk a source', async () => {
Expand All @@ -132,13 +135,6 @@ describe('libp2p-webtransport', () => {

const maStr: string = process.env.serverAddr
const ma = multiaddr(maStr)
const node = await createLibp2p({
transports: [webTransport()],
connectionEncryption: [noise()],
connectionGater: {
denyDialMultiaddr: async () => false
}
})

async function * gen (): AsyncGenerator<Uint8Array, void, unknown> {
yield new Uint8Array([0])
Expand All @@ -148,7 +144,6 @@ describe('libp2p-webtransport', () => {
yield new Uint8Array([12, 13, 14, 15])
}

await node.start()
const stream = await node.dialProtocol(ma, 'echo')

await stream.sink(gen())
Expand All @@ -165,7 +160,5 @@ describe('libp2p-webtransport', () => {
await stream.closeRead()

expect(stream.timeline.close).to.be.greaterThan(0)

await node.stop()
})
})

0 comments on commit bf95b8e

Please sign in to comment.