Skip to content

Commit

Permalink
Add sharddistributor outbounds (#6616)
Browse files Browse the repository at this point in the history
What changed?
Added outbounds and config for shard distributor

Why?
These outbounds will be used to instanciate the shard distributor. We use single grpc outbound as:

We only support GRPC in shard distributor
We only support unary outbounds for frontend in the OSS currently, so we do the same for shard distributor.
How did you test it?
Tested locally and added some unit tests

Potential risks
Should be very low risk, just adds some instanciations.

Release notes

Documentation Changes
  • Loading branch information
jakobht authored Jan 15, 2025
1 parent d500a06 commit f791bd6
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 47 deletions.
10 changes: 10 additions & 0 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ type (
// To use Async APIs for a domain first specify the queue using Admin API.
// Either refer to one of the predefined queues in this config or alternatively specify the queue details inline in the API call.
AsyncWorkflowQueues map[string]AsyncWorkflowQueueProvider `yaml:"asyncWorkflowQueues"`
// ShardDistributorClient is the config for shard distributor client
// Shard distributor is used to distribute shards across multiple cadence service instances
// Note: This is not recommended for use, it's still experimental
ShardDistributorClient ShardDistributorClient `yaml:"shardDistributorClient"`
}

// Membership holds peer provider configuration.
Expand Down Expand Up @@ -592,6 +596,12 @@ type (
URI string `yaml:"URI"`
}

// ShardDistributorClient contains the config items for shard distributor
ShardDistributorClient struct {
// The host and port of the shard distributor server
HostPort string `yaml:"hostPort"`
}

// YamlNode is a lazy-unmarshaler, because *yaml.Node only exists in gopkg.in/yaml.v3, not v2,
// and go.uber.org/config currently uses only v2.
YamlNode struct {
Expand Down
5 changes: 5 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,3 +308,8 @@ func (v FailoverType) String() string {
return "Unknown"
}
}

const (
ShardModeHashRing = "hash-ring"
ShardModeShardDistributor = "shard-distributor"
)
20 changes: 20 additions & 0 deletions common/rpc/outbounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,23 @@ func IsGRPCOutbound(config transport.ClientConfig) bool {
}
return namer.TransportName() == grpc.TransportName
}
func NewSingleGRPCOutboundBuilder(outboundName string, serviceName string, address string) OutboundsBuilder {
return singleGRPCOutbound{outboundName, serviceName, address}
}

type singleGRPCOutbound struct {
outboundName string
serviceName string
address string
}

func (b singleGRPCOutbound) Build(grpc *grpc.Transport, _ *tchannel.Transport) (*Outbounds, error) {
return &Outbounds{
Outbounds: yarpc.Outbounds{
b.outboundName: {
ServiceName: b.serviceName,
Unary: grpc.NewSingleOutbound(b.address),
},
},
}, nil
}
12 changes: 12 additions & 0 deletions common/rpc/outbounds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,18 @@ func TestDirectOutbound(t *testing.T) {
assert.NotNil(t, outbounds["cadence-history"].Unary)
}

func TestSingleGRPCOutbound(t *testing.T) {
grpc := &grpc.Transport{}
tchannel := &tchannel.Transport{}

builder := NewSingleGRPCOutboundBuilder("grpc-only-out", "grpc-service-name", "http://example.com:1234")

outBound, err := builder.Build(grpc, tchannel)
assert.NoError(t, err)
assert.Equal(t, "grpc-service-name", outBound.Outbounds["grpc-only-out"].ServiceName)
assert.NotNil(t, outBound.Outbounds["grpc-only-out"].Unary)
}

func TestIsGRPCOutboud(t *testing.T) {
assert.True(t, IsGRPCOutbound(&transport.OutboundConfig{Outbounds: transport.Outbounds{Unary: (&grpc.Transport{}).NewSingleOutbound("localhost:1234")}}))
assert.False(t, IsGRPCOutbound(&transport.OutboundConfig{Outbounds: transport.Outbounds{Unary: (&tchannel.Transport{}).NewSingleOutbound("localhost:1234")}}))
Expand Down
57 changes: 33 additions & 24 deletions common/rpc/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,31 +133,40 @@ func NewParams(serviceName string, config *config.Config, dc *dynamicconfig.Coll
}
}

