Skip to content

Commit

Permalink
fix: add dial progress events to transports (#2607)
Browse files Browse the repository at this point in the history
Adds dial progress events to transports.
  • Loading branch information
achingbrain authored Jul 3, 2024
1 parent af85a7c commit abb9f90
Show file tree
Hide file tree
Showing 20 changed files with 197 additions and 100 deletions.
12 changes: 7 additions & 5 deletions packages/interface-internal/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import type { TransportManagerDialProgressEvents } from '../transport-manager/index.js'
import type { AbortOptions, PendingDial, Connection, MultiaddrConnection, PeerId, IsDialableOptions, Address } from '@libp2p/interface'
import type { AbortOptions, PendingDial, Connection, MultiaddrConnection, PeerId, IsDialableOptions, Address, OutboundConnectionUpgradeEvents } from '@libp2p/interface'
import type { PeerMap } from '@libp2p/peer-collections'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { ProgressOptions, ProgressEvent } from 'progress-events'

export type OpenConnectionProgressEvents =
TransportManagerDialProgressEvents |
ProgressEvent<'dial:already-connected'> |
ProgressEvent<'dial:already-in-dial-queue'> |
ProgressEvent<'dial:add-to-dial-queue'> |
ProgressEvent<'dial:calculate-addresses', Address[]>
ProgressEvent<'dial-queue:already-connected'> |
ProgressEvent<'dial-queue:already-in-dial-queue'> |
ProgressEvent<'dial-queue:add-to-dial-queue'> |
ProgressEvent<'dial-queue:start-dial'> |
ProgressEvent<'dial-queue:calculated-addresses', Address[]> |
OutboundConnectionUpgradeEvents

export interface OpenConnectionOptions extends AbortOptions, ProgressOptions<OpenConnectionProgressEvents> {
/**
Expand Down
2 changes: 1 addition & 1 deletion packages/interface-internal/src/transport-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { Multiaddr } from '@multiformats/multiaddr'
import type { ProgressOptions, ProgressEvent } from 'progress-events'

export type TransportManagerDialProgressEvents =
ProgressEvent<'dial:selected-transport', string>
ProgressEvent<'transport-manager:selected-transport', string>

export interface TransportManagerDialOptions extends AbortOptions, ProgressOptions<TransportManagerDialProgressEvents> {

Expand Down
23 changes: 16 additions & 7 deletions packages/interface/src/transport/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { TypedEventTarget } from '../event-target.js'
import type { AbortOptions } from '../index.js'
import type { StreamMuxerFactory } from '../stream-muxer/index.js'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { ProgressOptions, ProgressEvent } from 'progress-events'

export interface ListenerEvents {
'connection': CustomEvent<Connection>
Expand Down Expand Up @@ -39,14 +40,14 @@ export interface CreateListenerOptions {
upgrader: Upgrader
}

export interface DialOptions extends AbortOptions {
export interface DialOptions<DialEvents extends ProgressEvent = ProgressEvent> extends AbortOptions, ProgressOptions<DialEvents> {
upgrader: Upgrader
}

/**
* A libp2p transport is understood as something that offers a dial and listen interface to establish connections.
* A libp2p transport offers dial and listen methods to establish connections.
*/
export interface Transport {
export interface Transport<DialEvents extends ProgressEvent = ProgressEvent> {
/**
* Used to identify the transport
*/
Expand All @@ -60,7 +61,7 @@ export interface Transport {
/**
* Dial a given multiaddr.
*/
dial(ma: Multiaddr, options: DialOptions): Promise<Connection>
dial(ma: Multiaddr, options: DialOptions<DialEvents>): Promise<Connection>

/**
* Create transport listeners.
Expand Down Expand Up @@ -99,7 +100,7 @@ export enum FaultTolerance {
NO_FATAL
}

export interface UpgraderOptions {
export interface UpgraderOptions<ConnectionUpgradeEvents extends ProgressEvent = ProgressEvent> extends ProgressOptions<ConnectionUpgradeEvents> {
skipEncryption?: boolean
skipProtection?: boolean
muxerFactory?: StreamMuxerFactory
Expand All @@ -111,14 +112,22 @@ export interface UpgraderOptions {
transient?: boolean
}

export type InboundConnectionUpgradeEvents =
ProgressEvent<'upgrader:encrypt-inbound-connection'> |
ProgressEvent<'upgrader:multiplex-inbound-connection'>

export type OutboundConnectionUpgradeEvents =
ProgressEvent<'upgrader:encrypt-outbound-connection'> |
ProgressEvent<'upgrader:multiplex-outbound-connection'>

export interface Upgrader {
/**
* Upgrades an outbound connection on `transport.dial`.
*/
upgradeOutbound(maConn: MultiaddrConnection, opts?: UpgraderOptions): Promise<Connection>
upgradeOutbound(maConn: MultiaddrConnection, opts?: UpgraderOptions<OutboundConnectionUpgradeEvents>): Promise<Connection>

/**
* Upgrades an inbound connection on transport listener.
*/
upgradeInbound(maConn: MultiaddrConnection, opts?: UpgraderOptions): Promise<Connection>
upgradeInbound(maConn: MultiaddrConnection, opts?: UpgraderOptions<InboundConnectionUpgradeEvents>): Promise<Connection>
}
12 changes: 7 additions & 5 deletions packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ export class DialQueue {

if (existingConnection != null) {
this.log('already connected to %a', existingConnection.remoteAddr)
options.onProgress?.(new CustomProgressEvent('dial:already-connected'))
options.onProgress?.(new CustomProgressEvent('dial-queue:already-connected'))
return existingConnection
}

Expand Down Expand Up @@ -187,7 +187,7 @@ export class DialQueue {
existingDial.options.multiaddrs.add(multiaddr.toString())
}

options.onProgress?.(new CustomProgressEvent('dial:already-in-dial-queue'))
options.onProgress?.(new CustomProgressEvent('dial-queue:already-in-dial-queue'))
return existingDial.join(options)
}

Expand All @@ -197,8 +197,9 @@ export class DialQueue {

this.log('creating dial target for %p', peerId, multiaddrs.map(ma => ma.toString()))

options.onProgress?.(new CustomProgressEvent('dial:add-to-dial-queue'))
options.onProgress?.(new CustomProgressEvent('dial-queue:add-to-dial-queue'))
return this.queue.add(async (options) => {
options?.onProgress?.(new CustomProgressEvent('dial-queue:start-dial'))
// create abort conditions - need to do this before `calculateMultiaddrs` as
// we may be about to resolve a dns addr which can time out
const signal = this.createDialAbortController(options?.signal)
Expand All @@ -212,7 +213,7 @@ export class DialQueue {
signal
})

options?.onProgress?.(new CustomProgressEvent<Address[]>('dial:calculate-addresses', addrsToDial))
options?.onProgress?.(new CustomProgressEvent<Address[]>('dial-queue:calculated-addresses', addrsToDial))

addrsToDial.map(({ multiaddr }) => multiaddr.toString()).forEach(addr => {
options?.multiaddrs.add(addr)
Expand Down Expand Up @@ -282,7 +283,8 @@ export class DialQueue {
peerId,
priority: options.priority ?? DEFAULT_DIAL_PRIORITY,
multiaddrs: new Set(multiaddrs.map(ma => ma.toString())),
signal: options.signal
signal: options.signal,
onProgress: options.onProgress
})
}

Expand Down
2 changes: 1 addition & 1 deletion packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
if (existingConnection != null) {
this.log('had an existing non-transient connection to %p', peerId)

options.onProgress?.(new CustomProgressEvent('dial:already-connected'))
options.onProgress?.(new CustomProgressEvent('dial-queue:already-connected'))
return existingConnection
}
}
Expand Down
5 changes: 4 additions & 1 deletion packages/libp2p/src/transport-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,12 @@ export class DefaultTransportManager implements TransportManager, Startable {
throw new CodeError(`No transport available for address ${String(ma)}`, codes.ERR_TRANSPORT_UNAVAILABLE)
}

options?.onProgress?.(new CustomProgressEvent<string>('dial:selected-transport', transport[Symbol.toStringTag]))
options?.onProgress?.(new CustomProgressEvent<string>('transport-manager:selected-transport', transport[Symbol.toStringTag]))

try {
// @ts-expect-error the transport has a typed onProgress option but we
// can't predict what transport implementation we selected so all we can
// do is pass the onProgress handler in and hope for the best
return await transport.dial(ma, {
...options,
upgrader: this.components.upgrader
Expand Down
5 changes: 5 additions & 0 deletions packages/libp2p/src/upgrader.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { CodeError, ERR_TIMEOUT, setMaxListeners } from '@libp2p/interface'
import * as mss from '@libp2p/multistream-select'
import { peerIdFromString } from '@libp2p/peer-id'
import { CustomProgressEvent } from 'progress-events'
import { createConnection } from './connection/index.js'
import { INBOUND_UPGRADE_TIMEOUT } from './connection-manager/constants.js'
import { codes } from './errors.js'
Expand Down Expand Up @@ -185,6 +186,8 @@ export class DefaultUpgrader implements Upgrader {
// Encrypt the connection
encryptedConn = protectedConn
if (opts?.skipEncryption !== true) {
opts?.onProgress?.(new CustomProgressEvent('upgrader:encrypt-inbound-connection'));

({
conn: encryptedConn,
remotePeer,
Expand Down Expand Up @@ -214,6 +217,8 @@ export class DefaultUpgrader implements Upgrader {
if (opts?.muxerFactory != null) {
muxerFactory = opts.muxerFactory
} else if (this.muxers.size > 0) {
opts?.onProgress?.(new CustomProgressEvent('upgrader:multiplex-inbound-connection'))

// Multiplex the connection
const multiplexed = await this._multiplexInbound({
...protectedConn,
Expand Down
1 change: 1 addition & 0 deletions packages/transport-circuit-relay-v2/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
"it-stream-types": "^2.0.1",
"multiformats": "^13.1.0",
"p-defer": "^4.0.1",
"progress-events": "^1.0.0",
"protons-runtime": "^5.4.0",
"race-signal": "^1.0.2",
"uint8arraylist": "^2.4.8",
Expand Down
38 changes: 30 additions & 8 deletions packages/transport-circuit-relay-v2/src/transport/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@ import { streamToMaConnection } from '@libp2p/utils/stream-to-ma-conn'
import * as mafmt from '@multiformats/mafmt'
import { multiaddr } from '@multiformats/multiaddr'
import { pbStream } from 'it-protobuf-stream'
import { CustomProgressEvent } from 'progress-events'
import { CIRCUIT_PROTO_CODE, DEFAULT_DISCOVERY_FILTER_ERROR_RATE, DEFAULT_DISCOVERY_FILTER_SIZE, ERR_HOP_REQUEST_FAILED, ERR_RELAYED_DIAL, MAX_CONNECTIONS, RELAY_V2_HOP_CODEC, RELAY_V2_STOP_CODEC } from '../constants.js'
import { StopMessage, HopMessage, Status } from '../pb/index.js'
import { RelayDiscovery } from './discovery.js'
import { createListener } from './listener.js'
import { ReservationStore } from './reservation-store.js'
import type { CircuitRelayTransportComponents, CircuitRelayTransportInit } from './index.js'
import type { Transport, CreateListenerOptions, Listener, Upgrader, AbortOptions, ComponentLogger, Logger, Connection, Stream, ConnectionGater, PeerId, PeerStore } from '@libp2p/interface'
import type { AddressManager, ConnectionManager, IncomingStreamData, Registrar, TransportManager } from '@libp2p/interface-internal'
import type { Transport, CreateListenerOptions, Listener, Upgrader, ComponentLogger, Logger, Connection, Stream, ConnectionGater, PeerId, PeerStore, OutboundConnectionUpgradeEvents, DialOptions } from '@libp2p/interface'
import type { AddressManager, ConnectionManager, IncomingStreamData, OpenConnectionProgressEvents, Registrar, TransportManager } from '@libp2p/interface-internal'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

const isValidStop = (request: StopMessage): request is Required<StopMessage> => {
if (request.peer == null) {
Expand All @@ -29,7 +31,7 @@ const isValidStop = (request: StopMessage): request is Required<StopMessage> =>
return true
}

interface ConnectOptions {
interface ConnectOptions extends ProgressOptions<CircuitRelayDialEvents> {
stream: Stream
connection: Connection
destinationPeer: PeerId
Expand All @@ -45,7 +47,16 @@ const defaults = {
stopTimeout: 30000
}

export class CircuitRelayTransport implements Transport {
export type CircuitRelayDialEvents =
OutboundConnectionUpgradeEvents |
OpenConnectionProgressEvents |
ProgressEvent<'circuit-relay:open-connection'> |
ProgressEvent<'circuit-relay:reuse-connection'> |
ProgressEvent<'circuit-relay:open-hop-stream'> |
ProgressEvent<'circuit-relay:write-connect-message'> |
ProgressEvent<'circuit-relay:read-connect-response'>

export class CircuitRelayTransport implements Transport<CircuitRelayDialEvents> {
private readonly discovery?: RelayDiscovery
private readonly registrar: Registrar
private readonly peerStore: PeerStore
Expand Down Expand Up @@ -156,7 +167,7 @@ export class CircuitRelayTransport implements Transport {
/**
* Dial a peer over a relay
*/
async dial (ma: Multiaddr, options: AbortOptions = {}): Promise<Connection> {
async dial (ma: Multiaddr, options: DialOptions<CircuitRelayDialEvents>): Promise<Connection> {
if (ma.protoCodes().filter(code => code === CIRCUIT_PROTO_CODE).length !== 1) {
const errMsg = 'Invalid circuit relay address'
this.log.error(errMsg, ma)
Expand Down Expand Up @@ -187,13 +198,18 @@ export class CircuitRelayTransport implements Transport {
await this.peerStore.merge(relayPeer, {
multiaddrs: [relayAddr]
})

options.onProgress?.(new CustomProgressEvent('circuit-relay:open-connection'))
relayConnection = await this.connectionManager.openConnection(relayPeer, options)
disconnectOnFailure = true
} else {
options.onProgress?.(new CustomProgressEvent('circuit-relay:reuse-connection'))
}

let stream: Stream | undefined

try {
options.onProgress?.(new CustomProgressEvent('circuit-relay:open-hop-stream'))
stream = await relayConnection.newStream(RELAY_V2_HOP_CODEC)

return await this.connectV2({
Expand All @@ -203,7 +219,8 @@ export class CircuitRelayTransport implements Transport {
destinationAddr,
relayAddr,
ma,
disconnectOnFailure
disconnectOnFailure,
onProgress: options.onProgress
})
} catch (err: any) {
this.log.error('circuit relay dial to destination %p via relay %p failed', destinationPeer, relayPeer, err)
Expand All @@ -220,12 +237,15 @@ export class CircuitRelayTransport implements Transport {
{
stream, connection, destinationPeer,
destinationAddr, relayAddr, ma,
disconnectOnFailure
disconnectOnFailure,
onProgress
}: ConnectOptions
): Promise<Connection> {
try {
const pbstr = pbStream(stream)
const hopstr = pbstr.pb(HopMessage)

onProgress?.(new CustomProgressEvent('circuit-relay:write-connect-message'))
await hopstr.write({
type: HopMessage.Type.CONNECT,
peer: {
Expand All @@ -234,6 +254,7 @@ export class CircuitRelayTransport implements Transport {
}
})

onProgress?.(new CustomProgressEvent('circuit-relay:read-connect-response'))
const status = await hopstr.read()

if (status.status !== Status.OK) {
Expand All @@ -249,7 +270,8 @@ export class CircuitRelayTransport implements Transport {

this.log('new outbound relayed connection %a', maConn.remoteAddr)
return await this.upgrader.upgradeOutbound(maConn, {
transient: status.limit != null
transient: status.limit != null,
onProgress
})
} catch (err: any) {
this.log.error(`Circuit relay dial to destination ${destinationPeer.toString()} via relay ${connection.remotePeer.toString()} failed`, err)
Expand Down
1 change: 1 addition & 0 deletions packages/transport-tcp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
"@multiformats/mafmt": "^12.1.6",
"@multiformats/multiaddr": "^12.2.3",
"@types/sinon": "^17.0.3",
"progress-events": "^1.0.0",
"stream-to-it": "^1.0.1"
},
"devDependencies": {
Expand Down
17 changes: 11 additions & 6 deletions packages/transport-tcp/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@
import net from 'net'
import { AbortError, CodeError, serviceCapabilities, transportSymbol } from '@libp2p/interface'
import * as mafmt from '@multiformats/mafmt'
import { CustomProgressEvent } from 'progress-events'
import { CODE_CIRCUIT, CODE_P2P, CODE_UNIX } from './constants.js'
import { type CloseServerOnMaxConnectionsOpts, TCPListener } from './listener.js'
import { toMultiaddrConnection } from './socket-to-conn.js'
import { multiaddrToNetConfig } from './utils.js'
import type { ComponentLogger, Logger, Connection, CounterGroup, Metrics, CreateListenerOptions, DialOptions, Transport, Listener } from '@libp2p/interface'
import type { ComponentLogger, Logger, Connection, CounterGroup, Metrics, CreateListenerOptions, DialOptions, Transport, Listener, OutboundConnectionUpgradeEvents } from '@libp2p/interface'
import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr'
import type { Socket, IpcSocketConnectOpts, TcpSocketConnectOpts } from 'net'
import type { ProgressEvent } from 'progress-events'

export interface TCPOptions {
/**
Expand Down Expand Up @@ -108,7 +110,11 @@ export interface TCPSocketOptions extends AbortOptions {
allowHalfOpen?: boolean
}

export interface TCPDialOptions extends DialOptions, TCPSocketOptions {
export type TCPDialEvents =
OutboundConnectionUpgradeEvents |
ProgressEvent<'tcp:open-connection'>

export interface TCPDialOptions extends DialOptions<TCPDialEvents>, TCPSocketOptions {

}

Expand All @@ -125,7 +131,7 @@ export interface TCPMetrics {
dialerEvents: CounterGroup
}

class TCP implements Transport {
class TCP implements Transport<TCPDialEvents> {
private readonly opts: TCPOptions
private readonly metrics?: TCPMetrics
private readonly components: TCPComponents
Expand Down Expand Up @@ -199,9 +205,8 @@ class TCP implements Transport {
}

async _connect (ma: Multiaddr, options: TCPDialOptions): Promise<Socket> {
if (options.signal?.aborted === true) {
throw new AbortError()
}
options.signal?.throwIfAborted()
options.onProgress?.(new CustomProgressEvent('tcp:open-connection'))

return new Promise<Socket>((resolve, reject) => {
const start = Date.now()
Expand Down
1 change: 1 addition & 0 deletions packages/transport-webrtc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
"p-defer": "^4.0.1",
"p-event": "^6.0.1",
"p-timeout": "^6.1.2",
"progress-events": "^1.0.0",
"protons-runtime": "^5.4.0",
"race-signal": "^1.0.2",
"react-native-webrtc": "^118.0.7",
Expand Down
Loading

0 comments on commit abb9f90

Please sign in to comment.