Skip to content

Commit

Permalink
opt truncatePending and truncateQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
andyzhang2023 committed Dec 25, 2024
1 parent e6d8251 commit cea0c29
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ type LegacyPool struct {
all *lookup // All transactions to allow lookups
priced *pricedList // All transactions sorted by price

pendingCounter int
queueCounter int

pendingCache *cacheForMiner //pending list cache for miner

reqResetCh chan *txpoolResetRequest
Expand Down Expand Up @@ -1004,6 +1007,7 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local
} else {
// Nothing was replaced, bump the queued counter
queuedGauge.Inc(1)
pool.queueCounter++
}
// If the transaction isn't in lookup set but it's expected to be there,
// show the error log.
Expand Down Expand Up @@ -1062,6 +1066,7 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ
pool.pendingCache.del([]*types.Transaction{old}, pool.signer)
} else {
// Nothing was replaced, bump the pending counter
pool.pendingCounter++
pendingGauge.Inc(1)
}
// Set the potentially new pending nonce and notify any subsystems of the new tx
Expand Down Expand Up @@ -1291,6 +1296,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
// Reduce the pending counter
pool.pendingCache.del(append(invalids, tx), pool.signer)
pendingGauge.Dec(int64(1 + len(invalids)))
pool.pendingCounter -= 1 + len(invalids)
return 1 + len(invalids)
}
}
Expand All @@ -1299,6 +1305,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
if removed, _ := future.Remove(tx); removed {
// Reduce the queued counter
queuedGauge.Dec(1)
pool.queueCounter -= 1
}
if future.Empty() {
delete(pool.queue, addr)
Expand Down Expand Up @@ -1710,6 +1717,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
}
log.Trace("Promoted queued transactions", "count", len(promoted))
queuedGauge.Dec(int64(len(readies)))
pool.queueCounter -= len(readies)

// Drop all transactions over the allowed limit
var caps types.Transactions
Expand All @@ -1725,6 +1733,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
// Mark all the items dropped as removed
pool.priced.Removed(len(forwards) + len(drops) + len(caps))
queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
pool.queueCounter -= len(forwards) + len(drops) + len(caps)
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
}
Expand All @@ -1744,10 +1753,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
// pending limit. The algorithm tries to reduce transaction counts by an approximately
// equal number for all for accounts with many pending transactions.
func (pool *LegacyPool) truncatePending() {
pending := uint64(0)
for _, list := range pool.pending {
pending += uint64(list.Len())
}
pending := uint64(pool.pendingCounter)
if pending <= pool.config.GlobalSlots {
return
}
Expand Down Expand Up @@ -1792,6 +1798,7 @@ func (pool *LegacyPool) truncatePending() {
pool.priced.Removed(len(caps))
dropPendingCache = append(dropPendingCache, caps...)
pendingGauge.Dec(int64(len(caps)))
pool.pendingCounter -= len(caps)
if pool.locals.contains(offenders[i]) {
localGauge.Dec(int64(len(caps)))
}
Expand Down Expand Up @@ -1820,6 +1827,7 @@ func (pool *LegacyPool) truncatePending() {
dropPendingCache = append(dropPendingCache, caps...)
pool.priced.Removed(len(caps))
pendingGauge.Dec(int64(len(caps)))
pool.pendingCounter -= len(caps)
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(caps)))
}
Expand All @@ -1833,10 +1841,7 @@ func (pool *LegacyPool) truncatePending() {

// truncateQueue drops the oldest transactions in the queue if the pool is above the global queue limit.
func (pool *LegacyPool) truncateQueue() {
queued := uint64(0)
for _, list := range pool.queue {
queued += uint64(list.Len())
}
queued := uint64(pool.queueCounter)
if queued <= pool.config.GlobalQueue {
return
}
Expand Down Expand Up @@ -1932,6 +1937,7 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) {
dropPendingCache = append(dropPendingCache, invalids...)
dropPendingCache = append(dropPendingCache, drops...)
pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
pool.pendingCounter -= len(olds) + len(drops) + len(invalids)
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
}
Expand All @@ -1947,6 +1953,7 @@ func (pool *LegacyPool) demoteUnexecutables(demoteAddrs []common.Address) {
}
dropPendingCache = append(dropPendingCache, gapped...)
pendingGauge.Dec(int64(len(gapped)))
pool.pendingCounter -= len(gapped)
}
// Delete the entire pending entry if it became empty.
if list.Empty() {
Expand Down

0 comments on commit cea0c29

Please sign in to comment.