Skip to content

Commit

Permalink
Syncer (#13427)
Browse files Browse the repository at this point in the history
* [KS-211] Implement basic syncer

* Rename config entry Capabilities.Registry -> Capabilities.ExternalRegistry

* Drop --gen-go-grpc options as they aren't needed

* Fully-qualified proto name

* Some more comments

* Correctly set ExternalRegistry defaults

* Add logging

* Unpad signatures

* Fix tests

* Add changeset

* Correctly close syncer in tests

* Client: remoteDON -> myDON
  • Loading branch information
cedric-cordenier authored Jun 13, 2024
1 parent a8e766f commit 66f1547
Show file tree
Hide file tree
Showing 38 changed files with 2,429 additions and 751 deletions.
5 changes: 5 additions & 0 deletions .changeset/shaggy-ears-share.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#internal Add RegistrySyncer
110 changes: 110 additions & 0 deletions core/capabilities/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package capabilities

import (
"context"
"encoding/json"

"github.com/smartcontractkit/chainlink-common/pkg/types"
kcr "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/keystone_capability_registry"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
evmrelaytypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
)

type remoteRegistryReader struct {
r types.ContractReader
}

var _ reader = (*remoteRegistryReader)(nil)

type hashedCapabilityID [32]byte
type donID uint32

type state struct {
IDsToDONs map[donID]kcr.CapabilityRegistryDONInfo
IDsToNodes map[p2ptypes.PeerID]kcr.CapabilityRegistryNodeInfo
IDsToCapabilities map[hashedCapabilityID]kcr.CapabilityRegistryCapability
}

func (r *remoteRegistryReader) state(ctx context.Context) (state, error) {
dons := []kcr.CapabilityRegistryDONInfo{}
err := r.r.GetLatestValue(ctx, "capabilityRegistry", "getDONs", nil, &dons)
if err != nil {
return state{}, err
}

idsToDONs := map[donID]kcr.CapabilityRegistryDONInfo{}
for _, d := range dons {
idsToDONs[donID(d.Id)] = d
}

caps := kcr.GetCapabilities{}
err = r.r.GetLatestValue(ctx, "capabilityRegistry", "getCapabilities", nil, &caps)
if err != nil {
return state{}, err
}

idsToCapabilities := map[hashedCapabilityID]kcr.CapabilityRegistryCapability{}
for i, c := range caps.Capabilities {
idsToCapabilities[caps.HashedCapabilityIds[i]] = c
}

nodes := &kcr.GetNodes{}
err = r.r.GetLatestValue(ctx, "capabilityRegistry", "getNodes", nil, &nodes)
if err != nil {
return state{}, err
}

idsToNodes := map[p2ptypes.PeerID]kcr.CapabilityRegistryNodeInfo{}
for _, node := range nodes.NodeInfo {
idsToNodes[node.P2pId] = node
}

return state{IDsToDONs: idsToDONs, IDsToCapabilities: idsToCapabilities, IDsToNodes: idsToNodes}, nil
}

type contractReaderFactory interface {
NewContractReader(context.Context, []byte) (types.ContractReader, error)
}

func newRemoteRegistryReader(ctx context.Context, relayer contractReaderFactory, remoteRegistryAddress string) (*remoteRegistryReader, error) {
contractReaderConfig := evmrelaytypes.ChainReaderConfig{
Contracts: map[string]evmrelaytypes.ChainContractReader{
"capabilityRegistry": {
ContractABI: kcr.CapabilityRegistryABI,
Configs: map[string]*evmrelaytypes.ChainReaderDefinition{
"getDONs": {
ChainSpecificName: "getDONs",
},
"getCapabilities": {
ChainSpecificName: "getCapabilities",
},
"getNodes": {
ChainSpecificName: "getNodes",
},
},
},
},
}

contractReaderConfigEncoded, err := json.Marshal(contractReaderConfig)
if err != nil {
return nil, err
}

cr, err := relayer.NewContractReader(ctx, contractReaderConfigEncoded)
if err != nil {
return nil, err
}

err = cr.Bind(ctx, []types.BoundContract{
{
Address: remoteRegistryAddress,
Name: "capabilityRegistry",
},
})
if err != nil {
return nil, err
}

return &remoteRegistryReader{r: cr}, err
}
210 changes: 210 additions & 0 deletions core/capabilities/reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package capabilities

import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/accounts/abi/bind/backends"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/types"
evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
kcr "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/keystone_capability_registry"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm"
evmrelaytypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
)

