diff --git a/docs/changelog/115266.yaml b/docs/changelog/115266.yaml new file mode 100644 index 0000000000000..1d7fb1368c0e8 --- /dev/null +++ b/docs/changelog/115266.yaml @@ -0,0 +1,6 @@ +pr: 115266 +summary: ES|QL CCS uses `skip_unavailable` setting for handling disconnected remote + clusters +area: ES|QL +type: enhancement +issues: [ 114531 ] diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 7bf3204b7e1a6..ea3e649de9ef8 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -185,6 +185,7 @@ static TransportVersion def(int id) { public static final TransportVersion INDEX_REQUEST_REMOVE_METERING = def(8_780_00_0); public static final TransportVersion CPU_STAT_STRING_PARSING = def(8_781_00_0); public static final TransportVersion QUERY_RULES_RETRIEVER = def(8_782_00_0); + public static final TransportVersion ESQL_CCS_EXEC_INFO_WITH_FAILURES = def(8_783_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java new file mode 100644 index 0000000000000..d142752d0c408 --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java @@ -0,0 +1,690 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.ingest.common.IngestCommonPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.reindex.ReindexPlugin; +import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; +import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.plan.logical.Enrich; +import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; +import org.junit.Before; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; +import static org.elasticsearch.xpack.esql.action.CrossClustersEnrichIT.enrichHosts; +import static org.elasticsearch.xpack.esql.action.CrossClustersEnrichIT.enrichVendors; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +/** + * This IT test is the dual of CrossClustersEnrichIT, which tests "happy path" + * and this one tests unavailable cluster scenarios using (most of) the same tests. 
+ */ +public class CrossClusterEnrichUnavailableClustersIT extends AbstractMultiClustersTestCase { + + public static String REMOTE_CLUSTER_1 = "c1"; + public static String REMOTE_CLUSTER_2 = "c2"; + + @Override + protected Collection remoteClusterAlias() { + return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2); + } + + @Override + protected boolean reuseClusters() { + return false; + } + + private Collection allClusters() { + return CollectionUtils.appendToCopy(remoteClusterAlias(), LOCAL_CLUSTER); + } + + @Override + protected Collection> nodePlugins(String clusterAlias) { + List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); + plugins.add(EsqlPlugin.class); + plugins.add(CrossClustersEnrichIT.LocalStateEnrich.class); + plugins.add(IngestCommonPlugin.class); + plugins.add(ReindexPlugin.class); + return plugins; + } + + @Override + protected Settings nodeSettings() { + return Settings.builder().put(super.nodeSettings()).put(XPackSettings.SECURITY_ENABLED.getKey(), false).build(); + } + + @Before + public void setupHostsEnrich() { + // the hosts policy are identical on every node + Map allHosts = Map.of( + "192.168.1.2", + "Windows", + "192.168.1.3", + "MacOS", + "192.168.1.4", + "Linux", + "192.168.1.5", + "Android", + "192.168.1.6", + "iOS", + "192.168.1.7", + "Windows", + "192.168.1.8", + "MacOS", + "192.168.1.9", + "Linux", + "192.168.1.10", + "Linux", + "192.168.1.11", + "Windows" + ); + for (String cluster : allClusters()) { + Client client = client(cluster); + client.admin().indices().prepareCreate("hosts").setMapping("ip", "type=ip", "os", "type=keyword").get(); + for (Map.Entry h : allHosts.entrySet()) { + client.prepareIndex("hosts").setSource("ip", h.getKey(), "os", h.getValue()).get(); + } + client.admin().indices().prepareRefresh("hosts").get(); + client.execute( + PutEnrichPolicyAction.INSTANCE, + new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts", CrossClustersEnrichIT.hostPolicy) + ).actionGet(); + 
client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts")) + .actionGet(); + assertAcked(client.admin().indices().prepareDelete("hosts")); + } + } + + @Before + public void setupVendorPolicy() { + var localVendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Samsung", "Linux", "Redhat"); + var c1Vendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Google", "Linux", "Suse"); + var c2Vendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Sony", "Linux", "Ubuntu"); + var vendors = Map.of(LOCAL_CLUSTER, localVendors, "c1", c1Vendors, "c2", c2Vendors); + for (Map.Entry> e : vendors.entrySet()) { + Client client = client(e.getKey()); + client.admin().indices().prepareCreate("vendors").setMapping("os", "type=keyword", "vendor", "type=keyword").get(); + for (Map.Entry v : e.getValue().entrySet()) { + client.prepareIndex("vendors").setSource("os", v.getKey(), "vendor", v.getValue()).get(); + } + client.admin().indices().prepareRefresh("vendors").get(); + client.execute( + PutEnrichPolicyAction.INSTANCE, + new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors", CrossClustersEnrichIT.vendorPolicy) + ).actionGet(); + client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors")) + .actionGet(); + assertAcked(client.admin().indices().prepareDelete("vendors")); + } + } + + @Before + public void setupEventsIndices() { + record Event(long timestamp, String user, String host) {} + + List e0 = List.of( + new Event(1, "matthew", "192.168.1.3"), + new Event(2, "simon", "192.168.1.5"), + new Event(3, "park", "192.168.1.2"), + new Event(4, "andrew", "192.168.1.7"), + new Event(5, "simon", "192.168.1.20"), + new Event(6, "kevin", "192.168.1.2"), + new Event(7, "akio", "192.168.1.5"), + new Event(8, "luke", "192.168.1.2"), + new Event(9, "jack", "192.168.1.4") 
+ ); + List e1 = List.of( + new Event(1, "andres", "192.168.1.2"), + new Event(2, "sergio", "192.168.1.6"), + new Event(3, "kylian", "192.168.1.8"), + new Event(4, "andrew", "192.168.1.9"), + new Event(5, "jack", "192.168.1.3"), + new Event(6, "kevin", "192.168.1.4"), + new Event(7, "akio", "192.168.1.7"), + new Event(8, "kevin", "192.168.1.21"), + new Event(9, "andres", "192.168.1.8") + ); + List e2 = List.of( + new Event(1, "park", "192.168.1.25"), + new Event(2, "akio", "192.168.1.5"), + new Event(3, "park", "192.168.1.2"), + new Event(4, "kevin", "192.168.1.3") + ); + for (var c : Map.of(LOCAL_CLUSTER, e0, "c1", e1, "c2", e2).entrySet()) { + Client client = client(c.getKey()); + client.admin() + .indices() + .prepareCreate("events") + .setMapping("timestamp", "type=long", "user", "type=keyword", "host", "type=ip") + .get(); + for (var e : c.getValue()) { + client.prepareIndex("events").setSource("timestamp", e.timestamp, "user", e.user, "host", e.host).get(); + } + client.admin().indices().prepareRefresh("events").get(); + } + } + + public void testEnrichWithHostsPolicyAndDisconnectedRemotesWithSkipUnavailableTrue() throws IOException { + setSkipUnavailable(REMOTE_CLUSTER_1, true); + setSkipUnavailable(REMOTE_CLUSTER_2, true); + + try { + // close remote-cluster-1 so that it is unavailable + cluster(REMOTE_CLUSTER_1).close(); + + Tuple includeCCSMetadata = CrossClustersEnrichIT.randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + + { + Enrich.Mode mode = randomFrom(Enrich.Mode.values()); + String query = "FROM *:events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os"; + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + List> rows = getValuesList(resp); + assertThat(rows.size(), greaterThanOrEqualTo(1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertCCSExecutionInfoDetails(executionInfo); + 
+ assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2))); + + EsqlExecutionInfo.Cluster cluster1 = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(cluster1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(cluster1.getTotalShards(), equalTo(0)); + assertThat(cluster1.getSuccessfulShards(), equalTo(0)); + assertThat(cluster1.getSkippedShards(), equalTo(0)); + assertThat(cluster1.getFailedShards(), equalTo(0)); + + EsqlExecutionInfo.Cluster cluster2 = executionInfo.getCluster(REMOTE_CLUSTER_2); + assertThat(cluster2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(cluster2.getTotalShards(), greaterThanOrEqualTo(0)); + assertThat(cluster2.getSuccessfulShards(), equalTo(cluster2.getSuccessfulShards())); + assertThat(cluster2.getSkippedShards(), equalTo(0)); + assertThat(cluster2.getFailedShards(), equalTo(0)); + } + } + + // close remote-cluster-2 so that it is also unavailable + cluster(REMOTE_CLUSTER_2).close(); + + { + Enrich.Mode mode = randomFrom(Enrich.Mode.values()); + String query = "FROM *:events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os"; + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + List columns = resp.columns(); + assertThat(columns.size(), equalTo(1)); + // column from an empty result should be {"name":"","type":"null"} + assertThat(columns.get(0).name(), equalTo("")); + assertThat(columns.get(0).type(), equalTo(DataType.NULL)); + + List> rows = getValuesList(resp); + assertThat(rows.size(), equalTo(0)); + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertCCSExecutionInfoDetails(executionInfo); + + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2))); + + 
EsqlExecutionInfo.Cluster cluster1 = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(cluster1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(cluster1.getTotalShards(), equalTo(0)); + assertThat(cluster1.getSuccessfulShards(), equalTo(0)); + assertThat(cluster1.getSkippedShards(), equalTo(0)); + assertThat(cluster1.getFailedShards(), equalTo(0)); + + EsqlExecutionInfo.Cluster cluster2 = executionInfo.getCluster(REMOTE_CLUSTER_2); + assertThat(cluster2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(cluster2.getTotalShards(), equalTo(0)); + assertThat(cluster2.getSuccessfulShards(), equalTo(0)); + assertThat(cluster2.getSkippedShards(), equalTo(0)); + assertThat(cluster2.getFailedShards(), equalTo(0)); + } + } + } finally { + clearSkipUnavailable(); + } + } + + public void testEnrichWithHostsPolicyAndDisconnectedRemotesWithSkipUnavailableFalse() throws IOException { + setSkipUnavailable(REMOTE_CLUSTER_1, true); + setSkipUnavailable(REMOTE_CLUSTER_2, false); + + try { + // close remote-cluster-1 so that it is unavailable + cluster(REMOTE_CLUSTER_1).close(); + + Tuple includeCCSMetadata = CrossClustersEnrichIT.randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + + { + Enrich.Mode mode = randomFrom(Enrich.Mode.values()); + String query = "FROM *:events | EVAL ip= TO_STR(host) | " + enrichHosts(mode) + " | STATS c = COUNT(*) by os | SORT os"; + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + List> rows = getValuesList(resp); + assertThat(rows.size(), greaterThanOrEqualTo(1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertCCSExecutionInfoDetails(executionInfo); + + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2))); + + EsqlExecutionInfo.Cluster 
cluster1 = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(cluster1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(cluster1.getTotalShards(), equalTo(0)); + assertThat(cluster1.getSuccessfulShards(), equalTo(0)); + assertThat(cluster1.getSkippedShards(), equalTo(0)); + assertThat(cluster1.getFailedShards(), equalTo(0)); + + EsqlExecutionInfo.Cluster cluster2 = executionInfo.getCluster(REMOTE_CLUSTER_2); + assertThat(cluster2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(cluster2.getTotalShards(), greaterThanOrEqualTo(0)); + assertThat(cluster2.getSuccessfulShards(), equalTo(cluster2.getSuccessfulShards())); + assertThat(cluster2.getSkippedShards(), equalTo(0)); + assertThat(cluster2.getFailedShards(), equalTo(0)); + } + } + + // close remote-cluster-2 so that it is also unavailable + cluster(REMOTE_CLUSTER_2).close(); + { + Enrich.Mode mode = randomFrom(Enrich.Mode.values()); + String query = "FROM *:events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os"; + Exception exception = expectThrows(Exception.class, () -> runQuery(query, requestIncludeMeta)); + assertTrue(ExceptionsHelper.isRemoteUnavailableException(exception)); + } + } finally { + clearSkipUnavailable(); + } + } + + public void testEnrichTwiceThenAggsWithUnavailableRemotes() throws IOException { + Tuple includeCCSMetadata = CrossClustersEnrichIT.randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + + boolean skipUnavailableRemote1 = randomBoolean(); + setSkipUnavailable(REMOTE_CLUSTER_1, skipUnavailableRemote1); + setSkipUnavailable(REMOTE_CLUSTER_2, true); + + try { + // close remote-cluster-2 so that it is unavailable + cluster(REMOTE_CLUSTER_2).close(); + + for (var hostMode : Enrich.Mode.values()) { + String query = String.format(Locale.ROOT, """ + FROM *:events,events + | eval ip= TO_STR(host) + | 
%s + | %s + | stats c = COUNT(*) by vendor + | sort vendor + """, enrichHosts(hostMode), enrichVendors(Enrich.Mode.COORDINATOR)); + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + assertThat(getValuesList(resp).size(), greaterThanOrEqualTo(1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", REMOTE_CLUSTER_1, REMOTE_CLUSTER_2))); + assertCCSExecutionInfoDetails(executionInfo); + + EsqlExecutionInfo.Cluster cluster1 = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(cluster1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(cluster1.getTotalShards(), greaterThanOrEqualTo(0)); + assertThat(cluster1.getSuccessfulShards(), equalTo(cluster1.getSuccessfulShards())); + assertThat(cluster1.getSkippedShards(), equalTo(0)); + assertThat(cluster1.getFailedShards(), equalTo(0)); + + EsqlExecutionInfo.Cluster cluster2 = executionInfo.getCluster(REMOTE_CLUSTER_2); + assertThat(cluster2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(cluster2.getTotalShards(), equalTo(0)); + assertThat(cluster2.getSuccessfulShards(), equalTo(0)); + assertThat(cluster2.getSkippedShards(), equalTo(0)); + assertThat(cluster2.getFailedShards(), equalTo(0)); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(localCluster.getTotalShards(), greaterThan(0)); + assertThat(localCluster.getSuccessfulShards(), equalTo(localCluster.getTotalShards())); + assertThat(localCluster.getSkippedShards(), equalTo(0)); + assertThat(localCluster.getFailedShards(), equalTo(0)); + } + } + + // close remote-cluster-1 so that it is also unavailable + cluster(REMOTE_CLUSTER_1).close(); + + for (var hostMode : 
Enrich.Mode.values()) { + String query = String.format(Locale.ROOT, """ + FROM *:events,events + | eval ip= TO_STR(host) + | %s + | %s + | stats c = COUNT(*) by vendor + | sort vendor + """, enrichHosts(hostMode), enrichVendors(Enrich.Mode.COORDINATOR)); + if (skipUnavailableRemote1 == false) { + Exception exception = expectThrows(Exception.class, () -> runQuery(query, requestIncludeMeta)); + assertTrue(ExceptionsHelper.isRemoteUnavailableException(exception)); + } else { + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + assertThat(getValuesList(resp).size(), greaterThanOrEqualTo(1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", REMOTE_CLUSTER_1, REMOTE_CLUSTER_2))); + assertCCSExecutionInfoDetails(executionInfo); + + EsqlExecutionInfo.Cluster cluster1 = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(cluster1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(cluster1.getTotalShards(), equalTo(0)); + assertThat(cluster1.getSuccessfulShards(), equalTo(0)); + assertThat(cluster1.getSkippedShards(), equalTo(0)); + assertThat(cluster1.getFailedShards(), equalTo(0)); + assertThat(cluster1.getTook().millis(), greaterThanOrEqualTo(0L)); + + EsqlExecutionInfo.Cluster cluster2 = executionInfo.getCluster(REMOTE_CLUSTER_2); + assertThat(cluster2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(cluster2.getTotalShards(), equalTo(0)); + assertThat(cluster2.getSuccessfulShards(), equalTo(0)); + assertThat(cluster2.getSkippedShards(), equalTo(0)); + assertThat(cluster2.getFailedShards(), equalTo(0)); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + 
assertThat(localCluster.getTotalShards(), greaterThan(0)); + assertThat(localCluster.getSuccessfulShards(), equalTo(localCluster.getTotalShards())); + assertThat(localCluster.getSkippedShards(), equalTo(0)); + assertThat(localCluster.getFailedShards(), equalTo(0)); + } + } + } + } finally { + clearSkipUnavailable(); + } + } + + public void testEnrichCoordinatorThenAnyWithSingleUnavailableRemoteAndLocal() throws IOException { + Tuple includeCCSMetadata = CrossClustersEnrichIT.randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + + boolean skipUnavailableRemote1 = randomBoolean(); + setSkipUnavailable(REMOTE_CLUSTER_1, skipUnavailableRemote1); + + try { + // close remote-cluster-1 so that it is unavailable + cluster(REMOTE_CLUSTER_1).close(); + String query = String.format(Locale.ROOT, """ + FROM %s:events,events + | eval ip= TO_STR(host) + | %s + | %s + | stats c = COUNT(*) by vendor + | sort vendor + """, REMOTE_CLUSTER_1, enrichHosts(Enrich.Mode.COORDINATOR), enrichVendors(Enrich.Mode.ANY)); + if (skipUnavailableRemote1 == false) { + Exception exception = expectThrows(Exception.class, () -> runQuery(query, requestIncludeMeta)); + assertTrue(ExceptionsHelper.isRemoteUnavailableException(exception)); + } else { + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + assertThat(getValuesList(resp).size(), greaterThanOrEqualTo(1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat( + executionInfo.clusterAliases(), + equalTo(Set.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, REMOTE_CLUSTER_1)) + ); + assertCCSExecutionInfoDetails(executionInfo); + + EsqlExecutionInfo.Cluster cluster1 = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(cluster1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(cluster1.getTotalShards(), 
equalTo(0)); + assertThat(cluster1.getSuccessfulShards(), equalTo(0)); + assertThat(cluster1.getSkippedShards(), equalTo(0)); + assertThat(cluster1.getFailedShards(), equalTo(0)); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(localCluster.getTotalShards(), greaterThan(0)); + assertThat(localCluster.getSuccessfulShards(), equalTo(localCluster.getTotalShards())); + assertThat(localCluster.getSkippedShards(), equalTo(0)); + assertThat(localCluster.getFailedShards(), equalTo(0)); + } + } + } finally { + clearSkipUnavailable(); + } + } + + public void testEnrichCoordinatorThenAnyWithSingleUnavailableRemoteAndNotLocal() throws IOException { + Tuple includeCCSMetadata = CrossClustersEnrichIT.randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + + boolean skipUnavailableRemote1 = randomBoolean(); + setSkipUnavailable(REMOTE_CLUSTER_1, skipUnavailableRemote1); + + try { + // close remote-cluster-1 so that it is unavailable + cluster(REMOTE_CLUSTER_1).close(); + String query = String.format(Locale.ROOT, """ + FROM %s:events + | eval ip= TO_STR(host) + | %s + | %s + | stats c = COUNT(*) by vendor + | sort vendor + """, REMOTE_CLUSTER_1, enrichHosts(Enrich.Mode.COORDINATOR), enrichVendors(Enrich.Mode.ANY)); + if (skipUnavailableRemote1 == false) { + Exception exception = expectThrows(Exception.class, () -> runQuery(query, requestIncludeMeta)); + assertTrue(ExceptionsHelper.isRemoteUnavailableException(exception)); + } else { + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + List columns = resp.columns(); + assertThat(columns.size(), equalTo(1)); + // column from an empty result should be {"name":"","type":"null"} + assertThat(columns.get(0).name(), equalTo("")); + 
assertThat(columns.get(0).type(), equalTo(DataType.NULL)); + + assertThat(getValuesList(resp).size(), equalTo(0)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1))); + assertCCSExecutionInfoDetails(executionInfo); + + EsqlExecutionInfo.Cluster cluster1 = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(cluster1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(cluster1.getTotalShards(), equalTo(0)); + assertThat(cluster1.getSuccessfulShards(), equalTo(0)); + assertThat(cluster1.getSkippedShards(), equalTo(0)); + assertThat(cluster1.getFailedShards(), equalTo(0)); + } + } + } finally { + clearSkipUnavailable(); + } + } + + public void testEnrichRemoteWithVendor() throws IOException { + Tuple includeCCSMetadata = CrossClustersEnrichIT.randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + + boolean skipUnavailableRemote2 = randomBoolean(); + setSkipUnavailable(REMOTE_CLUSTER_1, true); + setSkipUnavailable(REMOTE_CLUSTER_2, skipUnavailableRemote2); + + try { + // close remote-cluster-1 so that it is unavailable + cluster(REMOTE_CLUSTER_1).close(); + + for (Enrich.Mode hostMode : List.of(Enrich.Mode.ANY, Enrich.Mode.REMOTE)) { + var query = String.format(Locale.ROOT, """ + FROM *:events,events + | eval ip= TO_STR(host) + | %s + | %s + | stats c = COUNT(*) by vendor + | sort vendor + """, enrichHosts(hostMode), enrichVendors(Enrich.Mode.REMOTE)); + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + assertThat(getValuesList(resp).size(), greaterThan(0)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat( + executionInfo.clusterAliases(), + 
equalTo(Set.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)) + ); + assertCCSExecutionInfoDetails(executionInfo); + + EsqlExecutionInfo.Cluster cluster1 = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(cluster1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(cluster1.getTotalShards(), equalTo(0)); + assertThat(cluster1.getSuccessfulShards(), equalTo(0)); + assertThat(cluster1.getSkippedShards(), equalTo(0)); + assertThat(cluster1.getFailedShards(), equalTo(0)); + assertThat(cluster1.getTook().millis(), greaterThanOrEqualTo(0L)); + + EsqlExecutionInfo.Cluster cluster2 = executionInfo.getCluster(REMOTE_CLUSTER_2); + assertThat(cluster2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(cluster2.getTotalShards(), greaterThan(0)); + assertThat(cluster2.getSuccessfulShards(), equalTo(cluster2.getSuccessfulShards())); + assertThat(cluster2.getSkippedShards(), equalTo(0)); + assertThat(cluster2.getFailedShards(), equalTo(0)); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(localCluster.getTotalShards(), greaterThan(0)); + assertThat(localCluster.getSuccessfulShards(), equalTo(localCluster.getTotalShards())); + assertThat(localCluster.getSkippedShards(), equalTo(0)); + assertThat(localCluster.getFailedShards(), equalTo(0)); + } + } + + // close remote-cluster-2 so that it is also unavailable + cluster(REMOTE_CLUSTER_2).close(); + + for (Enrich.Mode hostMode : List.of(Enrich.Mode.ANY, Enrich.Mode.REMOTE)) { + var query = String.format(Locale.ROOT, """ + FROM *:events,events + | eval ip= TO_STR(host) + | %s + | %s + | stats c = COUNT(*) by vendor + | sort vendor + """, enrichHosts(hostMode), enrichVendors(Enrich.Mode.REMOTE)); + if (skipUnavailableRemote2 == false) { + Exception exception = 
expectThrows(Exception.class, () -> runQuery(query, requestIncludeMeta)); + assertTrue(ExceptionsHelper.isRemoteUnavailableException(exception)); + } else { + + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + assertThat(getValuesList(resp).size(), greaterThan(0)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat( + executionInfo.clusterAliases(), + equalTo(Set.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)) + ); + assertCCSExecutionInfoDetails(executionInfo); + + EsqlExecutionInfo.Cluster cluster1 = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(cluster1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(cluster1.getTotalShards(), equalTo(0)); + assertThat(cluster1.getSuccessfulShards(), equalTo(0)); + assertThat(cluster1.getSkippedShards(), equalTo(0)); + assertThat(cluster1.getFailedShards(), equalTo(0)); + assertThat(cluster1.getTook().millis(), greaterThanOrEqualTo(0L)); + + EsqlExecutionInfo.Cluster cluster2 = executionInfo.getCluster(REMOTE_CLUSTER_2); + assertThat(cluster2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(cluster2.getTotalShards(), equalTo(0)); + assertThat(cluster2.getSuccessfulShards(), equalTo(0)); + assertThat(cluster2.getSkippedShards(), equalTo(0)); + assertThat(cluster2.getFailedShards(), equalTo(0)); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(localCluster.getTotalShards(), greaterThan(0)); + assertThat(localCluster.getSuccessfulShards(), equalTo(localCluster.getTotalShards())); + assertThat(localCluster.getSkippedShards(), equalTo(0)); + assertThat(localCluster.getFailedShards(), equalTo(0)); + } + } + } + } finally { + 
clearSkipUnavailable(); + } + } + + protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse) { + EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); + request.query(query); + request.pragmas(AbstractEsqlIntegTestCase.randomPragmas()); + if (randomBoolean()) { + request.profile(true); + } + if (ccsMetadataInResponse != null) { + request.includeCCSMetadata(ccsMetadataInResponse); + } + return client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS); + } + + private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInfo) { + assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + assertTrue(executionInfo.isCrossClusterSearch()); + + for (String clusterAlias : executionInfo.clusterAliases()) { + EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias); + assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(cluster.getTook().millis(), lessThanOrEqualTo(executionInfo.overallTook().millis())); + } + } + + private void setSkipUnavailable(String clusterAlias, boolean skip) { + client(LOCAL_CLUSTER).admin() + .cluster() + .prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) + .setPersistentSettings(Settings.builder().put("cluster.remote." + clusterAlias + ".skip_unavailable", skip).build()) + .get(); + } + + private void clearSkipUnavailable() { + Settings.Builder settingsBuilder = Settings.builder() + .putNull("cluster.remote." + REMOTE_CLUSTER_1 + ".skip_unavailable") + .putNull("cluster.remote." 
+ REMOTE_CLUSTER_2 + ".skip_unavailable"); + client(LOCAL_CLUSTER).admin() + .cluster() + .prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) + .setPersistentSettings(settingsBuilder.build()) + .get(); + } +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java new file mode 100644 index 0000000000000..0f1aa8541fdd9 --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java @@ -0,0 +1,525 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.compute.operator.exchange.ExchangeService; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.test.XContentTestUtils; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static 
org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class CrossClusterQueryUnavailableRemotesIT extends AbstractMultiClustersTestCase { + private static final String REMOTE_CLUSTER_1 = "cluster-a"; + private static final String REMOTE_CLUSTER_2 = "cluster-b"; + + @Override + protected Collection remoteClusterAlias() { + return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2); + } + + @Override + protected boolean reuseClusters() { + return false; + } + + @Override + protected Collection> nodePlugins(String clusterAlias) { + List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); + plugins.add(EsqlPlugin.class); + plugins.add(org.elasticsearch.xpack.esql.action.CrossClustersQueryIT.InternalExchangePlugin.class); + return plugins; + } + + public static class InternalExchangePlugin extends Plugin { + @Override + public List> getSettings() { + return List.of( + Setting.timeSetting( + ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING, + TimeValue.timeValueSeconds(30), + Setting.Property.NodeScope + ) + ); + } + } + + public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exception { + int numClusters = 3; + Map testClusterInfo = setupClusters(numClusters); + int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); + int remote2NumShards = (Integer) testClusterInfo.get("remote2.num_shards"); + setSkipUnavailable(REMOTE_CLUSTER_1, true); + setSkipUnavailable(REMOTE_CLUSTER_2, true); + + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + + try { + // close remote-cluster-1 so that it is unavailable + cluster(REMOTE_CLUSTER_1).close(); + + try 
(EsqlQueryResponse resp = runQuery("FROM logs-*,*:logs-* | STATS sum (v)", requestIncludeMeta)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(1)); + assertThat(values.get(0), equalTo(List.of(330L))); + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER))); + + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remote1Cluster.getIndexExpression(), equalTo("logs-*")); + assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote1Cluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote1Cluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); + assertThat(remote1Cluster.getTotalShards(), equalTo(0)); + assertThat(remote1Cluster.getSuccessfulShards(), equalTo(0)); + assertThat(remote1Cluster.getSkippedShards(), equalTo(0)); + assertThat(remote1Cluster.getFailedShards(), equalTo(0)); + + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2); + assertThat(remote2Cluster.getIndexExpression(), equalTo("logs-*")); + assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(remote2Cluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote2Cluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); + assertThat(remote2Cluster.getTotalShards(), equalTo(remote2NumShards)); + assertThat(remote2Cluster.getSuccessfulShards(), equalTo(remote2NumShards)); + assertThat(remote2Cluster.getSkippedShards(), equalTo(0)); + 
assertThat(remote2Cluster.getFailedShards(), equalTo(0)); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertThat(localCluster.getIndexExpression(), equalTo("logs-*")); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); + assertThat(localCluster.getTotalShards(), equalTo(localNumShards)); + assertThat(localCluster.getSuccessfulShards(), equalTo(localNumShards)); + assertThat(localCluster.getSkippedShards(), equalTo(0)); + assertThat(localCluster.getFailedShards(), equalTo(0)); + + // ensure that the _clusters metadata is present only if requested + assertClusterMetadataInResponse(resp, responseExpectMeta); + } + + // scenario where there are no indices to match because + // 1) the local cluster indexExpression and REMOTE_CLUSTER_2 indexExpression match no indices + // 2) the REMOTE_CLUSTER_1 is unavailable + // 3) both remotes are marked as skip_un=true + String query = "FROM nomatch*," + REMOTE_CLUSTER_1 + ":logs-*," + REMOTE_CLUSTER_2 + ":nomatch* | STATS sum (v)"; + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(0)); + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER))); + + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remote1Cluster.getIndexExpression(), 
equalTo("logs-*")); + assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote1Cluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote1Cluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); + assertThat(remote1Cluster.getTotalShards(), equalTo(0)); + assertThat(remote1Cluster.getSuccessfulShards(), equalTo(0)); + assertThat(remote1Cluster.getSkippedShards(), equalTo(0)); + assertThat(remote1Cluster.getFailedShards(), equalTo(0)); + + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2); + assertThat(remote2Cluster.getIndexExpression(), equalTo("nomatch*")); + assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote2Cluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote2Cluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); + assertThat(remote2Cluster.getTotalShards(), equalTo(0)); + assertThat(remote2Cluster.getSuccessfulShards(), equalTo(0)); + assertThat(remote2Cluster.getSkippedShards(), equalTo(0)); + assertThat(remote2Cluster.getFailedShards(), equalTo(0)); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertThat(localCluster.getIndexExpression(), equalTo("nomatch*")); + // local cluster should never be marked as SKIPPED + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); + assertThat(localCluster.getTotalShards(), equalTo(0)); + assertThat(localCluster.getSuccessfulShards(), equalTo(0)); + assertThat(localCluster.getSkippedShards(), equalTo(0)); + assertThat(localCluster.getFailedShards(), equalTo(0)); + + // ensure that the _clusters metadata is present only if requested + assertClusterMetadataInResponse(resp, 
responseExpectMeta); + } + + // close remote-cluster-2 so that it is also unavailable + cluster(REMOTE_CLUSTER_2).close(); + + try (EsqlQueryResponse resp = runQuery("FROM logs-*,*:logs-* | STATS sum (v)", requestIncludeMeta)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(1)); + assertThat(values.get(0), equalTo(List.of(45L))); + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER))); + + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remote1Cluster.getIndexExpression(), equalTo("logs-*")); + assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote1Cluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote1Cluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); + assertThat(remote1Cluster.getTotalShards(), equalTo(0)); + assertThat(remote1Cluster.getSuccessfulShards(), equalTo(0)); + assertThat(remote1Cluster.getSkippedShards(), equalTo(0)); + assertThat(remote1Cluster.getFailedShards(), equalTo(0)); + + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2); + assertThat(remote2Cluster.getIndexExpression(), equalTo("logs-*")); + assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote2Cluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote2Cluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); + assertThat(remote2Cluster.getTotalShards(), equalTo(0)); + 
assertThat(remote2Cluster.getSuccessfulShards(), equalTo(0)); + assertThat(remote2Cluster.getSkippedShards(), equalTo(0)); + assertThat(remote2Cluster.getFailedShards(), equalTo(0)); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertThat(localCluster.getIndexExpression(), equalTo("logs-*")); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); + assertThat(localCluster.getTotalShards(), equalTo(localNumShards)); + assertThat(localCluster.getSuccessfulShards(), equalTo(localNumShards)); + assertThat(localCluster.getSkippedShards(), equalTo(0)); + assertThat(localCluster.getFailedShards(), equalTo(0)); + + // ensure that the _clusters metadata is present only if requested + assertClusterMetadataInResponse(resp, responseExpectMeta); + } + } finally { + clearSkipUnavailable(numClusters); + } + } + + public void testRemoteOnlyCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exception { + int numClusters = 3; + setupClusters(numClusters); + setSkipUnavailable(REMOTE_CLUSTER_1, true); + setSkipUnavailable(REMOTE_CLUSTER_2, true); + + try { + // close remote cluster 1 so that it is unavailable + cluster(REMOTE_CLUSTER_1).close(); + + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + + // query only the REMOTE_CLUSTER_1 + try (EsqlQueryResponse resp = runQuery("FROM " + REMOTE_CLUSTER_1 + ":logs-* | STATS sum (v)", requestIncludeMeta)) { + List columns = resp.columns(); + assertThat(columns.size(), equalTo(1)); + // column from an empty result should be {"name":"","type":"null"} + assertThat(columns.get(0).name(), equalTo("")); + assertThat(columns.get(0).type(), equalTo(DataType.NULL)); + + List> values = 
getValuesList(resp); + assertThat(values, hasSize(0)); + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1))); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remoteCluster.getIndexExpression(), equalTo("logs-*")); + assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); + assertThat(remoteCluster.getTotalShards(), equalTo(0)); + assertThat(remoteCluster.getSuccessfulShards(), equalTo(0)); + assertThat(remoteCluster.getSkippedShards(), equalTo(0)); + assertThat(remoteCluster.getFailedShards(), equalTo(0)); + + // ensure that the _clusters metadata is present only if requested + assertClusterMetadataInResponse(resp, responseExpectMeta); + } + + // close remote cluster 2 so that it is also unavailable + cluster(REMOTE_CLUSTER_2).close(); + + // query only the both remote clusters + try ( + EsqlQueryResponse resp = runQuery( + "FROM " + REMOTE_CLUSTER_1 + ":logs-*," + REMOTE_CLUSTER_2 + ":logs-* | STATS sum (v)", + requestIncludeMeta + ) + ) { + List columns = resp.columns(); + assertThat(columns.size(), equalTo(1)); + // column from an empty result should be {"name":"","type":"null"} + assertThat(columns.get(0).name(), equalTo("")); + assertThat(columns.get(0).type(), equalTo(DataType.NULL)); + + List> values = getValuesList(resp); + assertThat(values, hasSize(0)); + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + 
assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2))); + + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertThat(remote1Cluster.getIndexExpression(), equalTo("logs-*")); + assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote1Cluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote1Cluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); + assertThat(remote1Cluster.getTotalShards(), equalTo(0)); + assertThat(remote1Cluster.getSuccessfulShards(), equalTo(0)); + assertThat(remote1Cluster.getSkippedShards(), equalTo(0)); + assertThat(remote1Cluster.getFailedShards(), equalTo(0)); + + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE_CLUSTER_2); + assertThat(remote2Cluster.getIndexExpression(), equalTo("logs-*")); + assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote2Cluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote2Cluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); + assertThat(remote2Cluster.getTotalShards(), equalTo(0)); + assertThat(remote2Cluster.getSuccessfulShards(), equalTo(0)); + assertThat(remote2Cluster.getSkippedShards(), equalTo(0)); + assertThat(remote2Cluster.getFailedShards(), equalTo(0)); + + // ensure that the _clusters metadata is present only if requested + assertClusterMetadataInResponse(resp, responseExpectMeta); + } + + } finally { + clearSkipUnavailable(numClusters); + } + } + + public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableFalse() throws 
Exception { + int numClusters = 2; + setupClusters(numClusters); + setSkipUnavailable(REMOTE_CLUSTER_1, false); + + try { + // close the remote cluster so that it is unavailable + cluster(REMOTE_CLUSTER_1).close(); + + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + + final Exception exception = expectThrows( + Exception.class, + () -> runQuery("FROM logs-*,*:logs-* | STATS sum (v)", requestIncludeMeta) + ); + assertThat(ExceptionsHelper.isRemoteUnavailableException(exception), is(true)); + } finally { + clearSkipUnavailable(numClusters); + } + } + + public void testRemoteOnlyCCSAgainstDisconnectedRemoteWithSkipUnavailableFalse() throws Exception { + int numClusters = 3; + setupClusters(numClusters); + setSkipUnavailable(REMOTE_CLUSTER_1, false); + setSkipUnavailable(REMOTE_CLUSTER_2, randomBoolean()); + + try { + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + { + // close the remote cluster so that it is unavailable + cluster(REMOTE_CLUSTER_1).close(); + Exception exception = expectThrows(Exception.class, () -> runQuery("FROM *:logs-* | STATS sum (v)", requestIncludeMeta)); + assertThat(ExceptionsHelper.isRemoteUnavailableException(exception), is(true)); + } + { + // close remote cluster 2 so that it is unavailable + cluster(REMOTE_CLUSTER_2).close(); + Exception exception = expectThrows(Exception.class, () -> runQuery("FROM *:logs-* | STATS sum (v)", requestIncludeMeta)); + assertThat(ExceptionsHelper.isRemoteUnavailableException(exception), is(true)); + } + } finally { + clearSkipUnavailable(numClusters); + } + } + + private void setSkipUnavailable(String clusterAlias, boolean skip) { + client(LOCAL_CLUSTER).admin() + .cluster() + .prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) + .setPersistentSettings(Settings.builder().put("cluster.remote." 
+ clusterAlias + ".skip_unavailable", skip).build()) + .get(); + } + + private void clearSkipUnavailable(int numClusters) { + assert numClusters == 2 || numClusters == 3 : "Only 2 or 3 clusters supported"; + Settings.Builder settingsBuilder = Settings.builder().putNull("cluster.remote." + REMOTE_CLUSTER_1 + ".skip_unavailable"); + if (numClusters == 3) { + settingsBuilder.putNull("cluster.remote." + REMOTE_CLUSTER_2 + ".skip_unavailable"); + } + client(LOCAL_CLUSTER).admin() + .cluster() + .prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) + .setPersistentSettings(settingsBuilder.build()) + .get(); + } + + private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta) { + try { + final Map esqlResponseAsMap = XContentTestUtils.convertToMap(resp); + final Object clusters = esqlResponseAsMap.get("_clusters"); + if (responseExpectMeta) { + assertNotNull(clusters); + // test a few entries to ensure it looks correct (other tests do a full analysis of the metadata in the response) + @SuppressWarnings("unchecked") + Map inner = (Map) clusters; + assertTrue(inner.containsKey("total")); + assertTrue(inner.containsKey("details")); + } else { + assertNull(clusters); + } + } catch (IOException e) { + fail("Could not convert ESQL response to Map: " + e); + } + } + + protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse) { + EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); + request.query(query); + request.pragmas(AbstractEsqlIntegTestCase.randomPragmas()); + request.profile(randomInt(5) == 2); + request.columnar(randomBoolean()); + if (ccsMetadataInResponse != null) { + request.includeCCSMetadata(ccsMetadataInResponse); + } + return runQuery(request); + } + + protected EsqlQueryResponse runQuery(EsqlQueryRequest request) { + return client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS); + } + + /** + * v1: value to send to runQuery 
(can be null; null means use default value) + * v2: whether to expect CCS Metadata in the response (cannot be null) + * @return + */ + public static Tuple randomIncludeCCSMetadata() { + return switch (randomIntBetween(1, 3)) { + case 1 -> new Tuple<>(Boolean.TRUE, Boolean.TRUE); + case 2 -> new Tuple<>(Boolean.FALSE, Boolean.FALSE); + case 3 -> new Tuple<>(null, Boolean.FALSE); + default -> throw new AssertionError("should not get here"); + }; + } + + Map setupClusters(int numClusters) { + assert numClusters == 2 || numClusters == 3 : "2 or 3 clusters supported not: " + numClusters; + String localIndex = "logs-1"; + int numShardsLocal = randomIntBetween(1, 5); + populateLocalIndices(localIndex, numShardsLocal); + + String remoteIndex = "logs-2"; + int numShardsRemote = randomIntBetween(1, 5); + populateRemoteIndices(REMOTE_CLUSTER_1, remoteIndex, numShardsRemote); + + Map clusterInfo = new HashMap<>(); + clusterInfo.put("local.num_shards", numShardsLocal); + clusterInfo.put("local.index", localIndex); + clusterInfo.put("remote.num_shards", numShardsRemote); + clusterInfo.put("remote.index", remoteIndex); + + if (numClusters == 3) { + int numShardsRemote2 = randomIntBetween(1, 5); + populateRemoteIndices(REMOTE_CLUSTER_2, remoteIndex, numShardsRemote2); + clusterInfo.put("remote2.index", remoteIndex); + clusterInfo.put("remote2.num_shards", numShardsRemote2); + } + + return clusterInfo; + } + + void populateLocalIndices(String indexName, int numShards) { + Client localClient = client(LOCAL_CLUSTER); + assertAcked( + localClient.admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put("index.number_of_shards", numShards)) + .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long") + ); + for (int i = 0; i < 10; i++) { + localClient.prepareIndex(indexName).setSource("id", "local-" + i, "tag", "local", "v", i).get(); + } + localClient.admin().indices().prepareRefresh(indexName).get(); + } + + void 
populateRemoteIndices(String clusterAlias, String indexName, int numShards) { + Client remoteClient = client(clusterAlias); + assertAcked( + remoteClient.admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put("index.number_of_shards", numShards)) + .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long") + ); + for (int i = 0; i < 10; i++) { + remoteClient.prepareIndex(indexName).setSource("id", "remote-" + i, "tag", "remote", "v", i * i).get(); + } + remoteClient.admin().indices().prepareRefresh(indexName).get(); + } +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java index ddd5cff014ed2..ba44adb5a85e0 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java @@ -25,6 +25,7 @@ import org.elasticsearch.test.AbstractMultiClustersTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.XContentTestUtils; +import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; @@ -246,7 +247,8 @@ public void testSearchesWhereMissingIndicesAreSpecified() { EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); assertThat(localCluster.getIndexExpression(), equalTo("no_such_index")); - assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + // TODO: a follow on PR will change this to throw an Exception when the local cluster requests a concrete index that is missing + assertThat(localCluster.getStatus(), 
equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); assertThat(localCluster.getTotalShards(), equalTo(0)); @@ -499,7 +501,7 @@ public void testCCSExecutionOnSearchesWithLimit0() { EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); assertThat(localCluster.getIndexExpression(), equalTo("nomatch*")); - assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis)); assertThat(remoteCluster.getTotalShards(), equalTo(0)); @@ -803,6 +805,14 @@ Map setupTwoClusters() { clusterInfo.put("local.index", localIndex); clusterInfo.put("remote.num_shards", numShardsRemote); clusterInfo.put("remote.index", remoteIndex); + + String skipUnavailableKey = Strings.format("cluster.remote.%s.skip_unavailable", REMOTE_CLUSTER); + Setting skipUnavailableSetting = cluster(REMOTE_CLUSTER).clusterService().getClusterSettings().get(skipUnavailableKey); + boolean skipUnavailable = (boolean) cluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).clusterService() + .getClusterSettings() + .get(skipUnavailableSetting); + clusterInfo.put("remote.skip_unavailable", skipUnavailable); + return clusterInfo; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index aeac14091f378..f2ab0355304b3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -8,6 +8,7 @@ 
package org.elasticsearch.xpack.esql.action; import org.elasticsearch.TransportVersions; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -281,6 +282,7 @@ public static class Cluster implements ToXContentFragment, Writeable { private final Integer successfulShards; private final Integer skippedShards; private final Integer failedShards; + private final List failures; private final TimeValue took; // search latency for this cluster sub-search /** @@ -300,7 +302,7 @@ public String toString() { } public Cluster(String clusterAlias, String indexExpression) { - this(clusterAlias, indexExpression, true, Cluster.Status.RUNNING, null, null, null, null, null); + this(clusterAlias, indexExpression, true, Cluster.Status.RUNNING, null, null, null, null, null, null); } /** @@ -312,7 +314,7 @@ public Cluster(String clusterAlias, String indexExpression) { * @param skipUnavailable whether this Cluster is marked as skip_unavailable in remote cluster settings */ public Cluster(String clusterAlias, String indexExpression, boolean skipUnavailable) { - this(clusterAlias, indexExpression, skipUnavailable, Cluster.Status.RUNNING, null, null, null, null, null); + this(clusterAlias, indexExpression, skipUnavailable, Cluster.Status.RUNNING, null, null, null, null, null, null); } /** @@ -324,7 +326,7 @@ public Cluster(String clusterAlias, String indexExpression, boolean skipUnavaila * @param status current status of the search on this Cluster */ public Cluster(String clusterAlias, String indexExpression, boolean skipUnavailable, Cluster.Status status) { - this(clusterAlias, indexExpression, skipUnavailable, status, null, null, null, null, null); + this(clusterAlias, indexExpression, skipUnavailable, status, null, null, null, null, null, null); } public Cluster( @@ -336,6 +338,7 @@ public Cluster( Integer 
successfulShards, Integer skippedShards, Integer failedShards, + List failures, TimeValue took ) { assert clusterAlias != null : "clusterAlias cannot be null"; @@ -349,6 +352,11 @@ public Cluster( this.successfulShards = successfulShards; this.skippedShards = skippedShards; this.failedShards = failedShards; + if (failures == null) { + this.failures = List.of(); + } else { + this.failures = failures; + } this.took = took; } @@ -362,6 +370,11 @@ public Cluster(StreamInput in) throws IOException { this.failedShards = in.readOptionalInt(); this.took = in.readOptionalTimeValue(); this.skipUnavailable = in.readBoolean(); + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_CCS_EXEC_INFO_WITH_FAILURES)) { + this.failures = Collections.unmodifiableList(in.readCollectionAsList(ShardSearchFailure::readShardSearchFailure)); + } else { + this.failures = List.of(); + } } @Override @@ -375,6 +388,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalInt(failedShards); out.writeOptionalTimeValue(took); out.writeBoolean(skipUnavailable); + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_CCS_EXEC_INFO_WITH_FAILURES)) { + out.writeCollection(failures); + } } /** @@ -387,12 +403,12 @@ public void writeTo(StreamOutput out) throws IOException { * All other fields can be set and override the value in the "copyFrom" Cluster. */ public static class Builder { - private String indexExpression; private Cluster.Status status; private Integer totalShards; private Integer successfulShards; private Integer skippedShards; private Integer failedShards; + private List failures; private TimeValue took; private final Cluster original; @@ -408,22 +424,18 @@ public Builder(Cluster copyFrom) { public Cluster build() { return new Cluster( original.getClusterAlias(), - indexExpression == null ? original.getIndexExpression() : indexExpression, + original.getIndexExpression(), original.isSkipUnavailable(), status != null ? 
status : original.getStatus(), totalShards != null ? totalShards : original.getTotalShards(), successfulShards != null ? successfulShards : original.getSuccessfulShards(), skippedShards != null ? skippedShards : original.getSkippedShards(), failedShards != null ? failedShards : original.getFailedShards(), + failures != null ? failures : original.getFailures(), took != null ? took : original.getTook() ); } - public Cluster.Builder setIndexExpression(String indexExpression) { - this.indexExpression = indexExpression; - return this; - } - public Cluster.Builder setStatus(Cluster.Status status) { this.status = status; return this; @@ -449,6 +461,11 @@ public Cluster.Builder setFailedShards(int failedShards) { return this; } + public Cluster.Builder setFailures(List failures) { + this.failures = failures; + return this; + } + public Cluster.Builder setTook(TimeValue took) { this.took = took; return this; @@ -466,7 +483,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(STATUS_FIELD.getPreferredName(), getStatus().toString()); builder.field(INDICES_FIELD.getPreferredName(), indexExpression); if (took != null) { - // TODO: change this to took_nanos and call took.nanos? 
builder.field(TOOK.getPreferredName(), took.millis()); } if (totalShards != null) { @@ -483,6 +499,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } builder.endObject(); } + if (failures != null && failures.size() > 0) { + builder.startArray(RestActions.FAILURES_FIELD.getPreferredName()); + for (ShardSearchFailure failure : failures) { + failure.toXContent(builder, params); + } + builder.endArray(); + } } builder.endObject(); return builder; @@ -529,6 +552,10 @@ public Integer getFailedShards() { return failedShards; } + public List getFailures() { + return failures; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java index 7fb279f18b1dc..4f6886edc5fbc 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java @@ -23,6 +23,7 @@ public final class EnrichResolution { private final Map resolvedPolicies = ConcurrentCollections.newConcurrentMap(); private final Map errors = ConcurrentCollections.newConcurrentMap(); + private final Map unavailableClusters = ConcurrentCollections.newConcurrentMap(); public ResolvedEnrichPolicy getResolvedPolicy(String policyName, Enrich.Mode mode) { return resolvedPolicies.get(new Key(policyName, mode)); @@ -51,6 +52,14 @@ public void addError(String policyName, Enrich.Mode mode, String reason) { errors.putIfAbsent(new Key(policyName, mode), reason); } + public void addUnavailableCluster(String clusterAlias, Exception e) { + unavailableClusters.put(clusterAlias, e); + } + + public Map getUnavailableClusters() { + return unavailableClusters; + } + private record Key(String policyName, Enrich.Mode mode) { } diff --git 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index e67c406e26929..77ef5ef597bb5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.enrich; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.search.SearchRequest; @@ -50,6 +51,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -113,12 +115,27 @@ public void resolvePolicies( final boolean includeLocal = remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> { final EnrichResolution enrichResolution = new EnrichResolution(); + + Map lookupResponsesToProcess = new HashMap<>(); + + for (Map.Entry entry : lookupResponses.entrySet()) { + String clusterAlias = entry.getKey(); + if (entry.getValue().connectionError != null) { + enrichResolution.addUnavailableCluster(clusterAlias, entry.getValue().connectionError); + // remove unavailable cluster from the list of clusters which is used below to create the ResolvedEnrichPolicy + remoteClusters.remove(clusterAlias); + } else { + lookupResponsesToProcess.put(clusterAlias, entry.getValue()); + } + } + for (UnresolvedPolicy unresolved : unresolvedPolicies) { Tuple resolved = mergeLookupResults( unresolved, calculateTargetClusters(unresolved.mode, includeLocal, remoteClusters), - lookupResponses + lookupResponsesToProcess ); + if (resolved.v1() 
!= null) { enrichResolution.addResolvedPolicy(unresolved.name, unresolved.mode, resolved.v1()); } else { @@ -149,13 +166,16 @@ private Tuple mergeLookupResults( Collection targetClusters, Map lookupResults ) { - assert targetClusters.isEmpty() == false; String policyName = unresolved.name; + if (targetClusters.isEmpty()) { + return Tuple.tuple(null, "enrich policy [" + policyName + "] cannot be resolved since remote clusters are unavailable"); + } final Map policies = new HashMap<>(); final List failures = new ArrayList<>(); for (String cluster : targetClusters) { LookupResponse lookupResult = lookupResults.get(cluster); if (lookupResult != null) { + assert lookupResult.connectionError == null : "Should never have a non-null connectionError here"; ResolvedEnrichPolicy policy = lookupResult.policies.get(policyName); if (policy != null) { policies.put(cluster, policy); @@ -261,22 +281,34 @@ private void lookupPolicies( if (remotePolicies.isEmpty() == false) { for (String cluster : remoteClusters) { ActionListener lookupListener = refs.acquire(resp -> lookupResponses.put(cluster, resp)); - getRemoteConnection( - cluster, - lookupListener.delegateFailureAndWrap( - (delegate, connection) -> transportService.sendRequest( + getRemoteConnection(cluster, new ActionListener() { + @Override + public void onResponse(Transport.Connection connection) { + transportService.sendRequest( connection, RESOLVE_ACTION_NAME, new LookupRequest(cluster, remotePolicies), TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>( - delegate, - LookupResponse::new, - threadPool.executor(ThreadPool.Names.SEARCH) - ) - ) - ) - ); + new ActionListenerResponseHandler<>(lookupListener.delegateResponse((l, e) -> { + if (ExceptionsHelper.isRemoteUnavailableException(e) + && remoteClusterService.isSkipUnavailable(cluster)) { + l.onResponse(new LookupResponse(e)); + } else { + l.onFailure(e); + } + }), LookupResponse::new, threadPool.executor(ThreadPool.Names.SEARCH)) + ); + } + + @Override 
+ public void onFailure(Exception e) { + if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster)) { + lookupListener.onResponse(new LookupResponse(e)); + } else { + lookupListener.onFailure(e); + } + } + }); } } // local cluster @@ -323,16 +355,30 @@ public void writeTo(StreamOutput out) throws IOException { private static class LookupResponse extends TransportResponse { final Map policies; final Map failures; + // does not need to be Writable since this indicates a failure to contact a remote cluster, so only set on querying cluster + final transient Exception connectionError; LookupResponse(Map policies, Map failures) { this.policies = policies; this.failures = failures; + this.connectionError = null; + } + + /** + * Use this constructor when the remote cluster is unavailable to indicate inability to do the enrich policy lookup + * @param connectionError Exception received when trying to connect to a remote cluster + */ + LookupResponse(Exception connectionError) { + this.policies = Collections.emptyMap(); + this.failures = Collections.emptyMap(); + this.connectionError = connectionError; } LookupResponse(StreamInput in) throws IOException { PlanStreamInput planIn = new PlanStreamInput(in, in.namedWriteableRegistry(), null); this.policies = planIn.readMap(StreamInput::readString, ResolvedEnrichPolicy::new); this.failures = planIn.readMap(StreamInput::readString, StreamInput::readString); + this.connectionError = null; } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java index 371aa1b632309..b2eaefcf09d65 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/index/IndexResolution.java @@ -6,27 +6,28 @@ */ package org.elasticsearch.xpack.esql.index; 
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.core.Nullable; import java.util.Collections; +import java.util.Map; import java.util.Objects; -import java.util.Set; public final class IndexResolution { - public static IndexResolution valid(EsIndex index, Set unavailableClusters) { + public static IndexResolution valid(EsIndex index, Map unavailableClusters) { Objects.requireNonNull(index, "index must not be null if it was found"); Objects.requireNonNull(unavailableClusters, "unavailableClusters must not be null"); return new IndexResolution(index, null, unavailableClusters); } public static IndexResolution valid(EsIndex index) { - return valid(index, Collections.emptySet()); + return valid(index, Collections.emptyMap()); } public static IndexResolution invalid(String invalid) { Objects.requireNonNull(invalid, "invalid must not be null to signal that the index is invalid"); - return new IndexResolution(null, invalid, Collections.emptySet()); + return new IndexResolution(null, invalid, Collections.emptyMap()); } public static IndexResolution notFound(String name) { @@ -39,9 +40,9 @@ public static IndexResolution notFound(String name) { private final String invalid; // remote clusters included in the user's index expression that could not be connected to - private final Set unavailableClusters; + private final Map unavailableClusters; - private IndexResolution(EsIndex index, @Nullable String invalid, Set unavailableClusters) { + private IndexResolution(EsIndex index, @Nullable String invalid, Map unavailableClusters) { this.index = index; this.invalid = invalid; this.unavailableClusters = unavailableClusters; @@ -70,7 +71,11 @@ public boolean isValid() { return invalid == null; } - public Set getUnavailableClusters() { + /** + * @return Map of unavailable clusters (could not be connected to during field-caps query). Key of map is cluster alias, + * value is the {@link FieldCapabilitiesFailure} describing the issue. 
+ */ + public Map getUnavailableClusters() { return unavailableClusters; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index ccd167942340c..1e78f454b7531 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -7,8 +7,11 @@ package org.elasticsearch.xpack.esql.session; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Iterators; @@ -22,7 +25,9 @@ import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; +import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.action.EsqlQueryRequest; import org.elasticsearch.xpack.esql.analysis.Analyzer; @@ -44,6 +49,7 @@ import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy; import org.elasticsearch.xpack.esql.expression.UnresolvedNamePattern; import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry; +import org.elasticsearch.xpack.esql.index.EsIndex; import org.elasticsearch.xpack.esql.index.IndexResolution; import org.elasticsearch.xpack.esql.index.MappingException; import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer; @@ -68,6 +74,7 @@ import 
java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -143,12 +150,105 @@ public void execute( analyzedPlan( parse(request.query(), request.params()), executionInfo, - listener.delegateFailureAndWrap( - (next, analyzedPlan) -> executeOptimizedPlan(request, executionInfo, runPhase, optimizedPlan(analyzedPlan), next) - ) + new LogicalPlanActionListener(request, executionInfo, runPhase, listener) ); } + /** + * ActionListener that receives LogicalPlan or error from logical planning. + * Any Exception sent to onFailure stops processing, but not all are fatal (return a 4xx or 5xx), so + * the onFailure handler determines whether to return an empty successful result or a 4xx/5xx error. + */ + class LogicalPlanActionListener implements ActionListener { + private final EsqlQueryRequest request; + private final EsqlExecutionInfo executionInfo; + private final BiConsumer> runPhase; + private final ActionListener listener; + + LogicalPlanActionListener( + EsqlQueryRequest request, + EsqlExecutionInfo executionInfo, + BiConsumer> runPhase, + ActionListener listener + ) { + this.request = request; + this.executionInfo = executionInfo; + this.runPhase = runPhase; + this.listener = listener; + } + + @Override + public void onResponse(LogicalPlan analyzedPlan) { + executeOptimizedPlan(request, executionInfo, runPhase, optimizedPlan(analyzedPlan), listener); + } + + /** + * Whether to return an empty result (HTTP status 200) for a CCS rather than a top level 4xx/5xx error. + * + * For cases where field-caps had no indices to search and the remotes were unavailable, we + * return an empty successful response (200) if all remotes are marked with skip_unavailable=true. + * + * Note: a follow-on PR will expand this logic to handle cases where no indices could be found to match + * on any of the requested clusters. 
+ */ + private boolean returnSuccessWithEmptyResult(Exception e) { + if (executionInfo.isCrossClusterSearch() == false) { + return false; + } + + if (e instanceof NoClustersToSearchException || ExceptionsHelper.isRemoteUnavailableException(e)) { + for (String clusterAlias : executionInfo.clusterAliases()) { + if (executionInfo.isSkipUnavailable(clusterAlias) == false + && clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) { + return false; + } + } + return true; + } + return false; + } + + @Override + public void onFailure(Exception e) { + if (returnSuccessWithEmptyResult(e)) { + executionInfo.markEndQuery(); + Exception exceptionForResponse; + if (e instanceof ConnectTransportException) { + // when field-caps has no field info (since no clusters could be connected to or had matching indices) + // it just throws the first exception in its list, so this odd special handling is here is to avoid + // having one specific remote alias name in all failure lists in the metadata response + exceptionForResponse = new RemoteTransportException( + "connect_transport_exception - unable to connect to remote cluster", + null + ); + } else { + exceptionForResponse = e; + } + for (String clusterAlias : executionInfo.clusterAliases()) { + executionInfo.swapCluster(clusterAlias, (k, v) -> { + EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook( + executionInfo.overallTook() + ).setTotalShards(0).setSuccessfulShards(0).setSkippedShards(0).setFailedShards(0); + if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { + // never mark local cluster as skipped + builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL); + } else { + builder.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED); + // add this exception to the failures list only if there is no failure already recorded there + if (v.getFailures() == null || v.getFailures().size() == 0) { + builder.setFailures(List.of(new 
ShardSearchFailure(exceptionForResponse))); + } + } + return builder.build(); + }); + } + listener.onResponse(new Result(Analyzer.NO_FIELDS, Collections.emptyList(), Collections.emptyList(), executionInfo)); + } else { + listener.onFailure(e); + } + } + } + /** * Execute an analyzed plan. Most code should prefer calling {@link #execute} but * this is public for testing. See {@link Phased} for the sequence of operations. @@ -161,8 +261,8 @@ public void executeOptimizedPlan( ActionListener listener ) { LogicalPlan firstPhase = Phased.extractFirstPhase(optimizedPlan); + updateExecutionInfoAtEndOfPlanning(executionInfo); if (firstPhase == null) { - updateExecutionInfoAtEndOfPlanning(executionInfo); runPhase.accept(logicalPlanToPhysicalPlan(optimizedPlan, request), listener); } else { executePhased(new ArrayList<>(), optimizedPlan, request, executionInfo, firstPhase, runPhase, listener); @@ -242,17 +342,30 @@ private void preAnalyze( .stream() .map(ResolvedEnrichPolicy::matchField) .collect(Collectors.toSet()); - preAnalyzeIndices(parsed, executionInfo, l.delegateFailureAndWrap((ll, indexResolution) -> { + Map unavailableClusters = enrichResolution.getUnavailableClusters(); + preAnalyzeIndices(parsed, executionInfo, unavailableClusters, l.delegateFailureAndWrap((ll, indexResolution) -> { + // TODO in follow-PR (for skip_unavailble handling of missing concrete indexes) add some tests for invalid index + // resolution to updateExecutionInfo if (indexResolution.isValid()) { updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters()); + if (executionInfo.isCrossClusterSearch() + && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) { + // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel + // Exception to let the LogicalPlanActionListener decide how to proceed + 
ll.onFailure(new NoClustersToSearchException()); + return; + } + Set newClusters = enrichPolicyResolver.groupIndicesPerCluster( indexResolution.get().concreteIndices().toArray(String[]::new) ).keySet(); // If new clusters appear when resolving the main indices, we need to resolve the enrich policies again // or exclude main concrete indices. Since this is rare, it's simpler to resolve the enrich policies again. // TODO: add a test for this - if (targetClusters.containsAll(newClusters) == false) { + if (targetClusters.containsAll(newClusters) == false + // do not bother with a re-resolution if only remotes were requested and all were offline + && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) > 0) { enrichPolicyResolver.resolvePolicies( newClusters, unresolvedPolicies, @@ -269,6 +382,7 @@ private void preAnalyze( private void preAnalyzeIndices( LogicalPlan parsed, EsqlExecutionInfo executionInfo, + Map unavailableClusters, // known to be unavailable from the enrich policy API call ActionListener listener, Set enrichPolicyMatchFields ) { @@ -288,10 +402,34 @@ private void preAnalyzeIndices( String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices()); executionInfo.swapCluster(clusterAlias, (k, v) -> { assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet"; - return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias)); + if (unavailableClusters.containsKey(k)) { + return new EsqlExecutionInfo.Cluster( + clusterAlias, + indexExpr, + executionInfo.isSkipUnavailable(clusterAlias), + EsqlExecutionInfo.Cluster.Status.SKIPPED, + 0, + 0, + 0, + 0, + List.of(new ShardSearchFailure(unavailableClusters.get(k))), + new TimeValue(0) + ); + } else { + return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias)); + } }); } - indexResolver.resolveAsMergedMapping(table.index(), fieldNames, 
listener); + // if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search + // based only on available clusters (which could now be an empty list) + String indexExpressionToResolve = createIndexExpressionFromAvailableClusters(executionInfo); + if (indexExpressionToResolve.isEmpty()) { + // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution + listener.onResponse(IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of()))); + } else { + // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types + indexResolver.resolveAsMergedMapping(indexExpressionToResolve, fieldNames, listener); + } } else { try { // occurs when dealing with local relations (row a = 1) @@ -302,6 +440,30 @@ private void preAnalyzeIndices( } } + // visible for testing + static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) { + StringBuilder sb = new StringBuilder(); + for (String clusterAlias : executionInfo.clusterAliases()) { + EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias); + if (cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) { + if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { + sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(','); + } else { + String indexExpression = executionInfo.getCluster(clusterAlias).getIndexExpression(); + for (String index : indexExpression.split(",")) { + sb.append(clusterAlias).append(':').append(index).append(','); + } + } + } + } + + if (sb.length() > 0) { + return sb.substring(0, sb.length() - 1); + } else { + return ""; + } + } + static Set fieldNames(LogicalPlan parsed, Set enrichPolicyMatchFields) { if (false == parsed.anyMatch(plan -> plan instanceof Aggregate || plan instanceof Project)) { // no explicit columns selection, for example "from employees" @@ 
-446,14 +608,28 @@ public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) { return plan; } - // visible for testing - static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo executionInfo, Set unavailableClusters) { - for (String clusterAlias : unavailableClusters) { - executionInfo.swapCluster( - clusterAlias, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED).build() + static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInfo, Map unavailable) { + for (Map.Entry entry : unavailable.entrySet()) { + String clusterAlias = entry.getKey(); + boolean skipUnavailable = execInfo.getCluster(clusterAlias).isSkipUnavailable(); + RemoteTransportException e = new RemoteTransportException( + Strings.format("Remote cluster [%s] (with setting skip_unavailable=%s) is not available", clusterAlias, skipUnavailable), + entry.getValue().getException() ); - // TODO: follow-on PR will set SKIPPED status when skip_unavailable=true and throw an exception when skip_un=false + if (skipUnavailable) { + execInfo.swapCluster( + clusterAlias, + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED) + .setTotalShards(0) + .setSuccessfulShards(0) + .setSkippedShards(0) + .setFailedShards(0) + .setFailures(List.of(new ShardSearchFailure(e))) + .build() + ); + } else { + throw e; + } } } @@ -466,16 +642,22 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn } Set clustersRequested = executionInfo.clusterAliases(); Set clustersWithNoMatchingIndices = Sets.difference(clustersRequested, clustersWithResolvedIndices); - clustersWithNoMatchingIndices.removeAll(indexResolution.getUnavailableClusters()); + clustersWithNoMatchingIndices.removeAll(indexResolution.getUnavailableClusters().keySet()); /* * These are clusters in the original request that are not present in the field-caps response. 
They were * specified with an index or indices that do not exist, so the search on that cluster is done. * Mark it as SKIPPED with 0 shards searched and took=0. */ for (String c : clustersWithNoMatchingIndices) { + // TODO: in a follow-on PR, throw a Verification(400 status code) for local and remotes with skip_unavailable=false if + // they were requested with one or more concrete indices + // for now we never mark the local cluster as SKIPPED + final var status = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(c) + ? EsqlExecutionInfo.Cluster.Status.SUCCESSFUL + : EsqlExecutionInfo.Cluster.Status.SKIPPED; executionInfo.swapCluster( c, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED) + (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(status) .setTook(new TimeValue(0)) .setTotalShards(0) .setSuccessfulShards(0) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java index c0f94bccc50a4..f76f7798dece8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java @@ -90,6 +90,7 @@ public void resolveAsMergedMapping(String indexWildcard, Set fieldNames, public IndexResolution mergedMappings(String indexPattern, FieldCapabilitiesResponse fieldCapsResponse) { assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH_COORDINATION); // too expensive to run this on a transport worker if (fieldCapsResponse.getIndexResponses().isEmpty()) { + // TODO in follow-on PR, handle the case where remotes were specified with non-existent indices, according to skip_unavailable return IndexResolution.notFound(indexPattern); } @@ -158,18 +159,18 @@ public IndexResolution mergedMappings(String indexPattern, FieldCapabilitiesResp for 
(FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) { concreteIndices.put(ir.getIndexName(), ir.getIndexMode()); } - Set unavailableRemoteClusters = determineUnavailableRemoteClusters(fieldCapsResponse.getFailures()); - return IndexResolution.valid(new EsIndex(indexPattern, rootFields, concreteIndices), unavailableRemoteClusters); + Map unavailableRemotes = determineUnavailableRemoteClusters(fieldCapsResponse.getFailures()); + return IndexResolution.valid(new EsIndex(indexPattern, rootFields, concreteIndices), unavailableRemotes); } // visible for testing - static Set determineUnavailableRemoteClusters(List failures) { - Set unavailableRemotes = new HashSet<>(); + static Map determineUnavailableRemoteClusters(List failures) { + Map unavailableRemotes = new HashMap<>(); for (FieldCapabilitiesFailure failure : failures) { if (ExceptionsHelper.isRemoteUnavailableException(failure.getException())) { for (String indexExpression : failure.getIndices()) { if (indexExpression.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) > 0) { - unavailableRemotes.add(RemoteClusterAware.parseClusterAlias(indexExpression)); + unavailableRemotes.put(RemoteClusterAware.parseClusterAlias(indexExpression), failure); } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/NoClustersToSearchException.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/NoClustersToSearchException.java new file mode 100644 index 0000000000000..f7ae78a521933 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/NoClustersToSearchException.java @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.esql.session; + +/** + * Sentinel exception indicating that logical planning could not find any clusters to search + * when, for a remote-only cross-cluster search, all clusters have been marked as SKIPPED. + * Intended for use only on the querying coordinating during ES|QL logical planning. + */ +public class NoClustersToSearchException extends RuntimeException {} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java index 27343bf7ce205..4aaf4f6cccf0f 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java @@ -147,6 +147,7 @@ EsqlExecutionInfo createExecutionInfo() { 10, 3, 0, + null, new TimeValue(4444L) ) ); @@ -161,6 +162,7 @@ EsqlExecutionInfo createExecutionInfo() { 12, 5, 0, + null, new TimeValue(4999L) ) ); @@ -498,6 +500,7 @@ private static EsqlExecutionInfo.Cluster parseCluster(String clusterAlias, XCont successfulShardsFinal, skippedShardsFinal, failedShardsFinal, + null, tookTimeValue ); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java index 5fbd5dd28050f..625cb5628d039 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java @@ -313,6 +313,7 @@ public void testAcquireComputeRunningOnRemoteClusterFillsInTookTime() { 10, 3, 0, + null, null // to be filled in the acquireCompute listener ) ); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java 
b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java index 32b31cf78650b..dddfa67338419 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java @@ -7,117 +7,200 @@ package org.elasticsearch.xpack.esql.session; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; +import org.elasticsearch.common.Strings; import org.elasticsearch.index.IndexMode; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.NoSeedNodeLeftException; import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.core.type.EsField; import org.elasticsearch.xpack.esql.index.EsIndex; import org.elasticsearch.xpack.esql.index.IndexResolution; import org.elasticsearch.xpack.esql.type.EsFieldTests; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class EsqlSessionTests extends ESTestCase { - public void testUpdateExecutionInfoWithUnavailableClusters() { - // skip_unavailable=true clusters are unavailable, both marked as SKIPPED + public void testCreateIndexExpressionFromAvailableClusters() { + final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + final String remote1Alias = "remote1"; + final String remote2Alias = "remote2"; + + // no clusters marked as skipped { - final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - final String remote1Alias = "remote1"; - final String remote2Alias = "remote2"; EsqlExecutionInfo 
executionInfo = new EsqlExecutionInfo(true); executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true)); - EsqlSession.updateExecutionInfoWithUnavailableClusters(executionInfo, Set.of(remote1Alias, remote2Alias)); + String indexExpr = EsqlSession.createIndexExpressionFromAvailableClusters(executionInfo); + List list = Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList(); + assertThat(list.size(), equalTo(5)); + assertThat( + new HashSet<>(list), + equalTo(Strings.commaDelimitedListToSet("logs*,remote1:*,remote2:mylogs1,remote2:mylogs2,remote2:logs*")) + ); + } - assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); - assertNull(executionInfo.overallTook()); + // one cluster marked as skipped, so not present in revised index expression + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*,foo", true)); + executionInfo.swapCluster( + remote2Alias, + (k, v) -> new EsqlExecutionInfo.Cluster( + remote2Alias, + "mylogs1,mylogs2,logs*", + true, + EsqlExecutionInfo.Cluster.Status.SKIPPED + ) + ); - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); - assertThat(localCluster.getIndexExpression(), equalTo("logs*")); - assertClusterStatusAndHasNullCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + String indexExpr = EsqlSession.createIndexExpressionFromAvailableClusters(executionInfo); + List list = 
Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList(); + assertThat(list.size(), equalTo(3)); + assertThat(new HashSet<>(list), equalTo(Strings.commaDelimitedListToSet("logs*,remote1:*,remote1:foo"))); + } - EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); - assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); - assertClusterStatusAndHasNullCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.SKIPPED); + // two clusters marked as skipped, so only local cluster present in revised index expression + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + executionInfo.swapCluster( + remote1Alias, + (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) + ); + executionInfo.swapCluster( + remote2Alias, + (k, v) -> new EsqlExecutionInfo.Cluster( + remote2Alias, + "mylogs1,mylogs2,logs*", + true, + EsqlExecutionInfo.Cluster.Status.SKIPPED + ) + ); - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); - assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); - assertClusterStatusAndHasNullCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.SKIPPED); + assertThat(EsqlSession.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("logs*")); + } + + // only remotes present and all marked as skipped, so in revised index expression should be empty string + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster( + remote1Alias, + (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) + ); + executionInfo.swapCluster( + remote2Alias, + (k, v) -> new EsqlExecutionInfo.Cluster( + remote2Alias, + "mylogs1,mylogs2,logs*", + true, + 
EsqlExecutionInfo.Cluster.Status.SKIPPED + ) + ); + + assertThat(EsqlSession.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("")); } + } + + public void testUpdateExecutionInfoWithUnavailableClusters() { + final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + final String remote1Alias = "remote1"; + final String remote2Alias = "remote2"; - // skip_unavailable=false cluster is unavailable, marked as SKIPPED // TODO: in follow on PR this will change to throwing an - // Exception + // skip_unavailable=true clusters are unavailable, both marked as SKIPPED { - final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - final String remote1Alias = "remote1"; - final String remote2Alias = "remote2"; EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); - executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); + executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true)); - EsqlSession.updateExecutionInfoWithUnavailableClusters(executionInfo, Set.of(remote2Alias)); + var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); + var unavailableClusters = Map.of(remote1Alias, failure, remote2Alias, failure); + EsqlSession.updateExecutionInfoWithUnavailableClusters(executionInfo, unavailableClusters); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); assertNull(executionInfo.overallTook()); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); assertThat(localCluster.getIndexExpression(), 
equalTo("logs*")); - assertClusterStatusAndHasNullCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); - assertClusterStatusAndHasNullCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.SKIPPED); EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); - assertClusterStatusAndHasNullCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.SKIPPED); + assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.SKIPPED); + } + + // skip_unavailable=false cluster is unavailable, throws Exception + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); + executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); + + var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); + RemoteTransportException e = expectThrows( + RemoteTransportException.class, + () -> EsqlSession.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(remote2Alias, failure)) + ); + assertThat(e.status().getStatus(), equalTo(500)); + assertThat( + e.getDetailedMessage(), + containsString("Remote cluster [remote2] (with setting skip_unavailable=false) is not available") + ); + assertThat(e.getCause().getMessage(), containsString("unable to 
connect")); } // all clusters available, no Clusters in ExecutionInfo should be modified { - final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - final String remote1Alias = "remote1"; - final String remote2Alias = "remote2"; EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); - EsqlSession.updateExecutionInfoWithUnavailableClusters(executionInfo, Set.of()); + EsqlSession.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of()); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); assertNull(executionInfo.overallTook()); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); - assertClusterStatusAndHasNullCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); - assertClusterStatusAndHasNullCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); - assertClusterStatusAndHasNullCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + assertClusterStatusAndShardCounts(remote2Cluster, 
EsqlExecutionInfo.Cluster.Status.RUNNING); } } public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { + final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + final String remote1Alias = "remote1"; + final String remote2Alias = "remote2"; // all clusters present in EsIndex, so no updates to EsqlExecutionInfo should happen { - final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - final String remote1Alias = "remote1"; - final String remote2Alias = "remote2"; EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); @@ -139,28 +222,25 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { IndexMode.STANDARD ) ); - IndexResolution indexResolution = IndexResolution.valid(esIndex, Set.of()); + IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of()); EsqlSession.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); - assertClusterStatusAndHasNullCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); - assertClusterStatusAndHasNullCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); 
assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); - assertClusterStatusAndHasNullCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); } // remote1 is missing from EsIndex info, so it should be updated and marked as SKIPPED with 0 total shards, 0 took time, etc. { - final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - final String remote1Alias = "remote1"; - final String remote2Alias = "remote2"; EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); @@ -180,13 +260,13 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { IndexMode.STANDARD ) ); - IndexResolution indexResolution = IndexResolution.valid(esIndex, Set.of()); + IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of()); EsqlSession.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); - assertClusterStatusAndHasNullCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); @@ -199,14 +279,11 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); - 
assertClusterStatusAndHasNullCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); } // all remotes are missing from EsIndex info, so they should be updated and marked as SKIPPED with 0 total shards, 0 took time, etc. { - final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - final String remote1Alias = "remote1"; - final String remote2Alias = "remote2"; EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); @@ -217,21 +294,21 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { randomMapping(), Map.of("logs-a", IndexMode.STANDARD) ); - // mark remote1 as unavailable - IndexResolution indexResolution = IndexResolution.valid(esIndex, Set.of(remote1Alias)); + // remote1 is unavailable + var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); + IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of(remote1Alias, failure)); EsqlSession.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); - assertClusterStatusAndHasNullCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); - // remote1 is left as RUNNING, since another method (updateExecutionInfoWithUnavailableClusters) not under 
test changes status + // since remote1 is in the unavailable Map (passed to IndexResolution.valid), its status will not be changed + // by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters) assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); - assertNull(remote1Cluster.getTook()); - assertNull(remote1Cluster.getTotalShards()); EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); @@ -242,6 +319,25 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { assertThat(remote2Cluster.getSkippedShards(), equalTo(0)); assertThat(remote2Cluster.getFailedShards(), equalTo(0)); } + + // all remotes are missing from EsIndex info. Since one is configured with skip_unavailable=false, + // an exception should be thrown + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*")); + executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); + executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); + + EsIndex esIndex = new EsIndex( + "logs*,remote2:mylogs1,remote2:mylogs2,remote2:logs*", + randomMapping(), + Map.of("logs-a", IndexMode.STANDARD) + ); + + var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); + IndexResolution indexResolution = IndexResolution.valid(esIndex, Map.of(remote1Alias, failure)); + EsqlSession.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + } } public void testUpdateExecutionInfoAtEndOfPlanning() { @@ -288,13 +384,22 @@ public void testUpdateExecutionInfoAtEndOfPlanning() { 
assertNull(remote2Cluster.getTook()); } - private void assertClusterStatusAndHasNullCounts(EsqlExecutionInfo.Cluster cluster, EsqlExecutionInfo.Cluster.Status status) { + private void assertClusterStatusAndShardCounts(EsqlExecutionInfo.Cluster cluster, EsqlExecutionInfo.Cluster.Status status) { assertThat(cluster.getStatus(), equalTo(status)); assertNull(cluster.getTook()); - assertNull(cluster.getTotalShards()); - assertNull(cluster.getSuccessfulShards()); - assertNull(cluster.getSkippedShards()); - assertNull(cluster.getFailedShards()); + if (status == EsqlExecutionInfo.Cluster.Status.RUNNING) { + assertNull(cluster.getTotalShards()); + assertNull(cluster.getSuccessfulShards()); + assertNull(cluster.getSkippedShards()); + assertNull(cluster.getFailedShards()); + } else if (status == EsqlExecutionInfo.Cluster.Status.SKIPPED) { + assertThat(cluster.getTotalShards(), equalTo(0)); + assertThat(cluster.getSuccessfulShards(), equalTo(0)); + assertThat(cluster.getSkippedShards(), equalTo(0)); + assertThat(cluster.getFailedShards(), equalTo(0)); + } else { + fail("Unexpected status: " + status); + } } private static Map randomMapping() { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverTests.java index 51497b5ca5093..d6e410305afaa 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverTests.java @@ -15,6 +15,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; import static org.hamcrest.Matchers.equalTo; @@ -33,8 +34,8 @@ public void testDetermineUnavailableRemoteClusters() { ) ); - Set unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); - assertThat(unavailableClusters, equalTo(Set.of("remote1", "remote2"))); + Map 
unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); + assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote1", "remote2"))); } // one cluster with "remote unavailable" with two failures @@ -43,8 +44,8 @@ public void testDetermineUnavailableRemoteClusters() { failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSuchRemoteClusterException("remote2"))); failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSeedNodeLeftException("no seed node"))); - Set unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); - assertThat(unavailableClusters, equalTo(Set.of("remote2"))); + Map unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); + assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote2"))); } // two clusters, one "remote unavailable" type exceptions and one with another type @@ -57,23 +58,23 @@ public void testDetermineUnavailableRemoteClusters() { new IllegalStateException("Unable to open any connections") ) ); - Set unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); - assertThat(unavailableClusters, equalTo(Set.of("remote2"))); + Map unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); + assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote2"))); } // one cluster1 with exception not known to indicate "remote unavailable" { List failures = new ArrayList<>(); failures.add(new FieldCapabilitiesFailure(new String[] { "remote1:mylogs1" }, new RuntimeException("foo"))); - Set unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); - assertThat(unavailableClusters, equalTo(Set.of())); + Map unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); + assertThat(unavailableClusters.keySet(), equalTo(Set.of())); } // empty failures list { List failures = new ArrayList<>(); - Set unavailableClusters 
= IndexResolver.determineUnavailableRemoteClusters(failures); - assertThat(unavailableClusters, equalTo(Set.of())); + Map unavailableClusters = IndexResolver.determineUnavailableRemoteClusters(failures); + assertThat(unavailableClusters.keySet(), equalTo(Set.of())); } } } diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java index 1a236ccb6aa06..d5b3141b539eb 100644 --- a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java @@ -34,9 +34,12 @@ import java.util.Base64; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -491,6 +494,10 @@ public void testCrossClusterQueryWithRemoteDLSAndFLS() throws Exception { assertThat(flatList, containsInAnyOrder("engineering")); } + /** + * Note: invalid_remote is "invalid" because it has a bogus API key and the cluster does not exist (cannot be connected to) + */ + @SuppressWarnings("unchecked") public void testCrossClusterQueryAgainstInvalidRemote() throws Exception { configureRemoteCluster(); populateData(); @@ -512,22 +519,53 @@ public void testCrossClusterQueryAgainstInvalidRemote() throws Exception { ); // invalid remote with local index should return local results - var q = "FROM invalid_remote:employees,employees | SORT emp_id DESC | LIMIT 10"; - Response response = 
performRequestWithRemoteSearchUser(esqlRequest(q)); - assertLocalOnlyResults(response); - - // only calling an invalid remote should error - ResponseException error = expectThrows(ResponseException.class, () -> { - var q2 = "FROM invalid_remote:employees | SORT emp_id DESC | LIMIT 10"; - performRequestWithRemoteSearchUser(esqlRequest(q2)); - }); - - if (skipUnavailable == false) { - assertThat(error.getResponse().getStatusLine().getStatusCode(), equalTo(401)); - assertThat(error.getMessage(), containsString("unable to find apikey")); - } else { - assertThat(error.getResponse().getStatusLine().getStatusCode(), equalTo(500)); - assertThat(error.getMessage(), containsString("Unable to connect to [invalid_remote]")); + { + var q = "FROM invalid_remote:employees,employees | SORT emp_id DESC | LIMIT 10"; + Response response = performRequestWithRemoteSearchUser(esqlRequest(q)); + // TODO: when skip_unavailable=false for invalid_remote, a fatal exception should be thrown + // this does not yet happen because field-caps returns nothing for this cluster, rather + // than an error, so the current code cannot detect that error. Follow on PR will handle this. 
+ assertLocalOnlyResults(response); + } + + { + var q = "FROM invalid_remote:employees | SORT emp_id DESC | LIMIT 10"; + // errors from invalid remote should be ignored if the cluster is marked with skip_unavailable=true + if (skipUnavailable) { + // expected response: + // {"took":1,"columns":[],"values":[],"_clusters":{"total":1,"successful":0,"running":0,"skipped":1,"partial":0, + // "failed":0,"details":{"invalid_remote":{"status":"skipped","indices":"employees","took":1,"_shards": + // {"total":0,"successful":0,"skipped":0,"failed":0},"failures":[{"shard":-1,"index":null,"reason": + // {"type":"remote_transport_exception", + // "reason":"[connect_transport_exception - unable to connect to remote cluster]"}}]}}}} + Response response = performRequestWithRemoteSearchUser(esqlRequest(q)); + assertOK(response); + Map responseAsMap = entityAsMap(response); + List columns = (List) responseAsMap.get("columns"); + List values = (List) responseAsMap.get("values"); + assertThat(columns.size(), equalTo(1)); + Map column1 = (Map) columns.get(0); + assertThat(column1.get("name").toString(), equalTo("")); + assertThat(values.size(), equalTo(0)); + Map clusters = (Map) responseAsMap.get("_clusters"); + Map details = (Map) clusters.get("details"); + Map invalidRemoteEntry = (Map) details.get("invalid_remote"); + assertThat(invalidRemoteEntry.get("status").toString(), equalTo("skipped")); + List failures = (List) invalidRemoteEntry.get("failures"); + assertThat(failures.size(), equalTo(1)); + Map failuresMap = (Map) failures.get(0); + Map reason = (Map) failuresMap.get("reason"); + assertThat(reason.get("type").toString(), equalTo("remote_transport_exception")); + assertThat(reason.get("reason").toString(), containsString("unable to connect to remote cluster")); + + } else { + // errors from invalid remote should throw an exception if the cluster is marked with skip_unavailable=false + ResponseException error = expectThrows(ResponseException.class, () -> { + final Response 
response1 = performRequestWithRemoteSearchUser(esqlRequest(q)); + }); + assertThat(error.getResponse().getStatusLine().getStatusCode(), equalTo(401)); + assertThat(error.getMessage(), containsString("unable to find apikey")); + } } } @@ -887,7 +925,16 @@ public void testAlias() throws Exception { Request request = esqlRequest("FROM " + index + " | KEEP emp_id | SORT emp_id | LIMIT 100"); ResponseException error = expectThrows(ResponseException.class, () -> performRequestWithRemoteSearchUser(request)); assertThat(error.getResponse().getStatusLine().getStatusCode(), equalTo(400)); - assertThat(error.getMessage(), containsString(" Unknown index [" + index + "]")); + String expectedIndexExpressionInError = index.replace("*", "my_remote_cluster"); + Pattern p = Pattern.compile("Unknown index \\[([^\\]]+)\\]"); + Matcher m = p.matcher(error.getMessage()); + assertTrue("Pattern matcher to parse error message did not find matching string: " + error.getMessage(), m.find()); + String unknownIndexExpressionInErrorMessage = m.group(1); + Set actualUnknownIndexes = org.elasticsearch.common.Strings.commaDelimitedListToSet( + unknownIndexExpressionInErrorMessage + ); + Set expectedUnknownIndexes = org.elasticsearch.common.Strings.commaDelimitedListToSet(expectedIndexExpressionInError); + assertThat(actualUnknownIndexes, equalTo(expectedUnknownIndexes)); } for (var index : List.of( @@ -920,6 +967,7 @@ protected Request esqlRequest(String command) throws IOException { XContentBuilder body = JsonXContent.contentBuilder(); body.startObject(); body.field("query", command); + body.field("include_ccs_metadata", true); if (Build.current().isSnapshot() && randomBoolean()) { Settings.Builder settings = Settings.builder(); if (randomBoolean()) {