Skip to content

Commit

Permalink
*: SubscribeNewQueuedTxsEvent api
Browse files Browse the repository at this point in the history
  • Loading branch information
markya0616 authored and CaraWang committed Sep 19, 2024
1 parent 06c5646 commit 9c51d27
Show file tree
Hide file tree
Showing 16 changed files with 434 additions and 4 deletions.
3 changes: 3 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type NewTxsEvent struct{ Txs []*types.Transaction }
// ReannoTxsEvent is posted when a batch of local pending transactions exceed a specified duration.
type ReannoTxsEvent struct{ Txs []*types.Transaction }

// NewQueuedTxsEvent is posted when a batch of transactions enter the transaction pool.
type NewQueuedTxsEvent struct{ Txs []*types.Transaction }

// NewMinedBlockEvent is posted when a block has been imported.
type NewMinedBlockEvent struct{ Block *types.Block }

Expand Down
17 changes: 15 additions & 2 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (
"sync/atomic"
"time"

"github.com/holiman/billy"
"github.com/holiman/uint256"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/misc/eip1559"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
Expand All @@ -42,8 +45,6 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/holiman/billy"
"github.com/holiman/uint256"
)

const (
Expand Down Expand Up @@ -316,6 +317,7 @@ type BlobPool struct {
discoverFeed event.Feed // Event feed to send out new tx events on pool discovery (reorg excluded)
insertFeed event.Feed // Event feed to send out new tx events on pool inclusion (reorg included)
reannoTxFeed event.Feed // Event feed for announcing transactions again
queuedTxFeed event.Feed
scope event.SubscriptionScope

lock sync.RWMutex // Mutex protecting the pool during reorg handling
Expand Down Expand Up @@ -1277,6 +1279,11 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
}
return err
}

// already validated above
// Broadcast a new tx anyway if it's valid
go p.queuedTxFeed.Send(core.NewQueuedTxsEvent{Txs: types.Transactions{tx}})

// If the address is not yet known, request exclusivity to track the account
// only by this subpool until all transactions are evicted
from, _ := types.Sender(p.signer, tx) // already validated above
Expand Down Expand Up @@ -1616,6 +1623,12 @@ func (pool *BlobPool) SubscribeReannoTxsEvent(ch chan<- core.ReannoTxsEvent) eve
return pool.scope.Track(pool.reannoTxFeed.Subscribe(ch))
}

// SubscribeNewQueuedTxsEvent registers a subscription of NewQueuedTxsEvent and
// starts sending event to the given channel.
func (pool *BlobPool) SubscribeNewQueuedTxsEvent(ch chan<- core.NewQueuedTxsEvent) event.Subscription {
return pool.scope.Track(pool.queuedTxFeed.Subscribe(ch))
}

// Nonce returns the next nonce of an account, with all transactions executable
// by the pool already applied on top.
func (p *BlobPool) Nonce(addr common.Address) uint64 {
Expand Down
9 changes: 9 additions & 0 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ type LegacyPool struct {
gasTip atomic.Pointer[uint256.Int]
txFeed event.Feed
reannoTxFeed event.Feed // Event feed for announcing transactions again
queuedTxFeed event.Feed
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex
Expand Down Expand Up @@ -476,6 +477,12 @@ func (pool *LegacyPool) SubscribeReannoTxsEvent(ch chan<- core.ReannoTxsEvent) e
return pool.scope.Track(pool.reannoTxFeed.Subscribe(ch))
}

// SubscribeNewQueuedTxsEvent registers a subscription of NewQueuedTxsEvent and
// starts sending event to the given channel.
func (pool *LegacyPool) SubscribeNewQueuedTxsEvent(ch chan<- core.NewQueuedTxsEvent) event.Subscription {
return pool.scope.Track(pool.queuedTxFeed.Subscribe(ch))
}

// SetGasTip updates the minimum gas tip required by the transaction pool for a
// new transaction, and drops all transactions below this threshold.
func (pool *LegacyPool) SetGasTip(tip *big.Int) {
Expand Down Expand Up @@ -758,6 +765,8 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
}
// already validated by this point
from, _ := types.Sender(pool.signer, tx)
// Broadcast a new tx anyway if it's valid
go pool.queuedTxFeed.Send(core.NewQueuedTxsEvent{Txs: types.Transactions{tx}})

// If the address is not yet known, request exclusivity to track the account
// only by this subpool until all transactions are evicted
Expand Down
171 changes: 170 additions & 1 deletion core/txpool/legacypool/legacypool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"testing"
"time"

"github.com/holiman/uint256"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
Expand All @@ -39,7 +41,6 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
"github.com/holiman/uint256"
)

