diff --git a/servicetalk-concurrent-api/docs/modules/ROOT/pages/blocking-safe-by-default.adoc b/servicetalk-concurrent-api/docs/modules/ROOT/pages/blocking-safe-by-default.adoc index 4b4706415e..e4d866e138 100644 --- a/servicetalk-concurrent-api/docs/modules/ROOT/pages/blocking-safe-by-default.adoc +++ b/servicetalk-concurrent-api/docs/modules/ROOT/pages/blocking-safe-by-default.adoc @@ -128,8 +128,10 @@ thread) -> `map` -> `toFuture` where the element is collected for the result. Fo === `subscribeOn()` Example Using the `subscribeOn(executor)` operator allows processing of the subscription and demand on a specific thread. Using -the `subscribeOn(executor)` operator is necessary if the thread doing the `subscribe` or interacting with the -`Subscription` is sensitive to blocking, for example, an EventLoop thread. +the `subscribeOn(executor)` operator generally requires an understanding of the behavior of the source; using a +different source may change the need for offloading. `subscribeOn(executor)` is used less frequently than +`publishOn(executor)` but is useful when it is necessary to offload the control path; the `subscribe` method or +`Subscription` methods. [source, java] ---- @@ -228,142 +230,6 @@ different `Executor` in specific cases. ServiceTalk provides reactive operators conditional offloading operators; `subscribeOn(executor, predicate)` and `publishOn(executor, predicate)` are also available. -=== `publishOn()` Example - -Using the `publishOn(executor)` operator allows processing of signals related to the source content on a different -thread than is generating the content. It is the most common form of offloading used as it relates to the data path, -specifically the `Subscription` methods. - -[source, java] ----- -Collection result = Publisher.range(1, 10) <2> <5> - .map(element -> element) // non-offloaded NO-OP - .publishOn(publishExecutor) <4> - .map(element -> element) // offloaded NO-OP - .toFuture() <3> - .get(); <1> <6> ----- - -<1> `toFuture()` begins by calling `subscribe(Subscriber)`. Executing on the calling thread, execution flows up -the operator chain towards the source; `map` -> `publishOn` -> `map` -> `Publisher.range(1, 10)`. - -<2> Still executing on the calling thread, `Range` will call `Subscriber.onSubscribe(Subscription)` on the -`Subscriber`. This flows back down the operator chain, `Range` -> `map` -> `publishOn` (offloads on to `publishExecutor` -thread) -> `map` -> `toFuture`. - -<3> The `onSubscribe(Subscription)` method of `toFuture()`'s `Subscriber` will call -`Subscription.request(Long.MAX_VALUE)`. Execution flows up the operator chain towards the source; -`map` -> `publishOn` -> `map` -> `Range`. `Range` will publish synchronously via `onNext(element)` nine items, the -integers "`1`" through "`9`". - -<4> Each `onNext` flows down the operator chain, `Range` -> `map` -> `publishOn` (offloads on to `publishExecutor` -thread) -> `map` -> `toFuture` where the element is collected for the result. For each offloaded item a thread of -`publishExecutor` will be used for executing the second `map` operator and final collect operation. - -<5> After all items, `Range` sends the terminal `onComplete()` signal synchronously which flows down the operator chain, -`Range` -> `map -> `publishOn` (offloads on to `publishExecutor` thread) -> `map` -> `toFuture` and will complete the -`Future` with the integer collection result. - -<6> The calling thread will wait at `get()` for the `Future` result to be asynchronously completed. - -=== `subscribeOn()` Example - -Using the `subscribeOn(executor)` operator allows processing of the subscription and demand on a specific thread. Using -the `subscribeOn(executor)` operator generally requires an understanding of the behavior of the source; using a -different source may change the need for offloading. `subscribeOn(executor)` is used less frequently than -`publishOn(executor)` but is useful when it is necessary to offload the control path; the `subscribe` method or -`Subscription` methods. - -[source, java] ----- -Collection result = Publisher.range(1, 10) <2> <4> - .map(element -> element) // NO-OP - .subscribeOn(subscribeExecutor) - .toFuture() <1> <3> <5> - .get(); <6> ----- - -<1> `toFuture()` will do a `subscribe(Subscriber)`. This flows up the operator chain toward the source; -`subscribeOn` (offload onto `subscribeExecutor` thread) -> `map` -> `Range`. - -<2> Still on a thread from `subscribeExecutor` `Range` will call `Subscriber.onSubscribe(Subscription)` on the -`Subscriber`. This flows back down the operator chain, `Range` -> `map` -> `subscribeOn` -> `toFuture`. - -<3> Still on the`subscribeExecutor` thread, `toFuture()`'s `onSubscribe(Subscription)` call -`Subscription.request(Long.MAX_VALUE)`. This flows up the operator chain, `subscribeOn` (offloads again onto another -`subscribeExecutor` thread) -> `map` -> `Range`. - -<4> Still on thread from the second offload to `subscribeExecutor`, `Range` ` will publish synchronously via -`onNext(element)` nine items, the integers "`1`" through "`9`". Each `onNext` flows back down the operator chain, -`Range` -> `map` -> `subscribeOn` -> `toFuture` where the element is collected for the result. - -<5> Still on thread from the second offload to `subscribeExecutor`, after all items, `Range` will call `onComplete`. -When the `toFuture()` `Subscriber` receives the `onComplete()` signal it will complete the `Future` with the integer -collection result. - -<6> The calling thread will wait at `get()` for the `Future` result to be asynchronously completed. - -=== publishOn()/subscribeOn() Detailed Example - -These examples can be expanded to demonstrate the offloading behavior directly. The expanded example extends the NO-OP -`map` implementations to reveal the active thread during their execution. To show the active thread at the other -points described in the callouts the expanded example also adds `whenOnSubscribe`, `whenRequest`, `liftSync` and -`whenFinally` operations in the operator chain. The output of the example shows the thread used for executing each of -the operators, while the specialized operators provide examples of how you might use them to debug your own programs. - -[source, java] ----- -Collection result = Publisher.range(1, 3) - .map(element -> { - System.out.println("\nPublish starts on " + Thread.currentThread() + " Received : " + element); - return element; - }) - .whenOnSubscribe(subscription -> { - System.out.println("\nonSubscribe starts on " + Thread.currentThread()); - }) - .publishOn(publishExecutor) - .map(element -> { - System.out.println("\nPublish offloaded to " + Thread.currentThread() + " Received : " + element); - return element; - }) - .whenRequest(request -> { - System.out.println("\nrequest(" + request + ") offloaded to " + Thread.currentThread()); - }) - .liftSync(subscriber -> { - System.out.println("\nSubscribe offloaded to " + Thread.currentThread()); - return subscriber; - }) - .subscribeOn(subscribeExecutor) - .liftSync(subscriber -> { - System.out.println("\nSubscribe begins on " + Thread.currentThread()); - return subscriber; - }) - .whenOnSubscribe(subscription -> { - System.out.println("\nonSubscribe offloaded to " + Thread.currentThread()); - }) - .whenRequest(request -> { - System.out.println("\nrequest(" + request + ") starts on " + Thread.currentThread()); - }) - .whenFinally(new TerminalSignalConsumer() { - @Override - public void onComplete() { - System.out.println("\ncomplete on " + Thread.currentThread()); - } - - @Override - public void onError(final Throwable throwable) { - System.out.println("\nerror (" + throwable + ") on " + Thread.currentThread()); - } - - @Override - public void cancel() { - System.out.println("\ncancel on " + Thread.currentThread()); - } - }) - .toFuture() - .get(); ----- - == Implementation In order to use ServiceTalk's blocking support feature, one does not need to know about implementation details and the