Skip to content

Commit

Permalink
perf: fix backpressure on readables (#399)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeswr authored Jul 29, 2024
1 parent 73a9ef1 commit f228aea
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 8 deletions.
33 changes: 25 additions & 8 deletions src/N3Store.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import { default as N3DataFactory, termToId, termFromId } from './N3DataFactory'
import namespaces from './IRIs';
import { isDefaultGraph } from './N3Util';

const ITERATOR = Symbol('iter');

// ## Constructor
export class N3EntityIndex {
constructor(options = {}) {
this._id = 1;
Expand Down Expand Up @@ -386,10 +389,16 @@ export default class N3Store {
removeMatches(subject, predicate, object, graph) {
const stream = new Readable({ objectMode: true });

stream._read = () => {
for (const quad of this.readQuads(subject, predicate, object, graph))
stream.push(quad);
stream.push(null);
const iterable = this.readQuads(subject, predicate, object, graph);
stream._read = size => {
while (size-- > 0) {
const { done, value } = iterable.next();
if (done) {
stream.push(null);
return;
}
stream.push(value);
}
};

return this.remove(stream);
Expand Down Expand Up @@ -805,10 +814,18 @@ class DatasetCoreAndReadableStream extends Readable {
return this.filtered.size;
}

_read() {
for (const quad of this)
this.push(quad);
this.push(null);
_read(size) {
if (size > 0 && !this[ITERATOR])
this[ITERATOR] = this[Symbol.iterator]();
const iterable = this[ITERATOR];
while (size-- > 0) {
const { done, value } = iterable.next();
if (done) {
this.push(null);
return;
}
this.push(value);
}
}

add(quad) {
Expand Down
14 changes: 14 additions & 0 deletions test/N3Store-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,14 @@ describe('Store', () => {
});

describe('with an existing subject parameter', () => {
const largeStore = new Store([]);
const results = [];
for (let i = 0; i < 100; i += 1) {
largeStore.add(new Quad(`s${i}`, 'p1', 'o1'));
largeStore.add(new Quad(`s${i}`, 'p2', 'o1'));
results.push([`s${i}`, 'p2', 'o1']);
}

it(
'should return all items with this subject in all graphs',
forResultStream(shouldIncludeAll, store.match(new NamedNode('s1'), null, null),
Expand All @@ -657,6 +665,12 @@ describe('Store', () => {
['s1', 'p1', 'o1', 'c4'])
);

it(
'should return all items with this subject in all graphs in a large store',
forResultStream(shouldIncludeAll, largeStore.match(null, new NamedNode('p2'), null),
...results)
);

it('should return an object implementing the DatasetCore interface', () => {
const subject = new NamedNode('s1');
const dataset = store.match(subject, null, null);
Expand Down

0 comments on commit f228aea

Please sign in to comment.