Skip to content

Commit

Permalink
Single#concatPropagateCancel(Publisher) to force cancel propagation (#…
Browse files Browse the repository at this point in the history
…2381)

Motivation:
In some scenarios concat is used where the right hand side has assocated state that is desirable to always trigger (concat payload body to meta data for http response processing). Regular concat will not subscribe if the Single terminates with onError or is cancelled which will prevent terminal event visibility.
  • Loading branch information
Scottmitch authored Oct 3, 2022
1 parent 450096e commit ffc7353
Show file tree
Hide file tree
Showing 5 changed files with 431 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -764,9 +764,10 @@ public final Single<T> concat(Completable next) {
* @return New {@link Publisher} that first emits the result of this {@link Single} and then subscribes and emits
* all elements from {@code next} {@link Publisher}.
* @see #concat(Publisher, boolean)
* @see #concatPropagateCancel(Publisher)
*/
public final Publisher<T> concat(Publisher<? extends T> next) {
return new SingleConcatWithPublisher<>(this, next, false);
return concat(next, false);
}

/**
Expand All @@ -793,7 +794,29 @@ public final Publisher<T> concat(Publisher<? extends T> next) {
* all elements from {@code next} {@link Publisher}.
*/
public final Publisher<T> concat(Publisher<? extends T> next, boolean deferSubscribe) {
return new SingleConcatWithPublisher<>(this, next, deferSubscribe);
return new SingleConcatWithPublisher<>(this, next, deferSubscribe, false);
}

/**
* This method is like {@link #concat(Completable)} except {@code next} will be subscribed to and cancelled if this
* {@link Publisher} is cancelled or terminates with {@link Subscriber#onError(Throwable)}.
* <p>
* This method provides a means to sequence the execution of two asynchronous sources and in sequential programming
* is similar to:
* <pre>{@code
* List<T> results = new ...;
* results.add(resultOfThisSingle());
* results.addAll(nextStream());
* return results;
* }</pre>
* @param next {@link Publisher} to concat. Will be subscribed to and cancelled if this {@link Publisher} is
* cancelled or terminates with {@link Subscriber#onError(Throwable)}.
* @return New {@link Publisher} that first emits the result of this {@link Single} and then subscribes and emits
* all elements from {@code next} {@link Publisher}.
* @see #concat(Completable)
*/
public final Publisher<T> concatPropagateCancel(Publisher<? extends T> next) {
return new SingleConcatWithPublisher<>(this, next, false, true);
}

/**
Expand Down
Loading

0 comments on commit ffc7353

Please sign in to comment.