From 7e5898e7c569b8cc32d6eee9bc8ad1393a8fcc61 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 3 Jun 2024 15:32:58 -0700 Subject: [PATCH] xds: unify xDS client creation APIs meant for testing (#7268) --- internal/internal.go | 13 +- internal/testutils/xds/bootstrap/bootstrap.go | 16 +- internal/xds/bootstrap/bootstrap.go | 3 + test/xds/xds_server_test.go | 199 ---------- xds/googledirectpath/googlec2p_test.go | 8 +- .../cdsbalancer/cdsbalancer_security_test.go | 2 +- .../balancer/cdsbalancer/cdsbalancer_test.go | 4 +- .../e2e_test/aggregate_cluster_test.go | 14 +- .../clusterresolver/e2e_test/balancer_test.go | 2 +- .../clusterresolver/e2e_test/eds_impl_test.go | 25 +- xds/internal/resolver/xds_resolver.go | 2 +- xds/internal/resolver/xds_resolver_test.go | 12 +- xds/internal/server/listener_wrapper_test.go | 8 +- xds/internal/server/rds_handler_test.go | 2 +- xds/internal/xdsclient/authority.go | 9 +- xds/internal/xdsclient/client_new.go | 136 +++---- xds/internal/xdsclient/clientimpl_watchers.go | 1 - xds/internal/xdsclient/loadreport_test.go | 21 +- .../xdsclient/tests/authority_test.go | 25 +- .../xdsclient/tests/cds_watchers_test.go | 40 +- xds/internal/xdsclient/tests/dump_test.go | 2 +- .../xdsclient/tests/eds_watchers_test.go | 37 +- .../tests/federation_watchers_test.go | 2 +- .../xdsclient/tests/lds_watchers_test.go | 42 +-- .../xdsclient/tests/misc_watchers_test.go | 4 +- .../xdsclient/tests/rds_watchers_test.go | 40 +- .../xdsclient/tests/resource_update_test.go | 97 +++-- .../xdsclient/xdsresource/resource_type.go | 22 -- xds/server.go | 2 +- xds/server_ext_test.go | 348 ++++++++++++++++++ 30 files changed, 646 insertions(+), 492 deletions(-) create mode 100644 xds/server_ext_test.go diff --git a/internal/internal.go b/internal/internal.go index 46843a3e6d3c..5d6653986923 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -193,16 +193,9 @@ var ( ChannelzTurnOffForTesting func() - // TriggerXDSResourceNameNotFoundForTesting triggers the resource-not-found - // error for a given resource type and name. This is usually triggered when - // the associated watch timer fires. For testing purposes, having this - // function makes events more predictable than relying on timer events. - TriggerXDSResourceNameNotFoundForTesting any // func(func(xdsresource.Type, string), string, string) error - - // TriggerXDSResourceNameNotFoundClient invokes the testing xDS Client - // singleton to invoke resource not found for a resource type name and - // resource name. - TriggerXDSResourceNameNotFoundClient any // func(string, string) error + // TriggerXDSResourceNotFoundForTesting causes the provided xDS Client to + // invoke resource-not-found error for the given resource type and name. + TriggerXDSResourceNotFoundForTesting any // func(xdsclient.XDSClient, xdsresource.Type, string) error // FromOutgoingContextRaw returns the un-merged, intermediary contents of // metadata.rawMD. diff --git a/internal/testutils/xds/bootstrap/bootstrap.go b/internal/testutils/xds/bootstrap/bootstrap.go index f91ec6ae7eb2..cac946a8ee1a 100644 --- a/internal/testutils/xds/bootstrap/bootstrap.go +++ b/internal/testutils/xds/bootstrap/bootstrap.go @@ -120,11 +120,17 @@ func Contents(opts Options) ([]byte, error) { // resources with empty authority. auths := map[string]authority{"": {}} for n, auURI := range opts.Authorities { - auths[n] = authority{XdsServers: []server{{ - ServerURI: auURI, - ChannelCreds: []creds{{Type: "insecure"}}, - ServerFeatures: cfg.XdsServers[0].ServerFeatures, - }}} + // If the authority server URI is empty, set it to an empty authority + // config, resulting in it using the top-level xds server config. + a := authority{} + if auURI != "" { + a = authority{XdsServers: []server{{ + ServerURI: auURI, + ChannelCreds: []creds{{Type: "insecure"}}, + ServerFeatures: cfg.XdsServers[0].ServerFeatures, + }}} + } + auths[n] = a } cfg.Authorities = auths diff --git a/internal/xds/bootstrap/bootstrap.go b/internal/xds/bootstrap/bootstrap.go index f89f03dd9ac9..b8b92a6cb550 100644 --- a/internal/xds/bootstrap/bootstrap.go +++ b/internal/xds/bootstrap/bootstrap.go @@ -292,6 +292,9 @@ func (a *Authority) UnmarshalJSON(data []byte) error { // Config provides the xDS client with several key bits of information that it // requires in its interaction with the management server. The Config is // initialized from the bootstrap file. +// +// Users must use one of the NewConfigXxx() functions to create a Config +// instance, and not initialize it manually. type Config struct { // XDSServer is the management server to connect to. // diff --git a/test/xds/xds_server_test.go b/test/xds/xds_server_test.go index 3ae8628fc20a..f4dcac71d3c3 100644 --- a/test/xds/xds_server_test.go +++ b/test/xds/xds_server_test.go @@ -30,7 +30,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" @@ -224,204 +223,6 @@ func (s) TestRDSNack(t *testing.T) { waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration")) } -// TestResourceNotFoundRDS tests the case where an LDS points to an RDS which -// returns resource not found. Before getting the resource not found, the xDS -// Server has not received all configuration needed, so it should Accept and -// Close any new connections. After it has received the resource not found -// error, the server should move to serving, successfully Accept Connections, -// and fail at the L7 level with resource not found specified. -func (s) TestResourceNotFoundRDS(t *testing.T) { - managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{}) - defer cleanup() - lis, err := testutils.LocalTCPListener() - if err != nil { - t.Fatalf("testutils.LocalTCPListener() failed: %v", err) - } - // Setup the management server to respond with a listener resource that - // specifies a route name to watch, and no RDS resource corresponding to - // this route name. - host, port, err := hostPortFromListener(lis) - if err != nil { - t.Fatalf("failed to retrieve host and port of server: %v", err) - } - - listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName") - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{listener}, - SkipValidation: true, - } - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - if err := managementServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } - serving := grpcsync.NewEvent() - modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) { - t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err) - if args.Mode == connectivity.ServingModeServing { - serving.Fire() - } - }) - - server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents)) - if err != nil { - t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err) - } - defer server.Stop() - testgrpc.RegisterTestServiceServer(server, &testService{}) - go func() { - if err := server.Serve(lis); err != nil { - t.Errorf("Serve() failed: %v", err) - } - }() - - cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - t.Fatalf("failed to dial local test server: %v", err) - } - defer cc.Close() - - waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose) - - // Invoke resource not found - this should result in L7 RPC error with - // unavailable receive on serving as a result, should trigger it to go - // serving. Poll as watch might not be started yet to trigger resource not - // found. -loop: - for { - if err := internal.TriggerXDSResourceNameNotFoundClient.(func(string, string) error)("RouteConfigResource", "routeName"); err != nil { - t.Fatalf("Failed to trigger resource name not found for testing: %v", err) - } - select { - case <-serving.Done(): - break loop - case <-ctx.Done(): - t.Fatalf("timed out waiting for serving mode to go serving") - case <-time.After(time.Millisecond): - } - } - waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration")) -} - -// TestServingModeChanges tests the Server's logic as it transitions from Not -// Ready to Ready, then to Not Ready. Before it goes Ready, connections should -// be accepted and closed. After it goes ready, RPC's should proceed as normal -// according to matched route configuration. After it transitions back into not -// ready (through an explicit LDS Resource Not Found), previously running RPC's -// should be gracefully closed and still work, and new RPC's should fail. -func (s) TestServingModeChanges(t *testing.T) { - managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{}) - defer cleanup() - lis, err := testutils.LocalTCPListener() - if err != nil { - t.Fatalf("testutils.LocalTCPListener() failed: %v", err) - } - // Setup the management server to respond with a listener resource that - // specifies a route name to watch. Due to not having received the full - // configuration, this should cause the server to be in mode Serving. - host, port, err := hostPortFromListener(lis) - if err != nil { - t.Fatalf("failed to retrieve host and port of server: %v", err) - } - - listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName") - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{listener}, - SkipValidation: true, - } - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - if err := managementServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } - - serving := grpcsync.NewEvent() - modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) { - t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err) - if args.Mode == connectivity.ServingModeServing { - serving.Fire() - } - }) - - server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents)) - if err != nil { - t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err) - } - defer server.Stop() - testgrpc.RegisterTestServiceServer(server, &testService{}) - go func() { - if err := server.Serve(lis); err != nil { - t.Errorf("Serve() failed: %v", err) - } - }() - cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - t.Fatalf("failed to dial local test server: %v", err) - } - defer cc.Close() - - waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose) - routeConfig := e2e.RouteConfigNonForwardingAction("routeName") - resources = e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{listener}, - Routes: []*v3routepb.RouteConfiguration{routeConfig}, - } - defer cancel() - if err := managementServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } - - select { - case <-ctx.Done(): - t.Fatal("timeout waiting for the xDS Server to go Serving") - case <-serving.Done(): - } - - // A unary RPC should work once it transitions into serving. (need this same - // assertion from LDS resource not found triggering it). - waitForSuccessfulRPC(ctx, t, cc) - - // Start a stream before switching the server to not serving. Due to the - // stream being created before the graceful stop of the underlying - // connection, it should be able to continue even after the server switches - // to not serving. - c := testgrpc.NewTestServiceClient(cc) - stream, err := c.FullDuplexCall(ctx) - if err != nil { - t.Fatalf("cc.FullDuplexCall failed: %f", err) - } - - // Invoke the lds resource not found - this should cause the server to - // switch to not serving. This should gracefully drain connections, and fail - // RPC's after. (how to assert accepted + closed) does this make it's way to - // application layer? (should work outside of resource not found... - - // Invoke LDS Resource not found here (tests graceful close) - if err := internal.TriggerXDSResourceNameNotFoundClient.(func(string, string) error)("ListenerResource", listener.GetName()); err != nil { - t.Fatalf("Failed to trigger resource name not found for testing: %v", err) - } - - // New RPCs on that connection should eventually start failing. Due to - // Graceful Stop any started streams continue to work. - if err = stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil { - t.Fatalf("stream.Send() failed: %v, should continue to work due to graceful stop", err) - } - if err = stream.CloseSend(); err != nil { - t.Fatalf("stream.CloseSend() failed: %v, should continue to work due to graceful stop", err) - } - if _, err = stream.Recv(); err != io.EOF { - t.Fatalf("unexpected error: %v, expected an EOF error", err) - } - - // New RPCs on that connection should eventually start failing. - waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose) -} - // TestMultipleUpdatesImmediatelySwitch tests the case where you get an LDS // specifying RDS A, B, and C (with A being matched to). The Server should be in // not serving until it receives all 3 RDS Configurations, and then transition diff --git a/xds/googledirectpath/googlec2p_test.go b/xds/googledirectpath/googlec2p_test.go index 879b585661d3..5483cf55d9e2 100644 --- a/xds/googledirectpath/googlec2p_test.go +++ b/xds/googledirectpath/googlec2p_test.go @@ -26,7 +26,6 @@ import ( "time" "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/envconfig" @@ -232,14 +231,9 @@ func TestBuildXDS(t *testing.T) { if tt.tdURI != "" { wantConfig.XDSServer.ServerURI = tt.tdURI } - cmpOpts := cmp.Options{ - cmpopts.IgnoreFields(bootstrap.ServerConfig{}, "Creds"), - cmp.AllowUnexported(bootstrap.ServerConfig{}), - protocmp.Transform(), - } select { case gotConfig := <-configCh: - if diff := cmp.Diff(wantConfig, gotConfig, cmpOpts); diff != "" { + if diff := cmp.Diff(wantConfig, gotConfig, protocmp.Transform()); diff != "" { t.Fatalf("Unexpected diff in bootstrap config (-want +got):\n%s", diff) } case <-time.After(time.Second): diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go index f0de02127e3b..05931951d10d 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go @@ -137,7 +137,7 @@ func registerWrappedCDSPolicyWithNewSubConnOverride(t *testing.T, ch chan *xdscr func setupForSecurityTests(t *testing.T, bootstrapContents []byte, clientCreds, serverCreds credentials.TransportCredentials) (*grpc.ClientConn, string) { t.Helper() - xdsClient, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + xdsClient, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index bdfb072531b7..d9294092d0aa 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -228,7 +228,7 @@ func setupWithManagementServer(t *testing.T) (*e2e.ManagementServer, string, *gr }) t.Cleanup(cleanup) - xdsC, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + xdsC, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -344,7 +344,7 @@ func (s) TestConfigurationUpdate_EmptyCluster(t *testing.T) { // Setup a management server and an xDS client to talk to it. _, _, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{}) t.Cleanup(cleanup) - xdsClient, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + xdsClient, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } diff --git a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go index 3b2dead52165..c7d9a096039d 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go @@ -35,19 +35,16 @@ import ( "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils/pickfirst" "google.golang.org/grpc/internal/testutils/xds/e2e" - "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/status" - xdstestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" "google.golang.org/protobuf/types/known/wrapperspb" v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" - v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" testgrpc "google.golang.org/grpc/interop/grpc_testing" @@ -1107,12 +1104,13 @@ func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) { // Create an xDS client talking to the above management server, configured // with a short watch expiry timeout. - xdsClient, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{ - XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address), - NodeProto: &v3corepb.Node{Id: nodeID}, - }, defaultTestWatchExpiryTimeout, time.Duration(0)) + bc, err := e2e.DefaultBootstrapContents(nodeID, mgmtServer.Address) if err != nil { - t.Fatalf("failed to create xds client: %v", err) + t.Fatalf("Failed to create bootstrap configuration: %v", err) + } + xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + if err != nil { + t.Fatalf("Failed to create an xDS client: %v", err) } defer close() diff --git a/xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go b/xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go index 34ce0fe8c4d0..e04dbe1c36ca 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go @@ -72,7 +72,7 @@ func setupAndDial(t *testing.T, bootstrapContents []byte) (*grpc.ClientConn, fun t.Helper() // Create an xDS client for use by the cluster_resolver LB policy. - xdsC, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + xdsC, xdsClose, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } diff --git a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go index 4b7c67ea014b..207993c29834 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go @@ -35,13 +35,11 @@ import ( "google.golang.org/grpc/internal/testutils" rrutil "google.golang.org/grpc/internal/testutils/roundrobin" "google.golang.org/grpc/internal/testutils/xds/e2e" - "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/status" - xdstestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" "google.golang.org/protobuf/types/known/wrapperspb" @@ -149,7 +147,7 @@ func (s) TestEDS_OneLocality(t *testing.T) { } // Create an xDS client for use by the cluster_resolver LB policy. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -278,7 +276,7 @@ func (s) TestEDS_MultipleLocalities(t *testing.T) { } // Create an xDS client for use by the cluster_resolver LB policy. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -436,7 +434,7 @@ func (s) TestEDS_EndpointsHealth(t *testing.T) { } // Create an xDS client for use by the cluster_resolver LB policy. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -503,7 +501,7 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) { } // Create an xDS client for use by the cluster_resolver LB policy. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -887,7 +885,7 @@ func (s) TestEDS_BadUpdateWithoutPreviousGoodUpdate(t *testing.T) { } // Create an xDS client for use by the cluster_resolver LB policy. - xdsClient, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -953,7 +951,7 @@ func (s) TestEDS_BadUpdateWithPreviousGoodUpdate(t *testing.T) { } // Create an xDS client for use by the cluster_resolver LB policy. - xdsClient, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -1025,12 +1023,13 @@ func (s) TestEDS_ResourceNotFound(t *testing.T) { // Create an xDS client talking to the above management server, configured // with a short watch expiry timeout. nodeID := uuid.New().String() - xdsClient, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{ - XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address), - NodeProto: &v3corepb.Node{Id: nodeID}, - }, defaultTestWatchExpiryTimeout, time.Duration(0)) + bc, err := e2e.DefaultBootstrapContents(nodeID, mgmtServer.Address) if err != nil { - t.Fatalf("failed to create xds client: %v", err) + t.Fatalf("Failed to create bootstrap configuration: %v", err) + } + xdsClient, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + if err != nil { + t.Fatalf("Failed to create an xDS client: %v", err) } defer close() diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go index 19061261f479..40dd97267811 100644 --- a/xds/internal/resolver/xds_resolver.go +++ b/xds/internal/resolver/xds_resolver.go @@ -50,7 +50,7 @@ const Scheme = "xds" func newBuilderForTesting(config []byte) (resolver.Builder, error) { return &xdsResolverBuilder{ newXDSClient: func() (xdsclient.XDSClient, func(), error) { - return xdsclient.NewWithBootstrapContentsForTesting(config) + return xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: config}) }, }, nil } diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index a9780493f394..fb3560d65453 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -36,7 +36,6 @@ import ( "google.golang.org/grpc/internal/testutils" xdsbootstrap "google.golang.org/grpc/internal/testutils/xds/bootstrap" "google.golang.org/grpc/internal/testutils/xds/e2e" - "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" @@ -46,7 +45,6 @@ import ( "google.golang.org/grpc/xds/internal/httpfilter" xdsresolver "google.golang.org/grpc/xds/internal/resolver" rinternal "google.golang.org/grpc/xds/internal/resolver/internal" - xdstestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" "google.golang.org/protobuf/proto" @@ -257,17 +255,17 @@ func (s) TestResolverWatchCallbackAfterClose(t *testing.T) { // Tests that the xDS resolver's Close method closes the xDS client. func (s) TestResolverCloseClosesXDSClient(t *testing.T) { - bootstrapCfg := &bootstrap.Config{ - XDSServer: xdstestutils.ServerConfigForAddress(t, "dummy-management-server-address"), - } - // Override xDS client creation to use bootstrap configuration pointing to a // dummy management server. Also close a channel when the returned xDS // client is closed. origNewClient := rinternal.NewXDSClient closeCh := make(chan struct{}) rinternal.NewXDSClient = func() (xdsclient.XDSClient, func(), error) { - c, cancel, err := xdsclient.NewWithConfigForTesting(bootstrapCfg, defaultTestTimeout, defaultTestTimeout) + bc, err := e2e.DefaultBootstrapContents(uuid.New().String(), "dummy-management-server-address") + if err != nil { + t.Fatalf("Failed to create bootstrap configuration: %v", err) + } + c, cancel, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestTimeout}) return c, grpcsync.OnceFunc(func() { close(closeCh) cancel() diff --git a/xds/internal/server/listener_wrapper_test.go b/xds/internal/server/listener_wrapper_test.go index 2ba7efa28861..1fc0dd49b23c 100644 --- a/xds/internal/server/listener_wrapper_test.go +++ b/xds/internal/server/listener_wrapper_test.go @@ -29,6 +29,10 @@ import ( "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + xdsinternal "google.golang.org/grpc/xds/internal" + "google.golang.org/grpc/xds/internal/xdsclient" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" @@ -134,7 +138,9 @@ func (s) TestListenerWrapper(t *testing.T) { } // Invoke lds resource not found - should go back to non serving. - if err := internal.TriggerXDSResourceNameNotFoundClient.(func(string, string) error)("ListenerResource", listener.GetName()); err != nil { + triggerResourceNotFound := internal.TriggerXDSResourceNotFoundForTesting.(func(xdsclient.XDSClient, xdsresource.Type, string) error) + listenerResourceType := xdsinternal.ResourceTypeMapForTesting[version.V3ListenerURL].(xdsresource.Type) + if err := triggerResourceNotFound(xdsC, listenerResourceType, listener.GetName()); err != nil { t.Fatalf("Failed to trigger resource name not found for testing: %v", err) } select { diff --git a/xds/internal/server/rds_handler_test.go b/xds/internal/server/rds_handler_test.go index 81b59cabc223..faaa62cd98e7 100644 --- a/xds/internal/server/rds_handler_test.go +++ b/xds/internal/server/rds_handler_test.go @@ -107,7 +107,7 @@ func xdsSetupForTests(t *testing.T) (*e2e.ManagementServer, string, chan []strin }) t.Cleanup(cleanup) - xdsC, cancel, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + xdsC, cancel, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatal(err) } diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index c855234bb511..b0763a024031 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -370,7 +370,9 @@ func (a *authority) startWatchTimersLocked(rType xdsresource.Type, resourceNames continue } state.wTimer = time.AfterFunc(a.watchExpiryTimeout, func() { - a.handleWatchTimerExpiry(rType, resourceName, state) + a.resourcesMu.Lock() + a.handleWatchTimerExpiryLocked(rType, resourceName, state) + a.resourcesMu.Unlock() }) state.wState = watchStateRequested } @@ -514,10 +516,7 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w } } -func (a *authority) handleWatchTimerExpiry(rType xdsresource.Type, resourceName string, state *resourceState) { - a.resourcesMu.Lock() - defer a.resourcesMu.Unlock() - +func (a *authority) handleWatchTimerExpiryLocked(rType xdsresource.Type, resourceName string, state *resourceState) { if a.closed { return } diff --git a/xds/internal/xdsclient/client_new.go b/xds/internal/xdsclient/client_new.go index 81c14e2439fd..8dec8f34b209 100644 --- a/xds/internal/xdsclient/client_new.go +++ b/xds/internal/xdsclient/client_new.go @@ -24,7 +24,6 @@ import ( "encoding/json" "fmt" "sync" - "sync/atomic" "time" "google.golang.org/grpc/internal" @@ -49,7 +48,12 @@ func New() (XDSClient, func(), error) { return newRefCountedWithConfig(nil) } -// NewWithConfig returns a new xDS client configured by the given config. +// NewWithConfig is similar to New, except that it uses the provided bootstrap +// configuration to create the xDS client if and only if the bootstrap +// environment variables are not defined. +// +// The returned client is a reference counted singleton instance. This function +// creates a new client only when one doesn't already exist. // // The second return value represents a close function which releases the // caller's reference on the returned client. The caller is expected to invoke @@ -57,10 +61,10 @@ func New() (XDSClient, func(), error) { // only when all references are released, and it is safe for the caller to // invoke this close function multiple times. // -// # Internal/Testing Only +// # Internal Only // -// This function should ONLY be used for internal (c2p resolver) and/or testing -// purposese. DO NOT use this elsewhere. Use New() instead. +// This function should ONLY be used by the internal google-c2p resolver. +// DO NOT use this elsewhere. Use New() instead. func NewWithConfig(config *bootstrap.Config) (XDSClient, func(), error) { return newRefCountedWithConfig(config) } @@ -84,38 +88,23 @@ func newWithConfig(config *bootstrap.Config, watchExpiryTimeout time.Duration, i return c, nil } -// NewWithConfigForTesting returns an xDS client for the specified bootstrap -// config, separate from the global singleton. -// -// The second return value represents a close function which the caller is -// expected to invoke once they are done using the client. It is safe for the -// caller to invoke this close function multiple times. -// -// # Testing Only -// -// This function should ONLY be used for testing purposes. -// TODO(easwars): Document the new close func. -func NewWithConfigForTesting(config *bootstrap.Config, watchExpiryTimeout, authorityIdleTimeout time.Duration) (XDSClient, func(), error) { - cl, err := newWithConfig(config, watchExpiryTimeout, authorityIdleTimeout) - if err != nil { - return nil, nil, err - } - return cl, grpcsync.OnceFunc(cl.close), nil -} +// OptionsForTesting contains options to configure xDS client creation for +// testing purposes only. +type OptionsForTesting struct { + // Contents contain a JSON representation of the bootstrap configuration to + // be used when creating the xDS client. + Contents []byte -func init() { - internal.TriggerXDSResourceNameNotFoundClient = triggerXDSResourceNameNotFoundClient -} - -var singletonClientForTesting = atomic.Pointer[clientRefCounted]{} + // WatchExpiryTimeout is the timeout for xDS resource watch expiry. If + // unspecified, uses the default value used in non-test code. + WatchExpiryTimeout time.Duration -func triggerXDSResourceNameNotFoundClient(resourceType, resourceName string) error { - c := singletonClientForTesting.Load() - return internal.TriggerXDSResourceNameNotFoundForTesting.(func(func(xdsresource.Type, string) error, string, string) error)(c.clientImpl.triggerResourceNotFoundForTesting, resourceType, resourceName) + // AuthorityIdleTimeout is the timeout before idle authorities are deleted. + // If unspecified, uses the default value used in non-test code. + AuthorityIdleTimeout time.Duration } -// NewWithBootstrapContentsForTesting returns an xDS client for this config, -// separate from the global singleton. +// NewForTesting returns an xDS client configured with the provided options. // // The second return value represents a close function which the caller is // expected to invoke once they are done using the client. It is safe for the @@ -124,56 +113,69 @@ func triggerXDSResourceNameNotFoundClient(resourceType, resourceName string) err // # Testing Only // // This function should ONLY be used for testing purposes. -func NewWithBootstrapContentsForTesting(contents []byte) (XDSClient, func(), error) { - // Normalize the contents +func NewForTesting(opts OptionsForTesting) (XDSClient, func(), error) { + if opts.WatchExpiryTimeout == 0 { + opts.WatchExpiryTimeout = defaultWatchExpiryTimeout + } + if opts.AuthorityIdleTimeout == 0 { + opts.AuthorityIdleTimeout = defaultIdleAuthorityDeleteTimeout + } + + // Normalize the input configuration, as this is used as the key in the map + // of xDS clients created for testing. buf := bytes.Buffer{} - err := json.Indent(&buf, contents, "", "") + err := json.Indent(&buf, opts.Contents, "", "") if err != nil { return nil, nil, fmt.Errorf("xds: error normalizing JSON: %v", err) } - contents = bytes.TrimSpace(buf.Bytes()) + opts.Contents = bytes.TrimSpace(buf.Bytes()) - c, err := getOrMakeClientForTesting(contents) - if err != nil { - return nil, nil, err - } - singletonClientForTesting.Store(c) - return c, grpcsync.OnceFunc(func() { + clientsMu.Lock() + defer clientsMu.Unlock() + + var client *clientRefCounted + closeFunc := grpcsync.OnceFunc(func() { clientsMu.Lock() defer clientsMu.Unlock() - if c.decrRef() == 0 { - c.close() - delete(clients, string(contents)) - singletonClientForTesting.Store(nil) + if client.decrRef() == 0 { + client.close() + delete(clients, string(opts.Contents)) } - }), nil -} - -// getOrMakeClientForTesting creates a new reference counted client (separate -// from the global singleton) for the given config, or returns an existing one. -// It takes care of incrementing the reference count for the returned client, -// and leaves the caller responsible for decrementing the reference count once -// the client is no longer needed. -func getOrMakeClientForTesting(config []byte) (*clientRefCounted, error) { - clientsMu.Lock() - defer clientsMu.Unlock() + }) - if c := clients[string(config)]; c != nil { + // If an xDS client exists for the given configuration, increment its + // reference count and return it. + if c := clients[string(opts.Contents)]; c != nil { c.incrRef() - return c, nil + client = c + return c, closeFunc, nil } - bcfg, err := bootstrap.NewConfigFromContents(config) + // Create a new xDS client for the given configuration + bcfg, err := bootstrap.NewConfigFromContents(opts.Contents) if err != nil { - return nil, fmt.Errorf("bootstrap config %s: %v", string(config), err) + return nil, nil, fmt.Errorf("bootstrap config %s: %v", string(opts.Contents), err) } - cImpl, err := newWithConfig(bcfg, defaultWatchExpiryTimeout, defaultIdleAuthorityDeleteTimeout) + cImpl, err := newWithConfig(bcfg, opts.WatchExpiryTimeout, opts.AuthorityIdleTimeout) if err != nil { - return nil, fmt.Errorf("creating xDS client: %v", err) + return nil, nil, fmt.Errorf("creating xDS client: %v", err) } - c := &clientRefCounted{clientImpl: cImpl, refCount: 1} - clients[string(config)] = c - return c, nil + client = &clientRefCounted{clientImpl: cImpl, refCount: 1} + clients[string(opts.Contents)] = client + + return client, closeFunc, nil +} + +func init() { + internal.TriggerXDSResourceNotFoundForTesting = triggerXDSResourceNotFoundForTesting +} + +func triggerXDSResourceNotFoundForTesting(client XDSClient, typ xdsresource.Type, name string) error { + crc, ok := client.(*clientRefCounted) + if !ok { + return fmt.Errorf("xDS client is of type %T, want %T", client, &clientRefCounted{}) + } + return crc.clientImpl.triggerResourceNotFoundForTesting(typ, name) } var ( diff --git a/xds/internal/xdsclient/clientimpl_watchers.go b/xds/internal/xdsclient/clientimpl_watchers.go index f64124dad643..22b8eb0107c9 100644 --- a/xds/internal/xdsclient/clientimpl_watchers.go +++ b/xds/internal/xdsclient/clientimpl_watchers.go @@ -96,7 +96,6 @@ func (r *resourceTypeRegistry) maybeRegister(rType xdsresource.Type) error { } func (c *clientImpl) triggerResourceNotFoundForTesting(rType xdsresource.Type, resourceName string) error { - // Return early if the client is already closed. if c == nil || c.done.HasFired() { return fmt.Errorf("attempt to trigger resource-not-found-error for resource %q of type %q, but client is closed", rType.TypeName(), resourceName) } diff --git a/xds/internal/xdsclient/loadreport_test.go b/xds/internal/xdsclient/loadreport_test.go index a201a5a5dee2..42037243bfae 100644 --- a/xds/internal/xdsclient/loadreport_test.go +++ b/xds/internal/xdsclient/loadreport_test.go @@ -21,26 +21,21 @@ package xdsclient import ( "context" "testing" - "time" "github.com/google/go-cmp/cmp" + "github.com/google/uuid" "google.golang.org/grpc/codes" + "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/internal/testutils/xds/fakeserver" - "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/status" xdstestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/protobuf/testing/protocmp" - v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" v3lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3" "google.golang.org/protobuf/types/known/durationpb" ) -const ( - defaultClientWatchExpiryTimeout = 15 * time.Second -) - func (s) TestLRSClient(t *testing.T) { fs1, sCleanup, err := fakeserver.StartServer(nil) if err != nil { @@ -48,13 +43,15 @@ func (s) TestLRSClient(t *testing.T) { } defer sCleanup() + nodeID := uuid.New().String() serverCfg1 := xdstestutils.ServerConfigForAddress(t, fs1.Address) - xdsC, close, err := NewWithConfigForTesting(&bootstrap.Config{ - XDSServer: serverCfg1, - NodeProto: &v3corepb.Node{}, - }, defaultClientWatchExpiryTimeout, time.Duration(0)) + bc, err := e2e.DefaultBootstrapContents(nodeID, fs1.Address) + if err != nil { + t.Fatalf("Failed to create bootstrap configuration: %v", err) + } + xdsC, close, err := NewForTesting(OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) if err != nil { - t.Fatalf("failed to create xds client: %v", err) + t.Fatalf("Failed to create an xDS client: %v", err) } defer close() diff --git a/xds/internal/xdsclient/tests/authority_test.go b/xds/internal/xdsclient/tests/authority_test.go index cfc0be63037a..24be1d4c275e 100644 --- a/xds/internal/xdsclient/tests/authority_test.go +++ b/xds/internal/xdsclient/tests/authority_test.go @@ -25,14 +25,13 @@ import ( "github.com/google/uuid" "google.golang.org/grpc/internal/testutils" + testbootstrap "google.golang.org/grpc/internal/testutils/xds/bootstrap" "google.golang.org/grpc/internal/testutils/xds/e2e" - "google.golang.org/grpc/internal/xds/bootstrap" xdstestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" - v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" ) const ( @@ -89,17 +88,21 @@ func setupForAuthorityTests(ctx context.Context, t *testing.T, idleTimeout time. // have empty server configs, and therefore end up using the default server // config, which points to the above management server. nodeID := uuid.New().String() - client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{ - XDSServer: xdstestutils.ServerConfigForAddress(t, defaultAuthorityServer.Address), - NodeProto: &v3corepb.Node{Id: nodeID}, - Authorities: map[string]*bootstrap.Authority{ - testAuthority1: {}, - testAuthority2: {}, - testAuthority3: {XDSServer: xdstestutils.ServerConfigForAddress(t, nonDefaultAuthorityServer.Address)}, + bootstrapContents, err := testbootstrap.Contents(testbootstrap.Options{ + NodeID: nodeID, + ServerURI: defaultAuthorityServer.Address, + Authorities: map[string]string{ + testAuthority1: "", + testAuthority2: "", + testAuthority3: nonDefaultAuthorityServer.Address, }, - }, defaultTestWatchExpiryTimeout, idleTimeout) + }) if err != nil { - t.Fatalf("failed to create xds client: %v", err) + t.Fatalf("Failed to create bootstrap configuration: %v", err) + } + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents, WatchExpiryTimeout: defaultTestWatchExpiryTimeout, AuthorityIdleTimeout: idleTimeout}) + if err != nil { + t.Fatalf("Failed to create an xDS client: %v", err) } resources := e2e.UpdateOptions{ diff --git a/xds/internal/xdsclient/tests/cds_watchers_test.go b/xds/internal/xdsclient/tests/cds_watchers_test.go index f48a74451672..ace15d8df97c 100644 --- a/xds/internal/xdsclient/tests/cds_watchers_test.go +++ b/xds/internal/xdsclient/tests/cds_watchers_test.go @@ -31,8 +31,6 @@ import ( "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" - "google.golang.org/grpc/internal/xds/bootstrap" - xdstestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" @@ -180,7 +178,7 @@ func (s) TestCDSWatch(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -307,7 +305,7 @@ func (s) TestCDSWatch_TwoWatchesForSameResourceName(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -387,7 +385,7 @@ func (s) TestCDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -477,7 +475,7 @@ func (s) TestCDSWatch_ResourceCaching(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -547,12 +545,13 @@ func (s) TestCDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { } defer mgmtServer.Stop() - client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{ - XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address), - NodeProto: &v3corepb.Node{}, - }, defaultTestWatchExpiryTimeout, time.Duration(0)) + bc, err := e2e.DefaultBootstrapContents("", mgmtServer.Address) if err != nil { - t.Fatalf("failed to create xds client: %v", err) + t.Fatalf("Failed to create bootstrap configuration: %v", err) + } + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + if err != nil { + t.Fatalf("Failed to create an xDS client: %v", err) } defer close() @@ -587,12 +586,13 @@ func (s) TestCDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { // Create an xDS client talking to the above management server. nodeID := uuid.New().String() - client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{ - XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address), - NodeProto: &v3corepb.Node{Id: nodeID}, - }, defaultTestWatchExpiryTimeout, time.Duration(0)) + bc, err := e2e.DefaultBootstrapContents(nodeID, mgmtServer.Address) + if err != nil { + t.Fatalf("Failed to create bootstrap configuration: %v", err) + } + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) if err != nil { - t.Fatalf("failed to create xds client: %v", err) + t.Fatalf("Failed to create an xDS client: %v", err) } defer close() @@ -650,7 +650,7 @@ func (s) TestCDSWatch_ResourceRemoved(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -754,7 +754,7 @@ func (s) TestCDSWatch_NACKError(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -800,7 +800,7 @@ func (s) TestCDSWatch_PartialValid(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -869,7 +869,7 @@ func (s) TestCDSWatch_PartialResponse(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } diff --git a/xds/internal/xdsclient/tests/dump_test.go b/xds/internal/xdsclient/tests/dump_test.go index 91e88c8ae5e8..3b93b11e9e52 100644 --- a/xds/internal/xdsclient/tests/dump_test.go +++ b/xds/internal/xdsclient/tests/dump_test.go @@ -74,7 +74,7 @@ func (s) TestDumpResources(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } diff --git a/xds/internal/xdsclient/tests/eds_watchers_test.go b/xds/internal/xdsclient/tests/eds_watchers_test.go index e1adf56ca0fa..48179137436f 100644 --- a/xds/internal/xdsclient/tests/eds_watchers_test.go +++ b/xds/internal/xdsclient/tests/eds_watchers_test.go @@ -31,14 +31,11 @@ import ( "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" - "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/xds/internal" - xdstestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" "google.golang.org/protobuf/types/known/wrapperspb" - v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" ) @@ -212,7 +209,7 @@ func (s) TestEDSWatch(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -379,7 +376,7 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -460,7 +457,7 @@ func (s) TestEDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -555,7 +552,7 @@ func (s) TestEDSWatch_ResourceCaching(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -636,12 +633,13 @@ func (s) TestEDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { } defer mgmtServer.Stop() - client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{ - XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address), - NodeProto: &v3corepb.Node{}, - }, defaultTestWatchExpiryTimeout, time.Duration(0)) + bc, err := e2e.DefaultBootstrapContents("", mgmtServer.Address) if err != nil { - t.Fatalf("failed to create xds client: %v", err) + t.Fatalf("Failed to create bootstrap configuration: %v", err) + } + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + if err != nil { + t.Fatalf("Failed to create an xDS client: %v", err) } defer close() @@ -676,12 +674,13 @@ func (s) TestEDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { // Create an xDS client talking to the above management server. nodeID := uuid.New().String() - client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{ - XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address), - NodeProto: &v3corepb.Node{Id: nodeID}, - }, defaultTestWatchExpiryTimeout, time.Duration(0)) + bc, err := e2e.DefaultBootstrapContents(nodeID, mgmtServer.Address) + if err != nil { + t.Fatalf("Failed to create bootstrap configuration: %v", err) + } + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) if err != nil { - t.Fatalf("failed to create xds client: %v", err) + t.Fatalf("Failed to create an xDS client: %v", err) } defer close() @@ -741,7 +740,7 @@ func (s) TestEDSWatch_NACKError(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -787,7 +786,7 @@ func (s) TestEDSWatch_PartialValid(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } diff --git a/xds/internal/xdsclient/tests/federation_watchers_test.go b/xds/internal/xdsclient/tests/federation_watchers_test.go index f7e533182441..bd55dcae0b37 100644 --- a/xds/internal/xdsclient/tests/federation_watchers_test.go +++ b/xds/internal/xdsclient/tests/federation_watchers_test.go @@ -69,7 +69,7 @@ func setupForFederationWatchersTest(t *testing.T) (*e2e.ManagementServer, string t.Fatalf("Failed to create bootstrap file: %v", err) } // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } diff --git a/xds/internal/xdsclient/tests/lds_watchers_test.go b/xds/internal/xdsclient/tests/lds_watchers_test.go index bbab416158ee..2c276d373197 100644 --- a/xds/internal/xdsclient/tests/lds_watchers_test.go +++ b/xds/internal/xdsclient/tests/lds_watchers_test.go @@ -33,12 +33,9 @@ import ( "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" - "google.golang.org/grpc/internal/xds/bootstrap" - xdstestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" - v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3" v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" @@ -226,7 +223,7 @@ func (s) TestLDSWatch(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -353,7 +350,7 @@ func (s) TestLDSWatch_TwoWatchesForSameResourceName(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -434,7 +431,7 @@ func (s) TestLDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -519,7 +516,7 @@ func (s) TestLDSWatch_ResourceCaching(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -589,12 +586,14 @@ func (s) TestLDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { } defer mgmtServer.Stop() - client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{ - XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address), - NodeProto: &v3corepb.Node{}, - }, defaultTestWatchExpiryTimeout, time.Duration(0)) + nodeID := uuid.New().String() + bc, err := e2e.DefaultBootstrapContents(nodeID, mgmtServer.Address) + if err != nil { + t.Fatalf("Failed to create bootstrap configuration: %v", err) + } + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) if err != nil { - t.Fatalf("failed to create xds client: %v", err) + t.Fatalf("Failed to create an xDS client: %v", err) } defer close() @@ -629,12 +628,13 @@ func (s) TestLDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { // Create an xDS client talking to the above management server. nodeID := uuid.New().String() - client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{ - XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address), - NodeProto: &v3corepb.Node{Id: nodeID}, - }, defaultTestWatchExpiryTimeout, time.Duration(0)) + bc, err := e2e.DefaultBootstrapContents(nodeID, mgmtServer.Address) + if err != nil { + t.Fatalf("Failed to create bootstrap configuration: %v", err) + } + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) if err != nil { - t.Fatalf("failed to create xds client: %v", err) + t.Fatalf("Failed to create an xDS client: %v", err) } defer close() @@ -692,7 +692,7 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -795,7 +795,7 @@ func (s) TestLDSWatch_NACKError(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -841,7 +841,7 @@ func (s) TestLDSWatch_PartialValid(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -911,7 +911,7 @@ func (s) TestLDSWatch_PartialResponse(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } diff --git a/xds/internal/xdsclient/tests/misc_watchers_test.go b/xds/internal/xdsclient/tests/misc_watchers_test.go index d6f7bc9e6484..511856486c41 100644 --- a/xds/internal/xdsclient/tests/misc_watchers_test.go +++ b/xds/internal/xdsclient/tests/misc_watchers_test.go @@ -102,7 +102,7 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -213,7 +213,7 @@ func (s) TestNodeProtoSentOnlyInFirstRequest(t *testing.T) { } // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } diff --git a/xds/internal/xdsclient/tests/rds_watchers_test.go b/xds/internal/xdsclient/tests/rds_watchers_test.go index 51508a985189..3c29e7da8c23 100644 --- a/xds/internal/xdsclient/tests/rds_watchers_test.go +++ b/xds/internal/xdsclient/tests/rds_watchers_test.go @@ -31,13 +31,10 @@ import ( "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" - "google.golang.org/grpc/internal/xds/bootstrap" - xdstestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" "google.golang.org/protobuf/types/known/wrapperspb" - v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" ) @@ -214,7 +211,7 @@ func (s) TestRDSWatch(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -381,7 +378,7 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -462,7 +459,7 @@ func (s) TestRDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -557,7 +554,7 @@ func (s) TestRDSWatch_ResourceCaching(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -637,13 +634,15 @@ func (s) TestRDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { } defer mgmtServer.Stop() - // Create an xDS client talking to a non-existent management server. - client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{ - XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address), - NodeProto: &v3corepb.Node{}, - }, defaultTestWatchExpiryTimeout, time.Duration(0)) + // Create an xDS client talking to the above management server. + nodeID := uuid.New().String() + bc, err := e2e.DefaultBootstrapContents(nodeID, mgmtServer.Address) + if err != nil { + t.Fatalf("Failed to create bootstrap configuration: %v", err) + } + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) if err != nil { - t.Fatalf("failed to create xds client: %v", err) + t.Fatalf("Failed to create an xDS client: %v", err) } defer close() @@ -678,12 +677,13 @@ func (s) TestRDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { // Create an xDS client talking to the above management server. nodeID := uuid.New().String() - client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{ - XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address), - NodeProto: &v3corepb.Node{Id: nodeID}, - }, defaultTestWatchExpiryTimeout, time.Duration(0)) + bc, err := e2e.DefaultBootstrapContents(nodeID, mgmtServer.Address) + if err != nil { + t.Fatalf("Failed to create bootstrap configuration: %v", err) + } + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) if err != nil { - t.Fatalf("failed to create xds client: %v", err) + t.Fatalf("Failed to create an xDS client: %v", err) } defer close() @@ -743,7 +743,7 @@ func (s) TestRDSWatch_NACKError(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } @@ -789,7 +789,7 @@ func (s) TestRDSWatch_PartialValid(t *testing.T) { defer cleanup() // Create an xDS client with the above bootstrap contents. - client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } diff --git a/xds/internal/xdsclient/tests/resource_update_test.go b/xds/internal/xdsclient/tests/resource_update_test.go index dea76f4844b4..5a684a00e125 100644 --- a/xds/internal/xdsclient/tests/resource_update_test.go +++ b/xds/internal/xdsclient/tests/resource_update_test.go @@ -32,7 +32,6 @@ import ( "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/internal/testutils/xds/fakeserver" - "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/xds/internal" xdstestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/xdsclient" @@ -284,12 +283,13 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { // Create an xDS client talking to the above management server. nodeID := uuid.New().String() - client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{ - XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address), - NodeProto: &v3corepb.Node{Id: nodeID}, - }, defaultTestWatchExpiryTimeout, time.Duration(0)) + bc, err := e2e.DefaultBootstrapContents(nodeID, mgmtServer.Address) if err != nil { - t.Fatalf("failed to create xds client: %v", err) + t.Fatalf("Failed to create bootstrap configuration: %v", err) + } + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + if err != nil { + t.Fatalf("Failed to create an xDS client: %v", err) } defer close() t.Logf("Created xDS client to %s", mgmtServer.Address) @@ -308,13 +308,20 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) { t.Fatalf("Timeout when waiting for discovery request at the management server: %v", ctx) } wantReq := &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{ - Node: &v3corepb.Node{Id: nodeID}, + Node: &v3corepb.Node{ + Id: nodeID, + UserAgentName: "gRPC Go", + ClientFeatures: []string{ + "envoy.lb.does_not_support_overprovisioning", + "xds.config.resource-in-sotw", + }, + }, ResourceNames: []string{test.resourceName}, TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", }} gotReq := val.(*fakeserver.Request) - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { - t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq) + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform(), protocmp.IgnoreFields(&v3corepb.Node{}, "user_agent_version")); diff != "" { + t.Fatalf("Discovery request received with unexpected diff (-got +want):\n%s\n got: %+v, want: %+v", diff, gotReq, wantReq) } t.Logf("Discovery request received at management server") @@ -554,12 +561,13 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) { // Create an xDS client talking to the above management server. nodeID := uuid.New().String() - client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{ - XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address), - NodeProto: &v3corepb.Node{Id: nodeID}, - }, defaultTestWatchExpiryTimeout, time.Duration(0)) + bc, err := e2e.DefaultBootstrapContents(nodeID, mgmtServer.Address) if err != nil { - t.Fatalf("failed to create xds client: %v", err) + t.Fatalf("Failed to create bootstrap configuration: %v", err) + } + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + if err != nil { + t.Fatalf("Failed to create an xDS client: %v", err) } defer close() t.Logf("Created xDS client to %s", mgmtServer.Address) @@ -578,13 +586,20 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) { t.Fatalf("Timeout when waiting for discovery request at the management server: %v", ctx) } wantReq := &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{ - Node: &v3corepb.Node{Id: nodeID}, + Node: &v3corepb.Node{ + Id: nodeID, + UserAgentName: "gRPC Go", + ClientFeatures: []string{ + "envoy.lb.does_not_support_overprovisioning", + "xds.config.resource-in-sotw", + }, + }, ResourceNames: []string{test.resourceName}, TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration", }} gotReq := val.(*fakeserver.Request) - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { - t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq) + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform(), protocmp.IgnoreFields(&v3corepb.Node{}, "user_agent_version")); diff != "" { + t.Fatalf("Discovery request received with unexpected diff (-got +want):\n%s\n got: %+v, want: %+v", diff, gotReq, wantReq) } t.Logf("Discovery request received at management server") @@ -785,12 +800,13 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { // Create an xDS client talking to the above management server. nodeID := uuid.New().String() - client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{ - XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address), - NodeProto: &v3corepb.Node{Id: nodeID}, - }, defaultTestWatchExpiryTimeout, time.Duration(0)) + bc, err := e2e.DefaultBootstrapContents(nodeID, mgmtServer.Address) if err != nil { - t.Fatalf("failed to create xds client: %v", err) + t.Fatalf("Failed to create bootstrap configuration: %v", err) + } + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + if err != nil { + t.Fatalf("Failed to create an xDS client: %v", err) } defer close() t.Logf("Created xDS client to %s", mgmtServer.Address) @@ -809,13 +825,20 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { t.Fatalf("Timeout when waiting for discovery request at the management server: %v", ctx) } wantReq := &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{ - Node: &v3corepb.Node{Id: nodeID}, + Node: &v3corepb.Node{ + Id: nodeID, + UserAgentName: "gRPC Go", + ClientFeatures: []string{ + "envoy.lb.does_not_support_overprovisioning", + "xds.config.resource-in-sotw", + }, + }, ResourceNames: []string{test.resourceName}, TypeUrl: "type.googleapis.com/envoy.config.cluster.v3.Cluster", }} gotReq := val.(*fakeserver.Request) - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { - t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq) + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform(), protocmp.IgnoreFields(&v3corepb.Node{}, "user_agent_version")); diff != "" { + t.Fatalf("Discovery request received with unexpected diff (-got +want):\n%s\n got: %+v, want: %+v", diff, gotReq, wantReq) } t.Logf("Discovery request received at management server") @@ -1124,12 +1147,13 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) { // Create an xDS client talking to the above management server. nodeID := uuid.New().String() - client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{ - XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address), - NodeProto: &v3corepb.Node{Id: nodeID}, - }, defaultTestWatchExpiryTimeout, time.Duration(0)) + bc, err := e2e.DefaultBootstrapContents(nodeID, mgmtServer.Address) if err != nil { - t.Fatalf("failed to create xds client: %v", err) + t.Fatalf("Failed to create bootstrap configuration: %v", err) + } + client, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bc, WatchExpiryTimeout: defaultTestWatchExpiryTimeout}) + if err != nil { + t.Fatalf("Failed to create an xDS client: %v", err) } defer close() t.Logf("Created xDS client to %s", mgmtServer.Address) @@ -1148,13 +1172,20 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) { t.Fatalf("Timeout when waiting for discovery request at the management server: %v", ctx) } wantReq := &fakeserver.Request{Req: &v3discoverypb.DiscoveryRequest{ - Node: &v3corepb.Node{Id: nodeID}, + Node: &v3corepb.Node{ + Id: nodeID, + UserAgentName: "gRPC Go", + ClientFeatures: []string{ + "envoy.lb.does_not_support_overprovisioning", + "xds.config.resource-in-sotw", + }, + }, ResourceNames: []string{test.resourceName}, TypeUrl: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", }} gotReq := val.(*fakeserver.Request) - if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { - t.Fatalf("Discovery request received at management server is %+v, want %+v", gotReq, wantReq) + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform(), protocmp.IgnoreFields(&v3corepb.Node{}, "user_agent_version")); diff != "" { + t.Fatalf("Discovery request received with unexpected diff (-got +want):\n%s\n got: %+v, want: %+v", diff, gotReq, wantReq) } t.Logf("Discovery request received at management server") diff --git a/xds/internal/xdsclient/xdsresource/resource_type.go b/xds/internal/xdsclient/xdsresource/resource_type.go index a1e15e2d3e21..3b3a8e79c2b9 100644 --- a/xds/internal/xdsclient/xdsresource/resource_type.go +++ b/xds/internal/xdsclient/xdsresource/resource_type.go @@ -25,9 +25,6 @@ package xdsresource import ( - "fmt" - - "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/xds/bootstrap" xdsinternal "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" @@ -40,8 +37,6 @@ func init() { xdsinternal.ResourceTypeMapForTesting[version.V3RouteConfigURL] = routeConfigType xdsinternal.ResourceTypeMapForTesting[version.V3ClusterURL] = clusterType xdsinternal.ResourceTypeMapForTesting[version.V3EndpointsURL] = endpointsType - - internal.TriggerXDSResourceNameNotFoundForTesting = triggerResourceNotFoundForTesting } // Producer contains a single method to discover resource configuration from a @@ -171,20 +166,3 @@ func (r resourceTypeState) TypeName() string { func (r resourceTypeState) AllResourcesRequiredInSotW() bool { return r.allResourcesRequiredInSotW } - -func triggerResourceNotFoundForTesting(cb func(Type, string) error, typeName, resourceName string) error { - var typ Type - switch typeName { - case ListenerResourceTypeName: - typ = listenerType - case RouteConfigTypeName: - typ = routeConfigType - case ClusterResourceTypeName: - typ = clusterType - case EndpointsResourceTypeName: - typ = endpointsType - default: - return fmt.Errorf("unknown type name %q", typeName) - } - return cb(typ, resourceName) -} diff --git a/xds/server.go b/xds/server.go index b5eb806207ad..126aff067c4c 100644 --- a/xds/server.go +++ b/xds/server.go @@ -96,7 +96,7 @@ func NewGRPCServer(opts ...grpc.ServerOption) (*GRPCServer, error) { if s.opts.bootstrapContentsForTesting != nil { // Bootstrap file contents may be specified as a server option for tests. newXDSClient = func() (xdsclient.XDSClient, func(), error) { - return xdsclient.NewWithBootstrapContentsForTesting(s.opts.bootstrapContentsForTesting) + return xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: s.opts.bootstrapContentsForTesting}) } } xdsClient, xdsClientClose, err := newXDSClient() diff --git a/xds/server_ext_test.go b/xds/server_ext_test.go new file mode 100644 index 000000000000..de4791f6654d --- /dev/null +++ b/xds/server_ext_test.go @@ -0,0 +1,348 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xds_test + +import ( + "context" + "fmt" + "io" + "net" + "strconv" + "strings" + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/grpcsync" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/status" + "google.golang.org/grpc/xds" + xdsinternal "google.golang.org/grpc/xds/internal" + "google.golang.org/grpc/xds/internal/xdsclient" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" + + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" +) + +var errAcceptAndClose = status.New(codes.Unavailable, "") + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +const ( + defaultTestTimeout = 10 * time.Second + defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen. +) + +type testService struct { + testgrpc.UnimplementedTestServiceServer +} + +func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) { + return &testpb.Empty{}, nil +} + +func (*testService) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil +} + +func (*testService) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error { + for { + _, err := stream.Recv() // hangs here forever if stream doesn't shut down...doesn't receive EOF without any errors + if err == io.EOF { + return nil + } + } +} + +func hostPortFromListener(lis net.Listener) (string, uint32, error) { + host, p, err := net.SplitHostPort(lis.Addr().String()) + if err != nil { + return "", 0, fmt.Errorf("net.SplitHostPort(%s) failed: %v", lis.Addr().String(), err) + } + port, err := strconv.ParseInt(p, 10, 32) + if err != nil { + return "", 0, fmt.Errorf("strconv.ParseInt(%s, 10, 32) failed: %v", p, err) + } + return host, uint32(port), nil +} + +// TestServingModeChanges tests the Server's logic as it transitions from Not +// Ready to Ready, then to Not Ready. Before it goes Ready, connections should +// be accepted and closed. After it goes ready, RPC's should proceed as normal +// according to matched route configuration. After it transitions back into not +// ready (through an explicit LDS Resource Not Found), previously running RPC's +// should be gracefully closed and still work, and new RPC's should fail. +func (s) TestServingModeChanges(t *testing.T) { + managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{}) + defer cleanup() + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("testutils.LocalTCPListener() failed: %v", err) + } + // Setup the management server to respond with a listener resource that + // specifies a route name to watch. Due to not having received the full + // configuration, this should cause the server to be in mode Serving. + host, port, err := hostPortFromListener(lis) + if err != nil { + t.Fatalf("failed to retrieve host and port of server: %v", err) + } + + listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName") + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + SkipValidation: true, + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + serving := grpcsync.NewEvent() + modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) { + t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err) + if args.Mode == connectivity.ServingModeServing { + serving.Fire() + } + }) + + server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents)) + if err != nil { + t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err) + } + defer server.Stop() + testgrpc.RegisterTestServiceServer(server, &testService{}) + go func() { + if err := server.Serve(lis); err != nil { + t.Errorf("Serve() failed: %v", err) + } + }() + cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose) + routeConfig := e2e.RouteConfigNonForwardingAction("routeName") + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + Routes: []*v3routepb.RouteConfiguration{routeConfig}, + } + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + select { + case <-ctx.Done(): + t.Fatal("timeout waiting for the xDS Server to go Serving") + case <-serving.Done(): + } + + // A unary RPC should work once it transitions into serving. (need this same + // assertion from LDS resource not found triggering it). + waitForSuccessfulRPC(ctx, t, cc) + + // Start a stream before switching the server to not serving. Due to the + // stream being created before the graceful stop of the underlying + // connection, it should be able to continue even after the server switches + // to not serving. + c := testgrpc.NewTestServiceClient(cc) + stream, err := c.FullDuplexCall(ctx) + if err != nil { + t.Fatalf("cc.FullDuplexCall failed: %f", err) + } + + // Invoke the lds resource not found - this should cause the server to + // switch to not serving. This should gracefully drain connections, and fail + // RPC's after. (how to assert accepted + closed) does this make it's way to + // application layer? (should work outside of resource not found... + + // Invoke LDS Resource not found here (tests graceful close) + xdsC, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) + if err != nil { + t.Fatalf("Failed to find xDS client for configuration: %v", string(bootstrapContents)) + } + defer close() + triggerResourceNotFound := internal.TriggerXDSResourceNotFoundForTesting.(func(xdsclient.XDSClient, xdsresource.Type, string) error) + listenerResourceType := xdsinternal.ResourceTypeMapForTesting[version.V3ListenerURL].(xdsresource.Type) + if err := triggerResourceNotFound(xdsC, listenerResourceType, listener.GetName()); err != nil { + t.Fatalf("Failed to trigger resource name not found for testing: %v", err) + } + + // New RPCs on that connection should eventually start failing. Due to + // Graceful Stop any started streams continue to work. + if err = stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil { + t.Fatalf("stream.Send() failed: %v, should continue to work due to graceful stop", err) + } + if err = stream.CloseSend(); err != nil { + t.Fatalf("stream.CloseSend() failed: %v, should continue to work due to graceful stop", err) + } + if _, err = stream.Recv(); err != io.EOF { + t.Fatalf("unexpected error: %v, expected an EOF error", err) + } + + // New RPCs on that connection should eventually start failing. + waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose) +} + +// TestResourceNotFoundRDS tests the case where an LDS points to an RDS which +// returns resource not found. Before getting the resource not found, the xDS +// Server has not received all configuration needed, so it should Accept and +// Close any new connections. After it has received the resource not found +// error, the server should move to serving, successfully Accept Connections, +// and fail at the L7 level with resource not found specified. +func (s) TestResourceNotFoundRDS(t *testing.T) { + managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{}) + defer cleanup() + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("testutils.LocalTCPListener() failed: %v", err) + } + // Setup the management server to respond with a listener resource that + // specifies a route name to watch, and no RDS resource corresponding to + // this route name. + host, port, err := hostPortFromListener(lis) + if err != nil { + t.Fatalf("failed to retrieve host and port of server: %v", err) + } + + const routeConfigResourceName = "routeName" + listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, routeConfigResourceName) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener}, + SkipValidation: true, + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + serving := grpcsync.NewEvent() + modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) { + t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err) + if args.Mode == connectivity.ServingModeServing { + serving.Fire() + } + }) + + server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents)) + if err != nil { + t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err) + } + defer server.Stop() + testgrpc.RegisterTestServiceServer(server, &testService{}) + go func() { + if err := server.Serve(lis); err != nil { + t.Errorf("Serve() failed: %v", err) + } + }() + + cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose) + + // Lookup the xDS client in use based on the bootstrap configuration. The + // client was created as part of creating the xDS enabled gRPC server. + xdsC, close, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{Contents: bootstrapContents}) + if err != nil { + t.Fatalf("Failed to find xDS client for configuration: %v", string(bootstrapContents)) + } + defer close() + + // Invoke resource not found - this should result in L7 RPC error with + // unavailable receive on serving as a result, should trigger it to go + // serving. Poll as watch might not be started yet to trigger resource not + // found. + triggerResourceNotFound := internal.TriggerXDSResourceNotFoundForTesting.(func(xdsclient.XDSClient, xdsresource.Type, string) error) + routeConfigResourceType := xdsinternal.ResourceTypeMapForTesting[version.V3RouteConfigURL].(xdsresource.Type) +loop: + for { + if err := triggerResourceNotFound(xdsC, routeConfigResourceType, routeConfigResourceName); err != nil { + t.Fatalf("Failed to trigger resource name not found for testing: %v", err) + } + select { + case <-serving.Done(): + break loop + case <-ctx.Done(): + t.Fatalf("timed out waiting for serving mode to go serving") + case <-time.After(time.Millisecond): + } + } + waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration")) +} + +func waitForSuccessfulRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn) { + t.Helper() + + c := testgrpc.NewTestServiceClient(cc) + if _, err := c.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } +} + +// waitForFailedRPCWithStatus makes unary RPC's until it receives the expected +// status in a polling manner. Fails if the RPC made does not return the +// expected status before the context expires. +func waitForFailedRPCWithStatus(ctx context.Context, t *testing.T, cc *grpc.ClientConn, st *status.Status) { + t.Helper() + + c := testgrpc.NewTestServiceClient(cc) + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + var err error + for { + select { + case <-ctx.Done(): + t.Fatalf("failure when waiting for RPCs to fail with certain status %v: %v. most recent error received from RPC: %v", st, ctx.Err(), err) + case <-ticker.C: + _, err = c.EmptyCall(ctx, &testpb.Empty{}) + if status.Code(err) == st.Code() && strings.Contains(err.Error(), st.Message()) { + t.Logf("most recent error happy case: %v", err.Error()) + return + } + } + } +}