Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Don't mark world state as stalled until a minimum time without progress is reached #1179

Merged
merged 5 commits into from
Mar 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.nio.file.Path;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -104,6 +107,8 @@ public void setUpUnchangedState() throws Exception {
syncConfig.getWorldStateHashCountPerRequest(),
syncConfig.getWorldStateRequestParallelism(),
syncConfig.getWorldStateMaxRequestsWithoutProgress(),
syncConfig.getWorldStateMinMillisBeforeStalling(),
Clock.fixed(Instant.ofEpochSecond(1000), ZoneOffset.UTC),
metricsSystem);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import tech.pegasys.pantheon.util.Subscribers;

import java.nio.file.Path;
import java.time.Clock;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -55,6 +56,7 @@ public DefaultSynchronizer(
final EthContext ethContext,
final SyncState syncState,
final Path dataDirectory,
final Clock clock,
final MetricsSystem metricsSystem) {
this.syncState = syncState;

Expand Down Expand Up @@ -89,7 +91,8 @@ public DefaultSynchronizer(
metricsSystem,
ethContext,
worldStateStorage,
syncState);
syncState,
clock);
}

private TrailingPeerRequirements calculateTrailingPeerRequirements() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Clock;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -71,7 +72,8 @@ public static <C> Optional<FastSynchronizer<C>> create(
final MetricsSystem metricsSystem,
final EthContext ethContext,
final WorldStateStorage worldStateStorage,
final SyncState syncState) {
final SyncState syncState,
final Clock clock) {
if (syncConfig.syncMode() != SyncMode.FAST) {
return Optional.empty();
}
Expand Down Expand Up @@ -100,6 +102,8 @@ public static <C> Optional<FastSynchronizer<C>> create(
syncConfig.getWorldStateHashCountPerRequest(),
syncConfig.getWorldStateRequestParallelism(),
syncConfig.getWorldStateMaxRequestsWithoutProgress(),
syncConfig.getWorldStateMinMillisBeforeStalling(),
clock,
metricsSystem);
final FastSyncDownloader<C> fastSyncDownloader =
new FastSyncDownloader<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import tech.pegasys.pantheon.util.uint.UInt256;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Range;

Expand All @@ -29,7 +30,9 @@ public class SynchronizerConfiguration {
private static final Duration DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME = Duration.ofMinutes(5);
private static final int DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST = 384;
private static final int DEFAULT_WORLD_STATE_REQUEST_PARALLELISM = 10;
private static final int DEFAULT_WORLD_STATE_MAX_REQUESTS_WITHOUT_PROGRESS = 100;
private static final int DEFAULT_WORLD_STATE_MAX_REQUESTS_WITHOUT_PROGRESS = 1000;
private static final long DEFAULT_WORLD_STATE_MIN_MILLIS_BEFORE_STALLING =
TimeUnit.MINUTES.toMillis(5);

// Fast sync config
private final int fastSyncPivotDistance;
Expand Down Expand Up @@ -57,6 +60,7 @@ public class SynchronizerConfiguration {
private final int transactionsParallelism;
private final int computationParallelism;
private final int maxTrailingPeers;
private final long worldStateMinMillisBeforeStalling;

private SynchronizerConfiguration(
final int fastSyncPivotDistance,
Expand All @@ -66,6 +70,7 @@ private SynchronizerConfiguration(
final int worldStateHashCountPerRequest,
final int worldStateRequestParallelism,
final int worldStateMaxRequestsWithoutProgress,
final long worldStateMinMillisBeforeStalling,
final Range<Long> blockPropagationRange,
final SyncMode syncMode,
final long downloaderChangeTargetThresholdByHeight,
Expand All @@ -85,6 +90,7 @@ private SynchronizerConfiguration(
this.worldStateHashCountPerRequest = worldStateHashCountPerRequest;
this.worldStateRequestParallelism = worldStateRequestParallelism;
this.worldStateMaxRequestsWithoutProgress = worldStateMaxRequestsWithoutProgress;
this.worldStateMinMillisBeforeStalling = worldStateMinMillisBeforeStalling;
this.blockPropagationRange = blockPropagationRange;
this.syncMode = syncMode;
this.downloaderChangeTargetThresholdByHeight = downloaderChangeTargetThresholdByHeight;
Expand Down Expand Up @@ -199,6 +205,10 @@ public int getWorldStateMaxRequestsWithoutProgress() {
return worldStateMaxRequestsWithoutProgress;
}

public long getWorldStateMinMillisBeforeStalling() {
return worldStateMinMillisBeforeStalling;
}

public int getMaxTrailingPeers() {
return maxTrailingPeers;
}
Expand All @@ -222,6 +232,7 @@ public static class Builder {
private int worldStateRequestParallelism = DEFAULT_WORLD_STATE_REQUEST_PARALLELISM;
private int worldStateMaxRequestsWithoutProgress =
DEFAULT_WORLD_STATE_MAX_REQUESTS_WITHOUT_PROGRESS;
private long worldStateMinMillisBeforeStalling = DEFAULT_WORLD_STATE_MIN_MILLIS_BEFORE_STALLING;
private Duration fastSyncMaximumPeerWaitTime = DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME;
private int maxTrailingPeers = Integer.MAX_VALUE;

Expand Down Expand Up @@ -335,6 +346,7 @@ public SynchronizerConfiguration build() {
worldStateHashCountPerRequest,
worldStateRequestParallelism,
worldStateMaxRequestsWithoutProgress,
worldStateMinMillisBeforeStalling,
blockPropagationRange,
syncMode,
downloaderChangeTargetThresholdByHeight,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import tech.pegasys.pantheon.util.ExceptionUtils;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.time.Clock;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
Expand All @@ -38,21 +39,29 @@ class WorldDownloadState {
private final boolean downloadWasResumed;
private final CachingTaskCollection<NodeDataRequest> pendingRequests;
private final int maxRequestsWithoutProgress;
private final Clock clock;
private final Set<EthTask<?>> outstandingRequests =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final CompletableFuture<Void> internalFuture;
private final CompletableFuture<Void> downloadFuture;
// Volatile so monitoring can access it without having to synchronize.
private volatile int requestsSinceLastProgress = 0;
private final long minMillisBeforeStalling;
private volatile long timestampOfLastProgress;
private BytesValue rootNodeData;
private WorldStateDownloadProcess worldStateDownloadProcess;

public WorldDownloadState(
final CachingTaskCollection<NodeDataRequest> pendingRequests,
final int maxRequestsWithoutProgress) {
final int maxRequestsWithoutProgress,
final long minMillisBeforeStalling,
final Clock clock) {
this.minMillisBeforeStalling = minMillisBeforeStalling;
this.timestampOfLastProgress = clock.millis();
this.downloadWasResumed = !pendingRequests.isEmpty();
this.pendingRequests = pendingRequests;
this.maxRequestsWithoutProgress = maxRequestsWithoutProgress;
this.clock = clock;
this.internalFuture = new CompletableFuture<>();
this.downloadFuture = new CompletableFuture<>();
this.internalFuture.whenComplete(this::cleanup);
Expand Down Expand Up @@ -147,9 +156,11 @@ public synchronized void setRootNodeData(final BytesValue rootNodeData) {
public synchronized void requestComplete(final boolean madeProgress) {
if (madeProgress) {
requestsSinceLastProgress = 0;
timestampOfLastProgress = clock.millis();
} else {
requestsSinceLastProgress++;
if (requestsSinceLastProgress >= maxRequestsWithoutProgress) {
if (requestsSinceLastProgress >= maxRequestsWithoutProgress
&& timestampOfLastProgress + minMillisBeforeStalling < clock.millis()) {
markAsStalled(maxRequestsWithoutProgress);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.services.tasks.CachingTaskCollection;

import java.time.Clock;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
Expand All @@ -31,6 +32,8 @@
public class WorldStateDownloader {
private static final Logger LOG = LogManager.getLogger();

private final long minMillisBeforeStalling;
private final Clock clock;
private final MetricsSystem metricsSystem;

private final EthContext ethContext;
Expand All @@ -49,13 +52,17 @@ public WorldStateDownloader(
final int hashCountPerRequest,
final int maxOutstandingRequests,
final int maxNodeRequestsWithoutProgress,
final long minMillisBeforeStalling,
final Clock clock,
final MetricsSystem metricsSystem) {
this.ethContext = ethContext;
this.worldStateStorage = worldStateStorage;
this.taskCollection = taskCollection;
this.hashCountPerRequest = hashCountPerRequest;
this.maxOutstandingRequests = maxOutstandingRequests;
this.maxNodeRequestsWithoutProgress = maxNodeRequestsWithoutProgress;
this.minMillisBeforeStalling = minMillisBeforeStalling;
this.clock = clock;
this.metricsSystem = metricsSystem;

metricsSystem.createIntegerGauge(
Expand Down Expand Up @@ -105,7 +112,8 @@ public CompletableFuture<Void> run(final BlockHeader header) {
stateRoot);

final WorldDownloadState newDownloadState =
new WorldDownloadState(taskCollection, maxNodeRequestsWithoutProgress);
new WorldDownloadState(
taskCollection, maxNodeRequestsWithoutProgress, minMillisBeforeStalling, clock);
this.downloadState.set(newDownloadState);

if (!newDownloadState.downloadWasResumed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static tech.pegasys.pantheon.ethereum.eth.sync.worldstate.NodeDataRequest.createAccountDataRequest;
Expand All @@ -27,9 +28,11 @@
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.services.tasks.CachingTaskCollection;
import tech.pegasys.pantheon.services.tasks.InMemoryTaskQueue;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -39,6 +42,7 @@ public class WorldDownloadStateTest {
private static final BytesValue ROOT_NODE_DATA = BytesValue.of(1, 2, 3, 4);
private static final Hash ROOT_NODE_HASH = Hash.hash(ROOT_NODE_DATA);
private static final int MAX_REQUESTS_WITHOUT_PROGRESS = 10;
private static final long MIN_MILLIS_BEFORE_STALLING = 50_000;

private final WorldStateStorage worldStateStorage =
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());
Expand All @@ -50,8 +54,10 @@ public class WorldDownloadStateTest {
private final WorldStateDownloadProcess worldStateDownloadProcess =
mock(WorldStateDownloadProcess.class);

private final TestClock clock = new TestClock();
private final WorldDownloadState downloadState =
new WorldDownloadState(pendingRequests, MAX_REQUESTS_WITHOUT_PROGRESS);
new WorldDownloadState(
pendingRequests, MAX_REQUESTS_WITHOUT_PROGRESS, MIN_MILLIS_BEFORE_STALLING, clock);

private final CompletableFuture<Void> future = downloadState.getDownloadFuture();

Expand Down Expand Up @@ -121,23 +127,71 @@ public void shouldResetRequestsSinceProgressCountWhenProgressIsMade() {
downloadState.requestComplete(false);

downloadState.requestComplete(true);
clock.stepMillis(MIN_MILLIS_BEFORE_STALLING + 1);

for (int i = 0; i < MAX_REQUESTS_WITHOUT_PROGRESS - 1; i++) {
downloadState.requestComplete(false);
assertThat(downloadState.getDownloadFuture()).isNotDone();
}

downloadState.requestComplete(false);
assertThat(downloadState.getDownloadFuture()).isCompletedExceptionally();
assertWorldStateStalled(downloadState);
}

@Test
public void shouldNotAddRequestsAfterDownloadIsStalled() {
public void shouldNotBeStalledWhenMaxRequestsReachedUntilMinimumTimeAlsoReached() {
for (int i = 0; i < MAX_REQUESTS_WITHOUT_PROGRESS; i++) {
downloadState.requestComplete(false);
assertThat(downloadState.getDownloadFuture()).isNotDone();
}

// Exceeding the requests without progress limit doesn't trigger stalled state
downloadState.requestComplete(false);
assertThat(downloadState.getDownloadFuture()).isNotDone();

// Until the minimum time has elapsed, then the next request with no progress marks as stalled
clock.stepMillis(MIN_MILLIS_BEFORE_STALLING + 1);
downloadState.requestComplete(false);
assertWorldStateStalled(downloadState);
}

@Test
public void shouldNotBeStalledIfMinimumTimeIsReachedButMaximumRequestsIsNot() {
clock.stepMillis(MIN_MILLIS_BEFORE_STALLING + 1);
downloadState.requestComplete(false);
assertThat(downloadState.getDownloadFuture()).isNotDone();
}

@Test
public void shouldResetTimeSinceProgressWhenProgressIsMade() {
// Enough time has progressed but the next request makes progress so we are not stalled.
clock.stepMillis(MIN_MILLIS_BEFORE_STALLING + 1);
downloadState.requestComplete(true);
assertThat(downloadState.getDownloadFuture()).isNotDone();

// We then reach the max number of requests without progress but the timer should have reset
for (int i = 0; i < MAX_REQUESTS_WITHOUT_PROGRESS; i++) {
downloadState.requestComplete(false);
assertThat(downloadState.getDownloadFuture()).isNotDone();
}
assertThat(downloadState.getDownloadFuture()).isNotDone();
}

@Test
public void shouldNotAddRequestsAfterDownloadIsCompleted() {
downloadState.checkCompletion(worldStateStorage, header);

downloadState.enqueueRequests(singletonList(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)));
downloadState.enqueueRequest(createAccountDataRequest(Hash.EMPTY_TRIE_HASH));

assertThat(pendingRequests.isEmpty()).isTrue();
}

private void assertWorldStateStalled(final WorldDownloadState state) {
final CompletableFuture<Void> future = state.getDownloadFuture();
assertThat(future).isCompletedExceptionally();
assertThatThrownBy(future::get)
.isInstanceOf(ExecutionException.class)
.hasRootCauseInstanceOf(StalledDownloadException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage;
import tech.pegasys.pantheon.services.tasks.CachingTaskCollection;
import tech.pegasys.pantheon.services.tasks.InMemoryTaskQueue;
import tech.pegasys.pantheon.testutil.TestClock;
import tech.pegasys.pantheon.util.bytes.Bytes32;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import tech.pegasys.pantheon.util.uint.UInt256;
Expand Down Expand Up @@ -974,6 +975,8 @@ private WorldStateDownloader createDownloader(
config.getWorldStateHashCountPerRequest(),
config.getWorldStateRequestParallelism(),
config.getWorldStateMaxRequestsWithoutProgress(),
config.getWorldStateMinMillisBeforeStalling(),
TestClock.fixed(),
new NoOpMetricsSystem());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ static PantheonController<CliqueContext> init(
ethProtocolManager.ethContext(),
syncState,
dataDirectory,
clock,
metricsSystem);

final TransactionPool transactionPool =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ static PantheonController<IbftContext> init(
istanbul64ProtocolManager.ethContext(),
syncState,
dataDirectory,
clock,
metricsSystem);

final Runnable closer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ static PantheonController<IbftContext> init(
ethProtocolManager.ethContext(),
syncState,
dataDirectory,
clock,
metricsSystem);

final TransactionPool transactionPool =
Expand Down
Loading