Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KS-106: Refactor Discover Database implementation to support DON2DON use case #13401

Merged
merged 3 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/rich-melons-sin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#db_update add persistence for DON-2-DON discovery announcements
2 changes: 1 addition & 1 deletion core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {

var externalPeerWrapper p2ptypes.PeerWrapper
if cfg.Capabilities().Peering().Enabled() {
externalPeer := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), globalLogger)
externalPeer := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), opts.DS, globalLogger)
signer := externalPeer
externalPeerWrapper = externalPeer

Expand Down
48 changes: 37 additions & 11 deletions core/services/ocrcommon/discoverer_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ocrcommon

import (
"context"
"fmt"

"github.com/lib/pq"
"github.com/pkg/errors"
Expand All @@ -14,35 +15,60 @@ import (

var _ ocrnetworking.DiscovererDatabase = &DiscovererDatabase{}

const (
// ocrDiscovererTable is the name of the table used to store OCR announcements
ocrDiscovererTable = "ocr_discoverer_announcements"
// don2donDiscovererTable is the name of the table used to store DON2DON announcements
don2donDiscovererTable = "don2don_discoverer_announcements"
)

// DiscovererDatabase is a key-value store for p2p announcements
// that are based on the RageP2P library and bootstrap nodes
type DiscovererDatabase struct {
ds sqlutil.DataSource
peerID string
ds sqlutil.DataSource
peerID string
tableName string
}

// NewOCRDiscovererDatabase creates a new DiscovererDatabase for OCR announcements
func NewOCRDiscovererDatabase(ds sqlutil.DataSource, peerID string) *DiscovererDatabase {
return &DiscovererDatabase{
ds: ds,
peerID: peerID,
tableName: ocrDiscovererTable,
}
}

func NewDiscovererDatabase(ds sqlutil.DataSource, peerID string) *DiscovererDatabase {
// NewDON2DONDiscovererDatabase creates a new DiscovererDatabase for DON2DON announcements
func NewDON2DONDiscovererDatabase(ds sqlutil.DataSource, peerID string) *DiscovererDatabase {
krehermann marked this conversation as resolved.
Show resolved Hide resolved
return &DiscovererDatabase{
ds,
peerID,
ds: ds,
peerID: peerID,
tableName: don2donDiscovererTable,
}
}

// StoreAnnouncement has key-value-store semantics and stores a peerID (key) and an associated serialized
// announcement (value).
func (d *DiscovererDatabase) StoreAnnouncement(ctx context.Context, peerID string, ann []byte) error {
_, err := d.ds.ExecContext(ctx, `
INSERT INTO ocr_discoverer_announcements (local_peer_id, remote_peer_id, ann, created_at, updated_at)
VALUES ($1,$2,$3,NOW(),NOW()) ON CONFLICT (local_peer_id, remote_peer_id) DO UPDATE SET
q := fmt.Sprintf(`
INSERT INTO %s (local_peer_id, remote_peer_id, ann, created_at, updated_at)
VALUES ($1,$2,$3,NOW(),NOW()) ON CONFLICT (local_peer_id, remote_peer_id) DO UPDATE SET
ann = EXCLUDED.ann,
updated_at = EXCLUDED.updated_at
;`, d.peerID, peerID, ann)
;`, d.tableName)

_, err := d.ds.ExecContext(ctx,
q, d.peerID, peerID, ann)
return errors.Wrap(err, "DiscovererDatabase failed to StoreAnnouncement")
}

// ReadAnnouncements returns one serialized announcement (if available) for each of the peerIDs in the form of a map
// keyed by each announcement's corresponding peer ID.
func (d *DiscovererDatabase) ReadAnnouncements(ctx context.Context, peerIDs []string) (results map[string][]byte, err error) {
rows, err := d.ds.QueryContext(ctx, `
SELECT remote_peer_id, ann FROM ocr_discoverer_announcements WHERE remote_peer_id = ANY($1) AND local_peer_id = $2`, pq.Array(peerIDs), d.peerID)
q := fmt.Sprintf(`SELECT remote_peer_id, ann FROM %s WHERE remote_peer_id = ANY($1) AND local_peer_id = $2`, d.tableName)

rows, err := d.ds.QueryContext(ctx, q, pq.Array(peerIDs), d.peerID)
if err != nil {
return nil, errors.Wrap(err, "DiscovererDatabase failed to ReadAnnouncements")
}
Expand Down
142 changes: 82 additions & 60 deletions core/services/ocrcommon/discoverer_database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ocrcommon_test
import (
"crypto/ed25519"
"crypto/rand"
"fmt"
"testing"

ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
Expand All @@ -21,66 +22,87 @@ func Test_DiscovererDatabase(t *testing.T) {
localPeerID1 := mustRandomP2PPeerID(t)
localPeerID2 := mustRandomP2PPeerID(t)

dd1 := ocrcommon.NewDiscovererDatabase(db, localPeerID1.Raw())
dd2 := ocrcommon.NewDiscovererDatabase(db, localPeerID2.Raw())

ctx := testutils.Context(t)

t.Run("StoreAnnouncement writes a value", func(t *testing.T) {
ann := []byte{1, 2, 3}
err := dd1.StoreAnnouncement(ctx, "remote1", ann)
assert.NoError(t, err)

// test upsert
ann = []byte{4, 5, 6}
err = dd1.StoreAnnouncement(ctx, "remote1", ann)
assert.NoError(t, err)

// write a different value
ann = []byte{7, 8, 9}
err = dd1.StoreAnnouncement(ctx, "remote2", ann)
assert.NoError(t, err)
})

t.Run("ReadAnnouncements reads values filtered by given peerIDs", func(t *testing.T) {
announcements, err := dd1.ReadAnnouncements(ctx, []string{"remote1", "remote2"})
require.NoError(t, err)

assert.Len(t, announcements, 2)
assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"])
assert.Equal(t, []byte{7, 8, 9}, announcements["remote2"])

announcements, err = dd1.ReadAnnouncements(ctx, []string{"remote1"})
require.NoError(t, err)

assert.Len(t, announcements, 1)
assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"])
})

t.Run("is scoped to local peer ID", func(t *testing.T) {
ann := []byte{10, 11, 12}
err := dd2.StoreAnnouncement(ctx, "remote1", ann)
assert.NoError(t, err)

announcements, err := dd2.ReadAnnouncements(ctx, []string{"remote1"})
require.NoError(t, err)
assert.Len(t, announcements, 1)
assert.Equal(t, []byte{10, 11, 12}, announcements["remote1"])

announcements, err = dd1.ReadAnnouncements(ctx, []string{"remote1"})
require.NoError(t, err)
assert.Len(t, announcements, 1)
assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"])
})

t.Run("persists data across restarts", func(t *testing.T) {
dd3 := ocrcommon.NewDiscovererDatabase(db, localPeerID1.Raw())

announcements, err := dd3.ReadAnnouncements(ctx, []string{"remote1"})
require.NoError(t, err)
assert.Len(t, announcements, 1)
assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"])
})
type test struct {
name string
dd1 *ocrcommon.DiscovererDatabase
dd2 *ocrcommon.DiscovererDatabase
}

tests := []test{
{
name: "ocr discoverer database",
dd1: ocrcommon.NewOCRDiscovererDatabase(db, localPeerID1.Raw()),
dd2: ocrcommon.NewOCRDiscovererDatabase(db, localPeerID2.Raw()),
},
{
name: "don2don discoverer database",
dd1: ocrcommon.NewDON2DONDiscovererDatabase(db, localPeerID1.Raw()),
dd2: ocrcommon.NewDON2DONDiscovererDatabase(db, localPeerID2.Raw()),
},
}

for _, tt := range tests {
dd1 := tt.dd1
dd2 := tt.dd2

ctx := testutils.Context(t)

t.Run(fmt.Sprintf("%s StoreAnnouncement writes a value", tt.name), func(t *testing.T) {
ann := []byte{1, 2, 3}
err := dd1.StoreAnnouncement(ctx, "remote1", ann)
assert.NoError(t, err)

// test upsert
ann = []byte{4, 5, 6}
err = dd1.StoreAnnouncement(ctx, "remote1", ann)
assert.NoError(t, err)

// write a different value
ann = []byte{7, 8, 9}
err = dd1.StoreAnnouncement(ctx, "remote2", ann)
assert.NoError(t, err)
})

t.Run(fmt.Sprintf("%s ReadAnnouncements reads values filtered by given peerIDs", tt.name), func(t *testing.T) {
announcements, err := dd1.ReadAnnouncements(ctx, []string{"remote1", "remote2"})
require.NoError(t, err)

assert.Len(t, announcements, 2)
assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"])
assert.Equal(t, []byte{7, 8, 9}, announcements["remote2"])

announcements, err = dd1.ReadAnnouncements(ctx, []string{"remote1"})
require.NoError(t, err)

assert.Len(t, announcements, 1)
assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"])
})

t.Run(fmt.Sprintf("%s is scoped to local peer ID", tt.name), func(t *testing.T) {
ann := []byte{10, 11, 12}
err := dd2.StoreAnnouncement(ctx, "remote1", ann)
assert.NoError(t, err)

announcements, err := dd2.ReadAnnouncements(ctx, []string{"remote1"})
require.NoError(t, err)
assert.Len(t, announcements, 1)
assert.Equal(t, []byte{10, 11, 12}, announcements["remote1"])

announcements, err = dd1.ReadAnnouncements(ctx, []string{"remote1"})
require.NoError(t, err)
assert.Len(t, announcements, 1)
assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"])
})

t.Run(fmt.Sprintf("%s persists data across restarts", tt.name), func(t *testing.T) {
dd3 := ocrcommon.NewOCRDiscovererDatabase(db, localPeerID1.Raw())

announcements, err := dd3.ReadAnnouncements(ctx, []string{"remote1"})
require.NoError(t, err)
assert.Len(t, announcements, 1)
assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"])
})
}
}

func mustRandomP2PPeerID(t *testing.T) p2pkey.PeerID {
Expand Down
2 changes: 1 addition & 1 deletion core/services/ocrcommon/peer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (p *SingletonPeerWrapper) peerConfig() (ocrnetworking.PeerConfig, error) {
}
p.PeerID = key.PeerID()

discovererDB := NewDiscovererDatabase(p.ds, p.PeerID.Raw())
discovererDB := NewOCRDiscovererDatabase(p.ds, p.PeerID.Raw())

config := p.p2pCfg
peerConfig := ocrnetworking.PeerConfig{
Expand Down
26 changes: 15 additions & 11 deletions core/services/p2p/wrapper/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ import (
"github.com/smartcontractkit/libocr/commontypes"
ragetypes "github.com/smartcontractkit/libocr/ragep2p/types"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"

"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon"
"github.com/smartcontractkit/chainlink/v2/core/services/p2p"
"github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)
Expand All @@ -23,16 +26,18 @@ type peerWrapper struct {
p2pConfig config.P2P
privateKey ed25519.PrivateKey
lggr logger.Logger
ds sqlutil.DataSource
}

var _ types.PeerWrapper = &peerWrapper{}
var _ types.Signer = &peerWrapper{}

func NewExternalPeerWrapper(keystoreP2P keystore.P2P, p2pConfig config.P2P, lggr logger.Logger) *peerWrapper {
func NewExternalPeerWrapper(keystoreP2P keystore.P2P, p2pConfig config.P2P, ds sqlutil.DataSource, lggr logger.Logger) *peerWrapper {
return &peerWrapper{
keystoreP2P: keystoreP2P,
p2pConfig: p2pConfig,
lggr: lggr,
ds: ds,
}
}

Expand All @@ -42,28 +47,27 @@ func (e *peerWrapper) GetPeer() types.Peer {

// convert to "external" P2P PeerConfig, which is independent of OCR
// this has to be done in Start() because keystore is not unlocked at construction time
func convertPeerConfig(keystoreP2P keystore.P2P, p2pConfig config.P2P) (p2p.PeerConfig, error) {
key, err := keystoreP2P.GetOrFirst(p2pConfig.PeerID())
func (e *peerWrapper) convertPeerConfig() (p2p.PeerConfig, error) {
key, err := e.keystoreP2P.GetOrFirst(e.p2pConfig.PeerID())
if err != nil {
return p2p.PeerConfig{}, err
}

// TODO(KS-106): use real DB
discovererDB := p2p.NewInMemoryDiscovererDatabase()
bootstrappers, err := convertBootstrapperLocators(p2pConfig.V2().DefaultBootstrappers())
discovererDB := ocrcommon.NewDON2DONDiscovererDatabase(e.ds, key.PeerID().Raw())
bootstrappers, err := convertBootstrapperLocators(e.p2pConfig.V2().DefaultBootstrappers())
if err != nil {
return p2p.PeerConfig{}, err
}

peerConfig := p2p.PeerConfig{
PrivateKey: key.PrivKey,

ListenAddresses: p2pConfig.V2().ListenAddresses(),
AnnounceAddresses: p2pConfig.V2().AnnounceAddresses(),
ListenAddresses: e.p2pConfig.V2().ListenAddresses(),
AnnounceAddresses: e.p2pConfig.V2().AnnounceAddresses(),
Bootstrappers: bootstrappers,

DeltaReconcile: p2pConfig.V2().DeltaReconcile().Duration(),
DeltaDial: p2pConfig.V2().DeltaDial().Duration(),
DeltaReconcile: e.p2pConfig.V2().DeltaReconcile().Duration(),
DeltaDial: e.p2pConfig.V2().DeltaDial().Duration(),
DiscovererDatabase: discovererDB,

// NOTE: this is equivalent to prometheus.DefaultRegisterer, but we need to use a separate
Expand Down Expand Up @@ -95,7 +99,7 @@ func convertBootstrapperLocators(bootstrappers []commontypes.BootstrapperLocator
}

func (e *peerWrapper) Start(ctx context.Context) error {
cfg, err := convertPeerConfig(e.keystoreP2P, e.p2pConfig)
cfg, err := e.convertPeerConfig()
if err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion core/services/p2p/wrapper/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/chainlink"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/p2pkey"
Expand All @@ -18,6 +19,8 @@ import (
)

func TestPeerWrapper_CleanStartClose(t *testing.T) {
db := pgtest.NewSqlxDB(t)

lggr := logger.TestLogger(t)
port := freeport.GetOne(t)
cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) {
Expand All @@ -30,7 +33,7 @@ func TestPeerWrapper_CleanStartClose(t *testing.T) {
require.NoError(t, err)
keystoreP2P.On("GetOrFirst", mock.Anything).Return(key, nil)

wrapper := wrapper.NewExternalPeerWrapper(keystoreP2P, cfg.Capabilities().Peering(), lggr)
wrapper := wrapper.NewExternalPeerWrapper(keystoreP2P, cfg.Capabilities().Peering(), db, lggr)
require.NotNil(t, wrapper)
require.NoError(t, wrapper.Start(testutils.Context(t)))
require.NoError(t, wrapper.Close())
Expand Down
14 changes: 14 additions & 0 deletions core/store/migrate/migrations/0240_don2don_discoverer.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- +goose Up
-- this migration is for the don2don_discoverer_announcements table
-- it is essentially the same as ocr_discoverer_announcements but scoped to the don2don use case
-- both cases are based on RageP2P library and bootstrap nodes. for now but we want to keep their addresses separate to avoid accidental cross-communication
CREATE TABLE don2don_discoverer_announcements (
local_peer_id text NOT NULL,
remote_peer_id text NOT NULL,
ann bytea NOT NULL,
created_at timestamptz not null,
updated_at timestamptz not null,
PRIMARY KEY(local_peer_id, remote_peer_id)
);
-- +goose Down
DROP TABLE don2don_discoverer_announcements;
Loading