Skip to content

Commit

Permalink
xds: Remember nonces for unknown types
Browse files Browse the repository at this point in the history
If the control plane sends a resource type the client doesn't understand
at-the-moment, the control plane will still expect the client to include
the nonce if the client subscribes to the type in the future.

This most easily happens when unsubscribing the last resource of a type.
Which meant 1cf1927 was insufficient.
  • Loading branch information
ejona86 committed Jan 7, 2025
1 parent aba8a0c commit e3e343d
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
8 changes: 4 additions & 4 deletions xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ XdsResourceType<?> fromTypeUrl(String typeUrl) {
private class AdsStream implements EventHandler<DiscoveryResponse> {
private boolean responseReceived;
private boolean closed;
// Response nonce for the most recently received discovery responses of each resource type.
// Response nonce for the most recently received discovery responses of each resource type URL.
// Client initiated requests start response nonce with empty string.
// Nonce in each response is echoed back in the following ACK/NACK request. It is
// used for management server to identify which response the client is ACKing/NACking.
Expand All @@ -289,7 +289,7 @@ private class AdsStream implements EventHandler<DiscoveryResponse> {
// map; nonces are only discarded once the stream closes because xds_protocol says "the
// management server should not send a DiscoveryResponse for any DiscoveryRequest that has a
// stale nonce."
private final Map<XdsResourceType<?>, String> respNonces = new HashMap<>();
private final Map<String, String> respNonces = new HashMap<>();
private final StreamingCall<DiscoveryRequest, DiscoveryResponse> call;
private final MethodDescriptor<DiscoveryRequest, DiscoveryResponse> methodDescriptor =
AggregatedDiscoveryServiceGrpc.getStreamAggregatedResourcesMethod();
Expand Down Expand Up @@ -337,7 +337,7 @@ void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
final void sendDiscoveryRequest(XdsResourceType<?> type, Collection<String> resources) {
logger.log(XdsLogLevel.INFO, "Sending {0} request for resources: {1}", type, resources);
sendDiscoveryRequest(type, versions.getOrDefault(type, ""), resources,
respNonces.getOrDefault(type, ""), null);
respNonces.getOrDefault(type.typeUrl(), ""), null);
}

@Override
Expand All @@ -352,6 +352,7 @@ public void onRecvMessage(DiscoveryResponse response) {
public void run() {
// Reset flag as message has been received on a stream
streamClosedNoResponse = false;
respNonces.put(response.getTypeUrl(), response.getNonce());
XdsResourceType<?> type = fromTypeUrl(response.getTypeUrl());
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log(
Expand Down Expand Up @@ -387,7 +388,6 @@ final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<A
return;
}
responseReceived = true;
respNonces.put(type, nonce);
ProcessingTracker processingTracker = new ProcessingTracker(
() -> call.startRecvMessage(), syncContext);
xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce,
Expand Down
5 changes: 4 additions & 1 deletion xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -2898,10 +2898,13 @@ public void edsCleanupNonceAfterUnsubscription() {
xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher);
verifySubscribedResourcesMetadataSizes(0, 0, 0, 0);
call.verifyRequest(EDS, Arrays.asList(), VERSION_1, "0000", NODE);
// The control plane can send an updated response for the empty subscription list, with a new
// nonce.
call.sendResponse(EDS, Arrays.asList(), VERSION_1, "0001");

// When re-subscribing, the version was forgotten but not the nonce
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher);
call.verifyRequest(EDS, "A.1", "", "0000", NODE, Mockito.timeout(2000));
call.verifyRequest(EDS, "A.1", "", "0001", NODE, Mockito.timeout(2000));
}

@Test
Expand Down

0 comments on commit e3e343d

Please sign in to comment.