Skip to content

Commit

Permalink
fix: only accept lists of messages in encoder (libp2p#236)
Browse files Browse the repository at this point in the history
At runtime the encoder is supplied lists of messages by a `pushableV`, the only time the source has individual messages is during test runs so simplify by only accepting lists of messages.
  • Loading branch information
achingbrain authored Nov 25, 2022
1 parent 084d3dc commit 4175cac
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 35 deletions.
10 changes: 3 additions & 7 deletions src/encode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,11 @@ const encoder = new Encoder()
/**
* Encode and yield one or more messages
*/
export async function * encode (source: Source<Message | Message[]>) {
for await (const msg of source) {
export async function * encode (source: Source<Message[]>) {
for await (const msgs of source) {
const list = new Uint8ArrayList()

if (Array.isArray(msg)) {
for (const m of msg) {
encoder.write(m, list)
}
} else {
for (const msg of msgs) {
encoder.write(msg, list)
}

Expand Down
16 changes: 8 additions & 8 deletions test/coder.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { Uint8ArrayList } from 'uint8arraylist'

describe('coder', () => {
it('should encode header', async () => {
const source: Message[] = [{ id: 17, type: 0, data: new Uint8ArrayList(uint8ArrayFromString('17')) }]
const source: Message[][] = [[{ id: 17, type: 0, data: new Uint8ArrayList(uint8ArrayFromString('17')) }]]

const data = uint8ArrayConcat(await all(encode(source)))

Expand All @@ -29,34 +29,34 @@ describe('coder', () => {
})

it('should encode several msgs into buffer', async () => {
const source: Message[] = [
const source: Message[][] = [[
{ id: 17, type: 0, data: new Uint8ArrayList(uint8ArrayFromString('17')) },
{ id: 19, type: 0, data: new Uint8ArrayList(uint8ArrayFromString('19')) },
{ id: 21, type: 0, data: new Uint8ArrayList(uint8ArrayFromString('21')) }
]
]]

const data = uint8ArrayConcat(await all(encode(source)))

expect(data).to.equalBytes(uint8ArrayFromString('88010231379801023139a801023231', 'base16'))
})

it('should encode from Uint8ArrayList', async () => {
const source: NewStreamMessage[] = [{
const source: NewStreamMessage[][] = [[{
id: 17,
type: 0,
data: new Uint8ArrayList(
uint8ArrayFromString(Math.random().toString()),
uint8ArrayFromString(Math.random().toString())
)
}]
}]]

const data = uint8ArrayConcat(await all(encode(source)))

expect(data).to.equalBytes(
uint8ArrayConcat([
uint8ArrayFromString('8801', 'base16'),
Uint8Array.from([source[0].data.length]),
source[0].data instanceof Uint8Array ? source[0].data : source[0].data.slice()
Uint8Array.from([source[0][0].data.length]),
source[0][0].data instanceof Uint8Array ? source[0][0].data : source[0][0].data.slice()
])
)
})
Expand All @@ -77,7 +77,7 @@ describe('coder', () => {
})

it('should encode zero length body msg', async () => {
const source: Message[] = [{ id: 17, type: 0 }]
const source: Message[][] = [[{ id: 17, type: 0 }]]

const data = uint8ArrayConcat(await all(encode(source)))

Expand Down
16 changes: 8 additions & 8 deletions test/mplex.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,23 @@ describe('mplex', () => {

// max out the streams for this connection
for (let i = 0; i < maxInboundStreams; i++) {
const source: NewStreamMessage[] = [{
const source: NewStreamMessage[][] = [[{
id: i,
type: 0,
data: new Uint8ArrayList(uint8ArrayFromString('17'))
}]
}]]

const data = uint8ArrayConcat(await all(encode(source)))

stream.push(data)
}

// simulate a new incoming stream
const source: NewStreamMessage[] = [{
const source: NewStreamMessage[][] = [[{
id: 11,
type: 0,
data: new Uint8ArrayList(uint8ArrayFromString('17'))
}]
}]]

const data = uint8ArrayConcat(await all(encode(source)))

Expand Down Expand Up @@ -89,13 +89,13 @@ describe('mplex', () => {
const id = 17

// simulate a new incoming stream that sends lots of data
const input: Source<Message> = (async function * send () {
const input: Source<Message[]> = (async function * send () {
const newStreamMessage: NewStreamMessage = {
id,
type: MessageTypes.NEW_STREAM,
data: new Uint8ArrayList(new Uint8Array(1024))
}
yield newStreamMessage
yield [newStreamMessage]

await delay(10)

Expand All @@ -105,7 +105,7 @@ describe('mplex', () => {
type: MessageTypes.MESSAGE_INITIATOR,
data: new Uint8ArrayList(new Uint8Array(1024 * 1000))
}
yield dataMessage
yield [dataMessage]

sent++

Expand All @@ -118,7 +118,7 @@ describe('mplex', () => {
id,
type: MessageTypes.CLOSE_INITIATOR
}
yield closeMessage
yield [closeMessage]
})()

// create the muxer
Expand Down
24 changes: 12 additions & 12 deletions test/restrict-size.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ describe('restrict size', () => {
it('should throw when size is too big', async () => {
const maxSize = 32

const input: Message[] = [
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(8)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(maxSize)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(64)) }
const input: Message[][] = [
[{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(8)) }],
[{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) }],
[{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(maxSize)) }],
[{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(64)) }]
]

const output: Message[] = []
Expand All @@ -38,9 +38,9 @@ describe('restrict size', () => {
} catch (err: any) {
expect(err).to.have.property('code', 'ERR_MSG_TOO_BIG')
expect(output).to.have.length(3)
expect(output[0]).to.deep.equal(input[0])
expect(output[1]).to.deep.equal(input[1])
expect(output[2]).to.deep.equal(input[2])
expect(output[0]).to.deep.equal(input[0][0])
expect(output[1]).to.deep.equal(input[1][0])
expect(output[2]).to.deep.equal(input[2][0])
return
}
throw new Error('did not restrict size')
Expand All @@ -51,30 +51,30 @@ describe('restrict size', () => {
id: 4,
type: MessageTypes.CLOSE_RECEIVER
}
const input: Message[] = [message]
const input: Message[][] = [[message]]

const output = await pipe(
input,
encode,
decode(32),
async (source) => await all(source)
)
expect(output).to.deep.equal(input)
expect(output).to.deep.equal(input[0])
})

it('should throw when unprocessed message queue size is too big', async () => {
const maxMessageSize = 32
const maxUnprocessedMessageQueueSize = 64

const input: Message[] = [
const input: Message[][] = [[
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) },
{ id: 0, type: 1, data: new Uint8ArrayList(randomBytes(16)) }
]
]]

const output: Message[] = []

Expand Down

0 comments on commit 4175cac

Please sign in to comment.