Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logs to identify when assumptions of log queuing behaviour are violated #12846

Merged
merged 11 commits into from
May 9, 2024
5 changes: 5 additions & 0 deletions .changeset/heavy-mails-rule.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Add logs for when the assumptions of how the log buffer will be used are violated #internal
Original file line number Diff line number Diff line change
Expand Up @@ -76,38 +76,80 @@ type logBuffer struct {
lastBlockSeen *atomic.Int64
// map of upkeep id to its queue
queues map[string]*upkeepLogQueue
lock sync.RWMutex
// map for then number of times we have enqueued logs for a block number
enqueuedBlocks map[int64]map[string]int
lock sync.RWMutex
}

func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogBuffer {
return &logBuffer{
lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"),
opts: newLogBufferOptions(lookback, blockRate, logLimit),
lastBlockSeen: new(atomic.Int64),
queues: make(map[string]*upkeepLogQueue),
lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"),
opts: newLogBufferOptions(lookback, blockRate, logLimit),
lastBlockSeen: new(atomic.Int64),
enqueuedBlocks: map[int64]map[string]int{},
queues: make(map[string]*upkeepLogQueue),
}
}

// Enqueue adds logs to the buffer and might also drop logs if the limit for the
// given upkeep was exceeded. It will create a new buffer if it does not exist.
// Logs are expected to be enqueued in increasing order of block number.
// All logs for an upkeep on a particular block will be enqueued in a single Enqueue call.
// Returns the number of logs that were added and number of logs that were dropped.
func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) {
buf, ok := b.getUpkeepQueue(uid)
if !ok || buf == nil {
buf = newUpkeepLogQueue(b.lggr, uid, b.opts)
b.setUpkeepQueue(uid, buf)
}
latestBlock := latestBlockNumber(logs...)
if b.lastBlockSeen.Load() < latestBlock {
b.lastBlockSeen.Store(latestBlock)

latestLogBlock, uniqueBlocks := blockStatistics(logs...)
if lastBlockSeen := b.lastBlockSeen.Load(); lastBlockSeen < latestLogBlock {
b.lastBlockSeen.Store(latestLogBlock)
} else if latestLogBlock < lastBlockSeen {
b.lggr.Debugw("enqueuing logs from a block older than latest seen block", "logBlock", latestLogBlock, "lastBlockSeen", lastBlockSeen)
}

b.trackBlockNumbersForUpkeep(uid, uniqueBlocks)

blockThreshold := b.lastBlockSeen.Load() - int64(b.opts.lookback.Load())
if blockThreshold <= 0 {
blockThreshold = 1
}

// clean up enqueued block counts
for block := range b.enqueuedBlocks {
if block < blockThreshold {
delete(b.enqueuedBlocks, block)
}
}

return buf.enqueue(blockThreshold, logs...)
}

// trackBlockNumbersForUpkeep keeps track of the number of times we enqueue logs for an upkeep,
// for a specific block number. The expectation is that we will only enqueue logs for an upkeep for a
// specific block number once, i.e. all logs for an upkeep for a block, will be enqueued in a single
// enqueue call. In the event that we see upkeep logs enqueued for a particular block more than once,
// we log a message.
func (b *logBuffer) trackBlockNumbersForUpkeep(uid *big.Int, uniqueBlocks map[int64]bool) {
for blockNumber := range uniqueBlocks {
if blockNumbers, ok := b.enqueuedBlocks[blockNumber]; ok {
if upkeepBlockInstances, ok := blockNumbers[uid.String()]; ok {
blockNumbers[uid.String()] = upkeepBlockInstances + 1
b.lggr.Debugw("enqueuing logs again for a previously seen block for this upkeep", "blockNumber", blockNumber, "numberOfEnqueues", b.enqueuedBlocks[blockNumber], "upkeepID", uid.String())
Comment on lines +139 to +140
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this logic will change with reorg handling?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes; based on what I have in the spike for reorgs, we should have a set of block numbers available to us to perform this check against 👍

} else {
blockNumbers[uid.String()] = 1
}
b.enqueuedBlocks[blockNumber] = blockNumbers
} else {
b.enqueuedBlocks[blockNumber] = map[string]int{
uid.String(): 1,
}
}
}
}

// Dequeue greedly pulls logs from the buffers.
// Returns logs and the number of remaining logs in the buffer.
func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
Expand Down Expand Up @@ -50,6 +51,96 @@ func TestLogEventBufferV1_SyncFilters(t *testing.T) {
require.Equal(t, 1, buf.NumOfUpkeeps())
}

type readableLogger struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for the purposes of catching and validating log messages within the tests

logger.Logger
DebugwFn func(msg string, keysAndValues ...interface{})
NamedFn func(name string) logger.Logger
WithFn func(args ...interface{}) logger.Logger
}

