Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Support resuming fast-sync downloads (#848)
Browse files Browse the repository at this point in the history
* Skip requesting data we already have but continue to walk the tree to ensure we have all child nodes.

* Don't delete fast sync state on stop.  Allow resuming world state downloads.
  • Loading branch information
ajsutton authored Feb 13, 2019
1 parent d421cf9 commit 3828dad
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ public interface Synchronizer {

void start();

void stop();

/**
* @return the status, based on SyncingResult When actively synchronizing blocks, alternatively
* empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,7 @@ public void start() {
}
}

@Override
public void stop() {
fastSynchronizer.ifPresent(FastSynchronizer::deleteFastSyncState);
}

private void handleFastSyncResult(final FastSyncState result, final Throwable error) {

final Throwable rootCause = ExceptionUtils.rootCause(error);
if (rootCause instanceof FastSyncException) {
LOG.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
import tech.pegasys.pantheon.ethereum.worldstate.StateTrieAccountValue;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class AccountTrieNodeDataRequest extends TrieNodeDataRequest {

Expand All @@ -36,6 +38,11 @@ public void persist(final Updater updater) {
updater.putAccountStateTrieNode(getHash(), getData());
}

@Override
public Optional<BytesValue> getExistingData(final WorldStateStorage worldStateStorage) {
return worldStateStorage.getAccountStateTrieNode(getHash());
}

@Override
protected NodeDataRequest createChildNodeDataRequest(final Hash childHash) {
return NodeDataRequest.createAccountDataRequest(childHash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
import static com.google.common.base.Preconditions.checkNotNull;

import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Optional;
import java.util.stream.Stream;

class CodeNodeDataRequest extends NodeDataRequest {
Expand All @@ -36,4 +39,9 @@ public Stream<NodeDataRequest> getChildRequests() {
// Code nodes have nothing further to download
return Stream.empty();
}

@Override
public Optional<BytesValue> getExistingData(final WorldStateStorage worldStateStorage) {
return worldStateStorage.getCode(getHash());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Optional;
import java.util.stream.Stream;

public abstract class NodeDataRequest {
Expand Down Expand Up @@ -96,4 +97,6 @@ public NodeDataRequest setData(final BytesValue data) {
public abstract void persist(final WorldStateStorage.Updater updater);

public abstract Stream<NodeDataRequest> getChildRequests();

public abstract Optional<BytesValue> getExistingData(final WorldStateStorage worldStateStorage);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
import static com.google.common.base.Preconditions.checkNotNull;

import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

class StorageTrieNodeDataRequest extends TrieNodeDataRequest {

Expand All @@ -33,6 +35,11 @@ public void persist(final Updater updater) {
updater.putAccountStorageTrieNode(getHash(), getData());
}

@Override
public Optional<BytesValue> getExistingData(final WorldStateStorage worldStateStorage) {
return worldStateStorage.getAccountStorageTrieNode(getHash());
}

@Override
protected NodeDataRequest createChildNodeDataRequest(final Hash childHash) {
return NodeDataRequest.createStorageDataRequest(childHash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,18 @@ private void requestNodeData(final BlockHeader header) {

// Collect data to be requested
List<NodeDataRequest> toRequest = new ArrayList<>();
for (int i = 0; i < hashCountPerRequest; i++) {
while (toRequest.size() < hashCountPerRequest) {
NodeDataRequest pendingRequest = pendingRequests.dequeue();
if (pendingRequest == null) {
break;
}
final Optional<BytesValue> existingData =
pendingRequest.getExistingData(worldStateStorage);
if (existingData.isPresent()) {
pendingRequest.setData(existingData.get());
queueChildRequests(pendingRequest);
continue;
}
toRequest.add(pendingRequest);
}

Expand Down Expand Up @@ -221,25 +228,19 @@ private CompletableFuture<?> sendAndProcessRequests(
request.persist(storageUpdater);
}

// Queue child requests
request
.getChildRequests()
.filter(this::filterChildRequests)
.forEach(pendingRequests::enqueue);
queueChildRequests(request);
}
}
storageUpdater.commit();
});
}

private boolean isRootState(final BlockHeader blockHeader, final NodeDataRequest request) {
return request.getHash().equals(blockHeader.getStateRoot());
private void queueChildRequests(final NodeDataRequest request) {
request.getChildRequests().forEach(pendingRequests::enqueue);
}

private boolean filterChildRequests(final NodeDataRequest request) {
// For now, just filter out requests for code that we already know about
return !(request.getRequestType() == RequestType.CODE
&& worldStateStorage.contains(request.getHash()));
private boolean isRootState(final BlockHeader blockHeader, final NodeDataRequest request) {
return request.getHash().equals(blockHeader.getStateRoot());
}

private Map<Hash, BytesValue> mapNodeDataByHash(final List<BytesValue> data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,13 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -350,11 +354,23 @@ public void doesRequestKnownAccountTrieNodesFromNetwork() {
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());

// Seed local storage with some trie node values
Map<Bytes32, BytesValue> knownTrieNodes =
Map<Bytes32, BytesValue> allNodes =
collectTrieNodesToBeRequested(remoteStorage, remoteWorldState.rootHash(), 5);
assertThat(knownTrieNodes.size()).isGreaterThan(0); // Sanity check
final Set<Bytes32> knownNodes = new HashSet<>();
final Set<Bytes32> unknownNodes = new HashSet<>();
assertThat(allNodes.size()).isGreaterThan(0); // Sanity check
Updater localStorageUpdater = localStorage.updater();
knownTrieNodes.forEach(localStorageUpdater::putAccountStateTrieNode);
final AtomicBoolean storeNode = new AtomicBoolean(true);
allNodes.forEach(
(nodeHash, node) -> {
if (storeNode.get()) {
localStorageUpdater.putAccountStateTrieNode(nodeHash, node);
knownNodes.add(nodeHash);
} else {
unknownNodes.add(nodeHash);
}
storeNode.set(!storeNode.get());
});
localStorageUpdater.commit();

WorldStateDownloader downloader =
Expand Down Expand Up @@ -390,7 +406,8 @@ public void doesRequestKnownAccountTrieNodesFromNetwork() {
.flatMap(m -> StreamSupport.stream(m.hashes().spliterator(), true))
.collect(Collectors.toList());
assertThat(requestedHashes.size()).isGreaterThan(0);
assertThat(requestedHashes).containsAll(knownTrieNodes.keySet());
assertThat(requestedHashes).containsAll(unknownNodes);
assertThat(requestedHashes).doesNotContainAnyElementsOf(knownNodes);

// Check that all expected account data was downloaded
WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage);
Expand Down Expand Up @@ -440,13 +457,26 @@ public void doesRequestKnownStorageTrieNodesFromNetwork() {
.map(StateTrieAccountValue::readFrom)
.map(StateTrieAccountValue::getStorageRoot)
.collect(Collectors.toList());
Map<Bytes32, BytesValue> knownTrieNodes = new HashMap<>();
Map<Bytes32, BytesValue> allTrieNodes = new HashMap<>();
final Set<Bytes32> knownNodes = new HashSet<>();
final Set<Bytes32> unknownNodes = new HashSet<>();
for (Bytes32 storageRootHash : storageRootHashes) {
knownTrieNodes.putAll(collectTrieNodesToBeRequested(remoteStorage, storageRootHash, 5));
allTrieNodes.putAll(collectTrieNodesToBeRequested(remoteStorage, storageRootHash, 5));
}
assertThat(knownTrieNodes.size()).isGreaterThan(0); // Sanity check
assertThat(allTrieNodes.size()).isGreaterThan(0); // Sanity check
Updater localStorageUpdater = localStorage.updater();
knownTrieNodes.forEach(localStorageUpdater::putAccountStorageTrieNode);
boolean storeNode = true;
for (Entry<Bytes32, BytesValue> entry : allTrieNodes.entrySet()) {
Bytes32 hash = entry.getKey();
BytesValue data = entry.getValue();
if (storeNode) {
localStorageUpdater.putAccountStorageTrieNode(hash, data);
knownNodes.add(hash);
} else {
unknownNodes.add(hash);
}
storeNode = !storeNode;
}
localStorageUpdater.commit();

WorldStateDownloader downloader =
Expand Down Expand Up @@ -486,7 +516,8 @@ public void doesRequestKnownStorageTrieNodesFromNetwork() {
.flatMap(m -> StreamSupport.stream(m.hashes().spliterator(), true))
.collect(Collectors.toList());
assertThat(requestedHashes.size()).isGreaterThan(0);
assertThat(requestedHashes).containsAll(knownTrieNodes.keySet());
assertThat(requestedHashes).containsAll(unknownNodes);
assertThat(requestedHashes).doesNotContainAnyElementsOf(knownNodes);

// Check that all expected account data was downloaded
WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage);
Expand Down
3 changes: 0 additions & 3 deletions pantheon/src/main/java/tech/pegasys/pantheon/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,6 @@ public void execute() {

@Override
public void close() throws Exception {
if (networkRunner.getNetwork().isP2pEnabled()) {
pantheonController.getSynchronizer().stop();
}
networkRunner.stop();
networkRunner.awaitStop();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,12 @@
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.primitives.Longs;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class RocksDbQueue implements BytesQueue {

private static final Logger LOG = LogManager.getLogger();

private final Options options;
private final RocksDB db;

Expand All @@ -46,11 +42,7 @@ public class RocksDbQueue implements BytesQueue {
private RocksDbQueue(final Path storageDirectory, final MetricsSystem metricsSystem) {
try {
RocksDbUtil.loadNativeLibrary();
options =
new Options()
.setCreateIfMissing(true)
// TODO: Support restoration from a previously persisted queue
.setErrorIfExists(true);
options = new Options().setCreateIfMissing(true);
db = RocksDB.open(options, storageDirectory.toString());

enqueueLatency =
Expand Down

0 comments on commit 3828dad

Please sign in to comment.