Skip to content

Commit

Permalink
add fallback for parallelization (#8084)
Browse files Browse the repository at this point in the history
Signed-off-by: Karim Taam <[email protected]>
  • Loading branch information
matkt authored Jan 10, 2025
1 parent 8cddcfd commit 85f85da
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 58 deletions.
6 changes: 2 additions & 4 deletions besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -1816,10 +1816,8 @@ public BesuControllerBuilder setupControllerBuilder() {
if (DataStorageFormat.BONSAI.equals(getDataStorageConfiguration().getDataStorageFormat())) {
final DiffBasedSubStorageConfiguration subStorageConfiguration =
getDataStorageConfiguration().getDiffBasedSubStorageConfiguration();
if (subStorageConfiguration.getLimitTrieLogsEnabled()) {
besuControllerBuilder.isParallelTxProcessingEnabled(
subStorageConfiguration.getUnstable().isParallelTxProcessingEnabled());
}
besuControllerBuilder.isParallelTxProcessingEnabled(
subStorageConfiguration.getUnstable().isParallelTxProcessingEnabled());
}
return besuControllerBuilder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.core.Withdrawal;
import org.hyperledger.besu.ethereum.mainnet.AbstractBlockProcessor.PreprocessingFunction.NoPreprocessing;
import org.hyperledger.besu.ethereum.mainnet.requests.ProcessRequestContext;
import org.hyperledger.besu.ethereum.mainnet.requests.RequestProcessorCoordinator;
import org.hyperledger.besu.ethereum.privacy.storage.PrivateMetadataUpdater;
Expand Down Expand Up @@ -99,6 +100,26 @@ public BlockProcessingResult processBlock(
final List<BlockHeader> ommers,
final Optional<List<Withdrawal>> maybeWithdrawals,
final PrivateMetadataUpdater privateMetadataUpdater) {
return processBlock(
blockchain,
worldState,
blockHeader,
transactions,
ommers,
maybeWithdrawals,
privateMetadataUpdater,
new NoPreprocessing());
}

protected BlockProcessingResult processBlock(
final Blockchain blockchain,
final MutableWorldState worldState,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final List<BlockHeader> ommers,
final Optional<List<Withdrawal>> maybeWithdrawals,
final PrivateMetadataUpdater privateMetadataUpdater,
final PreprocessingFunction preprocessingBlockFunction) {
final List<TransactionReceipt> receipts = new ArrayList<>();
long currentGasUsed = 0;
long currentBlobGasUsed = 0;
Expand All @@ -125,7 +146,7 @@ public BlockProcessingResult processBlock(
.orElse(Wei.ZERO);

final Optional<PreprocessingContext> preProcessingContext =
runBlockPreProcessing(
preprocessingBlockFunction.run(
worldState,
privateMetadataUpdater,
blockHeader,
Expand Down Expand Up @@ -255,17 +276,6 @@ public BlockProcessingResult processBlock(
parallelizedTxFound ? Optional.of(nbParallelTx) : Optional.empty());
}

protected Optional<PreprocessingContext> runBlockPreProcessing(
final MutableWorldState worldState,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final Address miningBeneficiary,
final BlockHashLookup blockHashLookup,
final Wei blobGasPrice) {
return Optional.empty();
}

protected TransactionProcessingResult getTransactionProcessingResult(
final Optional<PreprocessingContext> preProcessingContext,
final MutableWorldState worldState,
Expand Down Expand Up @@ -318,5 +328,30 @@ abstract boolean rewardCoinbase(
final boolean skipZeroBlockRewards);

public interface PreprocessingContext {}
;

public interface PreprocessingFunction {
Optional<PreprocessingContext> run(
final MutableWorldState worldState,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final Address miningBeneficiary,
final BlockHashLookup blockHashLookup,
final Wei blobGasPrice);

class NoPreprocessing implements PreprocessingFunction {

@Override
public Optional<PreprocessingContext> run(
final MutableWorldState worldState,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final Address miningBeneficiary,
final BlockHashLookup blockHashLookup,
final Wei blobGasPrice) {
return Optional.empty();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.BlockProcessingResult;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MutableWorldState;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.Withdrawal;
import org.hyperledger.besu.ethereum.mainnet.AbstractBlockProcessor.PreprocessingFunction.NoPreprocessing;
import org.hyperledger.besu.ethereum.mainnet.BlockProcessor;
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockProcessor;
import org.hyperledger.besu.ethereum.mainnet.MainnetTransactionProcessor;
Expand All @@ -37,8 +41,13 @@
import java.util.List;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MainnetParallelBlockProcessor extends MainnetBlockProcessor {

private static final Logger LOG = LoggerFactory.getLogger(MainnetParallelBlockProcessor.class);

private final Optional<MetricsSystem> metricsSystem;
private final Optional<Counter> confirmedParallelizedTransactionCounter;
private final Optional<Counter> conflictingButCachedTransactionCounter;
Expand Down Expand Up @@ -78,34 +87,6 @@ public MainnetParallelBlockProcessor(
"Counter for the number of conflicted transactions during block processing"));
}

@Override
protected Optional<PreprocessingContext> runBlockPreProcessing(
final MutableWorldState worldState,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final Address miningBeneficiary,
final BlockHashLookup blockHashLookup,
final Wei blobGasPrice) {
if ((worldState instanceof DiffBasedWorldState)) {
ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor =
new ParallelizedConcurrentTransactionProcessor(transactionProcessor);
// runAsyncBlock, if activated, facilitates the non-blocking parallel execution of
// transactions in the background through an optimistic strategy.
parallelizedConcurrentTransactionProcessor.runAsyncBlock(
worldState,
blockHeader,
transactions,
miningBeneficiary,
blockHashLookup,
blobGasPrice,
privateMetadataUpdater);
return Optional.of(
new ParallelizedPreProcessingContext(parallelizedConcurrentTransactionProcessor));
}
return Optional.empty();
}

@Override
protected TransactionProcessingResult getTransactionProcessingResult(
final Optional<PreprocessingContext> preProcessingContext,
Expand All @@ -126,7 +107,7 @@ protected TransactionProcessingResult getTransactionProcessingResult(
(ParallelizedPreProcessingContext) preProcessingContext.get();
transactionProcessingResult =
parallelizedPreProcessingContext
.getParallelizedConcurrentTransactionProcessor()
.parallelizedConcurrentTransactionProcessor()
.applyParallelizedTransactionResult(
worldState,
miningBeneficiary,
Expand Down Expand Up @@ -154,21 +135,48 @@ protected TransactionProcessingResult getTransactionProcessingResult(
}
}

static class ParallelizedPreProcessingContext implements PreprocessingContext {
final ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor;

public ParallelizedPreProcessingContext(
final ParallelizedConcurrentTransactionProcessor
parallelizedConcurrentTransactionProcessor) {
this.parallelizedConcurrentTransactionProcessor = parallelizedConcurrentTransactionProcessor;
}

public ParallelizedConcurrentTransactionProcessor
getParallelizedConcurrentTransactionProcessor() {
return parallelizedConcurrentTransactionProcessor;
@Override
public BlockProcessingResult processBlock(
final Blockchain blockchain,
final MutableWorldState worldState,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final List<BlockHeader> ommers,
final Optional<List<Withdrawal>> maybeWithdrawals,
final PrivateMetadataUpdater privateMetadataUpdater) {
final BlockProcessingResult blockProcessingResult =
super.processBlock(
blockchain,
worldState,
blockHeader,
transactions,
ommers,
maybeWithdrawals,
privateMetadataUpdater,
new ParallelTransactionPreprocessing());
if (blockProcessingResult.isFailed()) {
// Fallback to non-parallel processing if there is a block processing exception .
LOG.info(
"Parallel transaction processing failure. Falling back to non-parallel processing for block #{} ({})",
blockHeader.getNumber(),
blockHeader.getBlockHash());
return super.processBlock(
blockchain,
worldState,
blockHeader,
transactions,
ommers,
maybeWithdrawals,
privateMetadataUpdater,
new NoPreprocessing());
}
return blockProcessingResult;
}

record ParallelizedPreProcessingContext(
ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor)
implements PreprocessingContext {}

public static class ParallelBlockProcessorBuilder
implements ProtocolSpecBuilder.BlockProcessorBuilder {

Expand Down Expand Up @@ -196,4 +204,35 @@ public BlockProcessor apply(
metricsSystem);
}
}

class ParallelTransactionPreprocessing implements PreprocessingFunction {

@Override
public Optional<PreprocessingContext> run(
final MutableWorldState worldState,
final PrivateMetadataUpdater privateMetadataUpdater,
final BlockHeader blockHeader,
final List<Transaction> transactions,
final Address miningBeneficiary,
final BlockHashLookup blockHashLookup,
final Wei blobGasPrice) {
if ((worldState instanceof DiffBasedWorldState)) {
ParallelizedConcurrentTransactionProcessor parallelizedConcurrentTransactionProcessor =
new ParallelizedConcurrentTransactionProcessor(transactionProcessor);
// When enabled, runAsyncBlock performs non-conflicting parallel execution of transactions
// in the background using an optimistic approach.
parallelizedConcurrentTransactionProcessor.runAsyncBlock(
worldState,
blockHeader,
transactions,
miningBeneficiary,
blockHashLookup,
blobGasPrice,
privateMetadataUpdater);
return Optional.of(
new ParallelizedPreProcessingContext(parallelizedConcurrentTransactionProcessor));
}
return Optional.empty();
}
}
}

0 comments on commit 85f85da

Please sign in to comment.