Skip to content

Commit

Permalink
Introduce weighted load balancer (cadence-workflow#6315)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Oct 5, 2024
1 parent b92f122 commit 5b92d52
Show file tree
Hide file tree
Showing 14 changed files with 911 additions and 68 deletions.
8 changes: 7 additions & 1 deletion client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,16 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout(

defaultLoadBalancer := matching.NewLoadBalancer(domainIDToName, cf.dynConfig)
roundRobinLoadBalancer := matching.NewRoundRobinLoadBalancer(domainIDToName, cf.dynConfig)
weightedLoadBalancer := matching.NewWeightedLoadBalancer(roundRobinLoadBalancer, domainIDToName, cf.dynConfig, cf.logger)
loadBalancers := map[string]matching.LoadBalancer{
"random": defaultLoadBalancer,
"round-robin": roundRobinLoadBalancer,
"weighted": weightedLoadBalancer,
}
client := matching.NewClient(
rawClient,
peerResolver,
matching.NewMultiLoadBalancer(defaultLoadBalancer, roundRobinLoadBalancer, domainIDToName, cf.dynConfig),
matching.NewMultiLoadBalancer(defaultLoadBalancer, loadBalancers, domainIDToName, cf.dynConfig, cf.logger),
)
client = timeoutwrapper.NewMatchingClient(client, longPollTimeout, timeout)
if errorRate := cf.dynConfig.GetFloat64Property(dynamicconfig.MatchingErrorInjectionRate)(); errorRate != 0 {
Expand Down
19 changes: 17 additions & 2 deletions client/matching/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (c *clientImpl) AddActivityTask(
request.GetForwardedFrom(),
)
request.TaskList.Name = partition
peer, err := c.peerResolver.FromTaskList(partition)
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())
if err != nil {
return err
}
Expand Down Expand Up @@ -105,6 +105,7 @@ func (c *clientImpl) PollForActivityTask(
if err != nil {
return nil, err
}
// TODO: update activity response to include backlog count hint and update the weight for partitions
return c.client.PollForActivityTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
}

Expand All @@ -119,12 +120,26 @@ func (c *clientImpl) PollForDecisionTask(
persistence.TaskListTypeDecision,
request.GetForwardedFrom(),
)
originalTaskListName := request.PollRequest.GetTaskList().GetName()
request.PollRequest.TaskList.Name = partition
peer, err := c.peerResolver.FromTaskList(request.PollRequest.TaskList.GetName())
if err != nil {
return nil, err
}
return c.client.PollForDecisionTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
resp, err := c.client.PollForDecisionTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
if err != nil {
return nil, err
}
request.PollRequest.TaskList.Name = originalTaskListName
c.loadBalancer.UpdateWeight(
request.GetDomainUUID(),
*request.PollRequest.GetTaskList(),
persistence.TaskListTypeDecision,
request.GetForwardedFrom(),
partition,
resp.BacklogCountHint,
)
return resp, nil
}

func (c *clientImpl) QueryWorkflow(
Expand Down
1 change: 1 addition & 0 deletions client/matching/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ func TestClient_withResponse(t *testing.T) {
balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.MatchingPollForDecisionTaskResponse{}, nil)
balancer.EXPECT().UpdateWeight(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "", _testPartition, int64(0))
},
want: &types.MatchingPollForDecisionTaskResponse{},
},
Expand Down
35 changes: 31 additions & 4 deletions client/matching/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination loadbalancer_mock.go -package matching github.com/uber/cadence/client/matching LoadBalancer

package matching

import (
Expand Down Expand Up @@ -57,6 +59,18 @@ type (
taskListType int,
forwardedFrom string,
) string

// UpdateWeight updates the weight of a task list partition.
// Input is name of the original task list as specified by caller. When
// the original task list is a partition, no update should be done.
UpdateWeight(
domainID string,
taskList types.TaskList,
taskListType int,
forwardedFrom string,
partition string,
weight int64,
)
}

defaultLoadBalancer struct {
Expand Down Expand Up @@ -134,9 +148,22 @@ func (lb *defaultLoadBalancer) pickPartition(
}

p := rand.Intn(nPartitions)
if p == 0 {
return taskList.GetName()
}
return getPartitionTaskListName(taskList.GetName(), p)
}

func (lb *defaultLoadBalancer) UpdateWeight(
domainID string,
taskList types.TaskList,
taskListType int,
forwardedFrom string,
partition string,
weight int64,
) {
}

