-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ES|QL CCS uses skip_unavailable setting for handling disconnected remote clusters #115266
Changes from all commits
da778ee
c784ebb
737ca7c
43cf559
6f73b4d
a79a32b
e84e0c8
52ce899
e8626c1
3027b58
21e849c
a7ca28d
2d4bc98
ac1cb85
22871a8
b3956f4
67454fd
e4fa48c
ab24843
1622065
5fbb373
3c95dfe
bb6b96c
06aa298
5fd33f0
59fe3e9
dc122c3
cd56a2e
dc2a75b
bcdaad3
9369d29
4b5c72b
db2a441
8eda8c8
2a4c06c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
pr: 115266 | ||
summary: ES|QL CCS uses `skip_unavailable` setting for handling disconnected remote | ||
clusters | ||
area: ES|QL | ||
type: enhancement | ||
issues: [ 114531 ] |
Large diffs are not rendered by default.
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ | |
package org.elasticsearch.xpack.esql.action; | ||
|
||
import org.elasticsearch.TransportVersions; | ||
import org.elasticsearch.action.search.ShardSearchFailure; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.common.io.stream.Writeable; | ||
|
@@ -281,6 +282,7 @@ public static class Cluster implements ToXContentFragment, Writeable { | |
private final Integer successfulShards; | ||
private final Integer skippedShards; | ||
private final Integer failedShards; | ||
private final List<ShardSearchFailure> failures; | ||
private final TimeValue took; // search latency for this cluster sub-search | ||
|
||
/** | ||
|
@@ -300,7 +302,7 @@ public String toString() { | |
} | ||
|
||
public Cluster(String clusterAlias, String indexExpression) { | ||
this(clusterAlias, indexExpression, true, Cluster.Status.RUNNING, null, null, null, null, null); | ||
this(clusterAlias, indexExpression, true, Cluster.Status.RUNNING, null, null, null, null, null, null); | ||
} | ||
|
||
/** | ||
|
@@ -312,7 +314,7 @@ public Cluster(String clusterAlias, String indexExpression) { | |
* @param skipUnavailable whether this Cluster is marked as skip_unavailable in remote cluster settings | ||
*/ | ||
public Cluster(String clusterAlias, String indexExpression, boolean skipUnavailable) { | ||
this(clusterAlias, indexExpression, skipUnavailable, Cluster.Status.RUNNING, null, null, null, null, null); | ||
this(clusterAlias, indexExpression, skipUnavailable, Cluster.Status.RUNNING, null, null, null, null, null, null); | ||
} | ||
|
||
/** | ||
|
@@ -324,7 +326,7 @@ public Cluster(String clusterAlias, String indexExpression, boolean skipUnavaila | |
* @param status current status of the search on this Cluster | ||
*/ | ||
public Cluster(String clusterAlias, String indexExpression, boolean skipUnavailable, Cluster.Status status) { | ||
this(clusterAlias, indexExpression, skipUnavailable, status, null, null, null, null, null); | ||
this(clusterAlias, indexExpression, skipUnavailable, status, null, null, null, null, null, null); | ||
} | ||
|
||
public Cluster( | ||
|
@@ -336,6 +338,7 @@ public Cluster( | |
Integer successfulShards, | ||
Integer skippedShards, | ||
Integer failedShards, | ||
List<ShardSearchFailure> failures, | ||
TimeValue took | ||
) { | ||
assert clusterAlias != null : "clusterAlias cannot be null"; | ||
|
@@ -349,6 +352,11 @@ public Cluster( | |
this.successfulShards = successfulShards; | ||
this.skippedShards = skippedShards; | ||
this.failedShards = failedShards; | ||
if (failures == null) { | ||
this.failures = List.of(); | ||
} else { | ||
this.failures = failures; | ||
} | ||
this.took = took; | ||
} | ||
|
||
|
@@ -362,6 +370,11 @@ public Cluster(StreamInput in) throws IOException { | |
this.failedShards = in.readOptionalInt(); | ||
this.took = in.readOptionalTimeValue(); | ||
this.skipUnavailable = in.readBoolean(); | ||
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_CCS_EXEC_INFO_WITH_FAILURES)) { | ||
this.failures = Collections.unmodifiableList(in.readCollectionAsList(ShardSearchFailure::readShardSearchFailure)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason why the collection is unmodifiable - within the ESQL code we're not using the defensive style since the collections are never modified, rather copied. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The EsqlExecution.Cluster class, like the SearchResponse.Cluster class it is based on, is immutable. You can only change state by copying an existing Cluster and swapping it in. I can change line 374 to use a mutable list, but it won't make any difference since the coding practice for this class is to never modify in place. In fact, the line below where I do |
||
} else { | ||
this.failures = List.of(); | ||
} | ||
} | ||
|
||
@Override | ||
|
@@ -375,6 +388,9 @@ public void writeTo(StreamOutput out) throws IOException { | |
out.writeOptionalInt(failedShards); | ||
out.writeOptionalTimeValue(took); | ||
out.writeBoolean(skipUnavailable); | ||
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_CCS_EXEC_INFO_WITH_FAILURES)) { | ||
out.writeCollection(failures); | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -387,12 +403,12 @@ public void writeTo(StreamOutput out) throws IOException { | |
* All other fields can be set and override the value in the "copyFrom" Cluster. | ||
*/ | ||
public static class Builder { | ||
private String indexExpression; | ||
private Cluster.Status status; | ||
private Integer totalShards; | ||
private Integer successfulShards; | ||
private Integer skippedShards; | ||
private Integer failedShards; | ||
private List<ShardSearchFailure> failures; | ||
private TimeValue took; | ||
private final Cluster original; | ||
|
||
|
@@ -408,22 +424,18 @@ public Builder(Cluster copyFrom) { | |
public Cluster build() { | ||
return new Cluster( | ||
original.getClusterAlias(), | ||
indexExpression == null ? original.getIndexExpression() : indexExpression, | ||
original.getIndexExpression(), | ||
original.isSkipUnavailable(), | ||
status != null ? status : original.getStatus(), | ||
totalShards != null ? totalShards : original.getTotalShards(), | ||
successfulShards != null ? successfulShards : original.getSuccessfulShards(), | ||
skippedShards != null ? skippedShards : original.getSkippedShards(), | ||
failedShards != null ? failedShards : original.getFailedShards(), | ||
failures != null ? failures : original.getFailures(), | ||
took != null ? took : original.getTook() | ||
); | ||
} | ||
|
||
public Cluster.Builder setIndexExpression(String indexExpression) { | ||
this.indexExpression = indexExpression; | ||
return this; | ||
} | ||
|
||
public Cluster.Builder setStatus(Cluster.Status status) { | ||
this.status = status; | ||
return this; | ||
|
@@ -449,6 +461,11 @@ public Cluster.Builder setFailedShards(int failedShards) { | |
return this; | ||
} | ||
|
||
public Cluster.Builder setFailures(List<ShardSearchFailure> failures) { | ||
this.failures = failures; | ||
return this; | ||
} | ||
|
||
public Cluster.Builder setTook(TimeValue took) { | ||
this.took = took; | ||
return this; | ||
|
@@ -466,7 +483,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws | |
builder.field(STATUS_FIELD.getPreferredName(), getStatus().toString()); | ||
builder.field(INDICES_FIELD.getPreferredName(), indexExpression); | ||
if (took != null) { | ||
// TODO: change this to took_nanos and call took.nanos? | ||
builder.field(TOOK.getPreferredName(), took.millis()); | ||
} | ||
if (totalShards != null) { | ||
|
@@ -483,6 +499,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws | |
} | ||
builder.endObject(); | ||
} | ||
if (failures != null && failures.size() > 0) { | ||
builder.startArray(RestActions.FAILURES_FIELD.getPreferredName()); | ||
for (ShardSearchFailure failure : failures) { | ||
failure.toXContent(builder, params); | ||
} | ||
builder.endArray(); | ||
} | ||
} | ||
builder.endObject(); | ||
return builder; | ||
|
@@ -529,6 +552,10 @@ public Integer getFailedShards() { | |
return failedShards; | ||
} | ||
|
||
public List<ShardSearchFailure> getFailures() { | ||
return failures; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) return true; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ | |
|
||
package org.elasticsearch.xpack.esql.enrich; | ||
|
||
import org.elasticsearch.ExceptionsHelper; | ||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.action.ActionListenerResponseHandler; | ||
import org.elasticsearch.action.search.SearchRequest; | ||
|
@@ -50,6 +51,7 @@ | |
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
|
@@ -113,12 +115,27 @@ public void resolvePolicies( | |
final boolean includeLocal = remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); | ||
lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> { | ||
final EnrichResolution enrichResolution = new EnrichResolution(); | ||
|
||
Map<String, LookupResponse> lookupResponsesToProcess = new HashMap<>(); | ||
|
||
for (Map.Entry<String, LookupResponse> entry : lookupResponses.entrySet()) { | ||
String clusterAlias = entry.getKey(); | ||
if (entry.getValue().connectionError != null) { | ||
enrichResolution.addUnavailableCluster(clusterAlias, entry.getValue().connectionError); | ||
// remove unavailable cluster from the list of clusters which is used below to create the ResolvedEnrichPolicy | ||
remoteClusters.remove(clusterAlias); | ||
} else { | ||
lookupResponsesToProcess.put(clusterAlias, entry.getValue()); | ||
} | ||
} | ||
|
||
for (UnresolvedPolicy unresolved : unresolvedPolicies) { | ||
Tuple<ResolvedEnrichPolicy, String> resolved = mergeLookupResults( | ||
unresolved, | ||
calculateTargetClusters(unresolved.mode, includeLocal, remoteClusters), | ||
lookupResponses | ||
lookupResponsesToProcess | ||
); | ||
|
||
if (resolved.v1() != null) { | ||
enrichResolution.addResolvedPolicy(unresolved.name, unresolved.mode, resolved.v1()); | ||
} else { | ||
|
@@ -149,13 +166,16 @@ private Tuple<ResolvedEnrichPolicy, String> mergeLookupResults( | |
Collection<String> targetClusters, | ||
Map<String, LookupResponse> lookupResults | ||
) { | ||
assert targetClusters.isEmpty() == false; | ||
String policyName = unresolved.name; | ||
if (targetClusters.isEmpty()) { | ||
return Tuple.tuple(null, "enrich policy [" + policyName + "] cannot be resolved since remote clusters are unavailable"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I couldn't find a test for this situation. Does it make sense to add one now? (I know there are several other follow up PRs in this area) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code was added in response to one of the IT tests I added failing: In that case invariants in the code below are not met and assertion failures are thrown, so I added this to just leave the code immediately when there are no targetClusters, which can happen when the Enrich Mode type is "REMOTE" and the local cluster was not included in the query. |
||
} | ||
final Map<String, ResolvedEnrichPolicy> policies = new HashMap<>(); | ||
final List<String> failures = new ArrayList<>(); | ||
for (String cluster : targetClusters) { | ||
LookupResponse lookupResult = lookupResults.get(cluster); | ||
if (lookupResult != null) { | ||
assert lookupResult.connectionError == null : "Should never have a non-null connectionError here"; | ||
ResolvedEnrichPolicy policy = lookupResult.policies.get(policyName); | ||
if (policy != null) { | ||
policies.put(cluster, policy); | ||
|
@@ -261,22 +281,34 @@ private void lookupPolicies( | |
if (remotePolicies.isEmpty() == false) { | ||
for (String cluster : remoteClusters) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By this point, as I understand, we have already run field caps resolution. So we may have some skipped clusters. Do we exclude them somewhere from the list of clusters for which we're going to resolve policies? Because it looks like it's pointless to try to resolve those policies if the cluster will be skipped anyway. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, the enrich policy resolution is done before the field-caps. I'm not sure why or if that's necessary for some reason or just historical accident. IMO, the field-caps call seems the logical one to do first, but that's not how it is, so we need to track unavailable clusters here and then pass that list to the callback listener which is the one that does the field-caps lookup: https://github.com/elastic/elasticsearch/pull/115266/files#diff-40060e2ec9003953a228c4a03bdc80a301949b0d4e3dccc1978798e33a992a73R345 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh ok, I was for some reason assuming field caps go first... then of course yes, enrich should be telling field caps which clusters are skipped, not the other way around. |
||
ActionListener<LookupResponse> lookupListener = refs.acquire(resp -> lookupResponses.put(cluster, resp)); | ||
getRemoteConnection( | ||
cluster, | ||
lookupListener.delegateFailureAndWrap( | ||
(delegate, connection) -> transportService.sendRequest( | ||
getRemoteConnection(cluster, new ActionListener<Transport.Connection>() { | ||
@Override | ||
public void onResponse(Transport.Connection connection) { | ||
transportService.sendRequest( | ||
connection, | ||
RESOLVE_ACTION_NAME, | ||
new LookupRequest(cluster, remotePolicies), | ||
TransportRequestOptions.EMPTY, | ||
new ActionListenerResponseHandler<>( | ||
delegate, | ||
LookupResponse::new, | ||
threadPool.executor(ThreadPool.Names.SEARCH) | ||
) | ||
) | ||
) | ||
); | ||
new ActionListenerResponseHandler<>(lookupListener.delegateResponse((l, e) -> { | ||
if (ExceptionsHelper.isRemoteUnavailableException(e) | ||
&& remoteClusterService.isSkipUnavailable(cluster)) { | ||
l.onResponse(new LookupResponse(e)); | ||
} else { | ||
l.onFailure(e); | ||
} | ||
}), LookupResponse::new, threadPool.executor(ThreadPool.Names.SEARCH)) | ||
); | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster)) { | ||
lookupListener.onResponse(new LookupResponse(e)); | ||
} else { | ||
lookupListener.onFailure(e); | ||
} | ||
} | ||
}); | ||
Comment on lines
+292
to
+311
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since the code is similar between onFailure and onResponse, pull it out into a separate method. |
||
} | ||
} | ||
// local cluster | ||
|
@@ -323,16 +355,30 @@ public void writeTo(StreamOutput out) throws IOException { | |
private static class LookupResponse extends TransportResponse { | ||
final Map<String, ResolvedEnrichPolicy> policies; | ||
final Map<String, String> failures; | ||
// does not need to be Writable since this indicates a failure to contact a remote cluster, so only set on querying cluster | ||
final transient Exception connectionError; | ||
|
||
LookupResponse(Map<String, ResolvedEnrichPolicy> policies, Map<String, String> failures) { | ||
this.policies = policies; | ||
this.failures = failures; | ||
this.connectionError = null; | ||
} | ||
|
||
/** | ||
* Use this constructor when the remote cluster is unavailable to indicate inability to do the enrich policy lookup | ||
* @param connectionError Exception received when trying to connect to a remote cluster | ||
*/ | ||
LookupResponse(Exception connectionError) { | ||
this.policies = Collections.emptyMap(); | ||
this.failures = Collections.emptyMap(); | ||
this.connectionError = connectionError; | ||
} | ||
|
||
LookupResponse(StreamInput in) throws IOException { | ||
PlanStreamInput planIn = new PlanStreamInput(in, in.namedWriteableRegistry(), null); | ||
this.policies = planIn.readMap(StreamInput::readString, ResolvedEnrichPolicy::new); | ||
this.failures = planIn.readMap(StreamInput::readString, StreamInput::readString); | ||
this.connectionError = null; | ||
} | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.failures = failures == null ? emptyList() : failures;