diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/Synchronizer.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/Synchronizer.java index 3194d11b13..653ff0f938 100644 --- a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/Synchronizer.java +++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/Synchronizer.java @@ -17,7 +17,9 @@ /** Provides an interface to block synchronization processes. */ public interface Synchronizer { - public void start(); + void start(); + + void stop(); /** * @return the status, based on SyncingResult When actively synchronizing blocks, alternatively diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java index 8cca3e1035..09c7233fe0 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java @@ -23,17 +23,27 @@ import tech.pegasys.pantheon.ethereum.eth.sync.fullsync.FullSyncDownloader; import tech.pegasys.pantheon.ethereum.eth.sync.state.PendingBlocks; import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState; +import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.NodeDataRequest; import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.WorldStateDownloader; import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.MetricCategory; +import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.metrics.OperationTimer; -import tech.pegasys.pantheon.services.queue.InMemoryBigQueue; +import tech.pegasys.pantheon.services.queue.BigQueue; +import tech.pegasys.pantheon.services.queue.BytesQueue; +import tech.pegasys.pantheon.services.queue.BytesQueueAdapter; +import tech.pegasys.pantheon.services.queue.RocksDbQueue; import tech.pegasys.pantheon.util.ExceptionUtils; +import java.io.IOException; +import java.nio.file.Path; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.io.MoreFiles; +import com.google.common.io.RecursiveDeleteOption; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -48,6 +58,9 @@ public class DefaultSynchronizer implements Synchronizer { private final BlockPropagationManager blockPropagationManager; private final FullSyncDownloader fullSyncDownloader; private final Optional> fastSyncDownloader; + private final Path stateQueueDirectory; + private final BigQueue stateQueue; + private final WorldStateDownloader worldStateDownloader; public DefaultSynchronizer( final SynchronizerConfiguration syncConfig, @@ -56,10 +69,14 @@ public DefaultSynchronizer( final WorldStateStorage worldStateStorage, final EthContext ethContext, final SyncState syncState, - final LabelledMetric ethTasksTimer) { + final Path dataDirectory, + final MetricsSystem metricsSystem) { this.syncConfig = syncConfig; this.ethContext = ethContext; this.syncState = syncState; + LabelledMetric ethTasksTimer = + metricsSystem.createLabelledTimer( + MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName"); this.blockPropagationManager = new BlockPropagationManager<>( syncConfig, @@ -69,19 +86,22 @@ public DefaultSynchronizer( syncState, new PendingBlocks(), ethTasksTimer); + + ChainHeadTracker.trackChainHeadForPeers( + ethContext, protocolSchedule, protocolContext.getBlockchain(), syncConfig, ethTasksTimer); + this.fullSyncDownloader = new FullSyncDownloader<>( syncConfig, protocolSchedule, protocolContext, ethContext, syncState, ethTasksTimer); - ChainHeadTracker.trackChainHeadForPeers( - ethContext, protocolSchedule, protocolContext.getBlockchain(), syncConfig, ethTasksTimer); if (syncConfig.syncMode() == SyncMode.FAST) { - LOG.info("Fast sync enabled."); - final WorldStateDownloader worldStateDownloader = + this.stateQueueDirectory = getStateQueueDirectory(dataDirectory); + this.stateQueue = createWorldStateDownloaderQueue(stateQueueDirectory, metricsSystem); + this.worldStateDownloader = new WorldStateDownloader( ethContext, worldStateStorage, - new InMemoryBigQueue<>(), + stateQueue, syncConfig.getWorldStateHashCountPerRequest(), syncConfig.getWorldStateRequestParallelism(), ethTasksTimer); @@ -98,6 +118,9 @@ public DefaultSynchronizer( worldStateDownloader)); } else { this.fastSyncDownloader = Optional.empty(); + this.worldStateDownloader = null; + this.stateQueueDirectory = null; + this.stateQueue = null; } } @@ -114,6 +137,43 @@ public void start() { } } + @Override + public void stop() { + cleanupFastSync(); + } + + private void cleanupFastSync() { + if (!fastSyncDownloader.isPresent()) { + // We're not fast syncing - nothing to do + return; + } + + // Make sure downloader is stopped before we start cleaning up its dependencies + worldStateDownloader.cancel(); + try { + stateQueue.close(); + if (stateQueueDirectory.toFile().exists()) { + // Clean up this data for now (until fast sync resume functionality is in place) + MoreFiles.deleteRecursively(stateQueueDirectory, RecursiveDeleteOption.ALLOW_INSECURE); + } + } catch (IOException e) { + LOG.error("Unable to clean up fast sync state", e); + } + } + + private Path getStateQueueDirectory(final Path dataDirectory) { + Path queueDataDir = dataDirectory.resolve("fastsync/statequeue"); + queueDataDir.toFile().mkdirs(); + return queueDataDir; + } + + private BigQueue createWorldStateDownloaderQueue( + final Path dataDirectory, final MetricsSystem metricsSystem) { + BytesQueue bytesQueue = RocksDbQueue.create(dataDirectory, metricsSystem); + return new BytesQueueAdapter<>( + bytesQueue, NodeDataRequest::serialize, NodeDataRequest::deserialize); + } + private void handleFastSyncResult(final FastSyncState result, final Throwable error) { final Throwable rootCause = ExceptionUtils.rootCause(error); @@ -128,6 +188,8 @@ private void handleFastSyncResult(final FastSyncState result, final Throwable er "Fast sync completed successfully with pivot block {}", result.getPivotBlockNumber().getAsLong()); } + cleanupFastSync(); + startFullSync(); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java index 5263197ae7..f8a4a3cc95 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/SynchronizerConfiguration.java @@ -268,8 +268,6 @@ public int getWorldStateRequestParallelism() { } public static class Builder { - private int fastSyncPivotDistance = DEFAULT_PIVOT_DISTANCE_FROM_HEAD; - private float fastSyncFullValidationRate = DEFAULT_FULL_VALIDATION_RATE; private SyncMode syncMode = SyncMode.FULL; private Range blockPropagationRange = Range.closed(-10L, 30L); private long downloaderChangeTargetThresholdByHeight = 20L; @@ -283,6 +281,10 @@ public static class Builder { private int downloaderParallelism = 2; private int transactionsParallelism = 2; private int computationParallelism = Runtime.getRuntime().availableProcessors(); + private int fastSyncPivotDistance = DEFAULT_PIVOT_DISTANCE_FROM_HEAD; + private float fastSyncFullValidationRate = DEFAULT_FULL_VALIDATION_RATE; + private int fastSyncMinimumPeerCount = DEFAULT_FAST_SYNC_MINIMUM_PEERS; + private Duration fastSyncMaximumPeerWaitTime = DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME; public Builder fastSyncPivotDistance(final int distance) { fastSyncPivotDistance = distance; @@ -364,13 +366,23 @@ public Builder computationParallelism(final int computationParallelism) { return this; } + public Builder fastSyncMinimumPeerCount(final int fastSyncMinimumPeerCount) { + this.fastSyncMinimumPeerCount = fastSyncMinimumPeerCount; + return this; + } + + public Builder fastSyncMaximumPeerWaitTime(final Duration fastSyncMaximumPeerWaitTime) { + this.fastSyncMaximumPeerWaitTime = fastSyncMaximumPeerWaitTime; + return this; + } + public SynchronizerConfiguration build() { return new SynchronizerConfiguration( syncMode, fastSyncPivotDistance, fastSyncFullValidationRate, - DEFAULT_FAST_SYNC_MINIMUM_PEERS, - DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME, + fastSyncMinimumPeerCount, + fastSyncMaximumPeerWaitTime, DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST, DEFAULT_WORLD_STATE_REQUEST_PARALLELISM, blockPropagationRange, diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java index a043b6bbe2..f31af93575 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java @@ -37,7 +37,11 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + public class WorldStateDownloader { + private static final Logger LOG = LogManager.getLogger(); private enum Status { IDLE, @@ -74,6 +78,10 @@ public WorldStateDownloader( } public CompletableFuture run(final BlockHeader header) { + LOG.info( + "Begin downloading world state from peers for block {} ({})", + header.getNumber(), + header.getHash()); synchronized (this) { if (status == Status.DONE || status == Status.RUNNING) { return future; @@ -94,6 +102,10 @@ public CompletableFuture run(final BlockHeader header) { return future; } + public void cancel() { + // TODO + } + private void requestNodeData(final BlockHeader header) { if (sendingRequests.compareAndSet(false, true)) { while (shouldRequestNodeData()) { @@ -137,6 +149,7 @@ private void requestNodeData(final BlockHeader header) { } private synchronized void markDone() { + LOG.info("Finished downloading world state from peers"); if (future == null) { future = CompletableFuture.completedFuture(null); } else { diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/Runner.java b/pantheon/src/main/java/tech/pegasys/pantheon/Runner.java index 6fde46423a..1e980d7343 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/Runner.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/Runner.java @@ -88,6 +88,9 @@ public void execute() { @Override public void close() throws Exception { + if (networkRunner.getNetwork().isP2pEnabled()) { + pantheonController.getSynchronizer().stop(); + } networkRunner.stop(); networkRunner.awaitStop(); diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonControllerBuilder.java b/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonControllerBuilder.java index 779c3d35eb..e4e3251c30 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonControllerBuilder.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/cli/PantheonControllerBuilder.java @@ -112,6 +112,7 @@ public PantheonController build() throws IOException { miningParameters, nodeKeys, metricsSystem, - privacyParameters); + privacyParameters, + homePath); } } diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java index 34622d6b6f..ed21d93a75 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/CliquePantheonController.java @@ -57,10 +57,10 @@ import tech.pegasys.pantheon.ethereum.storage.StorageProvider; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; -import tech.pegasys.pantheon.metrics.MetricCategory; import tech.pegasys.pantheon.metrics.MetricsSystem; import java.io.IOException; +import java.nio.file.Path; import java.time.Clock; import java.util.Collection; import java.util.Map; @@ -110,6 +110,7 @@ public static PantheonController init( final MiningParameters miningParams, final int networkId, final KeyPair nodeKeys, + final Path dataDirectory, final MetricsSystem metricsSystem) { final Address localAddress = Util.publicKeyToAddress(nodeKeys.getPublicKey()); final CliqueConfigOptions cliqueConfig = @@ -164,8 +165,8 @@ public static PantheonController init( worldStateStorage, ethProtocolManager.ethContext(), syncState, - metricsSystem.createLabelledTimer( - MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName")); + dataDirectory, + metricsSystem); final TransactionPool transactionPool = TransactionPoolFactory.createTransactionPool( diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java index b7624a5d27..8d2621ab96 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftLegacyPantheonController.java @@ -54,10 +54,10 @@ import tech.pegasys.pantheon.ethereum.storage.StorageProvider; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; -import tech.pegasys.pantheon.metrics.MetricCategory; import tech.pegasys.pantheon.metrics.MetricsSystem; import java.io.IOException; +import java.nio.file.Path; import java.util.Collection; import java.util.Map; @@ -108,6 +108,7 @@ public static PantheonController init( final boolean ottomanTestnetOperation, final int networkId, final KeyPair nodeKeys, + final Path dataDirectory, final MetricsSystem metricsSystem) { final ProtocolSchedule protocolSchedule = IbftProtocolSchedule.create(genesisConfig.getConfigOptions()); @@ -175,8 +176,8 @@ public static PantheonController init( worldStateStorage, ethProtocolManager.ethContext(), syncState, - metricsSystem.createLabelledTimer( - MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName")); + dataDirectory, + metricsSystem); final Runnable closer = () -> { diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java index cddd7dbe54..beab7d798f 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/IbftPantheonController.java @@ -74,11 +74,11 @@ import tech.pegasys.pantheon.ethereum.storage.StorageProvider; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; -import tech.pegasys.pantheon.metrics.MetricCategory; import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.util.Subscribers; import java.io.IOException; +import java.nio.file.Path; import java.time.Clock; import java.util.Collection; import java.util.Map; @@ -130,6 +130,7 @@ public static PantheonController init( final MiningParameters miningParams, final int networkId, final KeyPair nodeKeys, + final Path dataDirectory, final MetricsSystem metricsSystem) { final ProtocolSchedule protocolSchedule = IbftProtocolSchedule.create(genesisConfig.getConfigOptions()); @@ -180,8 +181,8 @@ public static PantheonController init( worldStateStorage, ethProtocolManager.ethContext(), syncState, - metricsSystem.createLabelledTimer( - MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName")); + dataDirectory, + metricsSystem); final TransactionPool transactionPool = TransactionPoolFactory.createTransactionPool( diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java index 16744aac9a..8e9028e623 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/MainnetPantheonController.java @@ -43,10 +43,10 @@ import tech.pegasys.pantheon.ethereum.storage.StorageProvider; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; -import tech.pegasys.pantheon.metrics.MetricCategory; import tech.pegasys.pantheon.metrics.MetricsSystem; import java.io.IOException; +import java.nio.file.Path; import java.time.Clock; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -98,8 +98,9 @@ public static PantheonController init( final SynchronizerConfiguration taintedSyncConfig, final MiningParameters miningParams, final KeyPair nodeKeys, - final MetricsSystem metricsSystem, - final PrivacyParameters privacyParameters) { + final PrivacyParameters privacyParameters, + final Path dataDirectory, + final MetricsSystem metricsSystem) { final GenesisState genesisState = GenesisState.fromConfig(genesisConfig, protocolSchedule); final BlockchainStorage blockchainStorage = @@ -139,8 +140,8 @@ public static PantheonController init( worldStateStorage, ethProtocolManager.ethContext(), syncState, - metricsSystem.createLabelledTimer( - MetricCategory.SYNCHRONIZER, "task", "Internal processing tasks", "taskName")); + dataDirectory, + metricsSystem); final TransactionPool transactionPool = TransactionPoolFactory.createTransactionPool( diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonController.java b/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonController.java index 75b881501d..eb9f14f870 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonController.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/controller/PantheonController.java @@ -33,6 +33,7 @@ import tech.pegasys.pantheon.metrics.MetricsSystem; import java.io.Closeable; +import java.nio.file.Path; import java.util.Collection; import java.util.Map; @@ -49,7 +50,8 @@ static PantheonController fromConfig( final MiningParameters miningParameters, final KeyPair nodeKeys, final MetricsSystem metricsSystem, - final PrivacyParameters privacyParameters) { + final PrivacyParameters privacyParameters, + final Path dataDirectory) { final GenesisConfigOptions configOptions = genesisConfigFile.getConfigOptions(); @@ -61,8 +63,9 @@ static PantheonController fromConfig( syncConfig, miningParameters, nodeKeys, - metricsSystem, - privacyParameters); + privacyParameters, + dataDirectory, + metricsSystem); } else if (configOptions.isIbft2()) { return IbftPantheonController.init( storageProvider, @@ -71,6 +74,7 @@ static PantheonController fromConfig( miningParameters, networkId, nodeKeys, + dataDirectory, metricsSystem); } else if (configOptions.isIbftLegacy()) { return IbftLegacyPantheonController.init( @@ -80,6 +84,7 @@ static PantheonController fromConfig( ottomanTestnetOperation, networkId, nodeKeys, + dataDirectory, metricsSystem); } else if (configOptions.isClique()) { return CliquePantheonController.init( @@ -89,6 +94,7 @@ static PantheonController fromConfig( miningParameters, networkId, nodeKeys, + dataDirectory, metricsSystem); } else { throw new IllegalArgumentException("Unknown consensus mechanism defined"); diff --git a/pantheon/src/test/java/tech/pegasys/pantheon/RunnerTest.java b/pantheon/src/test/java/tech/pegasys/pantheon/RunnerTest.java index f66e272520..aff095fe12 100644 --- a/pantheon/src/test/java/tech/pegasys/pantheon/RunnerTest.java +++ b/pantheon/src/test/java/tech/pegasys/pantheon/RunnerTest.java @@ -46,6 +46,7 @@ import java.io.IOException; import java.net.InetAddress; import java.nio.file.Path; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; @@ -66,7 +67,6 @@ import okhttp3.Response; import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -82,23 +82,17 @@ public void fullSyncFromGenesis() throws Exception { } @Test - @Ignore("Fast sync implementation in progress.") public void fastSyncFromGenesis() throws Exception { syncFromGenesis(SyncMode.FAST); } private void syncFromGenesis(final SyncMode mode) throws Exception { - final Path dbAhead = temp.newFolder().toPath(); + final Path dataDirAhead = temp.newFolder().toPath(); + final Path dbAhead = dataDirAhead.resolve("database"); final int blockCount = 500; final KeyPair aheadDbNodeKeys = loadKeyPair(dbAhead); - final SynchronizerConfiguration fastSyncConfig = - SynchronizerConfiguration.builder() - .syncMode(mode) - // TODO: Disable switch from fast to full sync via configuration for now, set pivot to - // realistic value when world state persistence is added. - // .fastSyncPivotDistance(blockCount / 2).build(); - .fastSyncPivotDistance(0) - .build(); + final SynchronizerConfiguration syncConfigAhead = + SynchronizerConfiguration.builder().syncMode(SyncMode.FULL).build(); final MetricsSystem noOpMetricsSystem = new NoOpMetricsSystem(); // Setup state with block data @@ -107,11 +101,12 @@ private void syncFromGenesis(final SyncMode mode) throws Exception { createKeyValueStorageProvider(dbAhead), GenesisConfigFile.mainnet(), MainnetProtocolSchedule.create(), - fastSyncConfig, + syncConfigAhead, new MiningParametersTestBuilder().enabled(false).build(), aheadDbNodeKeys, - noOpMetricsSystem, - PrivacyParameters.noPrivacy())) { + PrivacyParameters.noPrivacy(), + dataDirAhead, + noOpMetricsSystem)) { setupState(blockCount, controller.getProtocolSchedule(), controller.getProtocolContext()); } @@ -121,11 +116,12 @@ private void syncFromGenesis(final SyncMode mode) throws Exception { createKeyValueStorageProvider(dbAhead), GenesisConfigFile.mainnet(), MainnetProtocolSchedule.create(), - fastSyncConfig, + syncConfigAhead, new MiningParametersTestBuilder().enabled(false).build(), aheadDbNodeKeys, - noOpMetricsSystem, - PrivacyParameters.noPrivacy()); + PrivacyParameters.noPrivacy(), + dataDirAhead, + noOpMetricsSystem); final String listenHost = InetAddress.getLoopbackAddress().getHostAddress(); final ExecutorService executorService = Executors.newFixedThreadPool(2); final JsonRpcConfiguration aheadJsonRpcConfiguration = jsonRpcConfiguration(); @@ -155,6 +151,14 @@ private void syncFromGenesis(final SyncMode mode) throws Exception { try { executorService.submit(runnerAhead::execute); + + final SynchronizerConfiguration syncConfigBehind = + SynchronizerConfiguration.builder() + .syncMode(mode) + .fastSyncPivotDistance(5) + .fastSyncMaximumPeerWaitTime(Duration.ofSeconds(5)) + .build(); + final Path dataDirBehind = temp.newFolder().toPath(); final JsonRpcConfiguration behindJsonRpcConfiguration = jsonRpcConfiguration(); final WebSocketConfiguration behindWebSocketConfiguration = wsRpcConfiguration(); final MetricsConfiguration behindMetricsConfiguration = metricsConfiguration(); @@ -165,11 +169,12 @@ private void syncFromGenesis(final SyncMode mode) throws Exception { new InMemoryStorageProvider(), GenesisConfigFile.mainnet(), MainnetProtocolSchedule.create(), - fastSyncConfig, + syncConfigBehind, new MiningParametersTestBuilder().enabled(false).build(), KeyPair.generate(), - noOpMetricsSystem, - PrivacyParameters.noPrivacy()); + PrivacyParameters.noPrivacy(), + dataDirBehind, + noOpMetricsSystem); final Runner runnerBehind = runnerBuilder .pantheonController(controllerBehind) diff --git a/pantheon/src/test/java/tech/pegasys/pantheon/util/BlockImporterTest.java b/pantheon/src/test/java/tech/pegasys/pantheon/util/BlockImporterTest.java index 262f54df61..ed7837b5a0 100644 --- a/pantheon/src/test/java/tech/pegasys/pantheon/util/BlockImporterTest.java +++ b/pantheon/src/test/java/tech/pegasys/pantheon/util/BlockImporterTest.java @@ -45,7 +45,8 @@ public final class BlockImporterTest { @Test public void blockImport() throws IOException { - final Path source = folder.newFile().toPath(); + final Path dataDir = folder.newFolder().toPath(); + final Path source = dataDir.resolve("1000.blocks"); BlockTestUtil.write1000Blocks(source); final PantheonController targetController = PantheonController.fromConfig( @@ -57,7 +58,8 @@ public void blockImport() throws IOException { new MiningParametersTestBuilder().enabled(false).build(), KeyPair.generate(), new NoOpMetricsSystem(), - PrivacyParameters.noPrivacy()); + PrivacyParameters.noPrivacy(), + dataDir); final BlockImporter.ImportResult result = blockImporter.importBlockchain(source, targetController); // Don't count the Genesis block @@ -67,7 +69,8 @@ public void blockImport() throws IOException { @Test public void ibftImport() throws IOException { - final Path source = folder.newFile().toPath(); + final Path dataDir = folder.newFolder().toPath(); + final Path source = dataDir.resolve("ibft.blocks"); final String config = Resources.toString(Resources.getResource("ibftlegacy_genesis.json"), UTF_8); @@ -91,7 +94,8 @@ public void ibftImport() throws IOException { new MiningParametersTestBuilder().enabled(false).build(), KeyPair.generate(), new NoOpMetricsSystem(), - PrivacyParameters.noPrivacy()); + PrivacyParameters.noPrivacy(), + dataDir); final BlockImporter.ImportResult result = blockImporter.importBlockchain(source, controller); // Don't count the Genesis block diff --git a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesQueue.java b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesQueue.java new file mode 100644 index 0000000000..6abaaf55e0 --- /dev/null +++ b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesQueue.java @@ -0,0 +1,17 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.queue; + +import tech.pegasys.pantheon.util.bytes.BytesValue; + +public interface BytesQueue extends BigQueue {} diff --git a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesQueueAdapter.java b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesQueueAdapter.java new file mode 100644 index 0000000000..b8c41f8fed --- /dev/null +++ b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BytesQueueAdapter.java @@ -0,0 +1,55 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.queue; + +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.io.IOException; +import java.util.function.Function; + +public class BytesQueueAdapter implements BigQueue { + + private final BytesQueue queue; + private final Function serializer; + private final Function deserializer; + + public BytesQueueAdapter( + final BytesQueue queue, + final Function serializer, + final Function deserializer) { + this.queue = queue; + this.serializer = serializer; + this.deserializer = deserializer; + } + + @Override + public void enqueue(final T value) { + queue.enqueue(serializer.apply(value)); + } + + @Override + public T dequeue() { + BytesValue value = queue.dequeue(); + return value == null ? null : deserializer.apply(value); + } + + @Override + public long size() { + return queue.size(); + } + + @Override + public void close() throws IOException { + queue.close(); + } +} diff --git a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbQueue.java b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbQueue.java index e2cb8e3623..e5ec03f820 100644 --- a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbQueue.java +++ b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbQueue.java @@ -29,7 +29,7 @@ import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; -public class RocksDbQueue implements BigQueue { +public class RocksDbQueue implements BytesQueue { private static final Logger LOG = LogManager.getLogger();