func (l *readableLogger) Debugw(msg string, keysAndValues ...interface{}) {
l.DebugwFn(msg, keysAndValues...)
}

func (l *readableLogger) Named(name string) logger.Logger {
return l
}

func (l *readableLogger) With(args ...interface{}) logger.Logger {
return l
}

func TestLogEventBufferV1_Enqueueviolations(t *testing.T) {
t.Run("enqueuing logs for a block older than latest seen logs a message", func(t *testing.T) {
logReceived := false
readableLogger := &readableLogger{
DebugwFn: func(msg string, keysAndValues ...interface{}) {
if msg == "enqueuing logs from a block older than latest seen block" {
logReceived = true
assert.Equal(t, "logBlock", keysAndValues[0])
assert.Equal(t, int64(1), keysAndValues[1])
assert.Equal(t, "lastBlockSeen", keysAndValues[2])
assert.Equal(t, int64(2), keysAndValues[3])
}
},
}

logBufferV1 := NewLogBuffer(readableLogger, 10, 20, 1)

buf := logBufferV1.(*logBuffer)

buf.Enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 0},
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 1},
)
buf.Enqueue(big.NewInt(2),
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x2"), LogIndex: 0},
)

assert.Equal(t, 1, buf.enqueuedBlocks[2]["1"])
assert.Equal(t, 1, buf.enqueuedBlocks[1]["2"])
assert.True(t, true, logReceived)
})

t.Run("enqueuing logs for the same block over multiple calls logs a message", func(t *testing.T) {
logReceived := false
readableLogger := &readableLogger{
DebugwFn: func(msg string, keysAndValues ...interface{}) {
if msg == "enqueuing logs again for a previously seen block" {
logReceived = true
assert.Equal(t, "blockNumber", keysAndValues[0])
assert.Equal(t, int64(3), keysAndValues[1])
assert.Equal(t, "numberOfEnqueues", keysAndValues[2])
assert.Equal(t, 2, keysAndValues[3])
}
},
}

logBufferV1 := NewLogBuffer(readableLogger, 10, 20, 1)

buf := logBufferV1.(*logBuffer)

buf.Enqueue(big.NewInt(1),
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0},
logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 1},
)
buf.Enqueue(big.NewInt(2),
logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0},
)
buf.Enqueue(big.NewInt(3),
logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3a"), LogIndex: 0},
)
buf.Enqueue(big.NewInt(3),
logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3b"), LogIndex: 0},
)

assert.Equal(t, 1, buf.enqueuedBlocks[2]["2"])
assert.Equal(t, 1, buf.enqueuedBlocks[1]["1"])
assert.Equal(t, 2, buf.enqueuedBlocks[3]["3"])
assert.True(t, true, logReceived)
})
}

func TestLogEventBufferV1_Dequeue(t *testing.T) {
tests := []struct {
name string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
)

// LogSorter sorts the logs based on block number, tx hash and log index.
// LogSorter sorts the logs primarily by block number, then by log index, and finally by tx hash.
// returns true if b should come before a.
func LogSorter(a, b logpoller.Log) bool {
return LogComparator(a, b) > 0
Expand Down Expand Up @@ -57,13 +57,17 @@ func logID(l logpoller.Log) string {
return hex.EncodeToString(ext.LogIdentifier())
}

// latestBlockNumber returns the latest block number from the given logs
func latestBlockNumber(logs ...logpoller.Log) int64 {
// blockStatistics returns the latest block number from the given logs, and a map of unique block numbers
func blockStatistics(logs ...logpoller.Log) (int64, map[int64]bool) {
var latest int64
uniqueBlocks := map[int64]bool{}

for _, l := range logs {
if l.BlockNumber > latest {
latest = l.BlockNumber
}
uniqueBlocks[l.BlockNumber] = true
}
return latest

return latest, uniqueBlocks
}
Loading