Skip to content

Commit

Permalink
fix: support uint8arraylists (#149)
Browse files Browse the repository at this point in the history
To make parsing streams more flexible, support uint8arraylists
  • Loading branch information
achingbrain authored Jan 10, 2025
1 parent fd0158f commit b739509
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 1 deletion.
3 changes: 3 additions & 0 deletions packages/it-ndjson/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,8 @@
"aegir": "^45.0.8",
"buffer": "^6.0.3",
"it-all": "^3.0.0"
},
"dependencies": {
"uint8arraylist": "^2.4.8"
}
}
8 changes: 7 additions & 1 deletion packages/it-ndjson/src/parse.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { isUint8ArrayList } from 'uint8arraylist'
import { InvalidMessageLengthError } from './errors.js'
import type { Uint8ArrayList } from 'uint8arraylist'

export interface ParseOptions {
/**
Expand All @@ -7,7 +9,7 @@ export interface ParseOptions {
maxMessageLength?: number
}

export default async function * parse <T> (source: AsyncIterable<Uint8Array | string> | Iterable<Uint8Array | string>, opts: ParseOptions = {}): AsyncGenerator<T, void, undefined> {
export default async function * parse <T> (source: AsyncIterable<Uint8Array | Uint8ArrayList | string> | Iterable<Uint8Array | Uint8ArrayList | string>, opts: ParseOptions = {}): AsyncGenerator<T, void, undefined> {
const matcher = /\r?\n/
const decoder = new TextDecoder('utf8')
let buffer = ''
Expand All @@ -17,6 +19,10 @@ export default async function * parse <T> (source: AsyncIterable<Uint8Array | st
chunk = new TextEncoder().encode(chunk)
}

if (isUint8ArrayList(chunk)) {
chunk = chunk.subarray()
}

buffer += decoder.decode(chunk, { stream: true })

if (buffer.length > (opts?.maxMessageLength ?? buffer.length)) {
Expand Down
15 changes: 15 additions & 0 deletions packages/it-ndjson/test/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Buffer } from 'buffer'
import { expect } from 'aegir/chai'
import all from 'it-all'
import { Uint8ArrayList } from 'uint8arraylist'
import * as ndjson from '../src/index.js'

async function * toAsyncIterator <T> (array: T[]): AsyncIterable<T> {
Expand Down Expand Up @@ -74,6 +75,20 @@ describe('it-ndjson', () => {
expect(results).to.deep.equal([{ id: 1 }, { id: 2 }, { id: 3 }])
})

it('should split from Uint8ArrayLists', async () => {
const source = toAsyncIterator([new Uint8ArrayList(toUint8Array('{ "id": 1 }\n{ "i')), new Uint8ArrayList(toUint8Array('d": 2 }')), new Uint8ArrayList(toUint8Array('\n{"id":3}'))])
const results = await all(ndjson.parse(source))

expect(results).to.deep.equal([{ id: 1 }, { id: 2 }, { id: 3 }])
})

it('should split from Uint8ArrayLists with multiple chunks', async () => {
const source = toAsyncIterator([new Uint8ArrayList(toUint8Array('{ "id": 1 }\n{ "i'), toUint8Array('d": 2 }')), new Uint8ArrayList(toUint8Array('\n{"id":3}'))])
const results = await all(ndjson.parse(source))

expect(results).to.deep.equal([{ id: 1 }, { id: 2 }, { id: 3 }])
})

it('should round trip', async () => {
const input = '{"id":1}\n{"id":2}\n{"id":3}\n'
const source = toAsyncIterator([input])
Expand Down

0 comments on commit b739509

Please sign in to comment.