Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
[PAN-2302] Resume world state download from existing queue (#1016)
Browse files Browse the repository at this point in the history
  • Loading branch information
mbaxter authored Mar 4, 2019
1 parent fd60946 commit 3ac6f47
Show file tree
Hide file tree
Showing 11 changed files with 601 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

class CodeNodeDataRequest extends NodeDataRequest {

Expand All @@ -32,9 +33,9 @@ protected void doPersist(final Updater updater) {
}

@Override
public Stream<NodeDataRequest> getChildRequests() {
public List<NodeDataRequest> getChildRequests() {
// Code nodes have nothing further to download
return Stream.empty();
return Collections.emptyList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

public abstract class NodeDataRequest {
private final RequestType requestType;
Expand Down Expand Up @@ -116,7 +116,7 @@ public final void persist(final WorldStateStorage.Updater updater) {

protected abstract void doPersist(final WorldStateStorage.Updater updater);

public abstract Stream<NodeDataRequest> getChildRequests();
public abstract List<NodeDataRequest> getChildRequests();

public abstract Optional<BytesValue> getExistingData(final WorldStateStorage worldStateStorage);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,61 +13,58 @@
package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;

import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
import tech.pegasys.pantheon.ethereum.trie.Node;
import tech.pegasys.pantheon.ethereum.trie.TrieNodeDecoder;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import java.util.Optional;
import java.util.stream.Collectors;

abstract class TrieNodeDataRequest extends NodeDataRequest {
import com.google.common.base.Objects;

private static final TrieNodeDecoder nodeDecoder = TrieNodeDecoder.create();
abstract class TrieNodeDataRequest extends NodeDataRequest {

TrieNodeDataRequest(final RequestType kind, final Hash hash) {
super(kind, hash);
}

@Override
public Stream<NodeDataRequest> getChildRequests() {
public List<NodeDataRequest> getChildRequests() {
if (getData() == null) {
// If this node hasn't been downloaded yet, we can't return any child data
return Stream.empty();
return Collections.emptyList();
}

final Node<BytesValue> node = nodeDecoder.decode(getData());
return getRequestsFromLoadedTrieNode(node);
}
List<Node<BytesValue>> nodes = TrieNodeDecoder.decodeNodes(getData());
// Collect hash-referenced child nodes to be requested
List<NodeDataRequest> requests =
nodes.stream()
.filter(this::nodeIsHashReferencedDescendant)
.map(Node::getHash)
.map(Hash::wrap)
.map(this::createChildNodeDataRequest)
.collect(Collectors.toList());

private Stream<NodeDataRequest> getRequestsFromLoadedTrieNode(final Node<BytesValue> trieNode) {
// Process this node's children
final Stream<NodeDataRequest> childRequests =
trieNode
.getChildren()
.map(List::stream)
.map(s -> s.flatMap(this::getRequestsFromChildTrieNode))
.orElse(Stream.of());
// Collect any requests embedded in leaf values
nodes.stream()
.filter(this::canReadNodeValue)
.map(Node::getValue)
.filter(Optional::isPresent)
.map(Optional::get)
.map(this::getRequestsFromTrieNodeValue)
.forEach(requests::addAll);

// Process value at this node, if present
return trieNode
.getValue()
.map(v -> Stream.concat(childRequests, (getRequestsFromTrieNodeValue(v).stream())))
.orElse(childRequests);
return requests;
}

private Stream<NodeDataRequest> getRequestsFromChildTrieNode(final Node<BytesValue> trieNode) {
if (trieNode.isReferencedByHash()) {
// If child nodes are reference by hash, we need to download them
final Hash hash = Hash.wrap(trieNode.getHash());
if (MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH.equals(hash) || BytesValue.EMPTY.equals(hash)) {
return Stream.empty();
}
final NodeDataRequest req = createChildNodeDataRequest(hash);
return Stream.of(req);
}
// Otherwise if the child's value has been inlined we can go ahead and process it
return getRequestsFromLoadedTrieNode(trieNode);
private boolean nodeIsHashReferencedDescendant(final Node<BytesValue> node) {
return !Objects.equal(node.getHash(), getHash()) && node.isReferencedByHash();
}

private boolean canReadNodeValue(final Node<BytesValue> node) {
return !nodeIsHashReferencedDescendant(node);
}

protected abstract NodeDataRequest createChildNodeDataRequest(final Hash childHash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import tech.pegasys.pantheon.util.ExceptionUtils;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
Expand All @@ -29,14 +30,14 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

class WorldDownloadState {
private static final Logger LOG = LogManager.getLogger();

private final boolean downloadWasResumed;
private final TaskQueue<NodeDataRequest> pendingRequests;
private final ArrayBlockingQueue<Task<NodeDataRequest>> requestsToPersist;
private final int maxOutstandingRequests;
Expand All @@ -57,6 +58,7 @@ public WorldDownloadState(
final ArrayBlockingQueue<Task<NodeDataRequest>> requestsToPersist,
final int maxOutstandingRequests,
final int maxRequestsWithoutProgress) {
this.downloadWasResumed = !pendingRequests.isEmpty();
this.pendingRequests = pendingRequests;
this.requestsToPersist = requestsToPersist;
this.maxOutstandingRequests = maxOutstandingRequests;
Expand Down Expand Up @@ -98,6 +100,10 @@ private synchronized void cleanup(final Void result, final Throwable error) {
}
}

public boolean downloadWasResumed() {
return downloadWasResumed;
}

public void whileAdditionalRequestsCanBeSent(final Runnable action) {
while (shouldRequestNodeData()) {
if (sendingRequests.compareAndSet(false, true)) {
Expand Down Expand Up @@ -149,7 +155,7 @@ public synchronized void enqueueRequest(final NodeDataRequest request) {
}
}

public synchronized void enqueueRequests(final Stream<NodeDataRequest> requests) {
public synchronized void enqueueRequests(final Collection<NodeDataRequest> requests) {
if (!internalFuture.isDone()) {
requests.forEach(pendingRequests::enqueue);
}
Expand Down Expand Up @@ -212,9 +218,14 @@ private synchronized void markAsStalled(final int maxNodeRequestRetries) {
internalFuture.completeExceptionally(e);
}

public synchronized boolean shouldRequestRootNode() {
// If we get to the end of the download and we don't have the root, request it
return !internalFuture.isDone() && pendingRequests.allTasksCompleted() && rootNodeData == null;
}

public synchronized boolean checkCompletion(
final WorldStateStorage worldStateStorage, final BlockHeader header) {
if (!internalFuture.isDone() && pendingRequests.allTasksCompleted()) {
if (!internalFuture.isDone() && pendingRequests.allTasksCompleted() && rootNodeData != null) {
final Updater updater = worldStateStorage.updater();
updater.putAccountStateTrieNode(header.getStateRoot(), rootNodeData);
updater.commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,10 @@ public CompletableFuture<Void> run(final BlockHeader header) {
maxNodeRequestsWithoutProgress);
this.downloadState.set(newDownloadState);

newDownloadState.enqueueRequest(NodeDataRequest.createAccountDataRequest(stateRoot));
if (!newDownloadState.downloadWasResumed()) {
// Only queue the root node if we're starting a new download from scratch
newDownloadState.enqueueRequest(NodeDataRequest.createAccountDataRequest(stateRoot));
}

ethContext
.getScheduler()
Expand Down Expand Up @@ -310,9 +313,13 @@ private void storeData(
madeProgress = true;
request.setData(matchingData);
if (isRootState(blockHeader, request)) {
downloadState.enqueueRequests(request.getChildRequests());
if (!downloadState.downloadWasResumed()) {
// Only queue rootnode children if we started from scratch
downloadState.enqueueRequests(request.getChildRequests());
}
downloadState.setRootNodeData(request.getData());
task.markCompleted();
downloadState.checkCompletion(worldStateStorage, blockHeader);
} else {
downloadState.addToPersistenceQueue(task);
}
Expand Down Expand Up @@ -361,6 +368,11 @@ protected void executeTask() {
});
storageUpdater.commit();

if (downloadState.shouldRequestRootNode()) {
downloadState.enqueueRequest(
NodeDataRequest.createAccountDataRequest(header.getStateRoot()));
}

if (downloadState.checkCompletion(worldStateStorage, header)) {
result.get().complete(null);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
import tech.pegasys.pantheon.services.queue.TaskQueue.Task;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -226,7 +226,7 @@ public void shouldNotAllowMultipleCallsToSendAdditionalRequestsAtOnce() {
public void shouldNotEnqueueRequestsAfterDownloadIsStalled() {
downloadState.checkCompletion(worldStateStorage, header);

downloadState.enqueueRequests(Stream.of(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)));
downloadState.enqueueRequests(Arrays.asList(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)));
downloadState.enqueueRequest(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));

assertThat(pendingRequests.isEmpty()).isTrue();
Expand Down
Loading

0 comments on commit 3ac6f47

Please sign in to comment.