Skip to content

Commit

Permalink
Fix task queue so that the updated failure count for requests is actu…
Browse files Browse the repository at this point in the history
…ally stored. (PegaSysEng#893)
  • Loading branch information
ajsutton authored and tmohay committed Feb 20, 2019
1 parent ffe38f1 commit abd490c
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 209 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.services.queue.BytesTaskQueue;
import tech.pegasys.pantheon.services.queue.BytesTaskQueueAdapter;
import tech.pegasys.pantheon.services.queue.RocksDbTaskQueue;
import tech.pegasys.pantheon.services.queue.TaskQueue;

Expand Down Expand Up @@ -170,8 +168,7 @@ private static void ensureDirectoryExists(final File dir) {

private static TaskQueue<NodeDataRequest> createWorldStateDownloaderQueue(
final Path dataDirectory, final MetricsSystem metricsSystem) {
final BytesTaskQueue bytesQueue = RocksDbTaskQueue.create(dataDirectory, metricsSystem);
return new BytesTaskQueueAdapter<>(
bytesQueue, NodeDataRequest::serialize, NodeDataRequest::deserialize);
return RocksDbTaskQueue.create(
dataDirectory, NodeDataRequest::serialize, NodeDataRequest::deserialize, metricsSystem);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.services.queue.TaskQueue;
import tech.pegasys.pantheon.services.queue.TaskQueue.Task;
import tech.pegasys.pantheon.util.ExceptionUtils;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.time.Duration;
Expand All @@ -43,6 +44,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -73,6 +75,7 @@ private enum Status {
private volatile CompletableFuture<Void> future;
private volatile Status status = Status.IDLE;
private volatile BytesValue rootNode;
private final AtomicInteger highestRetryCount = new AtomicInteger(0);

public WorldStateDownloader(
final EthContext ethContext,
Expand Down Expand Up @@ -106,6 +109,12 @@ public WorldStateDownloader(
MetricCategory.SYNCHRONIZER,
"world_state_retried_requests_total",
"Total number of node data requests repeated as part of fast sync world state download");

metricsSystem.createIntegerGauge(
MetricCategory.SYNCHRONIZER,
"world_state_node_request_failures_max",
"Highest number of times a node data request has been retried in this download",
highestRetryCount::get);
}

public CompletableFuture<Void> run(final BlockHeader header) {
Expand All @@ -115,16 +124,17 @@ public CompletableFuture<Void> run(final BlockHeader header) {
header.getHash());
synchronized (this) {
if (status == Status.RUNNING) {
CompletableFuture<Void> failed = new CompletableFuture<>();
final CompletableFuture<Void> failed = new CompletableFuture<>();
failed.completeExceptionally(
new IllegalStateException(
"Cannot run an already running " + this.getClass().getSimpleName()));
return failed;
}
status = Status.RUNNING;
future = createFuture();
highestRetryCount.set(0);

Hash stateRoot = header.getStateRoot();
final Hash stateRoot = header.getStateRoot();
if (worldStateStorage.isWorldStateAvailable(stateRoot)) {
// If we're requesting data for an existing world state, we're already done
markDone();
Expand All @@ -144,23 +154,23 @@ public void cancel() {
private void requestNodeData(final BlockHeader header) {
if (sendingRequests.compareAndSet(false, true)) {
while (shouldRequestNodeData()) {
Optional<EthPeer> maybePeer = ethContext.getEthPeers().idlePeer(header.getNumber());
final Optional<EthPeer> maybePeer = ethContext.getEthPeers().idlePeer(header.getNumber());

if (!maybePeer.isPresent()) {
// If no peer is available, wait and try again
waitForNewPeer().whenComplete((r, t) -> requestNodeData(header));
break;
} else {
EthPeer peer = maybePeer.get();
final EthPeer peer = maybePeer.get();

// Collect data to be requested
List<Task<NodeDataRequest>> toRequest = new ArrayList<>();
final List<Task<NodeDataRequest>> toRequest = new ArrayList<>();
while (toRequest.size() < hashCountPerRequest) {
Task<NodeDataRequest> pendingRequestTask = pendingRequests.dequeue();
final Task<NodeDataRequest> pendingRequestTask = pendingRequests.dequeue();
if (pendingRequestTask == null) {
break;
}
NodeDataRequest pendingRequest = pendingRequestTask.getData();
final NodeDataRequest pendingRequest = pendingRequestTask.getData();
final Optional<BytesValue> existingData =
pendingRequest.getExistingData(worldStateStorage);
if (existingData.isPresent()) {
Expand All @@ -176,7 +186,7 @@ private void requestNodeData(final BlockHeader header) {
sendAndProcessRequests(peer, toRequest, header)
.whenComplete(
(task, error) -> {
boolean done;
final boolean done;
synchronized (this) {
outstandingRequests.remove(task);
done =
Expand Down Expand Up @@ -217,13 +227,13 @@ private CompletableFuture<AbstractPeerTask<List<BytesValue>>> sendAndProcessRequ
final EthPeer peer,
final List<Task<NodeDataRequest>> requestTasks,
final BlockHeader blockHeader) {
List<Hash> hashes =
final List<Hash> hashes =
requestTasks.stream()
.map(Task::getData)
.map(NodeDataRequest::getHash)
.distinct()
.collect(Collectors.toList());
AbstractPeerTask<List<BytesValue>> ethTask =
final AbstractPeerTask<List<BytesValue>> ethTask =
GetNodeDataFromPeerTask.forHashes(ethContext, hashes, ethTasksTimer).assignPeer(peer);
outstandingRequests.add(ethTask);
return ethTask
Expand All @@ -232,14 +242,15 @@ private CompletableFuture<AbstractPeerTask<List<BytesValue>>> sendAndProcessRequ
.thenApply(this::mapNodeDataByHash)
.handle(
(data, err) -> {
boolean requestFailed = err != null;
Updater storageUpdater = worldStateStorage.updater();
for (Task<NodeDataRequest> task : requestTasks) {
NodeDataRequest request = task.getData();
BytesValue matchingData = requestFailed ? null : data.get(request.getHash());
final boolean requestFailed = err != null;
final Updater storageUpdater = worldStateStorage.updater();
for (final Task<NodeDataRequest> task : requestTasks) {
final NodeDataRequest request = task.getData();
final BytesValue matchingData = requestFailed ? null : data.get(request.getHash());
if (matchingData == null) {
retriedRequestsTotal.inc();
int requestFailures = request.trackFailure();
final int requestFailures = request.trackFailure();
updateHighestRetryCount(requestFailures);
if (requestFailures > maxNodeRequestRetries) {
handleStalledDownload();
}
Expand All @@ -263,6 +274,14 @@ private CompletableFuture<AbstractPeerTask<List<BytesValue>>> sendAndProcessRequ
});
}

private void updateHighestRetryCount(final int requestFailures) {
int previousHighestRetry = highestRetryCount.get();
while (requestFailures > previousHighestRetry) {
highestRetryCount.compareAndSet(previousHighestRetry, requestFailures);
previousHighestRetry = highestRetryCount.get();
}
}

private synchronized void queueChildRequests(final NodeDataRequest request) {
if (status == Status.RUNNING) {
request.getChildRequests().forEach(pendingRequests::enqueue);
Expand All @@ -277,15 +296,17 @@ private synchronized CompletableFuture<Void> getFuture() {
}

private CompletableFuture<Void> createFuture() {
CompletableFuture<Void> future = new CompletableFuture<>();
final CompletableFuture<Void> future = new CompletableFuture<>();
future.whenComplete(
(res, err) -> {
// Handle cancellations
if (future.isCancelled()) {
LOG.info("World state download cancelled");
doCancelDownload();
} else if (err != null) {
LOG.info("World state download failed. ", err);
if (!(ExceptionUtils.rootCause(err) instanceof StalledDownloadException)) {
LOG.info("World state download failed. ", err);
}
doCancelDownload();
}
});
Expand All @@ -297,14 +318,14 @@ private synchronized void handleStalledDownload() {
"Download stalled due to too many failures to retrieve node data (>"
+ maxNodeRequestRetries
+ " failures)";
WorldStateDownloaderException e = new StalledDownloadException(message);
final WorldStateDownloaderException e = new StalledDownloadException(message);
future.completeExceptionally(e);
}

private synchronized void doCancelDownload() {
status = Status.CANCELLED;
pendingRequests.clear();
for (EthTask<?> outstandingRequest : outstandingRequests) {
for (final EthTask<?> outstandingRequest : outstandingRequests) {
outstandingRequest.cancel();
}
}
Expand All @@ -323,7 +344,7 @@ private boolean isRootState(final BlockHeader blockHeader, final NodeDataRequest

private Map<Hash, BytesValue> mapNodeDataByHash(final List<BytesValue> data) {
// Map data by hash
Map<Hash, BytesValue> dataByHash = new HashMap<>();
final Map<Hash, BytesValue> dataByHash = new HashMap<>();
data.stream().forEach(d -> dataByHash.put(Hash.hash(d), d));
return dataByHash;
}
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit abd490c

Please sign in to comment.