Skip to content

Commit

Permalink
BCF-3052 - Job Based KV Store and juelsFeePerCoin reboot persistence (#…
Browse files Browse the repository at this point in the history
…12392)

* Add kv store migration

* Add kv store implementation

* Init kv store in ocr2 delegate and pass into median service

* Update ds cache to have kv store fallback for final observation value

* Prettify ds cache updateCache, add ERR log severity on consecutive errs

* Add ds cache test for cache value persistence

* Remove unused field in jobKVStore

* Make sonar SQL migration lint happy

* Rename TestJobKVStore

* Add kv store mock

* Add changeset file

* Fix sonar sql lint

* Change kv orm to use raw json message instead of jsonText

* minor change

* minor change

* Fix SQ SQL lint

* Add comments in KVStore

* Rename jobKVStore to kVStore and return struct from constructor

* Update core/store/migrate/migrations/0227_kv_store_table.sql

Co-authored-by: Sam <[email protected]>

* Update kVStore sql to match migration

* Add more kv_orm tests

---------

Co-authored-by: Sam <[email protected]>
  • Loading branch information
ilija42 and samsondav committed Mar 12, 2024
1 parent 32507dc commit 4592b9a
Show file tree
Hide file tree
Showing 9 changed files with 353 additions and 41 deletions.
5 changes: 5 additions & 0 deletions .changeset/lemon-ladybugs-doubt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Add kv store tied to jobs and use it for juels fee per coin cache to store persisted values for backup
68 changes: 68 additions & 0 deletions core/services/job/kv_orm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package job

import (
"encoding/json"
"fmt"
"time"

"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/types"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

// KVStore is a simple KV store that can store and retrieve serializable data.
//
//go:generate mockery --quiet --name KVStore --output ./mocks/ --case=underscore
type KVStore interface {
Store(key string, val interface{}) error
Get(key string, dest interface{}) error
}

type kVStore struct {
jobID int32
q pg.Q
lggr logger.SugaredLogger
}

var _ KVStore = (*kVStore)(nil)

func NewKVStore(jobID int32, db *sqlx.DB, cfg pg.QConfig, lggr logger.Logger) kVStore {
namedLogger := logger.Sugared(lggr.Named("JobORM"))
return kVStore{
jobID: jobID,
q: pg.NewQ(db, namedLogger, cfg),
lggr: namedLogger,
}
}

// Store saves serializable value by key.
func (kv kVStore) Store(key string, val interface{}) error {
jsonVal, err := json.Marshal(val)
if err != nil {
return err
}

sql := `INSERT INTO job_kv_store (job_id, key, val)
VALUES ($1, $2, $3)
ON CONFLICT (job_id, key) DO UPDATE SET
val = EXCLUDED.val,
updated_at = $4;`

if err = kv.q.ExecQ(sql, kv.jobID, key, types.JSONText(jsonVal), time.Now()); err != nil {
return fmt.Errorf("failed to store value: %s for key: %s for jobID: %d : %w", string(jsonVal), key, kv.jobID, err)
}
return nil
}

// Get retrieves serializable value by key.
func (kv kVStore) Get(key string, dest interface{}) error {
var ret json.RawMessage
sql := "SELECT val FROM job_kv_store WHERE job_id = $1 AND key = $2"
if err := kv.q.Get(&ret, sql, kv.jobID, key); err != nil {
return fmt.Errorf("failed to get value by key: %s for jobID: %d : %w", key, kv.jobID, err)
}

return json.Unmarshal(ret, dest)
}
85 changes: 85 additions & 0 deletions core/services/job/kv_orm_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package job_test

import (
"fmt"
"reflect"
"testing"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/directrequest"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/testdata/testspecs"
)

func TestJobKVStore(t *testing.T) {
config := configtest.NewTestGeneralConfig(t)
db := pgtest.NewSqlxDB(t)

lggr := logger.TestLogger(t)

pipelineORM := pipeline.NewORM(db, logger.TestLogger(t), config.Database(), config.JobPipeline().MaxSuccessfulRuns())
bridgesORM := bridges.NewORM(db, logger.TestLogger(t), config.Database())

jobID := int32(1337)
kvStore := job.NewKVStore(jobID, db, config.Database(), lggr)
jobORM := NewTestORM(t, db, pipelineORM, bridgesORM, cltest.NewKeyStore(t, db, config.Database()), config.Database())

jb, err := directrequest.ValidatedDirectRequestSpec(testspecs.GetDirectRequestSpec())
require.NoError(t, err)
jb.ID = jobID
require.NoError(t, jobORM.CreateJob(&jb))

type testData struct {
Test string
}

type nested struct {
Contact testData // Nested struct
}

values := []interface{}{
42, // int
"hello", // string
3.14, // float64
true, // bool
[]int{1, 2, 3}, // slice of ints
map[string]int{"a": 1, "b": 2}, // map of string to int
testData{Test: "value1"}, // regular struct
nested{testData{"value2"}}, // nested struct
}

for i, value := range values {
testKey := "test_key_" + fmt.Sprint(i)
require.NoError(t, kvStore.Store(testKey, value))

// Get the type of the current value
valueType := reflect.TypeOf(value)
// Create a new instance of the value's type
temp := reflect.New(valueType).Interface()

require.NoError(t, kvStore.Get(testKey, &temp))

tempValue := reflect.ValueOf(temp).Elem().Interface()
require.Equal(t, value, tempValue)
}

key := "test_key_updating"
td1 := testData{Test: "value1"}
td2 := testData{Test: "value2"}

var retData testData
require.NoError(t, kvStore.Store(key, td1))
require.NoError(t, kvStore.Get(key, &retData))
require.Equal(t, td1, retData)

require.NoError(t, kvStore.Store(key, td2))
require.NoError(t, kvStore.Get(key, &retData))
require.Equal(t, td2, retData)
}
60 changes: 60 additions & 0 deletions core/services/job/mocks/kv_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,8 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) ([]job.Servi
}
lggr := logger.Sugared(d.lggr.Named(jb.ExternalJobID.String()).With(lggrCtx.Args()...))

