Skip to content

Commit

Permalink
Support Completable from a Callable (#2225)
Browse files Browse the repository at this point in the history
Motivation

New Completable adapter from a Callable implementation.

Modification

New API to support this adaptor along with modification on the pre-existing fromRunnable to re-use this new Callable approach and minimize duplication.

Result

Richer API
  • Loading branch information
tkountis authored May 23, 2022
1 parent 44ae20a commit b952931
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2019, 2021 Apple Inc. and the ServiceTalk project authors
* Copyright © 2019, 2021-2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,17 +17,19 @@

import io.servicetalk.concurrent.internal.ThreadInterruptingCancellable;

import java.util.concurrent.Callable;

import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe;
import static io.servicetalk.concurrent.internal.SubscriberUtils.safeOnComplete;
import static io.servicetalk.concurrent.internal.SubscriberUtils.safeOnError;
import static java.lang.Thread.currentThread;
import static java.util.Objects.requireNonNull;

final class RunnableCompletable extends AbstractSynchronousCompletable {
private final Runnable runnable;
final class CallableCompletable extends AbstractSynchronousCompletable {
private final Callable<Void> callable;

RunnableCompletable(final Runnable runnable) {
this.runnable = requireNonNull(runnable);
CallableCompletable(final Callable<Void> callable) {
this.callable = requireNonNull(callable);
}

@Override
Expand All @@ -41,14 +43,14 @@ void doSubscribe(final Subscriber subscriber) {
}

try {
runnable.run();
callable.call();
} catch (Throwable cause) {
cancellable.setDone(cause);
safeOnError(subscriber, cause);
return;
}
// It is safe to set this outside the scope of the try/catch above because we don't do any blocking
// operations which may be interrupted between the completion of the blockingHttpService call and
// operations which may be interrupted between the completion of the Callable call and
// here.
cancellable.setDone();
safeOnComplete(subscriber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -1793,7 +1794,27 @@ public static Completable completed() {
* @return A new {@code Completable}.
*/
public static Completable fromRunnable(final Runnable runnable) {
return new RunnableCompletable(runnable);
return new CallableCompletable(() -> {
runnable.run();
return null;
});
}

/**
* Creates a {@link Completable} which when subscribed will invoke {@link Callable#call()} on the passed
* {@link Callable} and emit the value returned by that invocation from the returned {@link Completable}. Any error
* emitted by the {@link Callable} will terminate the returned {@link Completable} with the same error.
* <p>
* Blocking inside {@link Callable#call()} will in turn block the subscribe call to the returned
* {@link Completable}.
* If this behavior is undesirable then the returned {@link Completable} should be offloaded using
* {@link #subscribeOn(io.servicetalk.concurrent.Executor)} which offloads the subscribe call.
*
* @param callable {@link Callable} which is invoked before completion.
* @return A new {@code Completable}.
*/
public static Completable fromCallable(final Callable<Void> callable) {
return new CallableCompletable(callable);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018-2019, 2021 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2019, 2021-2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,7 +32,6 @@
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe;
import static io.servicetalk.concurrent.internal.SubscriberUtils.safeOnError;
import static io.servicetalk.http.api.BlockingUtils.blockingToCompletable;
import static io.servicetalk.http.api.DefaultHttpExecutionStrategy.OFFLOAD_RECEIVE_META_STRATEGY;
import static io.servicetalk.http.api.DefaultPayloadInfo.forTransportReceive;
import static io.servicetalk.http.api.HeaderUtils.hasContentLength;
Expand Down Expand Up @@ -154,12 +153,18 @@ public void cancel() {

@Override
public Completable closeAsync() {
return blockingToCompletable(original::close);
return Completable.fromCallable(() -> {
original.close();
return null;
});
}

@Override
public Completable closeAsyncGracefully() {
return blockingToCompletable(original::closeGracefully);
return Completable.fromCallable(() -> {
original.closeGracefully();
return null;
});
}

private static final class BufferHttpPayloadWriter implements HttpPayloadWriter<Buffer> {
Expand Down

0 comments on commit b952931

Please sign in to comment.