diff --git a/ethereum/eth/src/jmh/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java b/ethereum/eth/src/jmh/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java index 4825a28ac9..c796ab6b9b 100644 --- a/ethereum/eth/src/jmh/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java +++ b/ethereum/eth/src/jmh/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java @@ -101,7 +101,7 @@ public void setUpUnchangedState() throws Exception { pendingRequests, syncConfig.getWorldStateHashCountPerRequest(), syncConfig.getWorldStateRequestParallelism(), - syncConfig.getWorldStateRequestMaxRetries(), + syncConfig.getWorldStateMaxRequestsWithoutProgress(), metricsSystem); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java index 74b393f5c8..1af576dec0 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java @@ -97,7 +97,7 @@ public static Optional> create( stateQueue, syncConfig.getWorldStateHashCountPerRequest(), syncConfig.getWorldStateRequestParallelism(), - syncConfig.getWorldStateRequestMaxRetries(), + syncConfig.getWorldStateMaxRequestsWithoutProgress(), metricsSystem); final FastSyncDownloader fastSyncDownloader = new FastSyncDownloader<>( diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java index 01aefba71c..3352045ca6 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java +++ 
b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java @@ -29,7 +29,7 @@ public class SynchronizerConfiguration { private static final Duration DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME = Duration.ofMinutes(5); private static final int DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST = 384; private static final int DEFAULT_WORLD_STATE_REQUEST_PARALLELISM = 10; - private static final int DEFAULT_WORLD_STATE_REQUEST_MAX_RETRIES = 25; + private static final int DEFAULT_WORLD_STATE_MAX_REQUESTS_WITHOUT_PROGRESS = 100; // Fast sync config private final int fastSyncPivotDistance; @@ -38,7 +38,7 @@ public class SynchronizerConfiguration { private final Duration fastSyncMaximumPeerWaitTime; private final int worldStateHashCountPerRequest; private final int worldStateRequestParallelism; - private final int worldStateRequestMaxRetries; + private final int worldStateMaxRequestsWithoutProgress; // Block propagation config private final Range blockPropagationRange; @@ -66,7 +66,7 @@ private SynchronizerConfiguration( final Duration fastSyncMaximumPeerWaitTime, final int worldStateHashCountPerRequest, final int worldStateRequestParallelism, - final int worldStateRequestMaxRetries, + final int worldStateMaxRequestsWithoutProgress, final Range blockPropagationRange, final SyncMode syncMode, final long downloaderChangeTargetThresholdByHeight, @@ -86,7 +86,7 @@ private SynchronizerConfiguration( this.fastSyncMaximumPeerWaitTime = fastSyncMaximumPeerWaitTime; this.worldStateHashCountPerRequest = worldStateHashCountPerRequest; this.worldStateRequestParallelism = worldStateRequestParallelism; - this.worldStateRequestMaxRetries = worldStateRequestMaxRetries; + this.worldStateMaxRequestsWithoutProgress = worldStateMaxRequestsWithoutProgress; this.blockPropagationRange = blockPropagationRange; this.syncMode = syncMode; this.downloaderChangeTargetThresholdByHeight = downloaderChangeTargetThresholdByHeight; @@ -211,8 +211,8 @@ public int 
getWorldStateRequestParallelism() { return worldStateRequestParallelism; } - public int getWorldStateRequestMaxRetries() { - return worldStateRequestMaxRetries; + public int getWorldStateMaxRequestsWithoutProgress() { + return worldStateMaxRequestsWithoutProgress; } public static class Builder { @@ -234,7 +234,8 @@ public static class Builder { private int fastSyncMinimumPeerCount = DEFAULT_FAST_SYNC_MINIMUM_PEERS; private int worldStateHashCountPerRequest = DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST; private int worldStateRequestParallelism = DEFAULT_WORLD_STATE_REQUEST_PARALLELISM; - private int worldStateRequestMaxRetries = DEFAULT_WORLD_STATE_REQUEST_MAX_RETRIES; + private int worldStateMaxRequestsWithoutProgress = + DEFAULT_WORLD_STATE_MAX_REQUESTS_WITHOUT_PROGRESS; private Duration fastSyncMaximumPeerWaitTime = DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME; public Builder fastSyncPivotDistance(final int distance) { @@ -332,8 +333,9 @@ public Builder worldStateRequestParallelism(final int worldStateRequestParalleli return this; } - public Builder worldStateRequestMaxRetries(final int worldStateRequestMaxRetries) { - this.worldStateRequestMaxRetries = worldStateRequestMaxRetries; + public Builder worldStateMaxRequestsWithoutProgress( + final int worldStateMaxRequestsWithoutProgress) { + this.worldStateMaxRequestsWithoutProgress = worldStateMaxRequestsWithoutProgress; return this; } @@ -350,7 +352,7 @@ public SynchronizerConfiguration build() { fastSyncMaximumPeerWaitTime, worldStateHashCountPerRequest, worldStateRequestParallelism, - worldStateRequestMaxRetries, + worldStateMaxRequestsWithoutProgress, blockPropagationRange, syncMode, downloaderChangeTargetThresholdByHeight, diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java index 501781d33a..9db0785a41 100644 --- 
a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java @@ -22,7 +22,6 @@ import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.Optional; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; public abstract class NodeDataRequest { @@ -30,7 +29,6 @@ public abstract class NodeDataRequest { private final Hash hash; private BytesValue data; private boolean requiresPersisting = true; - private final AtomicInteger failedRequestCount = new AtomicInteger(0); protected NodeDataRequest(final RequestType requestType, final Hash hash) { this.requestType = requestType; @@ -58,7 +56,6 @@ public static NodeDataRequest deserialize(final BytesValue encoded) { in.enterList(); final RequestType requestType = RequestType.fromValue(in.readByte()); final Hash hash = Hash.wrap(in.readBytes32()); - final int failureCount = in.readIntScalar(); in.leaveList(); final NodeDataRequest deserialized; @@ -78,7 +75,6 @@ public static NodeDataRequest deserialize(final BytesValue encoded) { + NodeDataRequest.class.getSimpleName()); } - deserialized.setFailureCount(failureCount); return deserialized; } @@ -86,7 +82,6 @@ private void writeTo(final RLPOutput out) { out.startList(); out.writeByte(requestType.getValue()); out.writeBytesValue(hash); - out.writeIntScalar(failedRequestCount.get()); out.endList(); } @@ -112,14 +107,6 @@ public NodeDataRequest setRequiresPersisting(final boolean requiresPersisting) { return this; } - public int trackFailure() { - return failedRequestCount.incrementAndGet(); - } - - private void setFailureCount(final int failures) { - failedRequestCount.set(failures); - } - public final void persist(final WorldStateStorage.Updater updater) { if (requiresPersisting) { checkNotNull(getData(), "Must set data before node can be persisted."); diff --git 
a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadState.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadState.java index 5f0860a5ef..a5ad867f38 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadState.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadState.java @@ -40,11 +40,14 @@ class WorldDownloadState { private final TaskQueue pendingRequests; private final ArrayBlockingQueue> requestsToPersist; private final int maxOutstandingRequests; + private final int maxRequestsWithoutProgress; private final Set> outstandingRequests = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final AtomicBoolean sendingRequests = new AtomicBoolean(false); private final CompletableFuture internalFuture; private final CompletableFuture downloadFuture; + // Volatile so monitoring can access it without having to synchronize. 
+ private volatile int requestsSinceLastProgress = 0; private boolean waitingForNewPeer = false; private BytesValue rootNodeData; private EthTask persistenceTask; @@ -52,10 +55,12 @@ class WorldDownloadState { public WorldDownloadState( final TaskQueue pendingRequests, final ArrayBlockingQueue> requestsToPersist, - final int maxOutstandingRequests) { + final int maxOutstandingRequests, + final int maxRequestsWithoutProgress) { this.pendingRequests = pendingRequests; this.requestsToPersist = requestsToPersist; this.maxOutstandingRequests = maxOutstandingRequests; + this.maxRequestsWithoutProgress = maxRequestsWithoutProgress; this.internalFuture = new CompletableFuture<>(); this.downloadFuture = new CompletableFuture<>(); this.internalFuture.whenComplete(this::cleanup); @@ -183,11 +188,26 @@ public int getPersistenceQueueSize() { return requestsToPersist.size(); } - public synchronized void markAsStalled(final int maxNodeRequestRetries) { + public synchronized void requestComplete(final boolean madeProgress) { + if (madeProgress) { + requestsSinceLastProgress = 0; + } else { + requestsSinceLastProgress++; + if (requestsSinceLastProgress >= maxRequestsWithoutProgress) { + markAsStalled(maxRequestsWithoutProgress); + } + } + } + + public int getRequestsSinceLastProgress() { + return requestsSinceLastProgress; + } + + private synchronized void markAsStalled(final int maxNodeRequestRetries) { final String message = - "Download stalled due to too many failures to retrieve node data (>" + "Download stalled due to too many failures to retrieve node data (>=" + maxNodeRequestRetries - + " failures)"; + + " requests without making progress)"; final WorldStateDownloaderException e = new StalledDownloadException(message); internalFuture.completeExceptionally(e); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java index a7b3687c1d..e23884af8a 100644 --- 
a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java @@ -44,7 +44,6 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; @@ -60,13 +59,12 @@ public class WorldStateDownloader { private final Counter retriedRequestsCounter; private final Counter existingNodeCounter; private final MetricsSystem metricsSystem; - private final AtomicInteger highestRetryCount = new AtomicInteger(0); private final EthContext ethContext; private final TaskQueue taskQueue; private final int hashCountPerRequest; private final int maxOutstandingRequests; - private final int maxNodeRequestRetries; + private final int maxNodeRequestsWithoutProgress; private final WorldStateStorage worldStateStorage; private final AtomicReference downloadState = new AtomicReference<>(); @@ -77,14 +75,14 @@ public WorldStateDownloader( final TaskQueue taskQueue, final int hashCountPerRequest, final int maxOutstandingRequests, - final int maxNodeRequestRetries, + final int maxNodeRequestsWithoutProgress, final MetricsSystem metricsSystem) { this.ethContext = ethContext; this.worldStateStorage = worldStateStorage; this.taskQueue = taskQueue; this.hashCountPerRequest = hashCountPerRequest; this.maxOutstandingRequests = maxOutstandingRequests; - this.maxNodeRequestRetries = maxNodeRequestRetries; + this.maxNodeRequestsWithoutProgress = maxNodeRequestsWithoutProgress; this.metricsSystem = metricsSystem; metricsSystem.createLongGauge( @@ -112,9 +110,9 @@ public WorldStateDownloader( metricsSystem.createIntegerGauge( MetricCategory.SYNCHRONIZER, - "world_state_node_request_failures_max", - "Highest 
number of times a node data request has been retried in this download", - highestRetryCount::get); + "world_state_node_requests_since_last_progress_current", + "Number of world state requests made since the last time new data was returned", + downloadStateValue(WorldDownloadState::getRequestsSinceLastProgress)); metricsSystem.createIntegerGauge( MetricCategory.SYNCHRONIZER, @@ -158,14 +156,14 @@ public CompletableFuture run(final BlockHeader header) { } // Room for the requests we expect to do in parallel plus some buffer but not unlimited. - final int persistenceQueueCapacity = hashCountPerRequest * maxNodeRequestRetries * 2; + final int persistenceQueueCapacity = hashCountPerRequest * maxOutstandingRequests * 2; final WorldDownloadState newDownloadState = new WorldDownloadState( taskQueue, new ArrayBlockingQueue<>(persistenceQueueCapacity), - maxOutstandingRequests); + maxOutstandingRequests, + maxNodeRequestsWithoutProgress); this.downloadState.set(newDownloadState); - highestRetryCount.set(0); newDownloadState.enqueueRequest(NodeDataRequest.createAccountDataRequest(stateRoot)); @@ -301,19 +299,15 @@ private void storeData( final BlockHeader blockHeader, final Map data, final WorldDownloadState downloadState) { + boolean madeProgress = false; for (final Task task : requestTasks) { final NodeDataRequest request = task.getData(); final BytesValue matchingData = data.get(request.getHash()); if (matchingData == null) { retriedRequestsCounter.inc(); - final int requestFailures = request.trackFailure(); - updateHighestRetryCount(requestFailures); task.markFailed(); - if (requestFailures > maxNodeRequestRetries) { - LOG.info("Unavailable node {}", request.getHash()); - downloadState.markAsStalled(maxNodeRequestRetries); - } } else { + madeProgress = true; request.setData(matchingData); if (isRootState(blockHeader, request)) { downloadState.enqueueRequests(request.getChildRequests()); @@ -324,17 +318,10 @@ private void storeData( } } } + 
downloadState.requestComplete(madeProgress); requestNodeData(blockHeader, downloadState); } - private void updateHighestRetryCount(final int requestFailures) { - int previousHighestRetry = highestRetryCount.get(); - while (requestFailures > previousHighestRetry) { - highestRetryCount.compareAndSet(previousHighestRetry, requestFailures); - previousHighestRetry = highestRetryCount.get(); - } - } - private boolean isRootState(final BlockHeader blockHeader, final NodeDataRequest request) { return request.getHash().equals(blockHeader.getStateRoot()); } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadStateTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadStateTest.java index c89d625337..66da576aed 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadStateTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadStateTest.java @@ -44,6 +44,7 @@ public class WorldDownloadStateTest { private static final BytesValue ROOT_NODE_DATA = BytesValue.of(1, 2, 3, 4); private static final Hash ROOT_NODE_HASH = Hash.hash(ROOT_NODE_DATA); private static final int MAX_OUTSTANDING_REQUESTS = 3; + private static final int MAX_REQUESTS_WITHOUT_PROGRESS = 10; private final WorldStateStorage worldStateStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); @@ -55,7 +56,11 @@ public class WorldDownloadStateTest { new ArrayBlockingQueue<>(100); private final WorldDownloadState downloadState = - new WorldDownloadState(pendingRequests, requestsToPersist, MAX_OUTSTANDING_REQUESTS); + new WorldDownloadState( + pendingRequests, + requestsToPersist, + MAX_OUTSTANDING_REQUESTS, + MAX_REQUESTS_WITHOUT_PROGRESS); private final CompletableFuture future = downloadState.getDownloadFuture(); @@ -181,10 +186,26 @@ public void 
shouldStopSendingAdditionalRequestsWhenFutureIsCancelled() { @Test public void shouldStopSendingAdditionalRequestsWhenDownloadIsMarkedAsStalled() { pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); - final Runnable sendRequest = mockWithAction(() -> downloadState.markAsStalled(1)); + final Runnable sendRequest = mockWithAction(() -> downloadState.requestComplete(false)); downloadState.whileAdditionalRequestsCanBeSent(sendRequest); - verify(sendRequest, times(1)).run(); + verify(sendRequest, times(MAX_REQUESTS_WITHOUT_PROGRESS)).run(); + } + + @Test + public void shouldResetRequestsSinceProgressCountWhenProgressIsMade() { + downloadState.requestComplete(false); + downloadState.requestComplete(false); + + downloadState.requestComplete(true); + + for (int i = 0; i < MAX_REQUESTS_WITHOUT_PROGRESS - 1; i++) { + downloadState.requestComplete(false); + assertThat(downloadState.getDownloadFuture()).isNotDone(); + } + + downloadState.requestComplete(false); + assertThat(downloadState.getDownloadFuture()).isCompletedExceptionally(); } @Test diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java index 34fa5491c3..64d400e3c3 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java @@ -633,7 +633,7 @@ public void stalledDownloader() { final WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); final SynchronizerConfiguration syncConfig = - SynchronizerConfiguration.builder().worldStateRequestMaxRetries(10).build(); + SynchronizerConfiguration.builder().worldStateMaxRequestsWithoutProgress(10).build(); final WorldStateDownloader downloader = 
createDownloader(syncConfig, ethProtocolManager.ethContext(), localStorage, queue); @@ -902,7 +902,7 @@ private WorldStateDownloader createDownloader( queue, config.getWorldStateHashCountPerRequest(), config.getWorldStateRequestParallelism(), - config.getWorldStateRequestMaxRetries(), + config.getWorldStateMaxRequestsWithoutProgress(), new NoOpMetricsSystem()); }