var writeChainCapability = kcr.CapabilityRegistryCapability{
LabelledName: "write-chain",
Version: "1.0.1",
ResponseType: uint8(1),
}

func startNewChainWithRegistry(t *testing.T) (*kcr.CapabilityRegistry, common.Address, *bind.TransactOpts, *backends.SimulatedBackend) {
owner := testutils.MustNewSimTransactor(t)

oneEth, _ := new(big.Int).SetString("100000000000000000000", 10)
gasLimit := ethconfig.Defaults.Miner.GasCeil * 2 // 60 M blocks

simulatedBackend := backends.NewSimulatedBackend(core.GenesisAlloc{owner.From: {
Balance: oneEth,
}}, gasLimit)
simulatedBackend.Commit()

capabilityRegistryAddress, _, capabilityRegistry, err := kcr.DeployCapabilityRegistry(owner, simulatedBackend)
require.NoError(t, err, "DeployCapabilityRegistry failed")

fmt.Println("Deployed CapabilityRegistry at", capabilityRegistryAddress.Hex())
simulatedBackend.Commit()

return capabilityRegistry, capabilityRegistryAddress, owner, simulatedBackend
}

type crFactory struct {
lggr logger.Logger
logPoller logpoller.LogPoller
client evmclient.Client
}

func (c *crFactory) NewContractReader(ctx context.Context, cfg []byte) (types.ContractReader, error) {
crCfg := &evmrelaytypes.ChainReaderConfig{}
if err := json.Unmarshal(cfg, crCfg); err != nil {
return nil, err
}
svc, err := evm.NewChainReaderService(ctx, c.lggr, c.logPoller, c.client, *crCfg)
if err != nil {
return nil, err
}

return svc, svc.Start(ctx)
}

func newContractReaderFactory(t *testing.T, simulatedBackend *backends.SimulatedBackend) *crFactory {
lggr := logger.TestLogger(t)
client := evmclient.NewSimulatedBackendClient(
t,
simulatedBackend,
testutils.SimulatedChainID,
)
db := pgtest.NewSqlxDB(t)
lp := logpoller.NewLogPoller(
logpoller.NewORM(testutils.SimulatedChainID, db, lggr),
client,
lggr,
logpoller.Opts{
PollPeriod: 100 * time.Millisecond,
FinalityDepth: 2,
BackfillBatchSize: 3,
RpcBatchSize: 2,
KeepFinalizedBlocksDepth: 1000,
},
)
return &crFactory{
lggr: lggr,
client: client,
logPoller: lp,
}
}

func randomWord() [32]byte {
word := make([]byte, 32)
_, err := rand.Read(word)
if err != nil {
panic(err)
}
return [32]byte(word)
}

