Skip to content

Commit

Permalink
Always enforce promotion filter for transactions in the prioritized l…
Browse files Browse the repository at this point in the history
…ayer (hyperledger#5921)

Signed-off-by: Fabio Di Fabio <[email protected]>
Co-authored-by: Sally MacFarlane <[email protected]>
Signed-off-by: Justin Florentine <[email protected]>
  • Loading branch information
2 people authored and jflo committed Nov 10, 2023
1 parent 5c533b7 commit d1e29a1
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
import java.util.function.Predicate;
import java.util.stream.Stream;

/**
* Holds the current set of executable pending transactions, that are candidate for inclusion on
* next block. The pending transactions are kept sorted by paid fee descending.
*/
public abstract class AbstractPrioritizedTransactions extends AbstractSequentialTransactionsLayer {
protected final TreeSet<PendingTransaction> orderByFee;

Expand Down Expand Up @@ -78,6 +82,12 @@ protected void internalReplaced(final PendingTransaction replacedTx) {
}

private boolean hasPriority(final PendingTransaction pendingTransaction) {
// if it does not pass the promotion filter, then has not priority
if (!promotionFilter(pendingTransaction)) {
return false;
}

// if there is space add it, otherwise check if it has more value than the last one
if (orderByFee.size() < poolConfig.getMaxPrioritizedTransactions()) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ private TransactionAddedResult addToNextLayer(
distance);
}

private TransactionAddedResult addToNextLayer(
protected TransactionAddedResult addToNextLayer(
final NavigableMap<Long, PendingTransaction> senderTxs,
final PendingTransaction pendingTransaction,
final int distance) {
Expand Down Expand Up @@ -304,7 +304,7 @@ private void evict(final long spaceToFree, final int txsToEvict) {
while ((evictedSize < spaceToFree || txsToEvict > evictedCount)
&& !lessReadySenderTxs.isEmpty()) {
lastTx = lessReadySenderTxs.pollLastEntry().getValue();
processEvict(lessReadySenderTxs, lastTx);
processEvict(lessReadySenderTxs, lastTx, EVICTED);
++evictedCount;
evictedSize += lastTx.memorySize();
// evicted can always be added to the next layer
Expand Down Expand Up @@ -371,11 +371,13 @@ protected PendingTransaction processRemove(
}

protected PendingTransaction processEvict(
final NavigableMap<Long, PendingTransaction> senderTxs, final PendingTransaction evictedTx) {
final NavigableMap<Long, PendingTransaction> senderTxs,
final PendingTransaction evictedTx,
final RemovalReason reason) {
final PendingTransaction removedTx = pendingTransactions.remove(evictedTx.getHash());
if (removedTx != null) {
decreaseSpaceUsed(evictedTx);
metrics.incrementRemoved(evictedTx.isReceivedFromLocalSource(), EVICTED.label(), name());
metrics.incrementRemoved(evictedTx.isReceivedFromLocalSource(), reason.label(), name());
internalEvict(senderTxs, removedTx);
}
return removedTx;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.BELOW_BASE_FEE;

import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Transaction;
Expand All @@ -27,17 +29,10 @@
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Holds the current set of pending transactions with the ability to iterate them based on priority
* for mining or look-up by hash.
*
* <p>This class is safe for use across multiple threads.
*/
public class BaseFeePrioritizedTransactions extends AbstractPrioritizedTransactions {

private static final Logger LOG = LoggerFactory.getLogger(BaseFeePrioritizedTransactions.class);
Expand Down Expand Up @@ -69,6 +64,15 @@ protected int compareByFee(final PendingTransaction pt1, final PendingTransactio
.compare(pt1, pt2);
}

/**
* On base fee markets when a new block is added we can calculate the base fee for the next block
* and use it to keep only pending transactions willing to pay at least that fee in the
* prioritized layer, since only these transactions are executable, while all the other can be
* demoted to the next layer.
*
* @param blockHeader the header of the added block
* @param feeMarket the fee market
*/
@Override
protected void internalBlockAdded(final BlockHeader blockHeader, final FeeMarket feeMarket) {
final Wei newNextBlockBaseFee = calculateNextBlockBaseFee(feeMarket, blockHeader);
Expand All @@ -81,7 +85,48 @@ protected void internalBlockAdded(final BlockHeader blockHeader, final FeeMarket

nextBlockBaseFee = Optional.of(newNextBlockBaseFee);
orderByFee.clear();
orderByFee.addAll(pendingTransactions.values());

final var itTxsBySender = txsBySender.entrySet().iterator();
while (itTxsBySender.hasNext()) {
final var senderTxs = itTxsBySender.next().getValue();

Optional<Long> maybeFirstUnderpricedNonce = Optional.empty();

for (final var e : senderTxs.entrySet()) {
final PendingTransaction tx = e.getValue();
// it must pass the promotion filter to be prioritized
if (promotionFilter(tx)) {
orderByFee.add(tx);
} else {
// otherwise sender txs starting from this nonce need to be demoted to next layer,
// and we can go to next sender
maybeFirstUnderpricedNonce = Optional.of(e.getKey());
break;
}
}

maybeFirstUnderpricedNonce.ifPresent(
nonce -> {
// demote all txs after the first underpriced to the next layer, because none of them is
// executable now, and we can avoid sorting them until they are candidate for execution
// again
final var demoteTxs = senderTxs.tailMap(nonce, true);
while (!demoteTxs.isEmpty()) {
final PendingTransaction demoteTx = demoteTxs.pollLastEntry().getValue();
LOG.atTrace()
.setMessage("Demoting tx {} with max gas price below next block base fee {}")
.addArgument(demoteTx::toTraceLog)
.addArgument(newNextBlockBaseFee::toHumanReadableString)
.log();
processEvict(senderTxs, demoteTx, BELOW_BASE_FEE);
addToNextLayer(senderTxs, demoteTx, 0);
}
});

if (senderTxs.isEmpty()) {
itTxsBySender.remove();
}
}
}

private Wei calculateNextBlockBaseFee(final FeeMarket feeMarket, final BlockHeader blockHeader) {
Expand All @@ -101,10 +146,7 @@ protected boolean promotionFilter(final PendingTransaction pendingTransaction) {
return nextBlockBaseFee
.map(
baseFee ->
pendingTransaction
.getTransaction()
.getEffectiveGasPrice(nextBlockBaseFee)
.greaterOrEqualThan(baseFee))
pendingTransaction.getTransaction().getMaxGasPrice().greaterOrEqualThan(baseFee))
.orElse(false);
}

Expand All @@ -115,13 +157,6 @@ protected String internalLogStats() {
return "Basefee Prioritized: Empty";
}

final var baseFeePartition =
stream()
.map(PendingTransaction::getTransaction)
.collect(
Collectors.partitioningBy(
tx -> tx.getMaxGasPrice().greaterOrEqualThan(nextBlockBaseFee.get()),
Collectors.counting()));
final Transaction highest = orderByFee.last().getTransaction();
final Transaction lowest = orderByFee.first().getTransaction();

Expand All @@ -145,10 +180,6 @@ protected String internalLogStats() {
+ ", hash: "
+ lowest.getHash()
+ "], next block base fee: "
+ nextBlockBaseFee.get().toHumanReadableString()
+ ", above next base fee: "
+ baseFeePartition.get(true)
+ ", below next base fee: "
+ baseFeePartition.get(false);
+ nextBlockBaseFee.get().toHumanReadableString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ enum RemovalReason {
INVALIDATED,
PROMOTED,
REPLACED,
RECONCILED;
RECONCILED,
BELOW_BASE_FEE;

private final String label;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class LayeredPendingTransactionsTest extends BaseTransactionPoolTest {

protected static final int MAX_TRANSACTIONS = 5;
protected static final int MAX_CAPACITY_BYTES = 10_000;
protected static final Wei DEFAULT_BASE_FEE = Wei.of(100);
protected static final int LIMITED_TRANSACTIONS_BY_SENDER = 4;
protected static final String REMOTE = "remote";
protected static final String LOCAL = "local";
Expand Down Expand Up @@ -96,7 +97,7 @@ public class LayeredPendingTransactionsTest extends BaseTransactionPoolTest {

private static BlockHeader mockBlockHeader() {
final BlockHeader blockHeader = mock(BlockHeader.class);
when(blockHeader.getBaseFee()).thenReturn(Optional.of(Wei.of(100)));
when(blockHeader.getBaseFee()).thenReturn(Optional.of(DEFAULT_BASE_FEE));
return blockHeader;
}

Expand Down Expand Up @@ -374,10 +375,10 @@ public void selectTransactionsFromSameSenderInNonceOrder() {
@MethodSource
public void ignoreSenderTransactionsAfterASkippedOne(
final TransactionSelectionResult skipSelectionResult) {
final Transaction transaction0a = createTransaction(0, Wei.of(20), KEYS1);
final Transaction transaction1a = createTransaction(1, Wei.of(20), KEYS1);
final Transaction transaction2a = createTransaction(2, Wei.of(20), KEYS1);
final Transaction transaction0b = createTransaction(0, Wei.of(10), KEYS2);
final Transaction transaction0a = createTransaction(0, DEFAULT_BASE_FEE.add(Wei.of(20)), KEYS1);
final Transaction transaction1a = createTransaction(1, DEFAULT_BASE_FEE.add(Wei.of(20)), KEYS1);
final Transaction transaction2a = createTransaction(2, DEFAULT_BASE_FEE.add(Wei.of(20)), KEYS1);
final Transaction transaction0b = createTransaction(0, DEFAULT_BASE_FEE.add(Wei.of(10)), KEYS2);

pendingTransactions.addLocalTransaction(transaction0a, Optional.empty());
pendingTransactions.addLocalTransaction(transaction1a, Optional.empty());
Expand Down Expand Up @@ -411,7 +412,7 @@ public void notForceNonceOrderWhenSendersDiffer() {
final Account sender2 = mock(Account.class);
when(sender2.getNonce()).thenReturn(1L);

final Transaction transactionSender1 = createTransaction(0, Wei.of(10), KEYS1);
final Transaction transactionSender1 = createTransaction(0, Wei.of(100), KEYS1);
final Transaction transactionSender2 = createTransaction(1, Wei.of(200), KEYS2);

pendingTransactions.addLocalTransaction(transactionSender1, Optional.empty());
Expand Down Expand Up @@ -473,9 +474,9 @@ public void returnEmptyOptionalAsMaximumNonceWhenNoTransactionsPresent() {

@Test
public void replaceTransactionWithSameSenderAndNonce() {
final Transaction transaction1 = createTransaction(0, Wei.of(20), KEYS1);
final Transaction transaction1 = createTransaction(0, Wei.of(200), KEYS1);
final Transaction transaction1b = createTransactionReplacement(transaction1, KEYS1);
final Transaction transaction2 = createTransaction(1, Wei.of(10), KEYS1);
final Transaction transaction2 = createTransaction(1, Wei.of(100), KEYS1);
assertThat(pendingTransactions.addRemoteTransaction(transaction1, Optional.empty()))
.isEqualTo(ADDED);
assertThat(pendingTransactions.addRemoteTransaction(transaction2, Optional.empty()))
Expand All @@ -499,14 +500,14 @@ public void replaceTransactionWithSameSenderAndNonce() {
public void replaceTransactionWithSameSenderAndNonce_multipleReplacements() {
final int replacedTxCount = 5;
final List<Transaction> replacedTransactions = new ArrayList<>(replacedTxCount);
Transaction duplicateTx = createTransaction(0, Wei.of(50), KEYS1);
Transaction duplicateTx = createTransaction(0, DEFAULT_BASE_FEE.add(Wei.of(50)), KEYS1);
for (int i = 0; i < replacedTxCount; i++) {
replacedTransactions.add(duplicateTx);
pendingTransactions.addRemoteTransaction(duplicateTx, Optional.empty());
duplicateTx = createTransactionReplacement(duplicateTx, KEYS1);
}

final Transaction independentTx = createTransaction(1, Wei.ONE, KEYS1);
final Transaction independentTx = createTransaction(1, DEFAULT_BASE_FEE.add(Wei.ONE), KEYS1);
assertThat(pendingTransactions.addRemoteTransaction(independentTx, Optional.empty()))
.isEqualTo(ADDED);
assertThat(
Expand Down

0 comments on commit d1e29a1

Please sign in to comment.