diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java index 66566d76037c..cf96b4d86e33 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java @@ -31,6 +31,10 @@ import com.google.common.collect.ImmutableList; import com.google.protobuf.Any; import com.google.protobuf.UInt32Value; +import io.envoyproxy.envoy.api.v2.Cluster; +import io.envoyproxy.envoy.api.v2.Cluster.DiscoveryType; +import io.envoyproxy.envoy.api.v2.Cluster.EdsClusterConfig; +import io.envoyproxy.envoy.api.v2.Cluster.LbPolicy; import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment; import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy; import io.envoyproxy.envoy.api.v2.DiscoveryRequest; @@ -42,6 +46,7 @@ import io.envoyproxy.envoy.api.v2.core.ConfigSource; import io.envoyproxy.envoy.api.v2.core.HealthStatus; import io.envoyproxy.envoy.api.v2.core.Node; +import io.envoyproxy.envoy.api.v2.core.SelfConfigSource; import io.envoyproxy.envoy.api.v2.core.SocketAddress; import io.envoyproxy.envoy.api.v2.listener.FilterChain; import io.envoyproxy.envoy.api.v2.route.RedirectAction; @@ -68,6 +73,8 @@ import io.grpc.xds.EnvoyProtoData.LbEndpoint; import io.grpc.xds.EnvoyProtoData.Locality; import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints; +import io.grpc.xds.XdsClient.ClusterUpdate; +import io.grpc.xds.XdsClient.ClusterWatcher; import io.grpc.xds.XdsClient.ConfigUpdate; import io.grpc.xds.XdsClient.ConfigWatcher; import io.grpc.xds.XdsClient.EndpointUpdate; @@ -78,6 +85,7 @@ import java.util.List; import java.util.Queue; import java.util.Set; +import javax.annotation.Nullable; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -134,8 +142,11 @@ public void uncaughtException(Thread t, Throwable e) { @Mock private ConfigWatcher configWatcher; @Mock + private ClusterWatcher clusterWatcher; + @Mock private EndpointWatcher endpointWatcher; + private String serverName; private ManagedChannel channel; private XdsClientImpl xdsClient; @@ -146,7 +157,7 @@ public void setUp() throws IOException { when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L); when(backoffPolicy2.nextBackoffNanos()).thenReturn(20L, 200L); - String serverName = InProcessServerBuilder.generateName(); + serverName = InProcessServerBuilder.generateName(); AggregatedDiscoveryServiceImplBase serviceImpl = new AggregatedDiscoveryServiceImplBase() { @Override public StreamObserver streamAggregatedResources( @@ -1088,6 +1099,426 @@ public void routeConfigurationRemovedNotifiedToWatcher() { .isEqualTo("Listener for requested resource [foo.googleapis.com:8080] does not exist"); } + /** + * Client receives an CDS response that does not contain a Cluster for the requested resource + * while each received Cluster is valid. The CDS response is ACKed. Cluster watchers are notified + * with an error for resource not found. + */ + @Test + public void cdsResponseWithoutMatchingResource() { + xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends a CDS request for the only cluster being watched to management server. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + + // Management server sends back a CDS response without Cluster for the requested resource. + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-bar.googleapis.com", null, false)), + Any.pack(buildCluster("cluster-baz.googleapis.com", null, false))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK CDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); + + ArgumentCaptor errorStatusCaptor = ArgumentCaptor.forClass(null); + verify(clusterWatcher).onError(errorStatusCaptor.capture()); + Status error = errorStatusCaptor.getValue(); + assertThat(error.getCode()).isEqualTo(Code.NOT_FOUND); + assertThat(error.getDescription()) + .isEqualTo("Requested cluster [cluster-foo.googleapis.com] does not exist"); + } + + /** + * Normal workflow of receiving a CDS response containing Cluster message for a requested + * cluster. + */ + @Test + public void cdsResponseWithMatchingResource() { + xdsClient.watchClusterData("cluster-foo.googleapis.com", clusterWatcher); + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends a CDS request for the only cluster being watched to management server. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + + // Management server sends back a CDS response without Cluster for the requested resource. + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-bar.googleapis.com", null, false)), + Any.pack(buildCluster("cluster-foo.googleapis.com", null, false)), + Any.pack(buildCluster("cluster-baz.googleapis.com", null, false))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK CDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); + + ArgumentCaptor clusterUpdateCaptor = ArgumentCaptor.forClass(null); + verify(clusterWatcher).onClusterChanged(clusterUpdateCaptor.capture()); + ClusterUpdate clusterUpdate = clusterUpdateCaptor.getValue(); + assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate.isEnableLrs()).isEqualTo(false); + + // Management server sends back another CDS response updating the requested Cluster. + clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-bar.googleapis.com", null, false)), + Any.pack( + buildCluster("cluster-foo.googleapis.com", "eds-cluster-foo.googleapis.com", true)), + Any.pack(buildCluster("cluster-baz.googleapis.com", null, false))); + response = + buildDiscoveryResponse("1", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0001"); + responseObserver.onNext(response); + + // Client sent an ACK CDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("1", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0001"))); + + verify(clusterWatcher, times(2)).onClusterChanged(clusterUpdateCaptor.capture()); + clusterUpdate = clusterUpdateCaptor.getValue(); + assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate.getEdsServiceName()) + .isEqualTo("eds-cluster-foo.googleapis.com"); + assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate.isEnableLrs()).isEqualTo(true); + assertThat(clusterUpdate.getLrsServerName()) + .isEqualTo(serverName); // same management server + } + + @Test + public void multipleClusterWatchers() { + ClusterWatcher watcher1 = mock(ClusterWatcher.class); + ClusterWatcher watcher2 = mock(ClusterWatcher.class); + ClusterWatcher watcher3 = mock(ClusterWatcher.class); + xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1); + xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher2); + xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher3); + + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends a CDS request containing all clusters being watched to management server. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("", + ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + + // Management server sends back a CDS response contains Cluster for only one of + // requested cluster. + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-foo.googleapis.com", null, false))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK CDS request. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("0", + ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); + + // Two watchers get notification of cluster update for the cluster they are interested in. + ArgumentCaptor clusterUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture()); + ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue(); + assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate1.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false); + + ArgumentCaptor clusterUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture()); + ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue(); + assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate2.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(false); + + // The other watcher gets an error notification for cluster not found. + ArgumentCaptor errorStatusCaptor = ArgumentCaptor.forClass(null); + verify(watcher3).onError(errorStatusCaptor.capture()); + Status error = errorStatusCaptor.getValue(); + assertThat(error.getCode()).isEqualTo(Code.NOT_FOUND); + assertThat(error.getDescription()) + .isEqualTo("Requested cluster [cluster-bar.googleapis.com] does not exist"); + + // Management server sends back another CDS response contains Clusters for all + // requested clusters. + clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-foo.googleapis.com", null, false)), + Any.pack( + buildCluster("cluster-bar.googleapis.com", + "eds-cluster-bar.googleapis.com", true))); + response = buildDiscoveryResponse("1", clusters, + XdsClientImpl.ADS_TYPE_URL_CDS, "0001"); + responseObserver.onNext(response); + + // Client sent an ACK CDS request. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("1", + ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_CDS, "0001"))); + + // All watchers received notification for cluster update. + verify(watcher1, times(2)).onClusterChanged(clusterUpdateCaptor1.capture()); + clusterUpdate1 = clusterUpdateCaptor1.getValue(); + assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate1.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false); + + clusterUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2, times(2)).onClusterChanged(clusterUpdateCaptor2.capture()); + clusterUpdate2 = clusterUpdateCaptor2.getValue(); + assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate2.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(false); + + ArgumentCaptor clusterUpdateCaptor3 = ArgumentCaptor.forClass(null); + verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture()); + ClusterUpdate clusterUpdate3 = clusterUpdateCaptor3.getValue(); + assertThat(clusterUpdate3.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); + assertThat(clusterUpdate3.getEdsServiceName()) + .isEqualTo("eds-cluster-bar.googleapis.com"); + assertThat(clusterUpdate3.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate3.isEnableLrs()).isEqualTo(true); + assertThat(clusterUpdate3.getLrsServerName()) + .isEqualTo(serverName); // same management server + } + + /** + * (CDS response caching behavior) Adding cluster watchers interested in some cluster that + * some other endpoint watcher had already been watching on will result in cluster update + * notified to the newly added watcher immediately, without sending new CDS requests. + */ + @Test + public void receivedClusterUpdateNotifiedToWatcherImmediately() { + ClusterWatcher watcher1 = mock(ClusterWatcher.class); + xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1); + + // Streaming RPC starts after a first watcher is added. + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an CDS request to management server. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + + // Management server sends back an CDS response with Cluster for the requested + // cluster. + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-foo.googleapis.com", null, false))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK CDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); + + ArgumentCaptor clusterUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture()); + ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue(); + assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate1.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false); + + // Another cluster watcher interested in the same cluster is added. + ClusterWatcher watcher2 = mock(ClusterWatcher.class); + xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher2); + + // Since the client has received cluster update for this cluster before, cached result is + // notified to the newly added watcher immediately. + ArgumentCaptor clusterUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture()); + ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue(); + assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate2.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(false); + + verifyNoMoreInteractions(requestObserver); + } + + @Test + public void addRemoveClusterWatchersFreely() { + ClusterWatcher watcher1 = mock(ClusterWatcher.class); + xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher1); + + // Streaming RPC starts after a first watcher is added. + StreamObserver responseObserver = responseObservers.poll(); + StreamObserver requestObserver = requestObservers.poll(); + + // Client sends an CDS request to management server. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, ""))); + + // Management server sends back a CDS response with Cluster for the requested + // cluster. + List clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-foo.googleapis.com", null, false))); + DiscoveryResponse response = + buildDiscoveryResponse("0", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0000"); + responseObserver.onNext(response); + + // Client sent an ACK CDS request. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("0", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); + + ArgumentCaptor clusterUpdateCaptor1 = ArgumentCaptor.forClass(null); + verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture()); + ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue(); + assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate1.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false); + + // Add another cluster watcher for a different cluster. + ClusterWatcher watcher2 = mock(ClusterWatcher.class); + xdsClient.watchClusterData("cluster-bar.googleapis.com", watcher2); + + // Client sent a new CDS request for all interested resources. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("0", + ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_CDS, "0000"))); + + // Management server sends back a CDS response with Cluster for all requested cluster. + clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-foo.googleapis.com", null, false)), + Any.pack( + buildCluster("cluster-bar.googleapis.com", + "eds-cluster-bar.googleapis.com", true))); + response = buildDiscoveryResponse("1", clusters, + XdsClientImpl.ADS_TYPE_URL_CDS, "0001"); + responseObserver.onNext(response); + + // Client sent an ACK CDS request for all interested resources. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("1", + ImmutableList.of("cluster-foo.googleapis.com", "cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_CDS, "0001"))); + + verify(watcher1, times(2)).onClusterChanged(clusterUpdateCaptor1.capture()); + clusterUpdate1 = clusterUpdateCaptor1.getValue(); + assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate1.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false); + + ArgumentCaptor clusterUpdateCaptor2 = ArgumentCaptor.forClass(null); + verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture()); + ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue(); + assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-bar.googleapis.com"); + assertThat(clusterUpdate2.getEdsServiceName()) + .isEqualTo("eds-cluster-bar.googleapis.com"); + assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(true); + assertThat(clusterUpdate2.getLrsServerName()).isEqualTo(serverName); // same management server + + // Cancel one of the watcher. + xdsClient.cancelClusterDataWatch("cluster-foo.googleapis.com", watcher1); + + // Since the cancelled watcher was the last watcher interested in that cluster (but there + // is still interested resource), client sent an new CDS request to unsubscribe from + // that cluster. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("1", "cluster-bar.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0001"))); + + // Management server has nothing to respond. + + // Cancel the other watcher. All resources have been unsubscribed. + xdsClient.cancelClusterDataWatch("cluster-bar.googleapis.com", watcher2); + + // All endpoint watchers have been cancelled. + + // Management server sends back a new CDS response. + clusters = ImmutableList.of( + Any.pack(buildCluster("cluster-foo.googleapis.com", null, true)), + Any.pack( + buildCluster("cluster-bar.googleapis.com", null, false))); + response = + buildDiscoveryResponse("2", clusters, XdsClientImpl.ADS_TYPE_URL_CDS, "0002"); + responseObserver.onNext(response); + + // Client sent an ACK CDS request, with resource_names containing the last + // unsubscribed resource. + // FIXME(chengyuanzhang): this is a workaround for CDS protocol, should not verify this. + verify(requestObserver) + .onNext( + argThat( + new DiscoveryRequestMatcher("2", + ImmutableList.of("cluster-bar.googleapis.com"), + XdsClientImpl.ADS_TYPE_URL_CDS, "0002"))); + + // Cancelled watchers do not receive notification. + verifyNoMoreInteractions(watcher1, watcher2); + + // A new cluster watcher is added to watch an old cluster. + ClusterWatcher watcher3 = mock(ClusterWatcher.class); + xdsClient.watchClusterData("cluster-foo.googleapis.com", watcher3); + + // Notified with cached data immediately. + ArgumentCaptor clusterUpdateCaptor3 = ArgumentCaptor.forClass(null); + verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture()); + ClusterUpdate clusterUpdate3 = clusterUpdateCaptor3.getValue(); + assertThat(clusterUpdate3.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); + assertThat(clusterUpdate3.getEdsServiceName()) + .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate3.getLbPolicy()).isEqualTo("round_robin"); + assertThat(clusterUpdate3.isEnableLrs()).isEqualTo(true); + assertThat(clusterUpdate2.getLrsServerName()).isEqualTo(serverName); // same management server + + // A CDS request is sent to re-subscribe the cluster again. + verify(requestObserver) + .onNext(eq(buildDiscoveryRequest("2", "cluster-foo.googleapis.com", + XdsClientImpl.ADS_TYPE_URL_CDS, "0002"))); + } + /** * Client receives an EDS response that does not contain a ClusterLoadAssignment for the * requested resource while each received ClusterLoadAssignment is valid. @@ -1836,6 +2267,26 @@ private static VirtualHost buildVirtualHost(List domains, String cluster .build(); } + private static Cluster buildCluster(String clusterName, @Nullable String edsServiceName, + boolean enableLrs) { + Cluster.Builder clusterBuilder = Cluster.newBuilder(); + clusterBuilder.setName(clusterName); + clusterBuilder.setType(DiscoveryType.EDS); + EdsClusterConfig.Builder edsClusterConfigBuilder = EdsClusterConfig.newBuilder(); + edsClusterConfigBuilder.setEdsConfig( + ConfigSource.newBuilder().setAds(AggregatedConfigSource.getDefaultInstance())); + if (edsServiceName != null) { + edsClusterConfigBuilder.setServiceName(edsServiceName); + } + clusterBuilder.setEdsClusterConfig(edsClusterConfigBuilder); + clusterBuilder.setLbPolicy(LbPolicy.ROUND_ROBIN); + if (enableLrs) { + clusterBuilder.setLrsServer( + ConfigSource.newBuilder().setSelf(SelfConfigSource.getDefaultInstance())); + } + return clusterBuilder.build(); + } + private static ClusterLoadAssignment buildClusterLoadAssignment(String clusterName, List localityLbEndpoints, List dropOverloads) {