diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java index 470e5f10c7..ebff6fe0f7 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java @@ -29,8 +29,6 @@ import tech.pegasys.pantheon.metrics.MetricCategory; import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.metrics.OperationTimer; -import tech.pegasys.pantheon.services.queue.BytesTaskQueue; -import tech.pegasys.pantheon.services.queue.BytesTaskQueueAdapter; import tech.pegasys.pantheon.services.queue.RocksDbTaskQueue; import tech.pegasys.pantheon.services.queue.TaskQueue; @@ -170,8 +168,7 @@ private static void ensureDirectoryExists(final File dir) { private static TaskQueue createWorldStateDownloaderQueue( final Path dataDirectory, final MetricsSystem metricsSystem) { - final BytesTaskQueue bytesQueue = RocksDbTaskQueue.create(dataDirectory, metricsSystem); - return new BytesTaskQueueAdapter<>( - bytesQueue, NodeDataRequest::serialize, NodeDataRequest::deserialize); + return RocksDbTaskQueue.create( + dataDirectory, NodeDataRequest::serialize, NodeDataRequest::deserialize, metricsSystem); } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java index 2bb8bc8c57..d1e9525f54 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java @@ -30,6 +30,7 @@ import tech.pegasys.pantheon.metrics.OperationTimer; import tech.pegasys.pantheon.services.queue.TaskQueue; import tech.pegasys.pantheon.services.queue.TaskQueue.Task; +import tech.pegasys.pantheon.util.ExceptionUtils; import tech.pegasys.pantheon.util.bytes.BytesValue; import java.time.Duration; @@ -43,6 +44,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; @@ -73,6 +75,7 @@ private enum Status { private volatile CompletableFuture future; private volatile Status status = Status.IDLE; private volatile BytesValue rootNode; + private final AtomicInteger highestRetryCount = new AtomicInteger(0); public WorldStateDownloader( final EthContext ethContext, @@ -106,6 +109,12 @@ public WorldStateDownloader( MetricCategory.SYNCHRONIZER, "world_state_retried_requests_total", "Total number of node data requests repeated as part of fast sync world state download"); + + metricsSystem.createIntegerGauge( + MetricCategory.SYNCHRONIZER, + "world_state_node_request_failures_max", + "Highest number of times a node data request has been retried in this download", + highestRetryCount::get); } public CompletableFuture run(final BlockHeader header) { @@ -115,7 +124,7 @@ public CompletableFuture run(final BlockHeader header) { header.getHash()); synchronized (this) { if (status == Status.RUNNING) { - CompletableFuture failed = new CompletableFuture<>(); + final CompletableFuture failed = new CompletableFuture<>(); failed.completeExceptionally( new IllegalStateException( "Cannot run an already running " + this.getClass().getSimpleName())); @@ -123,8 +132,9 @@ public CompletableFuture run(final BlockHeader header) { } status = Status.RUNNING; future = createFuture(); + highestRetryCount.set(0); - Hash stateRoot = header.getStateRoot(); + final Hash stateRoot = header.getStateRoot(); if (worldStateStorage.isWorldStateAvailable(stateRoot)) { // If we're requesting data for an existing world state, we're already done markDone(); @@ -144,23 +154,23 @@ public void cancel() { private void requestNodeData(final BlockHeader header) { if (sendingRequests.compareAndSet(false, true)) { while (shouldRequestNodeData()) { - Optional maybePeer = ethContext.getEthPeers().idlePeer(header.getNumber()); + final Optional maybePeer = ethContext.getEthPeers().idlePeer(header.getNumber()); if (!maybePeer.isPresent()) { // If no peer is available, wait and try again waitForNewPeer().whenComplete((r, t) -> requestNodeData(header)); break; } else { - EthPeer peer = maybePeer.get(); + final EthPeer peer = maybePeer.get(); // Collect data to be requested - List> toRequest = new ArrayList<>(); + final List> toRequest = new ArrayList<>(); while (toRequest.size() < hashCountPerRequest) { - Task pendingRequestTask = pendingRequests.dequeue(); + final Task pendingRequestTask = pendingRequests.dequeue(); if (pendingRequestTask == null) { break; } - NodeDataRequest pendingRequest = pendingRequestTask.getData(); + final NodeDataRequest pendingRequest = pendingRequestTask.getData(); final Optional existingData = pendingRequest.getExistingData(worldStateStorage); if (existingData.isPresent()) { @@ -176,7 +186,7 @@ private void requestNodeData(final BlockHeader header) { sendAndProcessRequests(peer, toRequest, header) .whenComplete( (task, error) -> { - boolean done; + final boolean done; synchronized (this) { outstandingRequests.remove(task); done = @@ -217,13 +227,13 @@ private CompletableFuture>> sendAndProcessRequ final EthPeer peer, final List> requestTasks, final BlockHeader blockHeader) { - List hashes = + final List hashes = requestTasks.stream() .map(Task::getData) .map(NodeDataRequest::getHash) .distinct() .collect(Collectors.toList()); - AbstractPeerTask> ethTask = + final AbstractPeerTask> ethTask = GetNodeDataFromPeerTask.forHashes(ethContext, hashes, ethTasksTimer).assignPeer(peer); outstandingRequests.add(ethTask); return ethTask @@ -232,14 +242,15 @@ private CompletableFuture>> sendAndProcessRequ .thenApply(this::mapNodeDataByHash) .handle( (data, err) -> { - boolean requestFailed = err != null; - Updater storageUpdater = worldStateStorage.updater(); - for (Task task : requestTasks) { - NodeDataRequest request = task.getData(); - BytesValue matchingData = requestFailed ? null : data.get(request.getHash()); + final boolean requestFailed = err != null; + final Updater storageUpdater = worldStateStorage.updater(); + for (final Task task : requestTasks) { + final NodeDataRequest request = task.getData(); + final BytesValue matchingData = requestFailed ? null : data.get(request.getHash()); if (matchingData == null) { retriedRequestsTotal.inc(); - int requestFailures = request.trackFailure(); + final int requestFailures = request.trackFailure(); + updateHighestRetryCount(requestFailures); if (requestFailures > maxNodeRequestRetries) { handleStalledDownload(); } @@ -263,6 +274,14 @@ private CompletableFuture>> sendAndProcessRequ }); } + private void updateHighestRetryCount(final int requestFailures) { + int previousHighestRetry = highestRetryCount.get(); + while (requestFailures > previousHighestRetry) { + highestRetryCount.compareAndSet(previousHighestRetry, requestFailures); + previousHighestRetry = highestRetryCount.get(); + } + } + private synchronized void queueChildRequests(final NodeDataRequest request) { if (status == Status.RUNNING) { request.getChildRequests().forEach(pendingRequests::enqueue); @@ -277,7 +296,7 @@ private synchronized CompletableFuture getFuture() { } private CompletableFuture createFuture() { - CompletableFuture future = new CompletableFuture<>(); + final CompletableFuture future = new CompletableFuture<>(); future.whenComplete( (res, err) -> { // Handle cancellations @@ -285,7 +304,9 @@ private CompletableFuture createFuture() { LOG.info("World state download cancelled"); doCancelDownload(); } else if (err != null) { - LOG.info("World state download failed. ", err); + if (!(ExceptionUtils.rootCause(err) instanceof StalledDownloadException)) { + LOG.info("World state download failed. ", err); + } doCancelDownload(); } }); @@ -297,14 +318,14 @@ private synchronized void handleStalledDownload() { "Download stalled due to too many failures to retrieve node data (>" + maxNodeRequestRetries + " failures)"; - WorldStateDownloaderException e = new StalledDownloadException(message); + final WorldStateDownloaderException e = new StalledDownloadException(message); future.completeExceptionally(e); } private synchronized void doCancelDownload() { status = Status.CANCELLED; pendingRequests.clear(); - for (EthTask outstandingRequest : outstandingRequests) { + for (final EthTask outstandingRequest : outstandingRequests) { outstandingRequest.cancel(); } } @@ -323,7 +344,7 @@ private boolean isRootState(final BlockHeader blockHeader, final NodeDataRequest private Map mapNodeDataByHash(final List data) { // Map data by hash - Map dataByHash = new HashMap<>(); + final Map dataByHash = new HashMap<>(); data.stream().forEach(d -> dataByHash.put(Hash.hash(d), d)); return dataByHash; } diff --git a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesTaskQueue.java b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesTaskQueue.java deleted file mode 100644 index 6039bb0de4..0000000000 --- a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesTaskQueue.java +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Copyright 2019 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package tech.pegasys.pantheon.services.queue; - -import tech.pegasys.pantheon.util.bytes.BytesValue; - -public interface BytesTaskQueue extends TaskQueue {} diff --git a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesTaskQueueAdapter.java b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesTaskQueueAdapter.java deleted file mode 100644 index 8eeafc4ce4..0000000000 --- a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesTaskQueueAdapter.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright 2019 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package tech.pegasys.pantheon.services.queue; - -import tech.pegasys.pantheon.util.bytes.BytesValue; - -import java.io.IOException; -import java.util.function.Function; - -public class BytesTaskQueueAdapter implements TaskQueue { - - private final BytesTaskQueue queue; - private final Function serializer; - private final Function deserializer; - - public BytesTaskQueueAdapter( - final BytesTaskQueue queue, - final Function serializer, - final Function deserializer) { - this.queue = queue; - this.serializer = serializer; - this.deserializer = deserializer; - } - - @Override - public void enqueue(final T taskData) { - queue.enqueue(serializer.apply(taskData)); - } - - @Override - public Task dequeue() { - Task task = queue.dequeue(); - if (task == null) { - return null; - } - - T data = deserializer.apply(task.getData()); - return new AdapterTask<>(task, data); - } - - @Override - public long size() { - return queue.size(); - } - - @Override - public boolean isEmpty() { - return queue.isEmpty(); - } - - @Override - public void clear() { - queue.clear(); - } - - @Override - public boolean allTasksCompleted() { - return queue.allTasksCompleted(); - } - - @Override - public void close() throws IOException { - queue.close(); - } - - private static class AdapterTask implements Task { - private final Task wrappedTask; - private final T data; - - public AdapterTask(final Task wrappedTask, final T data) { - this.wrappedTask = wrappedTask; - this.data = data; - } - - @Override - public T getData() { - return data; - } - - @Override - public void markCompleted() { - wrappedTask.markCompleted(); - } - - @Override - public void markFailed() { - wrappedTask.markFailed(); - } - } -} diff --git a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueue.java b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueue.java index 8ed91b8b5a..6a6c21e7a2 100644 --- a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueue.java +++ b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueue.java @@ -24,13 +24,14 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import com.google.common.primitives.Longs; import org.rocksdb.Options; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; -public class RocksDbTaskQueue implements BytesTaskQueue { +public class RocksDbTaskQueue implements TaskQueue { private final Options options; private final RocksDB db; @@ -38,14 +39,23 @@ public class RocksDbTaskQueue implements BytesTaskQueue { private final AtomicLong lastEnqueuedKey = new AtomicLong(0); private final AtomicLong lastDequeuedKey = new AtomicLong(0); private final AtomicLong oldestKey = new AtomicLong(0); - private final Set outstandingTasks = new HashSet<>(); + private final Set> outstandingTasks = new HashSet<>(); private final AtomicBoolean closed = new AtomicBoolean(false); + private final Function serializer; + private final Function deserializer; + private final OperationTimer enqueueLatency; private final OperationTimer dequeueLatency; - private RocksDbTaskQueue(final Path storageDirectory, final MetricsSystem metricsSystem) { + private RocksDbTaskQueue( + final Path storageDirectory, + final Function serializer, + final Function deserializer, + final MetricsSystem metricsSystem) { + this.serializer = serializer; + this.deserializer = deserializer; try { RocksDbUtil.loadNativeLibrary(); options = new Options().setCreateIfMissing(true); @@ -66,40 +76,43 @@ private RocksDbTaskQueue(final Path storageDirectory, final MetricsSystem metric } } - public static RocksDbTaskQueue create( - final Path storageDirectory, final MetricsSystem metricsSystem) { - return new RocksDbTaskQueue(storageDirectory, metricsSystem); + public static RocksDbTaskQueue create( + final Path storageDirectory, + final Function serializer, + final Function deserializer, + final MetricsSystem metricsSystem) { + return new RocksDbTaskQueue<>(storageDirectory, serializer, deserializer, metricsSystem); } @Override - public synchronized void enqueue(final BytesValue taskData) { + public synchronized void enqueue(final T taskData) { assertNotClosed(); try (final OperationTimer.TimingContext ignored = enqueueLatency.startTimer()) { - byte[] key = Longs.toByteArray(lastEnqueuedKey.incrementAndGet()); - db.put(key, taskData.getArrayUnsafe()); - } catch (RocksDBException e) { + final byte[] key = Longs.toByteArray(lastEnqueuedKey.incrementAndGet()); + db.put(key, serializer.apply(taskData).getArrayUnsafe()); + } catch (final RocksDBException e) { throw new StorageException(e); } } @Override - public synchronized Task dequeue() { + public synchronized Task dequeue() { assertNotClosed(); if (isEmpty()) { return null; } try (final OperationTimer.TimingContext ignored = dequeueLatency.startTimer()) { - long key = lastDequeuedKey.incrementAndGet(); - byte[] value = db.get(Longs.toByteArray(key)); + final long key = lastDequeuedKey.incrementAndGet(); + final byte[] value = db.get(Longs.toByteArray(key)); if (value == null) { throw new IllegalStateException("Next expected value is missing"); } - BytesValue data = BytesValue.of(value); - RocksDbTask task = new RocksDbTask(this, data, key); + final BytesValue data = BytesValue.of(value); + final RocksDbTask task = new RocksDbTask<>(this, deserializer.apply(data), key); outstandingTasks.add(task); return task; - } catch (RocksDBException e) { + } catch (final RocksDBException e) { throw new StorageException(e); } } @@ -120,14 +133,14 @@ public synchronized boolean isEmpty() { public synchronized void clear() { assertNotClosed(); outstandingTasks.clear(); - byte[] from = Longs.toByteArray(oldestKey.get()); - byte[] to = Longs.toByteArray(lastEnqueuedKey.get() + 1); + final byte[] from = Longs.toByteArray(oldestKey.get()); + final byte[] to = Longs.toByteArray(lastEnqueuedKey.get() + 1); try { db.deleteRange(from, to); lastDequeuedKey.set(0); lastEnqueuedKey.set(0); oldestKey.set(0); - } catch (RocksDBException e) { + } catch (final RocksDBException e) { throw new StorageException(e); } } @@ -138,7 +151,7 @@ public synchronized boolean allTasksCompleted() { } private synchronized void deleteCompletedTasks() { - long oldestOutstandingKey = + final long oldestOutstandingKey = outstandingTasks.stream() .min(Comparator.comparingLong(RocksDbTask::getKey)) .map(RocksDbTask::getKey) @@ -146,12 +159,12 @@ private synchronized void deleteCompletedTasks() { if (oldestKey.get() < oldestOutstandingKey) { // Delete all contiguous completed tasks - byte[] fromKey = Longs.toByteArray(oldestKey.get()); - byte[] toKey = Longs.toByteArray(oldestOutstandingKey); + final byte[] fromKey = Longs.toByteArray(oldestKey.get()); + final byte[] toKey = Longs.toByteArray(oldestOutstandingKey); try { db.deleteRange(fromKey, toKey); oldestKey.set(oldestOutstandingKey); - } catch (RocksDBException e) { + } catch (final RocksDBException e) { throw new StorageException(e); } } @@ -171,7 +184,7 @@ private void assertNotClosed() { } } - private synchronized boolean markTaskCompleted(final RocksDbTask task) { + private synchronized boolean markTaskCompleted(final RocksDbTask task) { if (outstandingTasks.remove(task)) { deleteCompletedTasks(); return true; @@ -179,7 +192,7 @@ private synchronized boolean markTaskCompleted(final RocksDbTask task) { return false; } - private synchronized void handleFailedTask(final RocksDbTask task) { + private synchronized void handleFailedTask(final RocksDbTask task) { if (markTaskCompleted(task)) { enqueue(task.getData()); } @@ -191,13 +204,13 @@ public static class StorageException extends RuntimeException { } } - private static class RocksDbTask implements Task { + private static class RocksDbTask implements Task { private final AtomicBoolean completed = new AtomicBoolean(false); - private final RocksDbTaskQueue parentQueue; - private final BytesValue data; + private final RocksDbTaskQueue parentQueue; + private final T data; private final long key; - private RocksDbTask(final RocksDbTaskQueue parentQueue, final BytesValue data, final long key) { + private RocksDbTask(final RocksDbTaskQueue parentQueue, final T data, final long key) { this.parentQueue = parentQueue; this.data = data; this.key = key; @@ -208,7 +221,7 @@ public long getKey() { } @Override - public BytesValue getData() { + public T getData() { return data; } diff --git a/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/BytesTaskQueueAdapterTest.java b/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/BytesTaskQueueAdapterTest.java deleted file mode 100644 index 40f055d6dd..0000000000 --- a/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/BytesTaskQueueAdapterTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2019 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package tech.pegasys.pantheon.services.queue; - -import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; -import tech.pegasys.pantheon.util.bytes.BytesValue; - -import java.util.function.Function; - -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; - -public class BytesTaskQueueAdapterTest extends AbstractTaskQueueTest> { - - @Rule public final TemporaryFolder folder = new TemporaryFolder(); - - @Override - protected TaskQueue createQueue() throws Exception { - BytesTaskQueue queue = - RocksDbTaskQueue.create(folder.newFolder().toPath(), new NoOpMetricsSystem()); - return new BytesTaskQueueAdapter<>(queue, Function.identity(), Function.identity()); - } -} diff --git a/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueueTest.java b/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueueTest.java index bf3b5c65de..4cbb725a05 100644 --- a/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueueTest.java +++ b/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueueTest.java @@ -13,18 +13,24 @@ package tech.pegasys.pantheon.services.queue; import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; +import tech.pegasys.pantheon.util.bytes.BytesValue; import java.io.IOException; +import java.util.function.Function; import org.junit.Rule; import org.junit.rules.TemporaryFolder; -public class RocksDbTaskQueueTest extends AbstractTaskQueueTest { +public class RocksDbTaskQueueTest extends AbstractTaskQueueTest> { @Rule public final TemporaryFolder folder = new TemporaryFolder(); @Override - protected RocksDbTaskQueue createQueue() throws IOException { - return RocksDbTaskQueue.create(folder.newFolder().toPath(), new NoOpMetricsSystem()); + protected RocksDbTaskQueue createQueue() throws IOException { + return RocksDbTaskQueue.create( + folder.newFolder().toPath(), + Function.identity(), + Function.identity(), + new NoOpMetricsSystem()); } }