Skip to content

Commit

Permalink
BCI-3492 [LogPoller]: Allow withObservedExecAndRowsAffected to report…
Browse files Browse the repository at this point in the history
… non-zero rows affected (#14057)

* Fix withObservedExecAndRowsAffected

Also:
- Change behavior of DeleteExpiredLogs to delete logs which don't match any filter
- Add a test case to ensure the dataset size is published properly during pruning

* pnpm changeset

* changeset #fix -> #bugfix
  • Loading branch information
reductionista authored Aug 8, 2024
1 parent 98fc881 commit e0850a6
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 24 deletions.
5 changes: 5 additions & 0 deletions .changeset/sweet-pumas-refuse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#bugfix Addresses 2 minor issues with the pruning of LogPoller's db tables: logs not matching any filter will now be pruned, and rows deleted are now properly reported for observability
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func withObservedExecAndRowsAffected(o *ObservedORM, queryName string, queryType
WithLabelValues(o.chainId, queryName, string(queryType)).
Observe(float64(time.Since(queryStarted)))

if err != nil {
if err == nil {
o.datasetSize.
WithLabelValues(o.chainId, queryName, string(queryType)).
Set(float64(rowsAffected))
Expand Down
11 changes: 11 additions & 0 deletions core/chains/evm/logpoller/observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/prometheus/client_golang/prometheus/testutil"

"github.com/smartcontractkit/chainlink-common/pkg/logger"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
Expand Down Expand Up @@ -117,6 +118,16 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) {
assert.Equal(t, float64(20), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420")))
assert.Equal(t, float64(2), testutil.ToFloat64(orm.blocksInserted.WithLabelValues("420")))

rowsAffected, err := orm.DeleteExpiredLogs(ctx, 3)
require.NoError(t, err)
require.Equal(t, int64(3), rowsAffected)
assert.Equal(t, 3, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteExpiredLogs", "delete"))

rowsAffected, err = orm.DeleteBlocksBefore(ctx, 30, 0)
require.NoError(t, err)
require.Equal(t, int64(2), rowsAffected)
assert.Equal(t, 2, counterFromGaugeByLabels(orm.datasetSize, "420", "DeleteBlocksBefore", "delete"))

// Don't update counters in case of an error
require.Error(t, orm.InsertLogsWithBlock(ctx, logs, NewLogPollerBlock(utils.RandomBytes32(), 0, time.Now(), 0)))
assert.Equal(t, float64(20), testutil.ToFloat64(orm.logsInserted.WithLabelValues("420")))
Expand Down
30 changes: 13 additions & 17 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types/query"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"

evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
Expand Down Expand Up @@ -313,34 +314,29 @@ type Exp struct {
ShouldDelete bool
}

// DeleteExpiredLogs removes any logs which either:
// - don't match any currently registered filters, or
// - have a timestamp older than any matching filter's retention, UNLESS there is at
// least one matching filter with retention=0
func (o *DSORM) DeleteExpiredLogs(ctx context.Context, limit int64) (int64, error) {
var err error
var result sql.Result
if limit > 0 {
result, err = o.ds.ExecContext(ctx, `
DELETE FROM evm.logs
query := `DELETE FROM evm.logs
WHERE (evm_chain_id, address, event_sig, block_number) IN (
SELECT l.evm_chain_id, l.address, l.event_sig, l.block_number
FROM evm.logs l
INNER JOIN (
SELECT address, event, MAX(retention) AS retention
LEFT JOIN (
SELECT address, event, CASE WHEN MIN(retention) = 0 THEN 0 ELSE MAX(retention) END AS retention
FROM evm.log_poller_filters
WHERE evm_chain_id = $1
GROUP BY evm_chain_id, address, event
HAVING NOT 0 = ANY(ARRAY_AGG(retention))
) r ON l.evm_chain_id = $1 AND l.address = r.address AND l.event_sig = r.event
AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')
LIMIT $2
)`, ubig.New(o.chainID), limit)
WHERE r.retention IS NULL OR (r.retention != 0 AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')) %s)`

if limit > 0 {
result, err = o.ds.ExecContext(ctx, fmt.Sprintf(query, "LIMIT $2"), ubig.New(o.chainID), limit)
} else {
result, err = o.ds.ExecContext(ctx, `WITH r AS
( SELECT address, event, MAX(retention) AS retention
FROM evm.log_poller_filters WHERE evm_chain_id=$1
GROUP BY evm_chain_id,address, event HAVING NOT 0 = ANY(ARRAY_AGG(retention))
) DELETE FROM evm.logs l USING r
WHERE l.evm_chain_id = $1 AND l.address=r.address AND l.event_sig=r.event
AND l.block_timestamp <= STATEMENT_TIMESTAMP() - (r.retention / 10^9 * interval '1 second')`, // retention is in nanoseconds (time.Duration aka BIGINT)
ubig.New(o.chainID))
result, err = o.ds.ExecContext(ctx, fmt.Sprintf(query, ""), ubig.New(o.chainID))
}

if err != nil {
Expand Down
13 changes: 7 additions & 6 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,20 +458,21 @@ func TestORM(t *testing.T) {
time.Sleep(2 * time.Millisecond) // just in case we haven't reached the end of the 1ms retention period
deleted, err := o1.DeleteExpiredLogs(ctx, 0)
require.NoError(t, err)
assert.Equal(t, int64(1), deleted)
assert.Equal(t, int64(4), deleted)

logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber)
require.NoError(t, err)
// The only log which should be deleted is the one which matches filter1 (ret=1ms) but not filter12 (ret=1 hour)
// Importantly, it shouldn't delete any logs matching only filter0 (ret=0 meaning permanent retention). Anything
// matching filter12 should be kept regardless of what other filters it matches.
assert.Len(t, logs, 7)
// It should have retained the log matching filter0 (due to ret=0 meaning permanent retention) as well as all
// 3 logs matching filter12 (ret=1 hour). It should have deleted 3 logs not matching any filter, as well as 1
// of the 2 logs matching filter1 (ret=1ms)--the one that doesn't also match filter12.
assert.Len(t, logs, 4)

// Delete logs after should delete all logs.
err = o1.DeleteLogsAndBlocksAfter(ctx, 1)
require.NoError(t, err)
logs, err = o1.SelectLogsByBlockRange(ctx, 1, latest.BlockNumber)
require.NoError(t, err)
require.Zero(t, len(logs))
assert.Zero(t, len(logs))
}

type PgxLogger struct {
Expand Down

0 comments on commit e0850a6

Please sign in to comment.