Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Introduce FutureUtils to reduce duplicated code around CompletableFuture #868

Merged
merged 6 commits into from
Feb 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package tech.pegasys.pantheon.ethereum.eth.manager;

import static tech.pegasys.pantheon.util.FutureUtils.completedExceptionally;

import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;

Expand Down Expand Up @@ -110,9 +112,7 @@ protected final <S> CompletableFuture<S> executeSubTask(
});
return subTaskFuture;
} else {
final CompletableFuture<S> future = new CompletableFuture<>();
future.completeExceptionally(new CancellationException());
return future;
return completedExceptionally(new CancellationException());
}
}
}
Expand All @@ -135,9 +135,7 @@ protected final <S> CompletableFuture<S> registerSubTask(
});
return subTaskFuture;
} else {
final CompletableFuture<S> future = new CompletableFuture<>();
future.completeExceptionally(new CancellationException());
return future;
return completedExceptionally(new CancellationException());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package tech.pegasys.pantheon.ethereum.eth.manager;

import static tech.pegasys.pantheon.util.FutureUtils.propagateResult;

import tech.pegasys.pantheon.util.ExceptionUtils;

import java.time.Duration;
Expand Down Expand Up @@ -98,19 +100,7 @@ public <T> CompletableFuture<T> scheduleSyncWorkerTask(
final Supplier<CompletableFuture<T>> future) {
final CompletableFuture<T> promise = new CompletableFuture<>();
final Future<?> workerFuture =
syncWorkerExecutor.submit(
() -> {
future
.get()
.whenComplete(
(r, t) -> {
if (t != null) {
promise.completeExceptionally(t);
} else {
promise.complete(r);
}
});
});
syncWorkerExecutor.submit(() -> propagateResult(future.get(), promise));
// If returned promise is cancelled, cancel the worker future
promise.whenComplete(
(r, t) -> {
Expand Down Expand Up @@ -170,18 +160,7 @@ public <T> CompletableFuture<T> scheduleFutureTask(
final CompletableFuture<T> promise = new CompletableFuture<>();
final ScheduledFuture<?> scheduledFuture =
scheduler.schedule(
() -> {
future
.get()
.whenComplete(
(r, t) -> {
if (t != null) {
promise.completeExceptionally(t);
} else {
promise.complete(r);
}
});
},
() -> propagateResult(future.get(), promise),
duration.toMillis(),
TimeUnit.MILLISECONDS);
// If returned promise is cancelled, cancel scheduled task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import static java.util.concurrent.CompletableFuture.completedFuture;
import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.CHAIN_TOO_SHORT;
import static tech.pegasys.pantheon.util.FutureUtils.completedExceptionally;
import static tech.pegasys.pantheon.util.FutureUtils.exceptionallyCompose;

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
Expand Down Expand Up @@ -73,59 +75,39 @@ public CompletableFuture<FastSyncState> waitForSuitablePeers(final FastSyncState
ethContext, syncConfig.getFastSyncMinimumPeerCount(), ethTasksTimer);

final EthScheduler scheduler = ethContext.getScheduler();
final CompletableFuture<FastSyncState> result = new CompletableFuture<>();
scheduler
.timeout(waitForPeersTask, syncConfig.getFastSyncMaximumPeerWaitTime())
.handle(
(waitResult, error) -> {
return exceptionallyCompose(
scheduler.timeout(waitForPeersTask, syncConfig.getFastSyncMaximumPeerWaitTime()),
error -> {
if (ExceptionUtils.rootCause(error) instanceof TimeoutException) {
if (ethContext.getEthPeers().availablePeerCount() > 0) {
LOG.warn(
"Fast sync timed out before minimum peer count was reached. Continuing with reduced peers.");
result.complete(fastSyncState);
return completedFuture(null);
} else {
LOG.warn(
"Maximum wait time for fast sync reached but no peers available. Continuing to wait for any available peer.");
waitForAnyPeer()
.thenAccept(value -> result.complete(fastSyncState))
.exceptionally(
taskError -> {
result.completeExceptionally(error);
return null;
});
return waitForAnyPeer();
}
} else if (error != null) {
LOG.error("Failed to find peers for fast sync", error);
result.completeExceptionally(error);
} else {
result.complete(fastSyncState);
return completedExceptionally(error);
}
return null;
});

return result;
})
.thenApply(successfulWaitResult -> fastSyncState);
}

private CompletableFuture<Void> waitForAnyPeer() {
final CompletableFuture<Void> result = new CompletableFuture<>();
waitForAnyPeer(result);
return result;
}

private void waitForAnyPeer(final CompletableFuture<Void> result) {
ethContext
.getScheduler()
.timeout(WaitForPeersTask.create(ethContext, 1, ethTasksTimer))
.whenComplete(
(waitResult, throwable) -> {
if (ExceptionUtils.rootCause(throwable) instanceof TimeoutException) {
waitForAnyPeer(result);
} else if (throwable != null) {
result.completeExceptionally(throwable);
} else {
result.complete(waitResult);
}
});
final CompletableFuture<Void> waitForPeerResult =
ethContext.getScheduler().timeout(WaitForPeersTask.create(ethContext, 1, ethTasksTimer));
return exceptionallyCompose(
waitForPeerResult,
throwable -> {
if (ExceptionUtils.rootCause(throwable) instanceof TimeoutException) {
return waitForAnyPeer();
}
return completedExceptionally(throwable);
});
}

public CompletableFuture<FastSyncState> selectPivotBlock(final FastSyncState fastSyncState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync;

import static java.util.Collections.emptyList;
import static tech.pegasys.pantheon.util.FutureUtils.completedExceptionally;

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.Block;
Expand Down Expand Up @@ -112,11 +113,9 @@ public CompletableFuture<List<BlockWithReceipts>> validateAndImportBlocks(
}

private CompletableFuture<List<BlockWithReceipts>> invalidBlockFailure(final Block block) {
final CompletableFuture<List<BlockWithReceipts>> result = new CompletableFuture<>();
result.completeExceptionally(
return completedExceptionally(
new InvalidBlockException(
"Failed to import block", block.getHeader().getNumber(), block.getHash()));
return result;
}

private BlockImporter<C> getBlockImporter(final BlockWithReceipts blockWithReceipt) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync.tasks;

import static tech.pegasys.pantheon.util.FutureUtils.completedExceptionally;

import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
Expand Down Expand Up @@ -87,9 +89,7 @@ private CompletableFuture<PeerTaskResult<List<BlockHeader>>> downloadHeader(fina
private CompletableFuture<PeerTaskResult<List<Block>>> completeBlock(
final PeerTaskResult<List<BlockHeader>> headerResult) {
if (headerResult.getResult().isEmpty()) {
final CompletableFuture<PeerTaskResult<List<Block>>> future = new CompletableFuture<>();
future.completeExceptionally(new IncompleteResultsException());
return future;
return completedExceptionally(new IncompleteResultsException());
}

return executeSubTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,12 @@ protected abstract CompletableFuture<Void> sendOutgoingPacket(
public abstract CompletableFuture<?> stop();

public CompletableFuture<?> start() {
final CompletableFuture<?> future = new CompletableFuture<>();
if (config.isActive()) {
final String host = config.getBindHost();
final int port = config.getBindPort();
LOG.info("Starting peer discovery agent on host={}, port={}", host, port);

listenForConnections()
return listenForConnections()
.thenAccept(
(InetSocketAddress localAddress) -> {
// Once listener is set up, finish initializing
Expand All @@ -140,21 +139,11 @@ public CompletableFuture<?> start() {
localAddress.getPort());
isActive = true;
startController();
})
.whenComplete(
(res, err) -> {
// Finalize future
if (err != null) {
future.completeExceptionally(err);
} else {
future.complete(null);
}
});
} else {
this.isActive = false;
future.complete(null);
return CompletableFuture.completedFuture(null);
}
return future;
}

private void startController() {
Expand Down
87 changes: 87 additions & 0 deletions util/src/main/java/tech/pegasys/pantheon/util/FutureUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.util;

import static java.util.concurrent.CompletableFuture.completedFuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

public class FutureUtils {

/**
* Creates a {@link CompletableFuture} that is exceptionally completed by <code>error</code>.
*
* @param error the error to exceptionally complete the future with
* @param <T> the type of CompletableFuture
* @return a CompletableFuture exceptionally completed by <code>error</code>.
*/
public static <T> CompletableFuture<T> completedExceptionally(final Throwable error) {
final CompletableFuture<T> future = new CompletableFuture<>();
future.completeExceptionally(error);
return future;
}

/**
* Returns a new CompletionStage that, when the provided stage completes exceptionally, is
* executed with the provided stage's exception as the argument to the supplied function.
* Otherwise the returned stage completes successfully with the same value as the provided stage.
*
* <p>This is the exceptional equivalent to {@link CompletionStage#thenCompose(Function)}
*
* @param future the future to handle results or exceptions from
* @param errorHandler the function returning a new CompletionStage
* @param <T> the type of the CompletionStage's result
* @return the CompletionStage
*/
public static <T> CompletableFuture<T> exceptionallyCompose(
final CompletableFuture<T> future,
final Function<Throwable, CompletionStage<T>> errorHandler) {
final CompletableFuture<T> result = new CompletableFuture<>();
future.whenComplete(
(value, error) -> {
try {
final CompletionStage<T> nextStep =
error != null ? errorHandler.apply(error) : completedFuture(value);
propagateResult(nextStep, result);
} catch (final Throwable t) {
result.completeExceptionally(t);
}
});
return result;
}

/**
* Propagates the result of one {@link CompletionStage} to a different {@link CompletableFuture}.
*
* <p>When <code>from</code> completes successfully, <code>to</code> will be completed
* successfully with the same value. When <code>from</code> completes exceptionally, <code>to
* </code> will be completed exceptionally with the same exception.
*
* @param from the CompletionStage to take results and exceptions from
* @param to the CompletableFuture to propagate results and exceptions to
* @param <T> the type of the success value
*/
public static <T> void propagateResult(
final CompletionStage<T> from, final CompletableFuture<T> to) {
from.whenComplete(
(value, error) -> {
if (error != null) {
to.completeExceptionally(error);
} else {
to.complete(value);
}
});
}
}
Loading