From cdf1ae5ef3b0fb00aabf82bcaa67b4b02548ccf0 Mon Sep 17 00:00:00 2001 From: Meredith Baxter Date: Fri, 16 Aug 2019 12:37:34 -0400 Subject: [PATCH 1/5] Fix invalid block handling --- .../eth/sync/PipelineChainDownloader.java | 18 +++++++----- .../eth/sync/PipelineChainDownloaderTest.java | 28 +++++++++++++++++-- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloader.java index ff01ccc43b..1df28979b3 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloader.java @@ -115,14 +115,10 @@ private CompletionStage repeatUnlessDownloadComplete( private CompletionStage handleFailedDownload(final Throwable error) { pipelineErrorCounter.inc(); - if (!cancelled.get() - && syncTargetManager.shouldContinueDownloading() - && !(ExceptionUtils.rootCause(error) instanceof CancellationException)) { - logDownloadFailure("Chain download failed. Restarting after short delay.", error); - // Allowing the normal looping logic to retry after a brief delay. - return scheduler.scheduleFutureTask(() -> completedFuture(null), PAUSE_AFTER_ERROR_DURATION); - } if (ExceptionUtils.rootCause(error) instanceof InvalidBlockException) { + LOG.warn( + "Invalid block detected. Disconnecting from sync target. {}", + ExceptionUtils.rootCause(error).getMessage()); syncState .syncTarget() .ifPresent( @@ -132,6 +128,14 @@ private CompletionStage handleFailedDownload(final Throwable error) { .disconnect(DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL)); } + if (!cancelled.get() + && syncTargetManager.shouldContinueDownloading() + && !(ExceptionUtils.rootCause(error) instanceof CancellationException)) { + logDownloadFailure("Chain download failed. Restarting after short delay.", error); + // Allowing the normal looping logic to retry after a brief delay. + return scheduler.scheduleFutureTask(() -> completedFuture(null), PAUSE_AFTER_ERROR_DURATION); + } + logDownloadFailure("Chain download failed.", error); // Propagate the error out, terminating this chain download. return completedExceptionally(error); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloaderTest.java index 9219369b6a..a6b08f354e 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloaderTest.java @@ -255,9 +255,30 @@ public void shouldAbortPipelineIfCancelledAfterDownloadStarts() { } @Test - public void shouldDisconnectPeerIfInvalidBlockException() { + public void shouldDisconnectSyncTargetOnInvalidBlockException_finishedDownloading_cancelled() { + testInvalidBlockHandling(true, true); + } + + @Test + public void + shouldDisconnectSyncTargetOnInvalidBlockException_notFinishedDownloading_notCancelled() { + testInvalidBlockHandling(false, false); + } + + @Test + public void shouldDisconnectSyncTargetOnInvalidBlockException_finishedDownloading_notCancelled() { + testInvalidBlockHandling(true, false); + } + + @Test + public void shouldDisconnectSyncTargetOnInvalidBlockException_notFinishedDownloading_cancelled() { + testInvalidBlockHandling(false, true); + } + + public void testInvalidBlockHandling( + final boolean isFinishedDownloading, final boolean isCancelled) { final CompletableFuture selectTargetFuture = new CompletableFuture<>(); - when(syncTargetManager.shouldContinueDownloading()).thenReturn(false); + when(syncTargetManager.shouldContinueDownloading()).thenReturn(isFinishedDownloading); when(syncTargetManager.findSyncTarget(Optional.empty())) .thenReturn(selectTargetFuture) .thenReturn(new CompletableFuture<>()); @@ -267,6 +288,9 @@ public void shouldDisconnectPeerIfInvalidBlockException() { when(syncState.syncTarget()).thenReturn(Optional.of(target)); chainDownloader.start(); verify(syncTargetManager).findSyncTarget(Optional.empty()); + if (isCancelled) { + chainDownloader.cancel(); + } selectTargetFuture.completeExceptionally(new InvalidBlockException("", 1, null)); verify(ethPeer).disconnect(DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL); } From 0c383002bdbee391b76b2836d9d12faa45ef59b3 Mon Sep 17 00:00:00 2001 From: Meredith Baxter Date: Fri, 16 Aug 2019 12:38:41 -0400 Subject: [PATCH 2/5] Making invalid block messages more detailed / verbose --- .../sync/CheckpointHeaderValidationStep.java | 18 +++++++++++----- .../eth/sync/PipelineChainDownloader.java | 5 +++-- .../tasks/DownloadHeaderSequenceTask.java | 2 +- .../exceptions/InvalidBlockException.java | 2 +- .../CheckpointHeaderValidationStepTest.java | 21 ++++++++++++++++--- 5 files changed, 36 insertions(+), 12 deletions(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderValidationStep.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderValidationStep.java index b3bd6819f4..aebd574a13 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderValidationStep.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderValidationStep.java @@ -39,16 +39,24 @@ public CheckpointHeaderValidationStep( @Override public Stream apply(final CheckpointRangeHeaders checkpointRangeHeaders) { - final BlockHeader expectedParent = checkpointRangeHeaders.getCheckpointRange().getStart(); + final BlockHeader rangeStart = checkpointRangeHeaders.getCheckpointRange().getStart(); final BlockHeader firstHeaderToImport = checkpointRangeHeaders.getFirstHeaderToImport(); - if (isValid(expectedParent, firstHeaderToImport)) { + if (isValid(rangeStart, firstHeaderToImport)) { return checkpointRangeHeaders.getHeadersToImport().stream(); } else { + final BlockHeader rangeEnd = checkpointRangeHeaders.getCheckpointRange().getEnd(); + final String errorMessage = + String.format( + "Invalid checkpoint headers. Headers downloaded between #%d (%s) and #%d (%s) do not connect at #%d (%s)", + rangeStart.getNumber(), + rangeStart.getHash(), + rangeEnd.getNumber(), + rangeEnd.getHash(), + firstHeaderToImport.getNumber(), + firstHeaderToImport.getHash()); throw new InvalidBlockException( - "Provided first header does not connect to last header.", - expectedParent.getNumber(), - expectedParent.getHash()); + errorMessage, firstHeaderToImport.getNumber(), firstHeaderToImport.getHash()); } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloader.java index 1df28979b3..a5a3218c75 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloader.java @@ -145,9 +145,10 @@ private void logDownloadFailure(final String message, final Throwable error) { final Throwable rootCause = ExceptionUtils.rootCause(error); if (rootCause instanceof CancellationException || rootCause instanceof InterruptedException) { LOG.trace(message, error); - } else if (rootCause instanceof EthTaskException - || rootCause instanceof InvalidBlockException) { + } else if (rootCause instanceof EthTaskException) { LOG.debug(message, error); + } else if (rootCause instanceof InvalidBlockException) { + LOG.warn(message, error); } else { LOG.error(message, error); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java index 85b43a9fe0..2f3c9dab53 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/DownloadHeaderSequenceTask.java @@ -204,7 +204,7 @@ private CompletableFuture> processHeaders( headersResult.getPeer().disconnect(DisconnectReason.BREACH_OF_PROTOCOL); future.completeExceptionally( new InvalidBlockException( - "Invalid header", header.getNumber(), header.getHash())); + "Header failed validation.", child.getNumber(), child.getHash())); return future; } headers[headerIndex] = header; diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/exceptions/InvalidBlockException.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/exceptions/InvalidBlockException.java index b55e764cf0..85d670e079 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/exceptions/InvalidBlockException.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/exceptions/InvalidBlockException.java @@ -17,6 +17,6 @@ public class InvalidBlockException extends RuntimeException { public InvalidBlockException(final String message, final long blockNumber, final Hash blockHash) { - super(message + ": " + blockNumber + ", " + blockHash); + super(message + ": Invalid block at #" + blockNumber + " (" + blockHash + ")"); } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderValidationStepTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderValidationStepTest.java index 53d69b226e..034dd65fae 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderValidationStepTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/CheckpointHeaderValidationStepTest.java @@ -50,11 +50,12 @@ public class CheckpointHeaderValidationStepTest { private CheckpointHeaderValidationStep validationStep; private final BlockHeader checkpointStart = gen.header(10); + private final BlockHeader checkpointEnd = gen.header(13); private final BlockHeader firstHeader = gen.header(11); private final CheckpointRangeHeaders rangeHeaders = new CheckpointRangeHeaders( - new CheckpointRange(syncTarget, checkpointStart, gen.header(13)), - asList(firstHeader, gen.header(12), gen.header(13))); + new CheckpointRange(syncTarget, checkpointStart, checkpointEnd), + asList(firstHeader, gen.header(12), checkpointEnd)); @Before public void setUp() { @@ -88,6 +89,20 @@ public void shouldThrowExceptionWhenValidationFails() { firstHeader, checkpointStart, protocolContext, DETACHED_ONLY)) .thenReturn(false); assertThatThrownBy(() -> validationStep.apply(rangeHeaders)) - .isInstanceOf(InvalidBlockException.class); + .isInstanceOf(InvalidBlockException.class) + .hasMessageContaining( + "Invalid checkpoint headers. Headers downloaded between #" + + checkpointStart.getNumber() + + " (" + + checkpointStart.getHash() + + ") and #" + + checkpointEnd.getNumber() + + " (" + + checkpointEnd.getHash() + + ") do not connect at #" + + firstHeader.getNumber() + + " (" + + firstHeader.getHash() + + ")"); } } From 7c04790ddb78c67734bacde860594dea8c6ef398 Mon Sep 17 00:00:00 2001 From: Meredith Baxter Date: Fri, 16 Aug 2019 16:32:04 -0400 Subject: [PATCH 3/5] Add integration test for fork management --- .../eth/manager/MockPeerConnection.java | 11 ++ .../ethtaskutils/BlockchainSetupUtil.java | 25 +++- .../FullSyncChainDownloaderForkTest.java | 119 ++++++++++++++++++ .../pipeline/AsyncOperationProcessor.java | 2 +- .../pantheon/testutil/BlockTestUtil.java | 84 +++++++++++-- .../resources/fork-chain-data/common.blocks | Bin 0 -> 3845 bytes .../fork-chain-data/fork-outdated.blocks | Bin 0 -> 6124 bytes .../fork-chain-data/fork-upgraded.blocks | Bin 0 -> 6124 bytes .../fork-chain-data/genesis-outdated.json | 47 +++++++ .../fork-chain-data/genesis-upgraded.json | 48 +++++++ .../fork-chain-data/src/common-blocks.json | 36 ++++++ .../fork-chain-data/src/fork-blocks.json | 28 +++++ 12 files changed, 382 insertions(+), 18 deletions(-) create mode 100644 ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncChainDownloaderForkTest.java create mode 100644 testutil/src/main/resources/fork-chain-data/common.blocks create mode 100644 testutil/src/main/resources/fork-chain-data/fork-outdated.blocks create mode 100644 testutil/src/main/resources/fork-chain-data/fork-upgraded.blocks create mode 100644 testutil/src/main/resources/fork-chain-data/genesis-outdated.json create mode 100644 testutil/src/main/resources/fork-chain-data/genesis-upgraded.json create mode 100644 testutil/src/main/resources/fork-chain-data/src/common-blocks.json create mode 100644 testutil/src/main/resources/fork-chain-data/src/fork-blocks.json diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockPeerConnection.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockPeerConnection.java index d07d47a01a..3db9dd820a 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockPeerConnection.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/MockPeerConnection.java @@ -24,6 +24,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Optional; import java.util.Set; public class MockPeerConnection implements PeerConnection { @@ -35,6 +36,7 @@ public class MockPeerConnection implements PeerConnection { private final BytesValue nodeId; private final Peer peer; private final PeerInfo peerInfo; + private Optional disconnectReason = Optional.empty(); public MockPeerConnection(final Set caps, final PeerSendHandler onSend) { this.caps = caps; @@ -84,9 +86,18 @@ public void terminateConnection(final DisconnectReason reason, final boolean pee @Override public void disconnect(final DisconnectReason reason) { + if (disconnected) { + // Already disconnected + return; + } + disconnectReason = Optional.of(reason); disconnected = true; } + public Optional getDisconnectReason() { + return disconnectReason; + } + @Override public InetSocketAddress getLocalAddress() { throw new UnsupportedOperationException(); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/BlockchainSetupUtil.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/BlockchainSetupUtil.java index e98313e6fa..c86cc6b57c 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/BlockchainSetupUtil.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/BlockchainSetupUtil.java @@ -16,6 +16,7 @@ import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain; import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryWorldStateArchive; +import tech.pegasys.pantheon.config.GenesisConfigFile; import tech.pegasys.pantheon.ethereum.ProtocolContext; import tech.pegasys.pantheon.ethereum.chain.Blockchain; import tech.pegasys.pantheon.ethereum.chain.GenesisState; @@ -32,6 +33,7 @@ import tech.pegasys.pantheon.ethereum.util.RawBlockIterator; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive; import tech.pegasys.pantheon.testutil.BlockTestUtil; +import tech.pegasys.pantheon.testutil.BlockTestUtil.ChainResources; import java.io.IOException; import java.net.URISyntaxException; @@ -91,15 +93,28 @@ public int blockCount() { } public static BlockchainSetupUtil forTesting() { - final ProtocolSchedule protocolSchedule = MainnetProtocolSchedule.create(); + return createEthashChain(BlockTestUtil.getTestChainResources()); + } + + public static BlockchainSetupUtil forOutdatedFork() { + return createEthashChain(BlockTestUtil.getOutdatedForkResources()); + } + + public static BlockchainSetupUtil forUpgradedFork() { + return createEthashChain(BlockTestUtil.getUpgradedForkResources()); + } + + private static BlockchainSetupUtil createEthashChain(final ChainResources chainResources) { final TemporaryFolder temp = new TemporaryFolder(); try { temp.create(); - final String genesisJson = - Resources.toString(BlockTestUtil.getTestGenesisUrl(), Charsets.UTF_8); + final String genesisJson = Resources.toString(chainResources.getGenesisURL(), Charsets.UTF_8); - final GenesisState genesisState = GenesisState.fromJson(genesisJson, protocolSchedule); + final GenesisConfigFile genesisConfigFile = GenesisConfigFile.fromConfig(genesisJson); + final ProtocolSchedule protocolSchedule = + MainnetProtocolSchedule.fromConfig(genesisConfigFile.getConfigOptions()); + final GenesisState genesisState = GenesisState.fromJson(genesisJson, protocolSchedule); final MutableBlockchain blockchain = createInMemoryBlockchain(genesisState.getBlock()); final WorldStateArchive worldArchive = createInMemoryWorldStateArchive(); @@ -107,7 +122,7 @@ public static BlockchainSetupUtil forTesting() { final ProtocolContext protocolContext = new ProtocolContext<>(blockchain, worldArchive, null); - final Path blocksPath = Path.of(BlockTestUtil.getTestBlockchainUrl().toURI()); + final Path blocksPath = Path.of(chainResources.getBlocksURL().toURI()); final List blocks = new ArrayList<>(); final BlockHeaderFunctions blockHeaderFunctions = ScheduleBasedBlockHeaderFunctions.create(protocolSchedule); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncChainDownloaderForkTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncChainDownloaderForkTest.java new file mode 100644 index 0000000000..9697ce3dd1 --- /dev/null +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncChainDownloaderForkTest.java @@ -0,0 +1,119 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.fullsync; + +import static org.assertj.core.api.Assertions.assertThat; + +import tech.pegasys.pantheon.ethereum.ProtocolContext; +import tech.pegasys.pantheon.ethereum.chain.Blockchain; +import tech.pegasys.pantheon.ethereum.chain.MutableBlockchain; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; +import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; +import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler; +import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; +import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; +import tech.pegasys.pantheon.ethereum.eth.manager.ethtaskutils.BlockchainSetupUtil; +import tech.pegasys.pantheon.ethereum.eth.sync.ChainDownloader; +import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration; +import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; +import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; +import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; +import tech.pegasys.pantheon.metrics.MetricsSystem; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; +import tech.pegasys.pantheon.util.uint.UInt256; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class FullSyncChainDownloaderForkTest { + + protected ProtocolSchedule protocolSchedule; + protected EthProtocolManager ethProtocolManager; + protected EthContext ethContext; + protected ProtocolContext protocolContext; + private SyncState syncState; + + private BlockchainSetupUtil localBlockchainSetup; + protected MutableBlockchain localBlockchain; + private BlockchainSetupUtil otherBlockchainSetup; + protected Blockchain otherBlockchain; + private final MetricsSystem metricsSystem = new NoOpMetricsSystem(); + + @Before + public void setupTest() { + localBlockchainSetup = BlockchainSetupUtil.forUpgradedFork(); + localBlockchain = localBlockchainSetup.getBlockchain(); + otherBlockchainSetup = BlockchainSetupUtil.forOutdatedFork(); + otherBlockchain = otherBlockchainSetup.getBlockchain(); + + protocolSchedule = localBlockchainSetup.getProtocolSchedule(); + protocolContext = localBlockchainSetup.getProtocolContext(); + ethProtocolManager = + EthProtocolManagerTestUtil.create( + localBlockchain, + localBlockchainSetup.getWorldArchive(), + new EthScheduler(1, 1, 1, new NoOpMetricsSystem())); + ethContext = ethProtocolManager.ethContext(); + syncState = new SyncState(protocolContext.getBlockchain(), ethContext.getEthPeers()); + } + + @After + public void tearDown() { + ethProtocolManager.stop(); + } + + private ChainDownloader downloader(final SynchronizerConfiguration syncConfig) { + return FullSyncChainDownloader.create( + syncConfig, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem); + } + + private ChainDownloader downloader() { + final SynchronizerConfiguration syncConfig = syncConfigBuilder().build(); + return downloader(syncConfig); + } + + private SynchronizerConfiguration.Builder syncConfigBuilder() { + return SynchronizerConfiguration.builder(); + } + + @Test + public void disconnectsFromPeerOnBadFork() { + otherBlockchainSetup.importAllBlocks(); + final UInt256 localTd = localBlockchain.getChainHead().getTotalDifficulty(); + + final Responder responder = RespondingEthPeer.blockchainResponder(otherBlockchain); + final RespondingEthPeer peer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, localTd.plus(100), 100); + + final ChainDownloader downloader = downloader(); + downloader.start(); + + // Process until the sync target is selected + peer.respondWhileOtherThreadsWork(responder, () -> syncState.syncTarget().isEmpty()); + + // Check that we picked our peer + assertThat(syncState.syncTarget()).isPresent(); + assertThat(syncState.syncTarget().get().peer()).isEqualTo(peer.getEthPeer()); + + // Process until the sync target is cleared + peer.respondWhileOtherThreadsWork(responder, () -> syncState.syncTarget().isPresent()); + + // We should have disconnected from our peer on the invalid chain + assertThat(peer.getEthPeer().isDisconnected()).isTrue(); + assertThat(peer.getPeerConnection().getDisconnectReason()) + .contains(DisconnectReason.BREACH_OF_PROTOCOL); + assertThat(syncState.syncTarget()).isEmpty(); + } +} diff --git a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessor.java b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessor.java index 29bac46888..bf2c6eaac6 100644 --- a/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessor.java +++ b/services/pipeline/src/main/java/tech/pegasys/pantheon/services/pipeline/AsyncOperationProcessor.java @@ -81,7 +81,7 @@ private void outputNextCompletedTask(final WritePipe outputPipe) { } catch (final InterruptedException e) { LOG.trace("Interrupted while waiting for processing to complete", e); } catch (final ExecutionException e) { - throw new RuntimeException("Async operation failed", e); + throw new RuntimeException("Async operation failed. " + e.getMessage(), e); } catch (final TimeoutException e) { // Ignore and go back around the loop. } diff --git a/testutil/src/main/java/tech/pegasys/pantheon/testutil/BlockTestUtil.java b/testutil/src/main/java/tech/pegasys/pantheon/testutil/BlockTestUtil.java index 028326b0b1..c24f4bd40b 100644 --- a/testutil/src/main/java/tech/pegasys/pantheon/testutil/BlockTestUtil.java +++ b/testutil/src/main/java/tech/pegasys/pantheon/testutil/BlockTestUtil.java @@ -29,25 +29,67 @@ public final class BlockTestUtil { - private static final Supplier blockchainURLSupplier = - Suppliers.memoize(BlockTestUtil::supplyTestBlockchainURL); - private static final Supplier genesisURLSupplier = - Suppliers.memoize(BlockTestUtil::supplyTestGenesisURL); + private static final Supplier testChainSupplier = + Suppliers.memoize(BlockTestUtil::supplyTestChainResources); + private static final Supplier forkOutdatedSupplier = + Suppliers.memoize(BlockTestUtil::supplyOutdatedForkResources); + private static final Supplier forkUpgradedSupplier = + Suppliers.memoize(BlockTestUtil::supplyUpgradedForkResources); - private static URL supplyTestBlockchainURL() { - return ensureFileUrl(BlockTestUtil.class.getClassLoader().getResource("testBlockchain.blocks")); + public static URL getTestBlockchainUrl() { + return getTestChainResources().getBlocksURL(); } - private static URL supplyTestGenesisURL() { - return ensureFileUrl(BlockTestUtil.class.getClassLoader().getResource("testGenesis.json")); + public static URL getTestGenesisUrl() { + return getTestChainResources().getGenesisURL(); } - public static URL getTestBlockchainUrl() { - return blockchainURLSupplier.get(); + public static ChainResources getTestChainResources() { + return testChainSupplier.get(); } - public static URL getTestGenesisUrl() { - return genesisURLSupplier.get(); + public static ChainResources getOutdatedForkResources() { + return forkOutdatedSupplier.get(); + } + + public static ChainResources getUpgradedForkResources() { + return forkUpgradedSupplier.get(); + } + + private static ChainResources supplyTestChainResources() { + final URL genesisURL = + ensureFileUrl(BlockTestUtil.class.getClassLoader().getResource("testGenesis.json")); + final URL blocksURL = + ensureFileUrl(BlockTestUtil.class.getClassLoader().getResource("testBlockchain.blocks")); + return new ChainResources(genesisURL, blocksURL); + } + + private static ChainResources supplyOutdatedForkResources() { + final URL genesisURL = + ensureFileUrl( + BlockTestUtil.class + .getClassLoader() + .getResource("fork-chain-data/genesis-outdated.json")); + final URL blocksURL = + ensureFileUrl( + BlockTestUtil.class + .getClassLoader() + .getResource("fork-chain-data/fork-outdated.blocks")); + return new ChainResources(genesisURL, blocksURL); + } + + private static ChainResources supplyUpgradedForkResources() { + final URL genesisURL = + ensureFileUrl( + BlockTestUtil.class + .getClassLoader() + .getResource("fork-chain-data/genesis-upgraded.json")); + final URL blocksURL = + ensureFileUrl( + BlockTestUtil.class + .getClassLoader() + .getResource("fork-chain-data/fork-upgraded.blocks")); + return new ChainResources(genesisURL, blocksURL); } /** Take a resource URL and if needed copy it to a temp file and return that URL. */ @@ -84,4 +126,22 @@ public static void write1000Blocks(final Path target) { throw new IllegalStateException(ex); } } + + public static class ChainResources { + private final URL genesisURL; + private final URL blocksURL; + + public ChainResources(final URL genesisURL, final URL blocksURL) { + this.genesisURL = genesisURL; + this.blocksURL = blocksURL; + } + + public URL getGenesisURL() { + return genesisURL; + } + + public URL getBlocksURL() { + return blocksURL; + } + } } diff --git a/testutil/src/main/resources/fork-chain-data/common.blocks b/testutil/src/main/resources/fork-chain-data/common.blocks new file mode 100644 index 0000000000000000000000000000000000000000..d1ba3e2549dcce4d1bd1723f256aa9b785f92f0f GIT binary patch literal 3845 zcmey#B=eI=d;tRiut4^V?~8lKW2;uTZcX2I=8Dv1Ay?@sT_wUF7CQZPxH8#&3N`~4 zT+31OYxLkO3@T2n{Mz$2P_S;xkjeB z;K|mv#%nj<&OCj)%HrWuZt1393kZcy2ULO6fdfAo|NdnBv7qcpb_`>=X>9+B&fR7I zj}#bgF$%TS^$j?f`{q>E{&(4ACXXG~;>FikUi~!rUTtDBac8yO9py7@u7&S*1@0|x zncBilF!l*57)>4wDU9&6(GnYa`9s43#~G~OD;ljP+rQqP^rW<3{uW;b(Uhfsm2H0An`?Y$zFR?AF0(8z!RRHZU^M<2QkYQU|6{{~ zpmjMjP5L;$*4Df=-+8c4ZmY?SIeO(c(%vzNY1Y4gdbZ=+rwRAZ2JA|J$NvMBj+DN} z?KRhZg&Q2-IVFX?>o>e;7a`++_r)$DegPLU<6mS>l7nJ@YM>PByNc^QuIh2(={zA5 zqs^oD9sF`IXV+zd(MwRlX#6vzFr&o(r-lXATUU+8@@1uJn&`-TEa-*42EeVT1v}=q0FNH2xV*}D@y!-X;>gUb+=s8B%h2veaEVE;@wFzzv0Z9zhXs#g zyaEqS&dN~;PY^K-$Sr*NsMqw>l+KdyGWC!irLCJ6a9x?ko4-p&B3Wd5qxce|CoXb( zjMJHlJ1=a%J2U-gq5o+7Go-M=;=gGL2dD-3wP69HJ;T3}PKMy3#-~-4R@a$1pFXkl z$y#K`KOxbT;eT6X$C~b-_3SVA>i&5A<6%SN`{w`uKTQ$9(r;+k(SSu1S%^^ojAD7| z1r~uv zrPCmwj3^33ZXpP$2s3hQn3bCZCvrrLpaCR|v$aw~6;)joKW^TquGjzS?&{yaUiVu> zC@mrsK^VjUmAmXYBTqe}?_Q<{JnHJ--6w}dWu@Td=L#)W%=$BrXKi!=kQszNL9~(# zbBgeZT1cIBl#MSPKFB}Orkmg9e>Io*`4s@ks;mDU007E1a0X8p#GFXYj8#euWz%Q%?^O++e=!(s zG(Ncnm9PXty&dxZ0v0VTi}002I3J8;|L6g~Xy{41LurUyZ41?{)^k3hWq*it>1Jfr^;DKf6i9)*uiESs>f)xJ!}WAWM$v?OCvY* zhkia58W}96ybY2UG9dDY;SlTm-vOZ0{h(ZfG|7dy*jY>+BTa>F@O#0*i@*DG5j!<6 zy`AhPf8eE$cAu>GUh^MQC*#tHHL(Ns@_;$l!uJO*O}EF?RmowPQ!gj8-BL(@g#uqQ zHQG7H-AFEBAQh`|Rxx1f=?r&_`(wsDBdER)k{2={@`vHX*7@@Q@URqDZchhV)|ON> zVxgMWfTlT_UNVaNvIylRXkYU;Yf|P=Ij$+J>N|}-`8#gN=t-5b ztzfUBDX(b?Q_7Ep^siSyA4;6>l=Q5WPixWC|7|R9b93Rqy?HM+T}i*POs*l>P%->ea(PX}n*Z$@ zk^mbUj_jTMNkeAB-6Z0iUQ@|;xDo!-&MpfzX`GP$vo5{YnY47D)GDP*ds71LB8#aM zc~gsliF`UYx;9Vj$PN(jrSAkOf+1o&)xK30o)?-_$H0lQ+A}B9whI0ej!yH8Z zFx;ju{t3mK|M~-506_Rrm;g_KoeC#TMn{>hma&5Plt2!c;wzdRE$-g*@}l%6wi` z`9%d<&FnmUGMm8q`C);xxHYTr6F76JK}FEK!0y1zkeD6>a|2s*Do^5b%h`Xmyz^Rz zy^?idef6eFeJ3=XQ5m1dX48{W)ft+j0eWJ(LbklZ$vr;g)BD&L&-aPhj3eLW#imF5 zM9#(-GJTGi^72x^ zfPYoX5gR|%+U!@KR|2z)e2gzy^1OExOzrU%bL1q|WOTRgw6t?OtTW$g58DmOFa+Vx zWTqtEs-2gf#yVKN>~WH9y1ZwiP-ZhJq$uw7m(9FpLiGoY+D2}TOa5GvO-`WhF6-~J z?UHt1`)%wuJENhGLe`ufcOdU-AaLEE-Ta_UMY`{}^*`dT#8u{HNd3pOjrhb2@>>lV z5H02~+~zMe2R<+5{rUhHw1w88Qz3~2Z@Vo|4W}%7J)9*yO7njhlY8ivf6_9NOi06z zO7lZ4*M8>A8<%G%mha9rZDr{Zdr(KTI}7@PQziv5Hm?U$@lw7*`iE%#LX_w(Sq{Iy z>2@KZ==pI%-w~q1$ zgF=h;IGxCJ!K%JAWspWT>GX;}v13KUki3uqkv|M4wa)(# x0M7bp{j2@ioH{dTORE-pb|H~7N=(Y^xe}50ILCU$lSW3|EXnbWeN@}h@+X%NgT(*< literal 0 HcmV?d00001 diff --git a/testutil/src/main/resources/fork-chain-data/fork-upgraded.blocks b/testutil/src/main/resources/fork-chain-data/fork-upgraded.blocks new file mode 100644 index 0000000000000000000000000000000000000000..ebc3642ff1a75c99418a2965491fc845592a8d3a GIT binary patch literal 6124 zcmeI#XHZkw0tfIUlmLo!vl#laptvB-jbfp!QHoRvARtJLq7gt@TQS)2^p>@oZstSiJiqD|IQ|H2 z^h&KNWS}0t+lT+J=3+i=0T8de_16FZP^y95XJdHVCMhFAAtr!Dn>DDe>AUf^FZ|f> z*dBEB1_-q~l>Y_HT3XiOo9pm3Fofmn4ksPGl=2{j5Lcx4RL03D-Rj zfTp=ZtD$704CCoK{6YeuR(kjw(GV9)(Nc+ni|q~SUX;R@hl$*GB3;}5`P`}*bo<#u z+=(#Hdg)ZcW4TDlI8VdXvS(MO+u1cZ9{t&SHHLJWEvfELiy&mCpQKP4L<&K zHXy`bKv5i$7c#*2hY=7v{Qm?%hev*ShRLGKkrA_)dPcGm&G5GZ9nx>tIEej+qUOfl zQC1dG2jAJzFK_z~sZ$8a>4Eaz>zL+-aeK~KV))U!A$cJKe18~0V2A$#0G<%zNUOZokg|#`88A~$ zZotrcr|;%J8ZM0y^clwVsZr3J&JwgIw{_d!q)CxYWxFJf);?(T%9ksY&=8Gq#A`NwM6fSLS@thnbT&y zBuwdbO@&Zte=}RPLA9= zI+d9~bPmyUOj4uOfMPw_;5{VI+s7KJ zCO6%Lmpu}xnX1&JD#4_DOO%1I~sSC(YEoXghd3)AtYXys9 zdadvi0|yNny@=eR8@S4UH+$#sD4 z?^+LfoRh~zj|E(%sl3GJnGEC zWx0YW6}*5wJFYJ6-HZKa&R;yCyY#{ub^w%MaD#r4m_p8}-q<&dwKea2?;zE5S7oGF zLL~N9N#yvK%lr?DB=WV9RgzmH6H(!LqNoue=Bs%s34>XT_LG|Z$SR5Ap`uH!w4c@Y7TrZ`5^#wiWC1}6(3Cu z^pB}DJ2e+hnjn{Y<&hlNS~#^2YWB^xPKb|Vtu?|uv&(cJLz%oo;qbVAbRn;;z{vs z{~uT07VS~Ft8>ymaI7~y_nh%0woW=~wcuZld3a6 Date: Fri, 16 Aug 2019 17:26:29 -0400 Subject: [PATCH 4/5] Clean up --- .../ethereum/eth/sync/PipelineChainDownloader.java | 10 ++-------- .../pantheon/ethereum/eth/sync/state/SyncState.java | 5 +++++ 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloader.java index a5a3218c75..022f442619 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloader.java @@ -21,7 +21,7 @@ import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException; -import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage; +import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.metrics.Counter; import tech.pegasys.pantheon.metrics.LabelledMetric; import tech.pegasys.pantheon.metrics.MetricsSystem; @@ -119,13 +119,7 @@ private CompletionStage handleFailedDownload(final Throwable error) { LOG.warn( "Invalid block detected. Disconnecting from sync target. {}", ExceptionUtils.rootCause(error).getMessage()); - syncState - .syncTarget() - .ifPresent( - syncTarget -> - syncTarget - .peer() - .disconnect(DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL)); + syncState.disconnectSyncTarget(DisconnectReason.BREACH_OF_PROTOCOL); } if (!cancelled.get() diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncState.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncState.java index 7f93d354b5..257a83803c 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncState.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/state/SyncState.java @@ -19,6 +19,7 @@ import tech.pegasys.pantheon.ethereum.core.Synchronizer.SyncStatusListener; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers; +import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.util.Subscribers; import java.util.Optional; @@ -106,6 +107,10 @@ private boolean isInSyncWithBestPeer(final long syncTolerance) { .orElse(true); } + public void disconnectSyncTarget(final DisconnectReason reason) { + syncTarget.ifPresent(syncTarget -> syncTarget.peer().disconnect(reason)); + } + public void clearSyncTarget() { replaceSyncTarget(Optional.empty()); } From 57e8e382e50f27ae0943fe179407e4c11b65006a Mon Sep 17 00:00:00 2001 From: Meredith Baxter Date: Fri, 16 Aug 2019 18:09:31 -0400 Subject: [PATCH 5/5] Update unit tests --- .../eth/sync/PipelineChainDownloaderTest.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloaderTest.java index a6b08f354e..8811552835 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/PipelineChainDownloaderTest.java @@ -32,7 +32,7 @@ import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget; import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException; -import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage; +import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.services.pipeline.Pipeline; @@ -46,7 +46,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) @@ -282,17 +281,15 @@ public void testInvalidBlockHandling( when(syncTargetManager.findSyncTarget(Optional.empty())) .thenReturn(selectTargetFuture) .thenReturn(new CompletableFuture<>()); - final EthPeer ethPeer = Mockito.mock(EthPeer.class); - final BlockHeader commonAncestor = Mockito.mock(BlockHeader.class); - final SyncTarget target = new SyncTarget(ethPeer, commonAncestor); - when(syncState.syncTarget()).thenReturn(Optional.of(target)); + chainDownloader.start(); verify(syncTargetManager).findSyncTarget(Optional.empty()); if (isCancelled) { chainDownloader.cancel(); } selectTargetFuture.completeExceptionally(new InvalidBlockException("", 1, null)); - verify(ethPeer).disconnect(DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL); + + verify(syncState, times(1)).disconnectSyncTarget(DisconnectReason.BREACH_OF_PROTOCOL); } private CompletableFuture expectPipelineStarted(final SyncTarget syncTarget) {