func TestReader_Integration(t *testing.T) {
ctx := testutils.Context(t)
reg, regAddress, owner, sim := startNewChainWithRegistry(t)

_, err := reg.AddCapabilities(owner, []kcr.CapabilityRegistryCapability{writeChainCapability})
require.NoError(t, err, "AddCapability failed for %s", writeChainCapability.LabelledName)
sim.Commit()

cid, err := reg.GetHashedCapabilityId(&bind.CallOpts{}, writeChainCapability.LabelledName, writeChainCapability.Version)
require.NoError(t, err)

_, err = reg.AddNodeOperators(owner, []kcr.CapabilityRegistryNodeOperator{
{
Admin: owner.From,
Name: "TEST_NOP",
},
})
require.NoError(t, err)

nodeSet := [][32]byte{
randomWord(),
randomWord(),
randomWord(),
}

nodes := []kcr.CapabilityRegistryNodeInfo{
{
// The first NodeOperatorId has id 1 since the id is auto-incrementing.
NodeOperatorId: uint32(1),
Signer: randomWord(),
P2pId: nodeSet[0],
HashedCapabilityIds: [][32]byte{cid},
},
{
// The first NodeOperatorId has id 1 since the id is auto-incrementing.
NodeOperatorId: uint32(1),
Signer: randomWord(),
P2pId: nodeSet[1],
HashedCapabilityIds: [][32]byte{cid},
},
{
// The first NodeOperatorId has id 1 since the id is auto-incrementing.
NodeOperatorId: uint32(1),
Signer: randomWord(),
P2pId: nodeSet[2],
HashedCapabilityIds: [][32]byte{cid},
},
}
_, err = reg.AddNodes(owner, nodes)
require.NoError(t, err)

cfgs := []kcr.CapabilityRegistryCapabilityConfiguration{
{
CapabilityId: cid,
Config: []byte(`{"hello": "world"}`),
},
}
_, err = reg.AddDON(
owner,
nodeSet,
cfgs,
true,
true,
1,
)
sim.Commit()

require.NoError(t, err)

factory := newContractReaderFactory(t, sim)
reader, err := newRemoteRegistryReader(ctx, factory, regAddress.Hex())
require.NoError(t, err)

s, err := reader.state(ctx)
require.NoError(t, err)
assert.Len(t, s.IDsToCapabilities, 1)

gotCap := s.IDsToCapabilities[cid]
assert.Equal(t, writeChainCapability, gotCap)

assert.Len(t, s.IDsToDONs, 1)
assert.Equal(t, kcr.CapabilityRegistryDONInfo{
Id: 1, // initial Id
ConfigCount: 1, // initial Count
IsPublic: true,
AcceptsWorkflows: true,
F: 1,
NodeP2PIds: nodeSet,
CapabilityConfigurations: cfgs,
}, s.IDsToDONs[1])

assert.Len(t, s.IDsToNodes, 3)
assert.Equal(t, map[p2ptypes.PeerID]kcr.CapabilityRegistryNodeInfo{
nodeSet[0]: nodes[0],
nodeSet[1]: nodes[1],
nodeSet[2]: nodes[2],
}, s.IDsToNodes)
}
7 changes: 6 additions & 1 deletion core/capabilities/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@ package capabilities

import (
"context"
"errors"
"fmt"
"sync"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

var (
ErrCapabilityAlreadyExists = errors.New("capability already exists")
)

// Registry is a struct for the registry of capabilities.
// Registry is safe for concurrent use.
type Registry struct {
Expand Down Expand Up @@ -141,7 +146,7 @@ func (r *Registry) Add(ctx context.Context, c capabilities.BaseCapability) error
id := info.ID
_, ok := r.m[id]
if ok {
return fmt.Errorf("capability with id: %s already exists", id)
return fmt.Errorf("%w: id %s found in registry", ErrCapabilityAlreadyExists, id)
}

r.m[id] = c
Expand Down
3 changes: 2 additions & 1 deletion core/capabilities/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package capabilities_test

import (
"context"
"errors"
"fmt"
"testing"

Expand Down Expand Up @@ -85,7 +86,7 @@ func TestRegistry_NoDuplicateIDs(t *testing.T) {
c2 := &mockCapability{CapabilityInfo: ci}

err = r.Add(ctx, c2)
assert.ErrorContains(t, err, "capability with id: [email protected] already exists")
assert.True(t, errors.Is(err, coreCapabilities.ErrCapabilityAlreadyExists))
}

func TestRegistry_ChecksExecutionAPIByType(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions core/capabilities/remote/trigger_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
//
// TriggerPublisher communicates with corresponding TriggerSubscribers on remote nodes.
type triggerPublisher struct {
config types.RemoteTriggerConfig
config *types.RemoteTriggerConfig
underlying commoncap.TriggerCapability
capInfo commoncap.CapabilityInfo
capDonInfo commoncap.DON
Expand All @@ -48,7 +48,7 @@ type pubRegState struct {
var _ types.Receiver = &triggerPublisher{}
var _ services.Service = &triggerPublisher{}

func NewTriggerPublisher(config types.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, workflowDONs map[string]commoncap.DON, dispatcher types.Dispatcher, lggr logger.Logger) *triggerPublisher {
func NewTriggerPublisher(config *types.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, workflowDONs map[string]commoncap.DON, dispatcher types.Dispatcher, lggr logger.Logger) *triggerPublisher {
config.ApplyDefaults()
return &triggerPublisher{
config: config,
Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/remote/trigger_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestTriggerPublisher_Register(t *testing.T) {
}

dispatcher := remoteMocks.NewDispatcher(t)
config := remotetypes.RemoteTriggerConfig{
config := &remotetypes.RemoteTriggerConfig{
RegistrationRefreshMs: 100,
RegistrationExpiryMs: 100_000,
MinResponsesToAggregate: 1,
Expand Down
Loading

0 comments on commit 66f1547

Please sign in to comment.