Skip to content

Commit

Permalink
Add logs to identify when assumptions of log queuing behaviour are vi…
Browse files Browse the repository at this point in the history
…olated (#12846)

* Add logs to identify when assumptions of log queuing behaviour are violated

* Add tests

* go import

* Add changeset

* Update enqueuing assumption

* Update tests

* Extract block tracking into a separate function

* Clean up outdated enqueued blocks

* Clean up imports

* Ignore reord buffer in cleanup

* Cleanup test name
  • Loading branch information
ferglor authored and akuzni2 committed May 11, 2024
1 parent 3394086 commit c7a4043
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 12 deletions.
5 changes: 5 additions & 0 deletions .changeset/heavy-mails-rule.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Add logs for when the assumptions of how the log buffer will be used are violated #internal
Original file line number Diff line number Diff line change
Expand Up @@ -76,38 +76,80 @@ type logBuffer struct {
lastBlockSeen *atomic.Int64
// map of upkeep id to its queue
queues map[string]*upkeepLogQueue
lock sync.RWMutex
// map for then number of times we have enqueued logs for a block number
enqueuedBlocks map[int64]map[string]int
lock sync.RWMutex
}

func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogBuffer {
return &logBuffer{
lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"),
opts: newLogBufferOptions(lookback, blockRate, logLimit),
lastBlockSeen: new(atomic.Int64),
queues: make(map[string]*upkeepLogQueue),
lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"),
opts: newLogBufferOptions(lookback, blockRate, logLimit),
lastBlockSeen: new(atomic.Int64),
enqueuedBlocks: map[int64]map[string]int{},
queues: make(map[string]*upkeepLogQueue),
}
}

// Enqueue adds logs to the buffer and might also drop logs if the limit for the
// given upkeep was exceeded. It will create a new buffer if it does not exist.
// Logs are expected to be enqueued in increasing order of block number.
// All logs for an upkeep on a particular block will be enqueued in a single Enqueue call.
// Returns the number of logs that were added and number of logs that were dropped.
func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) {
buf, ok := b.getUpkeepQueue(uid)
if !ok || buf == nil {
buf = newUpkeepLogQueue(b.lggr, uid, b.opts)
b.setUpkeepQueue(uid, buf)
}
latestBlock := latestBlockNumber(logs...)
if b.lastBlockSeen.Load() < latestBlock {
b.lastBlockSeen.Store(latestBlock)

latestLogBlock, uniqueBlocks := blockStatistics(logs...)
if lastBlockSeen := b.lastBlockSeen.Load(); lastBlockSeen < latestLogBlock {
b.lastBlockSeen.Store(latestLogBlock)
} else if latestLogBlock < lastBlockSeen {
b.lggr.Debugw("enqueuing logs from a block older than latest seen block", "logBlock", latestLogBlock, "lastBlockSeen", lastBlockSeen)
}

b.trackBlockNumbersForUpkeep(uid, uniqueBlocks)

blockThreshold := b.lastBlockSeen.Load() - int64(b.opts.lookback.Load())
if blockThreshold <= 0 {
blockThreshold = 1
}

// clean up enqueued block counts
for block := range b.enqueuedBlocks {
if block < blockThreshold {
delete(b.enqueuedBlocks, block)
}
}

return buf.enqueue(blockThreshold, logs...)
}

// trackBlockNumbersForUpkeep keeps track of the number of times we enqueue logs for an upkeep,
// for a specific block number. The expectation is that we will only enqueue logs for an upkeep for a
// specific block number once, i.e. all logs for an upkeep for a block, will be enqueued in a single
// enqueue call. In the event that we see upkeep logs enqueued for a particular block more than once,
// we log a message.
func (b *logBuffer) trackBlockNumbersForUpkeep(uid *big.Int, uniqueBlocks map[int64]bool) {
for blockNumber := range uniqueBlocks {
if blockNumbers, ok := b.enqueuedBlocks[blockNumber]; ok {
if upkeepBlockInstances, ok := blockNumbers[uid.String()]; ok {
blockNumbers[uid.String()] = upkeepBlockInstances + 1
b.lggr.Debugw("enqueuing logs again for a previously seen block for this upkeep", "blockNumber", blockNumber, "numberOfEnqueues", b.enqueuedBlocks[blockNumber], "upkeepID", uid.String())
} else {
blockNumbers[uid.String()] = 1
}
b.enqueuedBlocks[blockNumber] = blockNumbers
} else {
b.enqueuedBlocks[blockNumber] = map[string]int{
uid.String(): 1,
}
}
}
}

// Dequeue greedly pulls logs from the buffers.
// Returns logs and the number of remaining logs in the buffer.
func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
Expand Down Expand Up @@ -50,6 +51,96 @@ func TestLogEventBufferV1_SyncFilters(t *testing.T) {
require.Equal(t, 1, buf.NumOfUpkeeps())
}

type readableLogger struct {
logger.Logger
DebugwFn func(msg string, keysAndValues ...interface{})
NamedFn func(name string) logger.Logger
WithFn func(args ...interface{}) logger.Logger
}

func (l *readableLogger) Debugw(msg string, keysAndValues ...interface{}) {
l.DebugwFn(msg, keysAndValues...)
}

func (l *readableLogger) Named(name string) logger.Logger {
return l
}

func (l *readableLogger) With(args ...interface{}) logger.Logger {
return l
}

func TestLogEventBufferV1_EnqueueViolations(t *testing.T) {
t.Run("enqueuing logs for a block older than latest seen logs a message", func(t *testing.T) {
logReceived := false
readableLogger := &readableLogger{
DebugwFn: func(msg string, keysAndValues ...interface{}) {
if msg == "enqueuing logs from a block older than latest seen block" {
logReceived = true
assert.Equal(t, "logBlock", keysAndValues[0])
assert.Equal(t, int64(1), keysAndValues[1])
assert.Equal(t, "lastBlockSeen", keysAndValues[2])
assert.Equal(t, int64(2), keysAndValues[3])
}
},
}

logBufferV1 := NewLogBuffer(readableLogger, 10, 20, 1)

buf := logBufferV1.(*logBuffer)

buf.Enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 0},
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 1},
)
buf.Enqueue(big.NewInt(2),
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x2"), LogIndex: 0},
)

assert.Equal(t, 1, buf.enqueuedBlocks[2]["1"])
assert.Equal(t, 1, buf.enqueuedBlocks[1]["2"])
assert.True(t, true, logReceived)
})

