Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[PIE-1791] Fix logic to disconnect from peers on fork #1863

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,24 @@ public CheckpointHeaderValidationStep(

@Override
public Stream<BlockHeader> apply(final CheckpointRangeHeaders checkpointRangeHeaders) {
final BlockHeader expectedParent = checkpointRangeHeaders.getCheckpointRange().getStart();
final BlockHeader rangeStart = checkpointRangeHeaders.getCheckpointRange().getStart();
final BlockHeader firstHeaderToImport = checkpointRangeHeaders.getFirstHeaderToImport();

if (isValid(expectedParent, firstHeaderToImport)) {
if (isValid(rangeStart, firstHeaderToImport)) {
return checkpointRangeHeaders.getHeadersToImport().stream();
} else {
final BlockHeader rangeEnd = checkpointRangeHeaders.getCheckpointRange().getEnd();
final String errorMessage =
String.format(
"Invalid checkpoint headers. Headers downloaded between #%d (%s) and #%d (%s) do not connect at #%d (%s)",
rangeStart.getNumber(),
rangeStart.getHash(),
rangeEnd.getNumber(),
rangeEnd.getHash(),
firstHeaderToImport.getNumber(),
firstHeaderToImport.getHash());
throw new InvalidBlockException(
"Provided first header does not connect to last header.",
expectedParent.getNumber(),
expectedParent.getHash());
errorMessage, firstHeaderToImport.getNumber(), firstHeaderToImport.getHash());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.MetricsSystem;
Expand Down Expand Up @@ -115,22 +115,20 @@ private CompletionStage<Void> repeatUnlessDownloadComplete(

private CompletionStage<Void> handleFailedDownload(final Throwable error) {
pipelineErrorCounter.inc();
if (ExceptionUtils.rootCause(error) instanceof InvalidBlockException) {
LOG.warn(
"Invalid block detected. Disconnecting from sync target. {}",
ExceptionUtils.rootCause(error).getMessage());
syncState.disconnectSyncTarget(DisconnectReason.BREACH_OF_PROTOCOL);
}

if (!cancelled.get()
&& syncTargetManager.shouldContinueDownloading()
&& !(ExceptionUtils.rootCause(error) instanceof CancellationException)) {
logDownloadFailure("Chain download failed. Restarting after short delay.", error);
// Allowing the normal looping logic to retry after a brief delay.
return scheduler.scheduleFutureTask(() -> completedFuture(null), PAUSE_AFTER_ERROR_DURATION);
}
if (ExceptionUtils.rootCause(error) instanceof InvalidBlockException) {
syncState
.syncTarget()
.ifPresent(
syncTarget ->
syncTarget
.peer()
.disconnect(DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL));
}

logDownloadFailure("Chain download failed.", error);
// Propagate the error out, terminating this chain download.
Expand All @@ -141,9 +139,10 @@ private void logDownloadFailure(final String message, final Throwable error) {
final Throwable rootCause = ExceptionUtils.rootCause(error);
if (rootCause instanceof CancellationException || rootCause instanceof InterruptedException) {
LOG.trace(message, error);
} else if (rootCause instanceof EthTaskException
|| rootCause instanceof InvalidBlockException) {
} else if (rootCause instanceof EthTaskException) {
LOG.debug(message, error);
} else if (rootCause instanceof InvalidBlockException) {
LOG.warn(message, error);
} else {
LOG.error(message, error);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import tech.pegasys.pantheon.ethereum.core.Synchronizer.SyncStatusListener;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.util.Subscribers;

import java.util.Optional;
Expand Down Expand Up @@ -106,6 +107,10 @@ private boolean isInSyncWithBestPeer(final long syncTolerance) {
.orElse(true);
}

public void disconnectSyncTarget(final DisconnectReason reason) {
syncTarget.ifPresent(syncTarget -> syncTarget.peer().disconnect(reason));
}

public void clearSyncTarget() {
replaceSyncTarget(Optional.empty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ private CompletableFuture<List<BlockHeader>> processHeaders(
headersResult.getPeer().disconnect(DisconnectReason.BREACH_OF_PROTOCOL);
future.completeExceptionally(
new InvalidBlockException(
"Invalid header", header.getNumber(), header.getHash()));
"Header failed validation.", child.getNumber(), child.getHash()));
return future;
}
headers[headerIndex] = header;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@
public class InvalidBlockException extends RuntimeException {

public InvalidBlockException(final String message, final long blockNumber, final Hash blockHash) {
super(message + ": " + blockNumber + ", " + blockHash);
super(message + ": Invalid block at #" + blockNumber + " (" + blockHash + ")");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Optional;
import java.util.Set;

public class MockPeerConnection implements PeerConnection {
Expand All @@ -35,6 +36,7 @@ public class MockPeerConnection implements PeerConnection {
private final BytesValue nodeId;
private final Peer peer;
private final PeerInfo peerInfo;
private Optional<DisconnectReason> disconnectReason = Optional.empty();

public MockPeerConnection(final Set<Capability> caps, final PeerSendHandler onSend) {
this.caps = caps;
Expand Down Expand Up @@ -84,9 +86,18 @@ public void terminateConnection(final DisconnectReason reason, final boolean pee

@Override
public void disconnect(final DisconnectReason reason) {
if (disconnected) {
// Already disconnected
return;
}
disconnectReason = Optional.of(reason);
disconnected = true;
}

public Optional<DisconnectReason> getDisconnectReason() {
return disconnectReason;
}

@Override
public InetSocketAddress getLocalAddress() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain;
import static tech.pegasys.pantheon.ethereum.core.InMemoryStorageProvider.createInMemoryWorldStateArchive;

import tech.pegasys.pantheon.config.GenesisConfigFile;
import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.chain.GenesisState;
Expand All @@ -32,6 +33,7 @@
import tech.pegasys.pantheon.ethereum.util.RawBlockIterator;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
import tech.pegasys.pantheon.testutil.BlockTestUtil;
import tech.pegasys.pantheon.testutil.BlockTestUtil.ChainResources;

import java.io.IOException;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -91,23 +93,36 @@ public int blockCount() {
}

public static BlockchainSetupUtil<Void> forTesting() {
final ProtocolSchedule<Void> protocolSchedule = MainnetProtocolSchedule.create();
return createEthashChain(BlockTestUtil.getTestChainResources());
}

public static BlockchainSetupUtil<Void> forOutdatedFork() {
return createEthashChain(BlockTestUtil.getOutdatedForkResources());
}

public static BlockchainSetupUtil<Void> forUpgradedFork() {
return createEthashChain(BlockTestUtil.getUpgradedForkResources());
}

private static BlockchainSetupUtil<Void> createEthashChain(final ChainResources chainResources) {
final TemporaryFolder temp = new TemporaryFolder();
try {
temp.create();
final String genesisJson =
Resources.toString(BlockTestUtil.getTestGenesisUrl(), Charsets.UTF_8);
final String genesisJson = Resources.toString(chainResources.getGenesisURL(), Charsets.UTF_8);

final GenesisState genesisState = GenesisState.fromJson(genesisJson, protocolSchedule);
final GenesisConfigFile genesisConfigFile = GenesisConfigFile.fromConfig(genesisJson);
final ProtocolSchedule<Void> protocolSchedule =
MainnetProtocolSchedule.fromConfig(genesisConfigFile.getConfigOptions());

final GenesisState genesisState = GenesisState.fromJson(genesisJson, protocolSchedule);
final MutableBlockchain blockchain = createInMemoryBlockchain(genesisState.getBlock());
final WorldStateArchive worldArchive = createInMemoryWorldStateArchive();

genesisState.writeStateTo(worldArchive.getMutable());
final ProtocolContext<Void> protocolContext =
new ProtocolContext<>(blockchain, worldArchive, null);

final Path blocksPath = Path.of(BlockTestUtil.getTestBlockchainUrl().toURI());
final Path blocksPath = Path.of(chainResources.getBlocksURL().toURI());
final List<Block> blocks = new ArrayList<>();
final BlockHeaderFunctions blockHeaderFunctions =
ScheduleBasedBlockHeaderFunctions.create(protocolSchedule);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,12 @@ public class CheckpointHeaderValidationStepTest {
private CheckpointHeaderValidationStep<Void> validationStep;

private final BlockHeader checkpointStart = gen.header(10);
private final BlockHeader checkpointEnd = gen.header(13);
private final BlockHeader firstHeader = gen.header(11);
private final CheckpointRangeHeaders rangeHeaders =
new CheckpointRangeHeaders(
new CheckpointRange(syncTarget, checkpointStart, gen.header(13)),
asList(firstHeader, gen.header(12), gen.header(13)));
new CheckpointRange(syncTarget, checkpointStart, checkpointEnd),
asList(firstHeader, gen.header(12), checkpointEnd));

@Before
public void setUp() {
Expand Down Expand Up @@ -88,6 +89,20 @@ public void shouldThrowExceptionWhenValidationFails() {
firstHeader, checkpointStart, protocolContext, DETACHED_ONLY))
.thenReturn(false);
assertThatThrownBy(() -> validationStep.apply(rangeHeaders))
.isInstanceOf(InvalidBlockException.class);
.isInstanceOf(InvalidBlockException.class)
.hasMessageContaining(
"Invalid checkpoint headers. Headers downloaded between #"
+ checkpointStart.getNumber()
+ " ("
+ checkpointStart.getHash()
+ ") and #"
+ checkpointEnd.getNumber()
+ " ("
+ checkpointEnd.getHash()
+ ") do not connect at #"
+ firstHeader.getNumber()
+ " ("
+ firstHeader.getHash()
+ ")");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.services.pipeline.Pipeline;

Expand All @@ -46,7 +46,6 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
Expand Down Expand Up @@ -255,20 +254,42 @@ public void shouldAbortPipelineIfCancelledAfterDownloadStarts() {
}

@Test
public void shouldDisconnectPeerIfInvalidBlockException() {
public void shouldDisconnectSyncTargetOnInvalidBlockException_finishedDownloading_cancelled() {
testInvalidBlockHandling(true, true);
}

@Test
public void
shouldDisconnectSyncTargetOnInvalidBlockException_notFinishedDownloading_notCancelled() {
testInvalidBlockHandling(false, false);
}

@Test
public void shouldDisconnectSyncTargetOnInvalidBlockException_finishedDownloading_notCancelled() {
testInvalidBlockHandling(true, false);
}

@Test
public void shouldDisconnectSyncTargetOnInvalidBlockException_notFinishedDownloading_cancelled() {
testInvalidBlockHandling(false, true);
}

public void testInvalidBlockHandling(
final boolean isFinishedDownloading, final boolean isCancelled) {
final CompletableFuture<SyncTarget> selectTargetFuture = new CompletableFuture<>();
when(syncTargetManager.shouldContinueDownloading()).thenReturn(false);
when(syncTargetManager.shouldContinueDownloading()).thenReturn(isFinishedDownloading);
when(syncTargetManager.findSyncTarget(Optional.empty()))
.thenReturn(selectTargetFuture)
.thenReturn(new CompletableFuture<>());
final EthPeer ethPeer = Mockito.mock(EthPeer.class);
final BlockHeader commonAncestor = Mockito.mock(BlockHeader.class);
final SyncTarget target = new SyncTarget(ethPeer, commonAncestor);
when(syncState.syncTarget()).thenReturn(Optional.of(target));

chainDownloader.start();
verify(syncTargetManager).findSyncTarget(Optional.empty());
if (isCancelled) {
chainDownloader.cancel();
}
selectTargetFuture.completeExceptionally(new InvalidBlockException("", 1, null));
verify(ethPeer).disconnect(DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL);

verify(syncState, times(1)).disconnectSyncTarget(DisconnectReason.BREACH_OF_PROTOCOL);
}

private CompletableFuture<Void> expectPipelineStarted(final SyncTarget syncTarget) {
Expand Down
Loading