diff --git a/packages/interface-compliance-tests/package.json b/packages/interface-compliance-tests/package.json index c2af0e3642..0ca8f23319 100644 --- a/packages/interface-compliance-tests/package.json +++ b/packages/interface-compliance-tests/package.json @@ -124,6 +124,7 @@ "aegir": "^42.2.3", "delay": "^6.0.0", "it-all": "^3.0.4", + "it-byte-stream": "^1.0.8", "it-drain": "^3.0.5", "it-map": "^3.0.5", "it-ndjson": "^1.0.5", diff --git a/packages/interface-compliance-tests/src/stream-muxer/base-test.ts b/packages/interface-compliance-tests/src/stream-muxer/base-test.ts index 39456311dd..b78ba56399 100644 --- a/packages/interface-compliance-tests/src/stream-muxer/base-test.ts +++ b/packages/interface-compliance-tests/src/stream-muxer/base-test.ts @@ -1,5 +1,6 @@ import { expect } from 'aegir/chai' import all from 'it-all' +import { byteStream } from 'it-byte-stream' import drain from 'it-drain' import map from 'it-map' import { duplexPair } from 'it-pair/duplex' @@ -191,5 +192,247 @@ export default (common: TestSetup): void => { expect(listenerChunks).to.be.eql(['hey']) expect(dialerChunks).to.be.eql(['hello']) }) + + it('should echo a small value via a pipe', async () => { + const p = duplexPair() + const onDialerStreamPromise: DeferredPromise = defer() + const onDataReceivedPromise: DeferredPromise = defer() + const dialerFactory = await common.setup() + const dialer = dialerFactory.createStreamMuxer({ + direction: 'outbound', + onIncomingStream: (stream) => { + onDialerStreamPromise.resolve(stream) + } + }) + const listenerFactory = await common.setup() + const listener = listenerFactory.createStreamMuxer({ + direction: 'inbound', + onIncomingStream: (stream) => { + void Promise.resolve().then(async () => { + const output = new Uint8ArrayList() + + for await (const buf of stream.source) { + output.append(buf) + } + + onDataReceivedPromise.resolve(output.subarray()) + }) + } + }) + + void pipe(p[0], dialer, p[0]) + void pipe(p[1], listener, p[1]) + + const stream = await dialer.newStream() + const input = Uint8Array.from([0, 1, 2, 3, 4]) + + await pipe( + [input], + stream + ) + await stream.close() + + expect(await onDataReceivedPromise.promise).to.equalBytes(input) + }) + + it('should echo a large value via a pipe', async () => { + const p = duplexPair() + const onDialerStreamPromise: DeferredPromise = defer() + const onDataReceivedPromise: DeferredPromise = defer() + const dialerFactory = await common.setup() + const dialer = dialerFactory.createStreamMuxer({ + direction: 'outbound', + onIncomingStream: (stream) => { + onDialerStreamPromise.resolve(stream) + } + }) + const listenerFactory = await common.setup() + const listener = listenerFactory.createStreamMuxer({ + direction: 'inbound', + onIncomingStream: (stream) => { + void Promise.resolve().then(async () => { + const output = new Uint8ArrayList() + + for await (const buf of stream.source) { + output.append(buf) + } + + onDataReceivedPromise.resolve(output.subarray()) + }) + } + }) + + void pipe(p[0], dialer, p[0]) + void pipe(p[1], listener, p[1]) + + const stream = await dialer.newStream() + const input = Uint8Array.from(new Array(1024 * 1024 * 10).fill(0)) + + await pipe( + [input], + stream + ) + await stream.close() + + expect(await onDataReceivedPromise.promise).to.equalBytes(input) + }) + + it('should echo a small value via sink', async () => { + const p = duplexPair() + const onDialerStreamPromise: DeferredPromise = defer() + const onDataReceivedPromise: DeferredPromise = defer() + const dialerFactory = await common.setup() + const dialer = dialerFactory.createStreamMuxer({ + direction: 'outbound', + onIncomingStream: (stream) => { + onDialerStreamPromise.resolve(stream) + } + }) + const listenerFactory = await common.setup() + const listener = listenerFactory.createStreamMuxer({ + direction: 'inbound', + onIncomingStream: (stream) => { + void Promise.resolve().then(async () => { + const output = new Uint8ArrayList() + + for await (const buf of stream.source) { + output.append(buf) + } + + onDataReceivedPromise.resolve(output.subarray()) + }) + } + }) + + void pipe(p[0], dialer, p[0]) + void pipe(p[1], listener, p[1]) + + const stream = await dialer.newStream() + const input = Uint8Array.from([0, 1, 2, 3, 4]) + + await stream.sink([input]) + await stream.close() + + expect(await onDataReceivedPromise.promise).to.equalBytes(input) + }) + + it('should echo a large value via sink', async () => { + const p = duplexPair() + const onDialerStreamPromise: DeferredPromise = defer() + const onDataReceivedPromise: DeferredPromise = defer() + const dialerFactory = await common.setup() + const dialer = dialerFactory.createStreamMuxer({ + direction: 'outbound', + onIncomingStream: (stream) => { + onDialerStreamPromise.resolve(stream) + } + }) + const listenerFactory = await common.setup() + const listener = listenerFactory.createStreamMuxer({ + direction: 'inbound', + onIncomingStream: (stream) => { + void Promise.resolve().then(async () => { + const output = new Uint8ArrayList() + + for await (const buf of stream.source) { + output.append(buf) + } + + onDataReceivedPromise.resolve(output.subarray()) + }) + } + }) + + void pipe(p[0], dialer, p[0]) + void pipe(p[1], listener, p[1]) + + const stream = await dialer.newStream() + const input = Uint8Array.from(new Array(1024 * 1024 * 10).fill(0)) + + await stream.sink([input]) + await stream.close() + + expect(await onDataReceivedPromise.promise).to.equalBytes(input) + }) + + it('should echo a small value via a pushable', async () => { + const p = duplexPair() + const onDialerStreamPromise: DeferredPromise = defer() + const onDataReceivedPromise: DeferredPromise = defer() + const dialerFactory = await common.setup() + const dialer = dialerFactory.createStreamMuxer({ + direction: 'outbound', + onIncomingStream: (stream) => { + onDialerStreamPromise.resolve(stream) + } + }) + const listenerFactory = await common.setup() + const listener = listenerFactory.createStreamMuxer({ + direction: 'inbound', + onIncomingStream: (stream) => { + void Promise.resolve().then(async () => { + const output = new Uint8ArrayList() + + for await (const buf of stream.source) { + output.append(buf) + } + + onDataReceivedPromise.resolve(output.subarray()) + }) + } + }) + + void pipe(p[0], dialer, p[0]) + void pipe(p[1], listener, p[1]) + + const stream = await dialer.newStream() + const input = Uint8Array.from([0, 1, 2, 3, 4]) + + const pushable = byteStream(stream) + await pushable.write(input) + await pushable.unwrap().close() + + expect(await onDataReceivedPromise.promise).to.equalBytes(input) + }) + + it('should echo a large value via a pushable', async () => { + const p = duplexPair() + const onDialerStreamPromise: DeferredPromise = defer() + const onDataReceivedPromise: DeferredPromise = defer() + const dialerFactory = await common.setup() + const dialer = dialerFactory.createStreamMuxer({ + direction: 'outbound', + onIncomingStream: (stream) => { + onDialerStreamPromise.resolve(stream) + } + }) + const listenerFactory = await common.setup() + const listener = listenerFactory.createStreamMuxer({ + direction: 'inbound', + onIncomingStream: (stream) => { + void Promise.resolve().then(async () => { + const output = new Uint8ArrayList() + + for await (const buf of stream.source) { + output.append(buf) + } + + onDataReceivedPromise.resolve(output.subarray()) + }) + } + }) + + void pipe(p[0], dialer, p[0]) + void pipe(p[1], listener, p[1]) + + const stream = await dialer.newStream() + const input = Uint8Array.from(new Array(1024 * 1024 * 10).fill(0)) + + const pushable = byteStream(stream) + await pushable.write(input) + await pushable.unwrap().close() + + expect(await onDataReceivedPromise.promise).to.equalBytes(input) + }) }) }