Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[NC-2138] Implement chain download for fast sync #690

Merged
merged 10 commits into from
Jan 29, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,26 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync;

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import static java.util.Collections.emptyList;

import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask.PeerTaskResult;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthTask;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetHeadersFromPeerByHashTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeersTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.ExceptionUtils;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -51,38 +43,33 @@ public class ChainDownloader<C> {
private static final Logger LOG = LogManager.getLogger();

private final SynchronizerConfiguration config;
private final ProtocolContext<C> protocolContext;
private final EthContext ethContext;
private final SyncState syncState;
private final SyncTargetManager<C> syncTargetManager;
private final CheckpointHeaderManager<C> checkpointHeaderManager;
private final BlockImportTaskFactory blockImportTaskFactory;
private final ProtocolSchedule<C> protocolSchedule;
private final LabelledMetric<OperationTimer> ethTasksTimer;

private final Deque<BlockHeader> checkpointHeaders = new ConcurrentLinkedDeque<>();
private int checkpointTimeouts = 0;
private int chainSegmentTimeouts = 0;

private final AtomicBoolean started = new AtomicBoolean(false);
private CompletableFuture<?> currentTask;

public ChainDownloader(
final SynchronizerConfiguration config,
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final SyncState syncState,
final LabelledMetric<OperationTimer> ethTasksTimer,
final SyncTargetManager<C> syncTargetManager,
final CheckpointHeaderManager<C> checkpointHeaderManager,
final BlockImportTaskFactory blockImportTaskFactory) {
this.protocolSchedule = protocolSchedule;
this.ethTasksTimer = ethTasksTimer;
this.config = config;
this.protocolContext = protocolContext;
this.ethContext = ethContext;

this.syncState = syncState;
this.syncTargetManager = syncTargetManager;
this.checkpointHeaderManager = checkpointHeaderManager;
this.blockImportTaskFactory = blockImportTaskFactory;
}

Expand All @@ -106,7 +93,7 @@ private CompletableFuture<?> executeDownload() {
waitForPeers()
.thenCompose(r -> syncTargetManager.findSyncTarget())
.thenCompose(this::pullCheckpointHeaders)
.thenCompose(r -> importBlocks())
.thenCompose(this::importBlocks)
.thenCompose(r -> checkSyncTarget())
.whenComplete(
(r, t) -> {
Expand All @@ -130,6 +117,12 @@ private CompletableFuture<?> executeDownload() {
return currentTask;
}

private CompletableFuture<List<BlockHeader>> pullCheckpointHeaders(final SyncTarget syncTarget) {
return syncTargetManager.isSyncTargetDisconnected()
? CompletableFuture.completedFuture(emptyList())
: checkpointHeaderManager.pullCheckpointHeaders(syncTarget);
}

private CompletableFuture<?> waitForPeers() {
return WaitForPeersTask.create(ethContext, 1, ethTasksTimer).run();
}
Expand Down Expand Up @@ -162,17 +155,10 @@ private CompletableFuture<Void> checkSyncTarget() {

private boolean finishedSyncingToCurrentTarget() {
return syncTargetManager.isSyncTargetDisconnected()
|| checkpointsHaveTimedOut()
|| checkpointHeaderManager.checkpointsHaveTimedOut()
|| chainSegmentsHaveTimedOut();
}

private boolean checkpointsHaveTimedOut() {
// We have no more checkpoints, and have been unable to pull any new checkpoints for
// several cycles.
return checkpointHeaders.size() == 0
&& checkpointTimeouts >= config.downloaderCheckpointTimeoutsPermitted();
}

private boolean chainSegmentsHaveTimedOut() {
return chainSegmentTimeouts >= config.downloaderChainSegmentTimeoutsPermitted();
}
Expand All @@ -183,73 +169,15 @@ private void clearSyncTarget() {

private void clearSyncTarget(final SyncTarget syncTarget) {
chainSegmentTimeouts = 0;
checkpointTimeouts = 0;
checkpointHeaders.clear();
checkpointHeaderManager.clearSyncTarget();
syncTargetManager.clearSyncTarget(syncTarget);
syncState.clearSyncTarget();
}

private boolean shouldDownloadMoreCheckpoints() {
return !syncTargetManager.isSyncTargetDisconnected()
&& checkpointHeaders.size() < config.downloaderHeaderRequestSize()
&& checkpointTimeouts < config.downloaderCheckpointTimeoutsPermitted();
}

private CompletableFuture<?> pullCheckpointHeaders(final SyncTarget syncTarget) {
if (!shouldDownloadMoreCheckpoints()) {
return CompletableFuture.completedFuture(null);
}

final BlockHeader lastHeader =
checkpointHeaders.size() > 0 ? checkpointHeaders.getLast() : syncTarget.commonAncestor();
// Try to pull more checkpoint headers
return checkpointHeadersTask(lastHeader, syncTarget)
.run()
.handle(
(r, t) -> {
t = ExceptionUtils.rootCause(t);
if (t instanceof TimeoutException) {
checkpointTimeouts++;
return null;
} else if (t != null) {
return r;
}
final List<BlockHeader> headers = r.getResult();
if (headers.size() > 0
&& checkpointHeaders.size() > 0
&& checkpointHeaders.getLast().equals(headers.get(0))) {
// Don't push header that is already tracked
headers.remove(0);
}
if (headers.isEmpty()) {
checkpointTimeouts++;
} else {
checkpointTimeouts = 0;
checkpointHeaders.addAll(headers);
LOG.debug("Tracking {} checkpoint headers", checkpointHeaders.size());
}
return r;
});
}

private EthTask<PeerTaskResult<List<BlockHeader>>> checkpointHeadersTask(
final BlockHeader lastHeader, final SyncTarget syncTarget) {
LOG.debug("Requesting checkpoint headers from {}", lastHeader.getNumber());
return GetHeadersFromPeerByHashTask.startingAtHash(
protocolSchedule,
ethContext,
lastHeader.getHash(),
lastHeader.getNumber(),
config.downloaderHeaderRequestSize() + 1,
config.downloaderChainSegmentSize() - 1,
ethTasksTimer)
.assignPeer(syncTarget.peer());
}

private CompletableFuture<List<Block>> importBlocks() {
private CompletableFuture<List<Block>> importBlocks(final List<BlockHeader> checkpointHeaders) {
if (checkpointHeaders.isEmpty()) {
// No checkpoints to download
return CompletableFuture.completedFuture(Collections.emptyList());
return CompletableFuture.completedFuture(emptyList());
}

final CompletableFuture<List<Block>> importedBlocks =
Expand Down Expand Up @@ -277,7 +205,7 @@ private CompletableFuture<List<Block>> importBlocks() {
if (t != null) {
LOG.error("Encountered error importing blocks", t);
}
if (clearImportedCheckpointHeaders()) {
if (checkpointHeaderManager.clearImportedCheckpointHeaders()) {
chainSegmentTimeouts = 0;
}
if (t instanceof TimeoutException || r != null) {
Expand All @@ -286,30 +214,16 @@ private CompletableFuture<List<Block>> importBlocks() {
}
} else {
chainSegmentTimeouts = 0;
final BlockHeader lastImportedCheckpoint = checkpointHeaders.getLast();
checkpointHeaders.clear();

final BlockHeader lastImportedCheckpoint =
checkpointHeaderManager.allCheckpointsImported();
syncState.setCommonAncestor(lastImportedCheckpoint);
}
});
}

private boolean clearImportedCheckpointHeaders() {
final Blockchain blockchain = protocolContext.getBlockchain();
// Update checkpoint headers to reflect if any checkpoints were imported.
final List<BlockHeader> imported = new ArrayList<>();
while (!checkpointHeaders.isEmpty()
&& blockchain.contains(checkpointHeaders.peekFirst().getHash())) {
imported.add(checkpointHeaders.removeFirst());
}
final BlockHeader lastImportedCheckpointHeader = imported.get(imported.size() - 1);
// The first checkpoint header is always present in the blockchain.
checkpointHeaders.addFirst(lastImportedCheckpointHeader);
syncState.setCommonAncestor(lastImportedCheckpointHeader);
return imported.size() > 1;
}

public interface BlockImportTaskFactory {
CompletableFuture<List<Block>> importBlocksForCheckpoints(
final Deque<BlockHeader> checkpointHeaders);
final List<BlockHeader> checkpointHeaders);
}
}
Loading