From c4d2e7f6c4c4865912e138151b9c33ed74efd703 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 16 Aug 2023 09:49:02 -0700 Subject: [PATCH 1/6] cdsbalancer: cleanup aggregate cluster tests --- internal/testutils/xds/e2e/clientresources.go | 72 +- .../balancer/cdsbalancer/cdsbalancer_test.go | 7 + .../cdsbalancer/cluster_handler_test.go | 1506 ++++++----------- .../e2e_test/aggregate_cluster_test.go | 50 +- .../clusterresolver/resource_resolver.go | 2 +- 5 files changed, 624 insertions(+), 1013 deletions(-) diff --git a/internal/testutils/xds/e2e/clientresources.go b/internal/testutils/xds/e2e/clientresources.go index 7f219b5d569e..7bf4151b6c4c 100644 --- a/internal/testutils/xds/e2e/clientresources.go +++ b/internal/testutils/xds/e2e/clientresources.go @@ -33,6 +33,7 @@ import ( v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + v3aggregateclusterpb "github.com/envoyproxy/go-control-plane/envoy/extensions/clusters/aggregate/v3" v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3" v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" v3tlspb "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" @@ -450,12 +451,37 @@ const ( LoadBalancingPolicyRingHash ) +// ClusterType specifies the type of the Cluster resource. +type ClusterType int + +const ( + // ClusterTypeEDS specifies a Cluster that uses EDS to resolve endpoints. + ClusterTypeEDS ClusterType = iota + // ClusterTypeLogicalDNS specifies a Cluster that uses DNS to resolve + // endpoints. + ClusterTypeLogicalDNS + // ClusterTypeAggregate specifies a Cluster that is made up of child + // clusters. + ClusterTypeAggregate +) + // ClusterOptions contains options to configure a Cluster resource. type ClusterOptions struct { + Type ClusterType // ClusterName is the name of the Cluster resource. ClusterName string - // ServiceName is the EDS service name of the Cluster. + // ServiceName is the EDS service name of the Cluster. Applicable only when + // cluster type is EDS. ServiceName string + // ChildNames is the list of child Cluster names. Applicable only when + // cluster type is Aggregate. + ChildNames []string + // DNSHostName is the dns host name of the Cluster. Applicable only when the + // cluster type is DNS. + DNSHostName string + // DNSPort is the port number of the Cluster. Applicable only when the + // cluster type is DNS. + DNSPort uint32 // Policy is the LB policy to be used. Policy LoadBalancingPolicy // SecurityLevel determines the security configuration for the Cluster. 
@@ -504,17 +530,51 @@ func ClusterResourceWithOptions(opts ClusterOptions) *v3clusterpb.Cluster { lbPolicy = v3clusterpb.Cluster_RING_HASH } cluster := &v3clusterpb.Cluster{ - Name: opts.ClusterName, - ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS}, - EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{ + Name: opts.ClusterName, + LbPolicy: lbPolicy, + } + switch opts.Type { + case ClusterTypeEDS: + cluster.ClusterDiscoveryType = &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS} + cluster.EdsClusterConfig = &v3clusterpb.Cluster_EdsClusterConfig{ EdsConfig: &v3corepb.ConfigSource{ ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{ Ads: &v3corepb.AggregatedConfigSource{}, }, }, ServiceName: opts.ServiceName, - }, - LbPolicy: lbPolicy, + } + case ClusterTypeLogicalDNS: + cluster.ClusterDiscoveryType = &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_LOGICAL_DNS} + cluster.LoadAssignment = &v3endpointpb.ClusterLoadAssignment{ + Endpoints: []*v3endpointpb.LocalityLbEndpoints{{ + LbEndpoints: []*v3endpointpb.LbEndpoint{{ + HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{ + Endpoint: &v3endpointpb.Endpoint{ + Address: &v3corepb.Address{ + Address: &v3corepb.Address_SocketAddress{ + SocketAddress: &v3corepb.SocketAddress{ + Address: opts.DNSHostName, + PortSpecifier: &v3corepb.SocketAddress_PortValue{ + PortValue: opts.DNSPort, + }, + }, + }, + }, + }, + }, + }}, + }}, + } + case ClusterTypeAggregate: + cluster.ClusterDiscoveryType = &v3clusterpb.Cluster_ClusterType{ + ClusterType: &v3clusterpb.Cluster_CustomClusterType{ + Name: "envoy.clusters.aggregate", + TypedConfig: testutils.MarshalAny(&v3aggregateclusterpb.ClusterConfig{ + Clusters: opts.ChildNames, + }), + }, + } } if tlsContext != nil { cluster.TransportSocket = &v3corepb.TransportSocket{ diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index 1d02c4810912..727a98c2fa51 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -61,7 +61,11 @@ import ( const ( clusterName = "cluster1" + edsClusterName = clusterName + "-eds" + dnsClusterName = clusterName + "-dns" serviceName = "service1" + dnsHostName = "dns_host" + dnsPort = uint32(8080) defaultTestTimeout = 5 * time.Second defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen. ) @@ -218,6 +222,9 @@ func setupWithManagementServer(t *testing.T) (*e2e.ManagementServer, string, *gr } return nil }, + // Required for aggregate clusters as all resources cannot be requested + // at once. 
+ AllowResourceSubset: true, }) t.Cleanup(cleanup) diff --git a/xds/internal/balancer/cdsbalancer/cluster_handler_test.go b/xds/internal/balancer/cdsbalancer/cluster_handler_test.go index ee989ec3ef73..3978c1027f3e 100644 --- a/xds/internal/balancer/cdsbalancer/cluster_handler_test.go +++ b/xds/internal/balancer/cdsbalancer/cluster_handler_test.go @@ -18,1078 +18,650 @@ package cdsbalancer import ( "context" - "errors" + "encoding/json" "fmt" + "strings" "testing" - - "github.com/google/go-cmp/cmp" - "google.golang.org/grpc/xds/internal/testutils/fakeclient" - "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" -) - -const ( - edsService = "EDS Service" - logicalDNSService = "Logical DNS Service" - edsService2 = "EDS Service 2" - logicalDNSService2 = "Logical DNS Service 2" - aggregateClusterService = "Aggregate Cluster Service" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/serviceconfig" + "google.golang.org/grpc/status" + "google.golang.org/grpc/xds/internal/balancer/clusterresolver" + + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" ) -// setupTests creates a clusterHandler with a fake xds client for control over -// xds client. -func setupTests() (*clusterHandler, *fakeclient.Client) { - xdsC := fakeclient.NewClient() - ch := newClusterHandler(&cdsBalancer{xdsClient: xdsC}) - return ch, xdsC +// makeAggregateClusterResource returns an aggregate cluster resource with the +// given name and list of child names. +func makeAggregateClusterResource(name string, childNames []string) *v3clusterpb.Cluster { + return e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: name, + Type: e2e.ClusterTypeAggregate, + ChildNames: childNames, + }) } -// Simplest case: the cluster handler receives a cluster name, handler starts a -// watch for that cluster, xds client returns that it is a Leaf Node (EDS or -// LogicalDNS), not a tree, so expectation that update is written to buffer -// which will be read by CDS LB. -func (s) TestSuccessCaseLeafNode(t *testing.T) { - tests := []struct { - name string - clusterName string - clusterUpdate xdsresource.ClusterUpdate - }{ - { - name: "test-update-root-cluster-EDS-success", - clusterName: edsService, - clusterUpdate: xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeEDS, - ClusterName: edsService, - }, - }, - { - name: "test-update-root-cluster-Logical-DNS-success", - clusterName: logicalDNSService, - clusterUpdate: xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeLogicalDNS, - ClusterName: logicalDNSService, - }, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - ch, fakeClient := setupTests() - // When you first update the root cluster, it should hit the code - // path which will start a cluster node for that root. Updating the - // root cluster logically represents a ping from a ClientConn. - ch.updateRootCluster(test.clusterName) - // Starting a cluster node involves communicating with the - // xdsClient, telling it to watch a cluster. 
- ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer ctxCancel() - gotCluster, err := fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) - } - if gotCluster != test.clusterName { - t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, test.clusterName) - } - // Invoke callback with xds client with a certain clusterUpdate. Due - // to this cluster update filling out the whole cluster tree, as the - // cluster is of a root type (EDS or Logical DNS) and not an - // aggregate cluster, this should trigger the ClusterHandler to - // write to the update buffer to update the CDS policy. - fakeClient.InvokeWatchClusterCallback(test.clusterUpdate, nil) - select { - case chu := <-ch.updateChannel: - if diff := cmp.Diff(chu.updates, []xdsresource.ClusterUpdate{test.clusterUpdate}); diff != "" { - t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff) - } - case <-ctx.Done(): - t.Fatal("Timed out waiting for update from update channel.") - } - // Close the clusterHandler. This is meant to be called when the CDS - // Balancer is closed, and the call should cancel the watch for this - // cluster. - ch.close() - clusterNameDeleted, err := fakeClient.WaitForCancelClusterWatch(ctx) - if err != nil { - t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) - } - if clusterNameDeleted != test.clusterName { - t.Fatalf("xdsClient.CancelCDS called for cluster %v, want: %v", clusterNameDeleted, logicalDNSService) - } - }) - } +// makeLogicalDNSClusterResource returns a LOGICAL_DNS cluster resource with the +// given name and given DNS host and port. +func makeLogicalDNSClusterResource(name, dnsHost string, dnsPort uint32) *v3clusterpb.Cluster { + return e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: name, + Type: e2e.ClusterTypeLogicalDNS, + DNSHostName: dnsHost, + DNSPort: dnsPort, + }) } -// The cluster handler receives a cluster name, handler starts a watch for that -// cluster, xds client returns that it is a Leaf Node (EDS or LogicalDNS), not a -// tree, so expectation that first update is written to buffer which will be -// read by CDS LB. Then, send a new cluster update that is different, with the -// expectation that it is also written to the update buffer to send back to CDS. -func (s) TestSuccessCaseLeafNodeThenNewUpdate(t *testing.T) { +// Tests the case where the cluster resource requested by the cds LB policy is +// a leaf cluster. Subsequently the management server pushes an update to the +// same leaf cluster. Verifies that the load balancing configuration pushed to +// the cluster_resolver LB policy is as expected. 
+func (s) TestClusterHandlerSuccess_LeafNode(t *testing.T) { tests := []struct { - name string - clusterName string - clusterUpdate xdsresource.ClusterUpdate - newClusterUpdate xdsresource.ClusterUpdate + name string + firstClusterResource *v3clusterpb.Cluster + secondClusterResource *v3clusterpb.Cluster + wantFirstChildCfg serviceconfig.LoadBalancingConfig + wantSecondChildCfg serviceconfig.LoadBalancingConfig }{ - {name: "test-update-root-cluster-then-new-update-EDS-success", - clusterName: edsService, - clusterUpdate: xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeEDS, - ClusterName: edsService, + { + name: "eds", + firstClusterResource: e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone), + secondClusterResource: e2e.DefaultCluster(clusterName, serviceName+"-new", e2e.SecurityLevelNone), + wantFirstChildCfg: &clusterresolver.LBConfig{ + DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{ + Cluster: clusterName, + Type: clusterresolver.DiscoveryMechanismTypeEDS, + EDSServiceName: serviceName, + OutlierDetection: json.RawMessage(`{}`), + }}, + XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`), }, - newClusterUpdate: xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeEDS, - ClusterName: edsService2, + wantSecondChildCfg: &clusterresolver.LBConfig{ + DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{ + Cluster: clusterName, + Type: clusterresolver.DiscoveryMechanismTypeEDS, + EDSServiceName: serviceName + "-new", + OutlierDetection: json.RawMessage(`{}`), + }}, + XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`), }, }, { - name: "test-update-root-cluster-then-new-update-Logical-DNS-success", - clusterName: logicalDNSService, - clusterUpdate: xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeLogicalDNS, - ClusterName: logicalDNSService, + name: "dns", + firstClusterResource: makeLogicalDNSClusterResource(clusterName, "dns_host", uint32(8080)), + secondClusterResource: makeLogicalDNSClusterResource(clusterName, "dns_host_new", uint32(8080)), + wantFirstChildCfg: &clusterresolver.LBConfig{ + DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{ + Cluster: clusterName, + Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS, + DNSHostname: "dns_host:8080", + OutlierDetection: json.RawMessage(`{}`), + }}, + XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`), }, - newClusterUpdate: xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeLogicalDNS, - ClusterName: logicalDNSService2, + wantSecondChildCfg: &clusterresolver.LBConfig{ + DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{ + Cluster: clusterName, + Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS, + DNSHostname: "dns_host_new:8080", + OutlierDetection: json.RawMessage(`{}`), + }}, + XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`), }, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ch, fakeClient := setupTests() - ch.updateRootCluster(test.clusterName) - ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer ctxCancel() - _, err := fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) + mgmtServer, nodeID, _, _, _, _, _ 
:= setupWithManagementServer(t) + + // Push the first cluster resource through the management server and + // verify the configuration pushed to the child policy. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{test.firstClusterResource}, + SkipValidation: true, } - fakeClient.InvokeWatchClusterCallback(test.clusterUpdate, nil) - select { - case <-ch.updateChannel: - case <-ctx.Done(): - t.Fatal("Timed out waiting for update from updateChannel.") + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) } - - // Check that sending the same cluster update also induces an update - // to be written to update buffer. - fakeClient.InvokeWatchClusterCallback(test.clusterUpdate, nil) - shouldNotHappenCtx, shouldNotHappenCtxCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer shouldNotHappenCtxCancel() - select { - case <-ch.updateChannel: - case <-shouldNotHappenCtx.Done(): - t.Fatal("Timed out waiting for update from updateChannel.") + if err := compareLoadBalancingConfig(ctx, lbCfgCh, test.wantFirstChildCfg); err != nil { + t.Fatal(err) } - // Above represents same thing as the simple - // TestSuccessCaseLeafNode, extra behavior + validation (clusterNode - // which is a leaf receives a changed clusterUpdate, which should - // ping clusterHandler, which should then write to the update - // buffer). - fakeClient.InvokeWatchClusterCallback(test.newClusterUpdate, nil) - select { - case chu := <-ch.updateChannel: - if diff := cmp.Diff(chu.updates, []xdsresource.ClusterUpdate{test.newClusterUpdate}); diff != "" { - t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff) - } - case <-ctx.Done(): - t.Fatal("Timed out waiting for update from updateChannel.") + // Push the second cluster resource through the management server and + // verify the configuration pushed to the child policy. + resources.Clusters[0] = test.secondClusterResource + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + if err := compareLoadBalancingConfig(ctx, lbCfgCh, test.wantSecondChildCfg); err != nil { + t.Fatal(err) } }) } } -// TestUpdateRootClusterAggregateSuccess tests the case where an aggregate -// cluster is a root pointing to two child clusters one of type EDS and the -// other of type LogicalDNS. This test will then send cluster updates for both -// the children, and at the end there should be a successful clusterUpdate -// written to the update buffer to send back to CDS. -func (s) TestUpdateRootClusterAggregateSuccess(t *testing.T) { - ch, fakeClient := setupTests() - ch.updateRootCluster(aggregateClusterService) - - ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer ctxCancel() - gotCluster, err := fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) - } - if gotCluster != aggregateClusterService { - t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, aggregateClusterService) - } - - // The xdsClient telling the clusterNode that the cluster type is an - // aggregate cluster which will cause a lot of downstream behavior. For a - // cluster type that isn't an aggregate, the behavior is simple. The - // clusterNode will simply get a successful update, which will then ping the - // clusterHandler which will successfully build an update to send to the CDS - // policy. 
In the aggregate cluster case, the handleResp callback must also - // start watches for the aggregate cluster's children. The ping to the - // clusterHandler at the end of handleResp should be a no-op, as neither the - // EDS or LogicalDNS child clusters have received an update yet. - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeAggregate, - ClusterName: aggregateClusterService, - PrioritizedClusterNames: []string{edsService, logicalDNSService}, - }, nil) - - // xds client should be called to start a watch for one of the child - // clusters of the aggregate. The order of the children in the update - // written to the buffer to send to CDS matters, however there is no - // guarantee on the order it will start the watches of the children. - gotCluster, err = fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) +// Tests the case where the cluster resource requested by the cds LB policy is +// an aggregate cluster root pointing to two child clusters, one of type EDS and +// the other of type LogicalDNS. Verifies that the load balancing configuration +// pushed to the cluster_resolver LB policy is as expected. The test then +// updates the aggregate cluster to point to a different set of child clusters +// and verifies that the load balancing configuration pushed to the +// cluster_resolver LB policy is as expected. +func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) { + lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) + mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) + + // Configure the management server with the aggregate cluster resource + // pointing to two child clusters. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{ + makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}), + e2e.DefaultCluster(edsClusterName, serviceName, e2e.SecurityLevelNone), + makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort), + }, + SkipValidation: true, } - if gotCluster != edsService { - if gotCluster != logicalDNSService { - t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, edsService) - } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) } - // xds client should then be called to start a watch for the second child - // cluster. 
- gotCluster, err = fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + wantChildCfg := &clusterresolver.LBConfig{ + DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{ + { + Cluster: edsClusterName, + Type: clusterresolver.DiscoveryMechanismTypeEDS, + EDSServiceName: serviceName, + OutlierDetection: json.RawMessage(`{}`), + }, + { + Cluster: dnsClusterName, + Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS, + DNSHostname: fmt.Sprintf("%s:%d", dnsHostName, dnsPort), + OutlierDetection: json.RawMessage(`{}`), + }, + }, + XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`), } - if gotCluster != edsService { - if gotCluster != logicalDNSService { - t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, logicalDNSService) - } + if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil { + t.Fatal(err) } - // The handleResp() call on the root aggregate cluster should not ping the - // cluster handler to try and construct an update, as the handleResp() - // callback knows that when a child is created, it cannot possibly build a - // successful update yet. Thus, there should be nothing in the update - // channel. - - shouldNotHappenCtx, shouldNotHappenCtxCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer shouldNotHappenCtxCancel() - - select { - case <-ch.updateChannel: - t.Fatal("Cluster Handler wrote an update to updateChannel when it shouldn't have, as each node in the full cluster tree has not yet received an update") - case <-shouldNotHappenCtx.Done(): - } - - // Send callback for the EDS child cluster. - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeEDS, - ClusterName: edsService, - }, nil) - - // EDS child cluster will ping the Cluster Handler, to try an update, which - // still won't successfully build as the LogicalDNS child of the root - // aggregate cluster has not yet received and handled an update. 
- select { - case <-ch.updateChannel: - t.Fatal("Cluster Handler wrote an update to updateChannel when it shouldn't have, as each node in the full cluster tree has not yet received an update") - case <-shouldNotHappenCtx.Done(): + const dnsClusterNameNew = dnsClusterName + "-new" + const dnsHostNameNew = dnsHostName + "-new" + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{ + makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterNameNew}), + e2e.DefaultCluster(edsClusterName, serviceName, e2e.SecurityLevelNone), + makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort), + makeLogicalDNSClusterResource(dnsClusterNameNew, dnsHostNameNew, dnsPort), + }, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + wantChildCfg = &clusterresolver.LBConfig{ + DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{ + { + Cluster: edsClusterName, + Type: clusterresolver.DiscoveryMechanismTypeEDS, + EDSServiceName: serviceName, + OutlierDetection: json.RawMessage(`{}`), + }, + { + Cluster: dnsClusterNameNew, + Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS, + DNSHostname: fmt.Sprintf("%s:%d", dnsHostNameNew, dnsPort), + OutlierDetection: json.RawMessage(`{}`), + }, + }, + XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`), } - - // Invoke callback for Logical DNS child cluster. - - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeLogicalDNS, - ClusterName: logicalDNSService, - }, nil) - - // Will Ping Cluster Handler, which will finally successfully build an - // update as all nodes in the tree of clusters have received an update. - // Since this cluster is an aggregate cluster comprised of two children, the - // returned update should be length 2, as the xds cluster resolver LB policy - // only cares about the full list of LogicalDNS and EDS clusters - // representing the base nodes of the tree of clusters. This list should be - // ordered as per the cluster update. - select { - case chu := <-ch.updateChannel: - if diff := cmp.Diff(chu.updates, []xdsresource.ClusterUpdate{{ - ClusterType: xdsresource.ClusterTypeEDS, - ClusterName: edsService, - }, { - ClusterType: xdsresource.ClusterTypeLogicalDNS, - ClusterName: logicalDNSService, - }}); diff != "" { - t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff) - } - case <-ctx.Done(): - t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.") + if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil { + t.Fatal(err) } } -// TestUpdateRootClusterAggregateThenChangeChild tests the scenario where you -// have an aggregate cluster with an EDS child and a LogicalDNS child, then you -// change one of the children and send an update for the changed child. This -// should write a new update to the update buffer to send back to CDS. -func (s) TestUpdateRootClusterAggregateThenChangeChild(t *testing.T) { - // This initial code is the same as the test for the aggregate success case, - // except without validations. This will get this test to the point where it - // can change one of the children. 
- ch, fakeClient := setupTests() - ch.updateRootCluster(aggregateClusterService) - - ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer ctxCancel() - _, err := fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) +// Tests the case where the cluster resource requested by the cds LB policy is +// an aggregate cluster root pointing to two child clusters, one of type EDS and +// the other of type LogicalDNS. Verifies that the load balancing configuration +// pushed to the cluster_resolver LB policy is as expected. The test then +// updates the root cluster resource requested by the cds LB policy to a leaf +// cluster of type EDS and verifies the load balancing configuration pushed to +// the cluster_resolver LB policy. +func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) { + lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) + mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) + + // Configure the management server with the aggregate cluster resource + // pointing to two child clusters. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{ + makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}), + e2e.DefaultCluster(edsClusterName, serviceName, e2e.SecurityLevelNone), + makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort), + }, + SkipValidation: true, } - - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeAggregate, - ClusterName: aggregateClusterService, - PrioritizedClusterNames: []string{edsService, logicalDNSService}, - }, nil) - fakeClient.WaitForWatchCluster(ctx) - fakeClient.WaitForWatchCluster(ctx) - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeEDS, - ClusterName: edsService, - }, nil) - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeLogicalDNS, - ClusterName: logicalDNSService, - }, nil) - - select { - case <-ch.updateChannel: - case <-ctx.Done(): - t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.") + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) } - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeAggregate, - ClusterName: aggregateClusterService, - PrioritizedClusterNames: []string{edsService, logicalDNSService2}, - }, nil) - - // The cluster update let's the aggregate cluster know that it's children - // are now edsService and logicalDNSService2, which implies that the - // aggregateCluster lost it's old logicalDNSService child. Thus, the - // logicalDNSService child should be deleted. 
- clusterNameDeleted, err := fakeClient.WaitForCancelClusterWatch(ctx) - if err != nil { - t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) + wantChildCfg := &clusterresolver.LBConfig{ + DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{ + { + Cluster: edsClusterName, + Type: clusterresolver.DiscoveryMechanismTypeEDS, + EDSServiceName: serviceName, + OutlierDetection: json.RawMessage(`{}`), + }, + { + Cluster: dnsClusterName, + Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS, + DNSHostname: fmt.Sprintf("%s:%d", dnsHostName, dnsPort), + OutlierDetection: json.RawMessage(`{}`), + }, + }, + XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`), } - if clusterNameDeleted != logicalDNSService { - t.Fatalf("xdsClient.CancelCDS called for cluster %v, want: %v", clusterNameDeleted, logicalDNSService) + if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil { + t.Fatal(err) } - // The handleResp() callback should then start a watch for - // logicalDNSService2. - clusterNameCreated, err := fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{ + e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone), + makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort), + }, + SkipValidation: true, } - if clusterNameCreated != logicalDNSService2 { - t.Fatalf("xdsClient.WatchCDS called for cluster %v, want: %v", clusterNameCreated, logicalDNSService2) + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) } - - // handleResp() should try and send an update here, but it will fail as - // logicalDNSService2 has not yet received an update. - shouldNotHappenCtx, shouldNotHappenCtxCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer shouldNotHappenCtxCancel() - select { - case <-ch.updateChannel: - t.Fatal("Cluster Handler wrote an update to updateChannel when it shouldn't have, as each node in the full cluster tree has not yet received an update") - case <-shouldNotHappenCtx.Done(): + wantChildCfg = &clusterresolver.LBConfig{ + DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{ + Cluster: clusterName, + Type: clusterresolver.DiscoveryMechanismTypeEDS, + EDSServiceName: serviceName, + OutlierDetection: json.RawMessage(`{}`), + }}, + XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`), } - - // Invoke a callback for the new logicalDNSService2 - this will fill out the - // tree with successful updates. - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeLogicalDNS, - ClusterName: logicalDNSService2, - }, nil) - - // Behavior: This update make every node in the tree of cluster have - // received an update. Thus, at the end of this callback, when you ping the - // clusterHandler to try and construct an update, the update should now - // successfully be written to update buffer to send back to CDS. This new - // update should contain the new child of LogicalDNS2. 
- - select { - case chu := <-ch.updateChannel: - if diff := cmp.Diff(chu.updates, []xdsresource.ClusterUpdate{{ - ClusterType: xdsresource.ClusterTypeEDS, - ClusterName: edsService, - }, { - ClusterType: xdsresource.ClusterTypeLogicalDNS, - ClusterName: logicalDNSService2, - }}); diff != "" { - t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff) - } - case <-ctx.Done(): - t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.") + if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil { + t.Fatal(err) } } -// TestUpdateRootClusterAggregateThenChangeRootToEDS tests the situation where -// you have a fully updated aggregate cluster (where AggregateCluster success -// test gets you) as the root cluster, then you update that root cluster to a -// cluster of type EDS. -func (s) TestUpdateRootClusterAggregateThenChangeRootToEDS(t *testing.T) { - // This initial code is the same as the test for the aggregate success case, - // except without validations. This will get this test to the point where it - // can update the root cluster to one of type EDS. - ch, fakeClient := setupTests() - ch.updateRootCluster(aggregateClusterService) - - ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer ctxCancel() - _, err := fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) - } - - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeAggregate, - ClusterName: aggregateClusterService, - PrioritizedClusterNames: []string{edsService, logicalDNSService}, - }, nil) - fakeClient.WaitForWatchCluster(ctx) - fakeClient.WaitForWatchCluster(ctx) - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeEDS, - ClusterName: edsService, - }, nil) - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeLogicalDNS, - ClusterName: logicalDNSService, - }, nil) +// Tests the case where a requested cluster resource switches between being a +// leaf and an aggregate cluster. In each of these cases, the test verifies that +// the appropriate load balancing configuration is pushed to the +// cluster_resolver LB policy. +func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T) { + lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) + mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) - select { - case <-ch.updateChannel: - case <-ctx.Done(): - t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.") + // Start off with the requested cluster being a leaf EDS cluster. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)}, + SkipValidation: true, } - - // Changes the root aggregate cluster to a EDS cluster. This should delete - // the root aggregate cluster and all of it's children by successfully - // canceling the watches for them. - ch.updateRootCluster(edsService2) - - // Reads from the cancel channel, should first be type Aggregate, then EDS - // then Logical DNS. 
- clusterNameDeleted, err := fakeClient.WaitForCancelClusterWatch(ctx) - if err != nil { - t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) - } - if clusterNameDeleted != aggregateClusterService { - t.Fatalf("xdsClient.CancelCDS called for cluster %v, want: %v", clusterNameDeleted, logicalDNSService) - } - - clusterNameDeleted, err = fakeClient.WaitForCancelClusterWatch(ctx) - if err != nil { - t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) - } - if clusterNameDeleted != edsService { - t.Fatalf("xdsClient.CancelCDS called for cluster %v, want: %v", clusterNameDeleted, logicalDNSService) - } - - clusterNameDeleted, err = fakeClient.WaitForCancelClusterWatch(ctx) - if err != nil { - t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + wantChildCfg := &clusterresolver.LBConfig{ + DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{ + Cluster: clusterName, + Type: clusterresolver.DiscoveryMechanismTypeEDS, + EDSServiceName: serviceName, + OutlierDetection: json.RawMessage(`{}`), + }}, + XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`), + } + if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil { + t.Fatal(err) + } + + // Switch the requested cluster to be an aggregate cluster pointing to two + // child clusters. + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{ + makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}), + e2e.DefaultCluster(edsClusterName, serviceName, e2e.SecurityLevelNone), + makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort), + }, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + wantChildCfg = &clusterresolver.LBConfig{ + DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{ + { + Cluster: edsClusterName, + Type: clusterresolver.DiscoveryMechanismTypeEDS, + EDSServiceName: serviceName, + OutlierDetection: json.RawMessage(`{}`), + }, + { + Cluster: dnsClusterName, + Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS, + DNSHostname: fmt.Sprintf("%s:%d", dnsHostName, dnsPort), + OutlierDetection: json.RawMessage(`{}`), + }, + }, + XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`), } - if clusterNameDeleted != logicalDNSService { - t.Fatalf("xdsClient.CancelCDS called for cluster %v, want: %v", clusterNameDeleted, logicalDNSService) + if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil { + t.Fatal(err) } - // After deletion, it should start a watch for the EDS Cluster. The behavior - // for this EDS Cluster receiving an update from xds client and then - // successfully writing an update to send back to CDS is already tested in - // the updateEDS success case. - gotCluster, err := fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + // Switch the cluster back to a leaf EDS cluster. 
+ resources = e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)}, + SkipValidation: true, } - if gotCluster != edsService2 { - t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, edsService2) + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) } -} - -// TestHandleRespInvokedWithError tests that when handleResp is invoked with an -// error, that the error is successfully written to the update buffer. -func (s) TestHandleRespInvokedWithError(t *testing.T) { - ch, fakeClient := setupTests() - ch.updateRootCluster(edsService) - ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer ctxCancel() - _, err := fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + wantChildCfg = &clusterresolver.LBConfig{ + DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{ + Cluster: clusterName, + Type: clusterresolver.DiscoveryMechanismTypeEDS, + EDSServiceName: serviceName, + OutlierDetection: json.RawMessage(`{}`), + }}, + XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`), } - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{}, errors.New("some error")) - select { - case chu := <-ch.updateChannel: - if chu.err.Error() != "some error" { - t.Fatalf("Did not receive the expected error, instead received: %v", chu.err.Error()) - } - case <-ctx.Done(): - t.Fatal("Timed out waiting for update from update channel.") + if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil { + t.Fatal(err) } } -// TestSwitchClusterNodeBetweenLeafAndAggregated tests having an existing -// cluster node switch between a leaf and an aggregated cluster. When the -// cluster switches from a leaf to an aggregated cluster, it should add -// children, and when it switches back to a leaf, it should delete those new -// children and also successfully write a cluster update to the update buffer. -func (s) TestSwitchClusterNodeBetweenLeafAndAggregated(t *testing.T) { - // Getting the test to the point where there's a root cluster which is a eds - // leaf. - ch, fakeClient := setupTests() - ch.updateRootCluster(edsService2) - ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer ctxCancel() - _, err := fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) - } - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeEDS, - ClusterName: edsService2, - }, nil) - select { - case <-ch.updateChannel: - case <-ctx.Done(): - t.Fatal("Timed out waiting for update from update channel.") - } - // Switch the cluster to an aggregate cluster, this should cause two new - // child watches to be created. - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeAggregate, - ClusterName: edsService2, - PrioritizedClusterNames: []string{edsService, logicalDNSService}, - }, nil) - - // xds client should be called to start a watch for one of the child - // clusters of the aggregate. The order of the children in the update - // written to the buffer to send to CDS matters, however there is no - // guarantee on the order it will start the watches of the children. 
- gotCluster, err := fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) - } - if gotCluster != edsService { - if gotCluster != logicalDNSService { - t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, edsService) - } - } - - // xds client should then be called to start a watch for the second child - // cluster. - gotCluster, err = fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) - } - if gotCluster != edsService { - if gotCluster != logicalDNSService { - t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, logicalDNSService) - } - } - - // After starting a watch for the second child cluster, there should be no - // more watches started on the xds client. - shouldNotHappenCtx, shouldNotHappenCtxCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer shouldNotHappenCtxCancel() - gotCluster, err = fakeClient.WaitForWatchCluster(shouldNotHappenCtx) - if err == nil { - t.Fatalf("xdsClient.WatchCDS called for cluster: %v, no more watches should be started.", gotCluster) - } - - // The handleResp() call on the root aggregate cluster should not ping the - // cluster handler to try and construct an update, as the handleResp() - // callback knows that when a child is created, it cannot possibly build a - // successful update yet. Thus, there should be nothing in the update - // channel. - - shouldNotHappenCtx, shouldNotHappenCtxCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer shouldNotHappenCtxCancel() - - select { - case <-ch.updateChannel: - t.Fatal("Cluster Handler wrote an update to updateChannel when it shouldn't have, as each node in the full cluster tree has not yet received an update") - case <-shouldNotHappenCtx.Done(): - } - - // Switch the cluster back to an EDS Cluster. This should cause the two - // children to be deleted. - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeEDS, - ClusterName: edsService2, - }, nil) - - // Should delete the two children (no guarantee of ordering deleted, which - // is ok), then successfully write an update to the update buffer as the - // full cluster tree has received updates. - clusterNameDeleted, err := fakeClient.WaitForCancelClusterWatch(ctx) - if err != nil { - t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) - } - // No guarantee of ordering, so one of the children should be deleted first. - if clusterNameDeleted != edsService { - if clusterNameDeleted != logicalDNSService { - t.Fatalf("xdsClient.CancelCDS called for cluster %v, want either: %v or: %v", clusterNameDeleted, edsService, logicalDNSService) - } - } - // Then the other child should be deleted. - clusterNameDeleted, err = fakeClient.WaitForCancelClusterWatch(ctx) - if err != nil { - t.Fatalf("xdsClient.CancelCDS failed with error: %v", err) - } - if clusterNameDeleted != edsService { - if clusterNameDeleted != logicalDNSService { - t.Fatalf("xdsClient.CancelCDS called for cluster %v, want either: %v or: %v", clusterNameDeleted, edsService, logicalDNSService) - } - } - - // After cancelling a watch for the second child cluster, there should be no - // more watches cancelled on the xds client. 
-	shouldNotHappenCtx, shouldNotHappenCtxCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
-	defer shouldNotHappenCtxCancel()
-	gotCluster, err = fakeClient.WaitForCancelClusterWatch(shouldNotHappenCtx)
-	if err == nil {
-		t.Fatalf("xdsClient.WatchCDS called for cluster: %v, no more watches should be cancelled.", gotCluster)
-	}
-
-	// Then an update should successfully be written to the update buffer.
-	select {
-	case chu := <-ch.updateChannel:
-		if diff := cmp.Diff(chu.updates, []xdsresource.ClusterUpdate{{
-			ClusterType: xdsresource.ClusterTypeEDS,
-			ClusterName: edsService2,
-		}}); diff != "" {
-			t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff)
-		}
-	case <-ctx.Done():
-		t.Fatal("Timed out waiting for update from update channel.")
+// Tests the scenario where an aggregate cluster exceeds the maximum depth,
+// which is 16. Verifies that the channel moves to TRANSIENT_FAILURE, and the
+// error is propagated to RPC callers. The test then modifies the graph to no
+// longer exceed maximum depth, and verifies that an RPC can be made
+// successfully.
+func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) {
+	mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
+
+	resources := e2e.UpdateOptions{
+		NodeID: nodeID,
+		Clusters: []*v3clusterpb.Cluster{
+			makeAggregateClusterResource(clusterName, []string{clusterName + "-1"}),
+			makeAggregateClusterResource(clusterName+"-1", []string{clusterName + "-2"}),
+			makeAggregateClusterResource(clusterName+"-2", []string{clusterName + "-3"}),
+			makeAggregateClusterResource(clusterName+"-3", []string{clusterName + "-4"}),
+			makeAggregateClusterResource(clusterName+"-4", []string{clusterName + "-5"}),
+			makeAggregateClusterResource(clusterName+"-5", []string{clusterName + "-6"}),
+			makeAggregateClusterResource(clusterName+"-6", []string{clusterName + "-7"}),
+			makeAggregateClusterResource(clusterName+"-7", []string{clusterName + "-8"}),
+			makeAggregateClusterResource(clusterName+"-8", []string{clusterName + "-9"}),
+			makeAggregateClusterResource(clusterName+"-9", []string{clusterName + "-10"}),
+			makeAggregateClusterResource(clusterName+"-10", []string{clusterName + "-11"}),
+			makeAggregateClusterResource(clusterName+"-11", []string{clusterName + "-12"}),
+			makeAggregateClusterResource(clusterName+"-12", []string{clusterName + "-13"}),
+			makeAggregateClusterResource(clusterName+"-13", []string{clusterName + "-14"}),
+			makeAggregateClusterResource(clusterName+"-14", []string{clusterName + "-15"}),
+			makeAggregateClusterResource(clusterName+"-15", []string{clusterName + "-16"}),
+			e2e.DefaultCluster(clusterName+"-16", serviceName, e2e.SecurityLevelNone),
+		},
+		SkipValidation: true,
 	}
-}
-
-// TestExceedsMaxStackDepth tests the scenario where an aggregate cluster
-// exceeds the maximum depth, which is 16. This should cause an error to be
-// written to the update buffer.
-func (s) TestExceedsMaxStackDepth(t *testing.T) { - ch, fakeClient := setupTests() - ch.updateRootCluster("cluster0") ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - _, err := fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) } - for i := 0; i <= 15; i++ { - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeAggregate, - ClusterName: "cluster" + fmt.Sprint(i), - PrioritizedClusterNames: []string{"cluster" + fmt.Sprint(i+1)}, - }, nil) - if i == 15 { - // The 16th iteration will try and create a cluster which exceeds - // max stack depth and will thus error, so no CDS Watch will be - // started for the child. - continue - } - _, err = fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + for state := cc.GetState(); state != connectivity.TransientFailure; state = cc.GetState() { + if !cc.WaitForStateChange(ctx, state) { + t.Fatalf("Timed out waiting for state change. got %v; want %v", state, connectivity.TransientFailure) } } - select { - case chu := <-ch.updateChannel: - if chu.err.Error() != "aggregate cluster graph exceeds max depth" { - t.Fatalf("Did not receive the expected error, instead received: %v", chu.err.Error()) - } - case <-ctx.Done(): - t.Fatal("Timed out waiting for an error to be written to update channel.") - } -} -// TestDiamondDependency tests a diamond shaped aggregate cluster (A->[B,C]; -// B->D; C->D). Due to both B and C pointing to D as it's child, it should be -// ignored for C. Once all 4 clusters have received a CDS update, an update -// should be then written to the update buffer, specifying a single Cluster D. -func (s) TestDiamondDependency(t *testing.T) { - ch, fakeClient := setupTests() - ch.updateRootCluster("clusterA") - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - _, err := fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) - } - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeAggregate, - ClusterName: "clusterA", - PrioritizedClusterNames: []string{"clusterB", "clusterC"}, - }, nil) - // Two watches should be started for both child clusters. - _, err = fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) - } - _, err = fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + const wantErr = "aggregate cluster graph exceeds max depth" + client := testgrpc.NewTestServiceClient(cc) + _, err := client.EmptyCall(ctx, &testpb.Empty{}) + if code := status.Code(err); code != codes.Unavailable { + t.Fatalf("EmptyCall() failed with code: %v, want %v", code, codes.Unavailable) + } + if err != nil && !strings.Contains(err.Error(), wantErr) { + t.Fatalf("EmptyCall() failed with err: %v, want err containing: %v", err, wantErr) + } + + // Start a test service backend. + server := stubserver.StartTestService(t, nil) + t.Cleanup(server.Stop) + + // Update the aggregate cluster resource to no longer exceed max depth. 
+ resources = e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{ + makeAggregateClusterResource(clusterName, []string{clusterName + "-1"}), + makeAggregateClusterResource(clusterName+"-1", []string{clusterName + "-2"}), + makeAggregateClusterResource(clusterName+"-2", []string{clusterName + "-3"}), + makeAggregateClusterResource(clusterName+"-3", []string{clusterName + "-4"}), + makeAggregateClusterResource(clusterName+"-4", []string{clusterName + "-5"}), + makeAggregateClusterResource(clusterName+"-5", []string{clusterName + "-6"}), + makeAggregateClusterResource(clusterName+"-6", []string{clusterName + "-7"}), + makeAggregateClusterResource(clusterName+"-7", []string{clusterName + "-8"}), + makeAggregateClusterResource(clusterName+"-8", []string{clusterName + "-9"}), + makeAggregateClusterResource(clusterName+"-9", []string{clusterName + "-10"}), + makeAggregateClusterResource(clusterName+"-10", []string{clusterName + "-11"}), + makeAggregateClusterResource(clusterName+"-11", []string{clusterName + "-12"}), + makeAggregateClusterResource(clusterName+"-12", []string{clusterName + "-13"}), + makeAggregateClusterResource(clusterName+"-13", []string{clusterName + "-14"}), + makeAggregateClusterResource(clusterName+"-14", []string{clusterName + "-15"}), + e2e.DefaultCluster(clusterName+"-15", serviceName, e2e.SecurityLevelNone), + }, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})}, + SkipValidation: true, } - // B -> D. - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeAggregate, - ClusterName: "clusterB", - PrioritizedClusterNames: []string{"clusterD"}, - }, nil) - _, err = fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) } - // This shouldn't cause an update to be written to the update buffer, - // as cluster C has not received a cluster update yet. - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeEDS, - ClusterName: "clusterD", - }, nil) - - sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer cancel() - select { - case <-ch.updateChannel: - t.Fatal("an update should not have been written to the update buffer") - case <-sCtx.Done(): - } - - // This update for C should cause an update to be written to the update - // buffer. When you search this aggregated cluster graph, each node has - // received an update. This update should only contain one clusterD, as - // clusterC does not add a clusterD child update due to the clusterD update - // already having been added as a child of clusterB. - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeAggregate, - ClusterName: "clusterC", - PrioritizedClusterNames: []string{"clusterD"}, - }, nil) - - select { - case chu := <-ch.updateChannel: - if diff := cmp.Diff(chu.updates, []xdsresource.ClusterUpdate{{ - ClusterType: xdsresource.ClusterTypeEDS, - ClusterName: "clusterD", - }}); diff != "" { - t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff) - } - case <-ctx.Done(): - t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.") + // Verify that a successful RPC can be made. 
+ if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) } } -// TestIgnoreDups tests the cluster (A->[B, C]; B->[C, D]). Only one watch -// should be started for cluster C. The update written to the update buffer -// should only contain one instance of cluster C correctly as a higher priority -// than D. -func (s) TestIgnoreDups(t *testing.T) { - ch, fakeClient := setupTests() - ch.updateRootCluster("clusterA") - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - _, err := fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) - } - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeAggregate, - ClusterName: "clusterA", - PrioritizedClusterNames: []string{"clusterB", "clusterC"}, - }, nil) - // Two watches should be started, one for each child cluster. - _, err = fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) - } - _, err = fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) - } - // The child cluster C should not have a watch started for it, as it is - // already part of the aggregate cluster graph as the child of the root - // cluster clusterA and has already had a watch started for it. - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeAggregate, - ClusterName: "clusterB", - PrioritizedClusterNames: []string{"clusterC", "clusterD"}, - }, nil) - // Only one watch should be started, which should be for clusterD. - name, err := fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) - } - if name != "clusterD" { - t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: clusterD", name) - } - - sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer cancel() - if _, err = fakeClient.WaitForWatchCluster(sCtx); err == nil { - t.Fatalf("only one watch should have been started for the children of clusterB") +// Tests a diamond shaped aggregate cluster (A->[B,C]; B->D; C->D). Verifies +// that the load balancing configuration pushed to the cluster_resolver LB +// policy specifies cluster D only once. +func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) { + lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) + mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) + + // Configure the management server with an aggregate cluster resource having + // a diamond dependency pattern. + const ( + clusterNameA = clusterName // cluster name in cds LB policy config + clusterNameB = clusterName + "-B" + clusterNameC = clusterName + "-C" + clusterNameD = clusterName + "-D" + ) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{ + makeAggregateClusterResource(clusterNameA, []string{clusterNameB, clusterNameC}), + makeAggregateClusterResource(clusterNameB, []string{clusterNameD}), + makeAggregateClusterResource(clusterNameC, []string{clusterNameD}), + e2e.DefaultCluster(clusterNameD, serviceName, e2e.SecurityLevelNone), + }, + SkipValidation: true, } - - // This update should not cause an update to be written to the update - // buffer, as each cluster in the tree has not yet received a cluster - // update. 
With cluster B ignoring cluster C, the system should function as - // if cluster C was not a child of cluster B (meaning all 4 clusters should - // be required to get an update). - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeEDS, - ClusterName: "clusterC", - }, nil) - sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - select { - case <-ch.updateChannel: - t.Fatal("an update should not have been written to the update buffer") - case <-sCtx.Done(): + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) } - // This update causes all 4 clusters in the aggregated cluster graph to have - // received an update, so an update should be written to the update buffer - // with only a single occurrence of cluster C as a higher priority than - // cluster D. - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeEDS, - ClusterName: "clusterD", - }, nil) - select { - case chu := <-ch.updateChannel: - if diff := cmp.Diff(chu.updates, []xdsresource.ClusterUpdate{{ - ClusterType: xdsresource.ClusterTypeEDS, - ClusterName: "clusterC", - }, { - ClusterType: xdsresource.ClusterTypeEDS, - ClusterName: "clusterD", - }}); diff != "" { - t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff) - } - case <-ctx.Done(): - t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.") + wantChildCfg := &clusterresolver.LBConfig{ + DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{ + Cluster: clusterNameD, + Type: clusterresolver.DiscoveryMechanismTypeEDS, + EDSServiceName: serviceName, + OutlierDetection: json.RawMessage(`{}`), + }}, + XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`), } - - // Delete A's ref to C by updating A with only child B. Since B still has a - // reference to C, C's watch should not be canceled, and also an update - // should correctly be built. - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeAggregate, - ClusterName: "clusterA", - PrioritizedClusterNames: []string{"clusterB"}, - }, nil) - - select { - case chu := <-ch.updateChannel: - if diff := cmp.Diff(chu.updates, []xdsresource.ClusterUpdate{{ - ClusterType: xdsresource.ClusterTypeEDS, - ClusterName: "clusterC", - }, { - ClusterType: xdsresource.ClusterTypeEDS, - ClusterName: "clusterD", - }}); diff != "" { - t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff) - } - case <-ctx.Done(): - t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.") + if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil { + t.Fatal(err) } } -// TestErrorStateWholeTree tests the scenario where the aggregate cluster graph -// exceeds max depth. An error should be written to the update channel. -// Afterward, if a valid response comes in for another cluster, no update should -// be written to the update channel, as the aggregate cluster graph is still in -// the same error state. -func (s) TestErrorStateWholeTree(t *testing.T) { - ch, fakeClient := setupTests() - ch.updateRootCluster("cluster0") +// Tests the case where the aggregate cluster graph contains duplicates (A->[B, +// C]; B->[C, D]). 
Verifies that the load balancing configuration pushed to the +// cluster_resolver LB policy does not contain duplicates. +func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) { + lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) + mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) + + // Configure the management server with an aggregate cluster resource that + // has duplicates in the graph. + const ( + clusterNameA = clusterName // cluster name in cds LB policy config + clusterNameB = clusterName + "-B" + clusterNameC = clusterName + "-C" + clusterNameD = clusterName + "-D" + ) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{ + makeAggregateClusterResource(clusterNameA, []string{clusterNameB, clusterNameC}), + makeAggregateClusterResource(clusterNameB, []string{clusterNameC, clusterNameD}), + e2e.DefaultCluster(clusterNameC, serviceName, e2e.SecurityLevelNone), + e2e.DefaultCluster(clusterNameD, serviceName, e2e.SecurityLevelNone), + }, + SkipValidation: true, + } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - _, err := fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) } - for i := 0; i <= 15; i++ { - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeAggregate, - ClusterName: "cluster" + fmt.Sprint(i), - PrioritizedClusterNames: []string{"cluster" + fmt.Sprint(i+1)}, - }, nil) - if i == 15 { - // The 16th iteration will try and create a cluster which exceeds - // max stack depth and will thus error, so no CDS Watch will be - // started for the child. - continue - } - _, err = fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) - } - } - select { - case chu := <-ch.updateChannel: - if chu.err.Error() != "aggregate cluster graph exceeds max depth" { - t.Fatalf("Did not receive the expected error, instead received: %v", chu.err.Error()) - } - case <-ctx.Done(): - t.Fatal("Timed out waiting for an error to be written to update channel.") - } - - // Invoke a cluster callback for a node in the graph that rests within the - // allowed depth. This will cause the system to try and construct a cluster - // update, and it shouldn't write an update as the aggregate cluster graph - // is still in an error state. Since the graph continues to stay in an error - // state, no new error needs to be written to the update buffer as that - // would be redundant information. - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeAggregate, - ClusterName: "cluster3", - PrioritizedClusterNames: []string{"cluster4"}, - }, nil) - - sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer cancel() - select { - case <-ch.updateChannel: - t.Fatal("an update should not have been written to the update buffer") - case <-sCtx.Done(): - } - - // Invoke the same cluster update for cluster 15, specifying it has a child - // cluster16. This should cause an error to be written to the update buffer, - // as it still exceeds the max depth. 
- fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeAggregate, - ClusterName: "cluster15", - PrioritizedClusterNames: []string{"cluster16"}, - }, nil) - select { - case chu := <-ch.updateChannel: - if chu.err.Error() != "aggregate cluster graph exceeds max depth" { - t.Fatalf("Did not receive the expected error, instead received: %v", chu.err.Error()) - } - case <-ctx.Done(): - t.Fatal("Timed out waiting for an error to be written to update channel.") + wantChildCfg := &clusterresolver.LBConfig{ + DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{ + { + Cluster: clusterNameC, + Type: clusterresolver.DiscoveryMechanismTypeEDS, + EDSServiceName: serviceName, + OutlierDetection: json.RawMessage(`{}`), + }, + { + Cluster: clusterNameD, + Type: clusterresolver.DiscoveryMechanismTypeEDS, + EDSServiceName: serviceName, + OutlierDetection: json.RawMessage(`{}`), + }, + }, + XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`), } - - // When you remove the child of cluster15 that causes the graph to be in the - // error state of exceeding max depth, the update should successfully - // construct and be written to the update buffer. - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeEDS, - ClusterName: "cluster15", - }, nil) - - select { - case chu := <-ch.updateChannel: - if diff := cmp.Diff(chu.updates, []xdsresource.ClusterUpdate{{ - ClusterType: xdsresource.ClusterTypeEDS, - ClusterName: "cluster15", - }}); diff != "" { - t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff) - } - case <-ctx.Done(): - t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.") + if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil { + t.Fatal(err) } } -// TestNodeChildOfItself tests the scenario where the aggregate cluster graph -// has a node that has child node of itself. The case for this is A -> A, and -// since there is no base cluster (EDS or Logical DNS), no update should be -// written if it tries to build a cluster update. -func (s) TestNodeChildOfItself(t *testing.T) { - ch, fakeClient := setupTests() - ch.updateRootCluster("clusterA") - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - _, err := fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) +// Tests the scenario where the aggregate cluster graph has a node that has +// child node of itself. The case for this is A -> A, and since there is no base +// cluster (EDS or Logical DNS), no configuration should be pushed to the child +// policy. Then the test updates A -> B, where B is a leaf EDS cluster. Verifies +// that configuration is pushed to the child policy and that an RPC can be +// successfully made. +func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) { + lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) + mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t) + + const ( + clusterNameA = clusterName // cluster name in cds LB policy config + clusterNameB = clusterName + "-B" + ) + // Configure the management server with an aggregate cluster resource whose + // child is itself. 
+ resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{makeAggregateClusterResource(clusterNameA, []string{clusterNameA})}, + SkipValidation: true, } - // Invoke the callback informing the cluster handler that clusterA has a - // child that it is itself. Due to this child cluster being a duplicate, no - // watch should be started. Since there are no leaf nodes (i.e. EDS or - // Logical DNS), no update should be written to the update buffer. - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeAggregate, - ClusterName: "clusterA", - PrioritizedClusterNames: []string{"clusterA"}, - }, nil) - sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if _, err := fakeClient.WaitForWatchCluster(sCtx); err == nil { - t.Fatal("Watch should not have been started for clusterA") + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) } - sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer cancel() + select { - case <-ch.updateChannel: - t.Fatal("update should not have been written to update buffer") - case <-sCtx.Done(): + case cfg := <-lbCfgCh: + t.Fatalf("Unexpected configuration pushed to child policy: %v", cfg) + case <-time.After(defaultTestShortTimeout): } - // Invoke the callback again informing the cluster handler that clusterA has - // a child that it is itself. Due to this child cluster being a duplicate, - // no watch should be started. Since there are no leaf nodes (i.e. EDS or - // Logical DNS), no update should be written to the update buffer. - fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeAggregate, - ClusterName: "clusterA", - PrioritizedClusterNames: []string{"clusterA"}, - }, nil) + // Start a test service backend. + server := stubserver.StartTestService(t, nil) + t.Cleanup(server.Stop) - sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer cancel() - if _, err := fakeClient.WaitForWatchCluster(sCtx); err == nil { - t.Fatal("Watch should not have been started for clusterA") + // Update the aggregate cluster to point to a leaf EDS cluster. + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{ + makeAggregateClusterResource(clusterNameA, []string{clusterNameB}), + e2e.DefaultCluster(clusterNameB, serviceName, e2e.SecurityLevelNone), + }, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})}, + SkipValidation: true, } - sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer cancel() - select { - case <-ch.updateChannel: - t.Fatal("update should not have been written to update buffer, as clusterB has not received an update yet") - case <-sCtx.Done(): + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) } - // Inform the cluster handler that clusterA now has clusterB as a child. - // This should not cancel the watch for A, as it is still the root cluster - // and still has a ref count, not write an update to update buffer as - // cluster B has not received an update yet, and start a new watch for - // cluster B as it is not a duplicate. 
- fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ - ClusterType: xdsresource.ClusterTypeAggregate, - ClusterName: "clusterA", - PrioritizedClusterNames: []string{"clusterB"}, - }, nil) - - sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer cancel() - if _, err := fakeClient.WaitForCancelClusterWatch(sCtx); err == nil { - t.Fatal("clusterA should not have been canceled, as it is still the root cluster") + // Verify the configuration pushed to the child policy. + wantChildCfg := &clusterresolver.LBConfig{ + DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{ + Cluster: clusterNameB, + Type: clusterresolver.DiscoveryMechanismTypeEDS, + EDSServiceName: serviceName, + OutlierDetection: json.RawMessage(`{}`), + }}, + XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`), } - - sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) - defer cancel() - select { - case <-ch.updateChannel: - t.Fatal("update should not have been written to update buffer, as clusterB has not received an update yet") - case <-sCtx.Done(): + if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil { + t.Fatal(err) } - gotCluster, err := fakeClient.WaitForWatchCluster(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) - } - if gotCluster != "clusterB" { - t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, "clusterB") + // Verify that a successful RPC can be made. + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) } } diff --git a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go index f466dcca7bda..036f2375a12b 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go @@ -33,7 +33,6 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/stubserver" - "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/pickfirst" "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/peer" @@ -50,7 +49,6 @@ import ( v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" - v3aggregateclusterpb "github.com/envoyproxy/go-control-plane/envoy/extensions/clusters/aggregate/v3" v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" @@ -59,48 +57,22 @@ import ( // makeAggregateClusterResource returns an aggregate cluster resource with the // given name and list of child names. 
func makeAggregateClusterResource(name string, childNames []string) *v3clusterpb.Cluster { - return &v3clusterpb.Cluster{ - Name: name, - ClusterDiscoveryType: &v3clusterpb.Cluster_ClusterType{ - ClusterType: &v3clusterpb.Cluster_CustomClusterType{ - Name: "envoy.clusters.aggregate", - TypedConfig: testutils.MarshalAny(&v3aggregateclusterpb.ClusterConfig{ - Clusters: childNames, - }), - }, - }, - LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN, - } + return e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: name, + Type: e2e.ClusterTypeAggregate, + ChildNames: childNames, + }) } // makeLogicalDNSClusterResource returns a LOGICAL_DNS cluster resource with the // given name and given DNS host and port. func makeLogicalDNSClusterResource(name, dnsHost string, dnsPort uint32) *v3clusterpb.Cluster { - return &v3clusterpb.Cluster{ - Name: name, - ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_LOGICAL_DNS}, - LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN, - LoadAssignment: &v3endpointpb.ClusterLoadAssignment{ - Endpoints: []*v3endpointpb.LocalityLbEndpoints{{ - LbEndpoints: []*v3endpointpb.LbEndpoint{{ - HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{ - Endpoint: &v3endpointpb.Endpoint{ - Address: &v3corepb.Address{ - Address: &v3corepb.Address_SocketAddress{ - SocketAddress: &v3corepb.SocketAddress{ - Address: dnsHost, - PortSpecifier: &v3corepb.SocketAddress_PortValue{ - PortValue: dnsPort, - }, - }, - }, - }, - }, - }, - }}, - }}, - }, - } + return e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: name, + Type: e2e.ClusterTypeLogicalDNS, + DNSHostName: dnsHost, + DNSPort: dnsPort, + }) } // setupDNS unregisters the DNS resolver and registers a manual resolver for the diff --git a/xds/internal/balancer/clusterresolver/resource_resolver.go b/xds/internal/balancer/clusterresolver/resource_resolver.go index b9a81e9ba829..151c54dae6d0 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver.go @@ -200,7 +200,7 @@ func (rr *resourceResolver) updateMechanisms(mechanisms []DiscoveryMechanism) { for dm, r := range rr.childrenMap { if !newDMs[dm] { delete(rr.childrenMap, dm) - r.r.stop() + go r.r.stop() } } // Regenerate even if there's no change in discovery mechanism, in case From a231e857f3599b7232226aa27e01aefa80422911 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 23 Aug 2023 23:51:56 +0000 Subject: [PATCH 2/6] review comments about comments --- .../cdsbalancer/cluster_handler_test.go | 46 +++++++++++-------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/cluster_handler_test.go b/xds/internal/balancer/cdsbalancer/cluster_handler_test.go index 3978c1027f3e..40604110849c 100644 --- a/xds/internal/balancer/cdsbalancer/cluster_handler_test.go +++ b/xds/internal/balancer/cdsbalancer/cluster_handler_test.go @@ -61,10 +61,11 @@ func makeLogicalDNSClusterResource(name, dnsHost string, dnsPort uint32) *v3clus }) } -// Tests the case where the cluster resource requested by the cds LB policy is -// a leaf cluster. Subsequently the management server pushes an update to the -// same leaf cluster. Verifies that the load balancing configuration pushed to -// the cluster_resolver LB policy is as expected. +// Tests the case where the cluster resource requested by the cds LB policy is a +// leaf cluster. The management server sends two updates for the same leaf +// cluster resource. 
The test verifies that the load balancing configuration +// pushed to the cluster_resolver LB policy is contains the expected discovery +// mechanism corresponding to the leaf cluster, on both occassions. func (s) TestClusterHandlerSuccess_LeafNode(t *testing.T) { tests := []struct { name string @@ -159,9 +160,11 @@ func (s) TestClusterHandlerSuccess_LeafNode(t *testing.T) { // an aggregate cluster root pointing to two child clusters, one of type EDS and // the other of type LogicalDNS. Verifies that the load balancing configuration // pushed to the cluster_resolver LB policy is as expected. The test then -// updates the aggregate cluster to point to a different set of child clusters -// and verifies that the load balancing configuration pushed to the -// cluster_resolver LB policy is as expected. +// updates the aggregate cluster to point to two child clusters, the same leaf +// cluster of type EDS and a different leaf cluster of type LogicalDNS and +// verifies that the load balancing configuration pushed to the cluster_resolver +// LB policy contains the expected discovery mechanisms corresponding to the +// child clusters. func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) @@ -243,11 +246,12 @@ func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) { // Tests the case where the cluster resource requested by the cds LB policy is // an aggregate cluster root pointing to two child clusters, one of type EDS and -// the other of type LogicalDNS. Verifies that the load balancing configuration -// pushed to the cluster_resolver LB policy is as expected. The test then -// updates the root cluster resource requested by the cds LB policy to a leaf -// cluster of type EDS and verifies the load balancing configuration pushed to -// the cluster_resolver LB policy. +// the other of type LogicalDNS. The test verifies that the load balancing +// configuration pushed to the cluster_resolver LB policy contains the discovery +// mechanisms for both child clusters. The test then updates the root cluster +// resource requested by the cds LB policy to a leaf cluster of type EDS and +// verifies the load balancing configuration pushed to the cluster_resolver LB +// policy contains a single discovery mechanism. func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) @@ -316,9 +320,10 @@ func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) { } // Tests the case where a requested cluster resource switches between being a -// leaf and an aggregate cluster. In each of these cases, the test verifies that -// the appropriate load balancing configuration is pushed to the -// cluster_resolver LB policy. +// leaf and an aggregate cluster pointing to an EDS and LogicalDNS child +// cluster. In each of these cases, the test verifies that the load balancing +// configuration pushed to the cluster_resolver LB policy contains the expected +// discovery mechanisms. 
func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) @@ -408,8 +413,8 @@ func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T // Tests the scenario where an aggregate cluster exceeds the maximum depth, // which is 16. Verfies that the channel moves to TRANSIENT_FAILURE, and the // error is propagated to RPC callers. The test then modifies the graph to no -// longer exceed maximum depth, and verifies that an RPC can be made -// successfully. +// longer exceed maximum depth, but be at the maximum allowed depth, and +// verifies that an RPC can be made successfully. func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) { mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t) @@ -462,7 +467,8 @@ func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) { server := stubserver.StartTestService(t, nil) t.Cleanup(server.Stop) - // Update the aggregate cluster resource to no longer exceed max depth. + // Update the aggregate cluster resource to no longer exceed max depth, and + // be at the maximum depth allowed. resources = e2e.UpdateOptions{ NodeID: nodeID, Clusters: []*v3clusterpb.Cluster{ @@ -543,7 +549,9 @@ func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) { // Tests the case where the aggregate cluster graph contains duplicates (A->[B, // C]; B->[C, D]). Verifies that the load balancing configuration pushed to the -// cluster_resolver LB policy does not contain duplicates. +// cluster_resolver LB policy does not contain duplicates, and that the +// discovery mechanism corresponding to cluster C is of higher priority than the +// discovery mechanism for cluser D. func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) { lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t) mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) From 6eeca11ec67a8cf6574de0284b72a09c1590f963 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 24 Aug 2023 12:29:19 +0000 Subject: [PATCH 3/6] re-add logic to verify that config is pushed only after the cluster graph is completely resolved --- .../cdsbalancer/cluster_handler_test.go | 64 +++++++++++++++++-- 1 file changed, 57 insertions(+), 7 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/cluster_handler_test.go b/xds/internal/balancer/cdsbalancer/cluster_handler_test.go index 40604110849c..603c9fcfc351 100644 --- a/xds/internal/balancer/cdsbalancer/cluster_handler_test.go +++ b/xds/internal/balancer/cdsbalancer/cluster_handler_test.go @@ -27,6 +27,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" @@ -170,13 +171,13 @@ func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) { mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) // Configure the management server with the aggregate cluster resource - // pointing to two child clusters. + // pointing to two child clusters. But don't include resource corresponding + // to the LogicalDNS cluster yet. 
resources := e2e.UpdateOptions{ NodeID: nodeID, Clusters: []*v3clusterpb.Cluster{ makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}), e2e.DefaultCluster(edsClusterName, serviceName, e2e.SecurityLevelNone), - makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort), }, SkipValidation: true, } @@ -186,6 +187,21 @@ func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) { t.Fatal(err) } + // Verify that no configuration is pushed to the child policy yet, because + // not all clusters making up the aggregate cluster have been resolved yet. + select { + case cfg := <-lbCfgCh: + t.Fatalf("Child policy received configuration when not expected to: %s", pretty.ToJSON(cfg)) + case <-time.After(defaultTestShortTimeout): + } + + // Now configure the LogicalDNS cluster in the management server. This + // should result in configuration being pushed down to the child policy. + resources.Clusters = append(resources.Clusters, makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort)) + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + wantChildCfg := &clusterresolver.LBConfig{ DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{ { @@ -510,7 +526,9 @@ func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) { mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) // Configure the management server with an aggregate cluster resource having - // a diamond dependency pattern. + // a diamond dependency pattern. Don't configure the resource for cluster C + // yet. This will help us verify that no configuration is pushed to the + // child policy until the whole cluster graph is resolved. const ( clusterNameA = clusterName // cluster name in cds LB policy config clusterNameB = clusterName + "-B" @@ -522,7 +540,6 @@ func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) { Clusters: []*v3clusterpb.Cluster{ makeAggregateClusterResource(clusterNameA, []string{clusterNameB, clusterNameC}), makeAggregateClusterResource(clusterNameB, []string{clusterNameD}), - makeAggregateClusterResource(clusterNameC, []string{clusterNameD}), e2e.DefaultCluster(clusterNameD, serviceName, e2e.SecurityLevelNone), }, SkipValidation: true, @@ -533,6 +550,22 @@ func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) { t.Fatal(err) } + // Verify that no configuration is pushed to the child policy yet, because + // not all clusters making up the aggregate cluster have been resolved yet. + select { + case cfg := <-lbCfgCh: + t.Fatalf("Child policy received configuration when not expected to: %s", pretty.ToJSON(cfg)) + case <-time.After(defaultTestShortTimeout): + } + + // Now configure the resource for cluster C in the management server, + // thereby completing the cluster graph. This should result in configuration + // being pushed down to the child policy. + resources.Clusters = append(resources.Clusters, makeAggregateClusterResource(clusterNameC, []string{clusterNameD})) + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + wantChildCfg := &clusterresolver.LBConfig{ DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{ Cluster: clusterNameD, @@ -557,7 +590,9 @@ func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) { mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t) // Configure the management server with an aggregate cluster resource that - // has duplicates in the graph. + // has duplicates in the graph. 
Don't configure the resource for cluster C + // yet. This will help us verify that no configuration is pushed to the + // child policy until the whole cluster graph is resolved. const ( clusterNameA = clusterName // cluster name in cds LB policy config clusterNameB = clusterName + "-B" @@ -569,7 +604,6 @@ func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) { Clusters: []*v3clusterpb.Cluster{ makeAggregateClusterResource(clusterNameA, []string{clusterNameB, clusterNameC}), makeAggregateClusterResource(clusterNameB, []string{clusterNameC, clusterNameD}), - e2e.DefaultCluster(clusterNameC, serviceName, e2e.SecurityLevelNone), e2e.DefaultCluster(clusterNameD, serviceName, e2e.SecurityLevelNone), }, SkipValidation: true, @@ -580,6 +614,22 @@ func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) { t.Fatal(err) } + // Verify that no configuration is pushed to the child policy yet, because + // not all clusters making up the aggregate cluster have been resolved yet. + select { + case cfg := <-lbCfgCh: + t.Fatalf("Child policy received configuration when not expected to: %s", pretty.ToJSON(cfg)) + case <-time.After(defaultTestShortTimeout): + } + + // Now configure the resource for cluster C in the management server, + // thereby completing the cluster graph. This should result in configuration + // being pushed down to the child policy. + resources.Clusters = append(resources.Clusters, e2e.DefaultCluster(clusterNameC, serviceName, e2e.SecurityLevelNone)) + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + wantChildCfg := &clusterresolver.LBConfig{ DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{ { @@ -631,7 +681,7 @@ func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) { select { case cfg := <-lbCfgCh: - t.Fatalf("Unexpected configuration pushed to child policy: %v", cfg) + t.Fatalf("Child policy received configuration when not expected to: %s", pretty.ToJSON(cfg)) case <-time.After(defaultTestShortTimeout): } From 9b1a66019ea16a7eba9c5bb36306f0302eab24a5 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 24 Aug 2023 12:37:44 +0000 Subject: [PATCH 4/6] rebase to master + switch to testutils.AwaitState --- xds/internal/balancer/cdsbalancer/cluster_handler_test.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/cluster_handler_test.go b/xds/internal/balancer/cdsbalancer/cluster_handler_test.go index 603c9fcfc351..68cb11d71332 100644 --- a/xds/internal/balancer/cdsbalancer/cluster_handler_test.go +++ b/xds/internal/balancer/cdsbalancer/cluster_handler_test.go @@ -463,11 +463,7 @@ func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) { t.Fatal(err) } - for state := cc.GetState(); state != connectivity.TransientFailure; state = cc.GetState() { - if !cc.WaitForStateChange(ctx, state) { - t.Fatalf("Timed out waiting for state change. 
got %v; want %v", state, connectivity.TransientFailure)
-		}
-	}
+	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
 
 	const wantErr = "aggregate cluster graph exceeds max depth"
 	client := testgrpc.NewTestServiceClient(cc)

