Skip to content

Commit

Permalink
Improve performance when promoting transaction from next layers (#5920)
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio Di Fabio <[email protected]>
Co-authored-by: Sally MacFarlane <[email protected]>
  • Loading branch information
fab-10 and macfarla authored Sep 27, 2023
1 parent b9f005e commit 5d344ad
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;

import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
Expand Down Expand Up @@ -104,8 +105,11 @@ protected void internalRemove(
}

@Override
public PendingTransaction promote(final Predicate<PendingTransaction> promotionFilter) {
return null;
public List<PendingTransaction> promote(
final Predicate<PendingTransaction> promotionFilter,
final long freeSpace,
final int freeSlots) {
return List.of();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,22 +398,20 @@ public final void blockAdded(
nextLayer.blockAdded(feeMarket, blockHeader, maxConfirmedNonceBySender);
maxConfirmedNonceBySender.forEach(this::confirmed);
internalBlockAdded(blockHeader, feeMarket);
promoteTransactions();
}

protected abstract void internalBlockAdded(
final BlockHeader blockHeader, final FeeMarket feeMarket);

final void promoteTransactions() {
int freeSlots = maxTransactionsNumber() - pendingTransactions.size();
final int freeSlots = maxTransactionsNumber() - pendingTransactions.size();
final long freeSpace = cacheFreeSpace();

while (cacheFreeSpace() > 0 && freeSlots > 0) {
final var promotedTx = nextLayer.promote(this::promotionFilter);
if (promotedTx != null) {
processAdded(promotedTx);
--freeSlots;
} else {
break;
}
if (freeSlots > 0 && freeSpace > 0) {
nextLayer
.promote(this::promotionFilter, cacheFreeSpace(), freeSlots)
.forEach(this::processAdded);
}
}

Expand Down Expand Up @@ -444,8 +442,6 @@ private void confirmed(final Address sender, final long maxConfirmedNonce) {
internalConfirmed(senderTxs, sender, maxConfirmedNonce, highestNonceRemovedTx);
}
}

promoteTransactions();
}

protected abstract void internalConfirmed(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,11 @@ public OptionalLong getCurrentNonceFor(final Address sender) {
}

@Override
public PendingTransaction promote(final Predicate<PendingTransaction> promotionFilter) {
return null;
public List<PendingTransaction> promote(
final Predicate<PendingTransaction> promotionFilter,
final long freeSpace,
final int freeSlots) {
return List.of();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
Expand Down Expand Up @@ -139,30 +141,51 @@ public Stream<PendingTransaction> stream() {
}

@Override
public PendingTransaction promote(final Predicate<PendingTransaction> promotionFilter) {

final var maybePromotedTx =
orderByMaxFee.descendingSet().stream()
.filter(candidateTx -> promotionFilter.test(candidateTx))
.findFirst();

return maybePromotedTx
.map(
promotedTx -> {
final var senderTxs = txsBySender.get(promotedTx.getSender());
// we always promote the first tx of a sender, so remove the first entry
senderTxs.pollFirstEntry();
processRemove(senderTxs, promotedTx.getTransaction(), PROMOTED);

// now that we have space, promote from the next layer
promoteTransactions();

if (senderTxs.isEmpty()) {
txsBySender.remove(promotedTx.getSender());
}
return promotedTx;
})
.orElse(null);
public List<PendingTransaction> promote(
final Predicate<PendingTransaction> promotionFilter,
final long freeSpace,
final int freeSlots) {
long accumulatedSpace = 0;
final List<PendingTransaction> promotedTxs = new ArrayList<>();

// first find all txs that can be promoted
search:
for (final var senderFirstTx : orderByMaxFee.descendingSet()) {
final var senderTxs = txsBySender.get(senderFirstTx.getSender());
for (final var candidateTx : senderTxs.values()) {
if (promotionFilter.test(candidateTx)) {
accumulatedSpace += candidateTx.memorySize();
if (promotedTxs.size() < freeSlots && accumulatedSpace <= freeSpace) {
promotedTxs.add(candidateTx);
} else {
// no room for more txs the search is over exit the loops
break search;
}
} else {
// skip remaining txs for this sender to avoid gaps
break;
}
}
}

// then remove promoted txs from this layer
promotedTxs.forEach(
promotedTx -> {
final var sender = promotedTx.getSender();
final var senderTxs = txsBySender.get(sender);
senderTxs.remove(promotedTx.getNonce());
processRemove(senderTxs, promotedTx.getTransaction(), PROMOTED);
if (senderTxs.isEmpty()) {
txsBySender.remove(sender);
}
});

if (!promotedTxs.isEmpty()) {
// since we removed some txs we can try to promote from next layer
promoteTransactions();
}

return promotedTxs;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -126,36 +125,86 @@ protected void internalReplaced(final PendingTransaction replacedTx) {
@Override
protected void internalBlockAdded(final BlockHeader blockHeader, final FeeMarket feeMarket) {}

/**
* We only want to promote transactions that have gap == 0, so there will be no gap in the prev
* layers. A promoted transaction is removed from this layer, and the gap data is updated for its
* sender.
*
* @param promotionFilter the prev layer's promotion filter
* @param freeSpace max amount of memory promoted txs can occupy
* @param freeSlots max number of promoted txs
* @return a list of transactions promoted to the prev layer
*/
@Override
public PendingTransaction promote(final Predicate<PendingTransaction> promotionFilter) {
final PendingTransaction promotedTx =
orderByGap.get(0).stream()
.map(txsBySender::get)
.map(NavigableMap::values)
.flatMap(Collection::stream)
.filter(promotionFilter)
.findFirst()
.orElse(null);

if (promotedTx != null) {
final Address sender = promotedTx.getSender();
final var senderTxs = txsBySender.get(sender);
senderTxs.pollFirstEntry();
processRemove(senderTxs, promotedTx.getTransaction(), PROMOTED);
if (senderTxs.isEmpty()) {
txsBySender.remove(sender);
orderByGap.get(0).remove(sender);
gapBySender.remove(sender);
} else {
final long firstNonce = senderTxs.firstKey();
final int newGap = (int) (firstNonce - (promotedTx.getNonce() + 1));
if (newGap != 0) {
updateGap(sender, 0, newGap);
public List<PendingTransaction> promote(
final Predicate<PendingTransaction> promotionFilter,
final long freeSpace,
final int freeSlots) {
long accumulatedSpace = 0;
final List<PendingTransaction> promotedTxs = new ArrayList<>();

final var zeroGapSenders = orderByGap.get(0);

search:
for (final var sender : zeroGapSenders) {
final var senderSeqTxs = getSequentialSubset(txsBySender.get(sender));

for (final var candidateTx : senderSeqTxs.values()) {

if (promotionFilter.test(candidateTx)) {
accumulatedSpace += candidateTx.memorySize();
if (promotedTxs.size() < freeSlots && accumulatedSpace <= freeSpace) {
promotedTxs.add(candidateTx);
} else {
// no room for more txs the search is over exit the loops
break search;
}
} else {
// skip remaining txs for this sender
break;
}
}
}

return promotedTx;
// remove promoted txs from this layer
promotedTxs.forEach(
promotedTx -> {
final var sender = promotedTx.getSender();
final var senderTxs = txsBySender.get(sender);
senderTxs.remove(promotedTx.getNonce());
processRemove(senderTxs, promotedTx.getTransaction(), PROMOTED);
if (senderTxs.isEmpty()) {
txsBySender.remove(sender);
orderByGap.get(0).remove(sender);
gapBySender.remove(sender);
} else {
final long firstNonce = senderTxs.firstKey();
final int newGap = (int) (firstNonce - (promotedTx.getNonce() + 1));
if (newGap != 0) {
updateGap(sender, 0, newGap);
}
}
});

if (!promotedTxs.isEmpty()) {
// since we removed some txs we can try to promote from next layer
promoteTransactions();
}

return promotedTxs;
}

private NavigableMap<Long, PendingTransaction> getSequentialSubset(
final NavigableMap<Long, PendingTransaction> senderTxs) {
long lastSequentialNonce = senderTxs.firstKey();
for (final long nonce : senderTxs.tailMap(lastSequentialNonce, false).keySet()) {
if (nonce == lastSequentialNonce + 1) {
++lastSequentialNonce;
} else {
break;
}
}
return senderTxs.headMap(lastSequentialNonce, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ void blockAdded(
*/
OptionalLong getCurrentNonceFor(Address sender);

PendingTransaction promote(Predicate<PendingTransaction> promotionFilter);
List<PendingTransaction> promote(
Predicate<PendingTransaction> promotionFilter, final long freeSpace, final int freeSlots);

long subscribeToAdded(PendingTransactionAddedListener listener);

Expand Down

0 comments on commit 5d344ad

Please sign in to comment.