Skip to content

Commit

Permalink
Fix logic bug discovered by @fcollonval
Browse files Browse the repository at this point in the history
  • Loading branch information
afshin committed Nov 9, 2022
1 parent 91af63a commit 2bc912c
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 44 deletions.
42 changes: 19 additions & 23 deletions packages/signaling/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,9 @@ export interface ISignal<T, U> {
}

/**
* An object that is both a signal and an async iterable iterator.
* An object that is both a signal and an async iterable.
*/
export interface IStream<T, U>
extends ISignal<T, U>,
AsyncIterableIterator<U> {}
export interface IStream<T, U> extends ISignal<T, U>, AsyncIterable<U> {}

/**
* A concrete implementation of `ISignal`.
Expand Down Expand Up @@ -350,15 +348,18 @@ export namespace Signal {
}

/**
* A stream with the characteristics of a signal and an async iterator.
* A stream with the characteristics of a signal and an async iterable.
*/
export class Stream<T, U> extends Signal<T, U> implements IStream<T, U> {
/**
* Return an async iterator that yields every emission.
*/
async *[Symbol.asyncIterator](): AsyncIterableIterator<U> {
while (this.pending) {
yield this.pending.promise;
let pending = this.pending;
while (true) {
const resolved = await pending.promise;
pending = resolved.next;
yield resolved.args;
}
}

Expand All @@ -368,34 +369,29 @@ export class Stream<T, U> extends Signal<T, U> implements IStream<T, U> {
* @param args - The args to pass to the connected slots.
*/
emit(args: U): void {
if (this.blocked) {
return;
if (!this.blocked) {
const { pending } = this;
this.pending = new PromiseDelegate();
pending.resolve({ args, next: this.pending });
super.emit(args);
}
const { pending } = this;
super.emit(args);
this.pending = new PromiseDelegate();
pending.resolve(args);
}

/**
* Await the next value of the stream.
*
* @returns the next async iterator value in the stream.
*/
async next(): Promise<IteratorResult<U>> {
return { value: await this.pending.promise };
}

/**
* A promise that resolves the currently pending iteration.
*/
protected pending = new PromiseDelegate<U>();
protected pending: Private.Pending<U> = new PromiseDelegate();
}

/**
* The namespace for the module implementation details.
*/
namespace Private {
/**
* A pending promise in a promise chain underlying a stream.
*/
export type Pending<U> = PromiseDelegate<{ args: U; next: Pending<U> }>;

/**
* The signal exception handler function.
*/
Expand Down
39 changes: 21 additions & 18 deletions packages/signaling/tests/src/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -585,33 +585,36 @@ describe('@lumino/signaling', () => {

describe('Stream', () => {
describe('#[Symbol.asyncIterator]()', () => {
it('should yield emissions and respected blocking', async () => {
it('should yield emissions and respect blocking', async () => {
const stream = new Stream<unknown, string>({});
const expected = 'async';
const input = 'async';
const expected = 'aINTERRUPTEDsync';
let emitted = '';
let once = true;
stream.connect((_, emitted) => {
if (once) {
once = false;
stream.emit('I');
stream.emit('N');
stream.emit('T');
stream.emit('E');
stream.emit('R');
stream.emit('R');
stream.emit('U');
stream.emit('P');
stream.emit('T');
stream.emit('E');
stream.emit('D');
}
});
setTimeout(() => stream.block(() => stream.emit('BLOCKED EMISSION 1')));
expected.split('').forEach(x => setTimeout(() => stream.emit(x)));
input.split('').forEach(x => setTimeout(() => stream.emit(x)));
setTimeout(() => stream.block(() => stream.emit('BLOCKED EMISSION 2')));
for await (const letter of stream) {
emitted = emitted.concat(letter);
if (emitted === expected) break;
}
});
});

describe('#next()', () => {
it('should resolve an iterator result and respect blocking', async () => {
const stream = new Stream<unknown, string>({});
const expected = 'next';
let emitted = '';
setTimeout(() => stream.block(() => stream.emit('BLOCKED EMISSION 1')));
expected.split('').forEach(x => setTimeout(() => stream.emit(x)));
setTimeout(() => stream.block(() => stream.emit('BLOCKED EMISSION 2')));
for (let it = await stream.next(); !it.done; it = await stream.next()) {
emitted = emitted.concat(it.value);
if (emitted === expected) break;
}
});
});
});
});
7 changes: 4 additions & 3 deletions review/api/signaling.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
```ts

import { AttachedProperty } from '@lumino/properties';
import { PromiseDelegate } from '@lumino/coreutils';

// @public
Expand All @@ -14,7 +15,7 @@ export interface ISignal<T, U> {
}

// @public
export interface IStream<T, U> extends ISignal<T, U>, AsyncIterableIterator<U> {
export interface IStream<T, U> extends ISignal<T, U>, AsyncIterable<U> {
}

// @public
Expand Down Expand Up @@ -48,8 +49,8 @@ export type Slot<T, U> = (sender: T, args: U) => void;
export class Stream<T, U> extends Signal<T, U> implements IStream<T, U> {
[Symbol.asyncIterator](): AsyncIterableIterator<U>;
emit(args: U): void;
next(): Promise<IteratorResult<U>>;
protected pending: PromiseDelegate<U>;
// Warning: (ae-forgotten-export) The symbol "Private" needs to be exported by the entry point index.d.ts
protected pending: Private.Pending<U>;
}

// (No @packageDocumentation comment for this package)
Expand Down

0 comments on commit 2bc912c

Please sign in to comment.