Skip to content

Commit

Permalink
feat/rpcadaemon_logs_sub (#3751)
Browse files Browse the repository at this point in the history
  • Loading branch information
primalcs authored and Alex Sharp committed Mar 26, 2022
1 parent 64a07f0 commit 893a98c
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 0 deletions.
35 changes: 35 additions & 0 deletions cmd/rpcdaemon/commands/eth_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,38 @@ func (api *APIImpl) NewPendingTransactions(ctx context.Context) (*rpc.Subscripti

return rpcSub, nil
}

// SubscribeLogs send a notification each time a new log appears.
func (api *APIImpl) SubscribeLogs(ctx context.Context) (*rpc.Subscription, error) {
if api.filters == nil {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}

rpcSub := notifier.CreateSubscription()

go func() {
defer debug.LogPanic()
logs := make(chan *types.Log, 1)
defer close(logs)
id := api.filters.SubscribeLogs(logs)
defer api.filters.UnsubscribeLogs(id)

for {
select {
case h := <-logs:
err := notifier.Notify(rpcSub.ID, h)
if err != nil {
log.Warn("error while notifying subscription", "err", err)
}
case <-rpcSub.Err():
return
}
}
}()

return rpcSub, nil
}
69 changes: 69 additions & 0 deletions cmd/rpcdaemon/filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
"sync"
"time"

"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon/common"

"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/gointerfaces/txpool"
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/services"
Expand All @@ -28,6 +32,7 @@ type (
PendingLogsSubID SubscriptionID
PendingBlockSubID SubscriptionID
PendingTxsSubID SubscriptionID
LogsSubID SubscriptionID
)

type Filters struct {
Expand All @@ -39,6 +44,7 @@ type Filters struct {
pendingLogsSubs map[PendingLogsSubID]chan types.Logs
pendingBlockSubs map[PendingBlockSubID]chan *types.Block
pendingTxsSubs map[PendingTxsSubID]chan []types.Transaction
logsSubs map[LogsSubID]chan *types.Log
}

func New(ctx context.Context, ethBackend services.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient) *Filters {
Expand All @@ -49,6 +55,7 @@ func New(ctx context.Context, ethBackend services.ApiBackend, txPool txpool.Txpo
pendingTxsSubs: make(map[PendingTxsSubID]chan []types.Transaction),
pendingLogsSubs: make(map[PendingLogsSubID]chan types.Logs),
pendingBlockSubs: make(map[PendingBlockSubID]chan *types.Block),
logsSubs: make(map[LogsSubID]chan *types.Log),
}

go func() {
Expand Down Expand Up @@ -83,6 +90,31 @@ func New(ctx context.Context, ethBackend services.ApiBackend, txPool txpool.Txpo
}
}()

go func() {
if ethBackend == nil {
return
}
for {
select {
case <-ctx.Done():
return
default:
}
if err := ethBackend.SubscribeLogs(ctx, ff.OnNewLogs); err != nil {
select {
case <-ctx.Done():
return
default:
}
if grpcutil.IsEndOfStream(err) || grpcutil.IsRetryLater(err) {
time.Sleep(3 * time.Second)
continue
}
log.Warn("rpc filters: error subscribing to logs", "err", err)
}
}
}()

if txPool != nil {
go func() {
for {
Expand Down Expand Up @@ -337,6 +369,20 @@ func (ff *Filters) UnsubscribePendingTxs(id PendingTxsSubID) {
delete(ff.pendingTxsSubs, id)
}

func (ff *Filters) SubscribeLogs(out chan *types.Log) LogsSubID {
ff.mu.Lock()
defer ff.mu.Unlock()
id := LogsSubID(generateSubscriptionID())
ff.logsSubs[id] = out
return id
}

func (ff *Filters) UnsubscribeLogs(id LogsSubID) {
ff.mu.Lock()
defer ff.mu.Unlock()
delete(ff.logsSubs, id)
}

func (ff *Filters) OnNewEvent(event *remote.SubscribeReply) {
ff.mu.RLock()
defer ff.mu.RUnlock()
Expand Down Expand Up @@ -411,6 +457,29 @@ func (ff *Filters) OnNewTx(reply *txpool.OnAddReply) {
}
}

func (ff *Filters) OnNewLogs(reply *remote.SubscribeLogsReply) {
lg := &types.Log{
Address: gointerfaces.ConvertH160toAddress(reply.Address),
Data: reply.Data,
BlockNumber: reply.BlockNumber,
TxHash: gointerfaces.ConvertH256ToHash(reply.TransactionHash),
TxIndex: uint(reply.TransactionIndex),
BlockHash: gointerfaces.ConvertH256ToHash(reply.BlockHash),
Index: uint(reply.LogIndex),
Removed: reply.Removed,
}
t := make([]common.Hash, 0)
for _, v := range reply.Topics {
t = append(t, gointerfaces.ConvertH256ToHash(v))
}
lg.Topics = t
ff.mu.RLock()
defer ff.mu.RUnlock()
for _, v := range ff.logsSubs {
v <- lg
}
}

func generateSubscriptionID() SubscriptionID {
var id [32]byte

Expand Down
23 changes: 23 additions & 0 deletions cmd/rpcdaemon/services/eth_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type ApiBackend interface {
ProtocolVersion(ctx context.Context) (uint64, error)
ClientVersion(ctx context.Context) (string, error)
Subscribe(ctx context.Context, cb func(*remote.SubscribeReply)) error
SubscribeLogs(ctx context.Context, cb func(*remote.SubscribeLogsReply)) error
NodeInfo(ctx context.Context, limit uint32) ([]p2p.NodeInfo, error)
}

Expand Down Expand Up @@ -145,6 +146,28 @@ func (back *RemoteBackend) Subscribe(ctx context.Context, onNewEvent func(*remot
return nil
}

func (back *RemoteBackend) SubscribeLogs(ctx context.Context, onNewLogs func(reply *remote.SubscribeLogsReply)) error {
subscription, err := back.remoteEthBackend.SubscribeLogs(ctx, grpc.WaitForReady(true))
if err != nil {
if s, ok := status.FromError(err); ok {
return errors.New(s.Message())
}
return err
}
for {
logs, err := subscription.Recv()
if errors.Is(err, io.EOF) {
log.Info("rpcdaemon: the logs subscription channel was closed")
break
}
if err != nil {
return err
}
onNewLogs(logs)
}
return nil
}

func (back *RemoteBackend) NodeInfo(ctx context.Context, limit uint32) ([]p2p.NodeInfo, error) {
nodes, err := back.remoteEthBackend.NodeInfo(ctx, &remote.NodesInfoRequest{Limit: limit})
if err != nil {
Expand Down

0 comments on commit 893a98c

Please sign in to comment.