Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor CCS handling code from EsqlSession and IndexResolver into dedicated util class #115976

Closed
wants to merge 5 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
More refactorings.
quux00 committed Oct 30, 2024
commit 56ea833cf659fe58008516ad6db272519732748a
Original file line number Diff line number Diff line change
@@ -10,13 +10,11 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
@@ -75,7 +73,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -150,22 +147,24 @@ public void execute(
analyzedPlan(
parse(request.query(), request.params()),
executionInfo,
new LogicalPlanActionListener(request, executionInfo, runPhase, listener)
new CcsAwarePreAnalysisActionListener(request, executionInfo, runPhase, listener)
);
}

/**
* ActionListener that receives LogicalPlan or error from logical planning.
* ActionListener that receives LogicalPlan or error from preAnalysis.
* Any Exception sent to onFailure stops processing, but not all are fatal (return a 4xx or 5xx), so
* the onFailure handler determines whether to return an empty successful result or a 4xx/5xx error.
* the onFailure handler determines whether to return an empty successful result or a 4xx/5xx error
* based on the skip_unavailable status of the cluster the error came from. (The local cluster
* is always treated like skip_unavailable=false.)
*/
class LogicalPlanActionListener implements ActionListener<LogicalPlan> {
class CcsAwarePreAnalysisActionListener implements ActionListener<LogicalPlan> {
private final EsqlQueryRequest request;
private final EsqlExecutionInfo executionInfo;
private final BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase;
private final ActionListener<Result> listener;

LogicalPlanActionListener(
CcsAwarePreAnalysisActionListener(
EsqlQueryRequest request,
EsqlExecutionInfo executionInfo,
BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase,
@@ -344,15 +343,15 @@ private <T> void preAnalyze(
.collect(Collectors.toSet());
Map<String, Exception> unavailableClusters = enrichResolution.getUnavailableClusters();
preAnalyzeIndices(parsed, executionInfo, unavailableClusters, l.delegateFailureAndWrap((ll, indexResolution) -> {
// TODO in follow-PR (for skip_unavailble handling of missing concrete indexes) add some tests for invalid index
// TODO in follow-PR (for skip_unavailable handling of missing indexes) add some tests for invalid index
// resolution to updateExecutionInfo
if (indexResolution.isValid()) {
EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters());
if (executionInfo.isCrossClusterSearch()
&& executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) {
// for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel
// Exception to let the LogicalPlanActionListener decide how to proceed
// Exception to let the CcsAwarePreAnalysisActionListener decide how to proceed
ll.onFailure(new NoClustersToSearchException());
return;
}
@@ -426,10 +425,10 @@ private void preAnalyzeIndices(
if (indexExpressionToResolve.isEmpty()) {
// if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
listener.onResponse(IndexResolution.valid(new EsIndex(table.index(), Map.of(), Map.of())));
} else {
// call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
indexResolver.resolveAsMergedMapping(indexExpressionToResolve, fieldNames, listener);
return;
}
// call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
indexResolver.resolveAsMergedMapping(indexExpressionToResolve, fieldNames, listener);
} else {
try {
// occurs when dealing with local relations (row a = 1)
@@ -440,30 +439,6 @@ private void preAnalyzeIndices(
}
}

// visible for testing
static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) {
StringBuilder sb = new StringBuilder();
for (String clusterAlias : executionInfo.clusterAliases()) {
EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias);
if (cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(',');
} else {
String indexExpression = executionInfo.getCluster(clusterAlias).getIndexExpression();
for (String index : indexExpression.split(",")) {
sb.append(clusterAlias).append(':').append(index).append(',');
}
}
}
}

if (sb.length() > 0) {
return sb.substring(0, sb.length() - 1);
} else {
return "";
}
}

static Set<String> fieldNames(LogicalPlan parsed, Set<String> enrichPolicyMatchFields) {
if (false == parsed.anyMatch(plan -> plan instanceof Aggregate || plan instanceof Project)) {
// no explicit columns selection, for example "from employees"
@@ -607,86 +582,4 @@ public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) {
LOGGER.debug("Optimized physical plan:\n{}", plan);
return plan;
}

static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInfo, Map<String, FieldCapabilitiesFailure> unavailable) {
for (Map.Entry<String, FieldCapabilitiesFailure> entry : unavailable.entrySet()) {
String clusterAlias = entry.getKey();
boolean skipUnavailable = execInfo.getCluster(clusterAlias).isSkipUnavailable();
RemoteTransportException e = new RemoteTransportException(
Strings.format("Remote cluster [%s] (with setting skip_unavailable=%s) is not available", clusterAlias, skipUnavailable),
entry.getValue().getException()
);
if (skipUnavailable) {
execInfo.swapCluster(
clusterAlias,
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED)
.setTotalShards(0)
.setSuccessfulShards(0)
.setSkippedShards(0)
.setFailedShards(0)
.setFailures(List.of(new ShardSearchFailure(e)))
.build()
);
} else {
throw e;
}
}
}

// visible for testing
static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) {
Set<String> clustersWithResolvedIndices = new HashSet<>();
// determine missing clusters
for (String indexName : indexResolution.get().indexNameWithModes().keySet()) {
clustersWithResolvedIndices.add(RemoteClusterAware.parseClusterAlias(indexName));
}
Set<String> clustersRequested = executionInfo.clusterAliases();
Set<String> clustersWithNoMatchingIndices = Sets.difference(clustersRequested, clustersWithResolvedIndices);
clustersWithNoMatchingIndices.removeAll(indexResolution.getUnavailableClusters().keySet());
/*
* These are clusters in the original request that are not present in the field-caps response. They were
* specified with an index or indices that do not exist, so the search on that cluster is done.
* Mark it as SKIPPED with 0 shards searched and took=0.
*/
for (String c : clustersWithNoMatchingIndices) {
// TODO: in a follow-on PR, throw a Verification(400 status code) for local and remotes with skip_unavailable=false if
// they were requested with one or more concrete indices
// for now we never mark the local cluster as SKIPPED
final var status = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(c)
? EsqlExecutionInfo.Cluster.Status.SUCCESSFUL
: EsqlExecutionInfo.Cluster.Status.SKIPPED;
executionInfo.swapCluster(
c,
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(status)
.setTook(new TimeValue(0))
.setTotalShards(0)
.setSuccessfulShards(0)
.setSkippedShards(0)
.setFailedShards(0)
.build()
);
}
}

// visible for testing
static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) {
// TODO: this logic assumes a single phase execution model, so it may need to altered once INLINESTATS is made CCS compatible
if (execInfo.isCrossClusterSearch()) {
execInfo.markEndPlanning();
for (String clusterAlias : execInfo.clusterAliases()) {
EsqlExecutionInfo.Cluster cluster = execInfo.getCluster(clusterAlias);
if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) {
execInfo.swapCluster(
clusterAlias,
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.planningTookTime())
.setTotalShards(0)
.setSuccessfulShards(0)
.setSkippedShards(0)
.setFailedShards(0)
.build()
);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -51,29 +51,6 @@ static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo execu
}
}

// static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) {
// StringBuilder sb = new StringBuilder();
// for (String clusterAlias : executionInfo.clusterAliases()) {
// EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias);
// if (cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
// if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
// sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(',');
// } else {
// String indexExpression = executionInfo.getCluster(clusterAlias).getIndexExpression();
// for (String index : indexExpression.split(",'")) {
// sb.append(clusterAlias).append(':').append(index).append(',');
// }
// }
// }
// }
//
// if (sb.length() > 0) {
// return sb.substring(0, sb.length() - 1);
// } else {
// return "";
// }
// }

static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInfo, Map<String, FieldCapabilitiesFailure> unavailable) {
for (Map.Entry<String, FieldCapabilitiesFailure> entry : unavailable.entrySet()) {
String clusterAlias = entry.getKey();
Original file line number Diff line number Diff line change
@@ -6,7 +6,6 @@
*/
package org.elasticsearch.xpack.esql.session;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse;
@@ -20,7 +19,6 @@
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.mapper.TimeSeriesParams;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsAction;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.type.DateEsField;
@@ -159,25 +157,12 @@ public IndexResolution mergedMappings(String indexPattern, FieldCapabilitiesResp
for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) {
concreteIndices.put(ir.getIndexName(), ir.getIndexMode());
}
Map<String, FieldCapabilitiesFailure> unavailableRemotes = determineUnavailableRemoteClusters(fieldCapsResponse.getFailures());
Map<String, FieldCapabilitiesFailure> unavailableRemotes = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(
fieldCapsResponse.getFailures()
);
return IndexResolution.valid(new EsIndex(indexPattern, rootFields, concreteIndices), unavailableRemotes);
}

// visible for testing
static Map<String, FieldCapabilitiesFailure> determineUnavailableRemoteClusters(List<FieldCapabilitiesFailure> failures) {
Map<String, FieldCapabilitiesFailure> unavailableRemotes = new HashMap<>();
for (FieldCapabilitiesFailure failure : failures) {
if (ExceptionsHelper.isRemoteUnavailableException(failure.getException())) {
for (String indexExpression : failure.getIndices()) {
if (indexExpression.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) > 0) {
unavailableRemotes.put(RemoteClusterAware.parseClusterAlias(indexExpression), failure);
}
}
}
}
return unavailableRemotes;
}

private boolean allNested(List<IndexFieldCapabilities> caps) {
for (IndexFieldCapabilities cap : caps) {
if (false == cap.type().equalsIgnoreCase("nested")) {

This file was deleted.

This file was deleted.