Skip to content

Commit

Permalink
feat: add .echo method to echo protocol (#2766)
Browse files Browse the repository at this point in the history
To make it easier to invoke the echo protocol, add a method to the
service.
  • Loading branch information
achingbrain authored Oct 22, 2024
1 parent c5bbb25 commit 75301ac
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 3 deletions.
2 changes: 2 additions & 0 deletions packages/protocol-echo/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
"dependencies": {
"@libp2p/interface": "^2.1.3",
"@libp2p/interface-internal": "^2.0.8",
"@multiformats/multiaddr": "^12.3.1",
"it-byte-stream": "^1.1.0",
"it-pipe": "^3.0.1"
},
"devDependencies": {
Expand Down
25 changes: 23 additions & 2 deletions packages/protocol-echo/src/echo.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { byteStream } from 'it-byte-stream'
import { pipe } from 'it-pipe'
import { PROTOCOL_NAME, PROTOCOL_VERSION } from './constants.js'
import type { Echo as EchoInterface, EchoComponents, EchoInit } from './index.js'
import type { Logger, Startable } from '@libp2p/interface'
import type { AbortOptions, Logger, PeerId, Startable } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'

/**
* A simple echo stream, any data received will be sent back to the sender
Expand Down Expand Up @@ -31,7 +33,8 @@ export class Echo implements Startable, EchoInterface {
})
}, {
maxInboundStreams: this.init.maxInboundStreams,
maxOutboundStreams: this.init.maxOutboundStreams
maxOutboundStreams: this.init.maxOutboundStreams,
runOnLimitedConnection: this.init.runOnLimitedConnection
})
this.started = true
}
Expand All @@ -44,4 +47,22 @@ export class Echo implements Startable, EchoInterface {
isStarted (): boolean {
return this.started
}

async echo (peer: PeerId | Multiaddr | Multiaddr[], buf: Uint8Array, options?: AbortOptions): Promise<Uint8Array> {
const conn = await this.components.connectionManager.openConnection(peer, options)
const stream = await conn.newStream(this.protocol, {
...this.init,
...options
})
const bytes = byteStream(stream)

const [, output] = await Promise.all([
bytes.write(buf, options),
bytes.read(buf.byteLength, options)
])

await stream.close(options)

return output.subarray()
}
}
5 changes: 4 additions & 1 deletion packages/protocol-echo/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@
*/

import { Echo as EchoClass } from './echo.js'
import type { ComponentLogger } from '@libp2p/interface'
import type { ComponentLogger, PeerId } from '@libp2p/interface'
import type { ConnectionManager, Registrar } from '@libp2p/interface-internal'
import type { Multiaddr } from '@multiformats/multiaddr'

export interface EchoInit {
protocolPrefix?: string
maxInboundStreams?: number
maxOutboundStreams?: number
runOnLimitedConnection?: boolean
}

export interface EchoComponents {
Expand All @@ -60,6 +62,7 @@ export interface EchoComponents {

export interface Echo {
protocol: string
echo(peer: PeerId | Multiaddr | Multiaddr[], buf: Uint8Array): Promise<Uint8Array>
}

export function echo (init: EchoInit = {}): (components: EchoComponents) => Echo {
Expand Down
32 changes: 32 additions & 0 deletions packages/protocol-echo/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import { start, stop } from '@libp2p/interface'
import { defaultLogger } from '@libp2p/logger'
import { multiaddr } from '@multiformats/multiaddr'
import { expect } from 'aegir/chai'
import all from 'it-all'
import { duplexPair } from 'it-pair/duplex'
Expand Down Expand Up @@ -76,4 +77,35 @@ describe('echo', () => {

expect(output).to.equalBytes(input)
})

it('should echo data using method', async () => {
await start(echo)

const duplex = duplexPair<any>()
const outgoingStream = stubInterface<Stream>()
outgoingStream.source = duplex[0].source
outgoingStream.sink.callsFake(async source => duplex[0].sink(source))

const incomingStream = stubInterface<Stream>()
incomingStream.source = duplex[1].source
incomingStream.sink.callsFake(async source => duplex[1].sink(source))

const handler = components.registrar.handle.getCall(0).args[1]
handler({
stream: incomingStream,
connection: stubInterface<Connection>()
})

const ma = multiaddr('/ip4/123.123.123.123/tcp/1234')

components.connectionManager.openConnection.withArgs(ma).resolves(stubInterface<Connection>({
newStream: async () => outgoingStream
}))

const input = Uint8Array.from([0, 1, 2, 3])

const output = await echo.echo(ma, input)

expect(output).to.equalBytes(output)
})
})

0 comments on commit 75301ac

Please sign in to comment.