Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/services/ocr2/plugins/ocr2keeper/evmregister/v21/upkeepstate: use sqlutil instead of pg.QOpts #12806

Merged
merged 1 commit into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/poor-socks-travel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

core/services/ocr2/plugins/ocr2keeper/evmregister/v21/upkeepstate: use sqlutil instead of pg.QOpts #internal
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
package upkeepstate

import (
"context"
"math/big"
"time"

"github.com/jmoiron/sqlx"
"github.com/lib/pq"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

type orm struct {
chainID *ubig.Big
q pg.Q
ds sqlutil.DataSource
}

type persistedStateRecord struct {
Expand All @@ -27,17 +26,15 @@ type persistedStateRecord struct {
}

// NewORM creates an ORM scoped to chainID.
func NewORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) *orm {
func NewORM(chainID *big.Int, ds sqlutil.DataSource) *orm {
return &orm{
chainID: ubig.New(chainID),
q: pg.NewQ(db, lggr.Named("ORM"), cfg),
ds: ds,
}
}

// BatchInsertRecords is idempotent and sets upkeep state values in db
func (o *orm) BatchInsertRecords(state []persistedStateRecord, qopts ...pg.QOpt) error {
q := o.q.WithOpts(qopts...)

func (o *orm) BatchInsertRecords(ctx context.Context, state []persistedStateRecord) error {
if len(state) == 0 {
return nil
}
Expand Down Expand Up @@ -65,17 +62,16 @@ func (o *orm) BatchInsertRecords(state []persistedStateRecord, qopts ...pg.QOpt)
})
}

return q.ExecQNamed(`INSERT INTO evm.upkeep_states
_, err := o.ds.NamedExecContext(ctx, `INSERT INTO evm.upkeep_states
(evm_chain_id, work_id, completion_state, block_number, inserted_at, upkeep_id, ineligibility_reason) VALUES
(:evm_chain_id, :work_id, :completion_state, :block_number, :inserted_at, :upkeep_id, :ineligibility_reason) ON CONFLICT (evm_chain_id, work_id) DO NOTHING`, rows)
return err
}

// SelectStatesByWorkIDs searches the data store for stored states for the
// provided work ids and configured chain id
func (o *orm) SelectStatesByWorkIDs(workIDs []string, qopts ...pg.QOpt) (states []persistedStateRecord, err error) {
q := o.q.WithOpts(qopts...)

err = q.Select(&states, `SELECT upkeep_id, work_id, completion_state, block_number, ineligibility_reason, inserted_at
func (o *orm) SelectStatesByWorkIDs(ctx context.Context, workIDs []string) (states []persistedStateRecord, err error) {
err = o.ds.SelectContext(ctx, &states, `SELECT upkeep_id, work_id, completion_state, block_number, ineligibility_reason, inserted_at
FROM evm.upkeep_states
WHERE work_id = ANY($1) AND evm_chain_id = $2::NUMERIC`, pq.Array(workIDs), o.chainID)

Expand All @@ -87,9 +83,8 @@ func (o *orm) SelectStatesByWorkIDs(workIDs []string, qopts ...pg.QOpt) (states
}

// DeleteExpired prunes stored states older than to the provided time
func (o *orm) DeleteExpired(expired time.Time, qopts ...pg.QOpt) error {
q := o.q.WithOpts(qopts...)
_, err := q.Exec(`DELETE FROM evm.upkeep_states WHERE inserted_at <= $1 AND evm_chain_id::NUMERIC = $2`, expired, o.chainID)
func (o *orm) DeleteExpired(ctx context.Context, expired time.Time) error {
_, err := o.ds.ExecContext(ctx, `DELETE FROM evm.upkeep_states WHERE inserted_at <= $1 AND evm_chain_id::NUMERIC = $2`, expired, o.chainID)

return err
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,17 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

func TestInsertSelectDelete(t *testing.T) {
lggr, _ := logger.TestLoggerObserved(t, zapcore.ErrorLevel)
ctx := testutils.Context(t)
chainID := testutils.FixtureChainID
db := pgtest.NewSqlxDB(t)
orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true))
orm := NewORM(chainID, db)

inserted := []persistedStateRecord{
{
Expand All @@ -32,20 +30,20 @@ func TestInsertSelectDelete(t *testing.T) {
},
}

err := orm.BatchInsertRecords(inserted)
err := orm.BatchInsertRecords(ctx, inserted)

require.NoError(t, err, "no error expected from insert")

states, err := orm.SelectStatesByWorkIDs([]string{"0x1"})
states, err := orm.SelectStatesByWorkIDs(ctx, []string{"0x1"})

require.NoError(t, err, "no error expected from select")
require.Len(t, states, 1, "records return should equal records inserted")

err = orm.DeleteExpired(time.Now())
err = orm.DeleteExpired(ctx, time.Now())

assert.NoError(t, err, "no error expected from delete")

states, err = orm.SelectStatesByWorkIDs([]string{"0x1"})
states, err = orm.SelectStatesByWorkIDs(ctx, []string{"0x1"})

require.NoError(t, err, "no error expected from select")
require.Len(t, states, 0, "records return should be empty since records were deleted")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
"sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
ocr2keepers "github.com/smartcontractkit/chainlink-common/pkg/types/automation"

"github.com/smartcontractkit/chainlink-common/pkg/services"

ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

Expand All @@ -31,9 +31,9 @@ const (
)

type ORM interface {
BatchInsertRecords([]persistedStateRecord, ...pg.QOpt) error
SelectStatesByWorkIDs([]string, ...pg.QOpt) ([]persistedStateRecord, error)
DeleteExpired(time.Time, ...pg.QOpt) error
BatchInsertRecords(context.Context, []persistedStateRecord) error
SelectStatesByWorkIDs(context.Context, []string) ([]persistedStateRecord, error)
DeleteExpired(context.Context, time.Time) error
}

// UpkeepStateStore is the interface for managing upkeeps final state in a local store.
Expand Down Expand Up @@ -152,7 +152,7 @@ func (u *upkeepStateStore) flush(ctx context.Context) {
u.sem <- struct{}{}

go func() {
if err := u.orm.BatchInsertRecords(batch, pg.WithParentCtx(ctx)); err != nil {
if err := u.orm.BatchInsertRecords(ctx, batch); err != nil {
u.lggr.Errorw("error inserting records", "err", err)
}
<-u.sem
Expand Down Expand Up @@ -268,7 +268,7 @@ func (u *upkeepStateStore) fetchPerformed(ctx context.Context, workIDs ...string
// fetchFromDB fetches all upkeeps indicated as ineligible from the db to
// populate the cache.
func (u *upkeepStateStore) fetchFromDB(ctx context.Context, workIDs ...string) error {
states, err := u.orm.SelectStatesByWorkIDs(workIDs, pg.WithParentCtx(ctx))
states, err := u.orm.SelectStatesByWorkIDs(ctx, workIDs)
if err != nil {
return err
}
Expand Down Expand Up @@ -320,7 +320,9 @@ func (u *upkeepStateStore) cleanup(ctx context.Context) error {
func (u *upkeepStateStore) cleanDB(ctx context.Context) error {
tm := time.Now().Add(-1 * u.retention)

return u.orm.DeleteExpired(tm, pg.WithParentCtx(ctx), pg.WithLongQueryTimeout())
ctx, cancel := context.WithTimeout(sqlutil.WithoutDefaultTimeout(ctx), time.Minute)
Copy link
Collaborator

@dimriou dimriou Apr 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it ok to remove the timeout completely?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come? I think these are still necessary to replace the default timeout which may be too short.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering if it's ok to never timeout, compared to what we have now. I don't have a strong preference, just noticed the difference.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is preserving the original "long" timeout of 1m.

defer cancel()
return u.orm.DeleteExpired(ctx, tm)
}

// cleanupCache removes any records from the cache that are older than the TTL.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

func TestUpkeepStateStore(t *testing.T) {
Expand Down Expand Up @@ -329,20 +328,16 @@ func TestUpkeepStateStore_SetSelectIntegration(t *testing.T) {
lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.ErrorLevel)
chainID := testutils.FixtureChainID
db := pgtest.NewSqlxDB(t)
realORM := NewORM(chainID, db, lggr, pgtest.NewQConfig(true))
realORM := NewORM(chainID, db)
insertFinished := make(chan struct{}, 1)
orm := &wrappedORM{
BatchInsertRecordsFn: func(records []persistedStateRecord, opt ...pg.QOpt) error {
err := realORM.BatchInsertRecords(records, opt...)
BatchInsertRecordsFn: func(ctx context.Context, records []persistedStateRecord) error {
err := realORM.BatchInsertRecords(ctx, records)
insertFinished <- struct{}{}
return err
},
SelectStatesByWorkIDsFn: func(strings []string, opt ...pg.QOpt) ([]persistedStateRecord, error) {
return realORM.SelectStatesByWorkIDs(strings, opt...)
},
DeleteExpiredFn: func(t time.Time, opt ...pg.QOpt) error {
return realORM.DeleteExpired(t, opt...)
},
SelectStatesByWorkIDsFn: realORM.SelectStatesByWorkIDs,
DeleteExpiredFn: realORM.DeleteExpired,
}
scanner := &mockScanner{}
store := NewUpkeepStateStore(orm, lggr, scanner)
Expand Down Expand Up @@ -389,20 +384,16 @@ func TestUpkeepStateStore_emptyDB(t *testing.T) {
lggr, observedLogs := logger.TestLoggerObserved(t, zapcore.ErrorLevel)
chainID := testutils.FixtureChainID
db := pgtest.NewSqlxDB(t)
realORM := NewORM(chainID, db, lggr, pgtest.NewQConfig(true))
realORM := NewORM(chainID, db)
insertFinished := make(chan struct{}, 1)
orm := &wrappedORM{
BatchInsertRecordsFn: func(records []persistedStateRecord, opt ...pg.QOpt) error {
err := realORM.BatchInsertRecords(records, opt...)
BatchInsertRecordsFn: func(ctx context.Context, records []persistedStateRecord) error {
err := realORM.BatchInsertRecords(ctx, records)
insertFinished <- struct{}{}
return err
},
SelectStatesByWorkIDsFn: func(strings []string, opt ...pg.QOpt) ([]persistedStateRecord, error) {
return realORM.SelectStatesByWorkIDs(strings, opt...)
},
DeleteExpiredFn: func(t time.Time, opt ...pg.QOpt) error {
return realORM.DeleteExpired(t, opt...)
},
SelectStatesByWorkIDsFn: realORM.SelectStatesByWorkIDs,
DeleteExpiredFn: realORM.DeleteExpired,
}
scanner := &mockScanner{}
store := NewUpkeepStateStore(orm, lggr, scanner)
Expand All @@ -427,7 +418,7 @@ func TestUpkeepStateStore_Upsert(t *testing.T) {
ctx := testutils.Context(t)
lggr := logger.TestLogger(t)
chainID := testutils.FixtureChainID
orm := NewORM(chainID, db, lggr, pgtest.NewQConfig(true))
orm := NewORM(chainID, db)

store := NewUpkeepStateStore(orm, lggr, &mockScanner{})

Expand Down Expand Up @@ -560,11 +551,11 @@ func (_m *mockORM) setErr(err error) {
_m.err = err
}

func (_m *mockORM) BatchInsertRecords(state []persistedStateRecord, opts ...pg.QOpt) error {
func (_m *mockORM) BatchInsertRecords(ctx context.Context, state []persistedStateRecord) error {
return nil
}

func (_m *mockORM) SelectStatesByWorkIDs(workIDs []string, opts ...pg.QOpt) ([]persistedStateRecord, error) {
func (_m *mockORM) SelectStatesByWorkIDs(ctx context.Context, workIDs []string) ([]persistedStateRecord, error) {
_m.lock.Lock()
defer _m.lock.Unlock()

Expand All @@ -574,7 +565,7 @@ func (_m *mockORM) SelectStatesByWorkIDs(workIDs []string, opts ...pg.QOpt) ([]p
return res, _m.err
}

func (_m *mockORM) DeleteExpired(tm time.Time, opts ...pg.QOpt) error {
func (_m *mockORM) DeleteExpired(ctx context.Context, tm time.Time) error {
_m.lock.Lock()
defer _m.lock.Unlock()

Expand All @@ -585,19 +576,19 @@ func (_m *mockORM) DeleteExpired(tm time.Time, opts ...pg.QOpt) error {
}

type wrappedORM struct {
BatchInsertRecordsFn func([]persistedStateRecord, ...pg.QOpt) error
SelectStatesByWorkIDsFn func([]string, ...pg.QOpt) ([]persistedStateRecord, error)
DeleteExpiredFn func(time.Time, ...pg.QOpt) error
BatchInsertRecordsFn func(context.Context, []persistedStateRecord) error
SelectStatesByWorkIDsFn func(context.Context, []string) ([]persistedStateRecord, error)
DeleteExpiredFn func(context.Context, time.Time) error
}

func (o *wrappedORM) BatchInsertRecords(r []persistedStateRecord, q ...pg.QOpt) error {
return o.BatchInsertRecordsFn(r, q...)
func (o *wrappedORM) BatchInsertRecords(ctx context.Context, r []persistedStateRecord) error {
return o.BatchInsertRecordsFn(ctx, r)
}

func (o *wrappedORM) SelectStatesByWorkIDs(ids []string, q ...pg.QOpt) ([]persistedStateRecord, error) {
return o.SelectStatesByWorkIDsFn(ids, q...)
func (o *wrappedORM) SelectStatesByWorkIDs(ctx context.Context, ids []string) ([]persistedStateRecord, error) {
return o.SelectStatesByWorkIDsFn(ctx, ids)
}

func (o *wrappedORM) DeleteExpired(t time.Time, q ...pg.QOpt) error {
return o.DeleteExpiredFn(t, q...)
func (o *wrappedORM) DeleteExpired(ctx context.Context, t time.Time) error {
return o.DeleteExpiredFn(ctx, t)
}
10 changes: 5 additions & 5 deletions core/services/relay/evm/ocr2keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import (
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"

"github.com/smartcontractkit/libocr/offchainreporting2plus/chains/evmutil"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-automation/pkg/v3/plugin"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/automation"

Expand Down Expand Up @@ -65,17 +65,17 @@ type OCR2KeeperRelayer interface {

// ocr2keeperRelayer is the relayer with added DKG and OCR2Keeper provider functions.
type ocr2keeperRelayer struct {
db *sqlx.DB
ds sqlutil.DataSource
chain legacyevm.Chain
lggr logger.Logger
ethKeystore keystore.Eth
dbCfg pg.QConfig
}

// NewOCR2KeeperRelayer is the constructor of ocr2keeperRelayer
func NewOCR2KeeperRelayer(db *sqlx.DB, chain legacyevm.Chain, lggr logger.Logger, ethKeystore keystore.Eth, dbCfg pg.QConfig) OCR2KeeperRelayer {
func NewOCR2KeeperRelayer(ds sqlutil.DataSource, chain legacyevm.Chain, lggr logger.Logger, ethKeystore keystore.Eth, dbCfg pg.QConfig) OCR2KeeperRelayer {
return &ocr2keeperRelayer{
db: db,
ds: ds,
chain: chain,
lggr: lggr,
ethKeystore: ethKeystore,
Expand Down Expand Up @@ -126,7 +126,7 @@ func (r *ocr2keeperRelayer) NewOCR2KeeperProvider(rargs commontypes.RelayArgs, p

finalityDepth := client.Config().EVM().FinalityDepth()

orm := upkeepstate.NewORM(client.ID(), r.db, r.lggr, r.dbCfg)
orm := upkeepstate.NewORM(client.ID(), r.ds)
scanner := upkeepstate.NewPerformedEventsScanner(r.lggr, client.LogPoller(), addr, finalityDepth)
services.upkeepStateStore = upkeepstate.NewUpkeepStateStore(orm, r.lggr, scanner)

Expand Down
Loading