From 6d087059043d83374c5b1259e332c2a110998fa2 Mon Sep 17 00:00:00 2001 From: OJ Kwon Date: Mon, 8 Feb 2016 12:03:08 -0800 Subject: [PATCH] fix(groupBy): does not emit on unsubscribed group --- spec/operators/groupBy-spec.js | 2 +- src/operator/groupBy.ts | 35 ++++++++++++++++++++++++++-------- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/spec/operators/groupBy-spec.js b/spec/operators/groupBy-spec.js index 87cddb74f5..0b350fbaed 100644 --- a/spec/operators/groupBy-spec.js +++ b/spec/operators/groupBy-spec.js @@ -1,4 +1,4 @@ -/* globals describe, it, expect, hot, cold, expectObservable, expectSubscriptions */ +/* globals describe, it, expect, hot, cold, expectObservable, expectSubscriptions, rxTestScheduler */ var Rx = require('../../dist/cjs/Rx.KitchenSink'); var Observable = Rx.Observable; var GroupedObservable = require('../../dist/cjs/operator/groupBy').GroupedObservable; diff --git a/src/operator/groupBy.ts b/src/operator/groupBy.ts index 6cdf26ea48..d41617c6cd 100644 --- a/src/operator/groupBy.ts +++ b/src/operator/groupBy.ts @@ -83,7 +83,7 @@ class GroupBySubscriber extends Subscriber implements RefCountSubscr if (!group) { groups.set(key, group = new Subject()); - let groupedObservable = new GroupedObservable(key, group, this); + const groupedObservable = new GroupedObservable(key, group, this); if (this.durationSelector) { this._selectDuration(key, group); @@ -95,7 +95,7 @@ class GroupBySubscriber extends Subscriber implements RefCountSubscr if (this.elementSelector) { this._selectElement(value, group); } else { - group.next(value); + this.tryGroupNext(value, group); } } @@ -107,7 +107,7 @@ class GroupBySubscriber extends Subscriber implements RefCountSubscr this.error(err); return; } - group.next(result); + this.tryGroupNext(result, group); } private _selectDuration(key: K, group: any) { @@ -121,6 +121,12 @@ class GroupBySubscriber extends Subscriber implements RefCountSubscr this.add(duration.subscribe(new GroupDurationSubscriber(key, group, this))); } + private tryGroupNext(value: T|R, group: Subject): void { + if (!group.isUnsubscribed) { + group.next(value); + } + } + protected _error(err: any): void { const groups = this.groups; if (groups) { @@ -165,17 +171,30 @@ class GroupDurationSubscriber extends Subscriber { } protected _next(value: T): void { - this.group.complete(); - this.parent.removeGroup(this.key); + this.tryComplete(); } protected _error(err: any): void { - this.group.error(err); - this.parent.removeGroup(this.key); + this.tryError(err); } protected _complete(): void { - this.group.complete(); + this.tryComplete(); + } + + private tryError(err: any): void { + const group = this.group; + if (!group.isUnsubscribed) { + group.error(err); + } + this.parent.removeGroup(this.key); + } + + private tryComplete(): void { + const group = this.group; + if (!group.isUnsubscribed) { + group.complete(); + } this.parent.removeGroup(this.key); } }