diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelRunOn.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelRunOn.java index 3281a20b9d..bf52698359 100644 --- a/src/main/java/io/reactivex/internal/operators/parallel/ParallelRunOn.java +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelRunOn.java @@ -22,6 +22,8 @@ import io.reactivex.exceptions.MissingBackpressureException; import io.reactivex.internal.fuseable.ConditionalSubscriber; import io.reactivex.internal.queue.SpscArrayQueue; +import io.reactivex.internal.schedulers.SchedulerMultiWorkerSupport; +import io.reactivex.internal.schedulers.SchedulerMultiWorkerSupport.WorkerCallback; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.BackpressureHelper; import io.reactivex.parallel.ParallelFlowable; @@ -47,7 +49,7 @@ public ParallelRunOn(ParallelFlowable parent, } @Override - public void subscribe(Subscriber[] subscribers) { + public void subscribe(final Subscriber[] subscribers) { if (!validate(subscribers)) { return; } @@ -55,26 +57,50 @@ public void subscribe(Subscriber[] subscribers) { int n = subscribers.length; @SuppressWarnings("unchecked") - Subscriber[] parents = new Subscriber[n]; + final Subscriber[] parents = new Subscriber[n]; + + if (scheduler instanceof SchedulerMultiWorkerSupport) { + SchedulerMultiWorkerSupport multiworker = (SchedulerMultiWorkerSupport) scheduler; + multiworker.createWorkers(n, new MultiWorkerCallback(subscribers, parents)); + } else { + for (int i = 0; i < n; i++) { + createSubscriber(i, subscribers, parents, scheduler.createWorker()); + } + } + source.subscribe(parents); + } - int prefetch = this.prefetch; + void createSubscriber(int i, Subscriber[] subscribers, + Subscriber[] parents, Scheduler.Worker worker) { - for (int i = 0; i < n; i++) { - Subscriber a = subscribers[i]; + Subscriber a = subscribers[i]; - Worker w = scheduler.createWorker(); - SpscArrayQueue q = new SpscArrayQueue(prefetch); + SpscArrayQueue q = new SpscArrayQueue(prefetch); - if (a instanceof ConditionalSubscriber) { - parents[i] = new RunOnConditionalSubscriber((ConditionalSubscriber)a, prefetch, q, w); - } else { - parents[i] = new RunOnSubscriber(a, prefetch, q, w); - } + if (a instanceof ConditionalSubscriber) { + parents[i] = new RunOnConditionalSubscriber((ConditionalSubscriber)a, prefetch, q, worker); + } else { + parents[i] = new RunOnSubscriber(a, prefetch, q, worker); } - - source.subscribe(parents); } + final class MultiWorkerCallback implements WorkerCallback { + + final Subscriber[] subscribers; + + final Subscriber[] parents; + + MultiWorkerCallback(Subscriber[] subscribers, + Subscriber[] parents) { + this.subscribers = subscribers; + this.parents = parents; + } + + @Override + public void onWorker(int i, Worker w) { + createSubscriber(i, subscribers, parents, w); + } + } @Override public int parallelism() { diff --git a/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java b/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java index f2a4dcf7d3..dda22b9255 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java @@ -15,19 +15,20 @@ */ package io.reactivex.internal.schedulers; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; + import io.reactivex.Scheduler; import io.reactivex.annotations.NonNull; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.*; - -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicReference; +import io.reactivex.internal.functions.ObjectHelper; /** * Holds a fixed pool of worker threads and assigns them * to requested Scheduler.Workers in a round-robin fashion. */ -public final class ComputationScheduler extends Scheduler { +public final class ComputationScheduler extends Scheduler implements SchedulerMultiWorkerSupport { /** This will indicate no pool is active. */ static final FixedSchedulerPool NONE; /** Manages a fixed number of workers. */ @@ -67,7 +68,7 @@ static int cap(int cpuCount, int paramThreads) { return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads; } - static final class FixedSchedulerPool { + static final class FixedSchedulerPool implements SchedulerMultiWorkerSupport { final int cores; final PoolWorker[] eventLoops; @@ -96,6 +97,25 @@ public void shutdown() { w.dispose(); } } + + @Override + public void createWorkers(int number, WorkerCallback callback) { + int c = cores; + if (c == 0) { + for (int i = 0; i < number; i++) { + callback.onWorker(i, SHUTDOWN_WORKER); + } + } else { + int index = (int)n % c; + for (int i = 0; i < number; i++) { + callback.onWorker(i, new EventLoopWorker(eventLoops[index])); + if (++index == c) { + index = 0; + } + } + n = index; + } + } } /** @@ -125,6 +145,12 @@ public Worker createWorker() { return new EventLoopWorker(pool.get().getEventLoop()); } + @Override + public void createWorkers(int number, WorkerCallback callback) { + ObjectHelper.verifyPositive(number, "number > 0 required"); + pool.get().createWorkers(number, callback); + } + @NonNull @Override public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) { diff --git a/src/main/java/io/reactivex/internal/schedulers/SchedulerMultiWorkerSupport.java b/src/main/java/io/reactivex/internal/schedulers/SchedulerMultiWorkerSupport.java new file mode 100644 index 0000000000..d2c211a815 --- /dev/null +++ b/src/main/java/io/reactivex/internal/schedulers/SchedulerMultiWorkerSupport.java @@ -0,0 +1,52 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.schedulers; + +import io.reactivex.Scheduler; +import io.reactivex.annotations.*; + +/** + * Allows retrieving multiple workers from the implementing + * {@link io.reactivex.Scheduler} in a way that when asking for + * at most the parallelism level of the Scheduler, those + * {@link io.reactivex.Scheduler.Worker} instances will be running + * with different backing threads. + * + * @since 2.1.7 - experimental + */ +@Experimental +public interface SchedulerMultiWorkerSupport { + + /** + * Creates the given number of {@link io.reactivex.Scheduler.Worker} instances + * that are possibly backed by distinct threads + * and calls the specified {@code Consumer} with them. + * @param number the number of workers to create, positive + * @param callback the callback to send worker instances to + */ + void createWorkers(int number, @NonNull WorkerCallback callback); + + /** + * The callback interface for the {@link SchedulerMultiWorkerSupport#createWorkers(int, WorkerCallback)} + * method. + */ + interface WorkerCallback { + /** + * Called with the Worker index and instance. + * @param index the worker index, zero-based + * @param worker the worker instance + */ + void onWorker(int index, @NonNull Scheduler.Worker worker); + } +} diff --git a/src/test/java/io/reactivex/internal/schedulers/SchedulerMultiWorkerSupportTest.java b/src/test/java/io/reactivex/internal/schedulers/SchedulerMultiWorkerSupportTest.java new file mode 100644 index 0000000000..d77dd8786e --- /dev/null +++ b/src/test/java/io/reactivex/internal/schedulers/SchedulerMultiWorkerSupportTest.java @@ -0,0 +1,149 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.schedulers; + +import static org.junit.Assert.*; + +import java.util.*; +import java.util.concurrent.*; + +import org.junit.Test; + +import io.reactivex.Scheduler.Worker; +import io.reactivex.TestHelper; +import io.reactivex.disposables.CompositeDisposable; +import io.reactivex.internal.schedulers.SchedulerMultiWorkerSupport.WorkerCallback; +import io.reactivex.schedulers.Schedulers; + +public class SchedulerMultiWorkerSupportTest { + + final int max = ComputationScheduler.MAX_THREADS; + + @Test + public void moreThanMaxWorkers() { + final List list = new ArrayList(); + + SchedulerMultiWorkerSupport mws = (SchedulerMultiWorkerSupport)Schedulers.computation(); + + mws.createWorkers(max * 2, new WorkerCallback() { + @Override + public void onWorker(int i, Worker w) { + list.add(w); + } + }); + + assertEquals(max * 2, list.size()); + } + + @Test + public void getShutdownWorkers() { + final List list = new ArrayList(); + + ComputationScheduler.NONE.createWorkers(max * 2, new WorkerCallback() { + @Override + public void onWorker(int i, Worker w) { + list.add(w); + } + }); + + assertEquals(max * 2, list.size()); + + for (Worker w : list) { + assertEquals(ComputationScheduler.SHUTDOWN_WORKER, w); + } + } + + @Test + public void distinctThreads() throws Exception { + for (int i = 0; i < 1000; i++) { + + final CompositeDisposable composite = new CompositeDisposable(); + + try { + final CountDownLatch cdl = new CountDownLatch(max * 2); + + final Set threads1 = Collections.synchronizedSet(new HashSet()); + + final Set threads2 = Collections.synchronizedSet(new HashSet()); + + Runnable parallel1 = new Runnable() { + @Override + public void run() { + final List list1 = new ArrayList(); + + SchedulerMultiWorkerSupport mws = (SchedulerMultiWorkerSupport)Schedulers.computation(); + + mws.createWorkers(max, new WorkerCallback() { + @Override + public void onWorker(int i, Worker w) { + list1.add(w); + composite.add(w); + } + }); + + Runnable run = new Runnable() { + @Override + public void run() { + threads1.add(Thread.currentThread().getName()); + cdl.countDown(); + } + }; + + for (Worker w : list1) { + w.schedule(run); + } + } + }; + + Runnable parallel2 = new Runnable() { + @Override + public void run() { + final List list2 = new ArrayList(); + + SchedulerMultiWorkerSupport mws = (SchedulerMultiWorkerSupport)Schedulers.computation(); + + mws.createWorkers(max, new WorkerCallback() { + @Override + public void onWorker(int i, Worker w) { + list2.add(w); + composite.add(w); + } + }); + + Runnable run = new Runnable() { + @Override + public void run() { + threads2.add(Thread.currentThread().getName()); + cdl.countDown(); + } + }; + + for (Worker w : list2) { + w.schedule(run); + } + } + }; + + TestHelper.race(parallel1, parallel2); + + assertTrue(cdl.await(5, TimeUnit.SECONDS)); + + assertEquals(threads1.toString(), max, threads1.size()); + assertEquals(threads2.toString(), max, threads2.size()); + } finally { + composite.dispose(); + } + } + } +}