From b1c9315776c906bd671c5be404b5cd0c5c34fdba Mon Sep 17 00:00:00 2001 From: ferglor Date: Fri, 5 Jul 2024 16:19:03 +0100 Subject: [PATCH] Dequeue minimum commitment logs for all upkeeps as a priority (#13653) * Track how many logs we dequeue for each block window * When the minimum dequeue commitment has been met for all blocks, perform best effort * WIP tests * Add iteration step * Clean up reorg blocks, lock whole enqueue process * Add tests * Refactor the dequeue loop into two functions * Update the log limit of arbitrum to 2, update tests * Go imports * Lint * Add changeset * Clean up upkeep selector, drop iteration logic * Clean up unused log limit low * Update changeset * Clean up default selector * Set lookback blocks to 128 * Add comments * Update comments * Update tests * Clean up dequeued counts * Update logs * More accurate logs * Tidy up logging * - Add log limit to buffer opts - Use a default log limit of 1 - Simplify dequeue interface * Clean up based on start window * Dequeue the smallest of either capacity or remaining log limit * Evict reorgd blocks outside of blockStatistics --- .changeset/fluffy-ghosts-sneeze.md | 5 + .../evmregistry/v21/logprovider/buffer_v1.go | 340 ++++++++++------- .../v21/logprovider/buffer_v1_test.go | 162 +------- .../evmregistry/v21/logprovider/factory.go | 8 +- .../evmregistry/v21/logprovider/log.go | 15 - .../evmregistry/v21/logprovider/provider.go | 102 ++--- .../v21/logprovider/provider_test.go | 348 ++++++++++++++++++ 7 files changed, 630 insertions(+), 350 deletions(-) create mode 100644 .changeset/fluffy-ghosts-sneeze.md diff --git a/.changeset/fluffy-ghosts-sneeze.md b/.changeset/fluffy-ghosts-sneeze.md new file mode 100644 index 00000000000..48503995c23 --- /dev/null +++ b/.changeset/fluffy-ghosts-sneeze.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Dequeue minimum guaranteed upkeeps as a priority #changed diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go index fe15e962e53..e58d5ad9c93 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1.go @@ -1,7 +1,6 @@ package logprovider import ( - "math" "math/big" "sort" "sync" @@ -22,11 +21,11 @@ type LogBuffer interface { // given upkeep was exceeded. Returns the number of logs that were added and number of logs that were dropped. Enqueue(id *big.Int, logs ...logpoller.Log) (added int, dropped int) // Dequeue pulls logs from the buffer that are within the given block window, - // with a maximum number of logs per upkeep and a total maximum number of logs to return. - // It also accepts a function to select upkeeps. + // with a maximum number of logs to return. + // It also accepts a boolean to identify if we are operating under minimum dequeue. // Returns logs (associated to upkeeps) and the number of remaining // logs in that window for the involved upkeeps. - Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) + Dequeue(startWindowBlock int64, maxResults int, minimumDequeue bool) ([]BufferedLog, int) // SetConfig sets the buffer size and the maximum number of logs to keep for each upkeep. SetConfig(lookback, blockRate, logLimit uint32) // NumOfUpkeeps returns the number of upkeeps that are being tracked by the buffer. @@ -35,10 +34,6 @@ type LogBuffer interface { SyncFilters(filterStore UpkeepFilterStore) error } -func DefaultUpkeepSelector(id *big.Int) bool { - return true -} - type logBufferOptions struct { // number of blocks to keep in the buffer lookback *atomic.Uint32 @@ -46,6 +41,8 @@ type logBufferOptions struct { blockRate *atomic.Uint32 // max number of logs to keep in the buffer for each upkeep per window (LogLimit*10) windowLimit *atomic.Uint32 + // number of logs we need to dequeue per upkeep per block window at a minimum + logLimit *atomic.Uint32 } func newLogBufferOptions(lookback, blockRate, logLimit uint32) *logBufferOptions { @@ -53,6 +50,7 @@ func newLogBufferOptions(lookback, blockRate, logLimit uint32) *logBufferOptions windowLimit: new(atomic.Uint32), lookback: new(atomic.Uint32), blockRate: new(atomic.Uint32), + logLimit: new(atomic.Uint32), } opts.override(lookback, blockRate, logLimit) @@ -63,10 +61,7 @@ func (o *logBufferOptions) override(lookback, blockRate, logLimit uint32) { o.windowLimit.Store(logLimit * 10) o.lookback.Store(lookback) o.blockRate.Store(blockRate) -} - -func (o *logBufferOptions) windows() int { - return int(math.Ceil(float64(o.lookback.Load()) / float64(o.blockRate.Load()))) + o.logLimit.Store(logLimit) } type logBuffer struct { @@ -75,21 +70,21 @@ type logBuffer struct { // last block number seen by the buffer lastBlockSeen *atomic.Int64 // map of upkeep id to its queue - queues map[string]*upkeepLogQueue - lock sync.RWMutex + queues map[string]*upkeepLogQueue + queueIDs []string + blockHashes map[int64]string - // map for then number of times we have enqueued logs for a block number - enqueuedBlocks map[int64]map[string]int - enqueuedBlockLock sync.RWMutex + lock sync.RWMutex } func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogBuffer { return &logBuffer{ - lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"), - opts: newLogBufferOptions(lookback, blockRate, logLimit), - lastBlockSeen: new(atomic.Int64), - enqueuedBlocks: map[int64]map[string]int{}, - queues: make(map[string]*upkeepLogQueue), + lggr: lggr.Named("KeepersRegistry.LogEventBufferV1"), + opts: newLogBufferOptions(lookback, blockRate, logLimit), + lastBlockSeen: new(atomic.Int64), + queueIDs: []string{}, + blockHashes: map[int64]string{}, + queues: make(map[string]*upkeepLogQueue), } } @@ -99,67 +94,64 @@ func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogB // All logs for an upkeep on a particular block will be enqueued in a single Enqueue call. // Returns the number of logs that were added and number of logs that were dropped. func (b *logBuffer) Enqueue(uid *big.Int, logs ...logpoller.Log) (int, int) { + b.lock.Lock() + defer b.lock.Unlock() + buf, ok := b.getUpkeepQueue(uid) if !ok || buf == nil { buf = newUpkeepLogQueue(b.lggr, uid, b.opts) b.setUpkeepQueue(uid, buf) } - latestLogBlock, uniqueBlocks := blockStatistics(logs...) + latestLogBlock, reorgBlocks := b.blockStatistics(logs...) + + if len(reorgBlocks) > 0 { + b.evictReorgdLogs(reorgBlocks) + } + if lastBlockSeen := b.lastBlockSeen.Load(); lastBlockSeen < latestLogBlock { b.lastBlockSeen.Store(latestLogBlock) } else if latestLogBlock < lastBlockSeen { b.lggr.Debugw("enqueuing logs with a latest block older older than latest seen block", "logBlock", latestLogBlock, "lastBlockSeen", lastBlockSeen) } - b.trackBlockNumbersForUpkeep(uid, uniqueBlocks) - blockThreshold := b.lastBlockSeen.Load() - int64(b.opts.lookback.Load()) + blockThreshold, _ = getBlockWindow(blockThreshold, int(b.opts.blockRate.Load())) if blockThreshold <= 0 { blockThreshold = 1 } - b.cleanupEnqueuedBlocks(blockThreshold) - return buf.enqueue(blockThreshold, logs...) } -func (b *logBuffer) cleanupEnqueuedBlocks(blockThreshold int64) { - b.enqueuedBlockLock.Lock() - defer b.enqueuedBlockLock.Unlock() - // clean up enqueued block counts - for block := range b.enqueuedBlocks { - if block < blockThreshold { - delete(b.enqueuedBlocks, block) +// blockStatistics returns the latest block number from the given logs, and updates any blocks that have been reorgd +func (b *logBuffer) blockStatistics(logs ...logpoller.Log) (int64, map[int64]bool) { + var latest int64 + reorgBlocks := map[int64]bool{} + + for _, l := range logs { + if l.BlockNumber > latest { + latest = l.BlockNumber } + if hash, ok := b.blockHashes[l.BlockNumber]; ok { + if hash != l.BlockHash.String() { + reorgBlocks[l.BlockNumber] = true + b.lggr.Debugw("encountered reorgd block", "blockNumber", l.BlockNumber) + } + } + b.blockHashes[l.BlockNumber] = l.BlockHash.String() } -} -// trackBlockNumbersForUpkeep keeps track of the number of times we enqueue logs for an upkeep, -// for a specific block number. The expectation is that we will only enqueue logs for an upkeep for a -// specific block number once, i.e. all logs for an upkeep for a block, will be enqueued in a single -// enqueue call. In the event that we see upkeep logs enqueued for a particular block more than once, -// we log a message. -func (b *logBuffer) trackBlockNumbersForUpkeep(uid *big.Int, uniqueBlocks map[int64]bool) { - b.enqueuedBlockLock.Lock() - defer b.enqueuedBlockLock.Unlock() - - if uid == nil { - return - } + return latest, reorgBlocks +} - for blockNumber := range uniqueBlocks { - if blockNumbers, ok := b.enqueuedBlocks[blockNumber]; ok { - if upkeepBlockInstances, ok := blockNumbers[uid.String()]; ok { - blockNumbers[uid.String()] = upkeepBlockInstances + 1 - b.lggr.Debugw("enqueuing logs again for a previously seen block for this upkeep", "blockNumber", blockNumber, "numberOfEnqueues", b.enqueuedBlocks[blockNumber], "upkeepID", uid.String()) - } else { - blockNumbers[uid.String()] = 1 - } - b.enqueuedBlocks[blockNumber] = blockNumbers - } else { - b.enqueuedBlocks[blockNumber] = map[string]int{ - uid.String(): 1, +func (b *logBuffer) evictReorgdLogs(reorgBlocks map[int64]bool) { + for blockNumber := range reorgBlocks { + start, _ := getBlockWindow(blockNumber, int(b.opts.blockRate.Load())) + for _, queue := range b.queues { + if _, ok := queue.logs[blockNumber]; ok { + queue.logs[blockNumber] = []logpoller.Log{} + queue.dequeued[start] = 0 } } } @@ -167,27 +159,36 @@ func (b *logBuffer) trackBlockNumbersForUpkeep(uid *big.Int, uniqueBlocks map[in // Dequeue greedly pulls logs from the buffers. // Returns logs and the number of remaining logs in the buffer. -func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) { +func (b *logBuffer) Dequeue(startWindowBlock int64, maxResults int, bestEffort bool) ([]BufferedLog, int) { b.lock.RLock() defer b.lock.RUnlock() - start, end := getBlockWindow(block, blockRate) - return b.dequeue(start, end, upkeepLimit, maxResults, upkeepSelector) + return b.dequeue(startWindowBlock, maxResults, bestEffort) } -// dequeue pulls logs from the buffers, depends the given selector (upkeepSelector), -// in block range [start,end] with minimum number of results per upkeep (upkeepLimit) -// and the maximum number of results (capacity). +// dequeue pulls logs from the buffers, in block range [start,end] with minimum number +// of results per upkeep (upkeepLimit) and the maximum number of results (capacity). +// If operating under minimum dequeue, upkeeps are skipped when the minimum number +// of logs have been dequeued for that upkeep. // Returns logs and the number of remaining logs in the buffer for the given range and selector. // NOTE: this method is not thread safe and should be called within a lock. -func (b *logBuffer) dequeue(start, end int64, upkeepLimit, capacity int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) { +func (b *logBuffer) dequeue(start int64, capacity int, minimumDequeue bool) ([]BufferedLog, int) { var result []BufferedLog var remainingLogs int - for _, q := range b.queues { - if !upkeepSelector(q.id) { - // if the upkeep is not selected, skip it + minimumDequeueMet := 0 + + logLimit := int(b.opts.logLimit.Load()) + end := start + int64(b.opts.blockRate.Load()) + + for _, qid := range b.queueIDs { + q := b.queues[qid] + + if minimumDequeue && q.dequeued[start] >= logLimit { + // if we have already dequeued the minimum commitment for this window, skip it + minimumDequeueMet++ continue } + logsInRange := q.sizeOfRange(start, end) if logsInRange == 0 { // if there are no logs in the range, skip the upkeep @@ -198,17 +199,26 @@ func (b *logBuffer) dequeue(start, end int64, upkeepLimit, capacity int, upkeepS remainingLogs += logsInRange continue } - if upkeepLimit > capacity { - // adjust limit if it is higher than the actual capacity - upkeepLimit = capacity + + var logs []logpoller.Log + remaining := 0 + + if minimumDequeue { + logs, remaining = q.dequeue(start, end, min(capacity, logLimit-q.dequeued[start])) + } else { + logs, remaining = q.dequeue(start, end, capacity) } - logs, remaining := q.dequeue(start, end, upkeepLimit) + for _, l := range logs { result = append(result, BufferedLog{ID: q.id, Log: l}) capacity-- } remainingLogs += remaining + + // update the buffer with how many logs we have dequeued for this window + q.dequeued[start] += len(logs) } + b.lggr.Debugw("minimum commitment logs dequeued", "start", start, "end", end, "numUpkeeps", len(b.queues), "minimumDequeueMet", minimumDequeueMet) return result, remainingLogs } @@ -230,30 +240,34 @@ func (b *logBuffer) SyncFilters(filterStore UpkeepFilterStore) error { b.lock.Lock() defer b.lock.Unlock() - for upkeepID := range b.queues { + var newQueueIDs []string + + for _, upkeepID := range b.queueIDs { uid := new(big.Int) _, ok := uid.SetString(upkeepID, 10) if ok && !filterStore.Has(uid) { // remove upkeep that is not in the filter store delete(b.queues, upkeepID) + } else { + newQueueIDs = append(newQueueIDs, upkeepID) } } + b.queueIDs = newQueueIDs + return nil } func (b *logBuffer) getUpkeepQueue(uid *big.Int) (*upkeepLogQueue, bool) { - b.lock.RLock() - defer b.lock.RUnlock() - ub, ok := b.queues[uid.String()] return ub, ok } func (b *logBuffer) setUpkeepQueue(uid *big.Int, buf *upkeepLogQueue) { - b.lock.Lock() - defer b.lock.Unlock() - + if _, ok := b.queues[uid.String()]; !ok { + b.queueIDs = append(b.queueIDs, uid.String()) + sort.Slice(b.queueIDs, func(i, j int) bool { return b.queueIDs[i] < b.queueIDs[j] }) + } b.queues[uid.String()] = buf } @@ -287,21 +301,25 @@ type upkeepLogQueue struct { opts *logBufferOptions // logs is the buffer of logs for the upkeep - logs []logpoller.Log + logs map[int64][]logpoller.Log + blockNumbers []int64 + // states keeps track of the state of the logs that are known to the queue // and the block number they were seen at - states map[string]logTriggerStateEntry - lock sync.RWMutex + states map[string]logTriggerStateEntry + dequeued map[int64]int + lock sync.RWMutex } func newUpkeepLogQueue(lggr logger.Logger, id *big.Int, opts *logBufferOptions) *upkeepLogQueue { - maxLogs := int(opts.windowLimit.Load()) * opts.windows() // limit per window * windows return &upkeepLogQueue{ - lggr: lggr.With("upkeepID", id.String()), - id: id, - opts: opts, - logs: make([]logpoller.Log, 0, maxLogs), - states: make(map[string]logTriggerStateEntry), + lggr: lggr.With("upkeepID", id.String()), + id: id, + opts: opts, + logs: map[int64][]logpoller.Log{}, + blockNumbers: make([]int64, 0), + states: make(map[string]logTriggerStateEntry), + dequeued: map[int64]int{}, } } @@ -311,9 +329,9 @@ func (q *upkeepLogQueue) sizeOfRange(start, end int64) int { defer q.lock.RUnlock() size := 0 - for _, l := range q.logs { - if l.BlockNumber >= start && l.BlockNumber <= end { - size++ + for blockNumber, logs := range q.logs { + if blockNumber >= start && blockNumber <= end { + size += len(logs) } } return size @@ -331,9 +349,11 @@ func (q *upkeepLogQueue) dequeue(start, end int64, limit int) ([]logpoller.Log, var results []logpoller.Log var remaining int - updatedLogs := make([]logpoller.Log, 0) - for _, l := range q.logs { - if l.BlockNumber >= start && l.BlockNumber <= end { + + for blockNumber := start; blockNumber <= end; blockNumber++ { + updatedLogs := make([]logpoller.Log, 0) + blockResults := 0 + for _, l := range q.logs[blockNumber] { if len(results) < limit { results = append(results, l) lid := logID(l) @@ -341,15 +361,18 @@ func (q *upkeepLogQueue) dequeue(start, end int64, limit int) ([]logpoller.Log, s.state = logTriggerStateDequeued q.states[lid] = s } - continue + blockResults++ + } else { + remaining++ + updatedLogs = append(updatedLogs, l) } - remaining++ } - updatedLogs = append(updatedLogs, l) + if blockResults > 0 { + q.logs[blockNumber] = updatedLogs + } } if len(results) > 0 { - q.logs = updatedLogs q.lggr.Debugw("Dequeued logs", "start", start, "end", end, "limit", limit, "results", len(results), "remaining", remaining) } @@ -362,10 +385,6 @@ func (q *upkeepLogQueue) dequeue(start, end int64, limit int) ([]logpoller.Log, // given upkeep was exceeded. Additionally, it will drop logs that are older than blockThreshold. // Returns the number of logs that were added and number of logs that were dropped. func (q *upkeepLogQueue) enqueue(blockThreshold int64, logsToAdd ...logpoller.Log) (int, int) { - q.lock.Lock() - defer q.lock.Unlock() - - logs := q.logs var added int for _, log := range logsToAdd { if log.BlockNumber < blockThreshold { @@ -379,9 +398,15 @@ func (q *upkeepLogQueue) enqueue(blockThreshold int64, logsToAdd ...logpoller.Lo } q.states[lid] = logTriggerStateEntry{state: logTriggerStateEnqueued, block: log.BlockNumber} added++ - logs = append(logs, log) + if logList, ok := q.logs[log.BlockNumber]; ok { + logList = append(logList, log) + q.logs[log.BlockNumber] = logList + } else { + q.logs[log.BlockNumber] = []logpoller.Log{log} + q.blockNumbers = append(q.blockNumbers, log.BlockNumber) + sort.Slice(q.blockNumbers, func(i, j int) bool { return q.blockNumbers[i] < q.blockNumbers[j] }) + } } - q.logs = logs var dropped int if added > 0 { @@ -402,70 +427,99 @@ func (q *upkeepLogQueue) orderLogs() { // sort logs by block number, tx hash and log index // to keep the q sorted and to ensure that logs can be // grouped by block windows for the cleanup - sort.SliceStable(q.logs, func(i, j int) bool { - return LogSorter(q.logs[i], q.logs[j]) - }) + for _, blockNumber := range q.blockNumbers { + toSort := q.logs[blockNumber] + sort.SliceStable(toSort, func(i, j int) bool { + return LogSorter(toSort[i], toSort[j]) + }) + q.logs[blockNumber] = toSort + } } // clean removes logs that are older than blockThreshold and drops logs if the limit for the // given upkeep was exceeded. Returns the number of logs that were dropped. // NOTE: this method is not thread safe and should be called within a lock. func (q *upkeepLogQueue) clean(blockThreshold int64) int { - var dropped, expired int + var totalDropped int + blockRate := int(q.opts.blockRate.Load()) windowLimit := int(q.opts.windowLimit.Load()) - updated := make([]logpoller.Log, 0) // helper variables to keep track of the current window capacity currentWindowCapacity, currentWindowStart := 0, int64(0) - for _, l := range q.logs { - if blockThreshold > l.BlockNumber { // old log, removed - prommetrics.AutomationLogBufferFlow.WithLabelValues(prommetrics.LogBufferFlowDirectionExpired).Inc() - // q.lggr.Debugw("Expiring old log", "blockNumber", l.BlockNumber, "blockThreshold", blockThreshold, "logIndex", l.LogIndex) - logid := logID(l) - delete(q.states, logid) - expired++ - continue - } - start, _ := getBlockWindow(l.BlockNumber, blockRate) - if start != currentWindowStart { - // new window, reset capacity - currentWindowStart = start - currentWindowCapacity = 0 + oldBlockNumbers := make([]int64, 0) + blockNumbers := make([]int64, 0) + + for _, blockNumber := range q.blockNumbers { + var dropped, expired int + + logs := q.logs[blockNumber] + updated := make([]logpoller.Log, 0) + + if blockThreshold > blockNumber { + oldBlockNumbers = append(oldBlockNumbers, blockNumber) + } else { + blockNumbers = append(blockNumbers, blockNumber) } - currentWindowCapacity++ - // if capacity has been reached, drop the log - if currentWindowCapacity > windowLimit { - lid := logID(l) - if s, ok := q.states[lid]; ok { - s.state = logTriggerStateDropped - q.states[lid] = s + + for _, l := range logs { + if blockThreshold > l.BlockNumber { // old log, removed + prommetrics.AutomationLogBufferFlow.WithLabelValues(prommetrics.LogBufferFlowDirectionExpired).Inc() + // q.lggr.Debugw("Expiring old log", "blockNumber", l.BlockNumber, "blockThreshold", blockThreshold, "logIndex", l.LogIndex) + logid := logID(l) + delete(q.states, logid) + expired++ + continue + } + start, _ := getBlockWindow(l.BlockNumber, blockRate) + if start != currentWindowStart { + // new window, reset capacity + currentWindowStart = start + currentWindowCapacity = 0 } - dropped++ - prommetrics.AutomationLogBufferFlow.WithLabelValues(prommetrics.LogBufferFlowDirectionDropped).Inc() - q.lggr.Debugw("Reached log buffer limits, dropping log", "blockNumber", l.BlockNumber, - "blockHash", l.BlockHash, "txHash", l.TxHash, "logIndex", l.LogIndex, "len updated", len(updated), - "currentWindowStart", currentWindowStart, "currentWindowCapacity", currentWindowCapacity, - "maxLogsPerWindow", windowLimit, "blockRate", blockRate) + currentWindowCapacity++ + // if capacity has been reached, drop the log + if currentWindowCapacity > windowLimit { + lid := logID(l) + if s, ok := q.states[lid]; ok { + s.state = logTriggerStateDropped + q.states[lid] = s + } + dropped++ + prommetrics.AutomationLogBufferFlow.WithLabelValues(prommetrics.LogBufferFlowDirectionDropped).Inc() + q.lggr.Debugw("Reached log buffer limits, dropping log", "blockNumber", l.BlockNumber, + "blockHash", l.BlockHash, "txHash", l.TxHash, "logIndex", l.LogIndex, "len updated", len(updated), + "currentWindowStart", currentWindowStart, "currentWindowCapacity", currentWindowCapacity, + "maxLogsPerWindow", windowLimit, "blockRate", blockRate) + continue + } + updated = append(updated, l) + } + + if dropped > 0 || expired > 0 { + totalDropped += dropped + q.logs[blockNumber] = updated + q.lggr.Debugw("Cleaned logs", "dropped", dropped, "expired", expired, "blockThreshold", blockThreshold, "len updated", len(updated), "len before", len(q.logs)) continue } - updated = append(updated, l) } - if dropped > 0 || expired > 0 { - q.lggr.Debugw("Cleaned logs", "dropped", dropped, "expired", expired, "blockThreshold", blockThreshold, "len updated", len(updated), "len before", len(q.logs)) - q.logs = updated + for _, blockNumber := range oldBlockNumbers { + delete(q.logs, blockNumber) + startWindow, _ := getBlockWindow(blockNumber, int(q.opts.blockRate.Load())) + delete(q.dequeued, startWindow) } + q.blockNumbers = blockNumbers q.cleanStates(blockThreshold) - return dropped + return totalDropped } // cleanStates removes states that are older than blockThreshold. // NOTE: this method is not thread safe and should be called within a lock. func (q *upkeepLogQueue) cleanStates(blockThreshold int64) { for lid, s := range q.states { - if s.block <= blockThreshold { + if s.block < blockThreshold { delete(q.states, lid) } } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go index c41dd3d9bcc..f742d39689c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer_v1_test.go @@ -23,12 +23,12 @@ func TestLogEventBufferV1(t *testing.T) { logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0}, logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x1"), LogIndex: 2}, ) - results, remaining := buf.Dequeue(int64(1), 10, 1, 2, DefaultUpkeepSelector) + results, remaining := buf.Dequeue(int64(1), 2, true) require.Equal(t, 2, len(results)) require.Equal(t, 2, remaining) require.True(t, results[0].ID.Cmp(results[1].ID) != 0) - results, remaining = buf.Dequeue(int64(1), 10, 1, 2, DefaultUpkeepSelector) - require.Equal(t, 2, len(results)) + results, remaining = buf.Dequeue(int64(1), 2, true) + require.Equal(t, 0, len(results)) require.Equal(t, 0, remaining) } @@ -97,8 +97,6 @@ func TestLogEventBufferV1_EnqueueViolations(t *testing.T) { logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x2"), LogIndex: 0}, ) - assert.Equal(t, 1, buf.enqueuedBlocks[2]["1"]) - assert.Equal(t, 1, buf.enqueuedBlocks[1]["2"]) assert.True(t, true, logReceived) }) @@ -134,9 +132,6 @@ func TestLogEventBufferV1_EnqueueViolations(t *testing.T) { logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3b"), LogIndex: 0}, ) - assert.Equal(t, 1, buf.enqueuedBlocks[2]["2"]) - assert.Equal(t, 1, buf.enqueuedBlocks[1]["1"]) - assert.Equal(t, 2, buf.enqueuedBlocks[3]["3"]) assert.True(t, true, logReceived) }) } @@ -153,7 +148,7 @@ func TestLogEventBufferV1_Dequeue(t *testing.T) { { name: "empty", logsInBuffer: map[*big.Int][]logpoller.Log{}, - args: newDequeueArgs(10, 1, 1, 10, nil), + args: newDequeueArgs(10, 1, 1, 10), lookback: 20, results: []logpoller.Log{}, }, @@ -165,7 +160,7 @@ func TestLogEventBufferV1_Dequeue(t *testing.T) { {BlockNumber: 14, TxHash: common.HexToHash("0x15"), LogIndex: 1}, }, }, - args: newDequeueArgs(10, 5, 3, 10, nil), + args: newDequeueArgs(10, 5, 3, 10), lookback: 20, results: []logpoller.Log{ {}, {}, @@ -191,7 +186,7 @@ func TestLogEventBufferV1_Dequeue(t *testing.T) { {BlockNumber: 14, TxHash: common.HexToHash("0x14"), LogIndex: 12}, }, }, - args: newDequeueArgs(10, 5, 2, 10, nil), + args: newDequeueArgs(10, 5, 2, 10), lookback: 20, results: []logpoller.Log{ {}, {}, {}, {}, @@ -204,25 +199,13 @@ func TestLogEventBufferV1_Dequeue(t *testing.T) { big.NewInt(1): append(createDummyLogSequence(2, 0, 12, common.HexToHash("0x12")), createDummyLogSequence(2, 0, 13, common.HexToHash("0x13"))...), big.NewInt(2): append(createDummyLogSequence(2, 10, 12, common.HexToHash("0x12")), createDummyLogSequence(2, 10, 13, common.HexToHash("0x13"))...), }, - args: newDequeueArgs(10, 5, 3, 4, nil), + args: newDequeueArgs(10, 5, 3, 4), lookback: 20, results: []logpoller.Log{ {}, {}, {}, {}, }, remaining: 4, }, - { - name: "with upkeep selector", - logsInBuffer: map[*big.Int][]logpoller.Log{ - big.NewInt(1): { - {BlockNumber: 12, TxHash: common.HexToHash("0x12"), LogIndex: 0}, - {BlockNumber: 14, TxHash: common.HexToHash("0x15"), LogIndex: 1}, - }, - }, - args: newDequeueArgs(10, 5, 5, 10, func(id *big.Int) bool { return false }), - lookback: 20, - results: []logpoller.Log{}, - }, } for _, tc := range tests { @@ -232,7 +215,9 @@ func TestLogEventBufferV1_Dequeue(t *testing.T) { added, dropped := buf.Enqueue(id, logs...) require.Equal(t, len(logs), added+dropped) } - results, remaining := buf.Dequeue(tc.args.block, tc.args.blockRate, tc.args.upkeepLimit, tc.args.maxResults, tc.args.upkeepSelector) + start, _ := getBlockWindow(tc.args.block, tc.args.blockRate) + + results, remaining := buf.Dequeue(start, tc.args.maxResults, true) require.Equal(t, len(tc.results), len(results)) require.Equal(t, tc.remaining, remaining) }) @@ -518,25 +503,20 @@ func TestLogEventBufferV1_BlockWindow(t *testing.T) { } type dequeueArgs struct { - block int64 - blockRate int - upkeepLimit int - maxResults int - upkeepSelector func(id *big.Int) bool + block int64 + blockRate int + upkeepLimit int + maxResults int } -func newDequeueArgs(block int64, blockRate int, upkeepLimit int, maxResults int, upkeepSelector func(id *big.Int) bool) dequeueArgs { +func newDequeueArgs(block int64, blockRate int, upkeepLimit int, maxResults int) dequeueArgs { args := dequeueArgs{ - block: block, - blockRate: blockRate, - upkeepLimit: upkeepLimit, - maxResults: maxResults, - upkeepSelector: upkeepSelector, + block: block, + blockRate: blockRate, + upkeepLimit: upkeepLimit, + maxResults: maxResults, } - if upkeepSelector == nil { - args.upkeepSelector = DefaultUpkeepSelector - } if blockRate == 0 { args.blockRate = 1 } @@ -561,107 +541,3 @@ func createDummyLogSequence(n, startIndex int, block int64, tx common.Hash) []lo } return logs } - -func Test_trackBlockNumbersForUpkeep(t *testing.T) { - buf := NewLogBuffer(logger.TestLogger(t), 10, 20, 1) - - logBuffer := buf.(*logBuffer) - - for _, tc := range []struct { - uid *big.Int - uniqueBlocks map[int64]bool - wantEnqueuedBlocks map[int64]map[string]int - }{ - { - uid: big.NewInt(1), - uniqueBlocks: map[int64]bool{ - 1: true, - 2: true, - 3: true, - }, - wantEnqueuedBlocks: map[int64]map[string]int{ - 1: { - "1": 1, - }, - 2: { - "1": 1, - }, - 3: { - "1": 1, - }, - }, - }, - { - uid: big.NewInt(2), - uniqueBlocks: map[int64]bool{ - 1: true, - 2: true, - 3: true, - }, - wantEnqueuedBlocks: map[int64]map[string]int{ - 1: { - "1": 1, - "2": 1, - }, - 2: { - "1": 1, - "2": 1, - }, - 3: { - "1": 1, - "2": 1, - }, - }, - }, - { - uid: big.NewInt(2), - uniqueBlocks: map[int64]bool{ - 3: true, - 4: true, - }, - wantEnqueuedBlocks: map[int64]map[string]int{ - 1: { - "1": 1, - "2": 1, - }, - 2: { - "1": 1, - "2": 1, - }, - 3: { - "1": 1, - "2": 2, - }, - 4: { - "2": 1, - }, - }, - }, - { - uniqueBlocks: map[int64]bool{ - 3: true, - 4: true, - }, - wantEnqueuedBlocks: map[int64]map[string]int{ - 1: { - "1": 1, - "2": 1, - }, - 2: { - "1": 1, - "2": 1, - }, - 3: { - "1": 1, - "2": 2, - }, - 4: { - "2": 1, - }, - }, - }, - } { - logBuffer.trackBlockNumbersForUpkeep(tc.uid, tc.uniqueBlocks) - assert.Equal(t, tc.wantEnqueuedBlocks, logBuffer.enqueuedBlocks) - } -} diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/factory.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/factory.go index 64833f9269b..19302624b49 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/factory.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/factory.go @@ -63,7 +63,7 @@ func NewOptions(finalityDepth int64, chainID *big.Int) LogTriggersOptions { // NOTE: o.LookbackBlocks should be set only from within tests func (o *LogTriggersOptions) Defaults(finalityDepth int64) { if o.LookbackBlocks == 0 { - lookbackBlocks := int64(200) + lookbackBlocks := int64(100) if lookbackBlocks < finalityDepth { lookbackBlocks = finalityDepth } @@ -86,7 +86,7 @@ func (o *LogTriggersOptions) Defaults(finalityDepth int64) { func (o *LogTriggersOptions) defaultBlockRate() uint32 { switch o.chainID.Int64() { case 42161, 421613, 421614: // Arbitrum - return 4 + return 2 default: return 1 } @@ -94,13 +94,11 @@ func (o *LogTriggersOptions) defaultBlockRate() uint32 { func (o *LogTriggersOptions) defaultLogLimit() uint32 { switch o.chainID.Int64() { - case 42161, 421613, 421614: // Arbitrum - return 1 case 1, 4, 5, 42, 11155111: // Eth return 20 case 10, 420, 56, 97, 137, 80001, 43113, 43114, 8453, 84531: // Optimism, BSC, Polygon, Avax, Base return 5 default: - return 2 + return 1 } } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go index 9603d6da5be..ba577f57130 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/log.go @@ -56,18 +56,3 @@ func logID(l logpoller.Log) string { copy(ext.BlockHash[:], l.BlockHash[:]) return hex.EncodeToString(ext.LogIdentifier()) } - -// blockStatistics returns the latest block number from the given logs, and a map of unique block numbers -func blockStatistics(logs ...logpoller.Log) (int64, map[int64]bool) { - var latest int64 - uniqueBlocks := map[int64]bool{} - - for _, l := range logs { - if l.BlockNumber > latest { - latest = l.BlockNumber - } - uniqueBlocks[l.BlockNumber] = true - } - - return latest, uniqueBlocks -} diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index 633188c8396..3609d0a4654 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -48,8 +48,6 @@ var ( readerThreads = 4 bufferSyncInterval = 10 * time.Minute - // logLimitMinimum is how low the log limit can go. - logLimitMinimum = 1 ) // LogTriggerConfig is an alias for log trigger config. @@ -253,29 +251,6 @@ func (p *logEventProvider) createPayload(id *big.Int, log logpoller.Log) (ocr2ke return payload, nil } -// getBufferDequeueArgs returns the arguments for the buffer to dequeue logs. -// It adjust the log limit low based on the number of upkeeps to ensure that more upkeeps get slots in the result set. -func (p *logEventProvider) getBufferDequeueArgs() (blockRate, logLimitLow, maxResults, numOfUpkeeps int) { - blockRate, logLimitLow, maxResults, numOfUpkeeps = int(p.opts.BlockRate), int(p.opts.LogLimit), MaxPayloads, p.bufferV1.NumOfUpkeeps() - // in case we have more upkeeps than the max results, we reduce the log limit low - // so that more upkeeps will get slots in the result set. - for numOfUpkeeps > maxResults/logLimitLow { - if logLimitLow == logLimitMinimum { - // Log limit low can't go less than logLimitMinimum (1). - // If some upkeeps are not getting slots in the result set, they supposed to be picked up - // in the next iteration if the range is still applicable. - // TODO: alerts to notify the system is at full capacity. - // TODO: handle this case properly by distributing available slots across upkeeps to avoid - // starvation when log volume is high. - p.lggr.Warnw("The system is at full capacity", "maxResults", maxResults, "numOfUpkeeps", numOfUpkeeps, "logLimitLow", logLimitLow) - break - } - p.lggr.Debugw("Too many upkeeps, reducing the log limit low", "maxResults", maxResults, "numOfUpkeeps", numOfUpkeeps, "logLimitLow_before", logLimitLow) - logLimitLow-- - } - return -} - func (p *logEventProvider) getLogsFromBuffer(latestBlock int64) []ocr2keepers.UpkeepPayload { var payloads []ocr2keepers.UpkeepPayload @@ -286,25 +261,11 @@ func (p *logEventProvider) getLogsFromBuffer(latestBlock int64) []ocr2keepers.Up switch p.opts.BufferVersion { case BufferVersionV1: - // in v1, we use a greedy approach - we keep dequeuing logs until we reach the max results or cover the entire range. - blockRate, logLimitLow, maxResults, _ := p.getBufferDequeueArgs() - for len(payloads) < maxResults && start <= latestBlock { - logs, remaining := p.bufferV1.Dequeue(start, blockRate, logLimitLow, maxResults-len(payloads), DefaultUpkeepSelector) - if len(logs) > 0 { - p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs)) - } - for _, l := range logs { - payload, err := p.createPayload(l.ID, l.Log) - if err == nil { - payloads = append(payloads, payload) - } - } - if remaining > 0 { - p.lggr.Debugw("Remaining logs", "start", start, "latestBlock", latestBlock, "remaining", remaining) - // TODO: handle remaining logs in a better way than consuming the entire window, e.g. do not repeat more than x times - continue - } - start += int64(blockRate) + payloads = p.minimumCommitmentDequeue(latestBlock, start) + + // if we have remaining capacity following minimum commitment dequeue, perform a best effort dequeue + if len(payloads) < MaxPayloads { + payloads = p.bestEffortDequeue(latestBlock, start, payloads) } default: logs := p.buffer.dequeueRange(start, latestBlock, AllowedLogsPerUpkeep, MaxPayloads) @@ -319,6 +280,59 @@ func (p *logEventProvider) getLogsFromBuffer(latestBlock int64) []ocr2keepers.Up return payloads } +// minimumCommitmentDequeue dequeues the minimum number of logs per upkeep per block window, when available. +func (p *logEventProvider) minimumCommitmentDequeue(latestBlock, start int64) []ocr2keepers.UpkeepPayload { + var payloads []ocr2keepers.UpkeepPayload + + blockRate := int(p.opts.BlockRate) + + for len(payloads) < MaxPayloads && start <= latestBlock { + startWindow, _ := getBlockWindow(start, blockRate) + + // dequeue the minimum number logs (log limit, varies by chain) per upkeep for this block window + logs, remaining := p.bufferV1.Dequeue(startWindow, MaxPayloads-len(payloads), true) + if len(logs) > 0 { + p.lggr.Debugw("minimum commitment dequeue", "start", start, "latestBlock", latestBlock, "logs", len(logs), "remaining", remaining) + } + for _, l := range logs { + payload, err := p.createPayload(l.ID, l.Log) + if err == nil { + payloads = append(payloads, payload) + } + } + + start += int64(blockRate) + } + + return payloads +} + +// bestEffortDequeue dequeues the remaining logs from the buffer, after the minimum number of logs +// have been dequeued for every upkeep in every block window. +func (p *logEventProvider) bestEffortDequeue(latestBlock, start int64, payloads []ocr2keepers.UpkeepPayload) []ocr2keepers.UpkeepPayload { + blockRate := int(p.opts.BlockRate) + + for len(payloads) < MaxPayloads && start <= latestBlock { + startWindow, _ := getBlockWindow(start, blockRate) + + // dequeue as many logs as we can, based on remaining capacity, for this block window + logs, remaining := p.bufferV1.Dequeue(startWindow, MaxPayloads-len(payloads), false) + if len(logs) > 0 { + p.lggr.Debugw("best effort dequeue", "start", start, "latestBlock", latestBlock, "logs", len(logs), "remaining", remaining) + } + for _, l := range logs { + payload, err := p.createPayload(l.ID, l.Log) + if err == nil { + payloads = append(payloads, payload) + } + } + + start += int64(blockRate) + } + + return payloads +} + // ReadLogs fetches the logs for the given upkeeps. func (p *logEventProvider) ReadLogs(pctx context.Context, ids ...*big.Int) error { ctx, cancel := context.WithTimeout(pctx, readLogsTimeout) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go index c5bc047e8f4..282f89d370c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -315,6 +317,352 @@ func newEntry(p *logEventProvider, i int, args ...string) (LogTriggerConfig, upk return cfg, f } +func countRemainingLogs(logs map[int64][]logpoller.Log) int { + count := 0 + for _, logList := range logs { + count += len(logList) + } + return count +} + +func remainingBlockWindowCounts(queues map[string]*upkeepLogQueue, blockRate int) map[int64]int { + blockWindowCounts := map[int64]int{} + + for _, q := range queues { + for blockNumber, logs := range q.logs { + start, _ := getBlockWindow(blockNumber, blockRate) + + blockWindowCounts[start] += len(logs) + } + } + + return blockWindowCounts +} + +func TestLogEventProvider_GetLatestPayloads(t *testing.T) { + t.Run("dequeuing from an empty buffer returns 0 logs", func(t *testing.T) { + opts := NewOptions(200, big.NewInt(42161)) + opts.BufferVersion = "v1" + + logPoller := &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 100, nil + }, + } + + provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(42161), &mockedPacker{}, nil, opts) + + ctx := context.Background() + + payloads, err := provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + assert.Equal(t, 0, len(payloads)) + }) + + t.Run("a single log for a single upkeep gets dequeued", func(t *testing.T) { + opts := NewOptions(200, big.NewInt(42161)) + opts.BufferVersion = "v1" + + logPoller := &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 100, nil + }, + } + + provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(42161), &mockedPacker{}, nil, opts) + + ctx := context.Background() + + buffer := provider.bufferV1 + + buffer.Enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}) + + payloads, err := provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + assert.Equal(t, 1, len(payloads)) + }) + + t.Run("a log per upkeep for 4 upkeeps across 4 blocks (2 separate block windows) is dequeued, for a total of 4 payloads", func(t *testing.T) { + opts := NewOptions(200, big.NewInt(42161)) + opts.BufferVersion = "v1" + + logPoller := &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 100, nil + }, + } + + provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(42161), &mockedPacker{}, nil, opts) + + ctx := context.Background() + + buffer := provider.bufferV1 + + buffer.Enqueue(big.NewInt(1), logpoller.Log{BlockNumber: 1, TxHash: common.HexToHash("0x1"), LogIndex: 0}) + buffer.Enqueue(big.NewInt(2), logpoller.Log{BlockNumber: 2, TxHash: common.HexToHash("0x2"), LogIndex: 0}) + buffer.Enqueue(big.NewInt(3), logpoller.Log{BlockNumber: 3, TxHash: common.HexToHash("0x3"), LogIndex: 0}) + buffer.Enqueue(big.NewInt(4), logpoller.Log{BlockNumber: 4, TxHash: common.HexToHash("0x4"), LogIndex: 0}) + + payloads, err := provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + assert.Equal(t, 4, len(payloads)) + }) + + t.Run("100 logs are dequeued for a single upkeep, 1 log for every block window across 100 blocks followed by best effort", func(t *testing.T) { + opts := NewOptions(200, big.NewInt(42161)) + opts.BufferVersion = "v1" + + logPoller := &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 101, nil + }, + } + + provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(42161), &mockedPacker{}, nil, opts) + + ctx := context.Background() + + buffer := provider.bufferV1.(*logBuffer) + + for i := 0; i < 100; i++ { + buffer.Enqueue(big.NewInt(1), logpoller.Log{BlockNumber: int64(i + 1), TxHash: common.HexToHash(fmt.Sprintf("0x%d", i+1)), LogIndex: 0}) + } + + assert.Equal(t, 100, countRemainingLogs(buffer.queues["1"].logs)) + + payloads, err := provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + assert.Equal(t, 100, len(payloads)) + + assert.Equal(t, 0, countRemainingLogs(buffer.queues["1"].logs)) + }) + + t.Run("100 logs are dequeued for two upkeeps, 25 logs each as min commitment (50 logs total best effort), followed by best effort", func(t *testing.T) { + opts := NewOptions(200, big.NewInt(42161)) + opts.BufferVersion = "v1" + + logPoller := &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 101, nil + }, + } + + provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(42161), &mockedPacker{}, nil, opts) + + ctx := context.Background() + + buffer := provider.bufferV1.(*logBuffer) + + for i := 0; i < 100; i++ { + buffer.Enqueue(big.NewInt(1), logpoller.Log{BlockNumber: int64(i + 1), TxHash: common.HexToHash(fmt.Sprintf("0x1%d", i+1)), LogIndex: 0}) + buffer.Enqueue(big.NewInt(2), logpoller.Log{BlockNumber: int64(i + 1), TxHash: common.HexToHash(fmt.Sprintf("0x2%d", i+1)), LogIndex: 0}) + } + + assert.Equal(t, 100, countRemainingLogs(buffer.queues["1"].logs)) + assert.Equal(t, 100, countRemainingLogs(buffer.queues["2"].logs)) + + payloads, err := provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + assert.Equal(t, 100, len(payloads)) + + assert.Equal(t, 50, countRemainingLogs(buffer.queues["1"].logs)) + assert.Equal(t, 50, countRemainingLogs(buffer.queues["2"].logs)) + + windowCount := remainingBlockWindowCounts(buffer.queues, 4) + + assert.Equal(t, 2, windowCount[0]) + assert.Equal(t, 4, windowCount[48]) + assert.Equal(t, 4, windowCount[96]) + + // the second dequeue call will retrieve the remaining 100 logs and exhaust the queues + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + assert.Equal(t, 100, len(payloads)) + + assert.Equal(t, 0, countRemainingLogs(buffer.queues["1"].logs)) + assert.Equal(t, 0, countRemainingLogs(buffer.queues["2"].logs)) + + windowCount = remainingBlockWindowCounts(buffer.queues, 4) + + assert.Equal(t, 0, windowCount[0]) + assert.Equal(t, 0, windowCount[48]) + assert.Equal(t, 0, windowCount[96]) + }) + + t.Run("minimum guaranteed for all windows including an incomplete window followed by best effort", func(t *testing.T) { + opts := NewOptions(200, big.NewInt(42161)) + opts.BufferVersion = "v1" + + logPoller := &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 102, nil + }, + } + + provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(42161), &mockedPacker{}, nil, opts) + + ctx := context.Background() + + buffer := provider.bufferV1.(*logBuffer) + + for i := 0; i < 102; i++ { + buffer.Enqueue(big.NewInt(1), logpoller.Log{BlockNumber: int64(i + 1), TxHash: common.HexToHash(fmt.Sprintf("0x1%d", i+1)), LogIndex: 0}) + buffer.Enqueue(big.NewInt(2), logpoller.Log{BlockNumber: int64(i + 1), TxHash: common.HexToHash(fmt.Sprintf("0x2%d", i+1)), LogIndex: 0}) + } + + assert.Equal(t, 102, countRemainingLogs(buffer.queues["1"].logs)) + assert.Equal(t, 102, countRemainingLogs(buffer.queues["2"].logs)) + + payloads, err := provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + assert.Equal(t, 100, len(payloads)) + + windowCount := remainingBlockWindowCounts(buffer.queues, 4) + + assert.Equal(t, 6, windowCount[100]) + + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + assert.Equal(t, 100, len(payloads)) + + // upkeep 1 has had the minimum number of logs dequeued on the latest (incomplete) window + assert.Equal(t, 1, buffer.queues["1"].dequeued[100]) + // upkeep 2 has had the minimum number of logs dequeued on the latest (incomplete) window + assert.Equal(t, 1, buffer.queues["2"].dequeued[100]) + + // the third dequeue call will retrieve the remaining 100 logs and exhaust the queues + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + assert.Equal(t, 4, len(payloads)) + + assert.Equal(t, 0, countRemainingLogs(buffer.queues["1"].logs)) + assert.Equal(t, 0, countRemainingLogs(buffer.queues["2"].logs)) + + windowCount = remainingBlockWindowCounts(buffer.queues, 4) + + assert.Equal(t, 0, windowCount[0]) + assert.Equal(t, 0, windowCount[28]) + assert.Equal(t, 0, windowCount[32]) + assert.Equal(t, 0, windowCount[36]) + assert.Equal(t, 0, windowCount[48]) + assert.Equal(t, 0, windowCount[96]) + assert.Equal(t, 0, windowCount[100]) + }) + + t.Run("min dequeue followed by best effort followed by reorg followed by best effort", func(t *testing.T) { + opts := NewOptions(200, big.NewInt(42161)) + opts.BufferVersion = "v1" + + logPoller := &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 101, nil + }, + } + + provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(42161), &mockedPacker{}, nil, opts) + + ctx := context.Background() + + buffer := provider.bufferV1.(*logBuffer) + + for i := 0; i < 100; i++ { + buffer.Enqueue(big.NewInt(1), logpoller.Log{BlockNumber: int64(i + 1), TxHash: common.HexToHash(fmt.Sprintf("0x1%d", i+1)), LogIndex: 0}) + buffer.Enqueue(big.NewInt(2), logpoller.Log{BlockNumber: int64(i + 1), TxHash: common.HexToHash(fmt.Sprintf("0x2%d", i+1)), LogIndex: 0}) + } + + assert.Equal(t, 100, countRemainingLogs(buffer.queues["1"].logs)) + assert.Equal(t, 100, countRemainingLogs(buffer.queues["2"].logs)) + + payloads, err := provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + assert.Equal(t, 100, len(payloads)) + + windowCount := remainingBlockWindowCounts(buffer.queues, 4) + + assert.Equal(t, 4, windowCount[28]) + + // reorg block 28 + buffer.Enqueue(big.NewInt(1), logpoller.Log{BlockNumber: int64(28), TxHash: common.HexToHash(fmt.Sprintf("0xreorg1%d", 28)), LogIndex: 0, BlockHash: common.BytesToHash([]byte("reorg"))}) + buffer.Enqueue(big.NewInt(2), logpoller.Log{BlockNumber: int64(28), TxHash: common.HexToHash(fmt.Sprintf("0xreorg2%d", 28)), LogIndex: 0, BlockHash: common.BytesToHash([]byte("reorg"))}) + + windowCount = remainingBlockWindowCounts(buffer.queues, 4) + + assert.Equal(t, 6, windowCount[28]) + + // the second dequeue call will retrieve the remaining 100 logs and exhaust the queues + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + assert.Equal(t, 100, len(payloads)) + + windowCount = remainingBlockWindowCounts(buffer.queues, 4) + + assert.Equal(t, 0, windowCount[0]) + assert.Equal(t, 0, windowCount[28]) + assert.Equal(t, 0, windowCount[32]) + assert.Equal(t, 0, windowCount[36]) + assert.Equal(t, 0, windowCount[48]) + assert.Equal(t, 2, windowCount[96]) // these 2 remaining logs are because of the 2 re orgd logs taking up dequeue space + }) + + t.Run("sparsely populated blocks", func(t *testing.T) { + opts := NewOptions(200, big.NewInt(42161)) + opts.BufferVersion = "v1" + + logPoller := &mockLogPoller{ + LatestBlockFn: func(ctx context.Context) (int64, error) { + return 100, nil + }, + } + + provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(42161), &mockedPacker{}, nil, opts) + + ctx := context.Background() + + buffer := provider.bufferV1.(*logBuffer) + + upkeepOmittedOnBlocks := map[int64][]int{ + 1: {5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, 100}, // upkeep 1 won't have logs on 20 blocks + 2: {2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100}, // upkeep 2 won't have logs on 50 blocks + 3: {3, 13, 23, 33, 43, 53, 63, 73, 83, 93}, // upkeep 3 won't appear on 10 blocks + 4: {1, 25, 50, 75, 100}, // upkeep 4 won't appear on 5 blocks + 5: {}, // upkeep 5 appears on all blocks + } + + for upkeep, skipBlocks := range upkeepOmittedOnBlocks { + blockLoop: + for i := 0; i < 100; i++ { + for _, block := range skipBlocks { + if block == i+1 { + continue blockLoop + } + } + buffer.Enqueue(big.NewInt(upkeep), logpoller.Log{BlockNumber: int64(i + 1), TxHash: common.HexToHash(fmt.Sprintf("0x1%d", i+1)), LogIndex: 0}) + } + } + + assert.Equal(t, 80, countRemainingLogs(buffer.queues["1"].logs)) + assert.Equal(t, 50, countRemainingLogs(buffer.queues["2"].logs)) + assert.Equal(t, 90, countRemainingLogs(buffer.queues["3"].logs)) + assert.Equal(t, 95, countRemainingLogs(buffer.queues["4"].logs)) + assert.Equal(t, 100, countRemainingLogs(buffer.queues["5"].logs)) + + // perform two dequeues + payloads, err := provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + assert.Equal(t, 100, len(payloads)) + + payloads, err = provider.GetLatestPayloads(ctx) + assert.NoError(t, err) + assert.Equal(t, 100, len(payloads)) + + assert.Equal(t, 40, countRemainingLogs(buffer.queues["1"].logs)) + assert.Equal(t, 10, countRemainingLogs(buffer.queues["2"].logs)) + assert.Equal(t, 50, countRemainingLogs(buffer.queues["3"].logs)) + assert.Equal(t, 55, countRemainingLogs(buffer.queues["4"].logs)) + assert.Equal(t, 60, countRemainingLogs(buffer.queues["5"].logs)) + }) +} + type mockedPacker struct { }