Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[NC-2200] Update WorldStateDownloader to only filter out known code requests #777

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,28 @@ private List<Block> blockSequence(
}

public List<Account> createRandomAccounts(final MutableWorldState worldState, final int count) {
return createRandomAccounts(worldState, count, .5f, .75f);
}

public List<Account> createRandomContractAccountsWithNonEmptyStorage(
final MutableWorldState worldState, final int count) {
return createRandomAccounts(worldState, count, 1f, 1f);
}

private List<Account> createRandomAccounts(
final MutableWorldState worldState,
final int count,
final float percentContractAccounts,
final float percentContractAccountsWithNonEmptyStorage) {
WorldUpdater updater = worldState.updater();
List<Account> accounts = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
MutableAccount account = updater.getOrCreate(address());
// Make some accounts contract accounts
if (random.nextFloat() < .5) {
// Subset of random accounts are contract accounts
if (random.nextFloat() < percentContractAccounts) {
// Some percentage of accounts are contract accounts
account.setCode(bytesValue(5, 50));
if (random.nextFloat() < .75) {
// Add some storage for most contract accounts
if (random.nextFloat() < percentContractAccountsWithNonEmptyStorage) {
// Add some storage for contract accounts
int storageValues = random.nextInt(20) + 10;
for (int j = 0; j < storageValues; j++) {
account.setStorageValue(uint256(), uint256());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,19 @@ private CompletableFuture<?> sendAndProcessRequests(
// Queue child requests
request
.getChildRequests()
.filter(n -> !worldStateStorage.contains(n.getHash()))
.filter(this::filterChildRequests)
.forEach(pendingRequests::enqueue);
}
}
});
}

private boolean filterChildRequests(final NodeDataRequest request) {
// For now, just filter out requests for code that we already know about
return !(request.getRequestType() == RequestType.CODE
&& worldStateStorage.contains(request.getHash()));
}

private Map<Hash, BytesValue> mapNodeDataByHash(final List<BytesValue> data) {
// Map data by hash
Map<Hash, BytesValue> dataByHash = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,14 @@ public static Responder blockchainResponder(
};
}

public static Responder wrapResponderWithCollector(
final Responder responder, final List<MessageData> messageCollector) {
return (cap, msg) -> {
messageCollector.add(msg);
return responder.respond(cap, msg);
};
}

/**
* Create a responder that only responds with a fixed portion of the available data.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,37 @@
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63;
import tech.pegasys.pantheon.ethereum.eth.messages.GetNodeDataMessage;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStorageWorldStateStorage;
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
import tech.pegasys.pantheon.ethereum.trie.Node;
import tech.pegasys.pantheon.ethereum.trie.StoredMerklePatriciaTrie;
import tech.pegasys.pantheon.ethereum.trie.TrieNodeDecoder;
import tech.pegasys.pantheon.ethereum.worldstate.StateTrieAccountValue;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.services.queue.BigQueue;
import tech.pegasys.pantheon.services.queue.InMemoryBigQueue;
import tech.pegasys.pantheon.util.bytes.Bytes32;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import tech.pegasys.pantheon.util.uint.UInt256;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.junit.Test;

Expand Down Expand Up @@ -174,6 +189,291 @@ public void canRecoverFromTimeouts() {
assertAccountsMatch(localWorldState, accounts);
}

@Test
public void doesNotRequestKnownCodeFromNetwork() {
BlockDataGenerator dataGen = new BlockDataGenerator(1);
final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();

// Setup "remote" state
final WorldStateStorage remoteStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
final WorldStateArchive remoteWorldStateArchive = new WorldStateArchive(remoteStorage);
final MutableWorldState remoteWorldState = remoteWorldStateArchive.getMutable();

// Generate accounts and save corresponding state root
final List<Account> accounts =
dataGen.createRandomContractAccountsWithNonEmptyStorage(remoteWorldState, 20);
final Hash stateRoot = remoteWorldState.rootHash();
final BlockHeader header =
dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader();

// Create some peers
List<RespondingEthPeer> peers =
Stream.generate(
() -> EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber()))
.limit(5)
.collect(Collectors.toList());

BigQueue<NodeDataRequest> queue = new InMemoryBigQueue<>();
WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());

// Seed local storage with some contract values
Map<Bytes32, BytesValue> knownCode = new HashMap<>();
accounts.subList(0, 5).forEach(a -> knownCode.put(a.getCodeHash(), a.getCode()));
Updater localStorageUpdater = localStorage.updater();
knownCode.forEach(localStorageUpdater::putCode);
localStorageUpdater.commit();

WorldStateDownloader downloader =
new WorldStateDownloader(
ethProtocolManager.ethContext(),
localStorage,
queue,
10,
10,
NoOpMetricsSystem.NO_OP_LABELLED_TIMER);

CompletableFuture<Void> result = downloader.run(header);

// Respond to node data requests
List<MessageData> sentMessages = new ArrayList<>();
Responder blockChainResponder =
RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive);
Responder responder =
RespondingEthPeer.wrapResponderWithCollector(blockChainResponder, sentMessages);

while (!result.isDone()) {
for (RespondingEthPeer peer : peers) {
peer.respond(responder);
}
}

// Check that known code was not requested
List<Bytes32> requestedHashes =
sentMessages.stream()
.filter(m -> m.getCode() == EthPV63.GET_NODE_DATA)
.map(GetNodeDataMessage::readFrom)
.flatMap(m -> StreamSupport.stream(m.hashes().spliterator(), true))
.collect(Collectors.toList());
assertThat(requestedHashes.size()).isGreaterThan(0);
assertThat(Collections.disjoint(requestedHashes, knownCode.keySet())).isTrue();

// Check that all expected account data was downloaded
WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage);
final WorldState localWorldState = localWorldStateArchive.get(stateRoot);
assertThat(result).isDone();
assertAccountsMatch(localWorldState, accounts);
}

@Test
public void doesRequestKnownAccountTrieNodesFromNetwork() {
BlockDataGenerator dataGen = new BlockDataGenerator(1);
final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();

// Setup "remote" state
final WorldStateStorage remoteStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
final WorldStateArchive remoteWorldStateArchive = new WorldStateArchive(remoteStorage);
final MutableWorldState remoteWorldState = remoteWorldStateArchive.getMutable();

// Generate accounts and save corresponding state root
final List<Account> accounts =
dataGen.createRandomContractAccountsWithNonEmptyStorage(remoteWorldState, 20);
final Hash stateRoot = remoteWorldState.rootHash();
final BlockHeader header =
dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader();

// Create some peers
List<RespondingEthPeer> peers =
Stream.generate(
() -> EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber()))
.limit(5)
.collect(Collectors.toList());

BigQueue<NodeDataRequest> queue = new InMemoryBigQueue<>();
WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());

// Seed local storage with some trie node values
Map<Bytes32, BytesValue> knownTrieNodes =
collectTrieNodesToBeRequested(remoteStorage, remoteWorldState.rootHash(), 5);
assertThat(knownTrieNodes.size()).isGreaterThan(0); // Sanity check
Updater localStorageUpdater = localStorage.updater();
knownTrieNodes.forEach(localStorageUpdater::putAccountStateTrieNode);
localStorageUpdater.commit();

WorldStateDownloader downloader =
new WorldStateDownloader(
ethProtocolManager.ethContext(),
localStorage,
queue,
10,
10,
NoOpMetricsSystem.NO_OP_LABELLED_TIMER);

CompletableFuture<Void> result = downloader.run(header);

// Respond to node data requests
List<MessageData> sentMessages = new ArrayList<>();
Responder blockChainResponder =
RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive);
Responder responder =
RespondingEthPeer.wrapResponderWithCollector(blockChainResponder, sentMessages);

while (!result.isDone()) {
for (RespondingEthPeer peer : peers) {
peer.respond(responder);
}
}

// Check that known trie nodes were requested
List<Bytes32> requestedHashes =
sentMessages.stream()
.filter(m -> m.getCode() == EthPV63.GET_NODE_DATA)
.map(GetNodeDataMessage::readFrom)
.flatMap(m -> StreamSupport.stream(m.hashes().spliterator(), true))
.collect(Collectors.toList());
assertThat(requestedHashes.size()).isGreaterThan(0);
assertThat(requestedHashes).containsAll(knownTrieNodes.keySet());

// Check that all expected account data was downloaded
WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage);
final WorldState localWorldState = localWorldStateArchive.get(stateRoot);
assertThat(result).isDone();
assertAccountsMatch(localWorldState, accounts);
}

@Test
public void doesRequestKnownStorageTrieNodesFromNetwork() {
BlockDataGenerator dataGen = new BlockDataGenerator(1);
final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create();

// Setup "remote" state
final WorldStateStorage remoteStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
final WorldStateArchive remoteWorldStateArchive = new WorldStateArchive(remoteStorage);
final MutableWorldState remoteWorldState = remoteWorldStateArchive.getMutable();

// Generate accounts and save corresponding state root
final List<Account> accounts =
dataGen.createRandomContractAccountsWithNonEmptyStorage(remoteWorldState, 20);
final Hash stateRoot = remoteWorldState.rootHash();
final BlockHeader header =
dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader();

// Create some peers
List<RespondingEthPeer> peers =
Stream.generate(
() -> EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber()))
.limit(5)
.collect(Collectors.toList());

BigQueue<NodeDataRequest> queue = new InMemoryBigQueue<>();
WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());

// Seed local storage with some trie node values
List<Bytes32> storageRootHashes =
new StoredMerklePatriciaTrie<>(
remoteStorage::getNodeData,
remoteWorldState.rootHash(),
Function.identity(),
Function.identity())
.entriesFrom(Bytes32.ZERO, 5).values().stream()
.map(RLP::input)
.map(StateTrieAccountValue::readFrom)
.map(StateTrieAccountValue::getStorageRoot)
.collect(Collectors.toList());
Map<Bytes32, BytesValue> knownTrieNodes = new HashMap<>();
for (Bytes32 storageRootHash : storageRootHashes) {
knownTrieNodes.putAll(collectTrieNodesToBeRequested(remoteStorage, storageRootHash, 5));
}
assertThat(knownTrieNodes.size()).isGreaterThan(0); // Sanity check
Updater localStorageUpdater = localStorage.updater();
knownTrieNodes.forEach(localStorageUpdater::putAccountStorageTrieNode);
localStorageUpdater.commit();

WorldStateDownloader downloader =
new WorldStateDownloader(
ethProtocolManager.ethContext(),
localStorage,
queue,
10,
10,
NoOpMetricsSystem.NO_OP_LABELLED_TIMER);

CompletableFuture<Void> result = downloader.run(header);

// Respond to node data requests
List<MessageData> sentMessages = new ArrayList<>();
Responder blockChainResponder =
RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive);
Responder responder =
RespondingEthPeer.wrapResponderWithCollector(blockChainResponder, sentMessages);

while (!result.isDone()) {
for (RespondingEthPeer peer : peers) {
peer.respond(responder);
}
}

// Check that known trie nodes were requested
List<Bytes32> requestedHashes =
sentMessages.stream()
.filter(m -> m.getCode() == EthPV63.GET_NODE_DATA)
.map(GetNodeDataMessage::readFrom)
.flatMap(m -> StreamSupport.stream(m.hashes().spliterator(), true))
.collect(Collectors.toList());
assertThat(requestedHashes.size()).isGreaterThan(0);
assertThat(requestedHashes).containsAll(knownTrieNodes.keySet());

// Check that all expected account data was downloaded
WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage);
final WorldState localWorldState = localWorldStateArchive.get(stateRoot);
assertThat(result).isDone();
assertAccountsMatch(localWorldState, accounts);
}

/**
* Walks through trie represented by the given rootHash and returns hash-node pairs that would
* need to be requested from the network in order to reconstruct this trie.
*
* @param storage Storage holding node data required to reconstitute the trie represented by
* rootHash
* @param rootHash The hash of the root node of some trie
* @param maxNodes The maximum number of values to collect before returning
* @return A list of hash-node pairs
*/
private Map<Bytes32, BytesValue> collectTrieNodesToBeRequested(
final WorldStateStorage storage, final Bytes32 rootHash, final int maxNodes) {
Map<Bytes32, BytesValue> trieNodes = new HashMap<>();

TrieNodeDecoder decoder = TrieNodeDecoder.create();
BytesValue rootNode = storage.getNodeData(rootHash).get();

// Walk through hash-referenced nodes
List<Node<BytesValue>> hashReferencedNodes = new ArrayList<>();
hashReferencedNodes.add(decoder.decode(rootNode));
while (!hashReferencedNodes.isEmpty() && trieNodes.size() < maxNodes) {
Node<BytesValue> currentNode = hashReferencedNodes.remove(0);
List<Node<BytesValue>> children = new ArrayList<>();
currentNode.getChildren().ifPresent(children::addAll);
while (!children.isEmpty() && trieNodes.size() < maxNodes) {
Node<BytesValue> child = children.remove(0);
if (child.isReferencedByHash()) {
BytesValue childNode = storage.getNodeData(child.getHash()).get();
trieNodes.put(child.getHash(), childNode);
hashReferencedNodes.add(decoder.decode(childNode));
} else {
child.getChildren().ifPresent(children::addAll);
}
}
}

return trieNodes;
}

private void downloadAvailableWorldStateFromPeers(
final int peerCount,
final int accountCount,
Expand Down