Skip to content

Commit

Permalink
Refactor JD container creation (#14664)
Browse files Browse the repository at this point in the history
* refactor jd container creation

* fixes

* fix lint

* fix flakey test

* remove retry from chain config creation

* fix

---------

Co-authored-by: Connor Stein <[email protected]>
  • Loading branch information
AnieeG and connorwstein authored Oct 7, 2024
1 parent 67d4939 commit 4c58f67
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 88 deletions.
32 changes: 8 additions & 24 deletions integration-tests/deployment/devenv/build_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

ctf_config "github.com/smartcontractkit/chainlink-testing-framework/lib/config"
ctftestenv "github.com/smartcontractkit/chainlink-testing-framework/lib/docker/test_env"
"github.com/smartcontractkit/chainlink-testing-framework/lib/docker/test_env/job_distributor"
"github.com/smartcontractkit/chainlink-testing-framework/lib/networks"
"github.com/smartcontractkit/chainlink-testing-framework/lib/utils/ptr"

Expand Down Expand Up @@ -70,6 +69,7 @@ func CreateDockerEnv(t *testing.T) (
builder := test_env.NewCLTestEnvBuilder().
WithTestConfig(&cfg).
WithTestInstance(t).
WithJobDistributor(cfg.CCIP.JobDistributorConfig).
WithStandardCleanup()

// if private ethereum networks are provided, we will use them to create the test environment
Expand All @@ -82,37 +82,21 @@ func CreateDockerEnv(t *testing.T) (

chains := CreateChainConfigFromNetworks(t, env, privateEthereumNetworks, cfg.GetNetworkConfig())

var jdConfig JDConfig
jdConfig := JDConfig{
GRPC: cfg.CCIP.JobDistributorConfig.GetJDGRPC(),
WSRPC: cfg.CCIP.JobDistributorConfig.GetJDWSRPC(),
}
// TODO : move this as a part of test_env setup with an input in testconfig
// if JD is not provided, we will spin up a new JD
if cfg.CCIP.GetJDGRPC() == "" && cfg.CCIP.GetJDWSRPC() == "" {
jdDB, err := ctftestenv.NewPostgresDb(
[]string{env.DockerNetwork.Name},
ctftestenv.WithPostgresDbName(cfg.CCIP.GetJDDBName()),
ctftestenv.WithPostgresImageVersion(cfg.CCIP.GetJDDBVersion()),
)
require.NoError(t, err)
err = jdDB.StartContainer()
require.NoError(t, err)

jd := job_distributor.New([]string{env.DockerNetwork.Name},
job_distributor.WithImage(cfg.CCIP.GetJDImage()),
job_distributor.WithVersion(cfg.CCIP.GetJDVersion()),
job_distributor.WithDBURL(jdDB.InternalURL.String()),
)
err = jd.StartContainer()
require.NoError(t, err)
if jdConfig.GRPC == "" || jdConfig.WSRPC == "" {
jd := env.JobDistributor
require.NotNil(t, jd, "JD is not found in test environment")
jdConfig = JDConfig{
GRPC: jd.Grpc,
// we will use internal wsrpc for nodes on same docker network to connect to JD
WSRPC: jd.InternalWSRPC,
Creds: insecure.NewCredentials(),
}
} else {
jdConfig = JDConfig{
GRPC: cfg.CCIP.GetJDGRPC(),
WSRPC: cfg.CCIP.GetJDWSRPC(),
}
}
require.NotEmpty(t, jdConfig, "JD config is empty")

Expand Down
93 changes: 45 additions & 48 deletions integration-tests/deployment/devenv/don.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,55 +215,33 @@ func (n *Node) CreateCCIPOCRSupportedChains(ctx context.Context, chains []JDChai
}
}
// JD silently fails to update nodeChainConfig. Therefore, we fetch the node config and
// if it's not updated , we retry creating the chain config.
// as a workaround, we keep trying creating the chain config for 5 times until it's created
retryCount := 1
created := false
for retryCount < 5 {
chainConfigId, err := n.gqlClient.CreateJobDistributorChainConfig(ctx, client.JobDistributorChainConfigInput{
JobDistributorID: n.JDId,
ChainID: chainId,
ChainType: chain.ChainType,
AccountAddr: pointer.GetString(accountAddr),
AdminAddr: n.adminAddr,
Ocr2Enabled: true,
Ocr2IsBootstrap: isBootstrap,
Ocr2Multiaddr: n.multiAddr,
Ocr2P2PPeerID: pointer.GetString(peerID),
Ocr2KeyBundleID: ocr2BundleId,
Ocr2Plugins: `{"commit":true,"execute":true,"median":false,"mercury":false}`,
})
if err != nil {
return fmt.Errorf("failed to create CCIPOCR2SupportedChains for node %s: %w", n.Name, err)
}
// JD doesn't update the node chain config immediately, so we need to wait for it to be updated
err = retry.Do(ctx, retry.WithMaxRetries(3, retry.NewFibonacci(1*time.Second)), func(ctx context.Context) error {
nodeChainConfigs, err := jd.ListNodeChainConfigs(context.Background(), &nodev1.ListNodeChainConfigsRequest{
Filter: &nodev1.ListNodeChainConfigsRequest_Filter{
NodeIds: []string{n.NodeId},
}})
if err != nil {
return fmt.Errorf("failed to list node chain configs for node %s: %w", n.Name, err)
}
if nodeChainConfigs != nil && len(nodeChainConfigs.ChainConfigs) == i+1 {
return nil
}
return fmt.Errorf("node chain config not updated properly")
})
if err == nil {
created = true
break
}
// delete the node chain config if it's not updated properly and retry
err = n.gqlClient.DeleteJobDistributorChainConfig(ctx, chainConfigId)
if err != nil {
return fmt.Errorf("failed to delete job distributor chain config for node %s: %w", n.Name, err)
}

retryCount++
// if it's not updated , throw an error
_, err = n.gqlClient.CreateJobDistributorChainConfig(ctx, client.JobDistributorChainConfigInput{
JobDistributorID: n.JDId,
ChainID: chainId,
ChainType: chain.ChainType,
AccountAddr: pointer.GetString(accountAddr),
AdminAddr: n.adminAddr,
Ocr2Enabled: true,
Ocr2IsBootstrap: isBootstrap,
Ocr2Multiaddr: n.multiAddr,
Ocr2P2PPeerID: pointer.GetString(peerID),
Ocr2KeyBundleID: ocr2BundleId,
Ocr2Plugins: `{"commit":true,"execute":true,"median":false,"mercury":false}`,
})
if err != nil {
return fmt.Errorf("failed to create CCIPOCR2SupportedChains for node %s: %w", n.Name, err)
}
if !created {
return fmt.Errorf("failed to create CCIPOCR2SupportedChains for node %s", n.Name)
// query the node chain config to check if it's created
nodeChainConfigs, err := jd.ListNodeChainConfigs(context.Background(), &nodev1.ListNodeChainConfigsRequest{
Filter: &nodev1.ListNodeChainConfigsRequest_Filter{
NodeIds: []string{n.NodeId},
}})
if err != nil {
return fmt.Errorf("failed to list node chain configs for node %s: %w", n.Name, err)
}
if nodeChainConfigs == nil || len(nodeChainConfigs.ChainConfigs) < i+1 {
return fmt.Errorf("failed to create chain config for node %s", n.Name)
}
}
return nil
Expand Down Expand Up @@ -358,6 +336,25 @@ func (n *Node) SetUpAndLinkJobDistributor(ctx context.Context, jd JobDistributor
if err != nil {
return err
}
// wait for the node to connect to the job distributor
err = retry.Do(ctx, retry.WithMaxDuration(1*time.Minute, retry.NewFibonacci(1*time.Second)), func(ctx context.Context) error {
getRes, err := jd.GetNode(ctx, &nodev1.GetNodeRequest{
Id: n.NodeId,
})
if err != nil {
return fmt.Errorf("failed to get node %s: %w", n.Name, err)
}
if getRes.GetNode() == nil {
return fmt.Errorf("no node found for node id %s", n.NodeId)
}
if !getRes.GetNode().IsConnected {
return retry.RetryableError(fmt.Errorf("node %s not connected to job distributor", n.Name))
}
return nil
})
if err != nil {
return fmt.Errorf("failed to connect node %s to job distributor: %w", n.Name, err)
}
n.JDId = id
return nil
}
Expand Down
32 changes: 32 additions & 0 deletions integration-tests/docker/test_env/test_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@ import (
"github.com/rs/zerolog/log"
tc "github.com/testcontainers/testcontainers-go"

"github.com/smartcontractkit/chainlink-testing-framework/lib/docker/test_env/job_distributor"

"github.com/smartcontractkit/chainlink-testing-framework/lib/blockchain"
ctf_config "github.com/smartcontractkit/chainlink-testing-framework/lib/config"
"github.com/smartcontractkit/chainlink-testing-framework/lib/docker"
"github.com/smartcontractkit/chainlink-testing-framework/lib/docker/test_env"
"github.com/smartcontractkit/chainlink-testing-framework/lib/logging"
"github.com/smartcontractkit/chainlink-testing-framework/lib/logstream"
"github.com/smartcontractkit/chainlink-testing-framework/lib/utils/runid"

"github.com/smartcontractkit/chainlink/integration-tests/testconfig/ccip"
"github.com/smartcontractkit/chainlink/v2/core/services/chainlink"

d "github.com/smartcontractkit/chainlink/integration-tests/docker"
Expand All @@ -41,6 +45,7 @@ type CLClusterTestEnv struct {
PrivateEthereumConfigs []*ctf_config.EthereumNetworkConfig
EVMNetworks []*blockchain.EVMNetwork
rpcProviders map[int64]*test_env.RpcProvider
JobDistributor *job_distributor.Component
l zerolog.Logger
t *testing.T
isSimulatedNetwork bool
Expand Down Expand Up @@ -109,6 +114,33 @@ func (te *CLClusterTestEnv) StartEthereumNetwork(cfg *ctf_config.EthereumNetwork
return n, rpc, nil
}

func (te *CLClusterTestEnv) StartJobDistributor(cfg *ccip.JDConfig) error {
jdDB, err := test_env.NewPostgresDb(
[]string{te.DockerNetwork.Name},
test_env.WithPostgresDbName(cfg.GetJDDBName()),
test_env.WithPostgresImageVersion(cfg.GetJDDBVersion()),
)
if err != nil {
return fmt.Errorf("failed to create postgres db for job-distributor: %w", err)
}
err = jdDB.StartContainer()
if err != nil {
return fmt.Errorf("failed to start postgres db for job-distributor: %w", err)
}
jd := job_distributor.New([]string{te.DockerNetwork.Name},
job_distributor.WithImage(cfg.GetJDImage()),
job_distributor.WithVersion(cfg.GetJDVersion()),
job_distributor.WithDBURL(jdDB.InternalURL.String()),
)
jd.LogStream = te.LogStream
err = jd.StartContainer()
if err != nil {
return fmt.Errorf("failed to start job-distributor: %w", err)
}
te.JobDistributor = jd
return nil
}

func (te *CLClusterTestEnv) StartMockAdapter() error {
return te.MockAdapter.StartContainer()
}
Expand Down
14 changes: 14 additions & 0 deletions integration-tests/docker/test_env/test_env_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/smartcontractkit/chainlink-testing-framework/lib/testsummary"
"github.com/smartcontractkit/chainlink-testing-framework/lib/utils/osutil"

"github.com/smartcontractkit/chainlink/integration-tests/testconfig/ccip"
"github.com/smartcontractkit/chainlink/integration-tests/types/config/node"
"github.com/smartcontractkit/chainlink/v2/core/services/chainlink"
)
Expand All @@ -46,6 +47,7 @@ type ChainlinkNodeLogScannerSettings struct {
type CLTestEnvBuilder struct {
hasLogStream bool
hasKillgrave bool
jdConfig *ccip.JDConfig
clNodeConfig *chainlink.Config
secretsConfig string
clNodesCount int
Expand Down Expand Up @@ -210,6 +212,11 @@ func (b *CLTestEnvBuilder) WithCustomCleanup(customFn func()) *CLTestEnvBuilder
return b
}

func (b *CLTestEnvBuilder) WithJobDistributor(cfg ccip.JDConfig) *CLTestEnvBuilder {
b.jdConfig = &cfg
return b
}

type EVMNetworkOption = func(*blockchain.EVMNetwork) *blockchain.EVMNetwork

// WithEVMNetworkOptions sets the options for the EVM network. This is especially useful for simulated networks, which
Expand Down Expand Up @@ -396,6 +403,12 @@ func (b *CLTestEnvBuilder) Build() (*CLClusterTestEnv, error) {
log.Warn().Msg("Chainlink node log scanner settings provided, but LogStream is not enabled. Ignoring Chainlink node log scanner settings, as no logs will be available.")
}

if b.jdConfig != nil {
err := b.te.StartJobDistributor(b.jdConfig)
if err != nil {
return nil, err
}
}
// in this case we will use the builder only to start chains, not the cluster, because currently we support only 1 network config per cluster
if len(b.privateEthereumNetworks) > 1 {
b.te.rpcProviders = make(map[int64]*test_env.RpcProvider)
Expand Down Expand Up @@ -550,6 +563,7 @@ func (b *CLTestEnvBuilder) Build() (*CLClusterTestEnv, error) {
b.l.Info().
Str("privateEthereumNetwork", enDesc).
Bool("hasKillgrave", b.hasKillgrave).
Bool("hasJobDistributor", b.jdConfig != nil).
Int("clNodesCount", b.clNodesCount).
Strs("customNodeCsaKeys", b.customNodeCsaKeys).
Strs("defaultNodeCsaKeys", b.defaultNodeCsaKeys).
Expand Down
1 change: 1 addition & 0 deletions integration-tests/docker/test_env/test_env_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"

ctf_config "github.com/smartcontractkit/chainlink-testing-framework/lib/config"

env "github.com/smartcontractkit/chainlink/integration-tests/types/envcommon"
)

Expand Down
32 changes: 16 additions & 16 deletions integration-tests/testconfig/ccip/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,59 +51,59 @@ type JDConfig struct {
JDWSRPC *string `toml:",omitempty"`
}

func (o *Config) Validate() error {
return nil
}

// TODO: include all JD specific input in generic secret handling
func (o *Config) GetJDGRPC() string {
grpc := pointer.GetString(o.JobDistributorConfig.JDGRPC)
func (o *JDConfig) GetJDGRPC() string {
grpc := pointer.GetString(o.JDGRPC)
if grpc == "" {
return ctfconfig.MustReadEnvVar_String(E2E_JD_GRPC)
}
return grpc
}

func (o *Config) GetJDWSRPC() string {
wsrpc := pointer.GetString(o.JobDistributorConfig.JDWSRPC)
func (o *JDConfig) GetJDWSRPC() string {
wsrpc := pointer.GetString(o.JDWSRPC)
if wsrpc == "" {
return ctfconfig.MustReadEnvVar_String(E2E_JD_WSRPC)
}
return wsrpc
}

func (o *Config) GetJDImage() string {
image := pointer.GetString(o.JobDistributorConfig.Image)
func (o *JDConfig) GetJDImage() string {
image := pointer.GetString(o.Image)
if image == "" {
return ctfconfig.MustReadEnvVar_String(E2E_JD_IMAGE)
}
return image
}

func (o *Config) GetJDVersion() string {
version := pointer.GetString(o.JobDistributorConfig.Version)
func (o *JDConfig) GetJDVersion() string {
version := pointer.GetString(o.Version)
if version == "" {
return ctfconfig.MustReadEnvVar_String(E2E_JD_VERSION)
}
return version
}

func (o *Config) GetJDDBName() string {
dbname := pointer.GetString(o.JobDistributorConfig.DBName)
func (o *JDConfig) GetJDDBName() string {
dbname := pointer.GetString(o.DBName)
if dbname == "" {
return DEFAULT_DB_NAME
}
return dbname
}

func (o *Config) GetJDDBVersion() string {
dbversion := pointer.GetString(o.JobDistributorConfig.DBVersion)
func (o *JDConfig) GetJDDBVersion() string {
dbversion := pointer.GetString(o.DBVersion)
if dbversion == "" {
return DEFAULT_DB_VERSION
}
return dbversion
}

func (o *Config) Validate() error {
return nil
}

func (o *Config) GetHomeChainSelector(evmNetworks []blockchain.EVMNetwork) (uint64, error) {
homeChainSelector, err := strconv.ParseUint(pointer.GetString(o.HomeChainSelector), 10, 64)
if err != nil {
Expand Down

0 comments on commit 4c58f67

Please sign in to comment.