diff --git a/ethereum/eth/src/jmh/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java b/ethereum/eth/src/jmh/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java index ee328bfb9a..9f92ef29b9 100644 --- a/ethereum/eth/src/jmh/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java +++ b/ethereum/eth/src/jmh/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java @@ -37,6 +37,9 @@ import tech.pegasys.pantheon.util.bytes.BytesValue; import java.nio.file.Path; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneOffset; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -104,6 +107,8 @@ public void setUpUnchangedState() throws Exception { syncConfig.getWorldStateHashCountPerRequest(), syncConfig.getWorldStateRequestParallelism(), syncConfig.getWorldStateMaxRequestsWithoutProgress(), + syncConfig.getWorldStateMinMillisBeforeStalling(), + Clock.fixed(Instant.ofEpochSecond(1000), ZoneOffset.UTC), metricsSystem); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java index 7f91526b94..6b6c9b9ca5 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java @@ -30,6 +30,7 @@ import tech.pegasys.pantheon.util.Subscribers; import java.nio.file.Path; +import java.time.Clock; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; @@ -55,6 +56,7 @@ public DefaultSynchronizer( final EthContext ethContext, final SyncState syncState, final Path dataDirectory, + final Clock clock, final MetricsSystem metricsSystem) { this.syncState = syncState; @@ -89,7 +91,8 @@ public DefaultSynchronizer( 
metricsSystem, ethContext, worldStateStorage, - syncState); + syncState, + clock); } private TrailingPeerRequirements calculateTrailingPeerRequirements() { diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java index 2b06cb6242..477c25d0af 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java @@ -33,6 +33,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Path; +import java.time.Clock; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -71,7 +72,8 @@ public static Optional> create( final MetricsSystem metricsSystem, final EthContext ethContext, final WorldStateStorage worldStateStorage, - final SyncState syncState) { + final SyncState syncState, + final Clock clock) { if (syncConfig.syncMode() != SyncMode.FAST) { return Optional.empty(); } @@ -100,6 +102,8 @@ public static Optional> create( syncConfig.getWorldStateHashCountPerRequest(), syncConfig.getWorldStateRequestParallelism(), syncConfig.getWorldStateMaxRequestsWithoutProgress(), + syncConfig.getWorldStateMinMillisBeforeStalling(), + clock, metricsSystem); final FastSyncDownloader fastSyncDownloader = new FastSyncDownloader<>( diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java index 2199cc08d2..0b30180f44 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java @@ -17,6 +17,7 @@ import tech.pegasys.pantheon.util.uint.UInt256; import java.time.Duration; +import 
java.util.concurrent.TimeUnit; import com.google.common.collect.Range; @@ -29,7 +30,9 @@ public class SynchronizerConfiguration { private static final Duration DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME = Duration.ofMinutes(5); private static final int DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST = 384; private static final int DEFAULT_WORLD_STATE_REQUEST_PARALLELISM = 10; - private static final int DEFAULT_WORLD_STATE_MAX_REQUESTS_WITHOUT_PROGRESS = 100; + private static final int DEFAULT_WORLD_STATE_MAX_REQUESTS_WITHOUT_PROGRESS = 1000; + private static final long DEFAULT_WORLD_STATE_MIN_MILLIS_BEFORE_STALLING = + TimeUnit.MINUTES.toMillis(5); // Fast sync config private final int fastSyncPivotDistance; @@ -57,6 +60,7 @@ public class SynchronizerConfiguration { private final int transactionsParallelism; private final int computationParallelism; private final int maxTrailingPeers; + private final long worldStateMinMillisBeforeStalling; private SynchronizerConfiguration( final int fastSyncPivotDistance, @@ -66,6 +70,7 @@ private SynchronizerConfiguration( final int worldStateHashCountPerRequest, final int worldStateRequestParallelism, final int worldStateMaxRequestsWithoutProgress, + final long worldStateMinMillisBeforeStalling, final Range blockPropagationRange, final SyncMode syncMode, final long downloaderChangeTargetThresholdByHeight, @@ -85,6 +90,7 @@ private SynchronizerConfiguration( this.worldStateHashCountPerRequest = worldStateHashCountPerRequest; this.worldStateRequestParallelism = worldStateRequestParallelism; this.worldStateMaxRequestsWithoutProgress = worldStateMaxRequestsWithoutProgress; + this.worldStateMinMillisBeforeStalling = worldStateMinMillisBeforeStalling; this.blockPropagationRange = blockPropagationRange; this.syncMode = syncMode; this.downloaderChangeTargetThresholdByHeight = downloaderChangeTargetThresholdByHeight; @@ -199,6 +205,10 @@ public int getWorldStateMaxRequestsWithoutProgress() { return worldStateMaxRequestsWithoutProgress; } + 
public long getWorldStateMinMillisBeforeStalling() { + return worldStateMinMillisBeforeStalling; + } + public int getMaxTrailingPeers() { return maxTrailingPeers; } @@ -222,6 +232,7 @@ public static class Builder { private int worldStateRequestParallelism = DEFAULT_WORLD_STATE_REQUEST_PARALLELISM; private int worldStateMaxRequestsWithoutProgress = DEFAULT_WORLD_STATE_MAX_REQUESTS_WITHOUT_PROGRESS; + private long worldStateMinMillisBeforeStalling = DEFAULT_WORLD_STATE_MIN_MILLIS_BEFORE_STALLING; private Duration fastSyncMaximumPeerWaitTime = DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME; private int maxTrailingPeers = Integer.MAX_VALUE; @@ -335,6 +346,7 @@ public SynchronizerConfiguration build() { worldStateHashCountPerRequest, worldStateRequestParallelism, worldStateMaxRequestsWithoutProgress, + worldStateMinMillisBeforeStalling, blockPropagationRange, syncMode, downloaderChangeTargetThresholdByHeight, diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadState.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadState.java index 4a5dabff40..18eda27258 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadState.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadState.java @@ -22,6 +22,7 @@ import tech.pegasys.pantheon.util.ExceptionUtils; import tech.pegasys.pantheon.util.bytes.BytesValue; +import java.time.Clock; import java.util.Collection; import java.util.Collections; import java.util.Set; @@ -38,21 +39,29 @@ class WorldDownloadState { private final boolean downloadWasResumed; private final CachingTaskCollection pendingRequests; private final int maxRequestsWithoutProgress; + private final Clock clock; private final Set> outstandingRequests = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final CompletableFuture internalFuture; private final 
CompletableFuture downloadFuture; // Volatile so monitoring can access it without having to synchronize. private volatile int requestsSinceLastProgress = 0; + private final long minMillisBeforeStalling; + private volatile long timestampOfLastProgress; private BytesValue rootNodeData; private WorldStateDownloadProcess worldStateDownloadProcess; public WorldDownloadState( final CachingTaskCollection pendingRequests, - final int maxRequestsWithoutProgress) { + final int maxRequestsWithoutProgress, + final long minMillisBeforeStalling, + final Clock clock) { + this.minMillisBeforeStalling = minMillisBeforeStalling; + this.timestampOfLastProgress = clock.millis(); this.downloadWasResumed = !pendingRequests.isEmpty(); this.pendingRequests = pendingRequests; this.maxRequestsWithoutProgress = maxRequestsWithoutProgress; + this.clock = clock; this.internalFuture = new CompletableFuture<>(); this.downloadFuture = new CompletableFuture<>(); this.internalFuture.whenComplete(this::cleanup); @@ -147,9 +156,12 @@ public synchronized void setRootNodeData(final BytesValue rootNodeData) { public synchronized void requestComplete(final boolean madeProgress) { if (madeProgress) { requestsSinceLastProgress = 0; + timestampOfLastProgress = clock.millis(); } else { requestsSinceLastProgress++; - if (requestsSinceLastProgress >= maxRequestsWithoutProgress) { + // Compare elapsed time, not an absolute deadline, so a huge minMillisBeforeStalling cannot overflow and stall at once. + if (requestsSinceLastProgress >= maxRequestsWithoutProgress + && (clock.millis() - timestampOfLastProgress) > minMillisBeforeStalling) { markAsStalled(maxRequestsWithoutProgress); } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java index 76361725a9..ad49c994a0 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java +++ 
b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java @@ -20,6 +20,7 @@ import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.services.tasks.CachingTaskCollection; +import java.time.Clock; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -31,6 +32,8 @@ public class WorldStateDownloader { private static final Logger LOG = LogManager.getLogger(); + private final long minMillisBeforeStalling; + private final Clock clock; private final MetricsSystem metricsSystem; private final EthContext ethContext; @@ -49,6 +52,8 @@ public WorldStateDownloader( final int hashCountPerRequest, final int maxOutstandingRequests, final int maxNodeRequestsWithoutProgress, + final long minMillisBeforeStalling, + final Clock clock, final MetricsSystem metricsSystem) { this.ethContext = ethContext; this.worldStateStorage = worldStateStorage; @@ -56,6 +61,8 @@ public WorldStateDownloader( this.hashCountPerRequest = hashCountPerRequest; this.maxOutstandingRequests = maxOutstandingRequests; this.maxNodeRequestsWithoutProgress = maxNodeRequestsWithoutProgress; + this.minMillisBeforeStalling = minMillisBeforeStalling; + this.clock = clock; this.metricsSystem = metricsSystem; metricsSystem.createIntegerGauge( @@ -105,7 +112,8 @@ public CompletableFuture run(final BlockHeader header) { stateRoot); final WorldDownloadState newDownloadState = - new WorldDownloadState(taskCollection, maxNodeRequestsWithoutProgress); + new WorldDownloadState( + taskCollection, maxNodeRequestsWithoutProgress, minMillisBeforeStalling, clock); this.downloadState.set(newDownloadState); if (!newDownloadState.downloadWasResumed()) { diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadStateTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadStateTest.java index 
9a63bf1ac2..898d308701 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadStateTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadStateTest.java @@ -14,6 +14,7 @@ import static java.util.Collections.singletonList; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static tech.pegasys.pantheon.ethereum.eth.sync.worldstate.NodeDataRequest.createAccountDataRequest; @@ -27,9 +28,11 @@ import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage; import tech.pegasys.pantheon.services.tasks.CachingTaskCollection; import tech.pegasys.pantheon.services.tasks.InMemoryTaskQueue; +import tech.pegasys.pantheon.testutil.TestClock; import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.junit.Before; import org.junit.Test; @@ -39,6 +42,7 @@ public class WorldDownloadStateTest { private static final BytesValue ROOT_NODE_DATA = BytesValue.of(1, 2, 3, 4); private static final Hash ROOT_NODE_HASH = Hash.hash(ROOT_NODE_DATA); private static final int MAX_REQUESTS_WITHOUT_PROGRESS = 10; + private static final long MIN_MILLIS_BEFORE_STALLING = 50_000; private final WorldStateStorage worldStateStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); @@ -50,8 +54,10 @@ public class WorldDownloadStateTest { private final WorldStateDownloadProcess worldStateDownloadProcess = mock(WorldStateDownloadProcess.class); + private final TestClock clock = new TestClock(); private final WorldDownloadState downloadState = - new WorldDownloadState(pendingRequests, MAX_REQUESTS_WITHOUT_PROGRESS); + new WorldDownloadState( + pendingRequests, MAX_REQUESTS_WITHOUT_PROGRESS, MIN_MILLIS_BEFORE_STALLING, 
clock); private final CompletableFuture future = downloadState.getDownloadFuture(); @@ -121,6 +127,7 @@ public void shouldResetRequestsSinceProgressCountWhenProgressIsMade() { downloadState.requestComplete(false); downloadState.requestComplete(true); + clock.stepMillis(MIN_MILLIS_BEFORE_STALLING + 1); for (int i = 0; i < MAX_REQUESTS_WITHOUT_PROGRESS - 1; i++) { downloadState.requestComplete(false); @@ -128,11 +135,50 @@ public void shouldResetRequestsSinceProgressCountWhenProgressIsMade() { } downloadState.requestComplete(false); - assertThat(downloadState.getDownloadFuture()).isCompletedExceptionally(); + assertWorldStateStalled(downloadState); } @Test - public void shouldNotAddRequestsAfterDownloadIsStalled() { + public void shouldNotBeStalledWhenMaxRequestsReachedUntilMinimumTimeAlsoReached() { + for (int i = 0; i < MAX_REQUESTS_WITHOUT_PROGRESS; i++) { + downloadState.requestComplete(false); + assertThat(downloadState.getDownloadFuture()).isNotDone(); + } + + // Exceeding the requests without progress limit doesn't trigger stalled state + downloadState.requestComplete(false); + assertThat(downloadState.getDownloadFuture()).isNotDone(); + + // Until the minimum time has elapsed, then the next request with no progress marks as stalled + clock.stepMillis(MIN_MILLIS_BEFORE_STALLING + 1); + downloadState.requestComplete(false); + assertWorldStateStalled(downloadState); + } + + @Test + public void shouldNotBeStalledIfMinimumTimeIsReachedButMaximumRequestsIsNot() { + clock.stepMillis(MIN_MILLIS_BEFORE_STALLING + 1); + downloadState.requestComplete(false); + assertThat(downloadState.getDownloadFuture()).isNotDone(); + } + + @Test + public void shouldResetTimeSinceProgressWhenProgressIsMade() { + // Enough time has progressed but the next request makes progress so we are not stalled. 
+ clock.stepMillis(MIN_MILLIS_BEFORE_STALLING + 1); + downloadState.requestComplete(true); + assertThat(downloadState.getDownloadFuture()).isNotDone(); + + // We then reach the max number of requests without progress but the timer should have reset + for (int i = 0; i < MAX_REQUESTS_WITHOUT_PROGRESS; i++) { + downloadState.requestComplete(false); + assertThat(downloadState.getDownloadFuture()).isNotDone(); + } + assertThat(downloadState.getDownloadFuture()).isNotDone(); + } + + @Test + public void shouldNotAddRequestsAfterDownloadIsCompleted() { downloadState.checkCompletion(worldStateStorage, header); downloadState.enqueueRequests(singletonList(createAccountDataRequest(Hash.EMPTY_TRIE_HASH))); @@ -140,4 +186,12 @@ public void shouldNotAddRequestsAfterDownloadIsStalled() { assertThat(pendingRequests.isEmpty()).isTrue(); } + + private void assertWorldStateStalled(final WorldDownloadState state) { + final CompletableFuture future = state.getDownloadFuture(); + assertThat(future).isCompletedExceptionally(); + assertThatThrownBy(future::get) + .isInstanceOf(ExecutionException.class) + .hasRootCauseInstanceOf(StalledDownloadException.class); + } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java index 5710cf1a21..648d403175 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java @@ -60,6 +60,7 @@ import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage; import tech.pegasys.pantheon.services.tasks.CachingTaskCollection; import tech.pegasys.pantheon.services.tasks.InMemoryTaskQueue; +import tech.pegasys.pantheon.testutil.TestClock; import tech.pegasys.pantheon.util.bytes.Bytes32; import 
tech.pegasys.pantheon.util.bytes.BytesValue; import tech.pegasys.pantheon.util.uint.UInt256; @@ -974,6 +975,8 @@ private WorldStateDownloader createDownloader( config.getWorldStateHashCountPerRequest(), config.getWorldStateRequestParallelism(), config.getWorldStateMaxRequestsWithoutProgress(), + config.getWorldStateMinMillisBeforeStalling(), + TestClock.fixed(), new NoOpMetricsSystem()); } diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java index 243c9d485a..adbd4c3fb3 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java @@ -168,6 +168,7 @@ static PantheonController init( ethProtocolManager.ethContext(), syncState, dataDirectory, + clock, metricsSystem); final TransactionPool transactionPool = diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java index 7c01b85401..ee95b5791a 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java @@ -160,6 +160,7 @@ static PantheonController init( istanbul64ProtocolManager.ethContext(), syncState, dataDirectory, + clock, metricsSystem); final Runnable closer = diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java index c0e890e0c1..30fe91c62d 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java @@ -197,6 +197,7 @@ static 
PantheonController init( ethProtocolManager.ethContext(), syncState, dataDirectory, + clock, metricsSystem); final TransactionPool transactionPool = diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java index 46e0acecab..e86c1e097d 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java @@ -135,6 +135,7 @@ public static PantheonController init( ethProtocolManager.ethContext(), syncState, dataDirectory, + clock, metricsSystem); final OptionalLong daoBlock = genesisConfig.getConfigOptions().getDaoForkBlock(); diff --git a/testutil/src/main/java/tech/pegasys/pantheon/testutil/TestClock.java b/testutil/src/main/java/tech/pegasys/pantheon/testutil/TestClock.java index 73e495d07b..d890621db6 100644 --- a/testutil/src/main/java/tech/pegasys/pantheon/testutil/TestClock.java +++ b/testutil/src/main/java/tech/pegasys/pantheon/testutil/TestClock.java @@ -15,9 +15,31 @@ import java.time.Clock; import java.time.Instant; import java.time.ZoneId; +import java.time.ZoneOffset; -public class TestClock { +public class TestClock extends Clock { public static Clock fixed() { return Clock.fixed(Instant.ofEpochSecond(10_000_000), ZoneId.systemDefault()); } + + private Instant now = Instant.ofEpochSecond(24982948294L); + + @Override + public ZoneId getZone() { + return ZoneOffset.UTC; + } + + @Override + public Clock withZone(final ZoneId zone) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public Instant instant() { + return now; + } + + public void stepMillis(final long millis) { + now = now.plusMillis(millis); + } }