diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java index 2d9cdd6968..4e92d7e912 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/EthPeer.java @@ -101,8 +101,8 @@ public void recordRequestTimeout(final int requestCode) { reputation.recordRequestTimeout(requestCode).ifPresent(this::disconnect); } - public void recordUselessResponse() { - LOG.debug("Received useless response from peer {}", this); + public void recordUselessResponse(final String requestType) { + LOG.debug("Received useless response for {} from peer {}", requestType, this); reputation.recordUselessResponse(System.currentTimeMillis()).ifPresent(this::disconnect); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractGetHeadersFromPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractGetHeadersFromPeerTask.java index a3461ed4ba..ec64bd3f32 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractGetHeadersFromPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/AbstractGetHeadersFromPeerTask.java @@ -66,7 +66,7 @@ protected Optional> processResponse( if (streamClosed) { // All outstanding requests have been responded to and we still haven't found the response // we wanted. It must have been empty or contain data that didn't match. - peer.recordUselessResponse(); + peer.recordUselessResponse("headers"); return Optional.of(Collections.emptyList()); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetBodiesFromPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetBodiesFromPeerTask.java index 5e342a7edf..ed10df2609 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetBodiesFromPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetBodiesFromPeerTask.java @@ -95,7 +95,7 @@ protected Optional> processResponse( if (streamClosed) { // All outstanding requests have been responded to and we still haven't found the response // we wanted. It must have been empty or contain data that didn't match. - peer.recordUselessResponse(); + peer.recordUselessResponse("bodies"); return Optional.of(Collections.emptyList()); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetReceiptsFromPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetReceiptsFromPeerTask.java index b25ca401fc..76165df0bf 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetReceiptsFromPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetReceiptsFromPeerTask.java @@ -84,7 +84,7 @@ protected Optional>> processResponse( if (streamClosed) { // All outstanding requests have been responded to and we still haven't found the response // we wanted. It must have been empty or contain data that didn't match. - peer.recordUselessResponse(); + peer.recordUselessResponse("receipts"); return Optional.of(emptyMap()); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTask.java index 52268548b3..4ed5a21e06 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTask.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -96,6 +97,11 @@ protected void executeTaskWithPeer(final EthPeer peer) throws PeerNotConnected { }); } + @Override + protected Optional findSuitablePeer() { + return ethContext.getEthPeers().idlePeer(referenceHeader.getNumber()); + } + private CompletableFuture>> downloadHeaders() { final AbstractPeerTask> task = GetHeadersFromPeerByHashTask.startingAtHash( diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTaskTest.java index e40695824b..4aa77cd1ff 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTaskTest.java @@ -13,6 +13,7 @@ package tech.pegasys.pantheon.ethereum.eth.sync.tasks; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain; import tech.pegasys.pantheon.ethereum.ProtocolContext; @@ -26,6 +27,7 @@ import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.AbstractMessageTaskTest; +import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.NoAvailablePeersException; import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; import tech.pegasys.pantheon.ethereum.eth.manager.task.EthTask; import tech.pegasys.pantheon.ethereum.eth.messages.BlockHeadersMessage; @@ -161,6 +163,23 @@ public void completesWhenPeersSendEmptyResponses() assertThat(future.isCompletedExceptionally()).isFalse(); } + @Test + public void shouldNotRequestDataFromPeerBelowExpectedHeight() { + // Setup a unresponsive peer + final Responder responder = RespondingEthPeer.emptyResponder(); + final RespondingEthPeer respondingEthPeer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1); + + // Execute task and wait for response + final List requestedData = generateDataToBeRequested(); + final EthTask>> task = createTask(requestedData); + final CompletableFuture>> future = task.run(); + respondingEthPeer.respondWhile(responder, () -> !future.isDone()); + assertThat(future.isDone()).isTrue(); + assertThat(future.isCompletedExceptionally()).isTrue(); + assertThatThrownBy(future::get).hasCauseInstanceOf(NoAvailablePeersException.class); + } + private MutableBlockchain createShortChain(final long truncateAtBlockNumber) { final BlockHeader genesisHeader = blockchain.getBlockHeader(BlockHeader.GENESIS_BLOCK_NUMBER).get();