Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
[NC-2207] Integrate rocksdb-based queue into WorldStateDownloader (#746)
Browse files Browse the repository at this point in the history
  • Loading branch information
mbaxter authored Feb 5, 2019
1 parent cd07193 commit 7e25096
Show file tree
Hide file tree
Showing 16 changed files with 239 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
/** Provides an interface to block synchronization processes. */
public interface Synchronizer {

public void start();
void start();

void stop();

/**
* @return the status, based on SyncingResult When actively synchronizing blocks, alternatively
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,27 @@
import tech.pegasys.pantheon.ethereum.eth.sync.fullsync.FullSyncDownloader;
import tech.pegasys.pantheon.ethereum.eth.sync.state.PendingBlocks;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.NodeDataRequest;
import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.WorldStateDownloader;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.services.queue.InMemoryBigQueue;
import tech.pegasys.pantheon.services.queue.BigQueue;
import tech.pegasys.pantheon.services.queue.BytesQueue;
import tech.pegasys.pantheon.services.queue.BytesQueueAdapter;
import tech.pegasys.pantheon.services.queue.RocksDbQueue;
import tech.pegasys.pantheon.util.ExceptionUtils;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -48,6 +58,9 @@ public class DefaultSynchronizer<C> implements Synchronizer {
private final BlockPropagationManager<C> blockPropagationManager;
private final FullSyncDownloader<C> fullSyncDownloader;
private final Optional<FastSyncDownloader<C>> fastSyncDownloader;
private final Path stateQueueDirectory;
private final BigQueue<NodeDataRequest> stateQueue;
private final WorldStateDownloader worldStateDownloader;

public DefaultSynchronizer(
final SynchronizerConfiguration syncConfig,
Expand All @@ -56,10 +69,14 @@ public DefaultSynchronizer(
final WorldStateStorage worldStateStorage,
final EthContext ethContext,
final SyncState syncState,
final LabelledMetric<OperationTimer> ethTasksTimer) {
final Path dataDirectory,
final MetricsSystem metricsSystem) {
this.syncConfig = syncConfig;
this.ethContext = ethContext;
this.syncState = syncState;
LabelledMetric<OperationTimer> ethTasksTimer =
metricsSystem.createLabelledTimer(
MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName");
this.blockPropagationManager =
new BlockPropagationManager<>(
syncConfig,
Expand All @@ -69,19 +86,22 @@ public DefaultSynchronizer(
syncState,
new PendingBlocks(),
ethTasksTimer);

ChainHeadTracker.trackChainHeadForPeers(
ethContext, protocolSchedule, protocolContext.getBlockchain(), syncConfig, ethTasksTimer);

this.fullSyncDownloader =
new FullSyncDownloader<>(
syncConfig, protocolSchedule, protocolContext, ethContext, syncState, ethTasksTimer);

ChainHeadTracker.trackChainHeadForPeers(
ethContext, protocolSchedule, protocolContext.getBlockchain(), syncConfig, ethTasksTimer);
if (syncConfig.syncMode() == SyncMode.FAST) {
LOG.info("Fast sync enabled.");
final WorldStateDownloader worldStateDownloader =
this.stateQueueDirectory = getStateQueueDirectory(dataDirectory);
this.stateQueue = createWorldStateDownloaderQueue(stateQueueDirectory, metricsSystem);
this.worldStateDownloader =
new WorldStateDownloader(
ethContext,
worldStateStorage,
new InMemoryBigQueue<>(),
stateQueue,
syncConfig.getWorldStateHashCountPerRequest(),
syncConfig.getWorldStateRequestParallelism(),
ethTasksTimer);
Expand All @@ -98,6 +118,9 @@ public DefaultSynchronizer(
worldStateDownloader));
} else {
this.fastSyncDownloader = Optional.empty();
this.worldStateDownloader = null;
this.stateQueueDirectory = null;
this.stateQueue = null;
}
}

Expand All @@ -114,6 +137,43 @@ public void start() {
}
}

@Override
public void stop() {
cleanupFastSync();
}

private void cleanupFastSync() {
if (!fastSyncDownloader.isPresent()) {
// We're not fast syncing - nothing to do
return;
}

// Make sure downloader is stopped before we start cleaning up its dependencies
worldStateDownloader.cancel();
try {
stateQueue.close();
if (stateQueueDirectory.toFile().exists()) {
// Clean up this data for now (until fast sync resume functionality is in place)
MoreFiles.deleteRecursively(stateQueueDirectory, RecursiveDeleteOption.ALLOW_INSECURE);
}
} catch (IOException e) {
LOG.error("Unable to clean up fast sync state", e);
}
}

private Path getStateQueueDirectory(final Path dataDirectory) {
Path queueDataDir = dataDirectory.resolve("fastsync/statequeue");
queueDataDir.toFile().mkdirs();
return queueDataDir;
}

private BigQueue<NodeDataRequest> createWorldStateDownloaderQueue(
final Path dataDirectory, final MetricsSystem metricsSystem) {
BytesQueue bytesQueue = RocksDbQueue.create(dataDirectory, metricsSystem);
return new BytesQueueAdapter<>(
bytesQueue, NodeDataRequest::serialize, NodeDataRequest::deserialize);
}

private void handleFastSyncResult(final FastSyncState result, final Throwable error) {

final Throwable rootCause = ExceptionUtils.rootCause(error);
Expand All @@ -128,6 +188,8 @@ private void handleFastSyncResult(final FastSyncState result, final Throwable er
"Fast sync completed successfully with pivot block {}",
result.getPivotBlockNumber().getAsLong());
}
cleanupFastSync();

startFullSync();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,6 @@ public int getWorldStateRequestParallelism() {
}

public static class Builder {
private int fastSyncPivotDistance = DEFAULT_PIVOT_DISTANCE_FROM_HEAD;
private float fastSyncFullValidationRate = DEFAULT_FULL_VALIDATION_RATE;
private SyncMode syncMode = SyncMode.FULL;
private Range<Long> blockPropagationRange = Range.closed(-10L, 30L);
private long downloaderChangeTargetThresholdByHeight = 20L;
Expand All @@ -283,6 +281,10 @@ public static class Builder {
private int downloaderParallelism = 2;
private int transactionsParallelism = 2;
private int computationParallelism = Runtime.getRuntime().availableProcessors();
private int fastSyncPivotDistance = DEFAULT_PIVOT_DISTANCE_FROM_HEAD;
private float fastSyncFullValidationRate = DEFAULT_FULL_VALIDATION_RATE;
private int fastSyncMinimumPeerCount = DEFAULT_FAST_SYNC_MINIMUM_PEERS;
private Duration fastSyncMaximumPeerWaitTime = DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME;

public Builder fastSyncPivotDistance(final int distance) {
fastSyncPivotDistance = distance;
Expand Down Expand Up @@ -364,13 +366,23 @@ public Builder computationParallelism(final int computationParallelism) {
return this;
}

public Builder fastSyncMinimumPeerCount(final int fastSyncMinimumPeerCount) {
this.fastSyncMinimumPeerCount = fastSyncMinimumPeerCount;
return this;
}

public Builder fastSyncMaximumPeerWaitTime(final Duration fastSyncMaximumPeerWaitTime) {
this.fastSyncMaximumPeerWaitTime = fastSyncMaximumPeerWaitTime;
return this;
}

public SynchronizerConfiguration build() {
return new SynchronizerConfiguration(
syncMode,
fastSyncPivotDistance,
fastSyncFullValidationRate,
DEFAULT_FAST_SYNC_MINIMUM_PEERS,
DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME,
fastSyncMinimumPeerCount,
fastSyncMaximumPeerWaitTime,
DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST,
DEFAULT_WORLD_STATE_REQUEST_PARALLELISM,
blockPropagationRange,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class WorldStateDownloader {
private static final Logger LOG = LogManager.getLogger();

private enum Status {
IDLE,
Expand Down Expand Up @@ -74,6 +78,10 @@ public WorldStateDownloader(
}

public CompletableFuture<Void> run(final BlockHeader header) {
LOG.info(
"Begin downloading world state from peers for block {} ({})",
header.getNumber(),
header.getHash());
synchronized (this) {
if (status == Status.DONE || status == Status.RUNNING) {
return future;
Expand All @@ -94,6 +102,10 @@ public CompletableFuture<Void> run(final BlockHeader header) {
return future;
}

public void cancel() {
// TODO
}

private void requestNodeData(final BlockHeader header) {
if (sendingRequests.compareAndSet(false, true)) {
while (shouldRequestNodeData()) {
Expand Down Expand Up @@ -137,6 +149,7 @@ private void requestNodeData(final BlockHeader header) {
}

private synchronized void markDone() {
LOG.info("Finished downloading world state from peers");
if (future == null) {
future = CompletableFuture.completedFuture(null);
} else {
Expand Down
3 changes: 3 additions & 0 deletions pantheon/src/main/java/tech/pegasys/pantheon/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ public void execute() {

@Override
public void close() throws Exception {
if (networkRunner.getNetwork().isP2pEnabled()) {
pantheonController.getSynchronizer().stop();
}
networkRunner.stop();
networkRunner.awaitStop();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public PantheonController<?> build() throws IOException {
miningParameters,
nodeKeys,
metricsSystem,
privacyParameters);
privacyParameters,
homePath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@
import tech.pegasys.pantheon.ethereum.storage.StorageProvider;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;

import java.io.IOException;
import java.nio.file.Path;
import java.time.Clock;
import java.util.Collection;
import java.util.Map;
Expand Down Expand Up @@ -110,6 +110,7 @@ public static PantheonController<CliqueContext> init(
final MiningParameters miningParams,
final int networkId,
final KeyPair nodeKeys,
final Path dataDirectory,
final MetricsSystem metricsSystem) {
final Address localAddress = Util.publicKeyToAddress(nodeKeys.getPublicKey());
final CliqueConfigOptions cliqueConfig =
Expand Down Expand Up @@ -164,8 +165,8 @@ public static PantheonController<CliqueContext> init(
worldStateStorage,
ethProtocolManager.ethContext(),
syncState,
metricsSystem.createLabelledTimer(
MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName"));
dataDirectory,
metricsSystem);

final TransactionPool transactionPool =
TransactionPoolFactory.createTransactionPool(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@
import tech.pegasys.pantheon.ethereum.storage.StorageProvider;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Map;

Expand Down Expand Up @@ -108,6 +108,7 @@ public static PantheonController<IbftContext> init(
final boolean ottomanTestnetOperation,
final int networkId,
final KeyPair nodeKeys,
final Path dataDirectory,
final MetricsSystem metricsSystem) {
final ProtocolSchedule<IbftContext> protocolSchedule =
IbftProtocolSchedule.create(genesisConfig.getConfigOptions());
Expand Down Expand Up @@ -175,8 +176,8 @@ public static PantheonController<IbftContext> init(
worldStateStorage,
ethProtocolManager.ethContext(),
syncState,
metricsSystem.createLabelledTimer(
MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName"));
dataDirectory,
metricsSystem);

final Runnable closer =
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@
import tech.pegasys.pantheon.ethereum.storage.StorageProvider;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.util.Subscribers;

import java.io.IOException;
import java.nio.file.Path;
import java.time.Clock;
import java.util.Collection;
import java.util.Map;
Expand Down Expand Up @@ -130,6 +130,7 @@ public static PantheonController<IbftContext> init(
final MiningParameters miningParams,
final int networkId,
final KeyPair nodeKeys,
final Path dataDirectory,
final MetricsSystem metricsSystem) {
final ProtocolSchedule<IbftContext> protocolSchedule =
IbftProtocolSchedule.create(genesisConfig.getConfigOptions());
Expand Down Expand Up @@ -180,8 +181,8 @@ public static PantheonController<IbftContext> init(
worldStateStorage,
ethProtocolManager.ethContext(),
syncState,
metricsSystem.createLabelledTimer(
MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName"));
dataDirectory,
metricsSystem);

final TransactionPool transactionPool =
TransactionPoolFactory.createTransactionPool(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@
import tech.pegasys.pantheon.ethereum.storage.StorageProvider;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;

import java.io.IOException;
import java.nio.file.Path;
import java.time.Clock;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -98,8 +98,9 @@ public static PantheonController<Void> init(
final SynchronizerConfiguration taintedSyncConfig,
final MiningParameters miningParams,
final KeyPair nodeKeys,
final MetricsSystem metricsSystem,
final PrivacyParameters privacyParameters) {
final PrivacyParameters privacyParameters,
final Path dataDirectory,
final MetricsSystem metricsSystem) {

final GenesisState genesisState = GenesisState.fromConfig(genesisConfig, protocolSchedule);
final BlockchainStorage blockchainStorage =
Expand Down Expand Up @@ -139,8 +140,8 @@ public static PantheonController<Void> init(
worldStateStorage,
ethProtocolManager.ethContext(),
syncState,
metricsSystem.createLabelledTimer(
MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName"));
dataDirectory,
metricsSystem);

final TransactionPool transactionPool =
TransactionPoolFactory.createTransactionPool(
Expand Down
Loading

0 comments on commit 7e25096

Please sign in to comment.