diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java index 4d8fe66161..7d874de153 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/AbstractRetryingPeerTask.java @@ -22,10 +22,10 @@ import tech.pegasys.pantheon.util.ExceptionUtils; import java.time.Duration; -import java.util.Collection; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -37,11 +37,12 @@ * * @param The type as a typed list that the peer task can get partial or full results in. */ -public abstract class AbstractRetryingPeerTask> extends AbstractEthTask { +public abstract class AbstractRetryingPeerTask extends AbstractEthTask { private static final Logger LOG = LogManager.getLogger(); private final EthContext ethContext; private final int maxRetries; + private final Predicate isEmptyResponse; private int retryCount = 0; private Optional assignedPeer = Optional.empty(); @@ -49,14 +50,17 @@ public abstract class AbstractRetryingPeerTask> extends * @param ethContext The context of the current Eth network we are attached to. * @param maxRetries Maximum number of retries to accept before completing exceptionally. * @param ethTasksTimer The metrics timer to use to time the duration of the task. + * @param isEmptyResponse Test if the response received was empty. */ public AbstractRetryingPeerTask( final EthContext ethContext, final int maxRetries, - final LabelledMetric ethTasksTimer) { + final LabelledMetric ethTasksTimer, + final Predicate isEmptyResponse) { super(ethTasksTimer); this.ethContext = ethContext; this.maxRetries = maxRetries; + this.isEmptyResponse = isEmptyResponse; } public void assignPeer(final EthPeer peer) { @@ -82,7 +86,7 @@ protected void executeTask() { handleTaskError(error); } else { // If we get a partial success reset the retry counter. - if (peerResult.size() > 0) { + if (!isEmptyResponse.test(peerResult)) { retryCount = 0; } executeTaskTimed(); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockHandler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockHandler.java index af1e8055c4..6e29dacb03 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockHandler.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockHandler.java @@ -19,11 +19,10 @@ import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.BlockImporter; import tech.pegasys.pantheon.ethereum.core.TransactionReceipt; -import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask.PeerTaskResult; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.sync.ValidationPolicy; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.CompleteBlocksTask; -import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetReceiptsFromPeerTask; +import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetReceiptsForHeadersTask; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask.BlockHandler; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; @@ -75,9 +74,7 @@ private CompletableFuture> downloadBodies(final List he private CompletableFuture>> downloadReceipts( final List headers) { - return GetReceiptsFromPeerTask.forHeaders(ethContext, headers, ethTasksTimer) - .run() - .thenApply(PeerTaskResult::getResult); + return GetReceiptsForHeadersTask.forHeaders(ethContext, headers, ethTasksTimer).run(); } private List combineBlocksAndReceipts( diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java index 565745095c..d96a4792e7 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/CompleteBlocksTask.java @@ -29,6 +29,7 @@ import tech.pegasys.pantheon.metrics.LabelledMetric; import tech.pegasys.pantheon.metrics.OperationTimer; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; @@ -60,7 +61,7 @@ private CompleteBlocksTask( final List headers, final int maxRetries, final LabelledMetric ethTasksTimer) { - super(ethContext, maxRetries, ethTasksTimer); + super(ethContext, maxRetries, ethTasksTimer, Collection::isEmpty); checkArgument(headers.size() > 0, "Must supply a non-empty headers list"); this.protocolSchedule = protocolSchedule; this.ethContext = ethContext; diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java index 1c568993e3..032772ba55 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java @@ -31,6 +31,7 @@ import tech.pegasys.pantheon.metrics.OperationTimer; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -68,7 +69,7 @@ private DownloadHeaderSequenceTask( final int segmentLength, final int maxRetries, final LabelledMetric ethTasksTimer) { - super(ethContext, maxRetries, ethTasksTimer); + super(ethContext, maxRetries, ethTasksTimer, Collection::isEmpty); this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; this.ethContext = ethContext; diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetReceiptsForHeadersTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetReceiptsForHeadersTask.java new file mode 100644 index 0000000000..6c2671ce22 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetReceiptsForHeadersTask.java @@ -0,0 +1,128 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.tasks; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; + +import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.core.TransactionReceipt; +import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask.PeerTaskResult; +import tech.pegasys.pantheon.ethereum.eth.manager.AbstractRetryingPeerTask; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** Given a set of headers, repeatedly requests the receipts for those blocks. */ +public class GetReceiptsForHeadersTask + extends AbstractRetryingPeerTask>> { + private static final Logger LOG = LogManager.getLogger(); + private static final int DEFAULT_RETRIES = 3; + + private final EthContext ethContext; + + private final List headers; + private final Map> receipts; + + private GetReceiptsForHeadersTask( + final EthContext ethContext, + final List headers, + final int maxRetries, + final LabelledMetric ethTasksTimer) { + super(ethContext, maxRetries, ethTasksTimer, Map::isEmpty); + checkArgument(headers.size() > 0, "Must supply a non-empty headers list"); + this.ethContext = ethContext; + + this.headers = headers; + this.receipts = new HashMap<>(); + completeEmptyReceipts(headers); + } + + public static GetReceiptsForHeadersTask forHeaders( + final EthContext ethContext, + final List headers, + final int maxRetries, + final LabelledMetric ethTasksTimer) { + return new GetReceiptsForHeadersTask(ethContext, headers, maxRetries, ethTasksTimer); + } + + public static GetReceiptsForHeadersTask forHeaders( + final EthContext ethContext, + final List headers, + final LabelledMetric ethTasksTimer) { + return new GetReceiptsForHeadersTask(ethContext, headers, DEFAULT_RETRIES, ethTasksTimer); + } + + private void completeEmptyReceipts(final List headers) { + headers.stream() + .filter(header -> header.getReceiptsRoot().equals(Hash.EMPTY_TRIE_HASH)) + .forEach(header -> receipts.put(header, emptyList())); + } + + @Override + protected CompletableFuture>> executePeerTask( + final Optional assignedPeer) { + return requestReceipts(assignedPeer).thenCompose(this::processResponse); + } + + private CompletableFuture>> requestReceipts( + final Optional assignedPeer) { + final List incompleteHeaders = incompleteHeaders(); + if (incompleteHeaders.isEmpty()) { + return CompletableFuture.completedFuture(emptyMap()); + } + LOG.debug( + "Requesting bodies to complete {} blocks, starting with {}.", + incompleteHeaders.size(), + incompleteHeaders.get(0).getNumber()); + return executeSubTask( + () -> { + final GetReceiptsFromPeerTask task = + GetReceiptsFromPeerTask.forHeaders(ethContext, incompleteHeaders, ethTasksTimer); + assignedPeer.ifPresent(task::assignPeer); + return task.run().thenApply(PeerTaskResult::getResult); + }); + } + + private CompletableFuture>> processResponse( + final Map> responseData) { + receipts.putAll(responseData); + + if (isComplete()) { + result.get().complete(receipts); + } + + return CompletableFuture.completedFuture(responseData); + } + + private List incompleteHeaders() { + return headers.stream().filter(h -> receipts.get(h) == null).collect(Collectors.toList()); + } + + private boolean isComplete() { + return headers.stream().allMatch(header -> receipts.get(header) != null); + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByNumberTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByNumberTask.java index b47009cd82..298f56b918 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByNumberTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/RetryingGetHeaderFromPeerByNumberTask.java @@ -20,6 +20,7 @@ import tech.pegasys.pantheon.metrics.LabelledMetric; import tech.pegasys.pantheon.metrics.OperationTimer; +import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -36,7 +37,7 @@ private RetryingGetHeaderFromPeerByNumberTask( final LabelledMetric ethTasksTimer, final long pivotBlockNumber, final int maxRetries) { - super(ethContext, maxRetries, ethTasksTimer); + super(ethContext, maxRetries, ethTasksTimer, Collection::isEmpty); this.protocolSchedule = protocolSchedule; this.ethContext = ethContext; this.pivotBlockNumber = pivotBlockNumber; diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetReceiptsForHeadersTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetReceiptsForHeadersTaskTest.java new file mode 100644 index 0000000000..4e1668dd15 --- /dev/null +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/GetReceiptsForHeadersTaskTest.java @@ -0,0 +1,70 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.tasks; + +import static java.util.Collections.emptyList; +import static org.assertj.core.api.Assertions.assertThat; +import static tech.pegasys.pantheon.ethereum.core.Hash.EMPTY_TRIE_HASH; + +import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture; +import tech.pegasys.pantheon.ethereum.core.TransactionReceipt; +import tech.pegasys.pantheon.ethereum.eth.manager.EthTask; +import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskTest; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.ImmutableMap; +import org.junit.Test; + +public class GetReceiptsForHeadersTaskTest + extends RetryingMessageTaskTest>> { + + @Override + protected Map> generateDataToBeRequested() { + // Setup data to be requested and expected response + final Map> blocks = new HashMap<>(); + for (long i = 0; i < 3; i++) { + final BlockHeader header = blockchain.getBlockHeader(10 + i).get(); + blocks.put(header, blockchain.getTxReceipts(header.getHash()).get()); + } + return blocks; + } + + @Override + protected EthTask>> createTask( + final Map> requestedData) { + final List headersToComplete = new ArrayList<>(requestedData.keySet()); + return GetReceiptsForHeadersTask.forHeaders( + ethContext, headersToComplete, maxRetries, NoOpMetricsSystem.NO_OP_LABELLED_TIMER); + } + + @Test + public void shouldBeCompleteWhenAllReceiptsAreEmpty() { + final BlockHeader header1 = + new BlockHeaderTestFixture().number(1).receiptsRoot(EMPTY_TRIE_HASH).buildHeader(); + final BlockHeader header2 = + new BlockHeaderTestFixture().number(2).receiptsRoot(EMPTY_TRIE_HASH).buildHeader(); + final BlockHeader header3 = + new BlockHeaderTestFixture().number(3).receiptsRoot(EMPTY_TRIE_HASH).buildHeader(); + + final Map> expected = + ImmutableMap.of(header1, emptyList(), header2, emptyList(), header3, emptyList()); + + assertThat(createTask(expected).run()).isCompletedWithValue(expected); + } +}