Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

fix!: remove @libp2p/components #106

Merged
merged 1 commit into from
Oct 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@
"release": "aegir release"
},
"dependencies": {
"@libp2p/components": "^3.1.1",
"@libp2p/crypto": "^1.0.0",
"@libp2p/interface-connection": "^3.0.1",
"@libp2p/interface-peer-id": "^1.0.2",
Expand All @@ -193,10 +192,10 @@
"it-length-prefixed": "^8.0.2",
"it-pipe": "^2.0.3",
"it-pushable": "^3.0.0",
"multiformats": "^9.6.3",
"multiformats": "^10.0.0",
"p-queue": "^7.2.0",
"uint8arraylist": "^2.0.0",
"uint8arrays": "^3.0.0"
"uint8arrays": "^4.0.2"
},
"devDependencies": {
"@libp2p/peer-id-factory": "^1.0.0",
Expand All @@ -205,8 +204,8 @@
"it-pair": "^2.0.2",
"p-defer": "^4.0.0",
"p-wait-for": "^5.0.0",
"protons": "^5.1.0",
"protons-runtime": "^3.1.0",
"protons": "^6.0.0",
"protons-runtime": "^4.0.1",
"sinon": "^14.0.0",
"util": "^0.12.4"
}
Expand Down
35 changes: 18 additions & 17 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,24 @@ import {
verifySignature
} from './sign.js'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { IncomingStreamData } from '@libp2p/interface-registrar'
import type { IncomingStreamData, Registrar } from '@libp2p/interface-registrar'
import type { Connection } from '@libp2p/interface-connection'
import { PubSub, Message, StrictNoSign, StrictSign, PubSubInit, PubSubEvents, PeerStreams, PubSubRPCMessage, PubSubRPC, PubSubRPCSubscription, SubscriptionChangeData, PublishResult, TopicValidatorFn, TopicValidatorResult } from '@libp2p/interface-pubsub'
import { PeerMap, PeerSet } from '@libp2p/peer-collections'
import { Components, Initializable } from '@libp2p/components'
import type { Uint8ArrayList } from 'uint8arraylist'

const log = logger('libp2p:pubsub')

export interface PubSubComponents {
peerId: PeerId
registrar: Registrar
}

