Skip to content

Commit

Permalink
eth/filters: fix pending for getLogs (ethereum#24949)
Browse files Browse the repository at this point in the history
  • Loading branch information
JukLee0ira committed Aug 3, 2024
1 parent 14d183a commit c8b98d2
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 19 deletions.
27 changes: 17 additions & 10 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ type SimulatedBackend struct {
database ethdb.Database // In memory database to store our testing data
blockchain *core.BlockChain // Ethereum blockchain to handle the consensus

mu sync.Mutex
pendingBlock *types.Block // Currently pending block that will be imported on request
pendingState *state.StateDB // Currently pending state that will be the active on on request
mu sync.Mutex
pendingBlock *types.Block // Currently pending block that will be imported on request
pendingState *state.StateDB // Currently pending state that will be the active on request
pendingReceipts types.Receipts // Currently receipts for the pending block

events *filters.EventSystem // Event system for filtering log events live

Expand Down Expand Up @@ -126,8 +127,8 @@ func NewXDCSimulatedBackend(alloc core.GenesisAlloc, gasLimit uint64, chainConfi
database: database,
blockchain: blockchain,
config: genesis.Config,
events: filters.NewEventSystem(&filterBackend{database, blockchain}, false),
}
backend.events = filters.NewEventSystem(&filterBackend{database, blockchain, backend}, false)
blockchain.Client = backend
backend.rollback()
return backend
Expand All @@ -146,8 +147,8 @@ func NewSimulatedBackend(alloc core.GenesisAlloc) *SimulatedBackend {
database: database,
blockchain: blockchain,
config: genesis.Config,
events: filters.NewEventSystem(&filterBackend{database, blockchain}, false),
}
backend.events = filters.NewEventSystem(&filterBackend{database, blockchain, backend}, false)
backend.rollback()
return backend
}
Expand Down Expand Up @@ -400,7 +401,7 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa
}

// Include tx in chain.
blocks, _ := core.GenerateChain(b.config, block, b.blockchain.Engine(), b.database, 1, func(number int, block *core.BlockGen) {
blocks, receipts := core.GenerateChain(b.config, block, b.blockchain.Engine(), b.database, 1, func(number int, block *core.BlockGen) {
for _, tx := range b.pendingBlock.Transactions() {
block.AddTxWithChain(b.blockchain, tx)
}
Expand All @@ -410,6 +411,7 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa

b.pendingBlock = blocks[0]
b.pendingState, _ = state.New(b.pendingBlock.Root(), statedb.Database())
b.pendingReceipts = receipts[0]
return nil
}

Expand All @@ -421,7 +423,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query XDPoSChain.Filt
var filter *filters.Filter
if query.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain}, *query.BlockHash, query.Addresses, query.Topics)
filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain, b}, *query.BlockHash, query.Addresses, query.Topics)
} else {
// Initialize unset filter boundaried to run from genesis to chain head
from := int64(0)
Expand All @@ -433,7 +435,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query XDPoSChain.Filt
to = query.ToBlock.Int64()
}
// Construct the range filter
filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain}, from, to, query.Addresses, query.Topics)
filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain, b}, from, to, query.Addresses, query.Topics)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
Expand Down Expand Up @@ -523,8 +525,9 @@ func (m callMsg) AccessList() types.AccessList { return m.CallMsg.AccessList }
// filterBackend implements filters.Backend to support filtering for logs without
// taking bloom-bits acceleration structures into account.
type filterBackend struct {
db ethdb.Database
bc *core.BlockChain
db ethdb.Database
bc *core.BlockChain
backend *SimulatedBackend
}

func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db }
Expand All @@ -545,6 +548,10 @@ func (fb *filterBackend) GetReceipts(ctx context.Context, hash common.Hash) (typ
return core.GetBlockReceipts(fb.db, hash, core.GetBlockNumber(fb.db, hash)), nil
}

func (fb *filterBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) {
return fb.backend.pendingBlock, fb.backend.pendingReceipts
}

func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
receipts := core.GetBlockReceipts(fb.db, hash, core.GetBlockNumber(fb.db, hash))
if receipts == nil {
Expand Down
46 changes: 38 additions & 8 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Backend interface {
HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error)
PendingBlockAndReceipts() (*types.Block, types.Receipts)

SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
Expand Down Expand Up @@ -128,26 +129,35 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
}
return f.blockLogs(ctx, header)
}
// Short-cut if all we care about is pending logs
if f.begin == rpc.PendingBlockNumber.Int64() {
if f.end != rpc.PendingBlockNumber.Int64() {
return nil, errors.New("invalid block range")
}
return f.pendingLogs()
}
// Figure out the limits of the filter range
header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
if header == nil {
return nil, nil
}
head := header.Number.Uint64()

if f.begin == -1 {
var (
head = header.Number.Uint64()
end = uint64(f.end)
pending = f.end == rpc.PendingBlockNumber.Int64()
)
if f.begin == rpc.LatestBlockNumber.Int64() {
f.begin = int64(head)
}
end := uint64(f.end)
if f.end == -1 {
if f.end == rpc.LatestBlockNumber.Int64() || f.end == rpc.PendingBlockNumber.Int64() {
end = head
}
// Gather all indexed logs, and finish with non indexed ones
var (
logs []*types.Log
err error
logs []*types.Log
err error
size, sections = f.backend.BloomStatus()
)
size, sections := f.backend.BloomStatus()
if indexed := sections * size; indexed > uint64(f.begin) {
if indexed > end {
logs, err = f.indexedLogs(ctx, end)
Expand All @@ -160,6 +170,13 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
}
rest, err := f.unindexedLogs(ctx, end)
logs = append(logs, rest...)
if pending {
pendingLogs, err := f.pendingLogs()
if err != nil {
return nil, err
}
logs = append(logs, pendingLogs...)
}
return logs, err
}

Expand Down Expand Up @@ -272,6 +289,19 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs [
return nil, nil
}

// pendingLogs returns the logs matching the filter criteria within the pending block.
func (f *Filter) pendingLogs() ([]*types.Log, error) {
block, receipts := f.backend.PendingBlockAndReceipts()
if bloomFilter(block.Bloom(), f.addresses, f.topics) {
var unfiltered []*types.Log
for _, r := range receipts {
unfiltered = append(unfiltered, r.Logs...)
}
return filterLogs(unfiltered, nil, nil, f.addresses, f.topics), nil
}
return nil, nil
}

func includes(addresses []common.Address, a common.Address) bool {
for _, addr := range addresses {
if addr == a {
Expand Down
6 changes: 5 additions & 1 deletion eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (b *testBackend) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*
return logs, nil
}

func (b *testBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) {
return nil, nil
}

func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return b.txFeed.Subscribe(ch)
}
Expand Down Expand Up @@ -602,7 +606,7 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
backend = &testBackend{db: db}
api = NewFilterAPI(backend, false, timeout)
api = NewPublicFilterAPI(backend, false, timeout)
done = make(chan struct{})
)

Expand Down
1 change: 1 addition & 0 deletions internal/ethapi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type Backend interface {
StateAndHeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*state.StateDB, *types.Header, error)
StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*state.StateDB, *types.Header, error)
GetBlock(ctx context.Context, blockHash common.Hash) (*types.Block, error)
PendingBlockAndReceipts() (*types.Block, types.Receipts)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
GetTd(blockHash common.Hash) *big.Int
GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, XDCxState *tradingstate.TradingStateDB, header *types.Header, vmConfig *vm.Config) (*vm.EVM, func() error, error)
Expand Down

0 comments on commit c8b98d2

Please sign in to comment.