BCF-3052 - Job Based KV Store and juelsFeePerCoin reboot persistence #12392

Merged Mar 12, 2024 · 22 commits (changes shown from 20 commits)
5 changes: 5 additions & 0 deletions .changeset/lemon-ladybugs-doubt.md
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Add kv store tied to jobs and use it for juels fee per coin cache to store persisted values for backup

Contributor:
Can we phrase this in a way that node ops will understand? e.g. by referencing the relevant job spec field

Contributor Author:
Job spec changes are already referenced in the previous changeset, from when the cache was added.

Contributor Author:
Do I even need to add changesets that aren't useful for NOPs?

Contributor:
Yeah, according to the new process every change needs a changeset. Releng will filter out the NOP-relevant changes to add to the changelog.

Contributor Author:
We now almost always require a changeset because of the new CI.

69 changes: 69 additions & 0 deletions core/services/job/kv_orm.go
@@ -0,0 +1,69 @@
package job

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/jmoiron/sqlx"
	"github.com/jmoiron/sqlx/types"

	"github.com/smartcontractkit/chainlink/v2/core/logger"
	"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

// KVStore is a simple KV store that can store and retrieve serializable data.
//
//go:generate mockery --quiet --name KVStore --output ./mocks/ --case=underscore
type KVStore interface {
	Store(key string, val interface{}) error
	Get(key string, dest interface{}) error
}

type kVStore struct {
	jobID int32
	q     pg.Q
	lggr  logger.SugaredLogger
}

var _ KVStore = (*kVStore)(nil)

func NewKVStore(jobID int32, db *sqlx.DB, cfg pg.QConfig, lggr logger.Logger) kVStore {
	namedLogger := logger.Sugared(lggr.Named("JobORM"))
	return kVStore{
		jobID: jobID,
		q:     pg.NewQ(db, namedLogger, cfg),
		lggr:  namedLogger,
	}
}

// Store saves serializable value by key.
func (kv kVStore) Store(key string, val interface{}) error {
	jsonVal, err := json.Marshal(val)
	if err != nil {
		return err
	}

	sql := `INSERT INTO job_kv_store (id, key, val)
	        VALUES ($1, $2, $3)
	        ON CONFLICT (id, key) DO UPDATE SET
	          val = EXCLUDED.val,
	          updated_at = $4
	        RETURNING id;`

	if err = kv.q.ExecQ(sql, kv.jobID, key, types.JSONText(jsonVal), time.Now()); err != nil {
		return fmt.Errorf("failed to store value: %s for key: %s for jobID: %d : %w", string(jsonVal), key, kv.jobID, err)
	}
	return nil
}

// Get retrieves serializable value by key.
func (kv kVStore) Get(key string, dest interface{}) error {
	var ret json.RawMessage
	sql := "SELECT val FROM job_kv_store WHERE id = $1 AND key = $2"
	if err := kv.q.Get(&ret, sql, kv.jobID, key); err != nil {
		return fmt.Errorf("failed to get value by key: %s for jobID: %d : %w", key, kv.jobID, err)
	}

	return json.Unmarshal(ret, dest)
}
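
Both queries above assume a job_kv_store table keyed by (id, key), with a JSON val column and an updated_at timestamp; the migration that creates the table is not among the files shown here. As a rough sketch only, inferred from the queries rather than taken from the PR, the table would need roughly this shape (column types and defaults are assumptions):

-- Sketch, not the actual migration: columns and primary key inferred from the INSERT/SELECT above.
CREATE TABLE job_kv_store (
    id         INTEGER     NOT NULL,            -- job ID (kv.jobID)
    key        TEXT        NOT NULL,
    val        JSONB       NOT NULL,            -- JSON-serialized value (types.JSONText)
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (id, key)                       -- required by ON CONFLICT (id, key)
);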
54 changes: 54 additions & 0 deletions core/services/job/kv_orm_test.go
@@ -0,0 +1,54 @@
package job_test

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/smartcontractkit/chainlink/v2/core/bridges"
	"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
	"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
	"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
	"github.com/smartcontractkit/chainlink/v2/core/logger"
	"github.com/smartcontractkit/chainlink/v2/core/services/directrequest"
	"github.com/smartcontractkit/chainlink/v2/core/services/job"
	"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
	"github.com/smartcontractkit/chainlink/v2/core/testdata/testspecs"
)

func TestJobKVStore(t *testing.T) {
	config := configtest.NewTestGeneralConfig(t)
	db := pgtest.NewSqlxDB(t)

	lggr := logger.TestLogger(t)

	pipelineORM := pipeline.NewORM(db, logger.TestLogger(t), config.Database(), config.JobPipeline().MaxSuccessfulRuns())
	bridgesORM := bridges.NewORM(db, logger.TestLogger(t), config.Database())

	jobID := int32(1337)
	kvStore := job.NewKVStore(jobID, db, config.Database(), lggr)
	jobORM := NewTestORM(t, db, pipelineORM, bridgesORM, cltest.NewKeyStore(t, db, config.Database()), config.Database())

	jb, err := directrequest.ValidatedDirectRequestSpec(testspecs.GetDirectRequestSpec())
	require.NoError(t, err)
	jb.ID = jobID
	require.NoError(t, jobORM.CreateJob(&jb))

	key := "test_key"

	type testData struct {
		Test string `json:"test"`
	}

	td1 := testData{Test: "value1"}
	td2 := testData{Test: "value2"}

	var retData testData
	require.NoError(t, kvStore.Store(key, td1))
	require.NoError(t, kvStore.Get(key, &retData))
	require.Equal(t, td1, retData)

	require.NoError(t, kvStore.Store(key, td2))
	require.NoError(t, kvStore.Get(key, &retData))
	require.Equal(t, td2, retData)
}
60 changes: 60 additions & 0 deletions core/services/job/mocks/kv_store.go

(Generated mock file; contents not rendered.)

7 changes: 5 additions & 2 deletions core/services/ocr2/delegate.go
@@ -375,6 +375,8 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) ([]job.Servi
}
lggr := logger.Sugared(d.lggr.Named(jb.ExternalJobID.String()).With(lggrCtx.Args()...))

kvStore := job.NewKVStore(jb.ID, d.db, d.cfg.Database(), lggr)

rid, err := spec.RelayID()
if err != nil {
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: string(spec.PluginType)}
@@ -448,7 +450,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) ([]job.Servi
return d.newServicesLLO(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger)

case types.Median:
return d.newServicesMedian(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger)
return d.newServicesMedian(ctx, lggr, jb, bootstrapPeers, kb, kvStore, ocrDB, lc, ocrLogger)

case types.DKG:
return d.newServicesDKG(lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger)
@@ -927,6 +929,7 @@ func (d *Delegate) newServicesMedian(
jb job.Job,
bootstrapPeers []commontypes.BootstrapperLocator,
kb ocr2key.KeyBundle,
kvStore job.KVStore,
ocrDB *db,
lc ocrtypes.LocalConfig,
ocrLogger commontypes.Logger,
@@ -962,7 +965,7 @@ func (d *Delegate) newServicesMedian(
return nil, ErrRelayNotEnabled{Err: err, PluginName: "median", Relay: spec.Relay}
}

medianServices, err2 := median.NewMedianServices(ctx, jb, d.isNewlyCreatedJob, relayer, d.pipelineRunner, lggr, oracleArgsNoPlugin, mConfig, enhancedTelemChan, errorLog)
medianServices, err2 := median.NewMedianServices(ctx, jb, d.isNewlyCreatedJob, relayer, kvStore, d.pipelineRunner, lggr, oracleArgsNoPlugin, mConfig, enhancedTelemChan, errorLog)

if ocrcommon.ShouldCollectEnhancedTelemetry(&jb) {
enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, enhancedTelemChan, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.ContractID, synchronization.EnhancedEA), lggr.Named("EnhancedTelemetry"))
3 changes: 2 additions & 1 deletion core/services/ocr2/plugins/median/services.go
@@ -56,6 +56,7 @@ func NewMedianServices(ctx context.Context,
jb job.Job,
isNewlyCreatedJob bool,
relayer loop.Relayer,
kvStore job.KVStore,
pipelineRunner pipeline.Runner,
lggr logger.Logger,
argsNoPlugin libocr.OCR2OracleArgs,
@@ -128,7 +129,7 @@ func NewMedianServices(ctx context.Context,

if !pluginConfig.JuelsPerFeeCoinCacheDisabled {
lggr.Infof("juelsPerFeeCoin data source caching is enabled")
if juelsPerFeeCoinSource, err = ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, pluginConfig.JuelsPerFeeCoinCacheDuration.Duration()); err != nil {
if juelsPerFeeCoinSource, err = ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, kvStore, pluginConfig.JuelsPerFeeCoinCacheDuration.Duration()); err != nil {
return nil, err
}
}
60 changes: 46 additions & 14 deletions core/services/ocrcommon/data_source.go
@@ -13,6 +13,7 @@ import (
ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
serializablebig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
@@ -99,20 +100,22 @@ func NewInMemoryDataSource(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, l
}
}

const defaultInMemoryCacheDuration = time.Minute * 5
const defaultCacheFreshness = time.Minute * 5
const dataSourceCacheKey = "dscache"

func NewInMemoryDataSourceCache(ds median.DataSource, cacheExpiryDuration time.Duration) (median.DataSource, error) {
func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cacheFreshness time.Duration) (median.DataSource, error) {
inMemoryDS, ok := ds.(*inMemoryDataSource)
if !ok {
return nil, errors.Errorf("unsupported data source type: %T, only inMemoryDataSource supported", ds)
}

if cacheExpiryDuration == 0 {
cacheExpiryDuration = defaultInMemoryCacheDuration
if cacheFreshness == 0 {
cacheFreshness = defaultCacheFreshness
}

dsCache := &inMemoryDataSourceCache{
cacheExpiration: cacheExpiryDuration,
kvStore: kvStore,
cacheFreshness: cacheFreshness,
inMemoryDataSource: inMemoryDS,
}
go func() { dsCache.updater() }()
@@ -217,20 +220,23 @@ func (ds *inMemoryDataSource) Observe(ctx context.Context, timestamp ocr2types.R
// If cache update is overdue Observe defaults to standard inMemoryDataSource behaviour.
type inMemoryDataSourceCache struct {
*inMemoryDataSource
cacheExpiration time.Duration
// cacheFreshness indicates duration between cache updates.
// Even if updates fail, previous values are returned.
cacheFreshness time.Duration
mu sync.RWMutex
latestUpdateErr error
latestTrrs pipeline.TaskRunResults
latestResult pipeline.FinalResult
kvStore job.KVStore
}

// updater periodically updates data source cache.
func (ds *inMemoryDataSourceCache) updater() {
ticker := time.NewTicker(ds.cacheExpiration)
ticker := time.NewTicker(ds.cacheFreshness)
for ; true; <-ticker.C {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
if err := ds.updateCache(ctx); err != nil {
ds.lggr.Infow("failed to update cache", "err", err)
ds.lggr.Warnf("failed to update cache", "err", err)
}
cancel()
}
@@ -239,15 +245,35 @@ func (ds *inMemoryDataSourceCache) updater() {
func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error {
ds.mu.Lock()
defer ds.mu.Unlock()
_, ds.latestTrrs, ds.latestUpdateErr = ds.executeRun(ctx)
if ds.latestUpdateErr != nil {
return errors.Wrapf(ds.latestUpdateErr, "error executing run for spec ID %v", ds.spec.ID)
} else if ds.latestTrrs.FinalResult(ds.lggr).HasErrors() {
ds.latestUpdateErr = errjoin.Join(ds.latestTrrs.FinalResult(ds.lggr).AllErrors...)

// check for any errors
_, latestTrrs, latestUpdateErr := ds.executeRun(ctx)
if latestTrrs.FinalResult(ds.lggr).HasErrors() {
latestUpdateErr = errjoin.Join(append(latestTrrs.FinalResult(ds.lggr).AllErrors, latestUpdateErr)...)
}

if latestUpdateErr != nil {
previousUpdateErr := ds.latestUpdateErr
ds.latestUpdateErr = latestUpdateErr
// raise log severity
if previousUpdateErr != nil {
ds.lggr.Errorf("consecutive cache updates errored: previous err: %w new err: %w", previousUpdateErr, ds.latestUpdateErr)
}
return errors.Wrapf(ds.latestUpdateErr, "error executing run for spec ID %v", ds.spec.ID)
}

ds.latestTrrs = latestTrrs
ds.latestResult = ds.latestTrrs.FinalResult(ds.lggr)
value, err := ds.inMemoryDataSource.parse(ds.latestResult)
if err != nil {
return errors.Wrapf(err, "invalid result")
}

// backup in case data source fails continuously and node gets rebooted
if err = ds.kvStore.Store(dataSourceCacheKey, serializablebig.New(value)); err != nil {
ds.lggr.Errorf("failed to persist latest task run value", err)
}

return nil
}

@@ -261,7 +287,7 @@ func (ds *inMemoryDataSourceCache) get(ctx context.Context) (pipeline.FinalResul
ds.mu.RUnlock()

if err := ds.updateCache(ctx); err != nil {
ds.lggr.Errorw("failed to update cache, returning stale result now", "err", err)
ds.lggr.Warnf("failed to update cache, returning stale result now", "err", err)
}

ds.mu.RLock()
@@ -270,7 +296,13 @@
}

func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2types.ReportTimestamp) (*big.Int, error) {
var val serializablebig.Big
latestResult, latestTrrs := ds.get(ctx)
if latestTrrs == nil {
ds.lggr.Errorf("cache is empty, returning persisted value now")
return val.ToInt(), ds.kvStore.Get(dataSourceCacheKey, &val)
}

setEATelemetry(ds.inMemoryDataSource, latestResult, latestTrrs, ObservationTimestamp{
Round: timestamp.Round,
Epoch: timestamp.Epoch,
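
Taken together, the data_source.go changes give the juelsPerFeeCoin observation path three tiers: a fresh pipeline run, the in-memory cached result, and, new in this PR, the value persisted in the job KV store, so a node that reboots while its data source keeps failing still has a value to serve. The following is only an illustrative sketch of that flow under assumed helper shapes (runPipeline and cached stand in for executeRun and the cached task run results), not the PR's literal code:

package sketch // illustrative only, not part of this PR

import (
	"context"
	"math/big"

	serializablebig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
	"github.com/smartcontractkit/chainlink/v2/core/services/job"
)

// observeWithFallback resolves a value in three steps:
// fresh pipeline run -> last in-memory cached value -> persisted KV store backup.
func observeWithFallback(ctx context.Context, kv job.KVStore, runPipeline func(context.Context) (*big.Int, error), cached *big.Int) (*big.Int, error) {
	// 1. Try a fresh run; on success persist it as the reboot backup (mirrors kvStore.Store in updateCache).
	if val, err := runPipeline(ctx); err == nil {
		_ = kv.Store("dscache", serializablebig.New(val)) // key mirrors dataSourceCacheKey
		return val, nil
	}

	// 2. The run failed: fall back to the last cached value, if the cache was ever filled.
	if cached != nil {
		return cached, nil
	}

	// 3. Cache never filled (e.g. failures ever since reboot): use the value persisted in job_kv_store.
	var persisted serializablebig.Big
	if err := kv.Get("dscache", &persisted); err != nil {
		return nil, err
	}
	return persisted.ToInt(), nil
}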