kvStore := job.NewKVStore(jb.ID, d.db, d.cfg.Database(), lggr)

rid, err := spec.RelayID()
if err != nil {
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: string(spec.PluginType)}
Expand Down Expand Up @@ -448,7 +450,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) ([]job.Servi
return d.newServicesLLO(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger)

case types.Median:
return d.newServicesMedian(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger)
return d.newServicesMedian(ctx, lggr, jb, bootstrapPeers, kb, kvStore, ocrDB, lc, ocrLogger)

case types.DKG:
return d.newServicesDKG(lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger)
Expand Down Expand Up @@ -927,6 +929,7 @@ func (d *Delegate) newServicesMedian(
jb job.Job,
bootstrapPeers []commontypes.BootstrapperLocator,
kb ocr2key.KeyBundle,
kvStore job.KVStore,
ocrDB *db,
lc ocrtypes.LocalConfig,
ocrLogger commontypes.Logger,
Expand Down Expand Up @@ -962,7 +965,7 @@ func (d *Delegate) newServicesMedian(
return nil, ErrRelayNotEnabled{Err: err, PluginName: "median", Relay: spec.Relay}
}

medianServices, err2 := median.NewMedianServices(ctx, jb, d.isNewlyCreatedJob, relayer, d.pipelineRunner, lggr, oracleArgsNoPlugin, mConfig, enhancedTelemChan, errorLog)
medianServices, err2 := median.NewMedianServices(ctx, jb, d.isNewlyCreatedJob, relayer, kvStore, d.pipelineRunner, lggr, oracleArgsNoPlugin, mConfig, enhancedTelemChan, errorLog)

if ocrcommon.ShouldCollectEnhancedTelemetry(&jb) {
enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, enhancedTelemChan, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.ContractID, synchronization.EnhancedEA), lggr.Named("EnhancedTelemetry"))
Expand Down
3 changes: 2 additions & 1 deletion core/services/ocr2/plugins/median/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func NewMedianServices(ctx context.Context,
jb job.Job,
isNewlyCreatedJob bool,
relayer loop.Relayer,
kvStore job.KVStore,
pipelineRunner pipeline.Runner,
lggr logger.Logger,
argsNoPlugin libocr.OCR2OracleArgs,
Expand Down Expand Up @@ -128,7 +129,7 @@ func NewMedianServices(ctx context.Context,

if !pluginConfig.JuelsPerFeeCoinCacheDisabled {
lggr.Infof("juelsPerFeeCoin data source caching is enabled")
if juelsPerFeeCoinSource, err = ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, pluginConfig.JuelsPerFeeCoinCacheDuration.Duration()); err != nil {
if juelsPerFeeCoinSource, err = ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, kvStore, pluginConfig.JuelsPerFeeCoinCacheDuration.Duration()); err != nil {
return nil, err
}
}
Expand Down
Loading

0 comments on commit 4592b9a

Please sign in to comment.