Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gracefulswitch: add ParseConfig and make UpdateClientConnState call SwitchTo if needed #7035

Merged
merged 6 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ var (
// an init() function), and is not thread-safe. If multiple Balancers are
// registered with the same name, the one registered last will take effect.
func Register(b Builder) {
if strings.ToLower(b.Name()) != b.Name() {
name := strings.ToLower(b.Name())
if name != b.Name() {
// TODO: Skip the use of strings.ToLower() to index the map after v1.59
// is released to switch to case sensitive balancer registry. Also,
// remove this warning and update the docstrings for Register and Get.
logger.Warningf("Balancer registered with name %q. grpc-go will be switching to case sensitive balancer registries soon", b.Name())
}
m[strings.ToLower(b.Name())] = b
m[name] = b
}

// unregisterForTesting deletes the balancer with the given name from the
Expand Down
2 changes: 1 addition & 1 deletion balancer/rls/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (s) TestParseConfigErrors(t *testing.T) {
"childPolicy": [{"grpclb": {"childPolicy": [{"pickfirst": {}}]}}],
"childPolicyConfigTargetFieldName": "serviceName"
}`),
wantErr: "invalid loadBalancingConfig: no supported policies found",
wantErr: "no supported policies found in config",
},
{
desc: "no child policy",
Expand Down
57 changes: 7 additions & 50 deletions balancer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package grpc
import (
"context"
"fmt"
"strings"
"sync"

"google.golang.org/grpc/balancer"
Expand Down Expand Up @@ -66,7 +65,8 @@ type ccBalancerWrapper struct {
}

// newCCBalancerWrapper creates a new balancer wrapper in idle state. The
// underlying balancer is not created until the switchTo() method is invoked.
// underlying balancer is not created until the updateClientConnState() method
// is invoked.
func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
ctx, cancel := context.WithCancel(cc.ctx)
ccb := &ccBalancerWrapper{
Expand Down Expand Up @@ -97,6 +97,11 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat
if ctx.Err() != nil || ccb.balancer == nil {
return
}
name := gracefulswitch.ChildName(ccs.BalancerConfig)
if ccb.curBalancerName != name {
ccb.curBalancerName = name
channelz.Infof(logger, ccb.cc.channelz, "Channel switches to new LB policy %q", name)
}
err := ccb.balancer.UpdateClientConnState(*ccs)
if logger.V(2) && err != nil {
logger.Infof("error from balancer.UpdateClientConnState: %v", err)
Expand All @@ -120,54 +125,6 @@ func (ccb *ccBalancerWrapper) resolverError(err error) {
})
}

// switchTo is invoked by grpc to instruct the balancer wrapper to switch to the
// LB policy identified by name.
//
// ClientConn calls newCCBalancerWrapper() at creation time. Upon receipt of the
// first good update from the name resolver, it determines the LB policy to use
// and invokes the switchTo() method. Upon receipt of every subsequent update
// from the name resolver, it invokes this method.
//
// the ccBalancerWrapper keeps track of the current LB policy name, and skips
// the graceful balancer switching process if the name does not change.
func (ccb *ccBalancerWrapper) switchTo(name string) {
ccb.serializer.Schedule(func(ctx context.Context) {
if ctx.Err() != nil || ccb.balancer == nil {
return
}
// TODO: Other languages use case-sensitive balancer registries. We should
// switch as well. See: https://github.com/grpc/grpc-go/issues/5288.
if strings.EqualFold(ccb.curBalancerName, name) {
return
}
ccb.buildLoadBalancingPolicy(name)
})
}

// buildLoadBalancingPolicy performs the following:
// - retrieve a balancer builder for the given name. Use the default LB
// policy, pick_first, if no LB policy with name is found in the registry.
// - instruct the gracefulswitch balancer to switch to the above builder. This
// will actually build the new balancer.
// - update the `curBalancerName` field
//
// Must be called from a serializer callback.
func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) {
builder := balancer.Get(name)
if builder == nil {
channelz.Warningf(logger, ccb.cc.channelz, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name)
builder = newPickfirstBuilder()
} else {
channelz.Infof(logger, ccb.cc.channelz, "Channel switches to new LB policy %q", name)
}

if err := ccb.balancer.SwitchTo(builder); err != nil {
channelz.Errorf(logger, ccb.cc.channelz, "Channel failed to build new LB policy %q: %v", name, err)
return
}
ccb.curBalancerName = builder.Name()
}

// close initiates async shutdown of the wrapper. cc.mu must be held when
// calling this function. To determine the wrapper has finished shutting down,
// the channel should block on ccb.serializer.Done() without cc.mu held.
Expand Down
14 changes: 2 additions & 12 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,7 @@ func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
var emptyServiceConfig *ServiceConfig

func init() {
balancer.Register(pickfirstBuilder{})
cfg := parseServiceConfig("{}")
if cfg.Err != nil {
panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
Expand Down Expand Up @@ -777,7 +778,7 @@ func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error)

var balCfg serviceconfig.LoadBalancingConfig
if cc.sc != nil && cc.sc.lbConfig != nil {
balCfg = cc.sc.lbConfig.cfg
balCfg = cc.sc.lbConfig
}
bw := cc.balancerWrapper
cc.mu.Unlock()
Expand Down Expand Up @@ -1074,17 +1075,6 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel
} else {
cc.retryThrottler.Store((*retryThrottler)(nil))
}

var newBalancerName string
if cc.sc == nil || (cc.sc.lbConfig == nil && cc.sc.LB == nil) {
// No service config or no LB policy specified in config.
newBalancerName = PickFirstBalancerName
} else if cc.sc.lbConfig != nil {
newBalancerName = cc.sc.lbConfig.name
} else { // cc.sc.LB != nil
newBalancerName = *cc.sc.LB
}
cc.balancerWrapper.switchTo(newBalancerName)
}

func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
Expand Down
83 changes: 83 additions & 0 deletions internal/balancer/gracefulswitch/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package gracefulswitch

import (
"encoding/json"
"fmt"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/serviceconfig"
)

type lbConfig struct {
serviceconfig.LoadBalancingConfig

childBuilder balancer.Builder
childConfig serviceconfig.LoadBalancingConfig
}

func ChildName(l serviceconfig.LoadBalancingConfig) string {
return l.(*lbConfig).childBuilder.Name()
}

// ParseConfig parses a child config list and returns a LB config for the
// gracefulswitch Balancer.
//
// cfg is expected to be a json.RawMessage containing a JSON array of LB policy
// names + configs as the format of the "loadBalancingConfig" field in
// ServiceConfig. It returns a type that should be passed to
// UpdateClientConnState in the BalancerConfig field.
func ParseConfig(cfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
var lbCfg []map[string]json.RawMessage
if err := json.Unmarshal(cfg, &lbCfg); err != nil {
return nil, err
}

Check warning on line 51 in internal/balancer/gracefulswitch/config.go

View check run for this annotation

Codecov / codecov/patch

internal/balancer/gracefulswitch/config.go#L50-L51

Added lines #L50 - L51 were not covered by tests
for i, e := range lbCfg {
if len(e) != 1 {
return nil, fmt.Errorf("expected a JSON struct with one entry; received entry %v at index %d", e, i)
}

Check warning on line 55 in internal/balancer/gracefulswitch/config.go

View check run for this annotation

Codecov / codecov/patch

internal/balancer/gracefulswitch/config.go#L54-L55

Added lines #L54 - L55 were not covered by tests

var name string
var jsonCfg json.RawMessage
for name, jsonCfg = range e {
}

builder := balancer.Get(name)
if builder == nil {
// Skip unregistered balancer names.
continue
}

parser, ok := builder.(balancer.ConfigParser)
if !ok {
// This is a valid child with no config.
return &lbConfig{childBuilder: builder}, nil
}

cfg, err := parser.ParseConfig(jsonCfg)
if err != nil {
return nil, fmt.Errorf("error parsing config for policy %q: %v", name, err)
}

return &lbConfig{childBuilder: builder, childConfig: cfg}, nil
}

return nil, fmt.Errorf("no supported policies found in config: %v", string(cfg))
}
45 changes: 40 additions & 5 deletions internal/balancer/gracefulswitch/gracefulswitch.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,23 @@
// process is not complete when this method returns. This method must be called
// synchronously alongside the rest of the balancer.Balancer methods this
// Graceful Switch Balancer implements.
//
// Deprecated: use ParseConfig and pass a parsed config to UpdateClientConnState
// to cause the Balancer to automatically change to the new child when necessary.
func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {
_, err := gsb.switchTo(builder)
return err
}

func (gsb *Balancer) switchTo(builder balancer.Builder) (*balancerWrapper, error) {
gsb.mu.Lock()
if gsb.closed {
gsb.mu.Unlock()
return errBalancerClosed
return nil, errBalancerClosed
}
bw := &balancerWrapper{
gsb: gsb,
builder: builder,
gsb: gsb,
lastState: balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
Expand Down Expand Up @@ -129,7 +138,7 @@
gsb.balancerCurrent = nil
}
gsb.mu.Unlock()
return balancer.ErrBadResolverState
return nil, balancer.ErrBadResolverState

Check warning on line 141 in internal/balancer/gracefulswitch/gracefulswitch.go

View check run for this annotation

Codecov / codecov/patch

internal/balancer/gracefulswitch/gracefulswitch.go#L141

Added line #L141 was not covered by tests
}

// This write doesn't need to take gsb.mu because this field never gets read
Expand All @@ -138,7 +147,7 @@
// bw.Balancer field will never be forwarded to until this SwitchTo()
// function returns.
bw.Balancer = newBalancer
return nil
return bw, nil
}

// Returns nil if the graceful switch balancer is closed.
Expand All @@ -152,12 +161,33 @@
}

// UpdateClientConnState forwards the update to the latest balancer created.
//
// If the state's BalancerConfig is the config returned by a call to
// gracefulswitch.ParseConfig, then this function will automatically SwitchTo
dfawley marked this conversation as resolved.
Show resolved Hide resolved
// the balancer indicated by the config before forwarding its config to it, if
// necessary.
func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
// The resolver data is only relevant to the most recent LB Policy.
balToUpdate := gsb.latestBalancer()

gsbCfg, ok := state.BalancerConfig.(*lbConfig)
if ok {
// Switch to the child in the config unless it is already active.
if balToUpdate == nil || gsbCfg.childBuilder.Name() != balToUpdate.builder.Name() {
var err error
balToUpdate, err = gsb.switchTo(gsbCfg.childBuilder)
if err != nil {
return fmt.Errorf("could not switch to new child balancer: %w", err)
}

Check warning on line 181 in internal/balancer/gracefulswitch/gracefulswitch.go

View check run for this annotation

Codecov / codecov/patch

internal/balancer/gracefulswitch/gracefulswitch.go#L180-L181

Added lines #L180 - L181 were not covered by tests
}
// Unwrap the child balancer's config.
state.BalancerConfig = gsbCfg.childConfig
}

if balToUpdate == nil {
return errBalancerClosed
}

// Perform this call without gsb.mu to prevent deadlocks if the child calls
// back into the channel. The latest balancer can never be closed during a
// call from the channel, even without gsb.mu held.
Expand All @@ -169,6 +199,10 @@
// The resolver data is only relevant to the most recent LB Policy.
balToUpdate := gsb.latestBalancer()
if balToUpdate == nil {
gsb.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: base.NewErrPicker(err),
})
return
}
// Perform this call without gsb.mu to prevent deadlocks if the child calls
Expand Down Expand Up @@ -261,7 +295,8 @@
// graceful switch logic.
type balancerWrapper struct {
balancer.Balancer
gsb *Balancer
gsb *Balancer
builder balancer.Builder

lastState balancer.State
subconns map[balancer.SubConn]bool // subconns created by this balancer
Expand Down
14 changes: 3 additions & 11 deletions pickfirst.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,15 @@ const (
logPrefix = "[pick-first-lb %p] "
)

func newPickfirstBuilder() balancer.Builder {
return &pickfirstBuilder{}
}

type pickfirstBuilder struct{}

func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
func (pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
b := &pickfirstBalancer{cc: cc}
b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
return b
}

func (*pickfirstBuilder) Name() string {
func (pickfirstBuilder) Name() string {
return PickFirstBalancerName
}

Expand All @@ -63,7 +59,7 @@ type pfConfig struct {
ShuffleAddressList bool `json:"shuffleAddressList"`
}

func (*pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
func (pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
var cfg pfConfig
if err := json.Unmarshal(js, &cfg); err != nil {
return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err)
Expand Down Expand Up @@ -243,7 +239,3 @@ func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
i.subConn.Connect()
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}

func init() {
balancer.Register(newPickfirstBuilder())
}
Loading
Loading