return Params{
ServiceName: serviceName,
HTTP: http,
TChannelAddress: net.JoinHostPort(listenIP.String(), strconv.Itoa(int(serviceConfig.RPC.Port))),
GRPCAddress: net.JoinHostPort(listenIP.String(), strconv.Itoa(int(serviceConfig.RPC.GRPCPort))),
GRPCMaxMsgSize: serviceConfig.RPC.GRPCMaxMsgSize,
OutboundsBuilder: CombineOutbounds(
NewDirectOutboundBuilder(
service.History,
enableGRPCOutbound,
outboundTLS[service.History],
NewDirectPeerChooserFactory(service.History, logger, metricsCl),
dc.GetBoolProperty(dynamicconfig.EnableConnectionRetainingDirectChooser),
),
NewDirectOutboundBuilder(
service.Matching,
enableGRPCOutbound,
outboundTLS[service.Matching],
NewDirectPeerChooserFactory(service.Matching, logger, metricsCl),
dc.GetBoolProperty(dynamicconfig.EnableConnectionRetainingDirectChooser),
),
publicClientOutbound,
outboundsBuilders := []OutboundsBuilder{
NewDirectOutboundBuilder(
service.History,
enableGRPCOutbound,
outboundTLS[service.History],
NewDirectPeerChooserFactory(service.History, logger, metricsCl),
dc.GetBoolProperty(dynamicconfig.EnableConnectionRetainingDirectChooser),
),
NewDirectOutboundBuilder(
service.Matching,
enableGRPCOutbound,
outboundTLS[service.Matching],
NewDirectPeerChooserFactory(service.Matching, logger, metricsCl),
dc.GetBoolProperty(dynamicconfig.EnableConnectionRetainingDirectChooser),
),
InboundTLS: inboundTLS,
OutboundTLS: outboundTLS,
publicClientOutbound,
}
if config.ShardDistributorClient.HostPort != "" {
outboundsBuilders = append(outboundsBuilders, NewSingleGRPCOutboundBuilder(
service.ShardDistributor,
service.ShardDistributor,
config.ShardDistributorClient.HostPort,
))
}

return Params{
ServiceName: serviceName,
HTTP: http,
TChannelAddress: net.JoinHostPort(listenIP.String(), strconv.Itoa(int(serviceConfig.RPC.Port))),
GRPCAddress: net.JoinHostPort(listenIP.String(), strconv.Itoa(int(serviceConfig.RPC.GRPCPort))),
GRPCMaxMsgSize: serviceConfig.RPC.GRPCMaxMsgSize,
OutboundsBuilder: CombineOutbounds(outboundsBuilders...),
InboundTLS: inboundTLS,
OutboundTLS: outboundTLS,
InboundMiddleware: yarpc.InboundMiddleware{
// order matters: ForwardPartitionConfigMiddleware must be applied after ClientPartitionConfigMiddleware
Unary: yarpc.UnaryInboundMiddleware(&PinotComparatorMiddleware{}, &InboundMetricsMiddleware{}, &ClientPartitionConfigMiddleware{}, &ForwardPartitionConfigMiddleware{}),
Expand Down
5 changes: 3 additions & 2 deletions common/rpc/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ func TestNewParams(t *testing.T) {
dc := dynamicconfig.NewNopCollection()
makeConfig := func(svc config.Service) *config.Config {
return &config.Config{
PublicClient: config.PublicClient{HostPort: "localhost:9999"},
Services: map[string]config.Service{"frontend": svc}}
PublicClient: config.PublicClient{HostPort: "localhost:9999"},
ShardDistributorClient: config.ShardDistributorClient{HostPort: "localhost:9998"},
Services: map[string]config.Service{"frontend": svc}}
}
logger := testlogger.New(t)
metricsCl := metrics.NewNoopMetricsClient()
Expand Down
3 changes: 3 additions & 0 deletions config/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,6 @@ dynamicconfig:
blobstore:
filestore:
outputDirectory: "/tmp/blobstore"

shardDistributorClient:
hostPort: "localhost:7943"
23 changes: 2 additions & 21 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ import (
"go.uber.org/cadence/compatibility"
"go.uber.org/yarpc"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/yarpc/transport/tchannel"

adminClient "github.com/uber/cadence/client/admin"
frontendClient "github.com/uber/cadence/client/frontend"
Expand Down Expand Up @@ -1138,8 +1136,8 @@ func (c *cadenceImpl) newRPCFactory(serviceName string, host membership.HostInfo

// For integration tests to generate client out of the same outbound.
OutboundsBuilder: rpc.CombineOutbounds(
&singleGRPCOutbound{testOutboundName(serviceName), serviceName, grpcAddress},
&singleGRPCOutbound{rpc.OutboundPublicClient, service.Frontend, frontendGrpcAddress},
rpc.NewSingleGRPCOutboundBuilder(testOutboundName(serviceName), serviceName, grpcAddress),
rpc.NewSingleGRPCOutboundBuilder(rpc.OutboundPublicClient, service.Frontend, frontendGrpcAddress),
rpc.NewCrossDCOutbounds(c.clusterMetadata.GetAllClusterInfo(), rpc.NewDNSPeerChooserFactory(0, c.logger)),
rpc.NewDirectOutboundBuilder(service.History, true, nil, directOutboundPCF, directConnRetainFn),
rpc.NewDirectOutboundBuilder(service.Matching, true, nil, directOutboundPCF, directConnRetainFn),
Expand All @@ -1152,23 +1150,6 @@ func testOutboundName(name string) string {
return "test-" + name
}

type singleGRPCOutbound struct {
outboundName string
serviceName string
address string
}

func (b singleGRPCOutbound) Build(grpc *grpc.Transport, _ *tchannel.Transport) (*rpc.Outbounds, error) {
return &rpc.Outbounds{
Outbounds: yarpc.Outbounds{
b.outboundName: {
ServiceName: b.serviceName,
Unary: grpc.NewSingleOutbound(b.address),
},
},
}, nil
}

type versionMiddleware struct {
}

Expand Down

0 comments on commit f791bd6

Please sign in to comment.