var (
Expand Down Expand Up @@ -231,6 +232,34 @@ func validateEvents(events chan core.NewTxsEvent, count int) error {
return nil
}

// validateQueuedEvents checks that the correct number of transaction addition events
// were fired on the pool's event feed.
func validateQueuedEvents(events chan core.NewQueuedTxsEvent, count int) error {
var received []*types.Transaction

for len(received) < count {
select {
case ev := <-events:
received = append(received, ev.Txs...)
case <-time.After(time.Second):
return fmt.Errorf("event #%d not fired", len(received))
}
}
if len(received) > count {
return fmt.Errorf("more than %d events fired: %v", count, received[count:])
}
select {
case ev := <-events:
return fmt.Errorf("more than %d events fired: %v", count, ev.Txs)

case <-time.After(50 * time.Millisecond):
// This branch should be "default", but it's a data race between goroutines,
// reading the event channel and pushing into it, so better wait a bit ensuring
// really nothing gets injected.
}
return nil
}

func deriveSender(tx *types.Transaction) (common.Address, error) {
return types.Sender(types.HomesteadSigner{}, tx)
}
Expand Down Expand Up @@ -355,6 +384,146 @@ func TestInvalidTransactions(t *testing.T) {
}
}

func TestSubscribeNewQueuedTxsEvent(t *testing.T) {
t.Parallel()

// Create the pool to test the pricing enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))

// pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
pool := New(testTxPoolConfig, blockchain)
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()

// Keep listening for authenticated transactions.
events := make(chan core.NewQueuedTxsEvent, 32)
sub := pool.SubscribeNewQueuedTxsEvent(events)
defer sub.Unsubscribe()

createAccounts := func(numbers int, txpool *LegacyPool) []*ecdsa.PrivateKey {
keys := []*ecdsa.PrivateKey{}
for i := 0; i < numbers; i++ {
key, _ := crypto.GenerateKey()
txpool.currentState.AddBalance(crypto.PubkeyToAddress(key.PublicKey), uint256.MustFromBig(big.NewInt(1000000)))

keys = append(keys, key)
}
return keys
}

tests := []struct {
name string
setup func(txPool *LegacyPool)
validEvents int
}{
{
name: "valid transaction",
validEvents: 10,
setup: func(txPool *LegacyPool) {
keys := createAccounts(4, txPool)
txs := types.Transactions{}

txs = append(txs, pricedTransaction(0, 100000, big.NewInt(2), keys[0]))
txs = append(txs, pricedTransaction(1, 100000, big.NewInt(1), keys[0]))
txs = append(txs, pricedTransaction(2, 100000, big.NewInt(2), keys[0]))

txs = append(txs, pricedTransaction(0, 100000, big.NewInt(1), keys[1]))
txs = append(txs, pricedTransaction(1, 100000, big.NewInt(2), keys[1]))
txs = append(txs, pricedTransaction(2, 100000, big.NewInt(2), keys[1]))

txs = append(txs, pricedTransaction(0, 100000, big.NewInt(2), keys[2]))
txs = append(txs, pricedTransaction(1, 100000, big.NewInt(1), keys[2]))
txs = append(txs, pricedTransaction(2, 100000, big.NewInt(2), keys[2]))

ltx := pricedTransaction(0, 100000, big.NewInt(1), keys[3])

pool.addRemotes(txs)
pool.addLocal(ltx)
},
},

{
name: "duplicate transaction",
validEvents: 1,
setup: func(txPool *LegacyPool) {
keys := createAccounts(1, txPool)
txs := types.Transactions{}

txs = append(txs, pricedTransaction(0, 100000, big.NewInt(1), keys[0]))
txs = append(txs, pricedTransaction(0, 100000, big.NewInt(1), keys[0]))

pool.addLocals(txs)
},
},
{
name: "bump gas price",
validEvents: 2,
setup: func(txPool *LegacyPool) {
keys := createAccounts(1, txPool)
txs := types.Transactions{}

txs = append(txs, pricedTransaction(0, 100000, big.NewInt(1), keys[0]))
txs = append(txs, pricedTransaction(0, 100000, big.NewInt(2), keys[0]))

pool.addLocals(txs)
},
},
{
name: "duplicate nonce with different gas price",
validEvents: 2,
setup: func(txPool *LegacyPool) {
keys := createAccounts(1, txPool)
txs := types.Transactions{}

txs = append(txs, pricedTransaction(0, 100000, big.NewInt(1), keys[0]))
txs = append(txs, pricedTransaction(0, 100000, big.NewInt(2), keys[0]))

pool.addLocals(txs)
},
},
{
name: "duplicate nonce with different gas limit",
validEvents: 2,
setup: func(txPool *LegacyPool) {
keys := createAccounts(1, txPool)
txs := types.Transactions{}

txs = append(txs, pricedTransaction(0, 100000, big.NewInt(1), keys[0]))
txs = append(txs, pricedTransaction(0, 80000, big.NewInt(1), keys[0]))

pool.addLocals(txs)
},
},
{
name: "discontinuous nonce",
validEvents: 2,
setup: func(txPool *LegacyPool) {
keys := createAccounts(1, txPool)
txs := types.Transactions{}

txs = append(txs, pricedTransaction(0, 100000, big.NewInt(1), keys[0]))
txs = append(txs, pricedTransaction(10000, 80000, big.NewInt(1), keys[0]))

pool.addLocals(txs)
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.setup(pool)

if err := validateQueuedEvents(events, tt.validEvents); err != nil {
t.Fatalf("event firing failed: %v", err)
}
if err := validatePoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
})
}
}

