Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Missing enrich policies on skip_unavailable=true clusters no longer fail the query #116972

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
c53f65c
Init commit EnrichResolution changed
quux00 Nov 18, 2024
ad7c1c4
Sympatico with the original branch - new changes start after this
quux00 Nov 18, 2024
65e3654
Changed away from Tuple<String, Boolean> to Map<String, Boolean> targ…
quux00 Nov 18, 2024
4733271
Minor cleanup after deciding to not handle errors for skip_un=true cl…
quux00 Nov 19, 2024
d7e4df5
Changed local cluster to map to skip_un=false for purposes of enrich …
quux00 Nov 19, 2024
10d13cc
Modified EnrichPolicyResolverTests to match new skip_unavailable beha…
quux00 Nov 19, 2024
3705eb3
The enrich policy re-resolution now also passes in the map of cluster…
quux00 Nov 19, 2024
ebc931e
Added missing enrich policy tests to RemoteClusterSecurityEsqlIT
quux00 Nov 19, 2024
7e4cfaf
Update docs/changelog/116972.yaml
quux00 Nov 19, 2024
e12d7dd
Merge remote-tracking branch 'elastic/main' into esql-ccs/skip_un-enr…
quux00 Nov 20, 2024
8acb25c
Minor changes based on PR feedback
quux00 Nov 20, 2024
22961c2
Merge remote-tracking branch 'elastic/main' into esql-ccs/skip_un-enr…
quux00 Nov 20, 2024
c9517ac
Added test from Pawan Karthik for RCS2 testing of unavailable remotes…
quux00 Nov 20, 2024
ac92ab1
Merge remote-tracking branch 'elastic/main' into esql-ccs/skip_un-enr…
quux00 Nov 20, 2024
404a01a
Merge remote-tracking branch 'elastic/main' into esql-ccs/skip_un-enr…
quux00 Nov 20, 2024
920f1d4
Merge remote-tracking branch 'elastic/main' into esql-ccs/skip_un-enr…
quux00 Nov 21, 2024
d94a7c5
PR feedback changes and added Pawan's CrossClusterEsqlRCS1EnrichUnava…
quux00 Nov 21, 2024
8117530
Merge remote-tracking branch 'elastic/main' into esql-ccs/skip_un-enr…
quux00 Nov 22, 2024
f503eac
Merge remote-tracking branch 'elastic/main' into esql-ccs/skip_un-enr…
quux00 Nov 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/116972.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 116972
summary: "ESQL: Missing enrich policies on skip_unavailable=true clusters do not fail\
\ the query"
area: ES|QL
type: enhancement
issues: []

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ public final class EnrichResolution {

private final Map<Key, ResolvedEnrichPolicy> resolvedPolicies = ConcurrentCollections.newConcurrentMap();
private final Map<Key, String> errors = ConcurrentCollections.newConcurrentMap();
private final Map<String, Exception> unavailableClusters = ConcurrentCollections.newConcurrentMap();
// skip_unavailable=true remote clusters that are unavailable or had errors when resolving the enrich policy
private final Map<String, Exception> unusableRemotes = ConcurrentCollections.newConcurrentMap();

public ResolvedEnrichPolicy getResolvedPolicy(String policyName, Enrich.Mode mode) {
return resolvedPolicies.get(new Key(policyName, mode));
Expand All @@ -34,11 +35,17 @@ public Collection<ResolvedEnrichPolicy> resolvedEnrichPolicies() {

}

// created for testing
public boolean hasErrors() {
return errors.size() > 0;
quux00 marked this conversation as resolved.
Show resolved Hide resolved
}

public String getError(String policyName, Enrich.Mode mode) {
final String error = errors.get(new Key(policyName, mode));
if (error != null) {
return error;
} else {
// TODO: I don't understand this code - why not just return null? Why is it wrong to call this when there's no errors?
assert false : "unresolved enrich policy [" + policyName + "] mode [" + mode + "]";
return "unresolved enrich policy [" + policyName + "] mode [" + mode + "]";
}
Expand All @@ -52,12 +59,16 @@ public void addError(String policyName, Enrich.Mode mode, String reason) {
errors.putIfAbsent(new Key(policyName, mode), reason);
}

public void addUnavailableCluster(String clusterAlias, Exception e) {
unavailableClusters.put(clusterAlias, e);
public void addUnusableRemote(String clusterAlias, Exception e) {
unusableRemotes.put(clusterAlias, e);
}

public Map<String, Exception> getUnavailableClusters() {
return unavailableClusters;
/**
* @return Remote clusters that are either unavailable (disconnected) or had a failure when resolving the enrich policy.
* Map key is cluster alias. Map value is the Exception showing the particular error encountered.
*/
public Map<String, Exception> unusableRemotes() {
return unusableRemotes;
}

private record Key(String policyName, Enrich.Mode mode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
Expand Down Expand Up @@ -53,7 +52,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -98,81 +96,111 @@ public record UnresolvedPolicy(String name, Enrich.Mode mode) {
/**
* Resolves a set of enrich policies
*
* @param targetClusters the target clusters
* @param targetClusters the target clusters; key: cluster alias; value: skip_unavailable setting (null for local cluster)
smalyshev marked this conversation as resolved.
Show resolved Hide resolved
* @param unresolvedPolicies the unresolved policies
* @param listener notified with the enrich resolution
*/
public void resolvePolicies(
Collection<String> targetClusters,
Map<String, Boolean> targetClusters,
Collection<UnresolvedPolicy> unresolvedPolicies,
ActionListener<EnrichResolution> listener
) {
if (unresolvedPolicies.isEmpty() || targetClusters.isEmpty()) {
listener.onResponse(new EnrichResolution());
return;
}
final Set<String> remoteClusters = new HashSet<>(targetClusters);
final boolean includeLocal = remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> {
final Map<String, Boolean> remotes = new HashMap<>(targetClusters);
final boolean includeLocal = targetClusters.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
remotes.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
lookupPolicies(remotes.keySet(), includeLocal, unresolvedPolicies, listener.map(lookupResponses -> {
final EnrichResolution enrichResolution = new EnrichResolution();

Map<String, LookupResponse> lookupResponsesToProcess = new HashMap<>();

for (Map.Entry<String, LookupResponse> entry : lookupResponses.entrySet()) {
String clusterAlias = entry.getKey();
if (entry.getValue().connectionError != null) {
enrichResolution.addUnavailableCluster(clusterAlias, entry.getValue().connectionError);
enrichResolution.addUnusableRemote(clusterAlias, entry.getValue().connectionError);
// remove unavailable cluster from the list of clusters which is used below to create the ResolvedEnrichPolicy
remoteClusters.remove(clusterAlias);
Boolean removedVal = remotes.remove(clusterAlias);
assert removedVal != null : "Remote " + clusterAlias + " was not removed from the remotes list.";
smalyshev marked this conversation as resolved.
Show resolved Hide resolved
} else {
lookupResponsesToProcess.put(clusterAlias, entry.getValue());
}
}

for (UnresolvedPolicy unresolved : unresolvedPolicies) {
Tuple<ResolvedEnrichPolicy, String> resolved = mergeLookupResults(
MergedPolicyLookupResult resolved = mergeLookupResults(
unresolved,
calculateTargetClusters(unresolved.mode, includeLocal, remoteClusters),
calculateTargetClusters(unresolved.mode, includeLocal, remotes),
lookupResponsesToProcess
);

if (resolved.v1() != null) {
enrichResolution.addResolvedPolicy(unresolved.name, unresolved.mode, resolved.v1());
} else {
assert resolved.v2() != null;
enrichResolution.addError(unresolved.name, unresolved.mode, resolved.v2());
for (Map.Entry<String, Exception> entry : resolved.remotesToBeSkipped().entrySet()) {
enrichResolution.addUnusableRemote(entry.getKey(), entry.getValue());
}
// If a remote-only CCS is done and all the remotes are "unusable" (due to missing enrich policies)
// *and* skippable (skip_unavailable=true), don't fill in either the resolved policy or the error on
// the enrichResolution object. The only field with contents will be the "unusable remotes" field.
if (resolved.resolvedPolicy() != null) {
enrichResolution.addResolvedPolicy(unresolved.name, unresolved.mode, resolved.resolvedPolicy());
} else if (resolved.error() != null) {
enrichResolution.addError(unresolved.name, unresolved.mode, resolved.error());
}
}
return enrichResolution;
}));
}

private Collection<String> calculateTargetClusters(Enrich.Mode mode, boolean includeLocal, Set<String> remoteClusters) {
record MergedPolicyLookupResult(
@Nullable ResolvedEnrichPolicy resolvedPolicy,
Map<String, Exception> remotesToBeSkipped,
@Nullable String error
) {}

/**
* @param mode
* @param includeLocal
* @param remoteClusters
* @return Map where key: String=clusterAlias, value: Boolean=skipUnavailable setting
*/
private Map<String, Boolean> calculateTargetClusters(Enrich.Mode mode, boolean includeLocal, Map<String, Boolean> remoteClusters) {
return switch (mode) {
case ANY -> CollectionUtils.appendToCopy(remoteClusters, RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
case COORDINATOR -> List.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
case REMOTE -> includeLocal
? CollectionUtils.appendToCopy(remoteClusters, RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)
: remoteClusters;
case ANY -> copyAndAddLocalCluster(remoteClusters);
case COORDINATOR -> copyAndAddLocalCluster(Collections.emptyMap());
quux00 marked this conversation as resolved.
Show resolved Hide resolved
case REMOTE -> includeLocal ? copyAndAddLocalCluster(remoteClusters) : remoteClusters;
};
}

private Map<String, Boolean> copyAndAddLocalCluster(Map<String, Boolean> remoteClusters) {
Map<String, Boolean> newMap = new HashMap<>(remoteClusters);
// technically the local cluster has no skip_unavailable setting, but for enrich policy resolution
// purposes we treat errors on the local cluster as fatal, so set it as false to simplify downstream code
newMap.put(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Boolean.FALSE);
return newMap;
}

/**
* Resolve an enrich policy by merging the lookup responses from the target clusters.
* @return a resolved enrich policy or an error
*/
private Tuple<ResolvedEnrichPolicy, String> mergeLookupResults(
private MergedPolicyLookupResult mergeLookupResults(
UnresolvedPolicy unresolved,
Collection<String> targetClusters,
Map<String, Boolean> targetClusters,
Map<String, LookupResponse> lookupResults
) {
String policyName = unresolved.name;
if (targetClusters.isEmpty()) {
return Tuple.tuple(null, "enrich policy [" + policyName + "] cannot be resolved since remote clusters are unavailable");
String error = "enrich policy [" + policyName + "] cannot be resolved since remote clusters are unavailable";
return new MergedPolicyLookupResult(null, Collections.emptyMap(), error);
}
// key for this map is cluster alias; a cluster will be in this map only if it has resolved the enrich policy/policies needed
final Map<String, ResolvedEnrichPolicy> policies = new HashMap<>();
final List<String> failures = new ArrayList<>();
for (String cluster : targetClusters) {
final List<String> failures = new ArrayList<>(); // (fatal) failures on local or skip_unavailable=false remote
final Map<String, Exception> remotesToBeSkipped = new HashMap<>(); // skip_unavailable=true remotes enrich policy failures
for (Map.Entry<String, Boolean> clusterInfo : targetClusters.entrySet()) {
String cluster = clusterInfo.getKey();
boolean skipUnavailable = clusterInfo.getValue();
LookupResponse lookupResult = lookupResults.get(cluster);
if (lookupResult != null) {
assert lookupResult.connectionError == null : "Should never have a non-null connectionError here";
Expand All @@ -182,35 +210,52 @@ private Tuple<ResolvedEnrichPolicy, String> mergeLookupResults(
} else {
final String failure = lookupResult.failures.get(policyName);
if (failure != null) {
failures.add(failure);
if (skipUnavailable) {
remotesToBeSkipped.put(cluster, new IllegalStateException(failure));
} else {
failures.add(failure);
}
} else if (skipUnavailable) {
// code path where there was no enrich policy found at all on the remote
remotesToBeSkipped.put(cluster, new IllegalStateException("failed to resolve enrich policy [" + policyName + "]"));
}
}
}
}
if (targetClusters.size() != policies.size()) {
if (targetClusters.size() != policies.size() + remotesToBeSkipped.size()) {
final String reason;
if (failures.isEmpty()) {
List<String> missingClusters = targetClusters.stream().filter(c -> policies.containsKey(c) == false).sorted().toList();
reason = missingPolicyError(policyName, targetClusters, missingClusters);
List<String> missingClusters = targetClusters.keySet()
Copy link
Contributor Author

@quux00 quux00 Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not convinced this block (under (if failures.isEmpty)) will ever execute now, but I was apprehensive to remove it. I couldn't find a way to enter this block based on my testing.

.stream()
.filter(c -> policies.containsKey(c) == false)
.sorted()
.toList();
reason = missingPolicyError(policyName, targetClusters.keySet(), missingClusters);
} else {
reason = "failed to resolve enrich policy [" + policyName + "]; reason " + failures;
}
return Tuple.tuple(null, reason);
return new MergedPolicyLookupResult(null, remotesToBeSkipped, reason);
} else if (policies.isEmpty()) {
// no target cluster had a valid enrich policy and all are skip_unavailable=true
assert targetClusters.values().stream().allMatch(b -> b == Boolean.TRUE) : "Not all target clusters are skip_unavailable=true";
return new MergedPolicyLookupResult(null, remotesToBeSkipped, null);
}

Map<String, EsField> mappings = new HashMap<>();
Map<String, String> concreteIndices = new HashMap<>();
ResolvedEnrichPolicy last = null;
// loop over clusters with a ResolvedEnrichPolicy - ensure no errors within the policy
Copy link
Contributor Author

@quux00 quux00 Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that mismatches across policies (being checked in the section below here) are still fatal errors for skip_unavailable=true clusters. I started down the road of having these be skippable errors, but that looks rather tricky to pull off. At a minimum, you'd have to partition the policies by "skip_unavailable" and build a canonical list of fields/types/etc. from skip_un=false clusters and then compare the skip_un=true clusters and then if any mismatches are found, those aren't fatal, but you pull that cluster out of the list to be resolved for field-caps. Not impossible but this section would require a significant rewrite, so I decided to only handle missing enrich policies (and policies that have errors on the remote cluster during resolution), but still fail them based on mismatches between policies.

for (Map.Entry<String, ResolvedEnrichPolicy> e : policies.entrySet()) {
ResolvedEnrichPolicy curr = e.getValue();
if (last != null && last.matchField().equals(curr.matchField()) == false) {
String error = "enrich policy [" + policyName + "] has different match fields ";
error += "[" + last.matchField() + ", " + curr.matchField() + "] across clusters";
return Tuple.tuple(null, error);
return new MergedPolicyLookupResult(null, remotesToBeSkipped, error);
}
if (last != null && last.matchType().equals(curr.matchType()) == false) {
String error = "enrich policy [" + policyName + "] has different match types ";
error += "[" + last.matchType() + ", " + curr.matchType() + "] across clusters";
return Tuple.tuple(null, error);
return new MergedPolicyLookupResult(null, remotesToBeSkipped, error);
}
// merge mappings
for (Map.Entry<String, EsField> m : curr.mapping().entrySet()) {
Expand All @@ -226,7 +271,7 @@ private Tuple<ResolvedEnrichPolicy, String> mergeLookupResults(
if (old != null && old.getDataType().equals(field.getDataType()) == false) {
String error = "field [" + m.getKey() + "] of enrich policy [" + policyName + "] has different data types ";
error += "[" + old.getDataType() + ", " + field.getDataType() + "] across clusters";
return Tuple.tuple(null, error);
return new MergedPolicyLookupResult(null, remotesToBeSkipped, error);
}
}
if (last != null) {
Expand All @@ -237,7 +282,8 @@ private Tuple<ResolvedEnrichPolicy, String> mergeLookupResults(
var diff = counts.entrySet().stream().filter(f -> f.getValue() < 2).map(Map.Entry::getKey).limit(20).sorted().toList();
if (diff.isEmpty() == false) {
String detailed = "these fields are missing in some policies: " + diff;
return Tuple.tuple(null, "enrich policy [" + policyName + "] has different enrich fields across clusters; " + detailed);
String fullMessage = "enrich policy [" + policyName + "] has different enrich fields across clusters; " + detailed;
return new MergedPolicyLookupResult(null, remotesToBeSkipped, fullMessage);
}
}
// merge concrete indices
Expand All @@ -246,7 +292,7 @@ private Tuple<ResolvedEnrichPolicy, String> mergeLookupResults(
}
assert last != null;
var resolved = new ResolvedEnrichPolicy(last.matchField(), last.matchType(), last.enrichFields(), concreteIndices, mappings);
return Tuple.tuple(resolved, null);
return new MergedPolicyLookupResult(resolved, remotesToBeSkipped, null);
}

private String missingPolicyError(String policyName, Collection<String> targetClusters, List<String> missingClusters) {
Expand Down
Loading