Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MultiNode Integration: Initial Setup #824

Merged
merged 29 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
60673aa
MultiNode integration setup
DylanTinianov Aug 20, 2024
dc85772
Merge branch 'develop' into BCI-2835-integrate-multinode
DylanTinianov Aug 27, 2024
e6683e6
Merge branch 'develop' into BCI-2835-integrate-multinode
DylanTinianov Sep 3, 2024
1c68485
Update MultiNode files
DylanTinianov Sep 3, 2024
c44a6ce
Add MultiNode flag
DylanTinianov Sep 3, 2024
64db86a
Remove internal dependency
DylanTinianov Sep 3, 2024
f7c1bc9
Fix build
DylanTinianov Sep 3, 2024
9e91b47
Fix import cycle
DylanTinianov Sep 4, 2024
354dc50
tidy
DylanTinianov Sep 4, 2024
60c3352
Update client_test.go
DylanTinianov Sep 4, 2024
dcec343
Merge branch 'develop' into BCI-2835-integrate-multinode
DylanTinianov Sep 4, 2024
8e2306b
lint
DylanTinianov Sep 4, 2024
b8d6755
Fix duplicate metrics
DylanTinianov Sep 4, 2024
2cb4d77
Add chain multinode flag
DylanTinianov Sep 5, 2024
0b33b1f
Extend client
DylanTinianov Sep 6, 2024
6641bc9
Merge branch 'develop' into BCI-2835-integrate-multinode
DylanTinianov Sep 6, 2024
d8d312c
Address comments
DylanTinianov Sep 10, 2024
aa3c068
Merge branch 'develop' into BCI-2835-integrate-multinode
DylanTinianov Sep 10, 2024
3c3756e
lint
DylanTinianov Sep 12, 2024
7c8b55d
Merge branch 'develop' into BCI-2835-integrate-multinode
DylanTinianov Sep 12, 2024
2521670
Fix lint overflow issues
DylanTinianov Sep 12, 2024
5b5cfd6
Update transaction_sender.go
DylanTinianov Sep 12, 2024
690f812
Fix lint
DylanTinianov Sep 12, 2024
fd3823b
Validate node config
DylanTinianov Sep 12, 2024
4bf96b7
Update toml.go
DylanTinianov Sep 12, 2024
c1b83a5
Add SendOnly nodes
DylanTinianov Sep 18, 2024
8aa39f6
Use test context
DylanTinianov Sep 19, 2024
0a016db
lint
DylanTinianov Sep 19, 2024
923f8ea
Merge branch 'develop' into BCI-2835-integrate-multinode
aalu1418 Sep 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/go-viper/mapstructure/v2 v2.1.0
github.com/google/uuid v1.6.0
github.com/hashicorp/go-plugin v1.6.2-0.20240829161738-06afb6d7ae99
github.com/jpillora/backoff v1.0.0
github.com/pelletier/go-toml/v2 v2.2.0
github.com/prometheus/client_golang v1.17.0
github.com/smartcontractkit/chainlink-common v0.2.2-0.20240913191949-44d96950c886
Expand Down Expand Up @@ -58,7 +59,6 @@ require (
github.com/hashicorp/go-hclog v1.5.0 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
github.com/invopop/jsonschema v0.12.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/kr/pretty v0.3.1 // indirect
Expand Down
92 changes: 87 additions & 5 deletions pkg/solana/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"math/big"
"math/rand"
"strconv"
Expand All @@ -22,6 +23,8 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"

mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
Expand Down Expand Up @@ -85,6 +88,10 @@ type chain struct {
balanceMonitor services.Service
lggr logger.Logger

// if multiNode is enabled, the clientCache will not be used
multiNode *mn.MultiNode[mn.StringID, *client.Client]
txSender *mn.TransactionSender[*solanago.Transaction, mn.StringID, *client.Client]

// tracking node chain id for verification
clientCache map[string]*verifiedCachedClient // map URL -> {client, chainId} [mainnet/testnet/devnet/localnet]
clientLock sync.RWMutex
Expand Down Expand Up @@ -114,7 +121,8 @@ func (v *verifiedCachedClient) verifyChainID() (bool, error) {
v.chainIDVerifiedLock.Lock()
defer v.chainIDVerifiedLock.Unlock()

v.chainID, err = v.ReaderWriter.ChainID()
strID, err := v.ReaderWriter.ChainID(context.Background())
v.chainID = strID.String()
if err != nil {
v.chainIDVerified = false
return v.chainIDVerified, fmt.Errorf("failed to fetch ChainID in verifiedCachedClient: %w", err)
Expand Down Expand Up @@ -186,13 +194,13 @@ func (v *verifiedCachedClient) LatestBlockhash() (*rpc.GetLatestBlockhashResult,
return v.ReaderWriter.LatestBlockhash()
}

func (v *verifiedCachedClient) ChainID() (string, error) {
func (v *verifiedCachedClient) ChainID(ctx context.Context) (mn.StringID, error) {
aalu1418 marked this conversation as resolved.
Show resolved Hide resolved
verified, err := v.verifyChainID()
if !verified {
return "", err
}

return v.chainID, nil
return mn.StringID(v.chainID), nil
}

func (v *verifiedCachedClient) GetFeeForMessage(msg string) (uint64, error) {
Expand Down Expand Up @@ -221,6 +229,66 @@ func newChain(id string, cfg *config.TOMLConfig, ks loop.Keystore, lggr logger.L
lggr: logger.Named(lggr, "Chain"),
clientCache: map[string]*verifiedCachedClient{},
}

if cfg.MultiNodeEnabled() {
chainFamily := "solana"

mnCfg := cfg.MultiNodeConfig()

var nodes []mn.Node[mn.StringID, *client.Client]
var sendOnlyNodes []mn.SendOnlyNode[mn.StringID, *client.Client]

for i, nodeInfo := range cfg.ListNodes() {
rpcClient, err := client.NewClient(nodeInfo.URL.String(), cfg, DefaultRequestTimeout, logger.Named(lggr, "Client."+*nodeInfo.Name))
if err != nil {
lggr.Warnw("failed to create client", "name", *nodeInfo.Name, "solana-url", nodeInfo.URL.String(), "err", err.Error())
return nil, fmt.Errorf("failed to create client: %w", err)
}

newNode := mn.NewNode[mn.StringID, *client.Head, *client.Client](
mnCfg, mnCfg, lggr, *nodeInfo.URL.URL(), nil, *nodeInfo.Name,
i, mn.StringID(id), 0, rpcClient, chainFamily)

if nodeInfo.SendOnly {
sendOnlyNodes = append(sendOnlyNodes, newNode)
} else {
nodes = append(nodes, newNode)
}
}

multiNode := mn.NewMultiNode[mn.StringID, *client.Client](
lggr,
mn.NodeSelectionModeRoundRobin,
0,
nodes,
sendOnlyNodes,
mn.StringID(id),
chainFamily,
mnCfg.DeathDeclarationDelay(),
)

// TODO: implement error classification; move logic to separate file if large
// TODO: might be useful to reference anza-xyz/agave@master/sdk/src/transaction/error.rs
classifySendError := func(tx *solanago.Transaction, err error) mn.SendTxReturnCode {
return 0 // TODO ClassifySendError(err, clientErrors, logger.Sugared(logger.Nop()), tx, common.Address{}, false)
}

txSender := mn.NewTransactionSender[*solanago.Transaction, mn.StringID, *client.Client](
lggr,
mn.StringID(id),
chainFamily,
multiNode,
classifySendError,
0, // use the default value provided by the implementation
)

ch.multiNode = multiNode
ch.txSender = txSender

// clientCache will not be used if multinode is enabled
ch.clientCache = nil
}

tc := func() (client.ReaderWriter, error) {
return ch.getClient()
}
Expand Down Expand Up @@ -330,6 +398,10 @@ func (c *chain) ChainID() string {

// getClient returns a client, randomly selecting one from available and valid nodes
func (c *chain) getClient() (client.ReaderWriter, error) {
if c.cfg.MultiNodeEnabled() {
return c.multiNode.SelectRPC()
}

var node *config.Node
var client client.ReaderWriter
nodes := c.cfg.ListNodes()
Expand Down Expand Up @@ -409,7 +481,12 @@ func (c *chain) Start(ctx context.Context) error {
c.lggr.Debug("Starting txm")
c.lggr.Debug("Starting balance monitor")
var ms services.MultiStart
return ms.Start(ctx, c.txm, c.balanceMonitor)
startAll := []services.StartClose{c.txm, c.balanceMonitor}
if c.cfg.MultiNodeEnabled() {
c.lggr.Debug("Starting multinode")
startAll = append(startAll, c.multiNode, c.txSender)
}
return ms.Start(ctx, startAll...)
})
}

Expand All @@ -418,7 +495,12 @@ func (c *chain) Close() error {
c.lggr.Debug("Stopping")
c.lggr.Debug("Stopping txm")
c.lggr.Debug("Stopping balance monitor")
return services.CloseAll(c.txm, c.balanceMonitor)
closeAll := []io.Closer{c.txm, c.balanceMonitor}
if c.cfg.MultiNodeEnabled() {
c.lggr.Debug("Stopping multinode")
closeAll = append(closeAll, c.multiNode, c.txSender)
}
return services.CloseAll(closeAll...)
})
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/solana/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
solcfg "github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
Expand Down Expand Up @@ -174,7 +175,7 @@ func TestSolanaChain_VerifiedClient(t *testing.T) {
testChain.id = "incorrect"
c, err = testChain.verifiedClient(node)
assert.NoError(t, err)
_, err = c.ChainID()
_, err = c.ChainID(tests.Context(t))
// expect error from id mismatch (even if using a cached client) when performing RPC calls
assert.Error(t, err)
assert.Equal(t, fmt.Sprintf("client returned mismatched chain id (expected: %s, got: %s): %s", "incorrect", "devnet", node.URL), err.Error())
Expand Down
82 changes: 78 additions & 4 deletions pkg/solana/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"context"
"errors"
"fmt"
"math/big"
"time"

mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode"

"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
"golang.org/x/sync/singleflight"
Expand Down Expand Up @@ -33,7 +36,7 @@ type Reader interface {
Balance(addr solana.PublicKey) (uint64, error)
SlotHeight() (uint64, error)
LatestBlockhash() (*rpc.GetLatestBlockhashResult, error)
ChainID() (string, error)
ChainID(ctx context.Context) (mn.StringID, error)
GetFeeForMessage(msg string) (uint64, error)
GetLatestBlock() (*rpc.GetBlockResult, error)
}
Expand Down Expand Up @@ -65,6 +68,27 @@ type Client struct {
requestGroup *singleflight.Group
}

type Head struct {
rpc.GetBlockResult
}

func (h *Head) BlockNumber() int64 {
if !h.IsValid() {
return 0
}
// nolint:gosec
// G115: integer overflow conversion uint64 -> int64
return int64(*h.BlockHeight)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

random thought: why are we using a signed int as a block number and not a uint? can block numbers be negative?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I was thinking about this too. Will have to ask Dmytro if there's a reason when he's back, although I've never heard of negative block numbers. I was looking into using unsigned int in the Chainlink repo, but I didn't want to increase the scope of my MultiNode PR.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to be a legacy from the initial iteration to create generalized components. I'm not sure why it was defined as int64.
I agree with Dylan that we should keep this change out of scope for the MultiNode PR

}

func (h *Head) BlockDifficulty() *big.Int {
return nil
}

func (h *Head) IsValid() bool {
return h.BlockHeight != nil
}

func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration, log logger.Logger) (*Client, error) {
return &Client{
url: endpoint,
Expand All @@ -79,6 +103,56 @@ func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration,
}, nil
}

var _ mn.RPCClient[mn.StringID, *Head] = (*Client)(nil)
var _ mn.SendTxRPCClient[*solana.Transaction] = (*Client)(nil)

// TODO: BCI-4061: Implement Client for MultiNode

func (c *Client) Dial(ctx context.Context) error {
//TODO implement me
panic("implement me")
}

func (c *Client) SubscribeToHeads(ctx context.Context) (<-chan *Head, mn.Subscription, error) {
//TODO implement me
panic("implement me")
}

func (c *Client) SubscribeToFinalizedHeads(ctx context.Context) (<-chan *Head, mn.Subscription, error) {
//TODO implement me
panic("implement me")
}
Comment on lines +111 to +124
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this require WS support?

i thought we were slowly moving away from it because it's not reliable? we never implemented WS support because it had issues with memory leaks and instead went with a polling mechanism

Copy link
Contributor Author

@DylanTinianov DylanTinianov Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not require WS support and polling will be used for the implementation.


func (c *Client) Ping(ctx context.Context) error {
//TODO implement me
panic("implement me")
}

func (c *Client) IsSyncing(ctx context.Context) (bool, error) {
//TODO implement me
panic("implement me")
}

func (c *Client) UnsubscribeAllExcept(subs ...mn.Subscription) {
//TODO implement me
panic("implement me")
}

func (c *Client) Close() {
//TODO implement me
panic("implement me")
}

func (c *Client) GetInterceptedChainInfo() (latest, highestUserObservations mn.ChainInfo) {
//TODO implement me
panic("implement me")
}

func (c *Client) SendTransaction(ctx context.Context, tx *solana.Transaction) error {
// TODO: Implement
return nil
}

func (c *Client) latency(name string) func() {
start := time.Now()
return func() {
Expand Down Expand Up @@ -142,11 +216,11 @@ func (c *Client) LatestBlockhash() (*rpc.GetLatestBlockhashResult, error) {
return v.(*rpc.GetLatestBlockhashResult), err
}

func (c *Client) ChainID() (string, error) {
func (c *Client) ChainID(ctx context.Context) (mn.StringID, error) {
done := c.latency("chain_id")
defer done()

ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration)
ctx, cancel := context.WithTimeout(ctx, c.contextDuration)
defer cancel()
v, err, _ := c.requestGroup.Do("GetGenesisHash", func() (interface{}, error) {
return c.rpc.GetGenesisHash(ctx)
Expand All @@ -168,7 +242,7 @@ func (c *Client) ChainID() (string, error) {
c.log.Warnf("unknown genesis hash - assuming solana chain is 'localnet'")
network = "localnet"
}
return network, nil
return mn.StringID(network), nil
}

func (c *Client) GetFeeForMessage(msg string) (uint64, error) {
Expand Down
9 changes: 5 additions & 4 deletions pkg/solana/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"

mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
)
Expand Down Expand Up @@ -76,9 +77,9 @@ func TestClient_Reader_Integration(t *testing.T) {
assert.Equal(t, uint64(5000), fee)

// get chain ID based on gensis hash
network, err := c.ChainID()
network, err := c.ChainID(context.Background())
assert.NoError(t, err)
assert.Equal(t, "localnet", network)
assert.Equal(t, mn.StringID("localnet"), network)

// get account info (also tested inside contract_test)
res, err := c.GetAccountInfoWithOpts(context.TODO(), solana.PublicKey{}, &rpc.GetAccountInfoOpts{Commitment: rpc.CommitmentFinalized})
Expand Down Expand Up @@ -120,9 +121,9 @@ func TestClient_Reader_ChainID(t *testing.T) {

// get chain ID based on gensis hash
for _, n := range networks {
network, err := c.ChainID()
network, err := c.ChainID(context.Background())
assert.NoError(t, err)
assert.Equal(t, n, network)
assert.Equal(t, mn.StringID(n), network)
}
}

Expand Down
Loading
Loading