Skip to content

Commit

Permalink
test: add compliance tests for different types of stream sending (#2398)
Browse files Browse the repository at this point in the history
To ensure we can send data over libp2p streams in the various
supported ways, add extra tests to the interface compliance suite.
  • Loading branch information
achingbrain authored Feb 7, 2024
1 parent 0321812 commit 9891ecd
Show file tree
Hide file tree
Showing 2 changed files with 244 additions and 0 deletions.
1 change: 1 addition & 0 deletions packages/interface-compliance-tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
"aegir": "^42.2.3",
"delay": "^6.0.0",
"it-all": "^3.0.4",
"it-byte-stream": "^1.0.8",
"it-drain": "^3.0.5",
"it-map": "^3.0.5",
"it-ndjson": "^1.0.5",
Expand Down
243 changes: 243 additions & 0 deletions packages/interface-compliance-tests/src/stream-muxer/base-test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { expect } from 'aegir/chai'
import all from 'it-all'
import { byteStream } from 'it-byte-stream'
import drain from 'it-drain'
import map from 'it-map'
import { duplexPair } from 'it-pair/duplex'
Expand Down Expand Up @@ -191,5 +192,247 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
expect(listenerChunks).to.be.eql(['hey'])
expect(dialerChunks).to.be.eql(['hello'])
})

it('should echo a small value via a pipe', async () => {
const p = duplexPair<Uint8Array | Uint8ArrayList>()
const onDialerStreamPromise: DeferredPromise<Stream> = defer()
const onDataReceivedPromise: DeferredPromise<Uint8Array> = defer()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({
direction: 'outbound',
onIncomingStream: (stream) => {
onDialerStreamPromise.resolve(stream)
}
})
const listenerFactory = await common.setup()
const listener = listenerFactory.createStreamMuxer({
direction: 'inbound',
onIncomingStream: (stream) => {
void Promise.resolve().then(async () => {
const output = new Uint8ArrayList()

for await (const buf of stream.source) {
output.append(buf)
}

onDataReceivedPromise.resolve(output.subarray())
})
}
})

void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const stream = await dialer.newStream()
const input = Uint8Array.from([0, 1, 2, 3, 4])

await pipe(
[input],
stream
)
await stream.close()

expect(await onDataReceivedPromise.promise).to.equalBytes(input)
})

it('should echo a large value via a pipe', async () => {
const p = duplexPair<Uint8Array | Uint8ArrayList>()
const onDialerStreamPromise: DeferredPromise<Stream> = defer()
const onDataReceivedPromise: DeferredPromise<Uint8Array> = defer()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({
direction: 'outbound',
onIncomingStream: (stream) => {
onDialerStreamPromise.resolve(stream)
}
})
const listenerFactory = await common.setup()
const listener = listenerFactory.createStreamMuxer({
direction: 'inbound',
onIncomingStream: (stream) => {
void Promise.resolve().then(async () => {
const output = new Uint8ArrayList()

for await (const buf of stream.source) {
output.append(buf)
}

onDataReceivedPromise.resolve(output.subarray())
})
}
})

void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const stream = await dialer.newStream()
const input = Uint8Array.from(new Array(1024 * 1024 * 10).fill(0))

await pipe(
[input],
stream
)
await stream.close()

expect(await onDataReceivedPromise.promise).to.equalBytes(input)
})

it('should echo a small value via sink', async () => {
const p = duplexPair<Uint8Array | Uint8ArrayList>()
const onDialerStreamPromise: DeferredPromise<Stream> = defer()
const onDataReceivedPromise: DeferredPromise<Uint8Array> = defer()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({
direction: 'outbound',
onIncomingStream: (stream) => {
onDialerStreamPromise.resolve(stream)
}
})
const listenerFactory = await common.setup()
const listener = listenerFactory.createStreamMuxer({
direction: 'inbound',
onIncomingStream: (stream) => {
void Promise.resolve().then(async () => {
const output = new Uint8ArrayList()

for await (const buf of stream.source) {
output.append(buf)
}

onDataReceivedPromise.resolve(output.subarray())
})
}
})

void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const stream = await dialer.newStream()
const input = Uint8Array.from([0, 1, 2, 3, 4])

await stream.sink([input])
await stream.close()

expect(await onDataReceivedPromise.promise).to.equalBytes(input)
})

it('should echo a large value via sink', async () => {
const p = duplexPair<Uint8Array | Uint8ArrayList>()
const onDialerStreamPromise: DeferredPromise<Stream> = defer()
const onDataReceivedPromise: DeferredPromise<Uint8Array> = defer()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({
direction: 'outbound',
onIncomingStream: (stream) => {
onDialerStreamPromise.resolve(stream)
}
})
const listenerFactory = await common.setup()
const listener = listenerFactory.createStreamMuxer({
direction: 'inbound',
onIncomingStream: (stream) => {
void Promise.resolve().then(async () => {
const output = new Uint8ArrayList()

for await (const buf of stream.source) {
output.append(buf)
}

onDataReceivedPromise.resolve(output.subarray())
})
}
})

void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const stream = await dialer.newStream()
const input = Uint8Array.from(new Array(1024 * 1024 * 10).fill(0))

await stream.sink([input])
await stream.close()

expect(await onDataReceivedPromise.promise).to.equalBytes(input)
})

it('should echo a small value via a pushable', async () => {
const p = duplexPair<Uint8Array | Uint8ArrayList>()
const onDialerStreamPromise: DeferredPromise<Stream> = defer()
const onDataReceivedPromise: DeferredPromise<Uint8Array> = defer()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({
direction: 'outbound',
onIncomingStream: (stream) => {
onDialerStreamPromise.resolve(stream)
}
})
const listenerFactory = await common.setup()
const listener = listenerFactory.createStreamMuxer({
direction: 'inbound',
onIncomingStream: (stream) => {
void Promise.resolve().then(async () => {
const output = new Uint8ArrayList()

for await (const buf of stream.source) {
output.append(buf)
}

onDataReceivedPromise.resolve(output.subarray())
})
}
})

void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const stream = await dialer.newStream()
const input = Uint8Array.from([0, 1, 2, 3, 4])

const pushable = byteStream(stream)
await pushable.write(input)
await pushable.unwrap().close()

expect(await onDataReceivedPromise.promise).to.equalBytes(input)
})

it('should echo a large value via a pushable', async () => {
const p = duplexPair<Uint8Array | Uint8ArrayList>()
const onDialerStreamPromise: DeferredPromise<Stream> = defer()
const onDataReceivedPromise: DeferredPromise<Uint8Array> = defer()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({
direction: 'outbound',
onIncomingStream: (stream) => {
onDialerStreamPromise.resolve(stream)
}
})
const listenerFactory = await common.setup()
const listener = listenerFactory.createStreamMuxer({
direction: 'inbound',
onIncomingStream: (stream) => {
void Promise.resolve().then(async () => {
const output = new Uint8ArrayList()

for await (const buf of stream.source) {
output.append(buf)
}

onDataReceivedPromise.resolve(output.subarray())
})
}
})

void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])

const stream = await dialer.newStream()
const input = Uint8Array.from(new Array(1024 * 1024 * 10).fill(0))

const pushable = byteStream(stream)
await pushable.write(input)
await pushable.unwrap().close()

expect(await onDataReceivedPromise.promise).to.equalBytes(input)
})
})
}

0 comments on commit 9891ecd

Please sign in to comment.