Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Disconnect peers before the pivot block while fast syncing (#1139)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajsutton authored Mar 20, 2019
1 parent 25b3a49 commit c6edd4f
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.metrics.MetricsSystem;

import java.util.function.Supplier;

import org.apache.logging.log4j.Logger;

public class ChainHeadTracker implements ConnectCallback {
Expand All @@ -51,14 +53,10 @@ public static void trackChainHeadForPeers(
final EthContext ethContext,
final ProtocolSchedule<?> protocolSchedule,
final Blockchain blockchain,
final SynchronizerConfiguration syncConfiguration,
final Supplier<TrailingPeerRequirements> trailingPeerRequirementsCalculator,
final MetricsSystem metricsSystem) {
final TrailingPeerLimiter trailingPeerLimiter =
new TrailingPeerLimiter(
ethContext.getEthPeers(),
blockchain,
syncConfiguration.trailingPeerBlocksBehindThreshold(),
syncConfiguration.maxTrailingPeers());
new TrailingPeerLimiter(ethContext.getEthPeers(), trailingPeerRequirementsCalculator);
final ChainHeadTracker tracker =
new ChainHeadTracker(ethContext, protocolSchedule, trailingPeerLimiter, metricsSystem);
ethContext.getEthPeers().subscribeConnect(tracker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ public DefaultSynchronizer(
new BlockBroadcaster(ethContext));

ChainHeadTracker.trackChainHeadForPeers(
ethContext, protocolSchedule, protocolContext.getBlockchain(), syncConfig, metricsSystem);
ethContext,
protocolSchedule,
protocolContext.getBlockchain(),
this::calculateTrailingPeerRequirements,
metricsSystem);

this.fullSyncDownloader =
new FullSyncDownloader<>(
Expand All @@ -92,6 +96,12 @@ public DefaultSynchronizer(
syncState);
}

private TrailingPeerRequirements calculateTrailingPeerRequirements() {
return fastSynchronizer
.flatMap(FastSynchronizer::calculateTrailingPeerRequirements)
.orElse(TrailingPeerRequirements.UNRESTRICTED);
}

@Override
public void start() {
if (started.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,4 +182,8 @@ private static CachingTaskCollection<NodeDataRequest> createWorldStateDownloader

return taskCollection;
}

public Optional<TrailingPeerRequirements> calculateTrailingPeerRequirements() {
return fastSyncDownloader.getTrailingPeerRequirements();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ public class SynchronizerConfiguration {
private final int downloaderCheckpointTimeoutsPermitted;
private final int downloaderChainSegmentTimeoutsPermitted;
private final int downloaderChainSegmentSize;
private final long trailingPeerBlocksBehindThreshold;
private final int maxTrailingPeers;
private final int downloaderParallelism;
private final int transactionsParallelism;
private final int computationParallelism;
Expand All @@ -75,8 +73,6 @@ private SynchronizerConfiguration(
final int downloaderCheckpointTimeoutsPermitted,
final int downloaderChainSegmentTimeoutsPermitted,
final int downloaderChainSegmentSize,
final long trailingPeerBlocksBehindThreshold,
final int maxTrailingPeers,
final int downloaderParallelism,
final int transactionsParallelism,
final int computationParallelism) {
Expand All @@ -95,8 +91,6 @@ private SynchronizerConfiguration(
this.downloaderCheckpointTimeoutsPermitted = downloaderCheckpointTimeoutsPermitted;
this.downloaderChainSegmentTimeoutsPermitted = downloaderChainSegmentTimeoutsPermitted;
this.downloaderChainSegmentSize = downloaderChainSegmentSize;
this.trailingPeerBlocksBehindThreshold = trailingPeerBlocksBehindThreshold;
this.maxTrailingPeers = maxTrailingPeers;
this.downloaderParallelism = downloaderParallelism;
this.transactionsParallelism = transactionsParallelism;
this.computationParallelism = computationParallelism;
Expand Down Expand Up @@ -159,19 +153,6 @@ public int downloaderChainSegmentSize() {
return downloaderChainSegmentSize;
}

/**
* The number of blocks behind we allow a peer to be before considering them a trailing peer.
*
* @return the maximum number of blocks behind a peer can be while being considered current.
*/
public long trailingPeerBlocksBehindThreshold() {
return trailingPeerBlocksBehindThreshold;
}

public int maxTrailingPeers() {
return maxTrailingPeers;
}

public int downloaderParallelism() {
return downloaderParallelism;
}
Expand Down Expand Up @@ -224,8 +205,6 @@ public static class Builder {
private int downloaderCheckpointTimeoutsPermitted = 5;
private int downloaderChainSegmentTimeoutsPermitted = 5;
private int downloaderChainSegmentSize = 200;
private long trailingPeerBlocksBehindThreshold;
private int maxTrailingPeers = Integer.MAX_VALUE;
private int downloaderParallelism = 4;
private int transactionsParallelism = 2;
private int computationParallelism = Runtime.getRuntime().availableProcessors();
Expand Down Expand Up @@ -293,16 +272,6 @@ public Builder blockPropagationRange(final long min, final long max) {
return this;
}

public Builder trailingPeerBlocksBehindThreshold(final long trailingPeerBlocksBehindThreshold) {
this.trailingPeerBlocksBehindThreshold = trailingPeerBlocksBehindThreshold;
return this;
}

public Builder maxTrailingPeers(final int maxTrailingPeers) {
this.maxTrailingPeers = maxTrailingPeers;
return this;
}

public Builder downloaderParallelisim(final int downloaderParallelism) {
this.downloaderParallelism = downloaderParallelism;
return this;
Expand Down Expand Up @@ -361,8 +330,6 @@ public SynchronizerConfiguration build() {
downloaderCheckpointTimeoutsPermitted,
downloaderChainSegmentTimeoutsPermitted,
downloaderChainSegmentSize,
trailingPeerBlocksBehindThreshold,
maxTrailingPeers,
downloaderParallelism,
transactionsParallelism,
computationParallelism);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.util.Comparator;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.logging.log4j.Logger;
Expand All @@ -37,30 +38,27 @@ public class TrailingPeerLimiter implements BlockAddedObserver {
// how often we rerun the check.
private static final int RECHECK_PEERS_WHEN_BLOCK_NUMBER_MULTIPLE_OF = 100;
private final EthPeers ethPeers;
private final Blockchain blockchain;
private final long trailingPeerBlocksBehindThreshold;
private final int maxTrailingPeers;
private final Supplier<TrailingPeerRequirements> trailingPeerRequirementsCalculator;

public TrailingPeerLimiter(
final EthPeers ethPeers,
final Blockchain blockchain,
final long trailingPeerBlocksBehindThreshold,
final int maxTrailingPeers) {
final Supplier<TrailingPeerRequirements> trailingPeerRequirementsCalculator) {
this.ethPeers = ethPeers;
this.blockchain = blockchain;
this.trailingPeerBlocksBehindThreshold = trailingPeerBlocksBehindThreshold;
this.maxTrailingPeers = maxTrailingPeers;
this.trailingPeerRequirementsCalculator = trailingPeerRequirementsCalculator;
}

public void enforceTrailingPeerLimit() {
final TrailingPeerRequirements requirements = trailingPeerRequirementsCalculator.get();
if (requirements.getMaxTrailingPeers() == Long.MAX_VALUE) {
return;
}
final long minimumHeightToBeUpToDate = requirements.getMinimumHeightToBeUpToDate();
final long maxTrailingPeers = requirements.getMaxTrailingPeers();
final List<EthPeer> trailingPeers =
ethPeers
.availablePeers()
.filter(peer -> peer.chainState().hasEstimatedHeight())
.filter(
peer ->
peer.chainState().getEstimatedHeight() + trailingPeerBlocksBehindThreshold
< blockchain.getChainHeadBlockNumber())
.filter(peer -> peer.chainState().getEstimatedHeight() < minimumHeightToBeUpToDate)
.sorted(BY_CHAIN_HEIGHT)
.collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.eth.sync;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;

import java.util.Objects;

import com.google.common.base.MoreObjects;

public class TrailingPeerRequirements {
public static TrailingPeerRequirements UNRESTRICTED =
new TrailingPeerRequirements(BlockHeader.GENESIS_BLOCK_NUMBER, Long.MAX_VALUE);
private final long minimumHeightToBeUpToDate;
private final long maxTrailingPeers;

public TrailingPeerRequirements(
final long minimumHeightToBeUpToDate, final long maxTrailingPeers) {
this.minimumHeightToBeUpToDate = minimumHeightToBeUpToDate;
this.maxTrailingPeers = maxTrailingPeers;
}

public long getMinimumHeightToBeUpToDate() {
return minimumHeightToBeUpToDate;
}

public long getMaxTrailingPeers() {
return maxTrailingPeers;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final TrailingPeerRequirements that = (TrailingPeerRequirements) o;
return minimumHeightToBeUpToDate == that.minimumHeightToBeUpToDate
&& maxTrailingPeers == that.maxTrailingPeers;
}

@Override
public int hashCode() {
return Objects.hash(minimumHeightToBeUpToDate, maxTrailingPeers);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("minimumHeightToBeUpToDate", minimumHeightToBeUpToDate)
.add("maxTrailingPeers", maxTrailingPeers)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
import static tech.pegasys.pantheon.util.FutureUtils.completedExceptionally;
import static tech.pegasys.pantheon.util.FutureUtils.exceptionallyCompose;

import tech.pegasys.pantheon.ethereum.eth.sync.TrailingPeerRequirements;
import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.StalledDownloadException;
import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.WorldStateDownloader;
import tech.pegasys.pantheon.util.ExceptionUtils;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.logging.log4j.LogManager;
Expand All @@ -29,6 +31,7 @@ public class FastSyncDownloader<C> {
private final FastSyncActions<C> fastSyncActions;
private final WorldStateDownloader worldStateDownloader;
private final FastSyncStateStorage fastSyncStateStorage;
private volatile Optional<TrailingPeerRequirements> trailingPeerRequirements = Optional.empty();

public FastSyncDownloader(
final FastSyncActions<C> fastSyncActions,
Expand All @@ -45,12 +48,14 @@ public CompletableFuture<FastSyncState> start(final FastSyncState fastSyncState)
.waitForSuitablePeers(fastSyncState)
.thenCompose(fastSyncActions::selectPivotBlock)
.thenCompose(fastSyncActions::downloadPivotBlockHeader)
.thenApply(this::updateMaxTrailingPeers)
.thenApply(this::storeState)
.thenCompose(this::downloadChainAndWorldState),
this::handleWorldStateUnavailable);
}

private CompletableFuture<FastSyncState> handleWorldStateUnavailable(final Throwable error) {
trailingPeerRequirements = Optional.empty();
if (ExceptionUtils.rootCause(error) instanceof StalledDownloadException) {
LOG.warn(
"Fast sync was unable to download the world state. Retrying with a new pivot block.");
Expand All @@ -60,6 +65,16 @@ private CompletableFuture<FastSyncState> handleWorldStateUnavailable(final Throw
}
}

private FastSyncState updateMaxTrailingPeers(final FastSyncState state) {
if (state.getPivotBlockNumber().isPresent()) {
trailingPeerRequirements =
Optional.of(new TrailingPeerRequirements(state.getPivotBlockNumber().getAsLong(), 0));
} else {
trailingPeerRequirements = Optional.empty();
}
return state;
}

private FastSyncState storeState(final FastSyncState state) {
fastSyncStateStorage.storeState(state);
return state;
Expand All @@ -85,6 +100,14 @@ private CompletableFuture<FastSyncState> downloadChainAndWorldState(
});

return CompletableFuture.allOf(worldStateFuture, chainFuture)
.thenApply(complete -> currentState);
.thenApply(
complete -> {
trailingPeerRequirements = Optional.empty();
return currentState;
});
}

public Optional<TrailingPeerRequirements> getTrailingPeerRequirements() {
return trailingPeerRequirements;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@ public class TrailingPeerLimiterTest {
private final List<EthPeer> peers = new ArrayList<>();
private final TrailingPeerLimiter trailingPeerLimiter =
new TrailingPeerLimiter(
ethPeers, blockchain, TRAILING_PEER_BLOCKS_BEHIND_THRESHOLD, MAX_TRAILING_PEERS);
ethPeers,
() ->
new TrailingPeerRequirements(
CHAIN_HEAD - TRAILING_PEER_BLOCKS_BEHIND_THRESHOLD, MAX_TRAILING_PEERS));

@Before
public void setUp() {
when(ethPeers.availablePeers()).then(invocation -> peers.stream());
when(blockchain.getChainHeadBlockNumber()).thenReturn(CHAIN_HEAD);
}

@Test
Expand Down
Loading

0 comments on commit c6edd4f

Please sign in to comment.