diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index f2ab0355304b3..80bb2afe57122 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -229,7 +229,7 @@ public Iterator toXContentChunked(ToXContent.Params params b.field(SKIPPED_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.SKIPPED)); b.field(PARTIAL_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.PARTIAL)); b.field(FAILED_FIELD.getPreferredName(), getClusterStateCount(Cluster.Status.FAILED)); - // each clusterinfo defines its own field object name + // each Cluster object defines its own field object name b.xContentObject("details", clusterInfo.values().iterator()); }); } @@ -352,11 +352,7 @@ public Cluster( this.successfulShards = successfulShards; this.skippedShards = skippedShards; this.failedShards = failedShards; - if (failures == null) { - this.failures = List.of(); - } else { - this.failures = failures; - } + this.failures = failures == null ? Collections.emptyList() : failures; this.took = took; } @@ -373,7 +369,7 @@ public Cluster(StreamInput in) throws IOException { if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_CCS_EXEC_INFO_WITH_FAILURES)) { this.failures = Collections.unmodifiableList(in.readCollectionAsList(ShardSearchFailure::readShardSearchFailure)); } else { - this.failures = List.of(); + this.failures = Collections.emptyList(); } } @@ -475,7 +471,7 @@ public Cluster.Builder setTook(TimeValue took) { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { String name = clusterAlias; - if (clusterAlias.equals("")) { + if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { name = LOCAL_CLUSTER_NAME_REPRESENTATION; } builder.startObject(name); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index 77ef5ef597bb5..c8a7a6bcc4e98 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -289,24 +289,17 @@ public void onResponse(Transport.Connection connection) { RESOLVE_ACTION_NAME, new LookupRequest(cluster, remotePolicies), TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(lookupListener.delegateResponse((l, e) -> { - if (ExceptionsHelper.isRemoteUnavailableException(e) - && remoteClusterService.isSkipUnavailable(cluster)) { - l.onResponse(new LookupResponse(e)); - } else { - l.onFailure(e); - } - }), LookupResponse::new, threadPool.executor(ThreadPool.Names.SEARCH)) + new ActionListenerResponseHandler<>( + lookupListener.delegateResponse((l, e) -> failIfSkipUnavailableFalse(e, cluster, l)), + LookupResponse::new, + threadPool.executor(ThreadPool.Names.SEARCH) + ) ); } @Override public void onFailure(Exception e) { - if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster)) { - lookupListener.onResponse(new LookupResponse(e)); - } else { - lookupListener.onFailure(e); - } + failIfSkipUnavailableFalse(e, cluster, lookupListener); } }); } @@ -331,6 +324,14 @@ public void onFailure(Exception e) { } } + private void failIfSkipUnavailableFalse(Exception e, String cluster, ActionListener lookupListener) { + if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster)) { + lookupListener.onResponse(new LookupResponse(e)); + } else { + lookupListener.onFailure(e); + } + } + private static class LookupRequest extends TransportRequest { private final String clusterAlias; private final Collection policyNames; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index a4405c32ff91c..504689fdac39b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -149,7 +149,7 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P analyzedPlan( parse(request.query(), request.params()), executionInfo, - new CcsUtils.CssPartialErrorsActionListener(executionInfo, listener) { + new EsqlSessionCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) { @Override public void onResponse(LogicalPlan analyzedPlan) { executeOptimizedPlan(request, executionInfo, planRunner, optimizedPlan(analyzedPlan), listener); @@ -171,7 +171,7 @@ public void executeOptimizedPlan( ) { PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request); // TODO: this could be snuck into the underlying listener - CcsUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); + EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); // execute any potential subplans executeSubPlans(physicalPlan, planRunner, executionInfo, request, listener); } @@ -308,8 +308,8 @@ private void preAnalyze( // TODO in follow-PR (for skip_unavailble handling of missing concrete indexes) add some tests for invalid index // resolution to updateExecutionInfo if (indexResolution.isValid()) { - CcsUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); - CcsUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters()); + EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters()); if (executionInfo.isCrossClusterSearch() && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) { // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel @@ -383,7 +383,7 @@ private void preAnalyzeIndices( } // if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search // based only on available clusters (which could now be an empty list) - String indexExpressionToResolve = CcsUtils.createIndexExpressionFromAvailableClusters(executionInfo); + String indexExpressionToResolve = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); if (indexExpressionToResolve.isEmpty()) { // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution listener.onResponse(IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of()))); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/CcsUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java similarity index 66% rename from x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/CcsUtils.java rename to x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java index a9314e6f65d87..80709d8f6c4f7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/CcsUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java @@ -23,14 +23,30 @@ import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -class CcsUtils { +class EsqlSessionCCSUtils { - private CcsUtils() {} + private EsqlSessionCCSUtils() {} + + // visible for testing + static Map determineUnavailableRemoteClusters(List failures) { + Map unavailableRemotes = new HashMap<>(); + for (FieldCapabilitiesFailure failure : failures) { + if (ExceptionsHelper.isRemoteUnavailableException(failure.getException())) { + for (String indexExpression : failure.getIndices()) { + if (indexExpression.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) > 0) { + unavailableRemotes.put(RemoteClusterAware.parseClusterAlias(indexExpression), failure); + } + } + } + } + return unavailableRemotes; + } /** * ActionListener that receives LogicalPlan or error from logical planning. @@ -46,70 +62,73 @@ abstract static class CssPartialErrorsActionListener implements ActionListener { + EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(executionInfo.overallTook()) + .setTotalShards(0) + .setSuccessfulShards(0) + .setSkippedShards(0) + .setFailedShards(0); + if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { + // never mark local cluster as skipped + builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL); } else { - exceptionForResponse = e; - } - for (String clusterAlias : executionInfo.clusterAliases()) { - executionInfo.swapCluster(clusterAlias, (k, v) -> { - EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook( - executionInfo.overallTook() - ).setTotalShards(0).setSuccessfulShards(0).setSkippedShards(0).setFailedShards(0); - if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { - // never mark local cluster as skipped - builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL); - } else { - builder.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED); - // add this exception to the failures list only if there is no failure already recorded there - if (v.getFailures() == null || v.getFailures().size() == 0) { - builder.setFailures(List.of(new ShardSearchFailure(exceptionForResponse))); - } - } - return builder.build(); - }); + builder.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED); + // add this exception to the failures list only if there is no failure already recorded there + if (v.getFailures() == null || v.getFailures().size() == 0) { + builder.setFailures(List.of(new ShardSearchFailure(exceptionForResponse))); + } } - listener.onResponse(new Result(Analyzer.NO_FIELDS, Collections.emptyList(), Collections.emptyList(), executionInfo)); - } else { - listener.onFailure(e); - } + return builder.build(); + }); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java index f76f7798dece8..210f991306bac 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java @@ -6,9 +6,7 @@ */ package org.elasticsearch.xpack.esql.session; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; @@ -20,7 +18,6 @@ import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.TimeSeriesParams; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsAction; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.type.DateEsField; @@ -159,23 +156,8 @@ public IndexResolution mergedMappings(String indexPattern, FieldCapabilitiesResp for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) { concreteIndices.put(ir.getIndexName(), ir.getIndexMode()); } - Map unavailableRemotes = determineUnavailableRemoteClusters(fieldCapsResponse.getFailures()); - return IndexResolution.valid(new EsIndex(indexPattern, rootFields, concreteIndices), unavailableRemotes); - } - - // visible for testing - static Map determineUnavailableRemoteClusters(List failures) { - Map unavailableRemotes = new HashMap<>(); - for (FieldCapabilitiesFailure failure : failures) { - if (ExceptionsHelper.isRemoteUnavailableException(failure.getException())) { - for (String indexExpression : failure.getIndices()) { - if (indexExpression.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) > 0) { - unavailableRemotes.put(RemoteClusterAware.parseClusterAlias(indexExpression), failure); - } - } - } - } - return unavailableRemotes; + EsIndex esIndex = new EsIndex(indexPattern, rootFields, concreteIndices); + return IndexResolution.valid(esIndex, EsqlSessionCCSUtils.determineUnavailableRemoteClusters(fieldCapsResponse.getFailures())); } private boolean allNested(List caps) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java similarity index 66% rename from x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java rename to x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java index 1f814b841f19d..e60024ecd5db4 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java @@ -7,11 +7,15 @@ package org.elasticsearch.xpack.esql.session; +import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.Strings; import org.elasticsearch.index.IndexMode; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.NoSeedNodeLeftException; +import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; @@ -20,18 +24,20 @@ import org.elasticsearch.xpack.esql.index.IndexResolution; import org.elasticsearch.xpack.esql.type.EsFieldTests; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Predicate; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; -public class EsqlSessionTests extends ESTestCase { +public class EsqlSessionCCSUtilsTests extends ESTestCase { public void testCreateIndexExpressionFromAvailableClusters() { final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; @@ -45,7 +51,7 @@ public void testCreateIndexExpressionFromAvailableClusters() { executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true)); - String indexExpr = CcsUtils.createIndexExpressionFromAvailableClusters(executionInfo); + String indexExpr = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); List list = Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList(); assertThat(list.size(), equalTo(5)); assertThat( @@ -69,7 +75,7 @@ public void testCreateIndexExpressionFromAvailableClusters() { ) ); - String indexExpr = CcsUtils.createIndexExpressionFromAvailableClusters(executionInfo); + String indexExpr = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); List list = Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList(); assertThat(list.size(), equalTo(3)); assertThat(new HashSet<>(list), equalTo(Strings.commaDelimitedListToSet("logs*,remote1:*,remote1:foo"))); @@ -93,7 +99,7 @@ public void testCreateIndexExpressionFromAvailableClusters() { ) ); - assertThat(CcsUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("logs*")); + assertThat(EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("logs*")); } // only remotes present and all marked as skipped, so in revised index expression should be empty string @@ -113,7 +119,7 @@ public void testCreateIndexExpressionFromAvailableClusters() { ) ); - assertThat(CcsUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("")); + assertThat(EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("")); } } @@ -131,7 +137,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); var unvailableClusters = Map.of(remote1Alias, failure, remote2Alias, failure); - CcsUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, unvailableClusters); + EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, unvailableClusters); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); assertNull(executionInfo.overallTook()); @@ -159,7 +165,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); RemoteTransportException e = expectThrows( RemoteTransportException.class, - () -> CcsUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(remote2Alias, failure)) + () -> EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(remote2Alias, failure)) ); assertThat(e.status().getStatus(), equalTo(500)); assertThat( @@ -176,7 +182,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); - CcsUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of()); + EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of()); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); assertNull(executionInfo.overallTook()); @@ -224,7 +230,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { ); IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of()); - CcsUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); @@ -262,7 +268,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { ); IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of()); - CcsUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); @@ -298,7 +304,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of(remote1Alias, failure)); - CcsUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); @@ -336,7 +342,63 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of(remote1Alias, failure)); - CcsUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + } + } + + public void testDetermineUnavailableRemoteClusters() { + // two clusters, both "remote unavailable" type exceptions + { + List failures = new ArrayList<>(); + failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSuchRemoteClusterException("remote2"))); + failures.add( + new FieldCapabilitiesFailure( + new String[] { "remote1:foo", "remote1:bar" }, + new IllegalStateException("Unable to open any connections") + ) + ); + + Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote1", "remote2"))); + } + + // one cluster with "remote unavailable" with two failures + { + List failures = new ArrayList<>(); + failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSuchRemoteClusterException("remote2"))); + failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSeedNodeLeftException("no seed node"))); + + Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote2"))); + } + + // two clusters, one "remote unavailable" type exceptions and one with another type + { + List failures = new ArrayList<>(); + failures.add(new FieldCapabilitiesFailure(new String[] { "remote1:mylogs1" }, new CorruptIndexException("foo", "bar"))); + failures.add( + new FieldCapabilitiesFailure( + new String[] { "remote2:foo", "remote2:bar" }, + new IllegalStateException("Unable to open any connections") + ) + ); + Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote2"))); + } + + // one cluster1 with exception not known to indicate "remote unavailable" + { + List failures = new ArrayList<>(); + failures.add(new FieldCapabilitiesFailure(new String[] { "remote1:mylogs1" }, new RuntimeException("foo"))); + Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + assertThat(unavailableClusters.keySet(), equalTo(Set.of())); + } + + // empty failures list + { + List failures = new ArrayList<>(); + Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + assertThat(unavailableClusters.keySet(), equalTo(Set.of())); } } @@ -358,7 +420,7 @@ public void testUpdateExecutionInfoAtEndOfPlanning() { Thread.sleep(1); } catch (InterruptedException e) {} - CcsUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); + EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); assertThat(executionInfo.planningTookTime().millis(), greaterThanOrEqualTo(0L)); assertNull(executionInfo.overallTook()); @@ -410,4 +472,111 @@ private static Map randomMapping() { } return result; } + + public void testReturnSuccessWithEmptyResult() { + String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + String remote1Alias = "remote1"; + String remote2Alias = "remote2"; + String remote3Alias = "remote3"; + NoClustersToSearchException noClustersException = new NoClustersToSearchException(); + Predicate skipUnPredicate = s -> { + if (s.equals("remote2") || s.equals("remote3")) { + return true; + } + return false; + }; + + EsqlExecutionInfo.Cluster localCluster = new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false); + EsqlExecutionInfo.Cluster remote1 = new EsqlExecutionInfo.Cluster(remote1Alias, "logs*", false); + EsqlExecutionInfo.Cluster remote2 = new EsqlExecutionInfo.Cluster(remote2Alias, "logs*", true); + EsqlExecutionInfo.Cluster remote3 = new EsqlExecutionInfo.Cluster(remote3Alias, "logs*", true); + + // not a cross-cluster cluster search, so do not return empty result + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); + executionInfo.swapCluster(localClusterAlias, (k, v) -> localCluster); + assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException)); + } + + // local cluster is present, so do not return empty result + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); + executionInfo.swapCluster(localClusterAlias, (k, v) -> localCluster); + executionInfo.swapCluster(remote1Alias, (k, v) -> remote1); + // TODO: this logic will be added in the follow-on PR that handles missing indices + // assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException)); + } + + // remote-only, one cluster is skip_unavailable=false, so do not return empty result + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); + executionInfo.swapCluster(remote1Alias, (k, v) -> remote1); + executionInfo.swapCluster(remote2Alias, (k, v) -> remote2); + assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException)); + } + + // remote-only, all clusters are skip_unavailable=true, so should return empty result with + // NoSuchClustersException or "remote unavailable" type exception + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); + executionInfo.swapCluster(remote2Alias, (k, v) -> remote2); + executionInfo.swapCluster(remote3Alias, (k, v) -> remote3); + Exception e = randomFrom( + new NoSuchRemoteClusterException("foo"), + noClustersException, + new NoSeedNodeLeftException("foo"), + new IllegalStateException("unknown host") + ); + assertTrue(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, e)); + } + + // remote-only, all clusters are skip_unavailable=true, but exception is not "remote unavailable" so return false + // Note: this functionality may change in follow-on PRs, so remove this test in that case + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); + executionInfo.swapCluster(remote2Alias, (k, v) -> remote2); + executionInfo.swapCluster(remote3Alias, (k, v) -> remote3); + assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, new NullPointerException())); + } + } + + public void testUpdateExecutionInfoToReturnEmptyResult() { + String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + String remote1Alias = "remote1"; + String remote2Alias = "remote2"; + String remote3Alias = "remote3"; + ConnectTransportException transportEx = new ConnectTransportException(null, "foo"); + Predicate skipUnPredicate = s -> { + if (s.startsWith("remote")) { + return true; + } + return false; + }; + + EsqlExecutionInfo.Cluster localCluster = new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false); + EsqlExecutionInfo.Cluster remote1 = new EsqlExecutionInfo.Cluster(remote1Alias, "logs*", true); + EsqlExecutionInfo.Cluster remote2 = new EsqlExecutionInfo.Cluster(remote2Alias, "logs*", true); + EsqlExecutionInfo.Cluster remote3 = new EsqlExecutionInfo.Cluster(remote3Alias, "logs*", true); + + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); + executionInfo.swapCluster(localCluster.getClusterAlias(), (k, v) -> localCluster); + executionInfo.swapCluster(remote1.getClusterAlias(), (k, v) -> remote1); + executionInfo.swapCluster(remote2.getClusterAlias(), (k, v) -> remote2); + executionInfo.swapCluster(remote3.getClusterAlias(), (k, v) -> remote3); + + assertNull(executionInfo.overallTook()); + + EsqlSessionCCSUtils.updateExecutionInfoToReturnEmptyResult(executionInfo, transportEx); + + assertNotNull(executionInfo.overallTook()); + assertThat(executionInfo.getCluster(localClusterAlias).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(executionInfo.getCluster(localClusterAlias).getFailures().size(), equalTo(0)); + + for (String remoteAlias : Set.of(remote1Alias, remote2Alias, remote3Alias)) { + assertThat(executionInfo.getCluster(remoteAlias).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + List remoteFailures = executionInfo.getCluster(remoteAlias).getFailures(); + assertThat(remoteFailures.size(), equalTo(1)); + assertThat(remoteFailures.get(0).reason(), containsString("unable to connect to remote cluster")); + } + } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverTests.java deleted file mode 100644 index d6e410305afaa..0000000000000 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverTests.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.esql.session; - -import org.apache.lucene.index.CorruptIndexException; -import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.NoSeedNodeLeftException; -import org.elasticsearch.transport.NoSuchRemoteClusterException; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.hamcrest.Matchers.equalTo; - -public class IndexResolverTests extends ESTestCase { - - public void testDetermineUnavailableRemoteClusters() { - // two clusters, both "remote unavailable" type exceptions - { - List failures = new ArrayList<>(); - failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSuchRemoteClusterException("remote2"))); - failures.add( - new FieldCapabilitiesFailure( - new String[] { "remote1:foo", "remote1:bar" }, - new IllegalStateException("Unable to open any connections") - ) - ); - - Map unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); - assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote1", "remote2"))); - } - - // one cluster with "remote unavailable" with two failures - { - List failures = new ArrayList<>(); - failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSuchRemoteClusterException("remote2"))); - failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSeedNodeLeftException("no seed node"))); - - Map unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); - assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote2"))); - } - - // two clusters, one "remote unavailable" type exceptions and one with another type - { - List failures = new ArrayList<>(); - failures.add(new FieldCapabilitiesFailure(new String[] { "remote1:mylogs1" }, new CorruptIndexException("foo", "bar"))); - failures.add( - new FieldCapabilitiesFailure( - new String[] { "remote2:foo", "remote2:bar" }, - new IllegalStateException("Unable to open any connections") - ) - ); - Map unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); - assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote2"))); - } - - // one cluster1 with exception not known to indicate "remote unavailable" - { - List failures = new ArrayList<>(); - failures.add(new FieldCapabilitiesFailure(new String[] { "remote1:mylogs1" }, new RuntimeException("foo"))); - Map unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); - assertThat(unavailableClusters.keySet(), equalTo(Set.of())); - } - - // empty failures list - { - List failures = new ArrayList<>(); - Map unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); - assertThat(unavailableClusters.keySet(), equalTo(Set.of())); - } - } -}