Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Pruning Command Line Flags #1869

Merged
merged 19 commits into from
Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ public interface StorageProvider extends Closeable {
PrivateStateStorage createPrivateStateStorage();

KeyValueStorage createPruningStorage();

boolean isWorldStateIterable();
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,23 @@ public class KeyValueStorageProvider implements StorageProvider {
private final KeyValueStorage privateTransactionStorage;
private final KeyValueStorage privateStateStorage;
private final KeyValueStorage pruningStorage;
private final boolean isWorldStateIterable;

public KeyValueStorageProvider(
final KeyValueStorage blockchainStorage,
final KeyValueStorage worldStateStorage,
final KeyValueStorage worldStatePreimageStorage,
final KeyValueStorage privateTransactionStorage,
final KeyValueStorage privateStateStorage,
final KeyValueStorage pruningStorage) {
final KeyValueStorage pruningStorage,
final boolean isWorldStateIterable) {
this.blockchainStorage = blockchainStorage;
this.worldStateStorage = worldStateStorage;
this.worldStatePreimageStorage = worldStatePreimageStorage;
this.privateTransactionStorage = privateTransactionStorage;
this.privateStateStorage = privateStateStorage;
this.pruningStorage = pruningStorage;
this.isWorldStateIterable = isWorldStateIterable;
}

@Override
Expand Down Expand Up @@ -81,6 +84,11 @@ public KeyValueStorage createPruningStorage() {
return pruningStorage;
}

@Override
public boolean isWorldStateIterable() {
return isWorldStateIterable;
}

@Override
public void close() throws IOException {
blockchainStorage.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ private static StorageProvider ofUnsegmented(
final KeyValueStorage kv = RocksDbKeyValueStorage.create(rocksDbConfiguration, metricsSystem);
final KeyValueStorage preimageKv =
new LimitedInMemoryKeyValueStorage(worldStatePreimageCacheSize);
return new KeyValueStorageProvider(kv, kv, preimageKv, kv, kv, kv);
return new KeyValueStorageProvider(kv, kv, preimageKv, kv, kv, kv, false);
}

private static StorageProvider ofSegmented(
Expand All @@ -114,7 +114,8 @@ private static StorageProvider ofSegmented(
preimageStorage,
new SegmentedKeyValueStorageAdapter<>(RocksDbSegment.PRIVATE_TRANSACTIONS, columnarStorage),
new SegmentedKeyValueStorageAdapter<>(RocksDbSegment.PRIVATE_STATE, columnarStorage),
new SegmentedKeyValueStorageAdapter<>(RocksDbSegment.PRUNING_STATE, columnarStorage));
new SegmentedKeyValueStorageAdapter<>(RocksDbSegment.PRUNING_STATE, columnarStorage),
true);
}

private enum RocksDbSegment implements Segment {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,28 @@ public class Pruner {
private final MarkSweepPruner pruningStrategy;
private final Blockchain blockchain;
private final ExecutorService executorService;
private final long retentionPeriodInBlocks;
private final long blocksRetained;
private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
private volatile long markBlockNumber = 0;
private volatile BlockHeader markedBlockHeader;
private long transientForkThreshold;
private long blockConfirmations;

public Pruner(
final MarkSweepPruner pruningStrategy,
final Blockchain blockchain,
final ExecutorService executorService,
final long transientForkThreshold,
final long retentionPeriodInBlocks) {
final PruningConfiguration pruningConfiguration) {
this.pruningStrategy = pruningStrategy;
this.executorService = executorService;
this.blockchain = blockchain;
if (transientForkThreshold < 0 || retentionPeriodInBlocks < 0) {
this.blocksRetained = pruningConfiguration.getBlocksRetained();
this.blockConfirmations = pruningConfiguration.getBlockConfirmations();
if (blockConfirmations < 0 || blocksRetained < 0) {
throw new IllegalArgumentException(
String.format(
"TransientForkThreshold and RetentionPeriodInBlocks must be non-negative. transientForkThreshold=%d, retentionPeriodInBlocks=%d",
transientForkThreshold, retentionPeriodInBlocks));
"blockConfirmations and blocksRetained must be non-negative. blockConfirmations=%d, blocksRetained=%d",
blockConfirmations, blocksRetained));
}
this.retentionPeriodInBlocks = retentionPeriodInBlocks;
this.transientForkThreshold = transientForkThreshold;
}

public void start() {
Expand All @@ -70,14 +69,14 @@ private void handleNewBlock(final BlockAddedEvent event) {
}

final long blockNumber = event.getBlock().getHeader().getNumber();
if (state.compareAndSet(State.IDLE, State.TRANSIENT_FORK_OUTLIVING)) {
if (state.compareAndSet(State.IDLE, State.MARK_BLOCK_CONFIRMATIONS_AWAITING)) {
pruningStrategy.prepare();
markBlockNumber = blockNumber;
} else if (blockNumber >= markBlockNumber + transientForkThreshold
&& state.compareAndSet(State.TRANSIENT_FORK_OUTLIVING, State.MARKING)) {
} else if (blockNumber >= markBlockNumber + blockConfirmations
&& state.compareAndSet(State.MARK_BLOCK_CONFIRMATIONS_AWAITING, State.MARKING)) {
markedBlockHeader = blockchain.getBlockHeader(markBlockNumber).get();
mark(markedBlockHeader);
} else if (blockNumber >= markBlockNumber + retentionPeriodInBlocks
} else if (blockNumber >= markBlockNumber + blocksRetained
&& blockchain.blockIsOnCanonicalChain(markedBlockHeader.getHash())
&& state.compareAndSet(State.MARKING_COMPLETE, State.SWEEPING)) {
sweep();
Expand All @@ -99,8 +98,7 @@ private void mark(final BlockHeader header) {
}

private void sweep() {
LOG.info(
"Begin sweeping unused nodes for pruning. Retention period: {}", retentionPeriodInBlocks);
LOG.info("Begin sweeping unused nodes for pruning. Retention period: {}", blocksRetained);
execute(
() -> {
pruningStrategy.sweepBefore(markBlockNumber);
Expand All @@ -119,7 +117,7 @@ private void execute(final Runnable action) {

private enum State {
IDLE,
TRANSIENT_FORK_OUTLIVING,
MARK_BLOCK_CONFIRMATIONS_AWAITING,
MARKING,
MARKING_COMPLETE,
SWEEPING;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.ethereum.worldstate;

public class PruningConfiguration {

private final long blocksRetainedBeforeSweeping;
private final long blockConfirmationsBeforeMarking;

public PruningConfiguration(
final long blockConfirmationsBeforeMarking, final long blocksRetainedBeforeSweeping) {
this.blockConfirmationsBeforeMarking = blockConfirmationsBeforeMarking;
this.blocksRetainedBeforeSweeping = blocksRetainedBeforeSweeping;
}

public long getBlocksRetained() {
return blocksRetainedBeforeSweeping;
}

public long getBlockConfirmations() {
return blockConfirmationsBeforeMarking;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ public KeyValueStorage createPruningStorage() {
return new InMemoryKeyValueStorage();
}

@Override
public boolean isWorldStateIterable() {
return true;
}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ public void shouldMarkCorrectBlockAndSweep() throws InterruptedException {
final MutableBlockchain blockchain =
DefaultBlockchain.createMutable(genesisBlock, blockchainStorage, metricsSystem);

final Pruner pruner = new Pruner(markSweepPruner, blockchain, mockExecutorService, 0, 0);
final Pruner pruner =
new Pruner(
markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(0, 0));
pruner.start();

final Block block1 = appendBlockWithParent(blockchain, genesisBlock);
Expand All @@ -74,15 +76,17 @@ public void shouldMarkCorrectBlockAndSweep() throws InterruptedException {
}

@Test
public void shouldOnlySweepAfterTransientForkPeriodAndRetentionPeriodEnds()
public void shouldOnlySweepAfterBlockConfirmationPeriodAndRetentionPeriodEnds()
throws InterruptedException {
final BlockchainStorage blockchainStorage =
new KeyValueStoragePrefixedKeyBlockchainStorage(
new InMemoryKeyValueStorage(), new MainnetBlockHeaderFunctions());
final MutableBlockchain blockchain =
DefaultBlockchain.createMutable(genesisBlock, blockchainStorage, metricsSystem);

final Pruner pruner = new Pruner(markSweepPruner, blockchain, mockExecutorService, 1, 2);
final Pruner pruner =
new Pruner(
markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(1, 2));
pruner.start();

final Hash markBlockStateRootHash =
Expand All @@ -109,7 +113,9 @@ public void abortsPruningWhenFullyMarkedBlockNoLongerOnCanonicalChain()
DefaultBlockchain.createMutable(genesisBlock, blockchainStorage, metricsSystem);

// start pruner so it can start handling block added events
final Pruner pruner = new Pruner(markSweepPruner, blockchain, mockExecutorService, 0, 1);
final Pruner pruner =
new Pruner(
markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(0, 1));
pruner.start();

/*
Expand Down Expand Up @@ -143,7 +149,13 @@ public void abortsPruningWhenFullyMarkedBlockNoLongerOnCanonicalChain()
@Test
public void shouldRejectInvalidArguments() {
final Blockchain mockchain = mock(Blockchain.class);
assertThatThrownBy(() -> new Pruner(markSweepPruner, mockchain, mockExecutorService, -1, -2))
assertThatThrownBy(
() ->
new Pruner(
markSweepPruner,
mockchain,
mockExecutorService,
new PruningConfiguration(-1, -2)))
.isInstanceOf(IllegalArgumentException.class);
}

Expand All @@ -155,7 +167,9 @@ public void shouldCleanUpPruningStrategyOnShutdown() throws InterruptedException
final MutableBlockchain blockchain =
DefaultBlockchain.createMutable(genesisBlock, blockchainStorage, metricsSystem);

final Pruner pruner = new Pruner(markSweepPruner, blockchain, mockExecutorService, 0, 0);
final Pruner pruner =
new Pruner(
markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(0, 0));
pruner.start();
pruner.stop();
verify(markSweepPruner).cleanup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ public class DefaultSynchronizer<C> implements Synchronizer {

private static final Logger LOG = LogManager.getLogger();

private static final boolean PRUNING_ENABLED = false;
private final Pruner pruner;
private final Optional<Pruner> maybePruner;
private final SyncState syncState;
private final AtomicBoolean running = new AtomicBoolean(false);
private final Subscribers<SyncStatusListener> syncStatusListeners = Subscribers.create();
Expand All @@ -60,13 +59,13 @@ public DefaultSynchronizer(
final ProtocolContext<C> protocolContext,
final WorldStateStorage worldStateStorage,
final BlockBroadcaster blockBroadcaster,
final Pruner pruner,
final Optional<Pruner> maybePruner,
final EthContext ethContext,
final SyncState syncState,
final Path dataDirectory,
final Clock clock,
final MetricsSystem metricsSystem) {
this.pruner = pruner;
this.maybePruner = maybePruner;
this.syncState = syncState;

ChainHeadTracker.trackChainHeadForPeers(
Expand Down Expand Up @@ -169,9 +168,7 @@ private void handleFastSyncResult(final FastSyncState result, final Throwable er

private void startFullSync() {
fullSyncDownloader.start();
if (PRUNING_ENABLED) {
pruner.start();
}
maybePruner.ifPresent(Pruner::start);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public interface DefaultCommandValues {
String MANDATORY_NETWORK_FORMAT_HELP = "<NETWORK>";
String MANDATORY_NODE_ID_FORMAT_HELP = "<NODEID>";
Wei DEFAULT_MIN_TRANSACTION_GAS_PRICE = Wei.of(1000);
long DEFAULT_PRUNING_BLOCKS_RETAINED = 1024;
long DEFAULT_PRUNING_BLOCK_CONFIRMATIONS = 10;
BytesValue DEFAULT_EXTRA_DATA = BytesValue.EMPTY;
long DEFAULT_MAX_REFRESH_DELAY = 3600000;
long DEFAULT_MIN_REFRESH_DELAY = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import tech.pegasys.pantheon.ethereum.permissioning.PermissioningConfiguration;
import tech.pegasys.pantheon.ethereum.permissioning.PermissioningConfigurationBuilder;
import tech.pegasys.pantheon.ethereum.permissioning.SmartContractPermissioningConfiguration;
import tech.pegasys.pantheon.ethereum.worldstate.PruningConfiguration;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.PantheonMetricCategory;
Expand Down Expand Up @@ -547,6 +548,29 @@ void setBannedNodeIds(final List<String> values) {
arity = "1")
private final BytesValue extraData = DEFAULT_EXTRA_DATA;

@Option(
names = {"--pruning-enabled"},
hidden = true,
description =
"Enable pruning of world state of blocks older than the retention period (default: ${DEFAULT-VALUE})")
private final Boolean isPruningEnabled = false;

@Option(
names = {"--pruning-blocks-retained"},
hidden = true,
description =
"Number of recent blocks for which to keep entire world state (default: ${DEFAULT-VALUE})",
arity = "1")
private final Long pruningBlocksRetained = DEFAULT_PRUNING_BLOCKS_RETAINED;

@Option(
names = {"--pruning-block-confirmations"},
hidden = true,
description =
"Number of confirmations on a block before marking begins (default: ${DEFAULT-VALUE})",
arity = "1")
private final Long pruningBlockConfirmations = DEFAULT_PRUNING_BLOCK_CONFIRMATIONS;

@Option(
names = {"--permissions-nodes-config-file-enabled"},
description = "Enable node level permissions (default: ${DEFAULT-VALUE})")
Expand Down Expand Up @@ -829,6 +853,13 @@ private PantheonCommand checkOptions() {
!SyncMode.FAST.equals(syncMode),
singletonList("--fast-sync-min-peers"));

checkOptionDependencies(
logger,
commandLine,
"--pruning-enabled",
!isPruningEnabled,
asList("--pruning-block-confirmations", "--pruning-blocks-retained"));

// noinspection ConstantConditions
if (isMiningEnabled && coinbase == null) {
throw new ParameterException(
Expand Down Expand Up @@ -911,7 +942,9 @@ public PantheonControllerBuilder<?> getControllerBuilder() {
.metricsSystem(metricsSystem.get())
.privacyParameters(privacyParameters())
.clock(Clock.systemUTC())
.isRevertReasonEnabled(isRevertReasonEnabled);
.isRevertReasonEnabled(isRevertReasonEnabled)
.isPruningEnabled(isPruningEnabled)
.pruningConfiguration(buildPruningConfiguration());
} catch (IOException e) {
throw new ExecutionException(this.commandLine, "Invalid path", e);
}
Expand Down Expand Up @@ -1181,6 +1214,10 @@ private TransactionPoolConfiguration buildTransactionPoolConfiguration() {
.build();
}

private PruningConfiguration buildPruningConfiguration() {
return new PruningConfiguration(pruningBlockConfirmations, pruningBlocksRetained);
}

// Blockchain synchronisation from peers.
private void synchronize(
final PantheonController<?> controller,
Expand Down
Loading