Skip to content

Commit

Permalink
Instanciate shard distributor client (#6620)
Browse files Browse the repository at this point in the history
What changed?
Instanciate the shard distributor client with the correct outbonds, and make it available from the clientBean.

Why?
We need to instanciate the client so we can use it to look up things with the shard distributor

How did you test it?

Potential risks

Release notes

Documentation Changes
  • Loading branch information
jakobht authored Jan 15, 2025
1 parent f791bd6 commit be54846
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 29 deletions.
39 changes: 26 additions & 13 deletions client/clientBean.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/client/sharddistributor"
"github.com/uber/cadence/client/wrappers/timeout"
"github.com/uber/cadence/common/cluster"
)
Expand All @@ -44,20 +45,22 @@ type (
GetHistoryPeers() history.PeerResolver
GetMatchingClient(domainIDToName DomainIDToNameFunc) (matching.Client, error)
GetFrontendClient() frontend.Client
GetShardDistributorClient() sharddistributor.Client
GetRemoteAdminClient(cluster string) admin.Client
SetRemoteAdminClient(cluster string, client admin.Client)
GetRemoteFrontendClient(cluster string) frontend.Client
}

clientBeanImpl struct {
sync.Mutex
historyClient history.Client
historyPeers history.PeerResolver
matchingClient atomic.Value
frontendClient frontend.Client
remoteAdminClients map[string]admin.Client
remoteFrontendClients map[string]frontend.Client
factory Factory
historyClient history.Client
historyPeers history.PeerResolver
matchingClient atomic.Value
frontendClient frontend.Client
shardDistributorClient sharddistributor.Client
remoteAdminClients map[string]admin.Client
remoteFrontendClients map[string]frontend.Client
factory Factory
}
)

Expand Down Expand Up @@ -96,13 +99,19 @@ func NewClientBean(factory Factory, dispatcher *yarpc.Dispatcher, clusterMetadat
remoteFrontendClients[clusterName] = frontendClient
}

shardDistributorClient, err := factory.NewShardDistributorClient()
if err != nil {
return nil, err
}

return &clientBeanImpl{
factory: factory,
historyClient: historyClient,
historyPeers: historyPeers,
frontendClient: remoteFrontendClients[clusterMetadata.GetCurrentClusterName()],
remoteAdminClients: remoteAdminClients,
remoteFrontendClients: remoteFrontendClients,
factory: factory,
historyClient: historyClient,
historyPeers: historyPeers,
frontendClient: remoteFrontendClients[clusterMetadata.GetCurrentClusterName()],
shardDistributorClient: shardDistributorClient,
remoteAdminClients: remoteAdminClients,
remoteFrontendClients: remoteFrontendClients,
}, nil
}

Expand All @@ -125,6 +134,10 @@ func (h *clientBeanImpl) GetFrontendClient() frontend.Client {
return h.frontendClient
}

func (h *clientBeanImpl) GetShardDistributorClient() sharddistributor.Client {
return h.shardDistributorClient
}

