Skip to content

Commit

Permalink
Remove duplicate doc sections from "Blocking: Safe by default" (#2448)
Browse files Browse the repository at this point in the history
Motivation:

This doc page contains duplicate sections which only differ
slightly or not at all, adding unnecessary redundancy.

Modifications:

This changeset removes the duplicate sections, but makes sure
to carry the differences forward so that no information is lost.

Result:

Redundant sections removed from the doc page.
  • Loading branch information
daschl authored Nov 30, 2022
1 parent b843d5c commit e9c7be7
Showing 1 changed file with 4 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,10 @@ thread) -> `map` -> `toFuture` where the element is collected for the result. Fo
=== `subscribeOn()` Example

Using the `subscribeOn(executor)` operator allows processing of the subscription and demand on a specific thread. Using
the `subscribeOn(executor)` operator is necessary if the thread doing the `subscribe` or interacting with the
`Subscription` is sensitive to blocking, for example, an EventLoop thread.
the `subscribeOn(executor)` operator generally requires an understanding of the behavior of the source; using a
different source may change the need for offloading. `subscribeOn(executor)` is used less frequently than
`publishOn(executor)` but is useful when it is necessary to offload the control path; the `subscribe` method or
`Subscription` methods.

[source, java]
----
Expand Down Expand Up @@ -228,142 +230,6 @@ different `Executor` in specific cases. ServiceTalk provides reactive operators
conditional offloading operators; `subscribeOn(executor, predicate)` and `publishOn(executor, predicate)` are also
available.

=== `publishOn()` Example

Using the `publishOn(executor)` operator allows processing of signals related to the source content on a different
thread than is generating the content. It is the most common form of offloading used as it relates to the data path,
specifically the `Subscription` methods.

[source, java]
----
Collection<Integer> result = Publisher.range(1, 10) <2> <5>
.map(element -> element) // non-offloaded NO-OP
.publishOn(publishExecutor) <4>
.map(element -> element) // offloaded NO-OP
.toFuture() <3>
.get(); <1> <6>
----

<1> `toFuture()` begins by calling `subscribe(Subscriber)`. Executing on the calling thread, execution flows up
the operator chain towards the source; `map` -> `publishOn` -> `map` -> `Publisher.range(1, 10)`.

<2> Still executing on the calling thread, `Range` will call `Subscriber.onSubscribe(Subscription)` on the
`Subscriber`. This flows back down the operator chain, `Range` -> `map` -> `publishOn` (offloads on to `publishExecutor`
thread) -> `map` -> `toFuture`.

<3> The `onSubscribe(Subscription)` method of `toFuture()`&#xfeff;'s `Subscriber` will call
`Subscription.request(Long.MAX_VALUE)`. Execution flows up the operator chain towards the source;
`map` -> `publishOn` -> `map` -> `Range`. `Range` will publish synchronously via `onNext(element)` nine items, the
integers "`1`" through "`9`".

<4> Each `onNext` flows down the operator chain, `Range` -> `map` -> `publishOn` (offloads on to `publishExecutor`
thread) -> `map` -> `toFuture` where the element is collected for the result. For each offloaded item a thread of
`publishExecutor` will be used for executing the second `map` operator and final collect operation.

