From 5851d4cca9c696dd7cf570fce60aeeeff1710999 Mon Sep 17 00:00:00 2001 From: Jacky Nguyen Date: Fri, 9 Feb 2024 22:22:20 -0500 Subject: [PATCH] Ensure connection between AsyncKeepLastQueue and its parent --- src/async_queue.ts | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/src/async_queue.ts b/src/async_queue.ts index 8fa634a..d9d5297 100644 --- a/src/async_queue.ts +++ b/src/async_queue.ts @@ -416,10 +416,16 @@ export abstract class AsyncReadonlyQueue { }); } - conflate(reducer: (prior: T, next: T) => Promise | T): AsyncReadonlyQueue { + conflate(reducer: (prior: T, next: T, signal: AbortSignal) => Promise | T): AsyncReadonlyQueue { let accumulator: T | undefined = undefined; - const conflatedQueue = new AsyncKeepLastQueue("conflate", this._maxBufferSize); + const abortController = new AbortController(); + const conflatedQueue = new AsyncKeepLastQueue( + "conflate", + this._maxBufferSize, + () => abortController.abort(), + [this], + ); (async () => { for await (const item of this.items()) { @@ -427,7 +433,7 @@ export abstract class AsyncReadonlyQueue { if (accumulator === undefined) { accumulator = item; } else { - accumulator = await reducer(accumulator, item); + accumulator = await reducer(accumulator, item, abortController.signal); } conflatedQueue.accumulate(accumulator); @@ -451,6 +457,7 @@ export abstract class AsyncReadonlyQueue { "conflateWithSeed", this._maxBufferSize, () => abortController.abort(), + [this], ); (async () => { @@ -487,7 +494,7 @@ export abstract class AsyncReadonlyQueue { "conflateWithSeedFn", this._maxBufferSize, () => abortController.abort(), - undefined, + [this], () => { accumulator = undefined; }, @@ -527,7 +534,12 @@ export abstract class AsyncReadonlyQueue { let lastItem: T | undefined = undefined; let timer: number | undefined = undefined; - const debouncedQueue = new AsyncKeepLastQueue("debounce", this._maxBufferSize); + const debouncedQueue = new AsyncKeepLastQueue( + "debounce", + this._maxBufferSize, + undefined, + [this], + ); (async () => { for await (const item of this.items()) {