t.Run("enqueuing logs for the same block over multiple calls logs a message", func(t *testing.T) {
logReceived := false
readableLogger := &readableLogger{
DebugwFn: func(msg string, keysAndValues ...interface{}) {
if msg == "enqueuing logs again for a previously seen block" {
logReceived = true
assert.Equal(t, "blockNumber", keysAndValues[0])
assert.Equal(t, int64(3), keysAndValues[1])
assert.Equal(t, "numberOfEnqueues", keysAndValues[2])
assert.Equal(t, 2, keysAndValues[3])
}
},
}

logBufferV1 := NewLogBuffer(readableLogger, 10, 20, 1)

buf := logBufferV1.(*logBuffer)

buf.Enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0},
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1},
)
buf.Enqueue(big.NewInt(2),
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0},
)
buf.Enqueue(big.NewInt(3),
logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3a"), LogIndex: 0},
)
buf.Enqueue(big.NewInt(3),
logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3b"), LogIndex: 0},
)

assert.Equal(t, 1, buf.enqueuedBlocks[2]["2"])
assert.Equal(t, 1, buf.enqueuedBlocks[1]["1"])
assert.Equal(t, 2, buf.enqueuedBlocks[3]["3"])
assert.True(t, true, logReceived)
})
}

func TestLogEventBufferV1_Dequeue(t *testing.T) {
tests := []struct {
name string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
)

// LogSorter sorts the logs based on block number, tx hash and log index.
// LogSorter sorts the logs primarily by block number, then by log index, and finally by tx hash.
// returns true if b should come before a.
func LogSorter(a, b logpoller.Log) bool {
return LogComparator(a, b) > 0
Expand Down Expand Up @@ -57,13 +57,17 @@ func logID(l logpoller.Log) string {
return hex.EncodeToString(ext.LogIdentifier())
}

// latestBlockNumber returns the latest block number from the given logs
func latestBlockNumber(logs ...logpoller.Log) int64 {
// blockStatistics returns the latest block number from the given logs, and a map of unique block numbers
func blockStatistics(logs ...logpoller.Log) (int64, map[int64]bool) {
var latest int64
uniqueBlocks := map[int64]bool{}

for _, l := range logs {
if l.BlockNumber > latest {
latest = l.BlockNumber
}
uniqueBlocks[l.BlockNumber] = true
}
return latest

return latest, uniqueBlocks
}

0 comments on commit c7a4043

Please sign in to comment.