Skip to content

Commit

Permalink
fix: improve sessions implementation (#495)
Browse files Browse the repository at this point in the history
Moves most common session code into an abstract superclass to remove
duplication.

- Sessions are created synchronously
- The root CID of a session is filled on the first CID retrieval
- Providers are found and queried for the root block directly, any that have it are added to the session
- Providers that have errored (e.g. protocol selection failure) are excluded from the session
- Bitswap only queries provider peers, not directly connected peers
- HTTP Gatways are loaded from the routing
- When providers are returned without multiaddrs we try to load them without blocking yielding of other providers

---------

Co-authored-by: Russell Dempsey <[email protected]>
  • Loading branch information
achingbrain and SgtPooki authored Apr 15, 2024
1 parent 9a10498 commit 9ea934e
Show file tree
Hide file tree
Showing 31 changed files with 1,665 additions and 787 deletions.
3 changes: 1 addition & 2 deletions packages/bitswap/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,12 @@
},
"dependencies": {
"@helia/interface": "^4.0.0",
"@helia/utils": "^0.1.0",
"@libp2p/interface": "^1.1.2",
"@libp2p/logger": "^4.0.5",
"@libp2p/peer-collections": "^5.1.6",
"@libp2p/utils": "^5.2.3",
"@multiformats/multiaddr": "^12.1.14",
"@multiformats/multiaddr-matcher": "^1.1.2",
"any-signal": "^4.1.1",
"debug": "^4.3.4",
"interface-blockstore": "^5.2.9",
Expand All @@ -165,7 +165,6 @@
"it-length-prefixed": "^9.0.0",
"it-length-prefixed-stream": "^1.1.6",
"it-map": "^3.0.5",
"it-merge": "^3.0.3",
"it-pipe": "^3.0.1",
"it-take": "^3.0.1",
"multiformats": "^13.0.1",
Expand Down
18 changes: 4 additions & 14 deletions packages/bitswap/src/bitswap.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
/* eslint-disable no-loop-func */
import { DEFAULT_SESSION_MAX_PROVIDERS, DEFAULT_SESSION_MIN_PROVIDERS, DEFAULT_SESSION_PROVIDER_QUERY_CONCURRENCY } from '@helia/interface'
import { setMaxListeners } from '@libp2p/interface'
import { anySignal } from 'any-signal'
import { Network } from './network.js'
import { PeerWantLists } from './peer-want-lists/index.js'
import { createBitswapSession } from './session.js'
import { Stats } from './stats.js'
import { WantList } from './want-list.js'
import type { BitswapOptions, Bitswap as BitswapInterface, BitswapWantProgressEvents, BitswapNotifyProgressEvents, BitswapSession, WantListEntry, BitswapComponents, CreateBitswapSessionOptions } from './index.js'
import type { BitswapOptions, Bitswap as BitswapInterface, BitswapWantProgressEvents, BitswapNotifyProgressEvents, WantListEntry, BitswapComponents } from './index.js'
import type { BlockBroker, CreateSessionOptions } from '@helia/interface'
import type { ComponentLogger, PeerId } from '@libp2p/interface'
import type { Logger } from '@libp2p/logger'
import type { AbortOptions } from '@multiformats/multiaddr'
Expand Down Expand Up @@ -62,22 +62,12 @@ export class Bitswap implements BitswapInterface {
}, init)
}

async createSession (root: CID, options?: CreateBitswapSessionOptions): Promise<BitswapSession> {
const minProviders = options?.minProviders ?? DEFAULT_SESSION_MIN_PROVIDERS
const maxProviders = options?.maxProviders ?? DEFAULT_SESSION_MAX_PROVIDERS

createSession (options: CreateSessionOptions = {}): Required<Pick<BlockBroker<BitswapWantProgressEvents>, 'retrieve'>> {
return createBitswapSession({
wantList: this.wantList,
network: this.network,
logger: this.logger
}, {
root,
queryConcurrency: options?.providerQueryConcurrency ?? DEFAULT_SESSION_PROVIDER_QUERY_CONCURRENCY,
minProviders,
maxProviders,
connectedPeers: options?.queryConnectedPeers !== false ? [...this.wantList.peers.keys()] : [],
signal: options?.signal
})
}, options)
}

