Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance when promoting transaction from next layers #5920

Merged
merged 7 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;

import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
Expand Down Expand Up @@ -104,8 +105,9 @@ protected void internalRemove(
}

@Override
public PendingTransaction promote(final Predicate<PendingTransaction> promotionFilter) {
return null;
public List<PendingTransaction> promote(
final Predicate<PendingTransaction> promotionFilter, final long l, final int freeSlots) {
fab-10 marked this conversation as resolved.
Show resolved Hide resolved
return List.of();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,22 +398,20 @@ public final void blockAdded(
nextLayer.blockAdded(feeMarket, blockHeader, maxConfirmedNonceBySender);
maxConfirmedNonceBySender.forEach(this::confirmed);
internalBlockAdded(blockHeader, feeMarket);
promoteTransactions();
}

protected abstract void internalBlockAdded(
final BlockHeader blockHeader, final FeeMarket feeMarket);

final void promoteTransactions() {
int freeSlots = maxTransactionsNumber() - pendingTransactions.size();
final int freeSlots = maxTransactionsNumber() - pendingTransactions.size();
final long freeSpace = cacheFreeSpace();

while (cacheFreeSpace() > 0 && freeSlots > 0) {
final var promotedTx = nextLayer.promote(this::promotionFilter);
if (promotedTx != null) {
processAdded(promotedTx);
--freeSlots;
} else {
break;
}
if (freeSlots > 0 && freeSpace > 0) {
nextLayer
.promote(this::promotionFilter, cacheFreeSpace(), freeSlots)
.forEach(this::processAdded);
}
}

Expand Down Expand Up @@ -444,8 +442,6 @@ private void confirmed(final Address sender, final long maxConfirmedNonce) {
internalConfirmed(senderTxs, sender, maxConfirmedNonce, highestNonceRemovedTx);
}
}

promoteTransactions();
}

protected abstract void internalConfirmed(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ public OptionalLong getCurrentNonceFor(final Address sender) {
}

@Override
public PendingTransaction promote(final Predicate<PendingTransaction> promotionFilter) {
return null;
public List<PendingTransaction> promote(
final Predicate<PendingTransaction> promotionFilter, final long l, final int freeSlots) {
fab-10 marked this conversation as resolved.
Show resolved Hide resolved
return List.of();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
Expand Down Expand Up @@ -139,30 +141,51 @@ public Stream<PendingTransaction> stream() {
}

@Override
public PendingTransaction promote(final Predicate<PendingTransaction> promotionFilter) {

final var maybePromotedTx =
orderByMaxFee.descendingSet().stream()
.filter(candidateTx -> promotionFilter.test(candidateTx))
.findFirst();

return maybePromotedTx
.map(
promotedTx -> {
final var senderTxs = txsBySender.get(promotedTx.getSender());
// we always promote the first tx of a sender, so remove the first entry
senderTxs.pollFirstEntry();
processRemove(senderTxs, promotedTx.getTransaction(), PROMOTED);

// now that we have space, promote from the next layer
promoteTransactions();

if (senderTxs.isEmpty()) {
txsBySender.remove(promotedTx.getSender());
}
return promotedTx;
})
.orElse(null);
public List<PendingTransaction> promote(
final Predicate<PendingTransaction> promotionFilter,
final long freeSpace,
final int freeSlots) {
long accSpace = 0;
final List<PendingTransaction> promotedTxs = new ArrayList<>();

// first find all txs that can be promoted
search:
for (final var senderFirstTx : orderByMaxFee.descendingSet()) {
final var senderTxs = txsBySender.get(senderFirstTx.getSender());
for (final var candidateTx : senderTxs.values()) {
if (promotionFilter.test(candidateTx)) {
accSpace += candidateTx.memorySize();
if (promotedTxs.size() < freeSlots && accSpace <= freeSpace) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe a comment to explain why < for slots and <= for space

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can I ask what you get from the code about the difference? to understand what needs clarification

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

freeSlots and freeSpace have similar names - how much is free - so why does the if statement check < for one and <= for the other?

why not <= for freeSlots?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slots always increment by 1, while for space it could be any amount, so you can read it like that

if (promotedTxs.size() + 1 <= freeSlots && accumulatedSpace <= freeSpace)

but the +1 is redundant knowing that promotedTxs.size() can only increment by 1 at time, and we can shortcut with <

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right so it's because the accumulatedSpace is already incremented at this point but for the slots it happens after. makes sense. it's ok for me :)

promotedTxs.add(candidateTx);
} else {
// no room for more txs the search is over exit the loops
break search;
}
} else {
// skip remaining txs for this sender to avoid gaps
break;
}
}
}

// then remove promoted txs from this layer
promotedTxs.forEach(
promotedTx -> {
final var sender = promotedTx.getSender();
final var senderTxs = txsBySender.get(sender);
senderTxs.remove(promotedTx.getNonce());
processRemove(senderTxs, promotedTx.getTransaction(), PROMOTED);
if (senderTxs.isEmpty()) {
txsBySender.remove(sender);
}
});

if (!promotedTxs.isEmpty()) {
// since we removed some txs we can try to promote from next layer
promoteTransactions();
}

return promotedTxs;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -126,36 +125,86 @@ protected void internalReplaced(final PendingTransaction replacedTx) {
@Override
protected void internalBlockAdded(final BlockHeader blockHeader, final FeeMarket feeMarket) {}

/**
* We only want to promote transactions that have gap == 0, so there will be no gap in the prev
* layers. A promoted transaction is removed from this layer, and the gap data is updated for its
* sender.
*
* @param promotionFilter the prev layer's promotion filter
* @param freeSpace max amount of memory promoted txs can occupy
* @param freeSlots max number of promoted txs
* @return a list of transactions promoted to the prev layer
*/
@Override
public PendingTransaction promote(final Predicate<PendingTransaction> promotionFilter) {
final PendingTransaction promotedTx =
orderByGap.get(0).stream()
.map(txsBySender::get)
.map(NavigableMap::values)
.flatMap(Collection::stream)
.filter(promotionFilter)
.findFirst()
.orElse(null);

if (promotedTx != null) {
final Address sender = promotedTx.getSender();
final var senderTxs = txsBySender.get(sender);
senderTxs.pollFirstEntry();
processRemove(senderTxs, promotedTx.getTransaction(), PROMOTED);
if (senderTxs.isEmpty()) {
txsBySender.remove(sender);
orderByGap.get(0).remove(sender);
gapBySender.remove(sender);
} else {
final long firstNonce = senderTxs.firstKey();
final int newGap = (int) (firstNonce - (promotedTx.getNonce() + 1));
if (newGap != 0) {
updateGap(sender, 0, newGap);
public List<PendingTransaction> promote(
final Predicate<PendingTransaction> promotionFilter,
final long freeSpace,
final int freeSlots) {
long accSpace = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

usedSpace?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

allocatedSpace?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here acc stays for accumulated space, to keep how much space has been accumulated by the selected txs till now, do you think it is better to rename it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

accumulated also works

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renaming to accumulatedSpace

final List<PendingTransaction> promotedTxs = new ArrayList<>();

final var zeroGapSenders = orderByGap.get(0);

search:
for (final var sender : zeroGapSenders) {
final var senderSeqTxs = getSequentialSubset(txsBySender.get(sender));

for (final var candidateTx : senderSeqTxs.values()) {

if (promotionFilter.test(candidateTx)) {
accSpace += candidateTx.memorySize();
if (promotedTxs.size() < freeSlots && accSpace <= freeSpace) {
promotedTxs.add(candidateTx);
} else {
// no room for more txs the search is over exit the loops
break search;
}
} else {
// skip remaining txs for this sender
break;
}
}
}

return promotedTx;
// remove promoted txs from this layer
promotedTxs.forEach(
promotedTx -> {
final var sender = promotedTx.getSender();
final var senderTxs = txsBySender.get(sender);
senderTxs.remove(promotedTx.getNonce());
processRemove(senderTxs, promotedTx.getTransaction(), PROMOTED);
if (senderTxs.isEmpty()) {
txsBySender.remove(sender);
orderByGap.get(0).remove(sender);
gapBySender.remove(sender);
} else {
final long firstNonce = senderTxs.firstKey();
final int newGap = (int) (firstNonce - (promotedTx.getNonce() + 1));
if (newGap != 0) {
updateGap(sender, 0, newGap);
}
}
});

if (!promotedTxs.isEmpty()) {
// since we removed some txs we can try to promote from next layer
promoteTransactions();
}

return promotedTxs;
}

private NavigableMap<Long, PendingTransaction> getSequentialSubset(
final NavigableMap<Long, PendingTransaction> senderTxs) {
long lastSequentialNonce = senderTxs.firstKey();
for (final long nonce : senderTxs.tailMap(lastSequentialNonce, false).keySet()) {
if (nonce == lastSequentialNonce + 1) {
++lastSequentialNonce;
} else {
break;
}
}
return senderTxs.headMap(lastSequentialNonce, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ void blockAdded(
*/
OptionalLong getCurrentNonceFor(Address sender);

PendingTransaction promote(Predicate<PendingTransaction> promotionFilter);
List<PendingTransaction> promote(
Predicate<PendingTransaction> promotionFilter, final long freeSpace, final int freeSlots);

long subscribeToAdded(PendingTransactionAddedListener listener);

Expand Down