Skip to content

Commit

Permalink
feat(ReadableStreams): Support for ReadableStreams e.g. `from(readabl…
Browse files Browse the repository at this point in the history
…eStream)`
  • Loading branch information
jayphelps committed Mar 21, 2021
1 parent 3330ff3 commit 0ba9e64
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 21 deletions.
2 changes: 1 addition & 1 deletion api_guard/dist/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ export declare class Observable<T> implements Subscribable<T> {
static create: (...args: any[]) => any;
}

export declare type ObservableInput<T> = Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T>;
export declare type ObservableInput<T> = Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T> | ReadableStreamLike<T>;

export declare type ObservableInputTuple<T> = {
[K in keyof T]: ObservableInput<T[K]>;
Expand Down
20 changes: 18 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@
"typedoc": "^0.17.8",
"typescript": "~4.2.2",
"validate-commit-msg": "2.14.0",
"web-streams-polyfill": "^3.0.2",
"webpack": "^4.31.0"
},
"files": [
Expand Down
97 changes: 82 additions & 15 deletions spec/observables/from-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { expect } from 'chai';
import { TestScheduler } from 'rxjs/testing';
import { asyncScheduler, of, from, Observer, observable, Subject, noop } from 'rxjs';
import { first, concatMap, delay, take, map, tap } from 'rxjs/operators';
import { first, concatMap, delay, take, tap } from 'rxjs/operators';
import { ReadableStream } from 'web-streams-polyfill';

// tslint:disable:no-any
declare const expectObservable: any;
Expand Down Expand Up @@ -108,7 +109,7 @@ describe('from', () => {
});
});