func TestQueue(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 4 additions & 0 deletions core/txpool/subpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ type SubPool interface {
// ReannoTxsEvent and send events to the given channel.
SubscribeReannoTxsEvent(chan<- core.ReannoTxsEvent) event.Subscription

// SubscribeNewQueuedTxsEvent registers a subscription of NewQueuedTxsEvent and
// starts sending event to the given channel.
SubscribeNewQueuedTxsEvent(ch chan<- core.NewQueuedTxsEvent) event.Subscription

// Nonce returns the next nonce of an account, with all transactions executable
// by the pool already applied on top.
Nonce(addr common.Address) uint64
Expand Down
13 changes: 13 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,19 @@ func (p *TxPool) SubscribeReannoTxsEvent(ch chan<- core.ReannoTxsEvent) event.Su
return p.subs.Track(event.JoinSubscriptions(subs...))
}

// SubscribeNewQueuedTxsEventregisters a subscription of NewQueuedTxsEvent and starts sending
// events to the given channel.
func (p *TxPool) SubscribeNewQueuedTxsEvent(ch chan<- core.NewQueuedTxsEvent) event.Subscription {
subs := make([]event.Subscription, 0, len(p.subpools))
for _, subpool := range p.subpools {
sub := subpool.SubscribeNewQueuedTxsEvent(ch)
if sub != nil { // sub will be nil when subpool have been shut down
subs = append(subs, sub)
}
}
return p.subs.Track(event.JoinSubscriptions(subs...))
}

// Nonce returns the next nonce of an account, with all transactions executable
// by the pool already applied on top.
func (p *TxPool) Nonce(addr common.Address) uint64 {
Expand Down
5 changes: 5 additions & 0 deletions core/vote/vote_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ func (pool *VotePool) putIntoVotePool(vote *types.VoteEnvelope) bool {
return true
}

// no implementation for vote pool
func (pool *VotePool) SubscribeNewQueuedTxsEvent(ch chan<- core.NewQueuedTxsEvent) event.Subscription {
return nil
}

func (pool *VotePool) SubscribeNewVoteEvent(ch chan<- core.NewVoteEvent) event.Subscription {
return pool.scope.Track(pool.votesFeed.Subscribe(ch))
}
Expand Down
4 changes: 4 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,10 @@ func (b *EthAPIBackend) SuggestGasTipCap(ctx context.Context) (*big.Int, error)
return b.gpo.SuggestTipCap(ctx)
}

func (b *EthAPIBackend) SubscribeNewQueuedTxsEvent(ch chan<- core.NewQueuedTxsEvent) event.Subscription {
return b.eth.TxPool().SubscribeNewQueuedTxsEvent(ch)
}

func (b *EthAPIBackend) FeeHistory(ctx context.Context, blockCount uint64, lastBlock rpc.BlockNumber, rewardPercentiles []float64) (firstBlock *big.Int, reward [][]*big.Int, baseFee []*big.Int, gasUsedRatio []float64, err error) {
return b.gpo.FeeHistory(ctx, blockCount, lastBlock, rewardPercentiles)
}
Expand Down
Loading

0 comments on commit 9c51d27

Please sign in to comment.