Skip to content

Commit

Permalink
Fixed CPU usage issues caused by inefficiencies in HeadTracker (#13230)
Browse files Browse the repository at this point in the history
* Fixed CPU usage issues caused by inefficiencies in HeadTracker

* added comments

* revert heads back to the fix

(cherry picked from commit 6f1ebca)
  • Loading branch information
dhaidashenko committed May 20, 2024
1 parent 1d0b861 commit 5daefad
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 33 deletions.
10 changes: 10 additions & 0 deletions .changeset/early-shoes-sit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
"chainlink": patch
---

Fixed CPU usage issues caused by inefficiencies in HeadTracker.

HeadTracker's support of finality tags caused a drastic increase in the number of tracked blocks on the Arbitrum chain (from 50 to 12,000), which has led to a 30% increase in CPU usage.

The fix improves the data structure for tracking blocks and makes lookup more efficient. BenchmarkHeadTracker_Backfill shows 40x time reduction.
#bugfix
42 changes: 41 additions & 1 deletion core/chains/evm/headtracker/head_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox/mailboxtest"

htmocks "github.com/smartcontractkit/chainlink/v2/common/headtracker/mocks"
evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
evmclimocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker"
httypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types"
Expand Down Expand Up @@ -983,7 +984,46 @@ func TestHeadTracker_Backfill(t *testing.T) {
})
}

func createHeadTracker(t *testing.T, ethClient *evmclimocks.Client, config headtracker.Config, htConfig headtracker.HeadTrackerConfig, orm headtracker.ORM) *headTrackerUniverse {
// BenchmarkHeadTracker_Backfill - benchmarks HeadTracker's Backfill with focus on efficiency after initial
// backfill on start up
func BenchmarkHeadTracker_Backfill(b *testing.B) {
cfg := configtest.NewGeneralConfig(b, nil)

evmcfg := evmtest.NewChainScopedConfig(b, cfg)
db := pgtest.NewSqlxDB(b)
chainID := big.NewInt(evmclient.NullClientChainID)
orm := headtracker.NewORM(*chainID, db)
ethClient := evmclimocks.NewClient(b)
ethClient.On("ConfiguredChainID").Return(chainID)
ht := createHeadTracker(b, ethClient, evmcfg.EVM(), evmcfg.EVM().HeadTracker(), orm)
ctx := tests.Context(b)
makeHash := func(n int64) gethCommon.Hash {
return gethCommon.BigToHash(big.NewInt(n))
}
const finalityDepth = 12000 // observed value on Arbitrum
makeBlock := func(n int64) *evmtypes.Head {
return &evmtypes.Head{Number: n, Hash: makeHash(n), ParentHash: makeHash(n - 1)}
}
latest := makeBlock(finalityDepth)
finalized := makeBlock(1)
ethClient.On("HeadByHash", mock.Anything, mock.Anything).Return(func(_ context.Context, hash gethCommon.Hash) (*evmtypes.Head, error) {
number := hash.Big().Int64()
return makeBlock(number), nil
})
// run initial backfill to populate the database
err := ht.headTracker.Backfill(ctx, latest, finalized)
require.NoError(b, err)
b.ResetTimer()
// focus benchmark on processing of a new latest block
for i := 0; i < b.N; i++ {
latest = makeBlock(int64(finalityDepth + i))
finalized = makeBlock(int64(i + 1))
err := ht.headTracker.Backfill(ctx, latest, finalized)
require.NoError(b, err)
}
}

func createHeadTracker(t testing.TB, ethClient *evmclimocks.Client, config headtracker.Config, htConfig headtracker.HeadTrackerConfig, orm headtracker.ORM) *headTrackerUniverse {
lggr, ob := logger.TestObserved(t, zap.DebugLevel)
hb := headtracker.NewHeadBroadcaster(lggr)
hs := headtracker.NewHeadSaver(lggr, orm, config, htConfig)
Expand Down
54 changes: 22 additions & 32 deletions core/chains/evm/headtracker/heads.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ type Heads interface {
}

type heads struct {
heads []*evmtypes.Head
mu sync.RWMutex
heads []*evmtypes.Head
headsMap map[common.Hash]*evmtypes.Head
mu sync.RWMutex
}

func NewHeads() Heads {
Expand All @@ -48,12 +49,11 @@ func (h *heads) HeadByHash(hash common.Hash) *evmtypes.Head {
h.mu.RLock()
defer h.mu.RUnlock()

for _, head := range h.heads {
if head.Hash == hash {
return head
}
if h.headsMap == nil {
return nil
}
return nil

return h.headsMap[hash]
}

func (h *heads) Count() int {
Expand All @@ -74,26 +74,23 @@ func (h *heads) MarkFinalized(finalized common.Hash, minBlockToKeep int64) bool
}

// deep copy to avoid race on head.Parent
h.heads = deepCopy(h.heads, minBlockToKeep)
h.heads, h.headsMap = deepCopy(h.heads, minBlockToKeep)

head := h.heads[0]
foundFinalized := false
for head != nil {
if head.Hash == finalized {
foundFinalized = true
}

// we might see finalized to move back in chain due to request to lagging RPC,
// we should not override the flag in such cases
head.IsFinalized = head.IsFinalized || foundFinalized
head = head.Parent
finalizedHead, ok := h.headsMap[finalized]
if !ok {
return false
}
for finalizedHead != nil {
finalizedHead.IsFinalized = true
finalizedHead = finalizedHead.Parent
}

return foundFinalized
return true
}

func deepCopy(oldHeads []*evmtypes.Head, minBlockToKeep int64) []*evmtypes.Head {
func deepCopy(oldHeads []*evmtypes.Head, minBlockToKeep int64) ([]*evmtypes.Head, map[common.Hash]*evmtypes.Head) {
headsMap := make(map[common.Hash]*evmtypes.Head, len(oldHeads))
heads := make([]*evmtypes.Head, 0, len(headsMap))
for _, head := range oldHeads {
if head.Hash == head.ParentHash {
// shouldn't happen but it is untrusted input
Expand All @@ -111,18 +108,11 @@ func deepCopy(oldHeads []*evmtypes.Head, minBlockToKeep int64) []*evmtypes.Head
// prefer head that was already in heads as it might have been marked as finalized on previous run
if _, ok := headsMap[head.Hash]; !ok {
headsMap[head.Hash] = &headCopy
heads = append(heads, &headCopy)
}
}

heads := make([]*evmtypes.Head, 0, len(headsMap))
// unsorted unique heads
{
for _, head := range headsMap {
heads = append(heads, head)
}
}

// sort the heads
// sort the heads as original slice might be out of order
sort.SliceStable(heads, func(i, j int) bool {
// sorting from the highest number to lowest
return heads[i].Number > heads[j].Number
Expand All @@ -137,13 +127,13 @@ func deepCopy(oldHeads []*evmtypes.Head, minBlockToKeep int64) []*evmtypes.Head
}
}

return heads
return heads, headsMap
}

func (h *heads) AddHeads(newHeads ...*evmtypes.Head) {
h.mu.Lock()
defer h.mu.Unlock()

// deep copy to avoid race on head.Parent
h.heads = deepCopy(append(h.heads, newHeads...), 0)
h.heads, h.headsMap = deepCopy(append(h.heads, newHeads...), 0)
}

0 comments on commit 5daefad

Please sign in to comment.