it('should finalize a generator', () => {
const results: any[] = [];

Expand Down Expand Up @@ -175,16 +176,24 @@ describe('from', () => {
});

// tslint:disable-next-line:no-any it's silly to define all of these types.
const sources: Array<{ name: string, value: any }> = [
{ name: 'observable', value: of('x') },
{ name: 'observable-like', value: fakervable('x') },
{ name: 'observable-like-array', value: fakeArrayObservable('x') },
{ name: 'array', value: ['x'] },
{ name: 'promise', value: Promise.resolve('x') },
{ name: 'iterator', value: fakerator('x') },
{ name: 'array-like', value: { [0]: 'x', length: 1 }},
{ name: 'string', value: 'x'},
{ name: 'arguments', value: getArguments('x') },
const sources: Array<{ name: string, createValue: () => any }> = [
{ name: 'observable', createValue: () => of('x') },
{ name: 'observable-like', createValue: () => fakervable('x') },
{ name: 'observable-like-array', createValue: () => fakeArrayObservable('x') },
{ name: 'array', createValue: () => ['x'] },
{ name: 'promise', createValue: () => Promise.resolve('x') },
{ name: 'iterator', createValue: () => fakerator('x') },
{ name: 'array-like', createValue: () => ({ [0]: 'x', length: 1 }) },
// ReadableStreams are not lazy, so we have to have this createValue() thunk
// so that each tests gets a new one.
{ name: 'readable-stream-like', createValue: () => new ReadableStream({
pull(controller) {
controller.enqueue('x');
controller.close();
},
})},
{ name: 'string', createValue: () => 'x'},
{ name: 'arguments', createValue: () => getArguments('x') },
];

if (Symbol && Symbol.asyncIterator) {
Expand All @@ -211,14 +220,14 @@ describe('from', () => {

sources.push({
name: 'async-iterator',
value: fakeAsyncIterator('x')
createValue: () => fakeAsyncIterator('x')
});
}

for (const source of sources) {
it(`should accept ${source.name}`, (done) => {
let nextInvoked = false;
from(source.value)
from(source.createValue())
.subscribe(
(x) => {
nextInvoked = true;
Expand All @@ -235,7 +244,7 @@ describe('from', () => {
});
it(`should accept ${source.name} and scheduler`, (done) => {
let nextInvoked = false;
from(source.value, asyncScheduler)
from(source.createValue(), asyncScheduler)
.subscribe(
(x) => {
nextInvoked = true;
Expand Down Expand Up @@ -321,4 +330,62 @@ describe('from', () => {

expect(finallyExecuted).to.be.true;
});

it('should support ReadableStream-like objects', (done) => {
const input = [0, 1, 2];
const output: number[] = [];

const readableStream = new ReadableStream({
pull(controller) {
if (input.length > 0) {
controller.enqueue(input.shift());

if (input.length === 0) {
controller.close();
}
}
},
});

from(readableStream).subscribe({
next: value => {
output.push(value);
expect(readableStream.locked).to.equal(true);
},
complete: () => {
expect(output).to.deep.equal([0, 1, 2]);
expect(readableStream.locked).to.equal(false);
done();
}
});
});

it('should lock and release ReadableStream-like objects', (done) => {
const input = [0, 1, 2];
const output: number[] = [];

const readableStream = new ReadableStream({
pull(controller) {
if (input.length > 0) {
controller.enqueue(input.shift());

if (input.length === 0) {
controller.close();
}
}
},
});

from(readableStream).subscribe({
next: value => {
output.push(value);
expect(readableStream.locked).to.equal(true);
},
complete: () => {
expect(output).to.deep.equal([0, 1, 2]);
expect(readableStream.locked).to.equal(false);
done();
}
});
});
});
8 changes: 8 additions & 0 deletions src/internal/observable/from.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { isInteropObservable } from '../util/isInteropObservable';
import { isAsyncIterable } from '../util/isAsyncIterable';
import { createInvalidObservableTypeError } from '../util/throwUnobservableError';
import { isIterable } from '../util/isIterable';
import { isReadableStreamLike, ReadableStreamLike, readableStreamLikeToAsyncGenerator } from '../util/isReadableStreamLike';

export function from<O extends ObservableInput<any>>(input: O): Observable<ObservedValueOf<O>>;
/** @deprecated The scheduler argument is deprecated, use scheduled. Details: https://rxjs.dev/deprecations/scheduler-argument */
Expand Down Expand Up @@ -141,6 +142,9 @@ export function innerFrom<T>(input: ObservableInput<T>): Observable<T> {
if (isIterable(input)) {
return fromIterable(input);
}
if (isReadableStreamLike(input)) {
return fromReadableStreamLike(input);
}
}

throw createInvalidObservableTypeError(input);
Expand Down Expand Up @@ -220,6 +224,10 @@ function fromAsyncIterable<T>(asyncIterable: AsyncIterable<T>) {
});
}

function fromReadableStreamLike<T>(readableStream: ReadableStreamLike<T>) {
return fromAsyncIterable(readableStreamLikeToAsyncGenerator(readableStream));
}

async function process<T>(asyncIterable: AsyncIterable<T>, subscriber: Subscriber<T>) {
for await (const value of asyncIterable) {
subscriber.next(value);
Expand Down
8 changes: 8 additions & 0 deletions src/internal/scheduled/scheduleReadableStreamLike.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { SchedulerLike } from '../types';
import { Observable } from '../Observable';
import { scheduleAsyncIterable } from './scheduleAsyncIterable';
import { ReadableStreamLike, readableStreamLikeToAsyncGenerator } from '../util/isReadableStreamLike';

export function scheduleReadableStreamLike<T>(input: ReadableStreamLike<T>, scheduler: SchedulerLike): Observable<T> {
return scheduleAsyncIterable(readableStreamLikeToAsyncGenerator(input), scheduler);
}
7 changes: 6 additions & 1 deletion src/internal/scheduled/scheduled.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ import { scheduleObservable } from './scheduleObservable';
import { schedulePromise } from './schedulePromise';
import { scheduleArray } from './scheduleArray';
import { scheduleIterable } from './scheduleIterable';
import { scheduleAsyncIterable } from './scheduleAsyncIterable';
import { isInteropObservable } from '../util/isInteropObservable';
import { isPromise } from '../util/isPromise';
import { isArrayLike } from '../util/isArrayLike';
import { isIterable } from '../util/isIterable';
import { ObservableInput, SchedulerLike } from '../types';
import { Observable } from '../Observable';
import { scheduleAsyncIterable } from './scheduleAsyncIterable';
import { isAsyncIterable } from '../util/isAsyncIterable';
import { createInvalidObservableTypeError } from '../util/throwUnobservableError';
import { isReadableStreamLike } from '../util/isReadableStreamLike';
import { scheduleReadableStreamLike } from './scheduleReadableStreamLike';

/**
* Converts from a common {@link ObservableInput} type to an observable where subscription and emissions
Expand Down Expand Up @@ -40,6 +42,9 @@ export function scheduled<T>(input: ObservableInput<T>, scheduler: SchedulerLike
if (isIterable(input)) {
return scheduleIterable(input, scheduler);
}
if (isReadableStreamLike(input)) {
return scheduleReadableStreamLike(input, scheduler);
}
}
throw createInvalidObservableTypeError(input);
}
10 changes: 9 additions & 1 deletion src/internal/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import { Observable } from './Observable';
import { Subscription } from './Subscription';
import { ReadableStreamLike } from './util/isReadableStreamLike';

/**
* NOTE: This will add Symbol.observable globally for all TypeScript users,
Expand Down Expand Up @@ -91,7 +92,14 @@ export interface Subscribable<T> {
/**
* Valid types that can be converted to observables.
*/
export type ObservableInput<T> = Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T>;
export type ObservableInput<T> =
| Observable<T>
| InteropObservable<T>
| AsyncIterable<T>
| PromiseLike<T>
| ArrayLike<T>
| Iterable<T>
| ReadableStreamLike<T>;

/** @deprecated use {@link InteropObservable } */
export type ObservableLike<T> = InteropObservable<T>;
Expand Down
24 changes: 24 additions & 0 deletions src/internal/util/isReadableStreamLike.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { isFunction } from './isFunction';

export interface ReadableStreamLike<T> {
getReader(): ReadableStreamDefaultReader<T>;
}

export async function* readableStreamLikeToAsyncGenerator<T>(readableStream: ReadableStreamLike<T>): AsyncGenerator<T> {
const reader = readableStream.getReader();
try {
while (true) {
const { value, done } = await reader.read();
if (done) {
return;
}
yield value!;
}
} finally {
reader.releaseLock();
}
}

export function isReadableStreamLike<T>(obj: any): obj is ReadableStreamLike<T> {
return isFunction(obj?.getReader);
}
2 changes: 1 addition & 1 deletion src/internal/util/throwUnobservableError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ export function createInvalidObservableTypeError(input: any) {
return new TypeError(
`You provided ${
input !== null && typeof input === 'object' ? 'an invalid object' : `'${input}'`
} where a stream was expected. You can provide an Observable, Promise, Array, AsyncIterable, or Iterable.`
} where a stream was expected. You can provide an Observable, Promise, ReadableStream, Array, AsyncIterable, or Iterable.`
);
}

0 comments on commit 0ba9e64

Please sign in to comment.