Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(Observable): introduce Subscribable interface to use instead of Observable in input arguments #1475

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/Observable.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {PartialObserver} from './Observer';
import {PartialObserver, Observer} from './Observer';
import {Operator} from './Operator';
import {Subscriber} from './Subscriber';
import {Subscription} from './Subscription';
Expand All @@ -9,17 +9,21 @@ import {toSubscriber} from './util/toSubscriber';
import {IfObservable} from './observable/IfObservable';
import {ErrorObservable} from './observable/ErrorObservable';

export type ObservableOrPromise<T> = Observable<T> | Promise<T>;
export interface Subscribable<T> {
subscribe(observer: Observer<T>): Subscription;
}

export type SubscribableOrPromise<T> = Subscribable<T> | Promise<T>;
export type ArrayOrIterator<T> = Iterator<T> | ArrayLike<T>;
export type ObservableInput<T> = ObservableOrPromise<T> | ArrayOrIterator<T>;
export type ObservableInput<T> = SubscribableOrPromise<T> | ArrayOrIterator<T>;

/**
* A representation of any set of values over any amount of time. This the most basic building block
* of RxJS.
*
* @class Observable<T>
*/
export class Observable<T> {
export class Observable<T> implements Subscribable<T> {

public _isScalar: boolean = false;

Expand Down
12 changes: 6 additions & 6 deletions src/operator/debounce.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {Operator} from '../Operator';
import {Observable, ObservableOrPromise} from '../Observable';
import {Observable, SubscribableOrPromise} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';

Expand All @@ -19,16 +19,16 @@ import {subscribeToResult} from '../util/subscribeToResult';
* @method debounce
* @owner Observable
*/
export function debounce<T>(durationSelector: (value: T) => ObservableOrPromise<number>): Observable<T> {
export function debounce<T>(durationSelector: (value: T) => SubscribableOrPromise<number>): Observable<T> {
return this.lift(new DebounceOperator(durationSelector));
}

export interface DebounceSignature<T> {
(durationSelector: (value: T) => ObservableOrPromise<number>): Observable<T>;
(durationSelector: (value: T) => SubscribableOrPromise<number>): Observable<T>;
}

class DebounceOperator<T> implements Operator<T, T> {
constructor(private durationSelector: (value: T) => ObservableOrPromise<number>) {
constructor(private durationSelector: (value: T) => SubscribableOrPromise<number>) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
Expand All @@ -42,7 +42,7 @@ class DebounceSubscriber<T, R> extends OuterSubscriber<T, R> {
private durationSubscription: Subscription = null;

constructor(destination: Subscriber<R>,
private durationSelector: (value: T) => ObservableOrPromise<number>) {
private durationSelector: (value: T) => SubscribableOrPromise<number>) {
super(destination);
}

Expand All @@ -63,7 +63,7 @@ class DebounceSubscriber<T, R> extends OuterSubscriber<T, R> {
this.destination.complete();
}

private _tryNext(value: T, duration: ObservableOrPromise<number>): void {
private _tryNext(value: T, duration: SubscribableOrPromise<number>): void {
let subscription = this.durationSubscription;
this.value = value;
this.hasValue = true;
Expand Down
10 changes: 5 additions & 5 deletions src/operator/inspect.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {Observable, ObservableOrPromise} from '../Observable';
import {Observable, SubscribableOrPromise} from '../Observable';
import {Subscription} from '../Subscription';

import {tryCatch} from '../util/tryCatch';
Expand All @@ -14,16 +14,16 @@ import {subscribeToResult} from '../util/subscribeToResult';
* @method inspect
* @owner Observable
*/
export function inspect<T>(durationSelector: (value: T) => ObservableOrPromise<any>): Observable<T> {
export function inspect<T>(durationSelector: (value: T) => SubscribableOrPromise<any>): Observable<T> {
return this.lift(new InspectOperator(durationSelector));
}

export interface InspectSignature<T> {
(durationSelector: (value: T) => ObservableOrPromise<any>): Observable<T>;
(durationSelector: (value: T) => SubscribableOrPromise<any>): Observable<T>;
}

class InspectOperator<T> implements Operator<T, T> {
constructor(private durationSelector: (value: T) => ObservableOrPromise<any>) {
constructor(private durationSelector: (value: T) => SubscribableOrPromise<any>) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
Expand All @@ -38,7 +38,7 @@ class InspectSubscriber<T, R> extends OuterSubscriber<T, R> {
private throttled: Subscription;

constructor(destination: Subscriber<T>,
private durationSelector: (value: T) => ObservableOrPromise<any>) {
private durationSelector: (value: T) => SubscribableOrPromise<any>) {
super(destination);
}

Expand Down
6 changes: 3 additions & 3 deletions src/operator/mergeMapTo.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Observable, ObservableInput, ObservableOrPromise} from '../Observable';
import {Observable, ObservableInput, SubscribableOrPromise} from '../Observable';
import {Operator} from '../Operator';
import {PartialObserver} from '../Observer';
import {Subscriber} from '../Subscriber';
Expand Down Expand Up @@ -31,7 +31,7 @@ export interface MergeMapToSignature<T> {
// TODO: Figure out correct signature here: an Operator<Observable<T>, R>
// needs to implement call(observer: Subscriber<R>): Subscriber<Observable<T>>
export class MergeMapToOperator<T, I, R> implements Operator<Observable<T>, R> {
constructor(private ish: ObservableOrPromise<I>,
constructor(private ish: SubscribableOrPromise<I>,
private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R,
private concurrent: number = Number.POSITIVE_INFINITY) {
}
Expand All @@ -48,7 +48,7 @@ export class MergeMapToSubscriber<T, I, R> extends OuterSubscriber<T, I> {
protected index: number = 0;

constructor(destination: Subscriber<R>,
private ish: ObservableOrPromise<I>,
private ish: SubscribableOrPromise<I>,
private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R,
private concurrent: number = Number.POSITIVE_INFINITY) {
super(destination);
Expand Down
14 changes: 7 additions & 7 deletions src/operator/throttle.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {Operator} from '../Operator';
import {Observable, ObservableOrPromise} from '../Observable';
import {Observable, SubscribableOrPromise} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';

Expand All @@ -13,16 +13,16 @@ import {subscribeToResult} from '../util/subscribeToResult';
* @method throttle
* @owner Observable
*/
export function throttle<T>(durationSelector: (value: T) => ObservableOrPromise<number>): Observable<T> {
export function throttle<T>(durationSelector: (value: T) => SubscribableOrPromise<number>): Observable<T> {
return this.lift(new ThrottleOperator(durationSelector));
}

export interface ThrottleSignature<T> {
(durationSelector: (value: T) => ObservableOrPromise<number>): Observable<T>;
(durationSelector: (value: T) => SubscribableOrPromise<number>): Observable<T>;
}

class ThrottleOperator<T> implements Operator<T, T> {
constructor(private durationSelector: (value: T) => ObservableOrPromise<number>) {
constructor(private durationSelector: (value: T) => SubscribableOrPromise<number>) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
Expand All @@ -34,7 +34,7 @@ class ThrottleSubscriber<T, R> extends OuterSubscriber<T, R> {
private throttled: Subscription;

constructor(protected destination: Subscriber<T>,
private durationSelector: (value: T) => ObservableOrPromise<number>) {
private durationSelector: (value: T) => SubscribableOrPromise<number>) {
super(destination);
}

Expand All @@ -45,7 +45,7 @@ class ThrottleSubscriber<T, R> extends OuterSubscriber<T, R> {
}

private tryDurationSelector(value: T): void {
let duration: ObservableOrPromise<number> = null;
let duration: SubscribableOrPromise<number> = null;
try {
duration = this.durationSelector(value);
} catch (err) {
Expand All @@ -55,7 +55,7 @@ class ThrottleSubscriber<T, R> extends OuterSubscriber<T, R> {
this.emitAndThrottle(value, duration);
}

private emitAndThrottle(value: T, duration: ObservableOrPromise<number>) {
private emitAndThrottle(value: T, duration: SubscribableOrPromise<number>) {
this.add(this.throttled = subscribeToResult(this, duration));
this.destination.next(value);
}
Expand Down