func (h *clientBeanImpl) GetRemoteAdminClient(cluster string) admin.Client {
client, ok := h.remoteAdminClients[cluster]
if !ok {
Expand Down
15 changes: 15 additions & 0 deletions client/clientBean_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package client

import (
"fmt"
"time"

adminv1 "github.com/uber/cadence-idl/go/proto/admin/v1"
Expand All @@ -33,10 +34,12 @@ import (
"github.com/uber/cadence/.gen/go/matching/matchingserviceclient"
historyv1 "github.com/uber/cadence/.gen/proto/history/v1"
matchingv1 "github.com/uber/cadence/.gen/proto/matching/v1"
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
"github.com/uber/cadence/client/admin"
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/client/sharddistributor"
"github.com/uber/cadence/client/wrappers/errorinjectors"
"github.com/uber/cadence/client/wrappers/grpc"
"github.com/uber/cadence/client/wrappers/metered"
Expand All @@ -61,6 +64,9 @@ type (

NewAdminClientWithTimeoutAndConfig(config transport.ClientConfig, timeout time.Duration, largeTimeout time.Duration) (admin.Client, error)
NewFrontendClientWithTimeoutAndConfig(config transport.ClientConfig, timeout time.Duration, longPollTimeout time.Duration) (frontend.Client, error)

NewShardDistributorClient() (sharddistributor.Client, error)
NewShardDistributorClientWithTimeout(timeout time.Duration) (sharddistributor.Client, error)
}

// DomainIDToNameFunc maps a domainID to domain name. Returns error when mapping is not possible.
Expand Down Expand Up @@ -229,3 +235,36 @@ func (cf *rpcClientFactory) NewFrontendClientWithTimeoutAndConfig(
}
return client, nil
}

func (cf *rpcClientFactory) NewShardDistributorClient() (sharddistributor.Client, error) {
return cf.NewShardDistributorClientWithTimeout(timeoutwrapper.ShardDistributorDefaultTimeout)
}

func (cf *rpcClientFactory) NewShardDistributorClientWithTimeout(
timeout time.Duration,
) (sharddistributor.Client, error) {
outboundConfig, ok := cf.rpcFactory.GetDispatcher().OutboundConfig(service.ShardDistributor)
// If no outbound config is found, it means the service is not enabled, we just return nil as we don't want to
// break existing configs.
if !ok {
return nil, nil
}

if !rpc.IsGRPCOutbound(outboundConfig) {
return nil, fmt.Errorf("shard distributor client does not support non-GRPC outbound")
}

client := grpc.NewShardDistributorClient(
sharddistributorv1.NewShardDistributorAPIYARPCClient(outboundConfig),
)

client = timeoutwrapper.NewShardDistributorClient(client, timeout)
if errorRate := cf.dynConfig.GetFloat64Property(dynamicconfig.ShardDistributorErrorInjectionRate)(); errorRate != 0 {
client = errorinjectors.NewShardDistributorClient(client, errorRate, cf.logger)
}
if cf.metricsClient != nil {
client = metered.NewShardDistributorClient(client, cf.metricsClient)
}

return client, nil
}
52 changes: 36 additions & 16 deletions common/resource/resource_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/client/sharddistributor"
"github.com/uber/cadence/client/wrappers/retryable"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
Expand Down Expand Up @@ -108,14 +109,16 @@ type Impl struct {

// internal services clients

sdkClient workflowserviceclient.Interface
frontendRawClient frontend.Client
frontendClient frontend.Client
matchingRawClient matching.Client
matchingClient matching.Client
historyRawClient history.Client
historyClient history.Client
clientBean client.Bean
sdkClient workflowserviceclient.Interface
frontendRawClient frontend.Client
frontendClient frontend.Client
matchingRawClient matching.Client
matchingClient matching.Client
historyRawClient history.Client
historyClient history.Client
shardDistributorRawClient sharddistributor.Client
shardDistributorClient sharddistributor.Client
clientBean client.Bean

// persistence clients
persistenceBean persistenceClient.Bean
Expand Down Expand Up @@ -253,6 +256,21 @@ func New(
serviceConfig.IsErrorRetryableFunction,
)

shardDistributorRawClient := clientBean.GetShardDistributorClient()

// If the raw client is nil, then the client bean is not configured to provide a shard distributor client, so we
// do not wrap and provide a retryable client
var shardDistributorClient sharddistributor.Client
if shardDistributorRawClient == nil {
shardDistributorClient = nil
} else {
shardDistributorClient = retryable.NewShardDistributorClient(
shardDistributorRawClient,
common.CreateShardDistributorServiceRetryPolicy(),
serviceConfig.IsErrorRetryableFunction,
)
}

var historyRawClient history.Client
if params.HistoryClientFn != nil {
logger.Debug("Using history client from HistoryClientFn")
Expand Down Expand Up @@ -336,14 +354,16 @@ func New(

// internal services clients

sdkClient: params.PublicClient,
frontendRawClient: frontendRawClient,
frontendClient: frontendClient,
matchingRawClient: matchingRawClient,
matchingClient: matchingClient,
historyRawClient: historyRawClient,
historyClient: historyClient,
clientBean: clientBean,
sdkClient: params.PublicClient,
frontendRawClient: frontendRawClient,
frontendClient: frontendClient,
matchingRawClient: matchingRawClient,
matchingClient: matchingClient,
historyRawClient: historyRawClient,
historyClient: historyClient,
shardDistributorRawClient: shardDistributorRawClient,
shardDistributorClient: shardDistributorClient,
clientBean: clientBean,

// persistence clients
persistenceBean: persistenceBean,
Expand Down
12 changes: 12 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ const (
frontendServiceOperationMaxInterval = 5 * time.Second
frontendServiceOperationExpirationInterval = 15 * time.Second

shardDistributorServiceOperationInitialInterval = 200 * time.Millisecond
shardDistributorServiceOperationMaxInterval = 10 * time.Second
shardDistributorServiceOperationExpirationInterval = 15 * time.Second

adminServiceOperationInitialInterval = 200 * time.Millisecond
adminServiceOperationMaxInterval = 5 * time.Second
adminServiceOperationExpirationInterval = 15 * time.Second
Expand Down Expand Up @@ -171,6 +175,14 @@ func CreateFrontendServiceRetryPolicy() backoff.RetryPolicy {
return policy
}

func CreateShardDistributorServiceRetryPolicy() backoff.RetryPolicy {
policy := backoff.NewExponentialRetryPolicy(shardDistributorServiceOperationInitialInterval)
policy.SetMaximumInterval(shardDistributorServiceOperationMaxInterval)
policy.SetExpirationInterval(shardDistributorServiceOperationExpirationInterval)

return policy
}

// CreateAdminServiceRetryPolicy creates a retry policy for calls to matching service
func CreateAdminServiceRetryPolicy() backoff.RetryPolicy {
policy := backoff.NewExponentialRetryPolicy(adminServiceOperationInitialInterval)
Expand Down
6 changes: 6 additions & 0 deletions common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,12 @@ func TestCreateXXXRetryPolicyWithSetExpirationInterval(t *testing.T) {
wantMaximumInterval: replicationServiceBusyMaxInterval,
wantSetExpirationInterval: replicationServiceBusyExpirationInterval,
},
"CreateShardDistributorServiceRetryPolicy": {
createFn: CreateShardDistributorServiceRetryPolicy,
wantInitialInterval: shardDistributorServiceOperationInitialInterval,
wantMaximumInterval: shardDistributorServiceOperationMaxInterval,
wantSetExpirationInterval: shardDistributorServiceOperationExpirationInterval,
},
} {
t.Run(name, func(t *testing.T) {
want := backoff.NewExponentialRetryPolicy(c.wantInitialInterval)
Expand Down

0 comments on commit be54846

Please sign in to comment.