diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index b8c6814040a4..6a9607b109c0 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -126,7 +126,7 @@ func NewXDCSimulatedBackend(alloc core.GenesisAlloc, gasLimit uint64, chainConfi database: database, blockchain: blockchain, config: genesis.Config, - events: filters.NewEventSystem(new(event.TypeMux), &filterBackend{database, blockchain}, false), + events: filters.NewEventSystem(&filterBackend{database, blockchain}, false), } blockchain.Client = backend backend.rollback() @@ -146,7 +146,7 @@ func NewSimulatedBackend(alloc core.GenesisAlloc) *SimulatedBackend { database: database, blockchain: blockchain, config: genesis.Config, - events: filters.NewEventSystem(new(event.TypeMux), &filterBackend{database, blockchain}, false), + events: filters.NewEventSystem(&filterBackend{database, blockchain}, false), } backend.rollback() return backend @@ -558,22 +558,34 @@ func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*ty } func (fb *filterBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { - return event.NewSubscription(func(quit <-chan struct{}) error { - <-quit - return nil - }) + return nullSubscription() } + func (fb *filterBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { return fb.bc.SubscribeChainEvent(ch) } + func (fb *filterBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { return fb.bc.SubscribeRemovedLogsEvent(ch) } + func (fb *filterBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { return fb.bc.SubscribeLogsEvent(ch) } +func (fb *filterBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription { + return nullSubscription() +} + func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 } + func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) { panic("not supported") } + +func nullSubscription() event.Subscription { + return event.NewSubscription(func(quit <-chan struct{}) error { + <-quit + return nil + }) +} diff --git a/core/events.go b/core/events.go index 60dc8d7ddd36..bf7e7027e5c9 100644 --- a/core/events.go +++ b/core/events.go @@ -30,11 +30,6 @@ type OrderTxPreEvent struct{ Tx *types.OrderTransaction } // LendingTxPreEvent is posted when a order transaction enters the order transaction pool. type LendingTxPreEvent struct{ Tx *types.LendingTransaction } -// PendingLogsEvent is posted pre mining and notifies of pending logs. -type PendingLogsEvent struct { - Logs []*types.Log -} - // PendingStateEvent is posted pre mining and notifies of pending state changes. type PendingStateEvent struct{} diff --git a/eth/api_backend.go b/eth/api_backend.go index 1a9ebc417091..732d3e5156d7 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -258,6 +258,10 @@ func (b *EthApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEven return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch) } +func (b *EthApiBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription { + return b.eth.miner.SubscribePendingLogs(ch) +} + func (b *EthApiBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { return b.eth.BlockChain().SubscribeChainEvent(ch) } diff --git a/eth/filters/api.go b/eth/filters/api.go index 04256a109620..330327442321 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -72,9 +72,8 @@ type PublicFilterAPI struct { func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI { api := &PublicFilterAPI{ backend: backend, - mux: backend.EventMux(), chainDb: backend.ChainDb(), - events: NewEventSystem(backend.EventMux(), backend, lightMode), + events: NewEventSystem(backend, lightMode), filters: make(map[rpc.ID]*filter), } go api.timeoutLoop() @@ -439,7 +438,7 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { hashes := f.hashes f.hashes = nil return returnHashes(hashes), nil - case LogsSubscription: + case LogsSubscription, MinedAndPendingLogsSubscription: logs := f.logs f.logs = nil return returnLogs(logs), nil diff --git a/eth/filters/bench_test.go b/eth/filters/bench_test.go index 98635b9de7b9..9c9aa5a80a3c 100644 --- a/eth/filters/bench_test.go +++ b/eth/filters/bench_test.go @@ -30,7 +30,6 @@ import ( "github.com/XinFinOrg/XDPoSChain/core/rawdb" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/ethdb" - "github.com/XinFinOrg/XDPoSChain/event" "github.com/XinFinOrg/XDPoSChain/node" ) @@ -124,14 +123,13 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) { b.Log("Running filter benchmarks...") start = time.Now() - mux := new(event.TypeMux) var backend *testBackend for i := 0; i < benchFilterCnt; i++ { if i%20 == 0 { db.Close() db, _ = rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "") - backend = &testBackend{mux, db, cnt, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)} + backend = &testBackend{db: db, sections: cnt} } var addr common.Address addr[0] = byte(i) @@ -190,8 +188,7 @@ func BenchmarkNoBloomBits(b *testing.B) { b.Log("Running filter benchmarks...") start := time.Now() - mux := new(event.TypeMux) - backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)} + backend := &testBackend{db: db} filter := NewRangeFilter(backend, 0, int64(headNum), []common.Address{{}}, nil) filter.Logs(context.Background()) d := time.Since(start) diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 4d47bafc0fc7..40a903d44b26 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -32,7 +32,6 @@ import ( type Backend interface { ChainDb() ethdb.Database - EventMux() *event.TypeMux HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) @@ -42,6 +41,7 @@ type Backend interface { SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription + SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription BloomStatus() (uint64, uint64) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 7bf7db629711..921797acb6cf 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -67,10 +67,6 @@ const ( chainEvChanSize = 10 ) -var ( - ErrInvalidSubscriptionID = errors.New("invalid id") -) - type subscription struct { id rpc.ID typ Type @@ -86,25 +82,25 @@ type subscription struct { // EventSystem creates subscriptions, processes events and broadcasts them to the // subscription which match the subscription criteria. type EventSystem struct { - mux *event.TypeMux backend Backend lightMode bool lastHead *types.Header // Subscriptions - txsSub event.Subscription // Subscription for new transaction event - logsSub event.Subscription // Subscription for new log event - rmLogsSub event.Subscription // Subscription for removed log event - chainSub event.Subscription // Subscription for new chain event - pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event + txsSub event.Subscription // Subscription for new transaction event + logsSub event.Subscription // Subscription for new log event + rmLogsSub event.Subscription // Subscription for removed log event + pendingLogsSub event.Subscription // Subscription for pending log event + chainSub event.Subscription // Subscription for new chain event // Channels - install chan *subscription // install filter for event notification - uninstall chan *subscription // remove filter for event notification - txsCh chan core.NewTxsEvent // Channel to receive new transactions event - logsCh chan []*types.Log // Channel to receive new log event - rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event - chainCh chan core.ChainEvent // Channel to receive new chain event + install chan *subscription // install filter for event notification + uninstall chan *subscription // remove filter for event notification + txsCh chan core.NewTxsEvent // Channel to receive new transactions event + logsCh chan []*types.Log // Channel to receive new log event + pendingLogsCh chan []*types.Log // Channel to receive new log event + rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event + chainCh chan core.ChainEvent // Channel to receive new chain event } // NewEventSystem creates a new manager that listens for event on the given mux, @@ -113,17 +109,17 @@ type EventSystem struct { // // The returned manager has a loop that needs to be stopped with the Stop function // or by stopping the given mux. -func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem { +func NewEventSystem(backend Backend, lightMode bool) *EventSystem { m := &EventSystem{ - mux: mux, - backend: backend, - lightMode: lightMode, - install: make(chan *subscription), - uninstall: make(chan *subscription), - txsCh: make(chan core.NewTxsEvent, txChanSize), - logsCh: make(chan []*types.Log, logsChanSize), - rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), - chainCh: make(chan core.ChainEvent, chainEvChanSize), + backend: backend, + lightMode: lightMode, + install: make(chan *subscription), + uninstall: make(chan *subscription), + txsCh: make(chan core.NewTxsEvent, txChanSize), + logsCh: make(chan []*types.Log, logsChanSize), + rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), + pendingLogsCh: make(chan []*types.Log, logsChanSize), + chainCh: make(chan core.ChainEvent, chainEvChanSize), } // Subscribe events @@ -131,12 +127,10 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) - // TODO(rjl493456442): use feed to subscribe pending log event - m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{}) + m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh) // Make sure none of the subscriptions are empty - if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || - m.pendingLogSub.Closed() { + if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil { log.Crit("Subscribe for event system failed") } @@ -314,58 +308,61 @@ func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscript type filterIndex map[Type]map[rpc.ID]*subscription -// broadcast event to filters that match criteria. -func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) { - if ev == nil { +func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) { + if len(ev) == 0 { return } + for _, f := range filters[LogsSubscription] { + matchedLogs := filterLogs(ev, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics) + if len(matchedLogs) > 0 { + f.logs <- matchedLogs + } + } +} - switch e := ev.(type) { - case []*types.Log: - if len(e) > 0 { - for _, f := range filters[LogsSubscription] { - if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } - } +func (es *EventSystem) handlePendingLogs(filters filterIndex, ev []*types.Log) { + if len(ev) == 0 { + return + } + for _, f := range filters[PendingLogsSubscription] { + matchedLogs := filterLogs(ev, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics) + if len(matchedLogs) > 0 { + f.logs <- matchedLogs } - case core.RemovedLogsEvent: - for _, f := range filters[LogsSubscription] { - if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } + } +} + +func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLogsEvent) { + for _, f := range filters[LogsSubscription] { + matchedLogs := filterLogs(ev.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics) + if len(matchedLogs) > 0 { + f.logs <- matchedLogs } - case *event.TypeMuxEvent: - if muxe, ok := e.Data.(core.PendingLogsEvent); ok { - for _, f := range filters[PendingLogsSubscription] { - if e.Time.After(f.created) { - if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } + } +} + +func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) { + hashes := make([]common.Hash, 0, len(ev.Txs)) + for _, tx := range ev.Txs { + hashes = append(hashes, tx.Hash()) + } + for _, f := range filters[PendingTransactionsSubscription] { + f.hashes <- hashes + } +} + +func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) { + for _, f := range filters[BlocksSubscription] { + f.headers <- ev.Block.Header() + } + if es.lightMode && len(filters[LogsSubscription]) > 0 { + es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) { + for _, f := range filters[LogsSubscription] { + if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { + f.logs <- matchedLogs } } - } - case core.NewTxsEvent: - hashes := make([]common.Hash, 0, len(e.Txs)) - for _, tx := range e.Txs { - hashes = append(hashes, tx.Hash()) - } - for _, f := range filters[PendingTransactionsSubscription] { - f.hashes <- hashes - } - case core.ChainEvent: - for _, f := range filters[BlocksSubscription] { - f.headers <- e.Block.Header() - } - if es.lightMode && len(filters[LogsSubscription]) > 0 { - es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) { - for _, f := range filters[LogsSubscription] { - if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } - } - }) - } + }) } } @@ -446,10 +443,10 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common. func (es *EventSystem) eventLoop() { // Ensure all subscriptions get cleaned up defer func() { - es.pendingLogSub.Unsubscribe() es.txsSub.Unsubscribe() es.logsSub.Unsubscribe() es.rmLogsSub.Unsubscribe() + es.pendingLogsSub.Unsubscribe() es.chainSub.Unsubscribe() }() @@ -460,20 +457,16 @@ func (es *EventSystem) eventLoop() { for { select { - // Handle subscribed events case ev := <-es.txsCh: - es.broadcast(index, ev) + es.handleTxsEvent(index, ev) case ev := <-es.logsCh: - es.broadcast(index, ev) + es.handleLogs(index, ev) case ev := <-es.rmLogsCh: - es.broadcast(index, ev) + es.handleRemovedLogs(index, ev) + case ev := <-es.pendingLogsCh: + es.handlePendingLogs(index, ev) case ev := <-es.chainCh: - es.broadcast(index, ev) - case ev, active := <-es.pendingLogSub.Chan(): - if !active { // system stopped - return - } - es.broadcast(index, ev) + es.handleChainEvent(index, ev) case f := <-es.install: if f.typ == MinedAndPendingLogsSubscription { diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index fb4f7e7b8bc2..a96d85f85c66 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -39,23 +39,20 @@ import ( ) type testBackend struct { - mux *event.TypeMux - db ethdb.Database - sections uint64 - txFeed *event.Feed - rmLogsFeed *event.Feed - logsFeed *event.Feed - chainFeed *event.Feed + mux *event.TypeMux + db ethdb.Database + sections uint64 + txFeed event.Feed + logsFeed event.Feed + rmLogsFeed event.Feed + pendingLogsFeed event.Feed + chainFeed event.Feed } func (b *testBackend) ChainDb() ethdb.Database { return b.db } -func (b *testBackend) EventMux() *event.TypeMux { - return b.mux -} - func (b *testBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) { var hash common.Hash var num uint64 @@ -102,6 +99,10 @@ func (b *testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript return b.logsFeed.Subscribe(ch) } +func (b *testBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription { + return b.pendingLogsFeed.Subscribe(ch) +} + func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { return b.chainFeed.Subscribe(ch) } @@ -146,13 +147,8 @@ func TestBlockSubscription(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) db = rawdb.NewMemoryDatabase() - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} + backend = &testBackend{db: db} api = NewPublicFilterAPI(backend, false) genesis = new(core.Genesis).MustCommit(db) chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {}) @@ -191,7 +187,7 @@ func TestBlockSubscription(t *testing.T) { time.Sleep(1 * time.Second) for _, e := range chainEvents { - chainFeed.Send(e) + backend.chainFeed.Send(e) } <-sub0.Err() @@ -203,14 +199,9 @@ func TestPendingTxFilter(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db = rawdb.NewMemoryDatabase() - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - api = NewPublicFilterAPI(backend, false) + db = rawdb.NewMemoryDatabase() + backend = &testBackend{db: db} + api = NewPublicFilterAPI(backend, false) transactions = []*types.Transaction{ types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), @@ -226,7 +217,7 @@ func TestPendingTxFilter(t *testing.T) { fid0 := api.NewPendingTransactionFilter() time.Sleep(1 * time.Second) - txFeed.Send(core.NewTxsEvent{Txs: transactions}) + backend.txFeed.Send(core.NewTxsEvent{Txs: transactions}) timeout := time.Now().Add(1 * time.Second) for { @@ -263,14 +254,9 @@ func TestPendingTxFilter(t *testing.T) { // If not it must return an error. func TestLogFilterCreation(t *testing.T) { var ( - mux = new(event.TypeMux) - db = rawdb.NewMemoryDatabase() - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - api = NewPublicFilterAPI(backend, false) + db = rawdb.NewMemoryDatabase() + backend = &testBackend{db: db} + api = NewPublicFilterAPI(backend, false) testCases = []struct { crit FilterCriteria @@ -312,14 +298,9 @@ func TestInvalidLogFilterCreation(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db = rawdb.NewMemoryDatabase() - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - api = NewPublicFilterAPI(backend, false) + db = rawdb.NewMemoryDatabase() + backend = &testBackend{db: db} + api = NewPublicFilterAPI(backend, false) ) // different situations where log filter creation should fail. @@ -339,15 +320,10 @@ func TestInvalidLogFilterCreation(t *testing.T) { func TestInvalidGetLogsRequest(t *testing.T) { var ( - mux = new(event.TypeMux) - db = rawdb.NewMemoryDatabase() - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - api = NewPublicFilterAPI(backend, false) - blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") + db = rawdb.NewMemoryDatabase() + backend = &testBackend{db: db} + api = NewPublicFilterAPI(backend, false) + blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") ) // Reason: Cannot specify both BlockHash and FromBlock/ToBlock) @@ -369,14 +345,9 @@ func TestLogFilter(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db = rawdb.NewMemoryDatabase() - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - api = NewPublicFilterAPI(backend, false) + db = rawdb.NewMemoryDatabase() + backend = &testBackend{db: db} + api = NewPublicFilterAPI(backend, false) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -386,7 +357,7 @@ func TestLogFilter(t *testing.T) { secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222") notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999") - // posted twice, once as vm.Logs and once as core.PendingLogsEvent + // posted twice, once as regular logs and once as pending logs. allLogs = []*types.Log{ {Address: firstAddr}, {Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 1}, @@ -439,11 +410,11 @@ func TestLogFilter(t *testing.T) { // raise events time.Sleep(1 * time.Second) - if nsend := logsFeed.Send(allLogs); nsend == 0 { - t.Fatal("Shoud have at least one subscription") + if nsend := backend.logsFeed.Send(allLogs); nsend == 0 { + t.Fatal("Logs event not delivered") } - if err := mux.Post(core.PendingLogsEvent{Logs: allLogs}); err != nil { - t.Fatal(err) + if nsend := backend.pendingLogsFeed.Send(allLogs); nsend == 0 { + t.Fatal("Pending logs event not delivered") } for i, tt := range testCases { @@ -488,14 +459,9 @@ func TestPendingLogsSubscription(t *testing.T) { t.Parallel() var ( - mux = new(event.TypeMux) - db = rawdb.NewMemoryDatabase() - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - api = NewPublicFilterAPI(backend, false) + db = rawdb.NewMemoryDatabase() + backend = &testBackend{db: db} + api = NewPublicFilterAPI(backend, false) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -507,26 +473,18 @@ func TestPendingLogsSubscription(t *testing.T) { fourthTopic = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444") notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999") - allLogs = []core.PendingLogsEvent{ - {Logs: []*types.Log{{Address: firstAddr, Topics: []common.Hash{}, BlockNumber: 0}}}, - {Logs: []*types.Log{{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 1}}}, - {Logs: []*types.Log{{Address: secondAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 2}}}, - {Logs: []*types.Log{{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 3}}}, - {Logs: []*types.Log{{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 4}}}, - {Logs: []*types.Log{ + allLogs = [][]*types.Log{ + {{Address: firstAddr, Topics: []common.Hash{}, BlockNumber: 0}}, + {{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 1}}, + {{Address: secondAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 2}}, + {{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 3}}, + {{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 4}}, + { {Address: thirdAddress, Topics: []common.Hash{firstTopic}, BlockNumber: 5}, {Address: thirdAddress, Topics: []common.Hash{thirdTopic}, BlockNumber: 5}, {Address: thirdAddress, Topics: []common.Hash{fourthTopic}, BlockNumber: 5}, {Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 5}, - }}, - } - - convertLogs = func(pl []core.PendingLogsEvent) []*types.Log { - var logs []*types.Log - for _, l := range pl { - logs = append(logs, l.Logs...) - } - return logs + }, } testCases = []struct { @@ -536,21 +494,52 @@ func TestPendingLogsSubscription(t *testing.T) { sub *Subscription }{ // match all - {ethereum.FilterQuery{}, convertLogs(allLogs), nil, nil}, + { + ethereum.FilterQuery{}, flattenLogs(allLogs), + nil, nil, + }, // match none due to no matching addresses - {ethereum.FilterQuery{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, nil, nil}, + { + ethereum.FilterQuery{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, + nil, + nil, nil, + }, // match logs based on addresses, ignore topics - {ethereum.FilterQuery{Addresses: []common.Address{firstAddr}}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil}, + { + ethereum.FilterQuery{Addresses: []common.Address{firstAddr}}, + append(flattenLogs(allLogs[:2]), allLogs[5][3]), + nil, nil, + }, // match none due to no matching topics (match with address) - {ethereum.FilterQuery{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}}, []*types.Log{}, nil, nil}, + { + ethereum.FilterQuery{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}}, + nil, nil, nil, + }, // match logs based on addresses and topics - {ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, append(convertLogs(allLogs[3:5]), allLogs[5].Logs[0]), nil, nil}, + { + ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, + append(flattenLogs(allLogs[3:5]), allLogs[5][0]), + nil, nil, + }, // match logs based on multiple addresses and "or" topics - {ethereum.FilterQuery{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, append(convertLogs(allLogs[2:5]), allLogs[5].Logs[0]), nil, nil}, + { + ethereum.FilterQuery{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, + append(flattenLogs(allLogs[2:5]), allLogs[5][0]), + nil, + nil, + }, // block numbers are ignored for filters created with New***Filter, these return all logs that match the given criteria when the state changes - {ethereum.FilterQuery{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(2), ToBlock: big.NewInt(3)}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil}, + { + ethereum.FilterQuery{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(2), ToBlock: big.NewInt(3)}, + append(flattenLogs(allLogs[:2]), allLogs[5][3]), + nil, nil, + }, // multiple pending logs, should match only 2 topics from the logs in block 5 - {ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}}, []*types.Log{allLogs[5].Logs[0], allLogs[5].Logs[2]}, nil, nil}, + { + ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}}, + []*types.Log{allLogs[5][0], allLogs[5][2]}, + nil, nil, + }, } ) @@ -593,10 +582,15 @@ func TestPendingLogsSubscription(t *testing.T) { // raise events time.Sleep(1 * time.Second) - // allLogs are type of core.PendingLogsEvent - for _, l := range allLogs { - if err := mux.Post(l); err != nil { - t.Fatal(err) - } + for _, ev := range allLogs { + backend.pendingLogsFeed.Send(ev) + } +} + +func flattenLogs(pl [][]*types.Log) []*types.Log { + var logs []*types.Log + for _, l := range pl { + logs = append(logs, l...) } + return logs } diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index 1b74d21df7e0..98de7db846b6 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -29,7 +29,6 @@ import ( "github.com/XinFinOrg/XDPoSChain/core" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/crypto" - "github.com/XinFinOrg/XDPoSChain/event" "github.com/XinFinOrg/XDPoSChain/params" ) @@ -50,18 +49,13 @@ func BenchmarkFilters(b *testing.B) { defer os.RemoveAll(dir) var ( - db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "") - mux = new(event.TypeMux) - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - addr1 = crypto.PubkeyToAddress(key1.PublicKey) - addr2 = common.BytesToAddress([]byte("jeff")) - addr3 = common.BytesToAddress([]byte("ethereum")) - addr4 = common.BytesToAddress([]byte("random addresses please")) + db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "") + backend = &testBackend{db: db} + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr1 = crypto.PubkeyToAddress(key1.PublicKey) + addr2 = common.BytesToAddress([]byte("jeff")) + addr3 = common.BytesToAddress([]byte("ethereum")) + addr4 = common.BytesToAddress([]byte("random addresses please")) ) defer db.Close() @@ -115,15 +109,10 @@ func TestFilters(t *testing.T) { defer os.RemoveAll(dir) var ( - db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "") - mux = new(event.TypeMux) - txFeed = new(event.Feed) - rmLogsFeed = new(event.Feed) - logsFeed = new(event.Feed) - chainFeed = new(event.Feed) - backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} - key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - addr = crypto.PubkeyToAddress(key1.PublicKey) + db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "") + backend = &testBackend{db: db} + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr = crypto.PubkeyToAddress(key1.PublicKey) hash1 = common.BytesToHash([]byte("topic1")) hash2 = common.BytesToHash([]byte("topic2")) diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index d862977d2c5f..dec881847d97 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -49,9 +49,8 @@ type Backend interface { ProtocolVersion() int SuggestPrice(ctx context.Context) (*big.Int, error) ChainDb() ethdb.Database - EventMux() *event.TypeMux AccountManager() *accounts.Manager - RPCGasCap() uint64 // global gas cap for eth_call over rpc: DoS protection + RPCGasCap() uint64 // global gas cap for eth_call over rpc: DoS protection RPCTxFeeCap() float64 // global tx fee cap for all transaction related APIs XDCxService() *XDCx.XDCX LendingService() *XDCxlending.Lending @@ -88,6 +87,7 @@ type Backend interface { OrderTxPoolContent() (map[common.Address]types.OrderTransactions, map[common.Address]types.OrderTransactions) OrderStats() (pending int, queued int) SendLendingTx(ctx context.Context, signedTx *types.LendingTransaction) error + SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription ChainConfig() *params.ChainConfig CurrentBlock() *types.Block diff --git a/les/api_backend.go b/les/api_backend.go index 3e93efe9601f..110d5fb705b3 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -241,6 +241,13 @@ func (b *LesApiBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri return b.eth.blockchain.SubscribeLogsEvent(ch) } +func (b *LesApiBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription { + return event.NewSubscription(func(quit <-chan struct{}) error { + <-quit + return nil + }) +} + func (b *LesApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { return b.eth.blockchain.SubscribeRemovedLogsEvent(ch) } @@ -261,10 +268,6 @@ func (b *LesApiBackend) ChainDb() ethdb.Database { return b.eth.chainDb } -func (b *LesApiBackend) EventMux() *event.TypeMux { - return b.eth.eventMux -} - func (b *LesApiBackend) AccountManager() *accounts.Manager { return b.eth.accountManager } diff --git a/miner/miner.go b/miner/miner.go index 4a9d34b9f7fa..2d5a971cd17f 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -182,3 +182,9 @@ func (self *Miner) SetEtherbase(addr common.Address) { self.coinbase = addr self.worker.setEtherbase(addr) } + +// SubscribePendingLogs starts delivering logs from pending transactions +// to the given channel. +func (self *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription { + return self.worker.pendingLogsFeed.Subscribe(ch) +} diff --git a/miner/worker.go b/miner/worker.go index a97f55ceabbb..4b084b6c52c8 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -104,6 +104,9 @@ type worker struct { mu sync.Mutex + // Feeds + pendingLogsFeed event.Feed + // update loop mux *event.TypeMux txsCh chan core.NewTxsEvent @@ -322,7 +325,7 @@ func (self *worker) update() { } feeCapacity := state.GetTRC21FeeCapacityFromState(self.current.state) txset, specialTxs := types.NewTransactionsByPriceAndNonce(self.current.signer, txs, nil, feeCapacity) - self.current.commitTransactions(self.mux, feeCapacity, txset, specialTxs, self.chain, self.coinbase) + self.current.commitTransactions(self.mux, feeCapacity, txset, specialTxs, self.chain, self.coinbase, &self.pendingLogsFeed) self.currentMu.Unlock() } else { // If we're mining, but nothing is being processed, wake on new transactions @@ -781,7 +784,7 @@ func (self *worker) commitNewWork() { specialTxs = append(specialTxs, txStateRoot) } } - work.commitTransactions(self.mux, feeCapacity, txs, specialTxs, self.chain, self.coinbase) + work.commitTransactions(self.mux, feeCapacity, txs, specialTxs, self.chain, self.coinbase, &self.pendingLogsFeed) // compute uncles for the new block. var ( uncles []*types.Header @@ -801,7 +804,7 @@ func (self *worker) commitNewWork() { self.push(work) } -func (env *Work) commitTransactions(mux *event.TypeMux, balanceFee map[common.Address]*big.Int, txs *types.TransactionsByPriceAndNonce, specialTxs types.Transactions, bc *core.BlockChain, coinbase common.Address) { +func (env *Work) commitTransactions(mux *event.TypeMux, balanceFee map[common.Address]*big.Int, txs *types.TransactionsByPriceAndNonce, specialTxs types.Transactions, bc *core.BlockChain, coinbase common.Address, pendingLogsFeed *event.Feed) { gp := new(core.GasPool).AddGas(env.header.GasLimit) balanceUpdated := map[common.Address]*big.Int{} totalFeeUsed := big.NewInt(0) @@ -1028,29 +1031,25 @@ func (env *Work) commitTransactions(mux *event.TypeMux, balanceFee map[common.Ad } } state.UpdateTRC21Fee(env.state, balanceUpdated, totalFeeUsed) - if len(coalescedLogs) > 0 || env.tcount > 0 { - // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined - // logs by filling in the block hash when the block was mined by the local miner. This can - // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. + // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined + // logs by filling in the block hash when the block was mined by the local miner. This can + // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. + if len(coalescedLogs) > 0 { cpy := make([]*types.Log, len(coalescedLogs)) for i, l := range coalescedLogs { cpy[i] = new(types.Log) *cpy[i] = *l } - go func(logs []*types.Log, tcount int) { - if len(logs) > 0 { - err := mux.Post(core.PendingLogsEvent{Logs: logs}) - if err != nil { - log.Warn("[commitTransactions] Error when sending PendingLogsEvent", "LogLength", len(logs)) - } - } - if tcount > 0 { - err := mux.Post(core.PendingStateEvent{}) - if err != nil { - log.Warn("[commitTransactions] Error when sending PendingStateEvent", "tcount", tcount) - } + pendingLogsFeed.Send(cpy) + } + if env.tcount > 0 { + go func(tcount int) { + err := mux.Post(core.PendingStateEvent{}) + if err != nil { + log.Warn("[commitTransactions] Error when sending PendingStateEvent", "tcount", tcount) } - }(cpy, env.tcount) + }(env.tcount) + } }