From 47012b6d902726af97b6e80f024047252d01b034 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Tue, 16 Apr 2024 19:22:36 +0100 Subject: [PATCH 01/11] Add logs to identify when assumptions of log queuing behaviour are violated --- .../evmregistry/v21/logprovider/buffer_v1.go | 35 ++++++++++++++----- .../evmregistry/v21/logprovider/log.go | 9 +++-- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go index fbc1da075df..221c171c1d9 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go @@ -75,21 +75,25 @@ type logBuffer struct { // last block number seen by the buffer lastBlockSeen *atomic.Int64 // map of upkeep id to its queue - queues map[string]*upkeepLogQueue - lock sync.RWMutex + queues map[string]*upkeepLogQueue + enqueuedBlocks map[int64]int + lock sync.RWMutex } func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogBuffer { return &logBuffer{ - lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"), - opts: newLogBufferOptions(lookback, blockRate, logLimit), - lastBlockSeen: new(atomic.Int64), - queues: make(map[string]*upkeepLogQueue), + lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"), + opts: newLogBufferOptions(lookback, blockRate, logLimit), + lastBlockSeen: new(atomic.Int64), + enqueuedBlocks: map[int64]int{}, + queues: make(map[string]*upkeepLogQueue), } } // Enqueue adds logs to the buffer and might also drop logs if the limit for the // given upkeep was exceeded. It will create a new buffer if it does not exist. +// Logs are expected to be enqueued in increasing order of block number. +// All logs for a particular block will be enqueued at once and not across separate calls. // Returns the number of logs that were added and number of logs that were dropped. func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) { buf, ok := b.getUpkeepQueue(uid) @@ -97,10 +101,23 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) { buf = newUpkeepLogQueue(b.lggr, uid, b.opts) b.setUpkeepQueue(uid, buf) } - latestBlock := latestBlockNumber(logs...) - if b.lastBlockSeen.Load() < latestBlock { - b.lastBlockSeen.Store(latestBlock) + + latestLogBlock, uniqueBlocks := latestBlockNumber(logs...) + if lastBlockSeen := b.lastBlockSeen.Load(); lastBlockSeen < latestLogBlock { + b.lastBlockSeen.Store(latestLogBlock) + } else if latestLogBlock < lastBlockSeen { + b.lggr.Debugw("enqueuing logs from a block older than latest seen block", "logBlock", latestLogBlock, "lastBlockSeen", lastBlockSeen) + } + + for block := range uniqueBlocks { + if _, ok := b.enqueuedBlocks[block]; ok { + b.enqueuedBlocks[block] = b.enqueuedBlocks[block] + 1 + b.lggr.Debugw("enqueuing logs again for a previously seen block", "block", block, "timesSeen", b.enqueuedBlocks[block]) + } else { + b.enqueuedBlocks[block] = 1 + } } + blockThreshold := b.lastBlockSeen.Load() - int64(b.opts.lookback.Load()) if blockThreshold <= 0 { blockThreshold = 1 diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go index 9156e341688..51369c9843e 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go @@ -8,7 +8,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" ) -// LogSorter sorts the logs based on block number, tx hash and log index. +// LogSorter sorts the logs primarily by block number, then by log index, and finally by tx hash. // returns true if b should come before a. func LogSorter(a, b logpoller.Log) bool { return LogComparator(a, b) > 0 @@ -58,12 +58,15 @@ func logID(l logpoller.Log) string { } // latestBlockNumber returns the latest block number from the given logs -func latestBlockNumber(logs ...logpoller.Log) int64 { +func latestBlockNumber(logs ...logpoller.Log) (int64, map[int64]bool) { var latest int64 + uniqueBlocks := map[int64]bool{} + for _, l := range logs { if l.BlockNumber > latest { latest = l.BlockNumber } + uniqueBlocks[l.BlockNumber] = true } - return latest + return latest, uniqueBlocks } From 7ff1cd9a078620af3d726c5d01e1be5257170238 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Tue, 16 Apr 2024 22:38:55 +0100 Subject: [PATCH 02/11] Add tests --- .../evmregistry/v21/logprovider/buffer_v1.go | 13 +-- .../v21/logprovider/buffer_v1_test.go | 91 +++++++++++++++++++ 2 files changed, 98 insertions(+), 6 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go index 221c171c1d9..7d3739aae97 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go @@ -75,7 +75,8 @@ type logBuffer struct { // last block number seen by the buffer lastBlockSeen *atomic.Int64 // map of upkeep id to its queue - queues map[string]*upkeepLogQueue + queues map[string]*upkeepLogQueue + // map for then number of times we have enqueued logs for a block number enqueuedBlocks map[int64]int lock sync.RWMutex } @@ -109,12 +110,12 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) { b.lggr.Debugw("enqueuing logs from a block older than latest seen block", "logBlock", latestLogBlock, "lastBlockSeen", lastBlockSeen) } - for block := range uniqueBlocks { - if _, ok := b.enqueuedBlocks[block]; ok { - b.enqueuedBlocks[block] = b.enqueuedBlocks[block] + 1 - b.lggr.Debugw("enqueuing logs again for a previously seen block", "block", block, "timesSeen", b.enqueuedBlocks[block]) + for blockNumber := range uniqueBlocks { + if _, ok := b.enqueuedBlocks[blockNumber]; ok { + b.enqueuedBlocks[blockNumber] = b.enqueuedBlocks[blockNumber] + 1 + b.lggr.Debugw("enqueuing logs again for a previously seen block", "blockNumber", blockNumber, "numberOfEnqueues", b.enqueuedBlocks[blockNumber]) } else { - b.enqueuedBlocks[block] = 1 + b.enqueuedBlocks[blockNumber] = 1 } } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go index 19f806d35b9..9fe664753c6 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go @@ -1,6 +1,7 @@ package logprovider import ( + "github.com/stretchr/testify/assert" "math/big" "testing" @@ -50,6 +51,96 @@ func TestLogEventBufferV1_SyncFilters(t *testing.T) { require.Equal(t, 1, buf.NumOfUpkeeps()) } +type readableLogger struct { + logger.Logger + DebugwFn func(msg string, keysAndValues ...interface{}) + NamedFn func(name string) logger.Logger + WithFn func(args ...interface{}) logger.Logger +} + +func (l *readableLogger) Debugw(msg string, keysAndValues ...interface{}) { + l.DebugwFn(msg, keysAndValues...) +} + +func (l *readableLogger) Named(name string) logger.Logger { + return l +} + +func (l *readableLogger) With(args ...interface{}) logger.Logger { + return l +} + +func TestLogEventBufferV1_Enqueueviolations(t *testing.T) { + t.Run("enqueuing logs for a block older than latest seen logs a message", func(t *testing.T) { + logReceived := false + readableLogger := &readableLogger{ + DebugwFn: func(msg string, keysAndValues ...interface{}) { + if msg == "enqueuing logs from a block older than latest seen block" { + logReceived = true + assert.Equal(t, "logBlock", keysAndValues[0]) + assert.Equal(t, int64(1), keysAndValues[1]) + assert.Equal(t, "lastBlockSeen", keysAndValues[2]) + assert.Equal(t, int64(2), keysAndValues[3]) + } + }, + } + + logBufferV1 := NewLogBuffer(readableLogger, 10, 20, 1) + + buf := logBufferV1.(*logBuffer) + + buf.Enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 0}, + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 1}, + ) + buf.Enqueue(big.NewInt(2), + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x2"), LogIndex: 0}, + ) + + assert.Equal(t, 1, buf.enqueuedBlocks[2]) + assert.Equal(t, 1, buf.enqueuedBlocks[1]) + assert.True(t, true, logReceived) + }) + + t.Run("enqueuing logs for the same block over multiple calls logs a message", func(t *testing.T) { + logReceived := false + readableLogger := &readableLogger{ + DebugwFn: func(msg string, keysAndValues ...interface{}) { + if msg == "enqueuing logs again for a previously seen block" { + logReceived = true + assert.Equal(t, "blockNumber", keysAndValues[0]) + assert.Equal(t, int64(3), keysAndValues[1]) + assert.Equal(t, "numberOfEnqueues", keysAndValues[2]) + assert.Equal(t, 2, keysAndValues[3]) + } + }, + } + + logBufferV1 := NewLogBuffer(readableLogger, 10, 20, 1) + + buf := logBufferV1.(*logBuffer) + + buf.Enqueue(big.NewInt(1), + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}, + logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1}, + ) + buf.Enqueue(big.NewInt(2), + logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0}, + ) + buf.Enqueue(big.NewInt(3), + logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3a"), LogIndex: 0}, + ) + buf.Enqueue(big.NewInt(3), + logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3b"), LogIndex: 0}, + ) + + assert.Equal(t, 1, buf.enqueuedBlocks[2]) + assert.Equal(t, 1, buf.enqueuedBlocks[1]) + assert.Equal(t, 2, buf.enqueuedBlocks[3]) + assert.True(t, true, logReceived) + }) +} + func TestLogEventBufferV1_Dequeue(t *testing.T) { tests := []struct { name string From 4e1906588529b05a7b67f7564f646465b58031c0 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Tue, 16 Apr 2024 22:46:09 +0100 Subject: [PATCH 03/11] go import --- .../ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go index 9fe664753c6..2404ebabfd5 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go @@ -1,10 +1,11 @@ package logprovider import ( - "github.com/stretchr/testify/assert" "math/big" "testing" + "github.com/stretchr/testify/assert" + "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/require" From 381702ab0bb4b832264a1d548d58dff810ca0784 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Tue, 16 Apr 2024 22:54:45 +0100 Subject: [PATCH 04/11] Add changeset --- .changeset/heavy-mails-rule.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/heavy-mails-rule.md diff --git a/.changeset/heavy-mails-rule.md b/.changeset/heavy-mails-rule.md new file mode 100644 index 00000000000..fdb6b3929b3 --- /dev/null +++ b/.changeset/heavy-mails-rule.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Add logs for when the assumptions of how the log buffer will be used are violated #internal From 7ef5f7305c61131a7ffc64a7ad9fe6cdc61b5be0 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Thu, 18 Apr 2024 00:21:49 +0100 Subject: [PATCH 05/11] Update enqueuing assumption --- .../evmregistry/v21/logprovider/buffer_v1.go | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go index 7d3739aae97..d9f5f5d353c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go @@ -77,7 +77,7 @@ type logBuffer struct { // map of upkeep id to its queue queues map[string]*upkeepLogQueue // map for then number of times we have enqueued logs for a block number - enqueuedBlocks map[int64]int + enqueuedBlocks map[int64]map[string]int lock sync.RWMutex } @@ -86,7 +86,7 @@ func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogB lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"), opts: newLogBufferOptions(lookback, blockRate, logLimit), lastBlockSeen: new(atomic.Int64), - enqueuedBlocks: map[int64]int{}, + enqueuedBlocks: map[int64]map[string]int{}, queues: make(map[string]*upkeepLogQueue), } } @@ -94,7 +94,7 @@ func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogB // Enqueue adds logs to the buffer and might also drop logs if the limit for the // given upkeep was exceeded. It will create a new buffer if it does not exist. // Logs are expected to be enqueued in increasing order of block number. -// All logs for a particular block will be enqueued at once and not across separate calls. +// All logs for an upkeep on a particular block will be enqueued in a single Enqueue call. // Returns the number of logs that were added and number of logs that were dropped. func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) { buf, ok := b.getUpkeepQueue(uid) @@ -111,11 +111,18 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) { } for blockNumber := range uniqueBlocks { - if _, ok := b.enqueuedBlocks[blockNumber]; ok { - b.enqueuedBlocks[blockNumber] = b.enqueuedBlocks[blockNumber] + 1 - b.lggr.Debugw("enqueuing logs again for a previously seen block", "blockNumber", blockNumber, "numberOfEnqueues", b.enqueuedBlocks[blockNumber]) + if blockNumbers, ok := b.enqueuedBlocks[blockNumber]; ok { + if count, ok := blockNumbers[uid.String()]; ok { + blockNumbers[uid.String()] = count + 1 + b.lggr.Debugw("enqueuing logs again for a previously seen block for this upkeep", "blockNumber", blockNumber, "numberOfEnqueues", b.enqueuedBlocks[blockNumber], "upkeepID", uid.String()) + } else { + blockNumbers[uid.String()] = 1 + } + b.enqueuedBlocks[blockNumber] = blockNumbers } else { - b.enqueuedBlocks[blockNumber] = 1 + b.enqueuedBlocks[blockNumber] = map[string]int{ + uid.String(): 1, + } } } From f6a8162a958cdcecc1359c1609034f6df9569b69 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Thu, 18 Apr 2024 00:29:54 +0100 Subject: [PATCH 06/11] Update tests --- .../evmregistry/v21/logprovider/buffer_v1_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go index 2404ebabfd5..7c2316f1a2e 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go @@ -98,8 +98,8 @@ func TestLogEventBufferV1_Enqueueviolations(t *testing.T) { logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x2"), LogIndex: 0}, ) - assert.Equal(t, 1, buf.enqueuedBlocks[2]) - assert.Equal(t, 1, buf.enqueuedBlocks[1]) + assert.Equal(t, 1, buf.enqueuedBlocks[2]["1"]) + assert.Equal(t, 1, buf.enqueuedBlocks[1]["2"]) assert.True(t, true, logReceived) }) @@ -135,9 +135,9 @@ func TestLogEventBufferV1_Enqueueviolations(t *testing.T) { logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3b"), LogIndex: 0}, ) - assert.Equal(t, 1, buf.enqueuedBlocks[2]) - assert.Equal(t, 1, buf.enqueuedBlocks[1]) - assert.Equal(t, 2, buf.enqueuedBlocks[3]) + assert.Equal(t, 1, buf.enqueuedBlocks[2]["2"]) + assert.Equal(t, 1, buf.enqueuedBlocks[1]["1"]) + assert.Equal(t, 2, buf.enqueuedBlocks[3]["3"]) assert.True(t, true, logReceived) }) } From 2710e62074a38cfcc8d1c06cec36278111d7f4ba Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Thu, 25 Apr 2024 15:59:17 +0100 Subject: [PATCH 07/11] Extract block tracking into a separate function --- .../evmregistry/v21/logprovider/buffer_v1.go | 28 +++++++++++++------ .../evmregistry/v21/logprovider/log.go | 5 ++-- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go index d9f5f5d353c..0406ca0c0f7 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go @@ -103,17 +103,33 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) { b.setUpkeepQueue(uid, buf) } - latestLogBlock, uniqueBlocks := latestBlockNumber(logs...) + latestLogBlock, uniqueBlocks := blockStatistics(logs...) if lastBlockSeen := b.lastBlockSeen.Load(); lastBlockSeen < latestLogBlock { b.lastBlockSeen.Store(latestLogBlock) } else if latestLogBlock < lastBlockSeen { b.lggr.Debugw("enqueuing logs from a block older than latest seen block", "logBlock", latestLogBlock, "lastBlockSeen", lastBlockSeen) } + b.trackBlockNumbersForUpkeep(uid, uniqueBlocks) + + blockThreshold := b.lastBlockSeen.Load() - int64(b.opts.lookback.Load()) + if blockThreshold <= 0 { + blockThreshold = 1 + } + + return buf.enqueue(blockThreshold, logs...) +} + +// trackBlockNumbersForUpkeep keeps track of the number of times we enqueue logs for an upkeep, +// for a specific block number. The expectation is that we will only enqueue logs for an upkeep for a +// specific block number once, i.e. all logs for an upkeep for a block, will be enqueued in a single +// enqueue call. In the event that we see upkeep logs enqueued for a particular block more than once, +// we log a message. +func (b *logBuffer) trackBlockNumbersForUpkeep(uid *big.Int, uniqueBlocks map[int64]bool) { for blockNumber := range uniqueBlocks { if blockNumbers, ok := b.enqueuedBlocks[blockNumber]; ok { - if count, ok := blockNumbers[uid.String()]; ok { - blockNumbers[uid.String()] = count + 1 + if upkeepBlockInstances, ok := blockNumbers[uid.String()]; ok { + blockNumbers[uid.String()] = upkeepBlockInstances + 1 b.lggr.Debugw("enqueuing logs again for a previously seen block for this upkeep", "blockNumber", blockNumber, "numberOfEnqueues", b.enqueuedBlocks[blockNumber], "upkeepID", uid.String()) } else { blockNumbers[uid.String()] = 1 @@ -125,12 +141,6 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) { } } } - - blockThreshold := b.lastBlockSeen.Load() - int64(b.opts.lookback.Load()) - if blockThreshold <= 0 { - blockThreshold = 1 - } - return buf.enqueue(blockThreshold, logs...) } // Dequeue greedly pulls logs from the buffers. diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go index 51369c9843e..9603d6da5be 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go @@ -57,8 +57,8 @@ func logID(l logpoller.Log) string { return hex.EncodeToString(ext.LogIdentifier()) } -// latestBlockNumber returns the latest block number from the given logs -func latestBlockNumber(logs ...logpoller.Log) (int64, map[int64]bool) { +// blockStatistics returns the latest block number from the given logs, and a map of unique block numbers +func blockStatistics(logs ...logpoller.Log) (int64, map[int64]bool) { var latest int64 uniqueBlocks := map[int64]bool{} @@ -68,5 +68,6 @@ func latestBlockNumber(logs ...logpoller.Log) (int64, map[int64]bool) { } uniqueBlocks[l.BlockNumber] = true } + return latest, uniqueBlocks } From 396425e4a43eceaadeccf2a46082439af7bcfcf3 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Thu, 25 Apr 2024 16:48:48 +0100 Subject: [PATCH 08/11] Clean up outdated enqueued blocks --- .../ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go index 0406ca0c0f7..a5a7e0b12ab 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go @@ -117,6 +117,13 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) { blockThreshold = 1 } + // clean up enqueued block counts + for block := range b.enqueuedBlocks { + if block < blockThreshold-reorgBuffer { + delete(b.enqueuedBlocks, block) + } + } + return buf.enqueue(blockThreshold, logs...) } From ec2aa0aab66444e08bcdb48a814e2c8f60e6f7fa Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Tue, 30 Apr 2024 17:25:12 +0100 Subject: [PATCH 09/11] Clean up imports --- .../ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go index 7c2316f1a2e..56f9606b8a8 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go @@ -4,9 +4,8 @@ import ( "math/big" "testing" - "github.com/stretchr/testify/assert" - "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" From 7619d2dc774f57134fda255d5afb14f7bf0a57b2 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Fri, 3 May 2024 16:02:38 +0100 Subject: [PATCH 10/11] Ignore reord buffer in cleanup --- .../plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go index a5a7e0b12ab..b556b142984 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go @@ -119,7 +119,7 @@ func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) { // clean up enqueued block counts for block := range b.enqueuedBlocks { - if block < blockThreshold-reorgBuffer { + if block < blockThreshold { delete(b.enqueuedBlocks, block) } } From 4bf1c356e7dc303afa0867633a0a002685546ba3 Mon Sep 17 00:00:00 2001 From: Fergal Gribben Date: Sat, 4 May 2024 01:00:07 +0100 Subject: [PATCH 11/11] Cleanup test name --- .../ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go index 56f9606b8a8..d6b4f43ac6e 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go @@ -70,7 +70,7 @@ func (l *readableLogger) With(args ...interface{}) logger.Logger { return l } -func TestLogEventBufferV1_Enqueueviolations(t *testing.T) { +func TestLogEventBufferV1_EnqueueViolations(t *testing.T) { t.Run("enqueuing logs for a block older than latest seen logs a message", func(t *testing.T) { logReceived := false readableLogger := &readableLogger{