Skip to content

Commit

Permalink
xds: apply valid resources while NACKing update (grpc#8506)
Browse files Browse the repository at this point in the history
Implementing [gRFC A46](grpc/proposal#260)
  • Loading branch information
dapengzhang0 committed Sep 13, 2021
1 parent dbf9202 commit d25aa8e
Show file tree
Hide file tree
Showing 2 changed files with 248 additions and 96 deletions.
135 changes: 71 additions & 64 deletions xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ final class ClientXdsClient extends AbstractXdsClient {
protected void handleLdsResponse(String versionInfo, List<Any> resources, String nonce) {
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
Set<String> unpackedResources = new HashSet<>(resources.size());
Set<String> invalidResources = new HashSet<>();
List<String> errors = new ArrayList<>();
Set<String> retainedRdsResources = new HashSet<>();

Expand Down Expand Up @@ -222,6 +223,7 @@ protected void handleLdsResponse(String versionInfo, List<Any> resources, String
} catch (ResourceInvalidException e) {
errors.add(
"LDS response Listener '" + listenerName + "' validation error: " + e.getMessage());
invalidResources.add(listenerName);
continue;
}

Expand All @@ -231,19 +233,9 @@ protected void handleLdsResponse(String versionInfo, List<Any> resources, String
getLogger().log(XdsLogLevel.INFO,
"Received LDS Response version {0} nonce {1}. Parsed resources: {2}",
versionInfo, nonce, unpackedResources);

if (!errors.isEmpty()) {
handleResourcesRejected(ResourceType.LDS, unpackedResources, versionInfo, nonce, errors);
return;
}

handleResourcesAccepted(ResourceType.LDS, parsedResources, versionInfo, nonce);
for (String resource : rdsResourceSubscribers.keySet()) {
if (!retainedRdsResources.contains(resource)) {
ResourceSubscriber subscriber = rdsResourceSubscribers.get(resource);
subscriber.onAbsent();
}
}
handleResourceUpdate(
ResourceType.LDS, parsedResources, invalidResources, retainedRdsResources, versionInfo,
nonce, errors);
}

private LdsUpdate processClientSideListener(
Expand Down Expand Up @@ -1310,6 +1302,7 @@ static StructOrError<ClusterWeight> parseClusterWeight(
protected void handleRdsResponse(String versionInfo, List<Any> resources, String nonce) {
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
Set<String> unpackedResources = new HashSet<>(resources.size());
Set<String> invalidResources = new HashSet<>();
List<String> errors = new ArrayList<>();

for (int i = 0; i < resources.size(); i++) {
Expand Down Expand Up @@ -1337,6 +1330,7 @@ protected void handleRdsResponse(String versionInfo, List<Any> resources, String
errors.add(
"RDS response RouteConfiguration '" + routeConfigName + "' validation error: " + e
.getMessage());
invalidResources.add(routeConfigName);
continue;
}

Expand All @@ -1345,12 +1339,9 @@ protected void handleRdsResponse(String versionInfo, List<Any> resources, String
getLogger().log(XdsLogLevel.INFO,
"Received RDS Response version {0} nonce {1}. Parsed resources: {2}",
versionInfo, nonce, unpackedResources);

if (!errors.isEmpty()) {
handleResourcesRejected(ResourceType.RDS, unpackedResources, versionInfo, nonce, errors);
} else {
handleResourcesAccepted(ResourceType.RDS, parsedResources, versionInfo, nonce);
}
handleResourceUpdate(
ResourceType.RDS, parsedResources, invalidResources, Collections.<String>emptySet(),
versionInfo, nonce, errors);
}

private static RdsUpdate processRouteConfiguration(
Expand All @@ -1374,6 +1365,7 @@ private static RdsUpdate processRouteConfiguration(
protected void handleCdsResponse(String versionInfo, List<Any> resources, String nonce) {
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
Set<String> unpackedResources = new HashSet<>(resources.size());
Set<String> invalidResources = new HashSet<>();
List<String> errors = new ArrayList<>();
Set<String> retainedEdsResources = new HashSet<>();

Expand Down Expand Up @@ -1410,28 +1402,17 @@ protected void handleCdsResponse(String versionInfo, List<Any> resources, String
} catch (ResourceInvalidException e) {
errors.add(
"CDS response Cluster '" + clusterName + "' validation error: " + e.getMessage());
invalidResources.add(clusterName);
continue;
}
parsedResources.put(clusterName, new ParsedResource(cdsUpdate, resource));
}
getLogger().log(XdsLogLevel.INFO,
"Received CDS Response version {0} nonce {1}. Parsed resources: {2}",
versionInfo, nonce, unpackedResources);

if (!errors.isEmpty()) {
handleResourcesRejected(ResourceType.CDS, unpackedResources, versionInfo, nonce, errors);
return;
}

handleResourcesAccepted(ResourceType.CDS, parsedResources, versionInfo, nonce);
// CDS responses represents the state of the world, EDS resources not referenced in CDS
// resources should be deleted.
for (String resource : edsResourceSubscribers.keySet()) {
ResourceSubscriber subscriber = edsResourceSubscribers.get(resource);
if (!retainedEdsResources.contains(resource)) {
subscriber.onAbsent();
}
}
handleResourceUpdate(
ResourceType.CDS, parsedResources, invalidResources, retainedEdsResources, versionInfo,
nonce, errors);
}

@VisibleForTesting
Expand Down Expand Up @@ -1612,6 +1593,7 @@ private static StructOrError<CdsUpdate.Builder> parseNonAggregateCluster(
protected void handleEdsResponse(String versionInfo, List<Any> resources, String nonce) {
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
Set<String> unpackedResources = new HashSet<>(resources.size());
Set<String> invalidResources = new HashSet<>();
List<String> errors = new ArrayList<>();

for (int i = 0; i < resources.size(); i++) {
Expand Down Expand Up @@ -1646,16 +1628,17 @@ protected void handleEdsResponse(String versionInfo, List<Any> resources, String
} catch (ResourceInvalidException e) {
errors.add("EDS response ClusterLoadAssignment '" + clusterName
+ "' validation error: " + e.getMessage());
invalidResources.add(clusterName);
continue;
}
parsedResources.put(clusterName, new ParsedResource(edsUpdate, resource));
}

if (!errors.isEmpty()) {
handleResourcesRejected(ResourceType.EDS, unpackedResources, versionInfo, nonce, errors);
} else {
handleResourcesAccepted(ResourceType.EDS, parsedResources, versionInfo, nonce);
}
getLogger().log(
XdsLogLevel.INFO, "Received EDS Response version {0} nonce {1}. Parsed resources: {2}",
versionInfo, nonce, unpackedResources);
handleResourceUpdate(
ResourceType.EDS, parsedResources, invalidResources, Collections.<String>emptySet(),
versionInfo, nonce, errors);
}

private static EdsUpdate processClusterLoadAssignment(ClusterLoadAssignment assignment)
Expand Down Expand Up @@ -2045,43 +2028,67 @@ private void cleanUpResourceTimers() {
}
}

private void handleResourcesAccepted(
ResourceType type, Map<String, ParsedResource> parsedResources, String version,
String nonce) {
ackResponse(type, version, nonce);

private void handleResourceUpdate(
ResourceType type, Map<String, ParsedResource> parsedResources, Set<String> invalidResources,
Set<String> retainedResources, String version, String nonce, List<String> errors) {
String errorDetail = null;
if (errors.isEmpty()) {
checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors");
ackResponse(type, version, nonce);
} else {
errorDetail = Joiner.on('\n').join(errors);
getLogger().log(XdsLogLevel.WARNING,
"Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}",
type, version, nonce, errorDetail);
nackResponse(type, nonce, errorDetail);
}
long updateTime = timeProvider.currentTimeNanos();
for (Map.Entry<String, ResourceSubscriber> entry : getSubscribedResourcesMap(type).entrySet()) {
String resourceName = entry.getKey();
ResourceSubscriber subscriber = entry.getValue();
// Attach error details to the subscribed resources that included in the ADS update.
if (invalidResources.contains(resourceName)) {
subscriber.onRejected(version, updateTime, errorDetail);
}
// Notify the watchers.
if (parsedResources.containsKey(resourceName)) {
subscriber.onData(parsedResources.get(resourceName), version, updateTime);
} else if (type == ResourceType.LDS || type == ResourceType.CDS) {
if (subscriber.data != null && invalidResources.contains(resourceName)) {
// Update is rejected but keep using the cached data.
if (type == ResourceType.LDS) {
LdsUpdate ldsUpdate = (LdsUpdate) subscriber.data;
io.grpc.xds.HttpConnectionManager hcm = ldsUpdate.httpConnectionManager();
if (hcm != null) {
String rdsName = hcm.rdsName();
if (rdsName != null) {
retainedResources.add(rdsName);
}
}
} else {
CdsUpdate cdsUpdate = (CdsUpdate) subscriber.data;
String edsName = cdsUpdate.edsServiceName();
if (edsName == null) {
edsName = cdsUpdate.clusterName();
}
retainedResources.add(edsName);
}
continue;
}
// For State of the World services, notify watchers when their watched resource is missing
// from the ADS update.
subscriber.onAbsent();
}
}
}

private void handleResourcesRejected(
ResourceType type, Set<String> unpackedResourceNames, String version,
String nonce, List<String> errors) {
String errorDetail = Joiner.on('\n').join(errors);
getLogger().log(XdsLogLevel.WARNING,
"Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}",
type, version, nonce, errorDetail);
nackResponse(type, nonce, errorDetail);

long updateTime = timeProvider.currentTimeNanos();
for (Map.Entry<String, ResourceSubscriber> entry : getSubscribedResourcesMap(type).entrySet()) {
String resourceName = entry.getKey();
ResourceSubscriber subscriber = entry.getValue();

// Attach error details to the subscribed resources that included in the ADS update.
if (unpackedResourceNames.contains(resourceName)) {
subscriber.onRejected(version, updateTime, errorDetail);
// LDS/CDS responses represents the state of the world, RDS/EDS resources not referenced in
// LDS/CDS resources should be deleted.
if (type == ResourceType.LDS || type == ResourceType.CDS) {
Map<String, ResourceSubscriber> dependentSubscribers =
type == ResourceType.LDS ? rdsResourceSubscribers : edsResourceSubscribers;
for (String resource : dependentSubscribers.keySet()) {
if (!retainedResources.contains(resource)) {
dependentSubscribers.get(resource).onAbsent();
}
}
}
}
Expand Down
Loading

0 comments on commit d25aa8e

Please sign in to comment.