diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/CodeNodeDataRequest.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/CodeNodeDataRequest.java
index c27b1c5e94..63a828fc92 100644
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/CodeNodeDataRequest.java
+++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/CodeNodeDataRequest.java
@@ -17,8 +17,9 @@ import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
 import tech.pegasys.pantheon.util.bytes.BytesValue;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
-import java.util.stream.Stream;
 
 class CodeNodeDataRequest extends NodeDataRequest {
 
@@ -32,9 +33,9 @@ protected void doPersist(final Updater updater) {
   }
 
   @Override
-  public Stream<NodeDataRequest> getChildRequests() {
+  public List<NodeDataRequest> getChildRequests() {
     // Code nodes have nothing further to download
-    return Stream.empty();
+    return Collections.emptyList();
   }
 
   @Override
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java
index 9db0785a41..51d800b55d 100644
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java
+++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java
@@ -21,8 +21,8 @@ import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
 import tech.pegasys.pantheon.util.bytes.BytesValue;
 
+import java.util.List;
 import java.util.Optional;
-import java.util.stream.Stream;
 
 public abstract class NodeDataRequest {
   private final RequestType requestType;
@@ -116,7 +116,7 @@ public final void persist(final WorldStateStorage.Updater updater) {
 
   protected abstract void doPersist(final WorldStateStorage.Updater updater);
 
-  public abstract Stream<NodeDataRequest> getChildRequests();
+  public abstract List<NodeDataRequest> getChildRequests();
 
   public abstract Optional<BytesValue> getExistingData(final WorldStateStorage worldStateStorage);
 }
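Note: `getChildRequests()` changes above from `Stream<NodeDataRequest>` to `List<NodeDataRequest>`. The patch does not state the motivation, but a plausible one is that a `Stream` can be traversed only once, while the downloader now needs to decide whether to enqueue child requests at all (see the resume logic in `WorldStateDownloader` below), which is easier with a materialized collection. A minimal standalone illustration of the difference:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class StreamVsList {
  public static void main(String[] args) {
    Stream<String> stream = Stream.of("a", "b");
    System.out.println(stream.count()); // consumes the stream
    // stream.count(); // would throw IllegalStateException: stream already operated upon

    List<String> list = Arrays.asList("a", "b");
    System.out.println(list.size());   // a List can be inspected...
    list.forEach(System.out::println); // ...and traversed as often as needed
  }
}
```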
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/TrieNodeDataRequest.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/TrieNodeDataRequest.java
index 35840c15eb..94f3f8903e 100644
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/TrieNodeDataRequest.java
+++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/TrieNodeDataRequest.java
@@ -13,61 +13,58 @@ package tech.pegasys.pantheon.ethereum.eth.sync.worldstate;
 
 import tech.pegasys.pantheon.ethereum.core.Hash;
-import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
 import tech.pegasys.pantheon.ethereum.trie.Node;
 import tech.pegasys.pantheon.ethereum.trie.TrieNodeDecoder;
 import tech.pegasys.pantheon.util.bytes.BytesValue;
 
+import java.util.Collections;
 import java.util.List;
-import java.util.stream.Stream;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
-abstract class TrieNodeDataRequest extends NodeDataRequest {
-
-  private static final TrieNodeDecoder nodeDecoder = TrieNodeDecoder.create();
+import com.google.common.base.Objects;
+
+abstract class TrieNodeDataRequest extends NodeDataRequest {
 
   TrieNodeDataRequest(final RequestType kind, final Hash hash) {
     super(kind, hash);
   }
 
   @Override
-  public Stream<NodeDataRequest> getChildRequests() {
+  public List<NodeDataRequest> getChildRequests() {
     if (getData() == null) {
       // If this node hasn't been downloaded yet, we can't return any child data
-      return Stream.empty();
+      return Collections.emptyList();
     }
 
-    final Node<BytesValue> node = nodeDecoder.decode(getData());
-    return getRequestsFromLoadedTrieNode(node);
-  }
+    List<Node<BytesValue>> nodes = TrieNodeDecoder.decodeNodes(getData());
+    // Collect hash-referenced child nodes to be requested
+    List<NodeDataRequest> requests =
+        nodes.stream()
+            .filter(this::nodeIsHashReferencedDescendant)
+            .map(Node::getHash)
+            .map(Hash::wrap)
+            .map(this::createChildNodeDataRequest)
+            .collect(Collectors.toList());
 
-  private Stream<NodeDataRequest> getRequestsFromLoadedTrieNode(final Node<BytesValue> trieNode) {
-    // Process this node's children
-    final Stream<NodeDataRequest> childRequests =
-        trieNode
-            .getChildren()
-            .map(List::stream)
-            .map(s -> s.flatMap(this::getRequestsFromChildTrieNode))
-            .orElse(Stream.of());
+    // Collect any requests embedded in leaf values
+    nodes.stream()
+        .filter(this::canReadNodeValue)
+        .map(Node::getValue)
+        .filter(Optional::isPresent)
+        .map(Optional::get)
+        .map(this::getRequestsFromTrieNodeValue)
+        .forEach(requests::addAll);
 
-    // Process value at this node, if present
-    return trieNode
-        .getValue()
-        .map(v -> Stream.concat(childRequests, (getRequestsFromTrieNodeValue(v).stream())))
-        .orElse(childRequests);
+    return requests;
   }
 
-  private Stream<NodeDataRequest> getRequestsFromChildTrieNode(final Node<BytesValue> trieNode) {
-    if (trieNode.isReferencedByHash()) {
-      // If child nodes are reference by hash, we need to download them
-      final Hash hash = Hash.wrap(trieNode.getHash());
-      if (MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH.equals(hash) || BytesValue.EMPTY.equals(hash)) {
-        return Stream.empty();
-      }
-      final NodeDataRequest req = createChildNodeDataRequest(hash);
-      return Stream.of(req);
-    }
-    // Otherwise if the child's value has been inlined we can go ahead and process it
-    return getRequestsFromLoadedTrieNode(trieNode);
+  private boolean nodeIsHashReferencedDescendant(final Node<BytesValue> node) {
+    return !Objects.equal(node.getHash(), getHash()) && node.isReferencedByHash();
+  }
+
+  private boolean canReadNodeValue(final Node<BytesValue> node) {
+    return !nodeIsHashReferencedDescendant(node);
   }
 
   protected abstract NodeDataRequest createChildNodeDataRequest(final Hash childHash);
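The rewritten `getChildRequests()` leans on the new `TrieNodeDecoder.decodeNodes()` (introduced further down): one RLP blob can embed several trie nodes, because sub-nodes whose encoding is shorter than 32 bytes are inlined rather than referenced by hash. The sketch below is not patch code; it restates the partition the method applies, assuming this module's `Node`/`Hash` API (`Objects` here is Guava's, as imported above):

```java
// Sketch only: how the decoded node list is partitioned.
for (final Node<BytesValue> n : TrieNodeDecoder.decodeNodes(getData())) {
  // A hash-referenced descendant is present only as a 32-byte hash; the node we
  // decoded from shares our own hash, so it is excluded by the first condition.
  final boolean hashReferencedDescendant =
      !Objects.equal(n.getHash(), getHash()) && n.isReferencedByHash();
  if (hashReferencedDescendant) {
    // Body must be downloaded: becomes createChildNodeDataRequest(Hash.wrap(n.getHash())).
  } else {
    // Fully inlined (or the request's own node): its value, if any, is readable now
    // and may yield further requests, e.g. an account's code hash and storage root.
  }
}
```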
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadState.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadState.java
index a5ad867f38..9a154cab13 100644
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadState.java
+++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadState.java
@@ -21,6 +21,7 @@ import tech.pegasys.pantheon.util.ExceptionUtils;
 import tech.pegasys.pantheon.util.bytes.BytesValue;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -29,7 +30,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Stream;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -37,6 +37,7 @@ class WorldDownloadState {
   private static final Logger LOG = LogManager.getLogger();
 
+  private final boolean downloadWasResumed;
   private final TaskQueue<NodeDataRequest> pendingRequests;
   private final ArrayBlockingQueue<Task<NodeDataRequest>> requestsToPersist;
   private final int maxOutstandingRequests;
@@ -57,6 +58,7 @@ public WorldDownloadState(
       final ArrayBlockingQueue<Task<NodeDataRequest>> requestsToPersist,
       final int maxOutstandingRequests,
       final int maxRequestsWithoutProgress) {
+    this.downloadWasResumed = !pendingRequests.isEmpty();
    this.pendingRequests = pendingRequests;
     this.requestsToPersist = requestsToPersist;
     this.maxOutstandingRequests = maxOutstandingRequests;
@@ -98,6 +100,10 @@ private synchronized void cleanup(final Void result, final Throwable error) {
     }
   }
 
+  public boolean downloadWasResumed() {
+    return downloadWasResumed;
+  }
+
   public void whileAdditionalRequestsCanBeSent(final Runnable action) {
     while (shouldRequestNodeData()) {
       if (sendingRequests.compareAndSet(false, true)) {
@@ -149,7 +155,7 @@ public synchronized void enqueueRequest(final NodeDataRequest request) {
     }
   }
 
-  public synchronized void enqueueRequests(final Stream<NodeDataRequest> requests) {
+  public synchronized void enqueueRequests(final Collection<NodeDataRequest> requests) {
     if (!internalFuture.isDone()) {
       requests.forEach(pendingRequests::enqueue);
     }
@@ -212,9 +218,14 @@ private synchronized void markAsStalled(final int maxNodeRequestRetries) {
     internalFuture.completeExceptionally(e);
   }
 
+  public synchronized boolean shouldRequestRootNode() {
+    // If we get to the end of the download and we don't have the root, request it
+    return !internalFuture.isDone() && pendingRequests.allTasksCompleted() && rootNodeData == null;
+  }
+
   public synchronized boolean checkCompletion(
       final WorldStateStorage worldStateStorage, final BlockHeader header) {
-    if (!internalFuture.isDone() && pendingRequests.allTasksCompleted()) {
+    if (!internalFuture.isDone() && pendingRequests.allTasksCompleted() && rootNodeData != null) {
       final Updater updater = worldStateStorage.updater();
       updater.putAccountStateTrieNode(header.getStateRoot(), rootNodeData);
       updater.commit();
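`WorldDownloadState` now infers a resumed download from the pending-request queue being non-empty at construction time (the RocksDB-backed queue below survives restarts), and `checkCompletion` refuses to finish until the root node's data has actually been seen. An illustrative restatement of the two new predicates, not code from the file:

```java
// Illustration with assumed boolean/byte[] inputs standing in for the fields.
static boolean shouldRequestRootNode(boolean done, boolean allTasksCompleted, byte[] rootData) {
  // A resumed download drains its queue without ever fetching the root, because
  // the root is only persisted by checkCompletion(), which never ran last time.
  return !done && allTasksCompleted && rootData == null;
}

static boolean readyToComplete(boolean done, boolean allTasksCompleted, byte[] rootData) {
  // The extra rootData != null guard keeps a resumed download from committing
  // an incomplete world state before the root node has been re-fetched.
  return !done && allTasksCompleted && rootData != null;
}
```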
diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java
index e23884af8a..e3c2edbec3 100644
--- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java
+++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java
@@ -165,7 +165,10 @@ public CompletableFuture<Void> run(final BlockHeader header) {
             maxNodeRequestsWithoutProgress);
     this.downloadState.set(newDownloadState);
 
-    newDownloadState.enqueueRequest(NodeDataRequest.createAccountDataRequest(stateRoot));
+    if (!newDownloadState.downloadWasResumed()) {
+      // Only queue the root node if we're starting a new download from scratch
+      newDownloadState.enqueueRequest(NodeDataRequest.createAccountDataRequest(stateRoot));
+    }
 
     ethContext
         .getScheduler()
@@ -310,9 +313,13 @@ private void storeData(
         madeProgress = true;
         request.setData(matchingData);
         if (isRootState(blockHeader, request)) {
-          downloadState.enqueueRequests(request.getChildRequests());
+          if (!downloadState.downloadWasResumed()) {
+            // Only queue root node children if we started from scratch
+            downloadState.enqueueRequests(request.getChildRequests());
+          }
           downloadState.setRootNodeData(request.getData());
           task.markCompleted();
+          downloadState.checkCompletion(worldStateStorage, blockHeader);
         } else {
           downloadState.addToPersistenceQueue(task);
         }
@@ -361,6 +368,11 @@ protected void executeTask() {
             });
     storageUpdater.commit();
 
+    if (downloadState.shouldRequestRootNode()) {
+      downloadState.enqueueRequest(
+          NodeDataRequest.createAccountDataRequest(header.getStateRoot()));
+    }
+
     if (downloadState.checkCompletion(worldStateStorage, header)) {
       result.get().complete(null);
     } else {
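Pieced together, the hunks above give the downloader two start modes. The sketch below restates them with the patch's own names; it is an illustration, not code that appears in the file:

```java
// Illustration only: the downloader's two start modes after this patch.
void startDownload(final WorldDownloadState state, final Hash stateRoot) {
  if (!state.downloadWasResumed()) {
    // Fresh download: seed the queue with the account-trie root request; its
    // arrival enqueues the root's children and records the root node data.
    state.enqueueRequest(NodeDataRequest.createAccountDataRequest(stateRoot));
  }
  // Resumed download: the reloaded queue already holds mid-trie requests, so
  // nothing is enqueued here. Once shouldRequestRootNode() reports the queue
  // drained without root data, the root is fetched last and committed by
  // checkCompletion().
}
```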
diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadStateTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadStateTest.java
index 66da576aed..e5872d231e 100644
--- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadStateTest.java
+++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadStateTest.java
@@ -32,9 +32,9 @@ import tech.pegasys.pantheon.services.queue.TaskQueue.Task;
 import tech.pegasys.pantheon.util.bytes.BytesValue;
 
+import java.util.Arrays;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -226,7 +226,7 @@ public void shouldNotAllowMultipleCallsToSendAdditionalRequestsAtOnce() {
   public void shouldNotEnqueueRequestsAfterDownloadIsStalled() {
     downloadState.checkCompletion(worldStateStorage, header);
 
-    downloadState.enqueueRequests(Stream.of(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)));
+    downloadState.enqueueRequests(Arrays.asList(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)));
     downloadState.enqueueRequest(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));
 
     assertThat(pendingRequests.isEmpty()).isTrue();
diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java
index 64d400e3c3..68a08bdf78 100644
--- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java
+++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java
@@ -15,6 +15,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
@@ -70,6 +71,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -424,7 +426,7 @@ private void testCancellation(final boolean shouldCancelFuture) {
   }
 
   @Test
-  public void doesRequestKnownAccountTrieNodesFromNetwork() {
+  public void doesNotRequestKnownAccountTrieNodesFromNetwork() {
     final EthProtocolManager ethProtocolManager =
         EthProtocolManagerTestUtil.create(new EthScheduler(1, 1, 1, new NoOpMetricsSystem()));
@@ -454,7 +456,7 @@ public void doesRequestKnownAccountTrieNodesFromNetwork() {
 
     // Seed local storage with some trie node values
     final Map<Bytes32, BytesValue> allNodes =
-        collectTrieNodesToBeRequested(remoteStorage, remoteWorldState.rootHash(), 5);
+        collectTrieNodesToBeRequestedAfterRoot(remoteStorage, remoteWorldState.rootHash(), 5);
     final Set<Bytes32> knownNodes = new HashSet<>();
     final Set<Bytes32> unknownNodes = new HashSet<>();
     assertThat(allNodes.size()).isGreaterThan(0); // Sanity check
@@ -491,7 +493,7 @@ public void doesRequestKnownAccountTrieNodesFromNetwork() {
       giveOtherThreadsAGo();
     }
 
-    // Check that known trie nodes were requested
+    // Check that unknown trie nodes were requested
     final List<Bytes32> requestedHashes =
         sentMessages.stream()
             .filter(m -> m.getCode() == EthPV63.GET_NODE_DATA)
@@ -510,7 +512,7 @@ public void doesRequestKnownAccountTrieNodesFromNetwork() {
   }
 
   @Test
-  public void doesRequestKnownStorageTrieNodesFromNetwork() {
+  public void doesNotRequestKnownStorageTrieNodesFromNetwork() {
     final EthProtocolManager ethProtocolManager =
         EthProtocolManagerTestUtil.create(new EthScheduler(1, 1, 1, new NoOpMetricsSystem()));
@@ -554,7 +556,8 @@ public void doesRequestKnownStorageTrieNodesFromNetwork() {
     final Set<Bytes32> knownNodes = new HashSet<>();
     final Set<Bytes32> unknownNodes = new HashSet<>();
     for (final Bytes32 storageRootHash : storageRootHashes) {
-      allTrieNodes.putAll(collectTrieNodesToBeRequested(remoteStorage, storageRootHash, 5));
+      allTrieNodes.putAll(
+          collectTrieNodesToBeRequestedAfterRoot(remoteStorage, storageRootHash, 5));
     }
     assertThat(allTrieNodes.size()).isGreaterThan(0); // Sanity check
     final Updater localStorageUpdater = localStorage.updater();
@@ -593,7 +596,7 @@ public void doesRequestKnownStorageTrieNodesFromNetwork() {
     // World state should be available by the time the result is complete
     assertThat(localStorage.isWorldStateAvailable(stateRoot)).isTrue();
 
-    // Check that known trie nodes were requested
+    // Check that unknown trie nodes were requested
     final List<Bytes32> requestedHashes =
         sentMessages.stream()
             .filter(m -> m.getCode() == EthPV63.GET_NODE_DATA)
@@ -666,9 +669,89 @@ public void stalledDownloader() {
     assertThat(retryResult).isCompleted();
   }
 
+  @Test
+  public void resumesFromNonEmptyQueue() {
+    final EthProtocolManager ethProtocolManager =
+        EthProtocolManagerTestUtil.create(new EthScheduler(1, 1, 1, new NoOpMetricsSystem()));
+
+    // Setup "remote" state
+    final WorldStateStorage remoteStorage =
+        new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
+    final WorldStateArchive remoteWorldStateArchive = new WorldStateArchive(remoteStorage);
+    final MutableWorldState remoteWorldState = remoteWorldStateArchive.getMutable();
+
+    // Generate accounts and save corresponding state root
+    List<Account> accounts = dataGen.createRandomAccounts(remoteWorldState, 10);
+    final Hash stateRoot = remoteWorldState.rootHash();
+    assertThat(stateRoot).isNotEqualTo(EMPTY_TRIE_ROOT); // Sanity check
+    final BlockHeader header =
+        dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader();
+
+    // Add some nodes to the queue
+    final TaskQueue<NodeDataRequest> queue = spy(new InMemoryTaskQueue<>());
+    List<Bytes32> queuedHashes = getFirstSetOfChildNodeRequests(remoteStorage, stateRoot);
+    assertThat(queuedHashes.size()).isGreaterThan(0); // Sanity check
+    for (Bytes32 bytes32 : queuedHashes) {
+      queue.enqueue(new AccountTrieNodeDataRequest(Hash.wrap(bytes32)));
+    }
+    // Sanity check
+    for (Bytes32 bytes32 : queuedHashes) {
+      final Hash hash = Hash.wrap(bytes32);
+      verify(queue, times(1)).enqueue(argThat((r) -> r.getHash().equals(hash)));
+    }
+
+    final WorldStateStorage localStorage =
+        new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
+    final SynchronizerConfiguration syncConfig =
+        SynchronizerConfiguration.builder().worldStateMaxRequestsWithoutProgress(10).build();
+    final WorldStateDownloader downloader =
+        createDownloader(syncConfig, ethProtocolManager.ethContext(), localStorage, queue);
+
+    // Create a peer that can respond
+    final RespondingEthPeer peer =
+        EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber());
+
+    // Respond to node data requests
+    final List<MessageData> sentMessages = new ArrayList<>();
+    final Responder blockChainResponder =
+        RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive);
+    final Responder responder =
+        RespondingEthPeer.wrapResponderWithCollector(blockChainResponder, sentMessages);
+
+    CompletableFuture<Void> result = downloader.run(header);
+    while (!result.isDone()) {
+      peer.respond(responder);
+      giveOtherThreadsAGo();
+    }
+    assertThat(localStorage.isWorldStateAvailable(stateRoot)).isTrue();
+
+    // Check that already enqueued trie nodes were requested
+    final List<Bytes32> requestedHashes =
+        sentMessages.stream()
+            .filter(m -> m.getCode() == EthPV63.GET_NODE_DATA)
+            .map(GetNodeDataMessage::readFrom)
+            .flatMap(m -> StreamSupport.stream(m.hashes().spliterator(), true))
+            .collect(Collectors.toList());
+    assertThat(requestedHashes.size()).isGreaterThan(0);
+    assertThat(requestedHashes).containsAll(queuedHashes);
+
+    // Check that already enqueued requests were not enqueued more than once
+    for (Bytes32 bytes32 : queuedHashes) {
+      final Hash hash = Hash.wrap(bytes32);
+      verify(queue, times(1)).enqueue(argThat((r) -> r.getHash().equals(hash)));
+    }
+
+    // Check that all expected account data was downloaded
+    assertThat(result).isDone();
+    final WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage);
+    final WorldState localWorldState = localWorldStateArchive.get(stateRoot).get();
+    assertAccountsMatch(localWorldState, accounts);
+  }
+
   /**
    * Walks through trie represented by the given rootHash and returns hash-node pairs that would
-   * need to be requested from the network in order to reconstruct this trie.
+   * need to be requested from the network in order to reconstruct this trie, excluding the root
+   * node.
    *
    * @param storage Storage holding node data required to reconstitute the trie represented by
    *     rootHash
@@ -676,35 +759,42 @@
    * @param maxNodes The maximum number of values to collect before returning
    * @return A list of hash-node pairs
    */
-  private Map<Bytes32, BytesValue> collectTrieNodesToBeRequested(
+  private Map<Bytes32, BytesValue> collectTrieNodesToBeRequestedAfterRoot(
       final WorldStateStorage storage, final Bytes32 rootHash, final int maxNodes) {
     final Map<Bytes32, BytesValue> trieNodes = new HashMap<>();
-    final TrieNodeDecoder decoder = TrieNodeDecoder.create();
-    final BytesValue rootNode = storage.getNodeData(rootHash).get();
-
-    // Walk through hash-referenced nodes
-    final List<Node<BytesValue>> hashReferencedNodes = new ArrayList<>();
-    hashReferencedNodes.add(decoder.decode(rootNode));
-    while (!hashReferencedNodes.isEmpty() && trieNodes.size() < maxNodes) {
-      final Node<BytesValue> currentNode = hashReferencedNodes.remove(0);
-      final List<Node<BytesValue>> children = new ArrayList<>();
-      currentNode.getChildren().ifPresent(children::addAll);
-      while (!children.isEmpty() && trieNodes.size() < maxNodes) {
-        final Node<BytesValue> child = children.remove(0);
-        if (child.isReferencedByHash()) {
-          final BytesValue childNode = storage.getNodeData(child.getHash()).get();
-          trieNodes.put(child.getHash(), childNode);
-          hashReferencedNodes.add(decoder.decode(childNode));
-        } else {
-          child.getChildren().ifPresent(children::addAll);
-        }
-      }
-    }
+    TrieNodeDecoder.breadthFirstDecoder(storage::getNodeData, rootHash)
+        .filter(n -> !Objects.equals(n.getHash(), rootHash))
+        .filter(Node::isReferencedByHash)
+        .limit(maxNodes)
+        .forEach((n) -> trieNodes.put(n.getHash(), n.getRlp()));
 
     return trieNodes;
   }
 
+  /**
+   * Returns the first set of node hashes that would need to be requested from the network after
+   * retrieving the root node in order to rebuild the trie represented by the given rootHash and
+   * storage.
+   *
+   * @param storage Storage holding node data required to reconstitute the trie represented by
+   *     rootHash
+   * @param rootHash The hash of the root node of some trie
+   * @return A list of node hashes
+   */
+  private List<Bytes32> getFirstSetOfChildNodeRequests(
+      final WorldStateStorage storage, final Bytes32 rootHash) {
+    final List<Bytes32> hashesToRequest = new ArrayList<>();
+
+    BytesValue rootNodeRlp = storage.getNodeData(rootHash).get();
+    TrieNodeDecoder.decodeNodes(rootNodeRlp).stream()
+        .filter(n -> !Objects.equals(n.getHash(), rootHash))
+        .filter(Node::isReferencedByHash)
+        .forEach((n) -> hashesToRequest.add(n.getHash()));
+
+    return hashesToRequest;
+  }
+
   private void downloadAvailableWorldStateFromPeers(
       final int peerCount,
       final int accountCount,
diff --git a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/TrieNodeDecoder.java b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/TrieNodeDecoder.java
index 18ca328e50..92e9525a66 100644
--- a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/TrieNodeDecoder.java
+++ b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/TrieNodeDecoder.java
@@ -12,25 +12,160 @@
  */
 package tech.pegasys.pantheon.ethereum.trie;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
+import tech.pegasys.pantheon.util.bytes.Bytes32;
 import tech.pegasys.pantheon.util.bytes.BytesValue;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.function.Function;
+import java.util.stream.Stream;
+
+import com.google.common.collect.Streams;
 
 public class TrieNodeDecoder {
+  private static final StoredNodeFactory<BytesValue> emptyNodeFactory =
+      new StoredNodeFactory<>((h) -> Optional.empty(), Function.identity(), Function.identity());
+
+  // Hide constructor for static utility class
+  private TrieNodeDecoder() {}
+
+  /**
+   * Decode an rlp-encoded trie node
+   *
+   * @param rlp The rlp-encoded node
+   * @return A {@code Node} representation of the rlp data
+   */
+  public static Node<BytesValue> decode(final BytesValue rlp) {
+    return emptyNodeFactory.decode(rlp);
+  }
 
-  private final StoredNodeFactory<BytesValue> nodeFactory;
+  /**
+   * Flattens the decoded node and all of its inlined nodes and node references into a list.
+   *
+   * @param nodeRlp The bytes of the trie node to be decoded.
+   * @return A list of nodes and node references embedded in the given rlp.
+   */
+  public static List<Node<BytesValue>> decodeNodes(final BytesValue nodeRlp) {
+    Node<BytesValue> node = decode(nodeRlp);
+    List<Node<BytesValue>> nodes = new ArrayList<>();
+    nodes.add(node);
 
-  private TrieNodeDecoder() {
-    nodeFactory =
-        new StoredNodeFactory<>((h) -> Optional.empty(), Function.identity(), Function.identity());
+    final List<Node<BytesValue>> toProcess = new ArrayList<>();
+    node.getChildren().ifPresent(toProcess::addAll);
+    while (!toProcess.isEmpty()) {
+      final Node<BytesValue> currentNode = toProcess.remove(0);
+      if (Objects.equals(NullNode.instance(), currentNode)) {
+        // Skip null nodes
+        continue;
+      }
+      nodes.add(currentNode);
+
+      if (!currentNode.isReferencedByHash()) {
+        // If current node is inlined, that means we can process its children
+        currentNode.getChildren().ifPresent(toProcess::addAll);
+      }
+    }
+
+    return nodes;
   }
 
-  public static TrieNodeDecoder create() {
-    return new TrieNodeDecoder();
+  /**
+   * Walks the trie in a breadth-first manner, returning the nodes encountered in order. If any
+   * nodes are missing from the nodeLoader, those nodes are just skipped.
+   *
+   * @param nodeLoader The NodeLoader for looking up nodes by hash
+   * @param rootHash The hash of the root node
+   * @param maxDepth The maximum depth to traverse to. A value of zero will traverse the root node
+   *     only.
+   * @return A stream of non-null nodes in breadth-first traversal order.
+   */
+  public static Stream<Node<BytesValue>> breadthFirstDecoder(
+      final NodeLoader nodeLoader, final Bytes32 rootHash, final int maxDepth) {
+    checkArgument(maxDepth >= 0);
+    return Streams.stream(new BreadthFirstIterator(nodeLoader, rootHash, maxDepth));
   }
 
-  public Node<BytesValue> decode(final BytesValue rlp) {
-    return nodeFactory.decode(rlp);
+  /**
+   * Walks the trie in a breadth-first manner, returning the nodes encountered in order. If any
+   * nodes are missing from the nodeLoader, those nodes are just skipped.
+   *
+   * @param nodeLoader The NodeLoader for looking up nodes by hash
+   * @param rootHash The hash of the root node
+   * @return A stream of non-null nodes in breadth-first traversal order.
+   */
+  public static Stream<Node<BytesValue>> breadthFirstDecoder(
+      final NodeLoader nodeLoader, final Bytes32 rootHash) {
+    return breadthFirstDecoder(nodeLoader, rootHash, Integer.MAX_VALUE);
+  }
+
+  private static class BreadthFirstIterator implements Iterator<Node<BytesValue>> {
+
+    private final int maxDepth;
+    private final StoredNodeFactory<BytesValue> nodeFactory;
+
+    private int currentDepth = 0;
+    private final List<Node<BytesValue>> currentNodes = new ArrayList<>();
+    private final List<Node<BytesValue>> nextNodes = new ArrayList<>();
+
+    BreadthFirstIterator(final NodeLoader nodeLoader, final Bytes32 rootHash, final int maxDepth) {
+      this.maxDepth = maxDepth;
+      this.nodeFactory =
+          new StoredNodeFactory<>(nodeLoader, Function.identity(), Function.identity());
+
+      nodeLoader.getNode(rootHash).map(TrieNodeDecoder::decode).ifPresent(currentNodes::add);
+    }
+
+    public static BreadthFirstIterator create(
+        final NodeLoader nodeLoader, final Bytes32 rootHash, final int maxDepth) {
+      return new BreadthFirstIterator(nodeLoader, rootHash, maxDepth);
+    }
+
+    @Override
+    public boolean hasNext() {
+      return !currentNodes.isEmpty() && currentDepth <= maxDepth;
+    }
+
+    @Override
+    public Node<BytesValue> next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+
+      final Node<BytesValue> nextNode = currentNodes.remove(0);
+
+      final List<Node<BytesValue>> children = new ArrayList<>();
+      nextNode.getChildren().ifPresent(children::addAll);
+      while (!children.isEmpty()) {
+        Node<BytesValue> child = children.remove(0);
+        if (Objects.equals(child, NullNode.instance())) {
+          // Ignore null nodes
+          continue;
+        }
+        if (child.isReferencedByHash()) {
+          // Retrieve hash-referenced child
+          final Optional<Node<BytesValue>> maybeChildNode = nodeFactory.retrieve(child.getHash());
+          if (!maybeChildNode.isPresent()) {
+            continue;
+          }
+          child = maybeChildNode.get();
+        }
+        nextNodes.add(child);
+      }
+
+      // Set up next level
+      if (currentNodes.isEmpty()) {
+        currentDepth += 1;
+        currentNodes.addAll(nextNodes);
+        nextNodes.clear();
+      }
+
+      return nextNode;
+    }
+  }
 }
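Usage sketch for the new static `TrieNodeDecoder` API (the signatures are as declared above; `storage` is assumed to expose `getNodeData` the way `WorldStateStorage` does, and the stream imports are assumed):

```java
// Depth-limited walk: maxDepth 0 yields only the root node (if it resolves).
List<Node<BytesValue>> rootOnly =
    TrieNodeDecoder.breadthFirstDecoder(storage::getNodeData, rootHash, 0)
        .collect(Collectors.toList());

// Unbounded walk: level-by-level traversal that silently skips nodes whose
// bodies are absent from storage. Handy for "what do I still need?" queries,
// which is exactly how the test helpers above use it.
List<Bytes32> hashReferenced =
    TrieNodeDecoder.breadthFirstDecoder(storage::getNodeData, rootHash)
        .filter(Node::isReferencedByHash)
        .map(Node::getHash)
        .collect(Collectors.toList());
```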
diff --git a/ethereum/trie/src/test/java/tech/pegasys/pantheon/ethereum/trie/TrieNodeDecoderTest.java b/ethereum/trie/src/test/java/tech/pegasys/pantheon/ethereum/trie/TrieNodeDecoderTest.java
new file mode 100644
index 0000000000..4feb18eb03
--- /dev/null
+++ b/ethereum/trie/src/test/java/tech/pegasys/pantheon/ethereum/trie/TrieNodeDecoderTest.java
@@ -0,0 +1,224 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package tech.pegasys.pantheon.ethereum.trie;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
+import tech.pegasys.pantheon.services.kvstore.KeyValueStorage.Transaction;
+import tech.pegasys.pantheon.util.bytes.Bytes32;
+import tech.pegasys.pantheon.util.bytes.BytesValue;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Random;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.junit.Test;
+
+public class TrieNodeDecoderTest {
+
+  @Test
+  public void decodeNodes() {
+    final InMemoryKeyValueStorage storage = new InMemoryKeyValueStorage();
+
+    // Build a small trie
+    MerklePatriciaTrie<BytesValue, BytesValue> trie =
+        new StoredMerklePatriciaTrie<>(storage::get, Function.identity(), Function.identity());
+    trie.put(BytesValue.fromHexString("0x100000"), BytesValue.of(1));
+    trie.put(BytesValue.fromHexString("0x200000"), BytesValue.of(2));
+    trie.put(BytesValue.fromHexString("0x300000"), BytesValue.of(3));
+
+    trie.put(BytesValue.fromHexString("0x110000"), BytesValue.of(10));
+    trie.put(BytesValue.fromHexString("0x210000"), BytesValue.of(20));
+    // Create large leaf node that will not be inlined
+    trie.put(
+        BytesValue.fromHexString("0x310000"),
+        BytesValue.fromHexString("0x11223344556677889900112233445566778899"));
+
+    // Save nodes to storage
+    final Transaction tx = storage.startTransaction();
+    trie.commit(tx::put);
+    tx.commit();
+
+    // Get and flatten root node
+    final BytesValue rootNodeRlp = storage.get(trie.getRootHash()).get();
+    final List<Node<BytesValue>> nodes = TrieNodeDecoder.decodeNodes(rootNodeRlp);
+    // The full trie holds 10 nodes, but the branch node starting with 0x3..., which holds 2
+    // values, is referenced by hash, so its 2 child nodes will be missing here
+    assertThat(nodes.size()).isEqualTo(8);
+
+    // Collect and check values
+    List<BytesValue> actualValues =
+        nodes.stream()
+            .filter(n -> !n.isReferencedByHash())
+            .map(Node::getValue)
+            .filter(Optional::isPresent)
+            .map(Optional::get)
+            .collect(Collectors.toList());
+    assertThat(actualValues)
+        .containsExactlyInAnyOrder(
+            BytesValue.of(1), BytesValue.of(10), BytesValue.of(2), BytesValue.of(20));
+  }
+
+  @Test
+  public void breadthFirstDecode_smallTrie() {
+    final InMemoryKeyValueStorage storage = new InMemoryKeyValueStorage();
+
+    // Build a small trie
+    MerklePatriciaTrie<BytesValue, BytesValue> trie =
+        new StoredMerklePatriciaTrie<>(storage::get, Function.identity(), Function.identity());
+    trie.put(BytesValue.fromHexString("0x100000"), BytesValue.of(1));
+    trie.put(BytesValue.fromHexString("0x200000"), BytesValue.of(2));
+    trie.put(BytesValue.fromHexString("0x300000"), BytesValue.of(3));
+
+    trie.put(BytesValue.fromHexString("0x110000"), BytesValue.of(10));
+    trie.put(BytesValue.fromHexString("0x210000"), BytesValue.of(20));
+    trie.put(BytesValue.fromHexString("0x310000"), BytesValue.of(30));
+
+    // Save nodes to storage
+    final Transaction tx = storage.startTransaction();
+    trie.commit(tx::put);
+    tx.commit();
+
+    // First layer should just be the root node
+    final List<Node<BytesValue>> depth0Nodes =
+        TrieNodeDecoder.breadthFirstDecoder(storage::get, trie.getRootHash(), 0)
+            .collect(Collectors.toList());
+
+    assertThat(depth0Nodes.size()).isEqualTo(1);
+    final Node<BytesValue> rootNode = depth0Nodes.get(0);
+    assertThat(rootNode.getHash()).isEqualTo(trie.getRootHash());
+
+    // Decode first 2 levels
+    final List<Node<BytesValue>> depth0And1Nodes =
+        (TrieNodeDecoder.breadthFirstDecoder(storage::get, trie.getRootHash(), 1)
+            .collect(Collectors.toList()));
+    final int secondLevelNodeCount = 3;
+    final int expectedNodeCount = secondLevelNodeCount + 1;
+    assertThat(depth0And1Nodes.size()).isEqualTo(expectedNodeCount);
+    // First node should be root node
+    assertThat(depth0And1Nodes.get(0).getHash()).isEqualTo(rootNode.getHash());
+    // Subsequent nodes should be children of root node
+    List<Bytes32> expectedNodesHashes =
+        rootNode.getChildren().get().stream()
+            .filter(n -> !Objects.equals(n, NullNode.instance()))
+            .map(Node::getHash)
+            .collect(Collectors.toList());
+    List<Bytes32> actualNodeHashes =
+        depth0And1Nodes.subList(1, expectedNodeCount).stream()
+            .map(Node::getHash)
+            .collect(Collectors.toList());
+    assertThat(actualNodeHashes).isEqualTo(expectedNodesHashes);
+
+    // Decode full trie
+    final List<Node<BytesValue>> allNodes =
+        TrieNodeDecoder.breadthFirstDecoder(storage::get, trie.getRootHash())
+            .collect(Collectors.toList());
+    assertThat(allNodes.size()).isEqualTo(10);
+    // Collect and check values
+    List<BytesValue> actualValues =
+        allNodes.stream()
+            .map(Node::getValue)
+            .filter(Optional::isPresent)
+            .map(Optional::get)
+            .collect(Collectors.toList());
+    assertThat(actualValues)
+        .containsExactly(
+            BytesValue.of(1),
+            BytesValue.of(10),
+            BytesValue.of(2),
+            BytesValue.of(20),
+            BytesValue.of(3),
+            BytesValue.of(30));
+  }
+
+  @Test
+  public void breadthFirstDecode_partialTrie() {
+    final InMemoryKeyValueStorage fullStorage = new InMemoryKeyValueStorage();
+    final InMemoryKeyValueStorage partialStorage = new InMemoryKeyValueStorage();
+
+    // Build a small trie
+    MerklePatriciaTrie<BytesValue, BytesValue> trie =
+        new StoredMerklePatriciaTrie<>(fullStorage::get, Function.identity(), Function.identity());
+    final Random random = new Random(1);
+    for (int i = 0; i < 30; i++) {
+      byte[] key = new byte[4];
+      byte[] val = new byte[4];
+      random.nextBytes(key);
+      random.nextBytes(val);
+      trie.put(BytesValue.wrap(key), BytesValue.wrap(val));
+    }
+    final Transaction tx = fullStorage.startTransaction();
+    trie.commit(tx::put);
+    tx.commit();
+
+    // Get root node
+    Node<BytesValue> rootNode =
+        TrieNodeDecoder.breadthFirstDecoder(fullStorage::get, trie.getRootHash()).findFirst().get();
+
+    // Decode partially available trie
+    final Transaction partialTx = partialStorage.startTransaction();
+    partialTx.put(trie.getRootHash(), rootNode.getRlp());
+    partialTx.commit();
+    final List<Node<BytesValue>> allDecodableNodes =
+        TrieNodeDecoder.breadthFirstDecoder(partialStorage::get, trie.getRootHash())
+            .collect(Collectors.toList());
+    assertThat(allDecodableNodes.size()).isGreaterThanOrEqualTo(1);
+    assertThat(allDecodableNodes.get(0).getHash()).isEqualTo(rootNode.getHash());
+  }
+
+  @Test
+  public void breadthFirstDecode_emptyTrie() {
+    List<Node<BytesValue>> result =
+        TrieNodeDecoder.breadthFirstDecoder(
+                (h) -> Optional.empty(), MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)
+            .collect(Collectors.toList());
+    assertThat(result.size()).isEqualTo(0);
+  }
+
+  @Test
+  public void breadthFirstDecode_singleNodeTrie() {
+    final InMemoryKeyValueStorage storage = new InMemoryKeyValueStorage();
+
+    MerklePatriciaTrie<BytesValue, BytesValue> trie =
+        new StoredMerklePatriciaTrie<>(storage::get, Function.identity(), Function.identity());
+    trie.put(BytesValue.fromHexString("0x100000"), BytesValue.of(1));
+
+    // Save nodes to storage
+    final Transaction tx = storage.startTransaction();
+    trie.commit(tx::put);
+    tx.commit();
+
+    List<Node<BytesValue>> result =
+        TrieNodeDecoder.breadthFirstDecoder(storage::get, trie.getRootHash())
+            .collect(Collectors.toList());
+    assertThat(result.size()).isEqualTo(1);
+    assertThat(result.get(0).getValue()).contains(BytesValue.of(1));
+    BytesValue actualPath = CompactEncoding.pathToBytes(result.get(0).getPath());
+    assertThat(actualPath).isEqualTo(BytesValue.fromHexString("0x100000"));
+  }
+
+  @Test
+  public void breadthFirstDecode_unknownTrie() {
+    Bytes32 randomRootHash = Bytes32.fromHexStringLenient("0x12");
+    List<Node<BytesValue>> result =
+        TrieNodeDecoder.breadthFirstDecoder((h) -> Optional.empty(), randomRootHash)
+            .collect(Collectors.toList());
+    assertThat(result.size()).isEqualTo(0);
+  }
+}
diff --git a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueue.java b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueue.java
index 21cd6e6a3c..24ffefcb77 100644
--- a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueue.java
+++ b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueue.java
@@ -38,9 +38,9 @@ public class RocksDbTaskQueue<T> implements TaskQueue<T> {
 
   private long lastEnqueuedKey = 0;
   private long lastDequeuedKey = 0;
+  private long oldestKey = 0;
   private RocksIterator dequeueIterator;
   private long lastValidKeyFromIterator;
-  private long oldestKey = 0;
   private final Set<Task<T>> outstandingTasks = new HashSet<>();
 
   private boolean closed = false;
@@ -73,11 +73,30 @@ private RocksDbTaskQueue(
               MetricCategory.BIG_QUEUE,
               "dequeue_latency_seconds",
               "Latency for dequeuing an item.");
+
+      // Initialize queue from existing db
+      initializeQueue();
     } catch (final RocksDBException e) {
       throw new StorageException(e);
     }
   }
 
+  private void initializeQueue() {
+    RocksIterator iter = db.newIterator();
+    iter.seekToFirst();
+    if (!iter.isValid()) {
+      // There is no data yet, nothing to do
+      return;
+    }
+    long firstKey = Longs.fromByteArray(iter.key());
+    iter.seekToLast();
+    long lastKey = Longs.fromByteArray(iter.key());
+
+    lastDequeuedKey = firstKey - 1;
+    oldestKey = firstKey;
+    lastEnqueuedKey = lastKey;
+  }
+
   public static <T> RocksDbTaskQueue<T> create(
       final Path storageDirectory,
       final Function<T, BytesValue> serializer,
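The `initializeQueue()` bookkeeping relies on keys being the contiguous `long` sequence this queue itself wrote: scanning to the first and last RocksDB keys recovers the queue bounds after a restart. A standalone model of the arithmetic, with invented values for illustration (not Pantheon code):

```java
// Minimal sketch of the resume arithmetic, assuming contiguous long keys.
final class QueueBoundsModel {
  long lastEnqueuedKey; // next enqueue() writes lastEnqueuedKey + 1
  long lastDequeuedKey; // next dequeue() reads lastDequeuedKey + 1

  void resume(long firstKey, long lastKey) { // e.g. keys 7..12 survived a crash
    lastDequeuedKey = firstKey - 1; // 6  -> first dequeue() returns key 7
    lastEnqueuedKey = lastKey;      // 12 -> next enqueue() writes key 13
  }

  long size() {
    return lastEnqueuedKey - lastDequeuedKey; // 12 - 6 = 6 pending tasks
  }
}
```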
diff --git a/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueueTest.java b/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueueTest.java
index 7d01093528..5f355b3743 100644
--- a/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueueTest.java
+++ b/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueueTest.java
@@ -41,22 +41,41 @@ private RocksDbTaskQueue<BytesValue> createQueue(final Path dataDir) {
   }
 
   @Test
-  public void shouldIgnoreExistingData() throws Exception {
+  public void shouldResumeFromExistingQueue() throws Exception {
+    testResumeFromExistingQueue(10);
+  }
+
+  @Test
+  public void shouldResumeFromExistingQueueWithOneElement() throws Exception {
+    testResumeFromExistingQueue(1);
+  }
+
+  @Test
+  public void shouldResumeFromExistingQueueWithNoElements() throws Exception {
+    testResumeFromExistingQueue(0);
+  }
+
+  private void testResumeFromExistingQueue(final int elementCount) throws Exception {
     final Path dataDir = folder.newFolder().toPath();
     try (final RocksDbTaskQueue<BytesValue> queue = createQueue(dataDir)) {
-      queue.enqueue(BytesValue.of(1));
-      queue.enqueue(BytesValue.of(2));
-      queue.enqueue(BytesValue.of(3));
+      for (int i = 0; i < elementCount; i++) {
+        queue.enqueue(BytesValue.of(i));
+      }
     }
 
     try (final RocksDbTaskQueue<BytesValue> resumedQueue = createQueue(dataDir)) {
-      assertThat(resumedQueue.dequeue()).isEqualTo(null);
+      assertThat(resumedQueue.size()).isEqualTo(elementCount);
+      // Queue an additional element
+      resumedQueue.enqueue(BytesValue.of(99));
+      assertThat(resumedQueue.size()).isEqualTo(elementCount + 1);
 
-      resumedQueue.enqueue(BytesValue.of(50));
-      assertThat(resumedQueue.dequeue().getData()).isEqualTo(BytesValue.of(50));
+      // Check that everything dequeues in order as expected
+      for (int i = 0; i < elementCount; i++) {
+        assertThat(resumedQueue.dequeue().getData()).isEqualTo(BytesValue.of(i));
+      }
+      assertThat(resumedQueue.dequeue().getData()).isEqualTo(BytesValue.of(99));
 
-      resumedQueue.enqueue(BytesValue.of(60));
-      assertThat(resumedQueue.dequeue().getData()).isEqualTo(BytesValue.of(60));
+      assertThat(resumedQueue.size()).isEqualTo(0);
     }
   }
 }