Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BCF-3029 changes required to work with the updated chainlink common key value store #12634

Merged
merged 1 commit into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/large-flowers-agree.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Update keyvalue store to be compatible with the interface required in chainlink common
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/prometheus/client_golang v1.17.0
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink-automation v1.0.2
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240401172519-4bfc659b80bf
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63
github.com/smartcontractkit/chainlink-vrf v0.0.0-20231120191722-fef03814f868
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240326191951-2bbe9382d052
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1187,8 +1187,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq
github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE=
github.com/smartcontractkit/chainlink-automation v1.0.2 h1:xsfyuswL15q2YBGQT3qn2SBz6fnSKiSW7XZ8IZQLpnI=
github.com/smartcontractkit/chainlink-automation v1.0.2/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240401172519-4bfc659b80bf h1:yW8rTFycozLVnXRyOgZWGktnmzoFLxSWh1xPJXsp7vg=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240401172519-4bfc659b80bf/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63 h1:wX78l6lMQ6hfwqpOkavD/IyXqBDZ8MZOhhBE9z15Sd0=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240402105740-0be47ab9cf63/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 h1:I326nw5GwHQHsLKHwtu5Sb9EBLylC8CfUd7BFAS0jtg=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8/go.mod h1:a65NtrK4xZb01mf0dDNghPkN2wXgcqFQ55ADthVBgMc=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo=
Expand Down
37 changes: 16 additions & 21 deletions core/services/job/kv_orm.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package job