return fmt.Sprintf("%v%v/%v", common.ReservedTaskListPrefix, taskList.GetName(), p)
func getPartitionTaskListName(root string, partition int) string {
if partition <= 0 {
return root
}
return fmt.Sprintf("%v%v/%v", common.ReservedTaskListPrefix, root, partition)
}
30 changes: 21 additions & 9 deletions client/matching/loadbalancer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 46 additions & 14 deletions client/matching/multi_loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,34 @@ package matching

import (
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/types"
)

type (
multiLoadBalancer struct {
random LoadBalancer
roundRobin LoadBalancer
defaultLoadBalancer LoadBalancer
loadBalancers map[string]LoadBalancer
domainIDToName func(string) (string, error)
loadbalancerStrategy dynamicconfig.StringPropertyFnWithTaskListInfoFilters
logger log.Logger
}
)

func NewMultiLoadBalancer(
random LoadBalancer,
roundRobin LoadBalancer,
defaultLoadBalancer LoadBalancer,
loadBalancers map[string]LoadBalancer,
domainIDToName func(string) (string, error),
dc *dynamicconfig.Collection,
logger log.Logger,
) LoadBalancer {
return &multiLoadBalancer{
random: random,
roundRobin: roundRobin,
defaultLoadBalancer: defaultLoadBalancer,
loadBalancers: loadBalancers,
domainIDToName: domainIDToName,
loadbalancerStrategy: dc.GetStringPropertyFilteredByTaskListInfo(dynamicconfig.TasklistLoadBalancerStrategy),
logger: logger,
}
}

Expand All @@ -58,12 +63,15 @@ func (lb *multiLoadBalancer) PickWritePartition(
) string {
domainName, err := lb.domainIDToName(domainID)
if err != nil {
return lb.random.PickWritePartition(domainID, taskList, taskListType, forwardedFrom)
return lb.defaultLoadBalancer.PickWritePartition(domainID, taskList, taskListType, forwardedFrom)
}
if lb.loadbalancerStrategy(domainName, taskList.GetName(), taskListType) == "round-robin" {
return lb.roundRobin.PickWritePartition(domainID, taskList, taskListType, forwardedFrom)
strategy := lb.loadbalancerStrategy(domainName, taskList.GetName(), taskListType)
loadBalancer, ok := lb.loadBalancers[strategy]
if !ok {
lb.logger.Warn("unsupported load balancer strategy", tag.Value(strategy))
return lb.defaultLoadBalancer.PickWritePartition(domainID, taskList, taskListType, forwardedFrom)
}
return lb.random.PickWritePartition(domainID, taskList, taskListType, forwardedFrom)
return loadBalancer.PickWritePartition(domainID, taskList, taskListType, forwardedFrom)
}

func (lb *multiLoadBalancer) PickReadPartition(
Expand All @@ -74,10 +82,34 @@ func (lb *multiLoadBalancer) PickReadPartition(
) string {
domainName, err := lb.domainIDToName(domainID)
if err != nil {
return lb.random.PickReadPartition(domainID, taskList, taskListType, forwardedFrom)
return lb.defaultLoadBalancer.PickReadPartition(domainID, taskList, taskListType, forwardedFrom)
}
if lb.loadbalancerStrategy(domainName, taskList.GetName(), taskListType) == "round-robin" {
return lb.roundRobin.PickReadPartition(domainID, taskList, taskListType, forwardedFrom)
strategy := lb.loadbalancerStrategy(domainName, taskList.GetName(), taskListType)
loadBalancer, ok := lb.loadBalancers[strategy]
if !ok {
lb.logger.Warn("unsupported load balancer strategy", tag.Value(strategy))
return lb.defaultLoadBalancer.PickReadPartition(domainID, taskList, taskListType, forwardedFrom)
}
return lb.random.PickReadPartition(domainID, taskList, taskListType, forwardedFrom)
return loadBalancer.PickReadPartition(domainID, taskList, taskListType, forwardedFrom)
}

func (lb *multiLoadBalancer) UpdateWeight(
domainID string,
taskList types.TaskList,
taskListType int,
forwardedFrom string,
partition string,
weight int64,
) {
domainName, err := lb.domainIDToName(domainID)
if err != nil {
return
}
strategy := lb.loadbalancerStrategy(domainName, taskList.GetName(), taskListType)
loadBalancer, ok := lb.loadBalancers[strategy]
if !ok {
lb.logger.Warn("unsupported load balancer strategy", tag.Value(strategy))
return
}
loadBalancer.UpdateWeight(domainID, taskList, taskListType, forwardedFrom, partition, weight)
}
Loading

0 comments on commit 5b92d52

Please sign in to comment.