From 2e470567a9cd13d7a5b05d025c90dabba2422456 Mon Sep 17 00:00:00 2001
From: Easwar Swaminathan
Date: Fri, 25 Aug 2023 01:58:44 +0000
Subject: [PATCH 5/6] fix typo

---
 xds/internal/balancer/cdsbalancer/cluster_handler_test.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/xds/internal/balancer/cdsbalancer/cluster_handler_test.go b/xds/internal/balancer/cdsbalancer/cluster_handler_test.go
index 68cb11d71332..6c575178a813 100644
--- a/xds/internal/balancer/cdsbalancer/cluster_handler_test.go
+++ b/xds/internal/balancer/cdsbalancer/cluster_handler_test.go
@@ -66,7 +66,7 @@ func makeLogicalDNSClusterResource(name, dnsHost string, dnsPort uint32) *v3clus
 // leaf cluster. The management server sends two updates for the same leaf
 // cluster resource. The test verifies that the load balancing configuration
-// pushed to the cluster_resolver LB policy is contains the expected discovery
-// mechanism corresponding to the leaf cluster, on both occassions.
+// pushed to the cluster_resolver LB policy contains the expected discovery
+// mechanism corresponding to the leaf cluster, on both occasions.
 func (s) TestClusterHandlerSuccess_LeafNode(t *testing.T) {
 	tests := []struct {
 		name string

From 3bd8b9b7b992db977e5494f6cb476a594d717d1a Mon Sep 17 00:00:00 2001
From: Easwar Swaminathan
Date: Fri, 25 Aug 2023 02:07:54 +0000
Subject: [PATCH 6/6] final comments about comments

---
 .../cdsbalancer/cluster_handler_test.go       | 36 +++++++++++--------
 1 file changed, 21 insertions(+), 15 deletions(-)

diff --git a/xds/internal/balancer/cdsbalancer/cluster_handler_test.go b/xds/internal/balancer/cdsbalancer/cluster_handler_test.go
index 6c575178a813..33ec8091583f 100644
--- a/xds/internal/balancer/cdsbalancer/cluster_handler_test.go
+++ b/xds/internal/balancer/cdsbalancer/cluster_handler_test.go
@@ -159,20 +159,22 @@ func (s) TestClusterHandlerSuccess_LeafNode(t *testing.T) {
 
 // Tests the case where the cluster resource requested by the cds LB policy is
 // an aggregate cluster root pointing to two child clusters, one of type EDS and
-// the other of type LogicalDNS. Verifies that the load balancing configuration
-// pushed to the cluster_resolver LB policy is as expected. The test then
-// updates the aggregate cluster to point to two child clusters, the same leaf
-// cluster of type EDS and a different leaf cluster of type LogicalDNS and
-// verifies that the load balancing configuration pushed to the cluster_resolver
-// LB policy contains the expected discovery mechanisms.
+// the other of type LogicalDNS. The test verifies that load balancing
+// configuration is pushed to the cluster_resolver LB policy only when all child
+// clusters are resolved and it also verifies that the pushed configuration
+// contains the expected discovery mechanisms. The test then updates the
+// aggregate cluster to point to two child clusters, the same leaf cluster of
+// type EDS and a different leaf cluster of type LogicalDNS and verifies that
+// the load balancing configuration pushed to the cluster_resolver LB policy
+// contains the expected discovery mechanisms. 
func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) {
 	lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
 	mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
 
 	// Configure the management server with the aggregate cluster resource
-	// pointing to two child clusters. But don't include resource corresponding
-	// to the LogicalDNS cluster yet.
+	// pointing to two child clusters, one EDS and one LogicalDNS. Include the
+	// resource corresponding to the EDS cluster here, but don't include the
+	// resource corresponding to the LogicalDNS cluster yet.
 	resources := e2e.UpdateOptions{
 		NodeID: nodeID,
 		Clusters: []*v3clusterpb.Cluster{
@@ -516,15 +518,17 @@ func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) {
 
 // Tests a diamond shaped aggregate cluster (A->[B,C]; B->D; C->D). Verifies
 // that the load balancing configuration pushed to the cluster_resolver LB
-// policy specifies cluster D only once.
+// policy specifies cluster D only once. Also verifies that configuration is
+// pushed only after all child clusters are resolved.
 func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) {
 	lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
 	mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
 
 	// Configure the management server with an aggregate cluster resource having
-	// a diamond dependency pattern. Don't configure the resource for cluster C
-	// yet. This will help us verify that no configuration is pushed to the
-	// child policy until the whole cluster graph is resolved.
+	// a diamond dependency pattern (A->[B,C]; B->D; C->D). Include resources
+	// for clusters A, B and D, but don't include the resource for cluster C yet.
+	// This will help us verify that no configuration is pushed to the child
+	// policy until the whole cluster graph is resolved.
 	const (
 		clusterNameA = clusterName // cluster name in cds LB policy config
 		clusterNameB = clusterName + "-B"
@@ -580,13 +584,15 @@ func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) {
 // C]; B->[C, D]). Verifies that the load balancing configuration pushed to the
 // cluster_resolver LB policy does not contain duplicates, and that the
 // discovery mechanism corresponding to cluster C is of higher priority than the
-// discovery mechanism for cluser D.
+// discovery mechanism for cluster D. Also verifies that the configuration is
+// pushed only after all child clusters are resolved.
 func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) {
 	lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
 	mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
 
 	// Configure the management server with an aggregate cluster resource that
-	// has duplicates in the graph. Don't configure the resource for cluster C
+	// has duplicates in the graph (A->[B, C]; B->[C, D]). Include resources
+	// for clusters A, B and D, but don't configure the resource for cluster C
 	// yet. This will help us verify that no configuration is pushed to the
 	// child policy until the whole cluster graph is resolved.
 	const (