Skip to content

Commit

Permalink
eth/filters: remove use of event.TypeMux for pending logs (#20312)
Browse files Browse the repository at this point in the history
  • Loading branch information
fjl authored Dec 10, 2019
1 parent b8bc9b3 commit d90d1db
Show file tree
Hide file tree
Showing 13 changed files with 230 additions and 237 deletions.
22 changes: 17 additions & 5 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewSimulatedBackendWithDatabase(database ethdb.Database, alloc core.Genesis
database: database,
blockchain: blockchain,
config: genesis.Config,
events: filters.NewEventSystem(new(event.TypeMux), &filterBackend{database, blockchain}, false),
events: filters.NewEventSystem(&filterBackend{database, blockchain}, false),
}
backend.rollback()
return backend
Expand Down Expand Up @@ -502,22 +502,34 @@ func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*ty
}

func (fb *filterBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return event.NewSubscription(func(quit <-chan struct{}) error {
<-quit
return nil
})
return nullSubscription()
}

func (fb *filterBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
return fb.bc.SubscribeChainEvent(ch)
}

func (fb *filterBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
return fb.bc.SubscribeRemovedLogsEvent(ch)
}

func (fb *filterBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return fb.bc.SubscribeLogsEvent(ch)
}

func (fb *filterBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription {
return nullSubscription()
}

func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 }

func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) {
panic("not supported")
}

func nullSubscription() event.Subscription {
return event.NewSubscription(func(quit <-chan struct{}) error {
<-quit
return nil
})
}
5 changes: 0 additions & 5 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@ import (
// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
type NewTxsEvent struct{ Txs []*types.Transaction }

// PendingLogsEvent is posted pre mining and notifies of pending logs.
type PendingLogsEvent struct {
Logs []*types.Log
}

// NewMinedBlockEvent is posted when a block has been imported.
type NewMinedBlockEvent struct{ Block *types.Block }

Expand Down
4 changes: 4 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ func (b *EthAPIBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEven
return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch)
}

