Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
[PAN-2427] Prep chain downloader for branch by abstraction (#1194)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajsutton authored Apr 2, 2019
1 parent 887eaae commit bf9d3b4
Show file tree
Hide file tree
Showing 16 changed files with 1,261 additions and 1,058 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,250 +12,11 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync;

import static java.util.Collections.emptyList;

import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.EthTaskException;
import tech.pegasys.pantheon.ethereum.eth.manager.task.WaitForPeersTask;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.util.ExceptionUtils;

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class ChainDownloader<C> {
private static final Logger LOG = LogManager.getLogger();

private final SynchronizerConfiguration config;
private final EthContext ethContext;
private final SyncState syncState;
private final SyncTargetManager<C> syncTargetManager;
private final CheckpointHeaderManager<C> checkpointHeaderManager;
private final BlockImportTaskFactory blockImportTaskFactory;
private final MetricsSystem metricsSystem;
private final CompletableFuture<Void> downloadFuture = new CompletableFuture<>();

private int chainSegmentTimeouts = 0;

private final AtomicBoolean started = new AtomicBoolean(false);
private CompletableFuture<?> currentTask;

public ChainDownloader(
final SynchronizerConfiguration config,
final EthContext ethContext,
final SyncState syncState,
final SyncTargetManager<C> syncTargetManager,
final CheckpointHeaderManager<C> checkpointHeaderManager,
final BlockImportTaskFactory blockImportTaskFactory,
final MetricsSystem metricsSystem) {
this.metricsSystem = metricsSystem;
this.config = config;
this.ethContext = ethContext;

this.syncState = syncState;
this.syncTargetManager = syncTargetManager;
this.checkpointHeaderManager = checkpointHeaderManager;
this.blockImportTaskFactory = blockImportTaskFactory;
}

public CompletableFuture<Void> start() {
if (started.compareAndSet(false, true)) {
executeDownload();
return downloadFuture;
} else {
throw new IllegalStateException(
"Attempt to start an already started " + this.getClass().getSimpleName() + ".");
}
}

@VisibleForTesting
public CompletableFuture<?> getCurrentTask() {
return currentTask;
}

private void executeDownload() {
if (downloadFuture.isDone()) {
return;
}
// Find target, pull checkpoint headers, import, repeat
currentTask =
waitForPeers()
.thenCompose(r -> syncTargetManager.findSyncTarget(syncState.syncTarget()))
.thenApply(this::updateSyncState)
.thenCompose(this::pullCheckpointHeaders)
.thenCompose(this::importBlocks)
.thenCompose(r -> checkSyncTarget())
.whenComplete(
(r, t) -> {
if (t != null) {
final Throwable rootCause = ExceptionUtils.rootCause(t);
if (rootCause instanceof CancellationException) {
LOG.trace("Download cancelled", t);
} else if (rootCause instanceof InvalidBlockException) {
LOG.debug("Invalid block downloaded", t);
} else if (rootCause instanceof EthTaskException) {
LOG.debug(rootCause.toString());
} else if (rootCause instanceof InterruptedException) {
LOG.trace("Interrupted while downloading chain", rootCause);
} else {
LOG.error("Error encountered while downloading", t);
}
// On error, wait a bit before retrying
ethContext
.getScheduler()
.scheduleFutureTask(this::executeDownload, Duration.ofSeconds(2));
} else if (syncTargetManager.shouldContinueDownloading()) {
executeDownload();
} else {
LOG.info("Chain download complete");
downloadFuture.complete(null);
}
});
}

private SyncTarget updateSyncState(final SyncTarget newTarget) {
if (isSameAsCurrentTarget(newTarget)) {
return syncState.syncTarget().get();
}
return syncState.setSyncTarget(newTarget.peer(), newTarget.commonAncestor());
}

private Boolean isSameAsCurrentTarget(final SyncTarget newTarget) {
return syncState
.syncTarget()
.map(currentTarget -> currentTarget.equals(newTarget))
.orElse(false);
}

private CompletableFuture<List<BlockHeader>> pullCheckpointHeaders(final SyncTarget syncTarget) {
return syncTarget.peer().isDisconnected()
? CompletableFuture.completedFuture(emptyList())
: checkpointHeaderManager.pullCheckpointHeaders(syncTarget);
}

private CompletableFuture<?> waitForPeers() {
return WaitForPeersTask.create(ethContext, 1, metricsSystem).run();
}

private CompletableFuture<Void> checkSyncTarget() {
final Optional<SyncTarget> maybeSyncTarget = syncState.syncTarget();
if (!maybeSyncTarget.isPresent()) {
// No sync target, so nothing to check.
return CompletableFuture.completedFuture(null);
}

final SyncTarget syncTarget = maybeSyncTarget.get();
if (syncTargetManager.shouldSwitchSyncTarget(syncTarget)) {
LOG.info("Better sync target found, clear current sync target: {}.", syncTarget);
clearSyncTarget(syncTarget);
return CompletableFuture.completedFuture(null);
}
if (finishedSyncingToCurrentTarget(syncTarget)) {
LOG.info("Finished syncing to target: {}.", syncTarget);
clearSyncTarget(syncTarget);
// Wait a bit before checking for a new sync target
final CompletableFuture<Void> future = new CompletableFuture<>();
ethContext
.getScheduler()
.scheduleFutureTask(() -> future.complete(null), Duration.ofSeconds(10));
return future;
}
return CompletableFuture.completedFuture(null);
}

private boolean finishedSyncingToCurrentTarget(final SyncTarget syncTarget) {
return !syncTargetManager.syncTargetCanProvideMoreBlocks(syncTarget)
|| checkpointHeaderManager.checkpointsHaveTimedOut()
|| chainSegmentsHaveTimedOut();
}

private boolean chainSegmentsHaveTimedOut() {
return chainSegmentTimeouts >= config.downloaderChainSegmentTimeoutsPermitted();
}

private void clearSyncTarget() {
syncState.syncTarget().ifPresent(this::clearSyncTarget);
}

private void clearSyncTarget(final SyncTarget syncTarget) {
chainSegmentTimeouts = 0;
checkpointHeaderManager.clearSyncTarget();
syncState.clearSyncTarget();
}

private CompletableFuture<List<Block>> importBlocks(final List<BlockHeader> checkpointHeaders) {
if (checkpointHeaders.isEmpty()) {
// No checkpoints to download
return CompletableFuture.completedFuture(emptyList());
}

final CompletableFuture<List<Block>> importedBlocks =
blockImportTaskFactory.importBlocksForCheckpoints(checkpointHeaders);

return importedBlocks.whenComplete(
(r, t) -> {
t = ExceptionUtils.rootCause(t);
if (t instanceof InvalidBlockException) {
// Blocks were invalid, meaning our checkpoints are wrong
// Reset sync target
final Optional<SyncTarget> maybeSyncTarget = syncState.syncTarget();
maybeSyncTarget.ifPresent(
target -> target.peer().disconnect(DisconnectReason.BREACH_OF_PROTOCOL));
final String peerDescriptor =
maybeSyncTarget
.map(SyncTarget::peer)
.map(EthPeer::toString)
.orElse("(unknown - already disconnected)");
LOG.warn(
"Invalid block discovered while downloading from peer {}. Disconnect.",
peerDescriptor);
clearSyncTarget();
} else if (t != null || r.isEmpty()) {
if (t != null) {
final Throwable rootCause = ExceptionUtils.rootCause(t);
if (rootCause instanceof EthTaskException) {
LOG.debug(rootCause.toString());
} else if (rootCause instanceof InterruptedException) {
LOG.trace("Interrupted while importing blocks", rootCause);
} else {
LOG.error("Encountered error importing blocks", t);
}
}
if (checkpointHeaderManager.clearImportedCheckpointHeaders()) {
chainSegmentTimeouts = 0;
}
if (t instanceof TimeoutException || r != null) {
// Download timed out, or returned no new blocks
chainSegmentTimeouts++;
}
} else {
chainSegmentTimeouts = 0;
public interface ChainDownloader {

final BlockHeader lastImportedCheckpoint =
checkpointHeaderManager.allCheckpointsImported();
syncState.setCommonAncestor(lastImportedCheckpoint);
}
});
}
CompletableFuture<Void> start();

public interface BlockImportTaskFactory {
CompletableFuture<List<Block>> importBlocksForCheckpoints(
final List<BlockHeader> checkpointHeaders);
}
void cancel();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastDownloaderFactory;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncDownloader;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncException;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.fullsync.FullSyncDownloader;
Expand All @@ -43,10 +45,10 @@ public class DefaultSynchronizer<C> implements Synchronizer {

private final SyncState syncState;
private final AtomicBoolean started = new AtomicBoolean(false);
private final Subscribers<SyncStatusListener> syncStatusListeners = new Subscribers<>();
private final BlockPropagationManager<C> blockPropagationManager;
private final Optional<FastSyncDownloader<C>> fastSyncDownloader;
private final FullSyncDownloader<C> fullSyncDownloader;
private final Optional<FastSynchronizer<C>> fastSynchronizer;
private final Subscribers<SyncStatusListener> syncStatusListeners = new Subscribers<>();

public DefaultSynchronizer(
final SynchronizerConfiguration syncConfig,
Expand All @@ -60,6 +62,13 @@ public DefaultSynchronizer(
final MetricsSystem metricsSystem) {
this.syncState = syncState;

ChainHeadTracker.trackChainHeadForPeers(
ethContext,
protocolSchedule,
protocolContext.getBlockchain(),
this::calculateTrailingPeerRequirements,
metricsSystem);

this.blockPropagationManager =
new BlockPropagationManager<>(
syncConfig,
Expand All @@ -71,19 +80,11 @@ public DefaultSynchronizer(
metricsSystem,
new BlockBroadcaster(ethContext));

ChainHeadTracker.trackChainHeadForPeers(
ethContext,
protocolSchedule,
protocolContext.getBlockchain(),
this::calculateTrailingPeerRequirements,
metricsSystem);

this.fullSyncDownloader =
new FullSyncDownloader<>(
syncConfig, protocolSchedule, protocolContext, ethContext, syncState, metricsSystem);

fastSynchronizer =
FastSynchronizer.create(
this.fastSyncDownloader =
FastDownloaderFactory.create(
syncConfig,
dataDirectory,
protocolSchedule,
Expand All @@ -96,17 +97,17 @@ public DefaultSynchronizer(
}

private TrailingPeerRequirements calculateTrailingPeerRequirements() {
return fastSynchronizer
.flatMap(FastSynchronizer::calculateTrailingPeerRequirements)
return fastSyncDownloader
.flatMap(FastSyncDownloader::calculateTrailingPeerRequirements)
.orElseGet(fullSyncDownloader::calculateTrailingPeerRequirements);
}

@Override
public void start() {
if (started.compareAndSet(false, true)) {
syncState.addSyncStatusListener(this::syncStatusCallback);
if (fastSynchronizer.isPresent()) {
fastSynchronizer.get().start().whenComplete(this::handleFastSyncResult);
if (fastSyncDownloader.isPresent()) {
fastSyncDownloader.get().start().whenComplete(this::handleFastSyncResult);
} else {
startFullSync();
}
Expand All @@ -128,7 +129,7 @@ private void handleFastSyncResult(final FastSyncState result, final Throwable er
"Fast sync completed successfully with pivot block {}",
result.getPivotBlockNumber().getAsLong());
}
fastSynchronizer.ifPresent(FastSynchronizer::deleteFastSyncState);
fastSyncDownloader.ifPresent(FastSyncDownloader::deleteFastSyncState);

startFullSync();
}
Expand Down
Loading

0 comments on commit bf9d3b4

Please sign in to comment.