Skip to content

Commit

Permalink
Ensure connection between AsyncKeepLastQueue and its parent
Browse files Browse the repository at this point in the history
  • Loading branch information
nktpro committed Feb 10, 2024
1 parent 3cf4685 commit 5851d4c
Showing 1 changed file with 17 additions and 5 deletions.
22 changes: 17 additions & 5 deletions src/async_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -416,18 +416,24 @@ export abstract class AsyncReadonlyQueue<T> {
});
}

conflate<U>(reducer: (prior: T, next: T) => Promise<T> | T): AsyncReadonlyQueue<T> {
conflate<U>(reducer: (prior: T, next: T, signal: AbortSignal) => Promise<T> | T): AsyncReadonlyQueue<T> {
let accumulator: T | undefined = undefined;

const conflatedQueue = new AsyncKeepLastQueue<T>("conflate", this._maxBufferSize);
const abortController = new AbortController();
const conflatedQueue = new AsyncKeepLastQueue<T>(
"conflate",
this._maxBufferSize,
() => abortController.abort(),
[this],
);

(async () => {
for await (const item of this.items()) {
if (conflatedQueue.isCompleted) return;
if (accumulator === undefined) {
accumulator = item;
} else {
accumulator = await reducer(accumulator, item);
accumulator = await reducer(accumulator, item, abortController.signal);
}

conflatedQueue.accumulate(accumulator);
Expand All @@ -451,6 +457,7 @@ export abstract class AsyncReadonlyQueue<T> {
"conflateWithSeed",
this._maxBufferSize,
() => abortController.abort(),
[this],
);

(async () => {
Expand Down Expand Up @@ -487,7 +494,7 @@ export abstract class AsyncReadonlyQueue<T> {
"conflateWithSeedFn",
this._maxBufferSize,
() => abortController.abort(),
undefined,
[this],
() => {
accumulator = undefined;
},
Expand Down Expand Up @@ -527,7 +534,12 @@ export abstract class AsyncReadonlyQueue<T> {
let lastItem: T | undefined = undefined;
let timer: number | undefined = undefined;

const debouncedQueue = new AsyncKeepLastQueue<T>("debounce", this._maxBufferSize);
const debouncedQueue = new AsyncKeepLastQueue<T>(
"debounce",
this._maxBufferSize,
undefined,
[this],
);

(async () => {
for await (const item of this.items()) {
Expand Down

0 comments on commit 5851d4c

Please sign in to comment.