Skip to content

Commit

Permalink
3.x: remove no-arg, dematerialize(); remove replay(Scheduler) variants (
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Jun 24, 2019
1 parent 9e9d31c commit fb37226
Show file tree
Hide file tree
Showing 14 changed files with 43 additions and 697 deletions.
199 changes: 0 additions & 199 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -8843,57 +8843,6 @@ public final Flowable<T> delaySubscription(long delay, TimeUnit unit, Scheduler
return delaySubscription(timer(delay, unit, scheduler));
}

/**
* Returns a Flowable that reverses the effect of {@link #materialize materialize} by transforming the
* {@link Notification} objects emitted by the source Publisher into the items or notifications they
* represent.
* <p>
* <img width="640" height="335" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/dematerialize.png" alt="">
* <p>
* When the upstream signals an {@link Notification#createOnError(Throwable) onError} or
* {@link Notification#createOnComplete() onComplete} item, the
* returned Flowable cancels the flow and terminates with that type of terminal event:
* <pre><code>
* Flowable.just(createOnNext(1), createOnComplete(), createOnNext(2))
* .doOnCancel(() -&gt; System.out.println("Cancelled!"));
* .dematerialize()
* .test()
* .assertResult(1);
* </code></pre>
* If the upstream signals {@code onError} or {@code onComplete} directly, the flow is terminated
* with the same event.
* <pre><code>
* Flowable.just(createOnNext(1), createOnNext(2))
* .dematerialize()
* .test()
* .assertResult(1, 2);
* </code></pre>
* If this behavior is not desired, the completion can be suppressed by applying {@link #concatWith(Publisher)}
* with a {@link #never()} source.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s
* backpressure behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code dematerialize} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T2> the output value type
* @return a Flowable that emits the items and notifications embedded in the {@link Notification} objects
* emitted by the source Publisher
* @see <a href="http://reactivex.io/documentation/operators/materialize-dematerialize.html">ReactiveX operators documentation: Dematerialize</a>
* @see #dematerialize(Function)
* @deprecated in 2.2.4; inherently type-unsafe as it overrides the output generic type. Use {@link #dematerialize(Function)} instead.
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@Deprecated
@SuppressWarnings({ "unchecked", "rawtypes" })
public final <T2> Flowable<T2> dematerialize() {
return RxJavaPlugins.onAssembly(new FlowableDematerialize(this, Functions.identity()));
}

/**
* Returns a Flowable that reverses the effect of {@link #materialize materialize} by transforming the
* {@link Notification} objects extracted from the source items via a selector function
Expand Down Expand Up @@ -13234,51 +13183,6 @@ public final <R> Flowable<R> replay(Function<? super Flowable<T>, ? extends Publ
FlowableInternalHelper.replaySupplier(this, bufferSize, time, unit, scheduler, eagerTruncate), selector);
}

/**
* Returns a Flowable that emits items that are the results of invoking a specified selector on items
* emitted by a {@link ConnectableFlowable} that shares a single subscription to the source Publisher,
* replaying a maximum of {@code bufferSize} items.
* <p>
* Note that due to concurrency requirements, {@code replay(bufferSize)} may hold strong references to more than
* {@code bufferSize} source emissions.
* <p>
* <img width="640" height="440" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/replay.fns.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator supports backpressure. Note that the upstream requests are determined by the child
* Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will
* request 100 elements from the underlying Publisher sequence.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
* </dl>
*
* @param <R>
* the type of items emitted by the resulting Publisher
* @param selector
* a selector function, which can use the multicasted sequence as many times as needed, without
* causing multiple subscriptions to the Publisher
* @param bufferSize
* the buffer size that limits the number of items the connectable Publisher can replay
* @param scheduler
* the Scheduler on which the replay is observed
* @return a Flowable that emits items that are the results of invoking the selector on items emitted by
* a {@link ConnectableFlowable} that shares a single subscription to the source Publisher,
* replaying no more than {@code bufferSize} notifications
* @see <a href="http://reactivex.io/documentation/operators/replay.html">ReactiveX operators documentation: Replay</a>
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final <R> Flowable<R> replay(final Function<? super Flowable<T>, ? extends Publisher<R>> selector, final int bufferSize, final Scheduler scheduler) {
ObjectHelper.requireNonNull(selector, "selector is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return FlowableReplay.multicastSelector(FlowableInternalHelper.replaySupplier(this, bufferSize, false),
FlowableInternalHelper.replayFunction(selector, scheduler)
);
}

/**
* Returns a Flowable that emits items that are the results of invoking a specified selector on items
* emitted by a {@link ConnectableFlowable} that shares a single subscription to the source Publisher,
Expand Down Expand Up @@ -13403,43 +13307,6 @@ public final <R> Flowable<R> replay(Function<? super Flowable<T>, ? extends Publ
return FlowableReplay.multicastSelector(FlowableInternalHelper.replaySupplier(this, time, unit, scheduler, eagerTruncate), selector);
}

/**
* Returns a Flowable that emits items that are the results of invoking a specified selector on items
* emitted by a {@link ConnectableFlowable} that shares a single subscription to the source Publisher.
* <p>
* <img width="640" height="445" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/replay.fs.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator supports backpressure. Note that the upstream requests are determined by the child
* Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will
* request 100 elements from the underlying Publisher sequence.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
* </dl>
*
* @param <R>
* the type of items emitted by the resulting Publisher
* @param selector
* a selector function, which can use the multicasted sequence as many times as needed, without
* causing multiple subscriptions to the Publisher
* @param scheduler
* the Scheduler where the replay is observed
* @return a Flowable that emits items that are the results of invoking the selector on items emitted by
* a {@link ConnectableFlowable} that shares a single subscription to the source Publisher,
* replaying all items
* @see <a href="http://reactivex.io/documentation/operators/replay.html">ReactiveX operators documentation: Replay</a>
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final <R> Flowable<R> replay(final Function<? super Flowable<T>, ? extends Publisher<R>> selector, final Scheduler scheduler) {
ObjectHelper.requireNonNull(selector, "selector is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return FlowableReplay.multicastSelector(FlowableInternalHelper.replaySupplier(this),
FlowableInternalHelper.replayFunction(selector, scheduler));
}

/**
* Returns a {@link ConnectableFlowable} that shares a single subscription to the source Publisher that
* replays at most {@code bufferSize} items emitted by that Publisher. A Connectable Publisher resembles
Expand Down Expand Up @@ -13651,41 +13518,6 @@ public final ConnectableFlowable<T> replay(final int bufferSize, final long time
return FlowableReplay.create(this, time, unit, scheduler, bufferSize, eagerTruncate);
}

/**
* Returns a {@link ConnectableFlowable} that shares a single subscription to the source Publisher and
* replays at most {@code bufferSize} items emitted by that Publisher. A Connectable Publisher resembles
* an ordinary Publisher, except that it does not begin emitting items when it is subscribed to, but only
* when its {@code connect} method is called.
* <p>
* Note that due to concurrency requirements, {@code replay(bufferSize)} may hold strong references to more than
* {@code bufferSize} source emissions.
* <p>
* <img width="640" height="515" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/replay.ns.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator supports backpressure. Note that the upstream requests are determined by the child
* Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will
* request 100 elements from the underlying Publisher sequence.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
* </dl>
*
* @param bufferSize
* the buffer size that limits the number of items that can be replayed
* @param scheduler
* the scheduler on which the Subscribers will observe the emitted items
* @return a {@link ConnectableFlowable} that shares a single subscription to the source Publisher and
* replays at most {@code bufferSize} items that were emitted by the Publisher
* @see <a href="http://reactivex.io/documentation/operators/replay.html">ReactiveX operators documentation: Replay</a>
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final ConnectableFlowable<T> replay(final int bufferSize, final Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return FlowableReplay.observeOn(replay(bufferSize), scheduler);
}

/**
* Returns a {@link ConnectableFlowable} that shares a single subscription to the source Publisher and
* replays all items emitted by that Publisher within a specified time window. A Connectable Publisher
Expand Down Expand Up @@ -13800,37 +13632,6 @@ public final ConnectableFlowable<T> replay(final long time, final TimeUnit unit,
return FlowableReplay.create(this, time, unit, scheduler, eagerTruncate);
}

/**
* Returns a {@link ConnectableFlowable} that shares a single subscription to the source Publisher that
* will replay all of its items and notifications to any future {@link Subscriber} on the given
* {@link Scheduler}. A Connectable Publisher resembles an ordinary Publisher, except that it does not
* begin emitting items when it is subscribed to, but only when its {@code connect} method is called.
* <p>
* <img width="640" height="515" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/replay.s.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator supports backpressure. Note that the upstream requests are determined by the child
* Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will
* request 100 elements from the underlying Publisher sequence.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
* </dl>
*
* @param scheduler
* the Scheduler on which the Subscribers will observe the emitted items
* @return a {@link ConnectableFlowable} that shares a single subscription to the source Publisher that
* will replay all of its items and notifications to any future {@link Subscriber} on the given
* {@link Scheduler}
* @see <a href="http://reactivex.io/documentation/operators/replay.html">ReactiveX operators documentation: Replay</a>
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final ConnectableFlowable<T> replay(final Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return FlowableReplay.observeOn(replay(), scheduler);
}

/**
* Returns a Flowable that mirrors the source Publisher, resubscribing to it if it calls {@code onError}
* (infinite retry count).
Expand Down
Loading

0 comments on commit fb37226

Please sign in to comment.