/**
* PubSubBaseProtocol handles the peers and connections logic for pubsub routers
* and specifies the API that pubsub routers should have.
*/
export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = PubSubEvents> extends EventEmitter<Events> implements PubSub<Events>, Initializable {
export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = PubSubEvents> extends EventEmitter<Events> implements PubSub<Events> {
public started: boolean
/**
* Map of topics to which peers are subscribed to
Expand Down Expand Up @@ -60,14 +64,14 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
public topicValidators: Map<string, TopicValidatorFn>
public queue: Queue
public multicodecs: string[]
public components: Components = new Components()
public components: PubSubComponents

private _registrarTopologyIds: string[] | undefined
protected enabled: boolean
private readonly maxInboundStreams: number
private readonly maxOutboundStreams: number

constructor (props: PubSubInit) {
constructor (components: PubSubComponents, props: PubSubInit) {
super()

const {
Expand All @@ -80,6 +84,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
maxOutboundStreams = 1
} = props

this.components = components
this.multicodecs = ensureArray(multicodecs)
this.enabled = props.enabled !== false
this.started = false
Expand All @@ -99,10 +104,6 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
this._onPeerDisconnected = this._onPeerDisconnected.bind(this)
}

init (components: Components) {
this.components = components
}

// LIFECYCLE METHODS

/**
Expand All @@ -117,7 +118,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P

log('starting')

const registrar = this.components.getRegistrar()
const registrar = this.components.registrar
// Incoming streams
// Called after a peer dials us
await Promise.all(this.multicodecs.map(async multicodec => await registrar.handle(multicodec, this._onIncomingStream, {
Expand Down Expand Up @@ -145,7 +146,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
return
}

const registrar = this.components.getRegistrar()
const registrar = this.components.registrar

// unregister protocol and handlers
if (this._registrarTopologyIds != null) {
Expand Down Expand Up @@ -412,7 +413,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
* Handles a message from a peer
*/
async processMessage (from: PeerId, msg: Message) {
if (this.components.getPeerId().equals(from) && !this.emitSelf) {
if (this.components.peerId.equals(from) && !this.emitSelf) {
return
}

Expand All @@ -425,7 +426,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
}

if (this.subscriptions.has(msg.topic)) {
const isFromSelf = this.components.getPeerId().equals(from)
const isFromSelf = this.components.peerId.equals(from)

if (!isFromSelf || this.emitSelf) {
super.dispatchEvent(new CustomEvent<Message>('message', {
Expand Down Expand Up @@ -584,7 +585,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
const signaturePolicy = this.globalSignaturePolicy
switch (signaturePolicy) {
case 'StrictSign':
return await signMessage(this.components.getPeerId(), message, this.encodeMessage.bind(this))
return await signMessage(this.components.peerId, message, this.encodeMessage.bind(this))
case 'StrictNoSign':
return await Promise.resolve({
type: 'unsigned',
Expand Down Expand Up @@ -627,7 +628,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
}

const message = {
from: this.components.getPeerId(),
from: this.components.peerId,
topic,
data: data ?? new Uint8Array(0),
sequenceNumber: randomSeqno()
Expand All @@ -649,10 +650,10 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
}

// send to all the other peers
const result = await this.publishMessage(this.components.getPeerId(), rpcMessage)
const result = await this.publishMessage(this.components.peerId, rpcMessage)

if (emittedToSelf) {
result.recipients = [...result.recipients, this.components.getPeerId()]
result.recipients = [...result.recipients, this.components.peerId]
}

return result
Expand Down
15 changes: 6 additions & 9 deletions test/emit-self.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {
} from './utils/index.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import delay from 'delay'
import { Components } from '@libp2p/components'

const protocol = '/pubsub/1.0.0'
const topic = 'foo'
Expand All @@ -21,13 +20,12 @@ describe('emitSelf', () => {
const peerId = await createPeerId()

pubsub = new PubsubImplementation({
peerId,
registrar: new MockRegistrar()
}, {
multicodecs: [protocol],
emitSelf: true
})
pubsub.init(new Components({
peerId,
registrar: new MockRegistrar()
}))
})

before(async () => {
Expand Down Expand Up @@ -77,13 +75,12 @@ describe('emitSelf', () => {
const peerId = await createPeerId()

pubsub = new PubsubImplementation({
peerId,
registrar: new MockRegistrar()
}, {
multicodecs: [protocol],
emitSelf: false
})
pubsub.init(new Components({
peerId,
registrar: new MockRegistrar()
}))
})

before(async () => {
Expand Down
11 changes: 9 additions & 2 deletions test/instance.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { expect } from 'aegir/chai'
import { PubSubBaseProtocol } from '../src/index.js'
import type { PublishResult, PubSubRPC, PubSubRPCMessage } from '@libp2p/interface-pubsub'
import type { Uint8ArrayList } from 'uint8arraylist'
import { MockRegistrar } from './utils/index.js'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'

class PubsubProtocol extends PubSubBaseProtocol {
decodeRpc (bytes: Uint8Array): PubSubRPC {
Expand Down Expand Up @@ -33,9 +35,14 @@ describe('pubsub instance', () => {
}).to.throw()
})

it('should accept valid parameters', () => {
it('should accept valid parameters', async () => {
const peerId = await createEd25519PeerId()

expect(() => {
new PubsubProtocol({ // eslint-disable-line no-new
return new PubsubProtocol({
peerId,
registrar: new MockRegistrar()
}, { // eslint-disable-line no-new
multicodecs: ['/pubsub/1.0.0']
})
}).not.to.throw()
Expand Down
20 changes: 8 additions & 12 deletions test/lifecycle.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import {
import type { PeerId } from '@libp2p/interface-peer-id'
import type { Registrar } from '@libp2p/interface-registrar'
import type { PublishResult, PubSubRPC, PubSubRPCMessage } from '@libp2p/interface-pubsub'
import { Components } from '@libp2p/components'
import type { Uint8ArrayList } from 'uint8arraylist'

class PubsubProtocol extends PubSubBaseProtocol {
Expand Down Expand Up @@ -52,12 +51,11 @@ describe('pubsub base lifecycle', () => {
}

pubsub = new PubsubProtocol({
multicodecs: ['/pubsub/1.0.0']
})
pubsub.init(new Components({
peerId: peerId,
registrar: sinonMockRegistrar
}))
}, {
multicodecs: ['/pubsub/1.0.0']
})

expect(pubsub.peers.size).to.be.eql(0)
})
Expand Down Expand Up @@ -112,19 +110,17 @@ describe('pubsub base lifecycle', () => {
registrarB = new MockRegistrar()

pubsubA = new PubsubImplementation({
multicodecs: [protocol]
})
pubsubA.init(new Components({
peerId: peerIdA,
registrar: registrarA
}))
pubsubB = new PubsubImplementation({
}, {
multicodecs: [protocol]
})
pubsubB.init(new Components({
pubsubB = new PubsubImplementation({
peerId: peerIdB,
registrar: registrarB
}))
}, {
multicodecs: [protocol]
})
})

// start pubsub
Expand Down
8 changes: 3 additions & 5 deletions test/message.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import {
} from './utils/index.js'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { Message } from '@libp2p/interface-pubsub'
import { Components } from '@libp2p/components'
import { randomSeqno } from '../src/utils.js'

describe('pubsub base messages', () => {
Expand All @@ -19,12 +18,11 @@ describe('pubsub base messages', () => {
before(async () => {
peerId = await createPeerId()
pubsub = new PubsubImplementation({
multicodecs: ['/pubsub/1.0.0']
})
pubsub.init(new Components({
peerId: peerId,
registrar: new MockRegistrar()
}))
}, {
multicodecs: ['/pubsub/1.0.0']
})
})

afterEach(() => {
Expand Down
Loading