Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: Implementation of xds_resolver using LDS/RDS #3183

Merged
merged 10 commits into from
Nov 19, 2019
Merged
11 changes: 2 additions & 9 deletions xds/experimental/xds_experimental.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,6 @@
package experimental

import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
xdsbalancer "google.golang.org/grpc/xds/internal/balancer"
xdsresolver "google.golang.org/grpc/xds/internal/resolver"
_ "google.golang.org/grpc/xds/internal/balancer" // Register the xds_balancer
_ "google.golang.org/grpc/xds/internal/resolver" // Register the xds_resolver
)

func init() {
resolver.Register(xdsresolver.NewBuilder())
balancer.Register(xdsbalancer.NewBalancerBuilder())
}
11 changes: 5 additions & 6 deletions xds/internal/balancer/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,13 @@ var (
}
)

type xdsBalancerBuilder struct{}

// NewBalancerBuilder creates a new implementation of the balancer.Builder
// interface for the xDS balancer.
func NewBalancerBuilder() balancer.Builder {
return &xdsBalancerBuilder{}
func init() {
balancer.Register(&xdsBalancerBuilder{})
}

type xdsBalancerBuilder struct{}

// Build helps implement the balancer.Builder interface.
func (b *xdsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
ctx, cancel := context.WithCancel(context.Background())
x := &xdsBalancer{
Expand Down
10 changes: 9 additions & 1 deletion xds/internal/client/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ import (
"io/ioutil"
"os"

corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/golang/protobuf/jsonpb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/google"
"google.golang.org/grpc/grpclog"

corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
)

const (
Expand Down Expand Up @@ -147,6 +148,13 @@ func NewConfig() *Config {
}
}

// If we don't find a nodeProto in the bootstrap file, we just create a new
// one and set the version number here. That way, callers of this function
// can always expect that the NodeProto field is non-nil.
if config.NodeProto == nil {
config.NodeProto = &corepb.Node{BuildVersion: gRPCVersion}
}

grpclog.Infof("xds: bootstrap.NewConfig returning: %+v", config)
return config
}
41 changes: 25 additions & 16 deletions xds/internal/client/bootstrap/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (
"os"
"testing"

corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/golang/protobuf/proto"
structpb "github.com/golang/protobuf/ptypes/struct"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/google"

corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
structpb "github.com/golang/protobuf/ptypes/struct"
)

var (
Expand Down Expand Up @@ -197,7 +198,13 @@ func TestNewConfig(t *testing.T) {
{"nonExistentBootstrapFile", &Config{}},
{"empty", &Config{}},
{"badJSON", &Config{}},
{"emptyNodeProto", &Config{BalancerName: "trafficdirector.googleapis.com:443"}},
{
"emptyNodeProto",
&Config{
BalancerName: "trafficdirector.googleapis.com:443",
NodeProto: &corepb.Node{BuildVersion: gRPCVersion},
},
},
{"emptyXdsServer", &Config{NodeProto: nodeProto}},
{"unknownTopLevelFieldInFile", nilCredsConfig},
{"unknownFieldInNodeProto", &Config{NodeProto: nodeProto}},
Expand All @@ -209,19 +216,21 @@ func TestNewConfig(t *testing.T) {
}

for _, test := range tests {
if err := os.Setenv(fileEnv, test.name); err != nil {
t.Fatalf("os.Setenv(%s, %s) failed with error: %v", fileEnv, test.name, err)
}
config := NewConfig()
if config.BalancerName != test.wantConfig.BalancerName {
t.Errorf("%s: config.BalancerName is %s, want %s", test.name, config.BalancerName, test.wantConfig.BalancerName)
}
if !proto.Equal(config.NodeProto, test.wantConfig.NodeProto) {
t.Errorf("%s: config.NodeProto is %#v, want %#v", test.name, config.NodeProto, test.wantConfig.NodeProto)
}
if (config.Creds != nil) != (test.wantConfig.Creds != nil) {
t.Errorf("%s: config.Creds is %#v, want %#v", test.name, config.Creds, test.wantConfig.Creds)
}
t.Run(test.name, func(t *testing.T) {
if err := os.Setenv(fileEnv, test.name); err != nil {
t.Fatalf("os.Setenv(%s, %s) failed with error: %v", fileEnv, test.name, err)
}
config := NewConfig()
if config.BalancerName != test.wantConfig.BalancerName {
t.Errorf("config.BalancerName is %s, want %s", config.BalancerName, test.wantConfig.BalancerName)
}
if !proto.Equal(config.NodeProto, test.wantConfig.NodeProto) {
t.Errorf("config.NodeProto is %#v, want %#v", config.NodeProto, test.wantConfig.NodeProto)
}
if (config.Creds != nil) != (test.wantConfig.Creds != nil) {
t.Errorf("config.Creds is %#v, want %#v", config.Creds, test.wantConfig.Creds)
}
})
}
}

Expand Down
166 changes: 166 additions & 0 deletions xds/internal/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package client implementation a full fledged gRPC client for the xDS API
// used by the xds resolver and balancer implementations.
package client

import (
"context"
"errors"
"fmt"
"sync"

"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/xds/internal/client/bootstrap"
)

// For overriding in unittests.
var dialFunc = grpc.DialContext

// Options provides all parameters required for the creation of an xDS client.
type Options struct {
// Config contains a fully populated bootstrap config. It is the
// responsibility of the caller to use some sane defaults here if the
// bootstrap process returned with certain fields left unspecified.
Config bootstrap.Config
// Backoff is the backoff strategy to be used when reconnecting to the xDS
// server. If left unspecified, a default exponential backoff strategy
// would be used.
// TODO: If we have no reason to use an exponential backoff with
// non-default values, remove this field, and directly use the appropriate
// one in v2Client.
Backoff backoff.Strategy
// DialOpts contains dial options to be used when dialing the xDS server.
DialOpts []grpc.DialOption
}

// Client is a full fledged gRPC client which queries a set of discovery APIs
// (collectively termed as xDS) on a remote management server, to discover
// various dynamic resources. A single client object will be shared by the xds
// resolver and balancer implementations.
type Client struct {
cc *grpc.ClientConn // Connection to the xDS server
v2c *v2Client // Actual xDS client implementation using the v2 API

mu sync.Mutex
serviceCallback func(ServiceUpdate, error)
ldsCancel func()
rdsCancel func()
}

// NewClient returns a new xdsClient configured with opts.
func NewClient(opts Options) (*Client, error) {
switch {
case opts.Config.BalancerName == "":
return nil, errors.New("xds: no xds_server name provided in options")
case opts.Config.Creds == nil:
return nil, errors.New("xds: no credentials provided in options")
case opts.Config.NodeProto == nil:
return nil, errors.New("xds: no node_proto provided in options")
}

dopts := append([]grpc.DialOption{opts.Config.Creds}, opts.DialOpts...)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cc, err := dialFunc(ctx, opts.Config.BalancerName, dopts...)
if err != nil {
// An error from a non-blocking dial indicates something serious.
return nil, fmt.Errorf("xds: failed to dial balancer {%s}: %v", opts.Config.BalancerName, err)
}

bs := backoff.DefaultExponential.Backoff
if opts.Backoff != nil {
bs = opts.Backoff.Backoff
}
c := &Client{
cc: cc,
v2c: newV2Client(cc, opts.Config.NodeProto, bs),
}
return c, nil
}

// Close closes the gRPC connection to the xDS server.
func (c *Client) Close() {
c.v2c.close()
c.cc.Close()
}

// ServiceUpdate contains update about the service.
type ServiceUpdate struct {
Cluster string
}

// handleLDSUpdate is the LDS watcher callback we registered with the v2Client.
func (c *Client) handleLDSUpdate(u ldsUpdate, err error) {
if err != nil {
c.mu.Lock()
if c.serviceCallback != nil {
c.serviceCallback(ServiceUpdate{}, err)
}
c.mu.Unlock()
return
}

grpclog.Infof("xds: client received LDS update: %+v, err: %v", u, err)
c.mu.Lock()
c.rdsCancel = c.v2c.watchRDS(u.routeName, c.handleRDSUpdate)
c.mu.Unlock()
}

// handleRDSUpdate is the RDS watcher callback we registered with the v2Client.
func (c *Client) handleRDSUpdate(u rdsUpdate, err error) {
if err != nil {
c.mu.Lock()
if c.serviceCallback != nil {
c.serviceCallback(ServiceUpdate{}, err)
}
c.mu.Unlock()
return
}

grpclog.Infof("xds: client received RDS update: %+v, err: %v", u, err)
c.mu.Lock()
if c.serviceCallback != nil {
c.serviceCallback(ServiceUpdate{Cluster: u.clusterName}, nil)
}
c.mu.Unlock()
}

// WatchForServiceUpdate uses LDS and RDS protocols to discover resource
// information for the provided target.
func (c *Client) WatchForServiceUpdate(target string, callback func(ServiceUpdate, error)) (cancel func()) {
c.mu.Lock()
c.serviceCallback = callback
c.ldsCancel = c.v2c.watchLDS(target, c.handleLDSUpdate)
c.mu.Unlock()

return func() {
c.mu.Lock()
c.serviceCallback = nil
if c.ldsCancel != nil {
c.ldsCancel()
}
if c.rdsCancel != nil {
c.rdsCancel()
}
c.mu.Unlock()
}
}
Loading