async want (cid: CID, options: WantOptions = {}): Promise<Uint8Array> {
Expand Down
45 changes: 2 additions & 43 deletions packages/bitswap/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
import { Bitswap as BitswapClass } from './bitswap.js'
import type { BitswapNetworkNotifyProgressEvents, BitswapNetworkWantProgressEvents } from './network.js'
import type { WantType } from './pb/message.js'
import type { CreateSessionOptions } from '@helia/interface'
import type { BlockBroker, CreateSessionOptions } from '@helia/interface'
import type { Routing } from '@helia/interface/routing'
import type { Libp2p, AbortOptions, Startable, ComponentLogger, Metrics, PeerId } from '@libp2p/interface'
import type { PeerSet } from '@libp2p/peer-collections'
import type { Blockstore } from 'interface-blockstore'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'
Expand All @@ -29,52 +28,12 @@ export type BitswapWantBlockProgressEvents =
ProgressEvent<'bitswap:want-block:block', CID> |
BitswapNetworkWantProgressEvents

/**
* A bitswap session is a network overlay consisting of peers that all have the
* first block in a file. Subsequent requests will only go to these peers.
*/
export interface BitswapSession {
/**
* The peers in this session
*/
peers: PeerSet

/**
* Fetch an additional CID from this DAG
*/
want(cid: CID, options?: AbortOptions & ProgressOptions<BitswapWantProgressEvents>): Promise<Uint8Array>
}

export interface WantListEntry {
cid: CID
priority: number
wantType: WantType
}

export interface CreateBitswapSessionOptions extends CreateSessionOptions<BitswapWantProgressEvents> {
/**
* If true, query connected peers before searching for providers via
* Helia routers
*
* @default true
*/
queryConnectedPeers?: boolean

/**
* If true, search for providers via Helia routers to query for the root CID
*
* @default true
*/
queryRoutingPeers?: boolean

/**
* The priority to use when querying availability of the root CID
*
* @default 1
*/
priority?: number
}

export interface Bitswap extends Startable {
/**
* Returns the current state of the wantlist
Expand All @@ -100,7 +59,7 @@ export interface Bitswap extends Startable {
/**
* Start a session to retrieve a file from the network
*/
createSession(root: CID, options?: AbortOptions & ProgressOptions<BitswapWantProgressEvents>): Promise<BitswapSession>
createSession(options?: CreateSessionOptions<BitswapWantProgressEvents>): Required<Pick<BlockBroker<BitswapWantProgressEvents>, 'retrieve'>>
}

export interface MultihashHasherLoader {
Expand Down
39 changes: 11 additions & 28 deletions packages/bitswap/src/network.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { CodeError, TypedEventEmitter, setMaxListeners } from '@libp2p/interface'
import { PeerQueue, type PeerQueueJobOptions } from '@libp2p/utils/peer-queue'
import { Circuit } from '@multiformats/multiaddr-matcher'
import { anySignal } from 'any-signal'
import debug from 'debug'
import drain from 'it-drain'
Expand Down Expand Up @@ -129,14 +128,14 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
this.messageSendTimeout = init.messageSendTimeout ?? DEFAULT_MESSAGE_SEND_TIMEOUT
this.runOnTransientConnections = init.runOnTransientConnections ?? DEFAULT_RUN_ON_TRANSIENT_CONNECTIONS
this.metrics = {
blocksSent: components.libp2p.metrics?.registerCounter('ipfs_bitswap_sent_blocks_total'),
dataSent: components.libp2p.metrics?.registerCounter('ipfs_bitswap_sent_data_bytes_total')
blocksSent: components.libp2p.metrics?.registerCounter('helia_bitswap_sent_blocks_total'),
dataSent: components.libp2p.metrics?.registerCounter('helia_bitswap_sent_data_bytes_total')
}

this.sendQueue = new PeerQueue({
concurrency: init.messageSendConcurrency ?? DEFAULT_MESSAGE_SEND_CONCURRENCY,
metrics: components.libp2p.metrics,
metricName: 'ipfs_bitswap_message_send_queue'
metricName: 'helia_bitswap_message_send_queue'
})
this.sendQueue.addEventListener('error', (evt) => {
this.log.error('error sending wantlist to peer', evt.detail)
Expand Down Expand Up @@ -263,29 +262,12 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
options?.onProgress?.(new CustomProgressEvent<PeerId>('bitswap:network:find-providers', cid))

for await (const provider of this.routing.findProviders(cid, options)) {
// unless we explicitly run on transient connections, skip peers that only
// have circuit relay addresses as bitswap won't run over them
if (!this.runOnTransientConnections) {
let hasDirectAddress = false

for (let ma of provider.multiaddrs) {
if (ma.getPeerId() == null) {
ma = ma.encapsulate(`/p2p/${provider.id}`)
}

if (!Circuit.exactMatch(ma)) {
hasDirectAddress = true
break
}
}

if (!hasDirectAddress) {
continue
}
}
// make sure we can dial the provider
const dialable = await this.libp2p.isDialable(provider.multiaddrs, {
runOnTransientConnection: this.runOnTransientConnections
})

// ignore non-bitswap providers
if (provider.protocols?.includes('transport-bitswap') === false) {
if (!dialable) {
continue
}

Expand Down Expand Up @@ -327,8 +309,9 @@ export class Network extends TypedEventEmitter<NetworkEvents> {
pendingBytes: msg.pendingBytes ?? 0
}

const signal = anySignal([AbortSignal.timeout(this.messageSendTimeout), options?.signal])
setMaxListeners(Infinity, signal)
const timeoutSignal = AbortSignal.timeout(this.messageSendTimeout)
const signal = anySignal([timeoutSignal, options?.signal])
setMaxListeners(Infinity, timeoutSignal, signal)

try {
const existingJob = this.sendQueue.queue.find(job => {
Expand Down
2 changes: 1 addition & 1 deletion packages/bitswap/src/peer-want-lists/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export class PeerWantLists {
this.log = components.logger.forComponent('helia:bitswap:peer-want-lists')

this.ledgerMap = trackedPeerMap({
name: 'ipfs_bitswap_ledger_map',
name: 'helia_bitswap_ledger_map',
metrics: components.libp2p.metrics
})

Expand Down
Loading

0 comments on commit 9ea934e

Please sign in to comment.