Skip to content

Commit

Permalink
KS-106: Refactor Discover Database implementation to support DON2DON …
Browse files Browse the repository at this point in the history
…use case (#13401)

* Refactor Discover Database implementation to support DON2DON use case

* add tests

* documentation
  • Loading branch information
krehermann authored Jun 4, 2024
1 parent a57c2a5 commit 905830c
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 85 deletions.
5 changes: 5 additions & 0 deletions .changeset/rich-melons-sin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#db_update add persistence for DON-2-DON discovery announcements
2 changes: 1 addition & 1 deletion core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {

var externalPeerWrapper p2ptypes.PeerWrapper
if cfg.Capabilities().Peering().Enabled() {
externalPeer := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), globalLogger)
externalPeer := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), opts.DS, globalLogger)
signer := externalPeer
externalPeerWrapper = externalPeer

Expand Down
48 changes: 37 additions & 11 deletions core/services/ocrcommon/discoverer_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ocrcommon

import (
"context"
"fmt"

"github.com/lib/pq"
"github.com/pkg/errors"
Expand All @@ -14,35 +15,60 @@ import (

var _ ocrnetworking.DiscovererDatabase = &DiscovererDatabase{}

const (
// ocrDiscovererTable is the name of the table used to store OCR announcements
ocrDiscovererTable = "ocr_discoverer_announcements"
// don2donDiscovererTable is the name of the table used to store DON2DON announcements
don2donDiscovererTable = "don2don_discoverer_announcements"
)

// DiscovererDatabase is a key-value store for p2p announcements
// that are based on the RageP2P library and bootstrap nodes
type DiscovererDatabase struct {
ds sqlutil.DataSource
peerID string
ds sqlutil.DataSource
peerID string
tableName string
}

// NewOCRDiscovererDatabase creates a new DiscovererDatabase for OCR announcements
func NewOCRDiscovererDatabase(ds sqlutil.DataSource, peerID string) *DiscovererDatabase {
return &DiscovererDatabase{
ds: ds,
peerID: peerID,
tableName: ocrDiscovererTable,
}
}

func NewDiscovererDatabase(ds sqlutil.DataSource, peerID string) *DiscovererDatabase {
// NewDON2DONDiscovererDatabase creates a new DiscovererDatabase for DON2DON announcements
func NewDON2DONDiscovererDatabase(ds sqlutil.DataSource, peerID string) *DiscovererDatabase {
return &DiscovererDatabase{
ds,
peerID,
ds: ds,
peerID: peerID,
tableName: don2donDiscovererTable,
}
}

// StoreAnnouncement has key-value-store semantics and stores a peerID (key) and an associated serialized
// announcement (value).
func (d *DiscovererDatabase) StoreAnnouncement(ctx context.Context, peerID string, ann []byte) error {
_, err := d.ds.ExecContext(ctx, `
INSERT INTO ocr_discoverer_announcements (local_peer_id, remote_peer_id, ann, created_at, updated_at)
VALUES ($1,$2,$3,NOW(),NOW()) ON CONFLICT (local_peer_id, remote_peer_id) DO UPDATE SET
q := fmt.Sprintf(`
INSERT INTO %s (local_peer_id, remote_peer_id, ann, created_at, updated_at)
VALUES ($1,$2,$3,NOW(),NOW()) ON CONFLICT (local_peer_id, remote_peer_id) DO UPDATE SET
ann = EXCLUDED.ann,
updated_at = EXCLUDED.updated_at
;`, d.peerID, peerID, ann)
;`, d.tableName)

_, err := d.ds.ExecContext(ctx,
q, d.peerID, peerID, ann)
return errors.Wrap(err, "DiscovererDatabase failed to StoreAnnouncement")
}

// ReadAnnouncements returns one serialized announcement (if available) for each of the peerIDs in the form of a map
// keyed by each announcement's corresponding peer ID.
func (d *DiscovererDatabase) ReadAnnouncements(ctx context.Context, peerIDs []string) (results map[string][]byte, err error) {
rows, err := d.ds.QueryContext(ctx, `
SELECT remote_peer_id, ann FROM ocr_discoverer_announcements WHERE remote_peer_id = ANY($1) AND local_peer_id = $2`, pq.Array(peerIDs), d.peerID)
q := fmt.Sprintf(`SELECT remote_peer_id, ann FROM %s WHERE remote_peer_id = ANY($1) AND local_peer_id = $2`, d.tableName)

rows, err := d.ds.QueryContext(ctx, q, pq.Array(peerIDs), d.peerID)
if err != nil {
return nil, errors.Wrap(err, "DiscovererDatabase failed to ReadAnnouncements")
}
Expand Down
142 changes: 82 additions & 60 deletions core/services/ocrcommon/discoverer_database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ocrcommon_test
import (
"crypto/ed25519"
"crypto/rand"
"fmt"
"testing"

ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
Expand All @@ -21,66 +22,87 @@ func Test_DiscovererDatabase(t *testing.T) {
localPeerID1 := mustRandomP2PPeerID(t)
localPeerID2 := mustRandomP2PPeerID(t)

dd1 := ocrcommon.NewDiscovererDatabase(db, localPeerID1.Raw())
dd2 := ocrcommon.NewDiscovererDatabase(db, localPeerID2.Raw())

ctx := testutils.Context(t)

t.Run("StoreAnnouncement writes a value", func(t *testing.T) {
ann := []byte{1, 2, 3}
err := dd1.StoreAnnouncement(ctx, "remote1", ann)
assert.NoError(t, err)

// test upsert
ann = []byte{4, 5, 6}
err = dd1.StoreAnnouncement(ctx, "remote1", ann)
assert.NoError(t, err)

// write a different value
ann = []byte{7, 8, 9}
err = dd1.StoreAnnouncement(ctx, "remote2", ann)
assert.NoError(t, err)
})

t.Run("ReadAnnouncements reads values filtered by given peerIDs", func(t *testing.T) {
announcements, err := dd1.ReadAnnouncements(ctx, []string{"remote1", "remote2"})
require.NoError(t, err)

assert.Len(t, announcements, 2)
assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"])
assert.Equal(t, []byte{7, 8, 9}, announcements["remote2"])

announcements, err = dd1.ReadAnnouncements(ctx, []string{"remote1"})
require.NoError(t, err)

assert.Len(t, announcements, 1)
assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"])
})

t.Run("is scoped to local peer ID", func(t *testing.T) {
ann := []byte{10, 11, 12}
err := dd2.StoreAnnouncement(ctx, "remote1", ann)
assert.NoError(t, err)

announcements, err := dd2.ReadAnnouncements(ctx, []string{"remote1"})
require.NoError(t, err)
assert.Len(t, announcements, 1)
assert.Equal(t, []byte{10, 11, 12}, announcements["remote1"])

announcements, err = dd1.ReadAnnouncements(ctx, []string{"remote1"})
require.NoError(t, err)
assert.Len(t, announcements, 1)
assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"])
})

t.Run("persists data across restarts", func(t *testing.T) {
dd3 := ocrcommon.NewDiscovererDatabase(db, localPeerID1.Raw())

announcements, err := dd3.ReadAnnouncements(ctx, []string{"remote1"})
require.NoError(t, err)
assert.Len(t, announcements, 1)
assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"])
})
type test struct {
name string
dd1 *ocrcommon.DiscovererDatabase
dd2 *ocrcommon.DiscovererDatabase
}

tests := []test{
{
name: "ocr discoverer database",
dd1: ocrcommon.NewOCRDiscovererDatabase(db, localPeerID1.Raw()),
dd2: ocrcommon.NewOCRDiscovererDatabase(db, localPeerID2.Raw()),
},
{
name: "don2don discoverer database",
dd1: ocrcommon.NewDON2DONDiscovererDatabase(db, localPeerID1.Raw()),
dd2: ocrcommon.NewDON2DONDiscovererDatabase(db, localPeerID2.Raw()),
},
}

for _, tt := range tests {
dd1 := tt.dd1
dd2 := tt.dd2

ctx := testutils.Context(t)

t.Run(fmt.Sprintf("%s StoreAnnouncement writes a value", tt.name), func(t *testing.T) {
ann := []byte{1, 2, 3}
err := dd1.StoreAnnouncement(ctx, "remote1", ann)
assert.NoError(t, err)

// test upsert
ann = []byte{4, 5, 6}
err = dd1.StoreAnnouncement(ctx, "remote1", ann)
assert.NoError(t, err)

// write a different value
ann = []byte{7, 8, 9}
err = dd1.StoreAnnouncement(ctx, "remote2", ann)
assert.NoError(t, err)
})

t.Run(fmt.Sprintf("%s ReadAnnouncements reads values filtered by given peerIDs", tt.name), func(t *testing.T) {
announcements, err := dd1.ReadAnnouncements(ctx, []string{"remote1", "remote2"})
require.NoError(t, err)

assert.Len(t, announcements, 2)
assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"])
assert.Equal(t, []byte{7, 8, 9}, announcements["remote2"])

announcements, err = dd1.ReadAnnouncements(ctx, []string{"remote1"})
require.NoError(t, err)

assert.Len(t, announcements, 1)
assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"])
})

t.Run(fmt.Sprintf("%s is scoped to local peer ID", tt.name), func(t *testing.T) {
ann := []byte{10, 11, 12}
err := dd2.StoreAnnouncement(ctx, "remote1", ann)
assert.NoError(t, err)

announcements, err := dd2.ReadAnnouncements(ctx, []string{"remote1"})
require.NoError(t, err)
assert.Len(t, announcements, 1)
assert.Equal(t, []byte{10, 11, 12}, announcements["remote1"])

announcements, err = dd1.ReadAnnouncements(ctx, []string{"remote1"})
require.NoError(t, err)
assert.Len(t, announcements, 1)
assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"])
})

t.Run(fmt.Sprintf("%s persists data across restarts", tt.name), func(t *testing.T) {
dd3 := ocrcommon.NewOCRDiscovererDatabase(db, localPeerID1.Raw())

announcements, err := dd3.ReadAnnouncements(ctx, []string{"remote1"})
require.NoError(t, err)
assert.Len(t, announcements, 1)
assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"])
})
}
}

func mustRandomP2PPeerID(t *testing.T) p2pkey.PeerID {
Expand Down
2 changes: 1 addition & 1 deletion core/services/ocrcommon/peer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (p *SingletonPeerWrapper) peerConfig() (ocrnetworking.PeerConfig, error) {
}
p.PeerID = key.PeerID()

discovererDB := NewDiscovererDatabase(p.ds, p.PeerID.Raw())
discovererDB := NewOCRDiscovererDatabase(p.ds, p.PeerID.Raw())

config := p.p2pCfg
peerConfig := ocrnetworking.PeerConfig{
Expand Down
26 changes: 15 additions & 11 deletions core/services/p2p/wrapper/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ import (
"github.com/smartcontractkit/libocr/commontypes"
ragetypes "github.com/smartcontractkit/libocr/ragep2p/types"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"

"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon"
"github.com/smartcontractkit/chainlink/v2/core/services/p2p"
"github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)
Expand All @@ -23,16 +26,18 @@ type peerWrapper struct {
p2pConfig config.P2P
privateKey ed25519.PrivateKey
lggr logger.Logger
ds sqlutil.DataSource
}

var _ types.PeerWrapper = &peerWrapper{}
var _ types.Signer = &peerWrapper{}

func NewExternalPeerWrapper(keystoreP2P keystore.P2P, p2pConfig config.P2P, lggr logger.Logger) *peerWrapper {
func NewExternalPeerWrapper(keystoreP2P keystore.P2P, p2pConfig config.P2P, ds sqlutil.DataSource, lggr logger.Logger) *peerWrapper {
return &peerWrapper{
keystoreP2P: keystoreP2P,
p2pConfig: p2pConfig,
lggr: lggr,
ds: ds,
}
}

Expand All @@ -42,28 +47,27 @@ func (e *peerWrapper) GetPeer() types.Peer {

// convert to "external" P2P PeerConfig, which is independent of OCR
// this has to be done in Start() because keystore is not unlocked at construction time
func convertPeerConfig(keystoreP2P keystore.P2P, p2pConfig config.P2P) (p2p.PeerConfig, error) {
key, err := keystoreP2P.GetOrFirst(p2pConfig.PeerID())
func (e *peerWrapper) convertPeerConfig() (p2p.PeerConfig, error) {
key, err := e.keystoreP2P.GetOrFirst(e.p2pConfig.PeerID())
if err != nil {
return p2p.PeerConfig{}, err
}

// TODO(KS-106): use real DB
discovererDB := p2p.NewInMemoryDiscovererDatabase()
bootstrappers, err := convertBootstrapperLocators(p2pConfig.V2().DefaultBootstrappers())
discovererDB := ocrcommon.NewDON2DONDiscovererDatabase(e.ds, key.PeerID().Raw())
bootstrappers, err := convertBootstrapperLocators(e.p2pConfig.V2().DefaultBootstrappers())
if err != nil {
return p2p.PeerConfig{}, err
}

peerConfig := p2p.PeerConfig{
PrivateKey: key.PrivKey,

ListenAddresses: p2pConfig.V2().ListenAddresses(),
AnnounceAddresses: p2pConfig.V2().AnnounceAddresses(),
ListenAddresses: e.p2pConfig.V2().ListenAddresses(),
AnnounceAddresses: e.p2pConfig.V2().AnnounceAddresses(),
Bootstrappers: bootstrappers,

DeltaReconcile: p2pConfig.V2().DeltaReconcile().Duration(),
DeltaDial: p2pConfig.V2().DeltaDial().Duration(),
DeltaReconcile: e.p2pConfig.V2().DeltaReconcile().Duration(),
DeltaDial: e.p2pConfig.V2().DeltaDial().Duration(),
DiscovererDatabase: discovererDB,

// NOTE: this is equivalent to prometheus.DefaultRegisterer, but we need to use a separate
Expand Down Expand Up @@ -95,7 +99,7 @@ func convertBootstrapperLocators(bootstrappers []commontypes.BootstrapperLocator
}

func (e *peerWrapper) Start(ctx context.Context) error {
cfg, err := convertPeerConfig(e.keystoreP2P, e.p2pConfig)
cfg, err := e.convertPeerConfig()
if err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion core/services/p2p/wrapper/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/chainlink"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/p2pkey"
Expand All @@ -18,6 +19,8 @@ import (
)

func TestPeerWrapper_CleanStartClose(t *testing.T) {
db := pgtest.NewSqlxDB(t)

lggr := logger.TestLogger(t)
port := freeport.GetOne(t)
cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) {
Expand All @@ -30,7 +33,7 @@ func TestPeerWrapper_CleanStartClose(t *testing.T) {
require.NoError(t, err)
keystoreP2P.On("GetOrFirst", mock.Anything).Return(key, nil)

wrapper := wrapper.NewExternalPeerWrapper(keystoreP2P, cfg.Capabilities().Peering(), lggr)
wrapper := wrapper.NewExternalPeerWrapper(keystoreP2P, cfg.Capabilities().Peering(), db, lggr)
require.NotNil(t, wrapper)
require.NoError(t, wrapper.Start(testutils.Context(t)))
require.NoError(t, wrapper.Close())
Expand Down
14 changes: 14 additions & 0 deletions core/store/migrate/migrations/0240_don2don_discoverer.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- +goose Up
-- this migration is for the don2don_discoverer_announcements table
-- it is essentially the same as ocr_discoverer_announcements but scoped to the don2don use case
-- both cases are based on RageP2P library and bootstrap nodes. for now but we want to keep their addresses separate to avoid accidental cross-communication
CREATE TABLE don2don_discoverer_announcements (
local_peer_id text NOT NULL,
remote_peer_id text NOT NULL,
ann bytea NOT NULL,
created_at timestamptz not null,
updated_at timestamptz not null,
PRIMARY KEY(local_peer_id, remote_peer_id)
);
-- +goose Down
DROP TABLE don2don_discoverer_announcements;

0 comments on commit 905830c

Please sign in to comment.