<5> After all items, `Range` sends the terminal `onComplete()` signal synchronously which flows down the operator chain,
`Range` -> `map -> `publishOn` (offloads on to `publishExecutor` thread) -> `map` -> `toFuture` and will complete the
`Future` with the integer collection result.

<6> The calling thread will wait at `get()` for the `Future` result to be asynchronously completed.

=== `subscribeOn()` Example

Using the `subscribeOn(executor)` operator allows processing of the subscription and demand on a specific thread. Using
the `subscribeOn(executor)` operator generally requires an understanding of the behavior of the source; using a
different source may change the need for offloading. `subscribeOn(executor)` is used less frequently than
`publishOn(executor)` but is useful when it is necessary to offload the control path; the `subscribe` method or
`Subscription` methods.

[source, java]
----
Collection<Integer> result = Publisher.range(1, 10) <2> <4>
.map(element -> element) // NO-OP
.subscribeOn(subscribeExecutor)
.toFuture() <1> <3> <5>
.get(); <6>
----

<1> `toFuture()` will do a `subscribe(Subscriber)`. This flows up the operator chain toward the source;
`subscribeOn` (offload onto `subscribeExecutor` thread) -> `map` -> `Range`.

<2> Still on a thread from `subscribeExecutor` `Range` will call `Subscriber.onSubscribe(Subscription)` on the
`Subscriber`. This flows back down the operator chain, `Range` -> `map` -> `subscribeOn` -> `toFuture`.

<3> Still on the`subscribeExecutor` thread, `toFuture()`'s `onSubscribe(Subscription)` call
`Subscription.request(Long.MAX_VALUE)`. This flows up the operator chain, `subscribeOn` (offloads again onto another
`subscribeExecutor` thread) -> `map` -> `Range`.

<4> Still on thread from the second offload to `subscribeExecutor`, `Range` ` will publish synchronously via
`onNext(element)` nine items, the integers "`1`" through "`9`". Each `onNext` flows back down the operator chain,
`Range` -> `map` -> `subscribeOn` -> `toFuture` where the element is collected for the result.

<5> Still on thread from the second offload to `subscribeExecutor`, after all items, `Range` will call `onComplete`.
When the `toFuture()` `Subscriber` receives the `onComplete()` signal it will complete the `Future` with the integer
collection result.

<6> The calling thread will wait at `get()` for the `Future` result to be asynchronously completed.

=== publishOn()/subscribeOn() Detailed Example

These examples can be expanded to demonstrate the offloading behavior directly. The expanded example extends the NO-OP
`map` implementations to reveal the active thread during their execution. To show the active thread at the other
points described in the callouts the expanded example also adds `whenOnSubscribe`, `whenRequest`, `liftSync` and
`whenFinally` operations in the operator chain. The output of the example shows the thread used for executing each of
the operators, while the specialized operators provide examples of how you might use them to debug your own programs.

[source, java]
----
Collection<?> result = Publisher.range(1, 3)
.map(element -> {
System.out.println("\nPublish starts on " + Thread.currentThread() + " Received : " + element);
return element;
})
.whenOnSubscribe(subscription -> {
System.out.println("\nonSubscribe starts on " + Thread.currentThread());
})
.publishOn(publishExecutor)
.map(element -> {
System.out.println("\nPublish offloaded to " + Thread.currentThread() + " Received : " + element);
return element;
})
.whenRequest(request -> {
System.out.println("\nrequest(" + request + ") offloaded to " + Thread.currentThread());
})
.liftSync(subscriber -> {
System.out.println("\nSubscribe offloaded to " + Thread.currentThread());
return subscriber;
})
.subscribeOn(subscribeExecutor)
.liftSync(subscriber -> {
System.out.println("\nSubscribe begins on " + Thread.currentThread());
return subscriber;
})
.whenOnSubscribe(subscription -> {
System.out.println("\nonSubscribe offloaded to " + Thread.currentThread());
})
.whenRequest(request -> {
System.out.println("\nrequest(" + request + ") starts on " + Thread.currentThread());
})
.whenFinally(new TerminalSignalConsumer() {
@Override
public void onComplete() {
System.out.println("\ncomplete on " + Thread.currentThread());
}
@Override
public void onError(final Throwable throwable) {
System.out.println("\nerror (" + throwable + ") on " + Thread.currentThread());
}
@Override
public void cancel() {
System.out.println("\ncancel on " + Thread.currentThread());
}
})
.toFuture()
.get();
----

== Implementation

In order to use ServiceTalk's blocking support feature, one does not need to know about implementation details and the
Expand Down

0 comments on commit e9c7be7

Please sign in to comment.