Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[NC-2268] Handle partial responses to get receipts requests #801

Merged
merged 10 commits into from
Feb 7, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import tech.pegasys.pantheon.util.ExceptionUtils;

import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -37,26 +37,30 @@
*
* @param <T> The type as a typed list that the peer task can get partial or full results in.
*/
public abstract class AbstractRetryingPeerTask<T extends Collection<?>> extends AbstractEthTask<T> {
public abstract class AbstractRetryingPeerTask<T> extends AbstractEthTask<T> {

private static final Logger LOG = LogManager.getLogger();
private final EthContext ethContext;
private final int maxRetries;
private final Predicate<T> isEmptyResponse;
private int retryCount = 0;
private Optional<EthPeer> assignedPeer = Optional.empty();

/**
* @param ethContext The context of the current Eth network we are attached to.
* @param maxRetries Maximum number of retries to accept before completing exceptionally.
* @param ethTasksTimer The metrics timer to use to time the duration of the task.
* @param isEmptyResponse Test if the response received was empty.
*/
public AbstractRetryingPeerTask(
final EthContext ethContext,
final int maxRetries,
final LabelledMetric<OperationTimer> ethTasksTimer) {
final LabelledMetric<OperationTimer> ethTasksTimer,
final Predicate<T> isEmptyResponse) {
super(ethTasksTimer);
this.ethContext = ethContext;
this.maxRetries = maxRetries;
this.isEmptyResponse = isEmptyResponse;
}

public void assignPeer(final EthPeer peer) {
Expand All @@ -82,7 +86,7 @@ protected void executeTask() {
handleTaskError(error);
} else {
// If we get a partial success reset the retry counter.
if (peerResult.size() > 0) {
if (!isEmptyResponse.test(peerResult)) {
retryCount = 0;
}
executeTaskTimed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.BlockImporter;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask.PeerTaskResult;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.ValidationPolicy;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.CompleteBlocksTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetReceiptsFromPeerTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetReceiptsForHeadersTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask.BlockHandler;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
Expand Down Expand Up @@ -75,9 +74,7 @@ private CompletableFuture<List<Block>> downloadBodies(final List<BlockHeader> he

private CompletableFuture<Map<BlockHeader, List<TransactionReceipt>>> downloadReceipts(
final List<BlockHeader> headers) {
return GetReceiptsFromPeerTask.forHeaders(ethContext, headers, ethTasksTimer)
.run()
.thenApply(PeerTaskResult::getResult);
return GetReceiptsForHeadersTask.forHeaders(ethContext, headers, ethTasksTimer).run();
}

private List<BlockWithReceipts> combineBlocksAndReceipts(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -60,7 +61,7 @@ private CompleteBlocksTask(
final List<BlockHeader> headers,
final int maxRetries,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethContext, maxRetries, ethTasksTimer);
super(ethContext, maxRetries, ethTasksTimer, Collection::isEmpty);
checkArgument(headers.size() > 0, "Must supply a non-empty headers list");
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import tech.pegasys.pantheon.metrics.OperationTimer;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -68,7 +69,7 @@ private DownloadHeaderSequenceTask(
final int segmentLength,
final int maxRetries,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethContext, maxRetries, ethTasksTimer);
super(ethContext, maxRetries, ethTasksTimer, Collection::isEmpty);
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright 2018 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.tasks;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask.PeerTaskResult;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractRetryingPeerTask;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/** Given a set of headers, repeatedly requests the receipts for those blocks. */
public class GetReceiptsForHeadersTask
extends AbstractRetryingPeerTask<Map<BlockHeader, List<TransactionReceipt>>> {
private static final Logger LOG = LogManager.getLogger();
private static final int DEFAULT_RETRIES = 3;

private final EthContext ethContext;

private final List<BlockHeader> headers;
private final Map<BlockHeader, List<TransactionReceipt>> receipts;

private GetReceiptsForHeadersTask(
final EthContext ethContext,
final List<BlockHeader> headers,
final int maxRetries,
final LabelledMetric<OperationTimer> ethTasksTimer) {
super(ethContext, maxRetries, ethTasksTimer, Map::isEmpty);
checkArgument(headers.size() > 0, "Must supply a non-empty headers list");
this.ethContext = ethContext;

this.headers = headers;
this.receipts = new HashMap<>();
completeEmptyReceipts(headers);
}

public static GetReceiptsForHeadersTask forHeaders(
final EthContext ethContext,
final List<BlockHeader> headers,
final int maxRetries,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new GetReceiptsForHeadersTask(ethContext, headers, maxRetries, ethTasksTimer);
}

public static GetReceiptsForHeadersTask forHeaders(
final EthContext ethContext,
final List<BlockHeader> headers,
final LabelledMetric<OperationTimer> ethTasksTimer) {
return new GetReceiptsForHeadersTask(ethContext, headers, DEFAULT_RETRIES, ethTasksTimer);
}

private void completeEmptyReceipts(final List<BlockHeader> headers) {
headers.stream()
.filter(header -> header.getReceiptsRoot().equals(Hash.EMPTY_TRIE_HASH))
.forEach(header -> receipts.put(header, emptyList()));
}

@Override
protected CompletableFuture<Map<BlockHeader, List<TransactionReceipt>>> executePeerTask(
final Optional<EthPeer> assignedPeer) {
return requestReceipts(assignedPeer).thenCompose(this::processResponse);
}

private CompletableFuture<Map<BlockHeader, List<TransactionReceipt>>> requestReceipts(
final Optional<EthPeer> assignedPeer) {
final List<BlockHeader> incompleteHeaders = incompleteHeaders();
if (incompleteHeaders.isEmpty()) {
return CompletableFuture.completedFuture(emptyMap());
}
LOG.debug(
"Requesting bodies to complete {} blocks, starting with {}.",
incompleteHeaders.size(),
incompleteHeaders.get(0).getNumber());
return executeSubTask(
() -> {
final GetReceiptsFromPeerTask task =
GetReceiptsFromPeerTask.forHeaders(ethContext, incompleteHeaders, ethTasksTimer);
assignedPeer.ifPresent(task::assignPeer);
return task.run().thenApply(PeerTaskResult::getResult);
});
}

private CompletableFuture<Map<BlockHeader, List<TransactionReceipt>>> processResponse(
final Map<BlockHeader, List<TransactionReceipt>> responseData) {
receipts.putAll(responseData);

if (isComplete()) {
result.get().complete(receipts);
}

return CompletableFuture.completedFuture(responseData);
}

private List<BlockHeader> incompleteHeaders() {
return headers.stream().filter(h -> receipts.get(h) == null).collect(Collectors.toList());
}

private boolean isComplete() {
return headers.stream().allMatch(header -> receipts.get(header) != null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -36,7 +37,7 @@ private RetryingGetHeaderFromPeerByNumberTask(
final LabelledMetric<OperationTimer> ethTasksTimer,
final long pivotBlockNumber,
final int maxRetries) {
super(ethContext, maxRetries, ethTasksTimer);
super(ethContext, maxRetries, ethTasksTimer, Collection::isEmpty);
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
this.pivotBlockNumber = pivotBlockNumber;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2018 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync.tasks;

import static java.util.Collections.emptyList;
import static org.assertj.core.api.Assertions.assertThat;
import static tech.pegasys.pantheon.ethereum.core.Hash.EMPTY_TRIE_HASH;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.eth.manager.EthTask;
import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.RetryingMessageTaskTest;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.common.collect.ImmutableMap;
import org.junit.Test;

public class GetReceiptsForHeadersTaskTest
extends RetryingMessageTaskTest<Map<BlockHeader, List<TransactionReceipt>>> {

@Override
protected Map<BlockHeader, List<TransactionReceipt>> generateDataToBeRequested() {
// Setup data to be requested and expected response
final Map<BlockHeader, List<TransactionReceipt>> blocks = new HashMap<>();
for (long i = 0; i < 3; i++) {
final BlockHeader header = blockchain.getBlockHeader(10 + i).get();
blocks.put(header, blockchain.getTxReceipts(header.getHash()).get());
}
return blocks;
}

@Override
protected EthTask<Map<BlockHeader, List<TransactionReceipt>>> createTask(
final Map<BlockHeader, List<TransactionReceipt>> requestedData) {
final List<BlockHeader> headersToComplete = new ArrayList<>(requestedData.keySet());
return GetReceiptsForHeadersTask.forHeaders(
ethContext, headersToComplete, maxRetries, NoOpMetricsSystem.NO_OP_LABELLED_TIMER);
}

@Test
public void shouldBeCompleteWhenAllReceiptsAreEmpty() {
final BlockHeader header1 =
new BlockHeaderTestFixture().number(1).receiptsRoot(EMPTY_TRIE_HASH).buildHeader();
final BlockHeader header2 =
new BlockHeaderTestFixture().number(2).receiptsRoot(EMPTY_TRIE_HASH).buildHeader();
final BlockHeader header3 =
new BlockHeaderTestFixture().number(3).receiptsRoot(EMPTY_TRIE_HASH).buildHeader();

final Map<BlockHeader, List<TransactionReceipt>> expected =
ImmutableMap.of(header1, emptyList(), header2, emptyList(), header3, emptyList());

assertThat(createTask(expected).run()).isCompletedWithValue(expected);
}
}