diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorPivot.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorPivot.java index d1b3b0b3b4..89d2e35c92 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorPivot.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorPivot.java @@ -25,10 +25,12 @@ import rx.Observable.OnSubscribe; import rx.Observable.Operator; +import rx.Observer; import rx.Subscriber; import rx.Subscription; import rx.functions.Action0; import rx.observables.GroupedObservable; +import rx.observers.SerializedObserver; import rx.subscriptions.Subscriptions; public final class OperatorPivot implements Operator>, GroupedObservable>> { @@ -268,7 +270,7 @@ private Outer getOrCreateOuter(final AtomicReference state, fi return null; } - Outer newOuter = Outer. create(this, state, key2); + Outer newOuter = Outer. create(key2); Outer existing = outerSubjects.putIfAbsent(key2, newOuter); if (existing != null) { // we lost the race to create so return the one that did @@ -283,11 +285,12 @@ private Outer getOrCreateOuter(final AtomicReference state, fi private static final class Inner { - private final BufferUntilSubscriber subscriber; + private final Observer subscriber; private final GroupedObservable group; private Inner(BufferUntilSubscriber subscriber, GroupedObservable group) { - this.subscriber = subscriber; + // since multiple threads are being pivoted we need to make sure this is serialized + this.subscriber = new SerializedObserver(subscriber); this.group = group; } @@ -335,15 +338,16 @@ public void onNext(T t) { private static final class Outer { - private final BufferUntilSubscriber> subscriber; + private final Observer> subscriber; private final GroupedObservable> group; private Outer(BufferUntilSubscriber> subscriber, GroupedObservable> group) { - this.subscriber = subscriber; + // since multiple threads are being pivoted we need to make sure this is serialized + this.subscriber = new SerializedObserver>(subscriber); this.group = group; } - public static Outer create(final GroupState groups, final AtomicReference state, final K2 key2) { + public static Outer create(final K2 key2) { final BufferUntilSubscriber> subject = BufferUntilSubscriber.create(); GroupedObservable> group = new GroupedObservable>(key2, new OnSubscribe>() { diff --git a/rxjava-core/src/test/java/rx/internal/operators/OperatorPivotTest.java b/rxjava-core/src/test/java/rx/internal/operators/OperatorPivotTest.java index f35f3912be..14d06d5594 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OperatorPivotTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OperatorPivotTest.java @@ -13,23 +13,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package rx.internal.operators; import java.util.Random; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Ignore; import org.junit.Test; import rx.Observable; import rx.Observable.OnSubscribe; import rx.Observer; +import rx.Scheduler; import rx.Subscriber; import rx.functions.Func1; import rx.observables.GroupedObservable; @@ -38,10 +41,12 @@ public class OperatorPivotTest { - @Test + @Test(timeout=10000) public void testPivotEvenAndOdd() throws InterruptedException { - Observable> o1 = Observable.range(1, 10).groupBy(modKeySelector).subscribeOn(Schedulers.newThread()); - Observable> o2 = Observable.range(11, 10).groupBy(modKeySelector).subscribeOn(Schedulers.newThread()); + for(int i=0; i<1000; i++) { + System.out.println("------------------------------------------ testPivotEvenAndOdd -------------------------------------------"); + Observable> o1 = Observable.range(1, 10).groupBy(modKeySelector).subscribeOn(Schedulers.computation()); + Observable> o2 = Observable.range(11, 10).groupBy(modKeySelector).subscribeOn(Schedulers.computation()); Observable>> groups = Observable.from(GroupedObservable.from("o1", o1), GroupedObservable.from("o2", o2)); Observable>> pivoted = Observable.pivot(groups); @@ -53,10 +58,12 @@ public void testPivotEvenAndOdd() throws InterruptedException { @Override public Observable call(final GroupedObservable> outerGroup) { + System.out.println("Outer Group: " + outerGroup.getKey()); return outerGroup.flatMap(new Func1, Observable>() { @Override public Observable call(final GroupedObservable innerGroup) { + System.out.println("Inner Group: " + innerGroup.getKey()); return innerGroup.map(new Func1() { @Override @@ -94,14 +101,16 @@ public void onNext(String t) { }); - if (!latch.await(800, TimeUnit.MILLISECONDS)) { + if (!latch.await(20000000, TimeUnit.MILLISECONDS)) { System.out.println("xxxxxxxxxxxxxxxxxx> TIMED OUT