Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce number of ETH RPC calls during block polling #2775

Merged
merged 15 commits into from
Mar 23, 2023
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#### General
- \#2758 Accept only active Os to receive traffic and redeem tickets (@leszko)
- \#2775 Reduce number of ETH RPC calls during block polling (@leszko)

#### Broadcaster

Expand Down
2 changes: 1 addition & 1 deletion cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
// Backfill events that the node has missed since its last seen block. This method will block
// and the node will not continue setup until it finishes
glog.Infof("Backfilling block events (this can take a while)...\n")
if err := blockWatcher.BackfillEvents(blockWatchCtx); err != nil {
if err := blockWatcher.BackfillEvents(blockWatchCtx, nil); err != nil {
glog.Errorf("Failed to backfill events: %v", err)
return
}
Expand Down
181 changes: 34 additions & 147 deletions eth/blockwatch/block_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
// a hard limit of 10,000 logs returned by a single `eth_getLogs` query by Infura's Ethereum nodes so
// we need to try and stay below it. Parity, Geth and Alchemy all have much higher limits (if any) on
// the number of logs returned so Infura is by far the limiting factor.
var maxBlocksInGetLogsQuery = 60
var maxBlocksInGetLogsQuery = 1000

// EventType describes the types of events emitted by blockwatch.Watcher. A block can be discovered
// and added to our representation of the chain. During a block re-org, a block previously stored
Expand Down Expand Up @@ -90,8 +90,8 @@ func New(config Config) *Watcher {
// Note that the latest block is never backfilled here from logs. It will be polled separately in syncToLatestBlock().
// The reason for that is that we always need to propagate events from the latest block even if it does not contain
// events which are filtered out during the backfilling process.
func (w *Watcher) BackfillEvents(ctx context.Context) error {
events, err := w.getMissedEventsToBackfill(ctx)
func (w *Watcher) BackfillEvents(ctx context.Context, chainHead *MiniHeader) error {
var events, err = w.getMissedEventsToBackfill(ctx, chainHead)
if err != nil {
return err
}
Expand Down Expand Up @@ -153,146 +153,12 @@ func (w *Watcher) syncToLatestBlock(ctx context.Context) error {
w.Lock()
defer w.Unlock()

if err := w.BackfillEvents(ctx); err != nil {
return err
}

newestHeader, err := w.client.HeaderByNumber(nil)
if err != nil {
return err
}

lastSeenHeader, err := w.stack.Peek()
if err != nil {
return err
}

if lastSeenHeader == nil {
return w.pollNextBlock()
}

for i := lastSeenHeader.Number; i.Cmp(newestHeader.Number) < 0; i = i.Add(i, big.NewInt(1)) {
if err := w.pollNextBlock(); err != nil {
return err
}
}
return nil
}

// pollNextBlock polls for the next block header to be added to the block stack.
// If there are no blocks on the stack, it fetches the first block at the specified
// `startBlockDepth` supplied at instantiation.
// It emits any gathered block events on w.blockFeed, even when an error occurs
// part-way through, and returns a non-nil error only for unexpected failures
// (a not-yet-mined block is treated as a no-op until the next polling interval).
func (w *Watcher) pollNextBlock() error {
var nextBlockNumber *big.Int
latestHeader, err := w.stack.Peek()
if err != nil {
return err
}
if latestHeader == nil {
// Stack is empty: honor the configured starting depth.
if w.startBlockDepth == rpc.LatestBlockNumber {
nextBlockNumber = nil // Fetch latest block
} else {
nextBlockNumber = big.NewInt(int64(w.startBlockDepth))
}
} else {
// Otherwise advance exactly one block past the newest retained header.
nextBlockNumber = big.NewInt(0).Add(latestHeader.Number, big.NewInt(1))
}
nextHeader, err := w.client.HeaderByNumber(nextBlockNumber)
if err != nil {
if err == ethereum.NotFound {
// The requested block has not been mined yet.
return nil // Noop and wait next polling interval
}
return err
}

events := []*Event{}
events, err = w.buildCanonicalChain(nextHeader, events)
// Even if an error occurred, we still want to emit the events gathered since we might have
// popped blocks off the Stack and they won't be re-added
if len(events) != 0 {
w.blockFeed.Send(w.enrichWithL1(events))
}
if err != nil {
return err
}
return nil
}

func (w *Watcher) buildCanonicalChain(nextHeader *MiniHeader, events []*Event) ([]*Event, error) {
latestHeader, err := w.stack.Peek()
if err != nil {
return nil, err
}
// Is the stack empty or is it the next block?
if latestHeader == nil || nextHeader.Parent == latestHeader.Hash {
nextHeader, err := w.addLogs(nextHeader)
if err != nil {
// Due to block re-orgs & Ethereum node services load-balancing requests across multiple nodes
// a block header might be returned, but when fetching its logs, an "unknown block" error is
// returned. This is expected to happen sometimes, and we simply return the events gathered so
// far and pick back up where we left off on the next polling interval.
if isUnknownBlockErr(err) {
glog.V(5).Infof("missing logs for blockNumber=%v - fetching on next polling interval", nextHeader.Number)
return events, nil
}
return events, err
}
err = w.stack.Push(nextHeader)
if err != nil {
return events, err
}
events = append(events, &Event{
Type: Added,
BlockHeader: nextHeader,
})
return events, nil
}

// Pop latestHeader from the stack. We already have a reference to it.
if _, err := w.stack.Pop(); err != nil {
return events, err
}
events = append(events, &Event{
Type: Removed,
BlockHeader: latestHeader,
})

nextParentHeader, err := w.client.HeaderByHash(nextHeader.Parent)
if err != nil {
if err == ethereum.NotFound {
glog.V(5).Infof("block header not found blockHash=%v - fetching on next polling interval", nextHeader.Parent.Hex())
// Noop and wait next polling interval. We remove the popped blocks
// and refetch them on the next polling interval.
return events, nil
}
return events, err
}
events, err = w.buildCanonicalChain(nextParentHeader, events)
if err != nil {
return events, err
}
nextHeader, err = w.addLogs(nextHeader)
if err != nil {
// Due to block re-orgs & Ethereum node services load-balancing requests across multiple nodes
// a block header might be returned, but when fetching its logs, an "unknown block" error is
// returned. This is expected to happen sometimes, and we simply return the events gathered so
// far and pick back up where we left off on the next polling interval.
if isUnknownBlockErr(err) {
glog.V(5).Infof("missing logs for blockNumber=%v - fetching on next polling interval", nextHeader.Number)
return events, nil
}
return events, err
}
err = w.stack.Push(nextHeader)
if err != nil {
return events, err
}
events = append(events, &Event{
Type: Added,
BlockHeader: nextHeader,
})

return events, nil
return w.BackfillEvents(ctx, newestHeader)
}

func (w *Watcher) addLogs(header *MiniHeader) (*MiniHeader, error) {
Expand All @@ -315,7 +181,7 @@ func (w *Watcher) addLogs(header *MiniHeader) (*MiniHeader, error) {
// If the stored block is older than the latest block, it batch-fetches the events for missing blocks,
// re-sets the stored blocks and returns the block events found.
// Note that the latest block is never backfilled, and will be polled separately in syncToLatestBlock().
func (w *Watcher) getMissedEventsToBackfill(ctx context.Context) ([]*Event, error) {
func (w *Watcher) getMissedEventsToBackfill(ctx context.Context, chainHead *MiniHeader) ([]*Event, error) {
events := []*Event{}

var (
Expand All @@ -329,27 +195,38 @@ func (w *Watcher) getMissedEventsToBackfill(ctx context.Context) ([]*Event, erro
return events, err
}

latestBlock, err := w.client.HeaderByNumber(nil)
if err != nil {
return events, err
if chainHead == nil {
var err error
chainHead, err = w.client.HeaderByNumber(nil)
if err != nil {
return events, err
}
}

// Latest block will be polled separately in syncToLatestBlock(), so it's not backfilled.
preLatestBlockNum := int(latestBlock.Number.Int64()) - 1
latestBlockNum := int(chainHead.Number.Int64())

if latestRetainedBlock != nil {
latestRetainedBlockNum = int(latestRetainedBlock.Number.Int64())
// Events for latestRetainedBlock already processed, start at latestRetainedBlock + 1
startBlockNum = latestRetainedBlockNum + 1
} else {
err = w.stack.Push(chainHead)
if err != nil {
return events, err
}

events = append(events, &Event{
Type: Added,
BlockHeader: chainHead,
})
return events, nil
}

if blocksElapsed = preLatestBlockNum - startBlockNum; blocksElapsed <= 0 {
if blocksElapsed = latestBlockNum - startBlockNum; blocksElapsed <= 0 {
return events, nil
}

logs, furthestBlockProcessed := w.getLogsInBlockRange(ctx, startBlockNum, preLatestBlockNum)
logs, furthestBlockProcessed := w.getLogsInBlockRange(ctx, startBlockNum, latestBlockNum)
if furthestBlockProcessed > latestRetainedBlockNum {
// If we have processed blocks further than the latestRetainedBlock in the DB, we
// want to remove all blocks from the DB and insert the furthestBlockProcessed
Expand All @@ -365,7 +242,12 @@ func (w *Watcher) getMissedEventsToBackfill(ctx context.Context) ([]*Event, erro
}
}
// Add furthest block processed into the DB
latestHeader, err := w.client.HeaderByNumber(big.NewInt(int64(furthestBlockProcessed)))
var latestHeader *MiniHeader
if chainHead.Number.Int64() == int64(furthestBlockProcessed) {
latestHeader = chainHead
} else {
latestHeader, err = w.client.HeaderByNumber(big.NewInt(int64(furthestBlockProcessed)))
}
if err != nil {
return events, err
}
Expand All @@ -374,6 +256,11 @@ func (w *Watcher) getMissedEventsToBackfill(ctx context.Context) ([]*Event, erro
return events, err
}

events = append(events, &Event{
Type: Added,
BlockHeader: latestHeader,
})

// If no logs found, noop
if len(logs) == 0 {
return events, nil
Expand Down
17 changes: 7 additions & 10 deletions eth/blockwatch/block_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ func TestWatcher(t *testing.T) {
for i := 0; i < fakeClient.NumberOfTimesteps(); i++ {
scenarioLabel := fakeClient.GetScenarioLabel()

err := watcher.pollNextBlock()
require.NoError(t, err)

retainedBlocks, err := watcher.InspectRetainedBlocks()
require.NoError(t, err)
expectedRetainedBlocks := fakeClient.ExpectedRetainedBlocks()
Expand Down Expand Up @@ -204,15 +201,15 @@ func TestGetMissedEventsToBackfillSomeMissed(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
events, err := watcher.getMissedEventsToBackfill(ctx)
events, err := watcher.getMissedEventsToBackfill(ctx, nil)
require.NoError(t, err)
assert.Len(t, events, 1)
assert.Len(t, events, 2)

// Check that block 30 is now in the DB, and block 5 was removed.
headers, err := store.FindAllMiniHeadersSortedByNumber()
require.NoError(t, err)
require.Len(t, headers, 1)
assert.Equal(t, big.NewInt(29), headers[0].Number)
assert.Equal(t, big.NewInt(30), headers[0].Number)
}

func TestGetMissedEventsToBackfillNoneMissed(t *testing.T) {
Expand All @@ -236,7 +233,7 @@ func TestGetMissedEventsToBackfillNoneMissed(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
events, err := watcher.getMissedEventsToBackfill(ctx)
events, err := watcher.getMissedEventsToBackfill(ctx, nil)
require.NoError(t, err)
assert.Len(t, events, 0)

Expand All @@ -262,13 +259,13 @@ func TestGetMissedEventsToBackfill_NOOP(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
events, err := watcher.getMissedEventsToBackfill(ctx)
events, err := watcher.getMissedEventsToBackfill(ctx, nil)
require.NoError(t, err)
assert.Len(t, events, 0)
assert.Len(t, events, 1)

headers, err := store.FindAllMiniHeadersSortedByNumber()
require.NoError(t, err)
require.Len(t, headers, 0)
require.Len(t, headers, 1)
}

var logStub = types.Log{
Expand Down