diff --git a/src/operators/windowCount.ts b/src/operators/windowCount.ts index bd8576683e..ef5d6986b3 100644 --- a/src/operators/windowCount.ts +++ b/src/operators/windowCount.ts @@ -10,13 +10,15 @@ import tryCatch from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; import bindCallback from '../util/bindCallback'; -export default function windowCount(windowSize: number, startWindowEvery: number = 0): Observable> { +export default function windowCount(windowSize: number, + startWindowEvery: number = 0): Observable> { return this.lift(new WindowCountOperator(windowSize, startWindowEvery)); } class WindowCountOperator implements Operator { - constructor(private windowSize: number, private startWindowEvery: number) { + constructor(private windowSize: number, + private startWindowEvery: number) { } call(subscriber: Subscriber): Subscriber { @@ -24,54 +26,41 @@ class WindowCountOperator implements Operator { } } -interface WindowObject { - count: number; - notified: boolean; - window: Subject; -} - class WindowCountSubscriber extends Subscriber { - private windows: WindowObject[] = [ - { count: 0, notified : false, window : new Subject() } - ]; + private windows: Subject[] = [ new Subject() ]; private count: number = 0; - constructor(destination: Subscriber, private windowSize: number, private startWindowEvery: number) { + constructor(destination: Subscriber, + private windowSize: number, + private startWindowEvery: number) { super(destination); + destination.next(this.windows[0]); } _next(value: T) { - const count = (this.count += 1); const startWindowEvery = (this.startWindowEvery > 0) ? this.startWindowEvery : this.windowSize; const windowSize = this.windowSize; const windows = this.windows; const len = windows.length; - if (count % startWindowEvery === 0) { - let window = new Subject(); - windows.push({ count: 0, notified : false, window : window }); - } - for (let i = 0; i < len; i++) { - let w = windows[i]; - const window = w.window; - - if (!w.notified) { - w.notified = true; - this.destination.next(window); - } - - window.next(value); - if (windowSize === (w.count += 1)) { - window.complete(); - } + windows[i].next(value); + } + const c = this.count - windowSize + 1; + if (c >= 0 && c % startWindowEvery === 0) { + windows.shift().complete(); + } + if (++this.count % startWindowEvery === 0) { + let window = new Subject(); + windows.push(window); + this.destination.next(window); } } _error(err: any) { const windows = this.windows; while (windows.length > 0) { - windows.shift().window.error(err); + windows.shift().error(err); } this.destination.error(err); } @@ -79,7 +68,7 @@ class WindowCountSubscriber extends Subscriber { _complete() { const windows = this.windows; while (windows.length > 0) { - windows.shift().window.complete(); + windows.shift().complete(); } this.destination.complete(); }