Skip to content

Commit

Permalink
Fix state download race condition by creating a TaskQueue API (PegaSy…
Browse files Browse the repository at this point in the history
  • Loading branch information
mbaxter authored and tmohay committed Feb 14, 2019
1 parent a9263c3 commit b277f38
Show file tree
Hide file tree
Showing 13 changed files with 443 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.services.queue.BigQueue;
import tech.pegasys.pantheon.services.queue.BytesQueue;
import tech.pegasys.pantheon.services.queue.BytesQueueAdapter;
import tech.pegasys.pantheon.services.queue.RocksDbQueue;
import tech.pegasys.pantheon.services.queue.BytesTaskQueue;
import tech.pegasys.pantheon.services.queue.BytesTaskQueueAdapter;
import tech.pegasys.pantheon.services.queue.RocksDbTaskQueue;
import tech.pegasys.pantheon.services.queue.TaskQueue;

import java.io.File;
import java.io.IOException;
Expand All @@ -50,14 +50,14 @@ class FastSynchronizer<C> {

private final FastSyncDownloader<C> fastSyncDownloader;
private final Path fastSyncDataDirectory;
private final BigQueue<NodeDataRequest> stateQueue;
private final TaskQueue<NodeDataRequest> stateQueue;
private final WorldStateDownloader worldStateDownloader;
private final FastSyncState initialSyncState;

private FastSynchronizer(
final FastSyncDownloader<C> fastSyncDownloader,
final Path fastSyncDataDirectory,
final BigQueue<NodeDataRequest> stateQueue,
final TaskQueue<NodeDataRequest> stateQueue,
final WorldStateDownloader worldStateDownloader,
final FastSyncState initialSyncState) {
this.fastSyncDownloader = fastSyncDownloader;
Expand Down Expand Up @@ -94,7 +94,7 @@ public static <C> Optional<FastSynchronizer<C>> create(
return Optional.empty();
}

final BigQueue<NodeDataRequest> stateQueue =
final TaskQueue<NodeDataRequest> stateQueue =
createWorldStateDownloaderQueue(getStateQueueDirectory(dataDirectory), metricsSystem);
final WorldStateDownloader worldStateDownloader =
new WorldStateDownloader(
Expand Down Expand Up @@ -166,10 +166,10 @@ private static void ensureDirectoryExists(final File dir) {
}
}

private static BigQueue<NodeDataRequest> createWorldStateDownloaderQueue(
private static TaskQueue<NodeDataRequest> createWorldStateDownloaderQueue(
final Path dataDirectory, final MetricsSystem metricsSystem) {
final BytesQueue bytesQueue = RocksDbQueue.create(dataDirectory, metricsSystem);
return new BytesQueueAdapter<>(
final BytesTaskQueue bytesQueue = RocksDbTaskQueue.create(dataDirectory, metricsSystem);
return new BytesTaskQueueAdapter<>(
bytesQueue, NodeDataRequest::serialize, NodeDataRequest::deserialize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.services.queue.BigQueue;
import tech.pegasys.pantheon.services.queue.TaskQueue;
import tech.pegasys.pantheon.services.queue.TaskQueue.Task;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.time.Duration;
Expand Down Expand Up @@ -55,7 +56,7 @@ private enum Status {
}

private final EthContext ethContext;
private final BigQueue<NodeDataRequest> pendingRequests;
private final TaskQueue<NodeDataRequest> pendingRequests;
private final int hashCountPerRequest;
private final int maxOutstandingRequests;
private final AtomicInteger outstandingRequests = new AtomicInteger(0);
Expand All @@ -69,7 +70,7 @@ private enum Status {
public WorldStateDownloader(
final EthContext ethContext,
final WorldStateStorage worldStateStorage,
final BigQueue<NodeDataRequest> pendingRequests,
final TaskQueue<NodeDataRequest> pendingRequests,
final int hashCountPerRequest,
final int maxOutstandingRequests,
final LabelledMetric<OperationTimer> ethTasksTimer,
Expand Down Expand Up @@ -140,28 +141,31 @@ private void requestNodeData(final BlockHeader header) {
EthPeer peer = maybePeer.get();

// Collect data to be requested
List<NodeDataRequest> toRequest = new ArrayList<>();
List<Task<NodeDataRequest>> toRequest = new ArrayList<>();
while (toRequest.size() < hashCountPerRequest) {
NodeDataRequest pendingRequest = pendingRequests.dequeue();
if (pendingRequest == null) {
Task<NodeDataRequest> pendingRequestTask = pendingRequests.dequeue();
if (pendingRequestTask == null) {
break;
}
NodeDataRequest pendingRequest = pendingRequestTask.getData();
final Optional<BytesValue> existingData =
pendingRequest.getExistingData(worldStateStorage);
if (existingData.isPresent()) {
pendingRequest.setData(existingData.get());
queueChildRequests(pendingRequest);
pendingRequestTask.markCompleted();
continue;
}
toRequest.add(pendingRequest);
toRequest.add(pendingRequestTask);
}

// Request and process node data
outstandingRequests.incrementAndGet();
sendAndProcessRequests(peer, toRequest, header)
.whenComplete(
(res, error) -> {
if (outstandingRequests.decrementAndGet() == 0 && pendingRequests.isEmpty()) {
if (outstandingRequests.decrementAndGet() == 0
&& pendingRequests.allTasksCompleted()) {
// We're done
final Updater updater = worldStateStorage.updater();
updater.putAccountStateTrieNode(header.getStateRoot(), rootNode);
Expand Down Expand Up @@ -201,9 +205,15 @@ private CompletableFuture<?> waitForNewPeer() {
}

private CompletableFuture<?> sendAndProcessRequests(
final EthPeer peer, final List<NodeDataRequest> requests, final BlockHeader blockHeader) {
final EthPeer peer,
final List<Task<NodeDataRequest>> requestTasks,
final BlockHeader blockHeader) {
List<Hash> hashes =
requests.stream().map(NodeDataRequest::getHash).distinct().collect(Collectors.toList());
requestTasks.stream()
.map(Task::getData)
.map(NodeDataRequest::getHash)
.distinct()
.collect(Collectors.toList());
return GetNodeDataFromPeerTask.forHashes(ethContext, hashes, ethTasksTimer)
.assignPeer(peer)
.run()
Expand All @@ -213,11 +223,12 @@ private CompletableFuture<?> sendAndProcessRequests(
(data, err) -> {
boolean requestFailed = err != null;
Updater storageUpdater = worldStateStorage.updater();
for (NodeDataRequest request : requests) {
for (Task<NodeDataRequest> task : requestTasks) {
NodeDataRequest request = task.getData();
BytesValue matchingData = requestFailed ? null : data.get(request.getHash());
if (matchingData == null) {
retriedRequestsTotal.inc();
pendingRequests.enqueue(request);
task.markFailed();
} else {
completedRequestsCounter.inc();
// Persist request data
Expand All @@ -229,6 +240,7 @@ private CompletableFuture<?> sendAndProcessRequests(
}

queueChildRequests(request);
task.markCompleted();
}
}
storageUpdater.commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.services.queue.BigQueue;
import tech.pegasys.pantheon.services.queue.InMemoryBigQueue;
import tech.pegasys.pantheon.services.queue.InMemoryTaskQueue;
import tech.pegasys.pantheon.services.queue.TaskQueue;
import tech.pegasys.pantheon.util.bytes.Bytes32;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import tech.pegasys.pantheon.util.uint.UInt256;
Expand Down Expand Up @@ -117,7 +117,7 @@ public void downloadEmptyWorldState() {
.limit(5)
.collect(Collectors.toList());

BigQueue<NodeDataRequest> queue = new InMemoryBigQueue<>();
TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
WorldStateDownloader downloader =
Expand Down Expand Up @@ -164,7 +164,7 @@ public void downloadAlreadyAvailableWorldState() {
.limit(5)
.collect(Collectors.toList());

BigQueue<NodeDataRequest> queue = new InMemoryBigQueue<>();
TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
WorldStateDownloader downloader =
new WorldStateDownloader(
ethProtocolManager.ethContext(),
Expand Down Expand Up @@ -210,7 +210,7 @@ public void canRecoverFromTimeouts() {
.limit(5)
.collect(Collectors.toList());

BigQueue<NodeDataRequest> queue = new InMemoryBigQueue<>();
TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
WorldStateDownloader downloader =
Expand Down Expand Up @@ -271,7 +271,7 @@ public void doesNotRequestKnownCodeFromNetwork() {
.limit(5)
.collect(Collectors.toList());

BigQueue<NodeDataRequest> queue = new InMemoryBigQueue<>();
TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());

Expand Down Expand Up @@ -349,7 +349,7 @@ public void doesRequestKnownAccountTrieNodesFromNetwork() {
.limit(5)
.collect(Collectors.toList());

BigQueue<NodeDataRequest> queue = new InMemoryBigQueue<>();
TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());

Expand Down Expand Up @@ -441,7 +441,7 @@ public void doesRequestKnownStorageTrieNodesFromNetwork() {
.limit(5)
.collect(Collectors.toList());

BigQueue<NodeDataRequest> queue = new InMemoryBigQueue<>();
TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());

Expand Down Expand Up @@ -606,7 +606,7 @@ private void downloadAvailableWorldStateFromPeers(
.getHeader();
assertThat(otherStateRoot).isNotEqualTo(stateRoot); // Sanity check

BigQueue<NodeDataRequest> queue = new InMemoryBigQueue<>();
TaskQueue<NodeDataRequest> queue = new InMemoryTaskQueue<>();
WorldStateStorage localStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@

import tech.pegasys.pantheon.util.bytes.BytesValue;

public interface BytesQueue extends BigQueue<BytesValue> {}
public interface BytesTaskQueue extends TaskQueue<BytesValue> {}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
import java.io.IOException;
import java.util.function.Function;

public class BytesQueueAdapter<T> implements BigQueue<T> {
public class BytesTaskQueueAdapter<T> implements TaskQueue<T> {

private final BytesQueue queue;
private final BytesTaskQueue queue;
private final Function<T, BytesValue> serializer;
private final Function<BytesValue, T> deserializer;

public BytesQueueAdapter(
final BytesQueue queue,
public BytesTaskQueueAdapter(
final BytesTaskQueue queue,
final Function<T, BytesValue> serializer,
final Function<BytesValue, T> deserializer) {
this.queue = queue;
Expand All @@ -33,23 +33,63 @@ public BytesQueueAdapter(
}

@Override
public void enqueue(final T value) {
queue.enqueue(serializer.apply(value));
public void enqueue(final T taskData) {
queue.enqueue(serializer.apply(taskData));
}

@Override
public T dequeue() {
BytesValue value = queue.dequeue();
return value == null ? null : deserializer.apply(value);
public Task<T> dequeue() {
Task<BytesValue> task = queue.dequeue();
if (task == null) {
return null;
}

T data = deserializer.apply(task.getData());
return new AdapterTask<>(task, data);
}

@Override
public long size() {
return queue.size();
}

@Override
public boolean isEmpty() {
return queue.isEmpty();
}

@Override
public boolean allTasksCompleted() {
return queue.allTasksCompleted();
}

@Override
public void close() throws IOException {
queue.close();
}

private static class AdapterTask<T> implements Task<T> {
private final Task<BytesValue> wrappedTask;
private final T data;

public AdapterTask(final Task<BytesValue> wrappedTask, final T data) {
this.wrappedTask = wrappedTask;
this.data = data;
}

@Override
public T getData() {
return data;
}

@Override
public void markCompleted() {
wrappedTask.markCompleted();
}

@Override
public void markFailed() {
wrappedTask.markFailed();
}
}
}
Loading

0 comments on commit b277f38

Please sign in to comment.