Skip to content

Commit

Permalink
Merge branch 'master' into window-time-with-max-size
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh authored Dec 12, 2016
2 parents bb52bbe + b5a3413 commit 1c9f746
Show file tree
Hide file tree
Showing 82 changed files with 101 additions and 86 deletions.
2 changes: 1 addition & 1 deletion src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export class Observable<T> implements Subscribable<T> {
const sink = toSubscriber(observerOrNext, error, complete);

if (operator) {
operator.call(sink, this);
operator.call(sink, this.source);
} else {
sink.add(this._subscribe(sink));
}
Expand Down
2 changes: 1 addition & 1 deletion src/observable/ConnectableObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class RefCountOperator<T> implements Operator<T, T> {
(<any> connectable)._refCount++;

const refCounter = new RefCountSubscriber(subscriber, connectable);
const subscription = source._subscribe(refCounter);
const subscription = source.subscribe(refCounter);

if (!refCounter.closed) {
(<any> refCounter).connection = connectable.connect();
Expand Down
5 changes: 3 additions & 2 deletions src/observable/SubscribeOnObservable.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Action } from '../scheduler/Action';
import { Scheduler } from '../Scheduler';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
Expand All @@ -20,9 +21,9 @@ export class SubscribeOnObservable<T> extends Observable<T> {
return new SubscribeOnObservable(source, delay, scheduler);
}

static dispatch<T>(arg: DispatchArg<T>): Subscription {
static dispatch<T>(this: Action<T>, arg: DispatchArg<T>): Subscription {
const { source, subscriber } = arg;
return source.subscribe(subscriber);
return this.add(source.subscribe(subscriber));
}

constructor(public source: Observable<T>,
Expand Down
2 changes: 1 addition & 1 deletion src/operator/audit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class AuditOperator<T> implements Operator<T, T> {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source._subscribe(new AuditSubscriber<T, T>(subscriber, this.durationSelector));
return source.subscribe(new AuditSubscriber<T, T>(subscriber, this.durationSelector));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/auditTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class AuditTimeOperator<T> implements Operator<T, T> {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source._subscribe(new AuditTimeSubscriber(subscriber, this.duration, this.scheduler));
return source.subscribe(new AuditTimeSubscriber(subscriber, this.duration, this.scheduler));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class BufferOperator<T> implements Operator<T, T[]> {
}

call(subscriber: Subscriber<T[]>, source: any): any {
return source._subscribe(new BufferSubscriber(subscriber, this.closingNotifier));
return source.subscribe(new BufferSubscriber(subscriber, this.closingNotifier));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/bufferCount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class BufferCountOperator<T> implements Operator<T, T[]> {
}

call(subscriber: Subscriber<T[]>, source: any): any {
return source._subscribe(new BufferCountSubscriber(subscriber, this.bufferSize, this.startBufferEvery));
return source.subscribe(new BufferCountSubscriber(subscriber, this.bufferSize, this.startBufferEvery));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/bufferTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class BufferTimeOperator<T> implements Operator<T, T[]> {
}

call(subscriber: Subscriber<T[]>, source: any): any {
return source._subscribe(new BufferTimeSubscriber(
return source.subscribe(new BufferTimeSubscriber(
subscriber, this.bufferTimeSpan, this.bufferCreationInterval, this.maxBufferSize, this.scheduler
));
}
Expand Down
4 changes: 2 additions & 2 deletions src/operator/bufferToggle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class BufferToggleOperator<T, O> implements Operator<T, T[]> {
}

call(subscriber: Subscriber<T[]>, source: any): any {
return source._subscribe(new BufferToggleSubscriber(subscriber, this.openings, this.closingSelector));
return source.subscribe(new BufferToggleSubscriber(subscriber, this.openings, this.closingSelector));
}
}

Expand Down Expand Up @@ -167,4 +167,4 @@ class BufferToggleSubscriber<T, O> extends OuterSubscriber<T, O> {
subscription.add(innerSubscription);
}
}
}
}
2 changes: 1 addition & 1 deletion src/operator/bufferWhen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class BufferWhenOperator<T> implements Operator<T, T[]> {
}

call(subscriber: Subscriber<T[]>, source: any): any {
return source._subscribe(new BufferWhenSubscriber(subscriber, this.closingSelector));
return source.subscribe(new BufferWhenSubscriber(subscriber, this.closingSelector));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/catch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class CatchOperator<T, R> implements Operator<T, R> {
}

call(subscriber: Subscriber<R>, source: any): any {
return source._subscribe(new CatchSubscriber(subscriber, this.selector, this.caught));
return source.subscribe(new CatchSubscriber(subscriber, this.selector, this.caught));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/combineLatest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export class CombineLatestOperator<T, R> implements Operator<T, R> {
}

call(subscriber: Subscriber<R>, source: any): any {
return source._subscribe(new CombineLatestSubscriber(subscriber, this.project));
return source.subscribe(new CombineLatestSubscriber(subscriber, this.project));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/count.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class CountOperator<T> implements Operator<T, number> {
}

call(subscriber: Subscriber<number>, source: any): any {
return source._subscribe(new CountSubscriber(subscriber, this.predicate, this.source));
return source.subscribe(new CountSubscriber(subscriber, this.predicate, this.source));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/debounce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class DebounceOperator<T> implements Operator<T, T> {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source._subscribe(new DebounceSubscriber(subscriber, this.durationSelector));
return source.subscribe(new DebounceSubscriber(subscriber, this.durationSelector));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/debounceTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class DebounceTimeOperator<T> implements Operator<T, T> {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source._subscribe(new DebounceTimeSubscriber(subscriber, this.dueTime, this.scheduler));
return source.subscribe(new DebounceTimeSubscriber(subscriber, this.dueTime, this.scheduler));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/defaultIfEmpty.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class DefaultIfEmptyOperator<T, R> implements Operator<T, T | R> {
}

call(subscriber: Subscriber<T | R>, source: any): any {
return source._subscribe(new DefaultIfEmptySubscriber(subscriber, this.defaultValue));
return source.subscribe(new DefaultIfEmptySubscriber(subscriber, this.defaultValue));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/delay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class DelayOperator<T> implements Operator<T, T> {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source._subscribe(new DelaySubscriber(subscriber, this.delay, this.scheduler));
return source.subscribe(new DelaySubscriber(subscriber, this.delay, this.scheduler));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/delayWhen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class DelayWhenOperator<T> implements Operator<T, T> {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source._subscribe(new DelayWhenSubscriber(subscriber, this.delayDurationSelector));
return source.subscribe(new DelayWhenSubscriber(subscriber, this.delayDurationSelector));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/dematerialize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export function dematerialize<T>(this: Observable<T>): Observable<any> {

class DeMaterializeOperator<T extends Notification<any>, R> implements Operator<T, R> {
call(subscriber: Subscriber<any>, source: any): any {
return source._subscribe(new DeMaterializeSubscriber(subscriber));
return source.subscribe(new DeMaterializeSubscriber(subscriber));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/distinct.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class DistinctOperator<T, K> implements Operator<T, T> {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source._subscribe(new DistinctSubscriber(subscriber, this.keySelector, this.flushes));
return source.subscribe(new DistinctSubscriber(subscriber, this.keySelector, this.flushes));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/distinctUntilChanged.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class DistinctUntilChangedOperator<T, K> implements Operator<T, T> {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source._subscribe(new DistinctUntilChangedSubscriber(subscriber, this.compare, this.keySelector));
return source.subscribe(new DistinctUntilChangedSubscriber(subscriber, this.compare, this.keySelector));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class DoOperator<T> implements Operator<T, T> {
private complete?: () => void) {
}
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source._subscribe(new DoSubscriber(subscriber, this.nextOrObserver, this.error, this.complete));
return source.subscribe(new DoSubscriber(subscriber, this.nextOrObserver, this.error, this.complete));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/elementAt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class ElementAtOperator<T> implements Operator<T, T> {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source._subscribe(new ElementAtSubscriber(subscriber, this.index, this.defaultValue));
return source.subscribe(new ElementAtSubscriber(subscriber, this.index, this.defaultValue));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/every.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class EveryOperator<T> implements Operator<T, boolean> {
}

call(observer: Subscriber<boolean>, source: any): any {
return source._subscribe(new EverySubscriber(observer, this.predicate, this.thisArg, this.source));
return source.subscribe(new EverySubscriber(observer, this.predicate, this.thisArg, this.source));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/exhaust.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export function exhaust<T>(this: Observable<T>): Observable<T> {

class SwitchFirstOperator<T> implements Operator<T, T> {
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source._subscribe(new SwitchFirstSubscriber(subscriber));
return source.subscribe(new SwitchFirstSubscriber(subscriber));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/exhaustMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class SwitchFirstMapOperator<T, I, R> implements Operator<T, R> {
}

call(subscriber: Subscriber<R>, source: any): any {
return source._subscribe(new SwitchFirstMapSubscriber(subscriber, this.project, this.resultSelector));
return source.subscribe(new SwitchFirstMapSubscriber(subscriber, this.project, this.resultSelector));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/expand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export class ExpandOperator<T, R> implements Operator<T, R> {
}

call(subscriber: Subscriber<R>, source: any): any {
return source._subscribe(new ExpandSubscriber(subscriber, this.project, this.concurrent, this.scheduler));
return source.subscribe(new ExpandSubscriber(subscriber, this.project, this.concurrent, this.scheduler));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class FilterOperator<T> implements Operator<T, T> {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source._subscribe(new FilterSubscriber(subscriber, this.predicate, this.thisArg));
return source.subscribe(new FilterSubscriber(subscriber, this.predicate, this.thisArg));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/finally.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class FinallyOperator<T> implements Operator<T, T> {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source._subscribe(new FinallySubscriber(subscriber, this.callback));
return source.subscribe(new FinallySubscriber(subscriber, this.callback));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class FindValueOperator<T> implements Operator<T, T> {
}

call(observer: Subscriber<T>, source: any): any {
return source._subscribe(new FindValueSubscriber(observer, this.predicate, this.source, this.yieldIndex, this.thisArg));
return source.subscribe(new FindValueSubscriber(observer, this.predicate, this.source, this.yieldIndex, this.thisArg));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/first.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class FirstOperator<T, R> implements Operator<T, R> {
}

call(observer: Subscriber<R>, source: any): any {
return source._subscribe(new FirstSubscriber(observer, this.predicate, this.resultSelector, this.defaultValue, this.source));
return source.subscribe(new FirstSubscriber(observer, this.predicate, this.resultSelector, this.defaultValue, this.source));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/groupBy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class GroupByOperator<T, K, R> implements Operator<T, GroupedObservable<K, R>> {
}

call(subscriber: Subscriber<GroupedObservable<K, R>>, source: any): any {
return source._subscribe(new GroupBySubscriber(
return source.subscribe(new GroupBySubscriber(
subscriber, this.keySelector, this.elementSelector, this.durationSelector, this.subjectSelector
));
}
Expand Down
2 changes: 1 addition & 1 deletion src/operator/ignoreElements.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export function ignoreElements<T>(this: Observable<T>): Observable<T> {

class IgnoreElementsOperator<T, R> implements Operator<T, R> {
call(subscriber: Subscriber<R>, source: any): any {
return source._subscribe(new IgnoreElementsSubscriber(subscriber));
return source.subscribe(new IgnoreElementsSubscriber(subscriber));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/isEmpty.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export function isEmpty<T>(this: Observable<T>): Observable<boolean> {

class IsEmptyOperator implements Operator<any, boolean> {
call (observer: Subscriber<boolean>, source: any): any {
return source._subscribe(new IsEmptySubscriber(observer));
return source.subscribe(new IsEmptySubscriber(observer));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/last.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class LastOperator<T, R> implements Operator<T, R> {
}

call(observer: Subscriber<R>, source: any): any {
return source._subscribe(new LastSubscriber(observer, this.predicate, this.resultSelector, this.defaultValue, this.source));
return source.subscribe(new LastSubscriber(observer, this.predicate, this.resultSelector, this.defaultValue, this.source));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class MapOperator<T, R> implements Operator<T, R> {
}

call(subscriber: Subscriber<R>, source: any): any {
return source._subscribe(new MapSubscriber(subscriber, this.project, this.thisArg));
return source.subscribe(new MapSubscriber(subscriber, this.project, this.thisArg));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/mapTo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class MapToOperator<T, R> implements Operator<T, R> {
}

call(subscriber: Subscriber<R>, source: any): any {
return source._subscribe(new MapToSubscriber(subscriber, this.value));
return source.subscribe(new MapToSubscriber(subscriber, this.value));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/materialize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export function materialize<T>(this: Observable<T>): Observable<Notification<T>>

class MaterializeOperator<T> implements Operator<T, Notification<T>> {
call(subscriber: Subscriber<Notification<T>>, source: any): any {
return source._subscribe(new MaterializeSubscriber(subscriber));
return source.subscribe(new MaterializeSubscriber(subscriber));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/mergeAll.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export class MergeAllOperator<T> implements Operator<Observable<T>, T> {
}

call(observer: Observer<T>, source: any): any {
return source._subscribe(new MergeAllSubscriber(observer, this.concurrent));
return source.subscribe(new MergeAllSubscriber(observer, this.concurrent));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/mergeMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export class MergeMapOperator<T, I, R> implements Operator<T, I> {
}

call(observer: Subscriber<I>, source: any): any {
return source._subscribe(new MergeMapSubscriber(
return source.subscribe(new MergeMapSubscriber(
observer, this.project, this.resultSelector, this.concurrent
));
}
Expand Down
2 changes: 1 addition & 1 deletion src/operator/mergeMapTo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export class MergeMapToOperator<T, I, R> implements Operator<Observable<T>, R> {
}

call(observer: Subscriber<R>, source: any): any {
return source._subscribe(new MergeMapToSubscriber(observer, this.ish, this.resultSelector, this.concurrent));
return source.subscribe(new MergeMapToSubscriber(observer, this.ish, this.resultSelector, this.concurrent));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/mergeScan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export class MergeScanOperator<T, R> implements Operator<T, R> {
}

call(subscriber: Subscriber<R>, source: any): any {
return source._subscribe(new MergeScanSubscriber(
return source.subscribe(new MergeScanSubscriber(
subscriber, this.project, this.seed, this.concurrent
));
}
Expand Down
Loading

0 comments on commit 1c9f746

Please sign in to comment.