func (b *EthAPIBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription {
return b.eth.miner.SubscribePendingLogs(ch)
}

func (b *EthAPIBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
return b.eth.BlockChain().SubscribeChainEvent(ch)
}
Expand Down
5 changes: 2 additions & 3 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,8 @@ type PublicFilterAPI struct {
func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
api := &PublicFilterAPI{
backend: backend,
mux: backend.EventMux(),
chainDb: backend.ChainDb(),
events: NewEventSystem(backend.EventMux(), backend, lightMode),
events: NewEventSystem(backend, lightMode),
filters: make(map[rpc.ID]*filter),
}
go api.timeoutLoop()
Expand Down Expand Up @@ -428,7 +427,7 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
hashes := f.hashes
f.hashes = nil
return returnHashes(hashes), nil
case LogsSubscription:
case LogsSubscription, MinedAndPendingLogsSubscription:
logs := f.logs
f.logs = nil
return returnLogs(logs), nil
Expand Down
7 changes: 2 additions & 5 deletions eth/filters/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/node"
)

Expand Down Expand Up @@ -122,14 +121,13 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) {

b.Log("Running filter benchmarks...")
start = time.Now()
mux := new(event.TypeMux)
var backend *testBackend

for i := 0; i < benchFilterCnt; i++ {
if i%20 == 0 {
db.Close()
db, _ = rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "")
backend = &testBackend{mux, db, cnt, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
backend = &testBackend{db: db, sections: cnt}
}
var addr common.Address
addr[0] = byte(i)
Expand Down Expand Up @@ -173,8 +171,7 @@ func BenchmarkNoBloomBits(b *testing.B) {

b.Log("Running filter benchmarks...")
start := time.Now()
mux := new(event.TypeMux)
backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)}
backend := &testBackend{db: db}
filter := NewRangeFilter(backend, 0, int64(*headNum), []common.Address{{}}, nil)
filter.Logs(context.Background())
d := time.Since(start)
Expand Down
2 changes: 1 addition & 1 deletion eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (

type Backend interface {
ChainDb() ethdb.Database
EventMux() *event.TypeMux
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
Expand All @@ -42,6 +41,7 @@ type Backend interface {
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription

BloomStatus() (uint64, uint64)
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
Expand Down
167 changes: 79 additions & 88 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package filters

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -58,7 +57,6 @@ const (
)

const (

// txChanSize is the size of channel listening to NewTxsEvent.
// The number is referenced from the size of tx pool.
txChanSize = 4096
Expand All @@ -70,10 +68,6 @@ const (
chainEvChanSize = 10
)

var (
ErrInvalidSubscriptionID = errors.New("invalid id")
)

type subscription struct {
id rpc.ID
typ Type
Expand All @@ -89,25 +83,25 @@ type subscription struct {
// EventSystem creates subscriptions, processes events and broadcasts them to the
// subscription which match the subscription criteria.
type EventSystem struct {
mux *event.TypeMux
backend Backend
lightMode bool
lastHead *types.Header

// Subscriptions
txsSub event.Subscription // Subscription for new transaction event
logsSub event.Subscription // Subscription for new log event
rmLogsSub event.Subscription // Subscription for removed log event
chainSub event.Subscription // Subscription for new chain event
pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event
txsSub event.Subscription // Subscription for new transaction event
logsSub event.Subscription // Subscription for new log event
rmLogsSub event.Subscription // Subscription for removed log event
pendingLogsSub event.Subscription // Subscription for pending log event
chainSub event.Subscription // Subscription for new chain event

// Channels
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
logsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
logsCh chan []*types.Log // Channel to receive new log event
pendingLogsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
}

// NewEventSystem creates a new manager that listens for event on the given mux,
Expand All @@ -116,30 +110,28 @@ type EventSystem struct {
//
// The returned manager has a loop that needs to be stopped with the Stop function
// or by stopping the given mux.
func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem {
func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
m := &EventSystem{
mux: mux,
backend: backend,
lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
txsCh: make(chan core.NewTxsEvent, txChanSize),
logsCh: make(chan []*types.Log, logsChanSize),
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
backend: backend,
lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
txsCh: make(chan core.NewTxsEvent, txChanSize),
logsCh: make(chan []*types.Log, logsChanSize),
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
pendingLogsCh: make(chan []*types.Log, logsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
}

// Subscribe events
m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh)
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
// TODO(rjl493456442): use feed to subscribe pending log event
m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})
m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)

// Make sure none of the subscriptions are empty
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil ||
m.pendingLogSub.Closed() {
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil {
log.Crit("Subscribe for event system failed")
}

Expand Down Expand Up @@ -316,58 +308,61 @@ func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscript

type filterIndex map[Type]map[rpc.ID]*subscription

// broadcast event to filters that match criteria.
func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
if ev == nil {
func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) {
if len(ev) == 0 {
return
}
for _, f := range filters[LogsSubscription] {
matchedLogs := filterLogs(ev, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
if len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
}

switch e := ev.(type) {
case []*types.Log:
if len(e) > 0 {
for _, f := range filters[LogsSubscription] {
if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
func (es *EventSystem) handlePendingLogs(filters filterIndex, ev []*types.Log) {
if len(ev) == 0 {
return
}
for _, f := range filters[PendingLogsSubscription] {
matchedLogs := filterLogs(ev, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
if len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
case core.RemovedLogsEvent:
for _, f := range filters[LogsSubscription] {
if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
}

func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLogsEvent) {
for _, f := range filters[LogsSubscription] {
matchedLogs := filterLogs(ev.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
if len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
case *event.TypeMuxEvent:
if muxe, ok := e.Data.(core.PendingLogsEvent); ok {
for _, f := range filters[PendingLogsSubscription] {
if e.Time.After(f.created) {
if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
}

func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
hashes := make([]common.Hash, 0, len(ev.Txs))
for _, tx := range ev.Txs {
hashes = append(hashes, tx.Hash())
}
for _, f := range filters[PendingTransactionsSubscription] {
f.hashes <- hashes
}
}

func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) {
for _, f := range filters[BlocksSubscription] {
f.headers <- ev.Block.Header()
}
if es.lightMode && len(filters[LogsSubscription]) > 0 {
es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) {
for _, f := range filters[LogsSubscription] {
if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
}
case core.NewTxsEvent:
hashes := make([]common.Hash, 0, len(e.Txs))
for _, tx := range e.Txs {
hashes = append(hashes, tx.Hash())
}
for _, f := range filters[PendingTransactionsSubscription] {
f.hashes <- hashes
}
case core.ChainEvent:
for _, f := range filters[BlocksSubscription] {
f.headers <- e.Block.Header()
}
if es.lightMode && len(filters[LogsSubscription]) > 0 {
es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) {
for _, f := range filters[LogsSubscription] {
if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
})
}
})
}
}

Expand Down Expand Up @@ -448,10 +443,10 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
func (es *EventSystem) eventLoop() {
// Ensure all subscriptions get cleaned up
defer func() {
es.pendingLogSub.Unsubscribe()
es.txsSub.Unsubscribe()
es.logsSub.Unsubscribe()
es.rmLogsSub.Unsubscribe()
es.pendingLogsSub.Unsubscribe()
es.chainSub.Unsubscribe()
}()

Expand All @@ -462,20 +457,16 @@ func (es *EventSystem) eventLoop() {

for {
select {
// Handle subscribed events
case ev := <-es.txsCh:
es.broadcast(index, ev)
es.handleTxsEvent(index, ev)
case ev := <-es.logsCh:
es.broadcast(index, ev)
es.handleLogs(index, ev)
case ev := <-es.rmLogsCh:
es.broadcast(index, ev)
es.handleRemovedLogs(index, ev)
case ev := <-es.pendingLogsCh:
es.handlePendingLogs(index, ev)
case ev := <-es.chainCh:
es.broadcast(index, ev)
case ev, active := <-es.pendingLogSub.Chan():
if !active { // system stopped
return
}
es.broadcast(index, ev)
es.handleChainEvent(index, ev)

case f := <-es.install:
if f.typ == MinedAndPendingLogsSubscription {
Expand Down
Loading

0 comments on commit d90d1db

Please sign in to comment.