From 448ec4f37e6ade0b9e547fd50bece6a22a317bf3 Mon Sep 17 00:00:00 2001 From: Jiajing LU Date: Tue, 30 Jul 2024 23:46:01 +0800 Subject: [PATCH] xds: XdsClient should unsubscribe on last resource (#11264) Otherwise, the server will continue sending updates and if we re-subscribe to the last resource, the server won't re-send it. Also completely remove the per-type state, as it could only add confusion. --- .../grpc/xds/client/ControlPlaneClient.java | 10 +++- .../io/grpc/xds/client/XdsClientImpl.java | 2 +- .../grpc/xds/GrpcXdsClientImplTestBase.java | 46 ++++++++++++++++++- .../io/grpc/xds/GrpcXdsClientImplV3Test.java | 5 +- 4 files changed, 57 insertions(+), 6 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java index 761c10ede6a..3074d1120ad 100644 --- a/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java +++ b/xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java @@ -152,8 +152,14 @@ void adjustResourceSubscription(XdsResourceType resourceType) { startRpcStream(); } Collection resources = resourceStore.getSubscribedResources(serverInfo, resourceType); - if (resources != null) { - adsStream.sendDiscoveryRequest(resourceType, resources); + if (resources == null) { + resources = Collections.emptyList(); + } + adsStream.sendDiscoveryRequest(resourceType, resources); + if (resources.isEmpty()) { + // The resource type no longer has subscribing resources; clean up references to it + versions.remove(resourceType); + adsStream.respNonces.remove(resourceType); } } diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java index 969660bf7d4..79147cd9862 100644 --- a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java @@ -281,7 +281,7 @@ public void cancelXdsResourceWatch(XdsResourceType @SuppressWarnings("unchecked") public void run() { ResourceSubscriber subscriber = - (ResourceSubscriber) resourceSubscribers.get(type).get(resourceName);; + (ResourceSubscriber) resourceSubscribers.get(type).get(resourceName); subscriber.removeWatcher(watcher); if (!subscriber.isWatched()) { subscriber.cancelResourceWatch(); diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java index fd276a849ce..6b04edcb9b8 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java @@ -133,6 +133,7 @@ import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; import org.mockito.stubbing.Answer; +import org.mockito.verification.VerificationMode; /** * Tests for {@link XdsClientImpl}. @@ -2757,6 +2758,37 @@ public void edsResourceNotFound() { verifySubscribedResourcesMetadataSizes(0, 0, 0, 1); } + @Test + public void edsCleanupNonceAfterUnsubscription() { + Assume.assumeFalse(ignoreResourceDeletion()); + + // Suppose we have an EDS subscription A.1 + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher); + DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); + assertThat(call).isNotNull(); + call.verifyRequest(EDS, "A.1", "", "", NODE); + + // EDS -> {A.1}, version 1 + List dropOverloads = ImmutableList.of(); + List endpointsV1 = ImmutableList.of(lbEndpointHealthy); + ImmutableMap resourcesV1 = ImmutableMap.of( + "A.1", Any.pack(mf.buildClusterLoadAssignment("A.1", endpointsV1, dropOverloads))); + call.sendResponse(EDS, resourcesV1.values().asList(), VERSION_1, "0000"); + // {A.1} -> ACK, version 1 + call.verifyRequest(EDS, "A.1", VERSION_1, "0000", NODE); + verify(edsResourceWatcher, times(1)).onChanged(any()); + + // trigger an EDS resource unsubscription. + xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher); + verifySubscribedResourcesMetadataSizes(0, 0, 0, 0); + call.verifyRequest(EDS, Arrays.asList(), VERSION_1, "0000", NODE); + + // When re-subscribing, the version and nonce were properly forgotten, so the request is the + // same as the initial request + xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher); + call.verifyRequest(EDS, "A.1", "", "", NODE, Mockito.timeout(2000).times(2)); + } + @Test public void edsResponseErrorHandling_allResourcesFailedUnpack() { DiscoveryRpcCall call = startResourceWatcher(XdsEndpointResource.getInstance(), EDS_RESOURCE, @@ -3787,10 +3819,22 @@ protected abstract static class DiscoveryRpcCall { protected void verifyRequest( XdsResourceType type, List resources, String versionInfo, String nonce, - Node node) { + Node node, VerificationMode verificationMode) { throw new UnsupportedOperationException(); } + protected void verifyRequest( + XdsResourceType type, List resources, String versionInfo, String nonce, + Node node) { + verifyRequest(type, resources, versionInfo, nonce, node, Mockito.timeout(2000)); + } + + protected void verifyRequest( + XdsResourceType type, String resource, String versionInfo, String nonce, + Node node, VerificationMode verificationMode) { + verifyRequest(type, ImmutableList.of(resource), versionInfo, nonce, node, verificationMode); + } + protected void verifyRequest( XdsResourceType type, String resource, String versionInfo, String nonce, Node node) { verifyRequest(type, ImmutableList.of(resource), versionInfo, nonce, node); diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplV3Test.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplV3Test.java index 71d0895a252..2b2ce5cbd72 100644 --- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplV3Test.java +++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplV3Test.java @@ -118,6 +118,7 @@ import org.mockito.ArgumentMatcher; import org.mockito.InOrder; import org.mockito.Mockito; +import org.mockito.verification.VerificationMode; /** * Tests for {@link XdsClientImpl} with protocol version v3. @@ -205,8 +206,8 @@ private DiscoveryRpcCallV3(StreamObserver requestObserver, @Override protected void verifyRequest( XdsResourceType type, List resources, String versionInfo, String nonce, - EnvoyProtoData.Node node) { - verify(requestObserver, Mockito.timeout(2000)).onNext(argThat(new DiscoveryRequestMatcher( + EnvoyProtoData.Node node, VerificationMode verificationMode) { + verify(requestObserver, verificationMode).onNext(argThat(new DiscoveryRequestMatcher( node.toEnvoyProtoNode(), versionInfo, resources, type.typeUrl(), nonce, null, null))); }