import (
"encoding/json"
"context"
"fmt"
"time"

"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/types"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
Expand All @@ -16,8 +15,8 @@ import (
//
//go:generate mockery --quiet --name KVStore --output ./mocks/ --case=underscore
type KVStore interface {
Store(key string, val interface{}) error
Get(key string, dest interface{}) error
Store(ctx context.Context, key string, val []byte) error
Get(ctx context.Context, key string) ([]byte, error)
}

type kVStore struct {
Expand All @@ -37,32 +36,28 @@ func NewKVStore(jobID int32, db *sqlx.DB, cfg pg.QConfig, lggr logger.Logger) kV
}
}

// Store saves serializable value by key.
func (kv kVStore) Store(key string, val interface{}) error {
jsonVal, err := json.Marshal(val)
if err != nil {
return err
}
// Store saves []byte value by key.
func (kv kVStore) Store(ctx context.Context, key string, val []byte) error {

sql := `INSERT INTO job_kv_store (job_id, key, val)
sql := `INSERT INTO job_kv_store (job_id, key, val_bytea)
VALUES ($1, $2, $3)
ON CONFLICT (job_id, key) DO UPDATE SET
val = EXCLUDED.val,
val_bytea = EXCLUDED.val_bytea,
updated_at = $4;`

if err = kv.q.ExecQ(sql, kv.jobID, key, types.JSONText(jsonVal), time.Now()); err != nil {
return fmt.Errorf("failed to store value: %s for key: %s for jobID: %d : %w", string(jsonVal), key, kv.jobID, err)
if _, err := kv.q.ExecContext(ctx, sql, kv.jobID, key, val, time.Now()); err != nil {
return fmt.Errorf("failed to store value: %s for key: %s for jobID: %d : %w", string(val), key, kv.jobID, err)
}
return nil
}

// Get retrieves serializable value by key.
func (kv kVStore) Get(key string, dest interface{}) error {
var ret json.RawMessage
sql := "SELECT val FROM job_kv_store WHERE job_id = $1 AND key = $2"
if err := kv.q.Get(&ret, sql, kv.jobID, key); err != nil {
return fmt.Errorf("failed to get value by key: %s for jobID: %d : %w", key, kv.jobID, err)
// Get retrieves []byte value by key.
func (kv kVStore) Get(ctx context.Context, key string) ([]byte, error) {
var val []byte
sql := "SELECT val_bytea FROM job_kv_store WHERE job_id = $1 AND key = $2"
if err := kv.q.GetContext(ctx, &val, sql, kv.jobID, key); err != nil {
return nil, fmt.Errorf("failed to get value by key: %s for jobID: %d : %w", key, kv.jobID, err)
}

return json.Unmarshal(ret, dest)
return val, nil
}
63 changes: 26 additions & 37 deletions core/services/job/kv_orm_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package job_test

import (
"context"
"fmt"
"reflect"
"testing"

"github.com/stretchr/testify/assert"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
Expand All @@ -19,6 +21,9 @@ import (
)

func TestJobKVStore(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

config := configtest.NewTestGeneralConfig(t)
db := pgtest.NewSqlxDB(t)

Expand All @@ -36,52 +41,36 @@ func TestJobKVStore(t *testing.T) {
jb.ID = jobID
require.NoError(t, jobORM.CreateJob(&jb))

type testData struct {
Test string
var values = [][]byte{
[]byte("Hello"),
[]byte("World"),
[]byte("Go"),
}

type nested struct {
Contact testData // Nested struct
}

values := []interface{}{
42, // int
"hello", // string
3.14, // float64
true, // bool
[]int{1, 2, 3}, // slice of ints
map[string]int{"a": 1, "b": 2}, // map of string to int
testData{Test: "value1"}, // regular struct
nested{testData{"value2"}}, // nested struct
}

for i, value := range values {
for i, insertBytes := range values {
testKey := "test_key_" + fmt.Sprint(i)
require.NoError(t, kvStore.Store(testKey, value))

// Get the type of the current value
valueType := reflect.TypeOf(value)
// Create a new instance of the value's type
temp := reflect.New(valueType).Interface()
require.NoError(t, kvStore.Store(ctx, testKey, insertBytes))

require.NoError(t, kvStore.Get(testKey, &temp))
var readBytes []byte
readBytes, err = kvStore.Get(ctx, testKey)
assert.NoError(t, err)

tempValue := reflect.ValueOf(temp).Elem().Interface()
require.Equal(t, value, tempValue)
require.Equal(t, insertBytes, readBytes)
}

key := "test_key_updating"
td1 := testData{Test: "value1"}
td2 := testData{Test: "value2"}
td1 := []byte("value1")
td2 := []byte("value2")

var retData testData
require.NoError(t, kvStore.Store(key, td1))
require.NoError(t, kvStore.Get(key, &retData))
require.Equal(t, td1, retData)
require.NoError(t, kvStore.Store(ctx, key, td1))
fetchedBytes, err := kvStore.Get(ctx, key)
require.NoError(t, err)
require.Equal(t, td1, fetchedBytes)

require.NoError(t, kvStore.Store(key, td2))
require.NoError(t, kvStore.Get(key, &retData))
require.Equal(t, td2, retData)
require.NoError(t, kvStore.Store(ctx, key, td2))
fetchedBytes, err = kvStore.Get(ctx, key)
require.NoError(t, err)
require.Equal(t, td2, fetchedBytes)

require.NoError(t, jobORM.DeleteJob(jobID))
}
44 changes: 30 additions & 14 deletions core/services/job/mocks/kv_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,8 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) ([]job.Servi
return d.newServicesOCR2Functions(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, thresholdPluginDB, s4PluginDB, lc, ocrLogger)

case types.GenericPlugin:
return d.newServicesGenericPlugin(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger, d.capabilitiesRegistry)
return d.newServicesGenericPlugin(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger, d.capabilitiesRegistry,
kvStore)

default:
return nil, errors.Errorf("plugin type %s not supported", spec.PluginType)
Expand Down Expand Up @@ -531,6 +532,7 @@ func (d *Delegate) newServicesGenericPlugin(
lc ocrtypes.LocalConfig,
ocrLogger commontypes.Logger,
capabilitiesRegistry types.CapabilitiesRegistry,
keyValueStore types.KeyValueStore,
) (srvs []job.ServiceCtx, err error) {
spec := jb.OCR2OracleSpec

Expand Down Expand Up @@ -649,7 +651,8 @@ func (d *Delegate) newServicesGenericPlugin(

switch pCfg.OCRVersion {
case 2:
plugin := reportingplugins.NewLOOPPService(pluginLggr, grpcOpts, cmdFn, pluginConfig, providerClientConn, pr, ta, errorLog)
plugin := reportingplugins.NewLOOPPService(pluginLggr, grpcOpts, cmdFn, pluginConfig, providerClientConn, pr, ta,
errorLog, keyValueStore)
oracleArgs := libocr2.OCR2OracleArgs{
BinaryNetworkEndpointFactory: d.peerWrapper.Peer2,
V2Bootstrappers: bootstrapPeers,
Expand All @@ -674,7 +677,8 @@ func (d *Delegate) newServicesGenericPlugin(

case 3:
//OCR3 with OCR2 OnchainKeyring and ContractTransmitter
plugin := ocr3.NewLOOPPService(pluginLggr, grpcOpts, cmdFn, pluginConfig, providerClientConn, pr, ta, errorLog, capabilitiesRegistry)
plugin := ocr3.NewLOOPPService(pluginLggr, grpcOpts, cmdFn, pluginConfig, providerClientConn, pr, ta, errorLog,
capabilitiesRegistry, keyValueStore)
contractTransmitter := ocrcommon.NewOCR3ContractTransmitterAdapter(provider.ContractTransmitter())
oracleArgs := libocr2.OCR3OracleArgs[[]byte]{
BinaryNetworkEndpointFactory: d.peerWrapper.Peer2,
Expand Down
21 changes: 18 additions & 3 deletions core/services/ocrcommon/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package ocrcommon

import (
"context"
"encoding/json"
errjoin "errors"
"fmt"
"math/big"
"sync"
"time"
Expand Down Expand Up @@ -308,7 +310,13 @@ func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error {
}

// backup in case data source fails continuously and node gets rebooted
if err = ds.kvStore.Store(dataSourceCacheKey, &ResultTimePair{Result: *serializablebig.New(value), Time: time.Now()}); err != nil {

timePairBytes, err := json.Marshal(&ResultTimePair{Result: *serializablebig.New(value), Time: time.Now()})
if err != nil {
return fmt.Errorf("failed to marshal result time pair, err: %w", err)
}

if err = ds.kvStore.Store(ctx, dataSourceCacheKey, timePairBytes); err != nil {
ds.lggr.Errorf("failed to persist latest task run value, err: %v", err)
}

Expand Down Expand Up @@ -338,9 +346,16 @@ func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2ty
latestResult, latestTrrs := ds.get(ctx)
if latestTrrs == nil {
ds.lggr.Warnf("cache is empty, returning persisted value now")
if err := ds.kvStore.Get(dataSourceCacheKey, &resTime); err != nil {
return nil, err

timePairBytes, err := ds.kvStore.Get(ctx, dataSourceCacheKey)
if err != nil {
return nil, fmt.Errorf("failed to get result time pair bytes, err: %w", err)
}

if err := json.Unmarshal(timePairBytes, &resTime); err != nil {
return nil, fmt.Errorf("failed to unmarshal result time pair bytes, err: %w", err)
}

if time.Since(resTime.Time) >= defaultCacheFreshnessAlert {
ds.lggr.Errorf("cache hasn't been updated for over %v, latestUpdateErr is: %v", defaultCacheFreshnessAlert, ds.latestUpdateErr)
}
Expand Down
15 changes: 8 additions & 7 deletions core/services/ocrcommon/data_source_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocrcommon_test

import (
"encoding/json"
"fmt"
"math/big"
"testing"
Expand Down Expand Up @@ -75,8 +76,8 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
runner := pipelinemocks.NewRunner(t)
ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t))
mockKVStore := mocks.KVStore{}
mockKVStore.On("Store", mock.Anything, mock.Anything).Return(nil)
mockKVStore.On("Get", mock.Anything, mock.IsType(&ocrcommon.ResultTimePair{})).Return(nil)
mockKVStore.On("Store", mock.Anything, mock.Anything, mock.Anything).Return(nil)
mockKVStore.On("Get", mock.Anything, mock.Anything).Return(nil, nil)
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Second*2)
require.NoError(t, err)
servicetest.Run(t, dsCache)
Expand Down Expand Up @@ -105,10 +106,10 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {

mockKVStore := mocks.KVStore{}
persistedVal := serializablebig.NewI(1337)
mockKVStore.On("Get", mock.Anything, mock.IsType(&ocrcommon.ResultTimePair{})).Return(nil).Run(func(args mock.Arguments) {
arg := args.Get(1).(*ocrcommon.ResultTimePair)
arg.Result = *persistedVal
})

result, err := json.Marshal(&ocrcommon.ResultTimePair{Result: *persistedVal, Time: time.Now()})
assert.NoError(t, err)
mockKVStore.On("Get", mock.Anything, mock.Anything).Return(result, nil)

// set updater to a long time so that it doesn't log errors after the test is done
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100)
Expand All @@ -127,7 +128,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t))

mockKVStore := mocks.KVStore{}
mockKVStore.On("Get", mock.Anything, mock.IsType(&ocrcommon.ResultTimePair{})).Return(nil).Return(assert.AnError)
mockKVStore.On("Get", mock.Anything, mock.Anything).Return(nil, assert.AnError)

// set updater to a long time so that it doesn't log errors after the test is done
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100)
Expand Down
Loading
Loading