Skip to content

Commit

Permalink
Have the FTFMapperPreprocessor update the plan in-place
Browse files Browse the repository at this point in the history
bpintea committed Jan 27, 2025
1 parent 8fbd0fc commit 62a5862
Showing 9 changed files with 159 additions and 349 deletions.
Original file line number Diff line number Diff line change
@@ -26,6 +26,19 @@ public void set(T value) {
this.value = value;
}

/**
* Sets a value in the holder, but only if none has already been set.
* @param value the new value to set.
* @return the previously held value, if any was set.
*/
public T trySet(T value) {
T old = this.value;
if (old == null) {
this.value = value;
}
return old;
}

public T get() {
return value;
}
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@
import org.elasticsearch.xpack.esql.core.querydsl.query.Query;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.LucenePushdownPredicates;
import org.elasticsearch.xpack.esql.planner.TranslatorHandler;
import org.elasticsearch.xpack.esql.planner.mapper.preprocessor.MappingPreProcessor;

/**
* Expressions implementing this interface can get called on data nodes to provide an Elasticsearch/Lucene query.
@@ -42,4 +43,8 @@ interface SingleValueTranslationAware extends TranslationAware {
*/
Expression singleValueField();
}

interface QueryRewriter extends TranslationAware {
MappingPreProcessor queryRewriter();
}
}
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
import org.elasticsearch.xpack.esql.planner.TranslatorHandler;
import org.elasticsearch.xpack.esql.planner.mapper.preprocessor.MappingPreProcessor;
import org.elasticsearch.xpack.esql.querydsl.query.TranslationAwareExpressionQuery;

import java.util.List;
@@ -50,7 +51,11 @@
* These functions needs to be pushed down to Lucene queries to be executed - there's no Evaluator for them, but depend on
* {@link org.elasticsearch.xpack.esql.optimizer.LocalPhysicalPlanOptimizer} to rewrite them into Lucene queries.
*/
public abstract class FullTextFunction extends Function implements TranslationAware, PostAnalysisPlanVerificationAware {
public abstract class FullTextFunction extends Function
implements
TranslationAware,
TranslationAware.QueryRewriter,
PostAnalysisPlanVerificationAware {

private final Expression query;
private final QueryBuilder queryBuilder;
@@ -116,6 +121,11 @@ public Object queryAsObject() {
return BytesRefs.toString(queryAsObject);
}

@Override
public MappingPreProcessor queryRewriter() {
return FullTextFunctionMapperPreprocessor.INSTANCE;
}

/**
* Returns the param ordinal for the query parameter so it can be used in error messages
*
@@ -279,26 +289,6 @@ private static boolean onlyFullTextFunctionsInExpression(Expression expression)
return false;
}

/**
* Checks whether an expression contains a full text function as part of it
*
* @param expression expression to check
* @return true if the expression or any of its children is a full text function, false otherwise
*/
private static boolean anyFullTextFunctionsInExpression(Expression expression) {
if (expression instanceof FullTextFunction) {
return true;
}

for (Expression child : expression.children()) {
if (anyFullTextFunctionsInExpression(child)) {
return true;
}
}

return false;
}

/**
* Checks all commands that exist before a specific type satisfy conditions.
*
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.expression.function.fulltext;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ResolvedIndices;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.planner.TranslatorHandler;
import org.elasticsearch.xpack.esql.planner.mapper.preprocessor.MappingPreProcessor;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
import org.elasticsearch.xpack.esql.session.IndexResolver;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/**
* Some {@link FullTextFunction} implementations such as {@link org.elasticsearch.xpack.esql.expression.function.fulltext.Match}
* will be translated to a {@link QueryBuilder} that require a rewrite phase on the coordinator.
* {@link FullTextFunctionMapperPreprocessor#preprocess(LogicalPlan, TransportActionServices, ActionListener)} will rewrite the plan by
* replacing {@link FullTextFunction} expression with new ones that hold rewritten {@link QueryBuilder}s.
*/
public final class FullTextFunctionMapperPreprocessor implements MappingPreProcessor {

public static final FullTextFunctionMapperPreprocessor INSTANCE = new FullTextFunctionMapperPreprocessor();

@Override
public void preprocess(LogicalPlan plan, TransportActionServices services, ActionListener<LogicalPlan> listener) {
Rewriteable.rewriteAndFetch(
new FullTextFunctionsRewritable(plan),
queryRewriteContext(services, indexNames(plan)),
listener.delegateFailureAndWrap((l, r) -> l.onResponse(r.plan))
);
}

private static QueryRewriteContext queryRewriteContext(TransportActionServices services, Set<String> indexNames) {
ResolvedIndices resolvedIndices = ResolvedIndices.resolveWithIndexNamesAndOptions(
indexNames.toArray(String[]::new),
IndexResolver.FIELD_CAPS_INDICES_OPTIONS,
services.clusterService().state(),
services.indexNameExpressionResolver(),
services.transportService().getRemoteClusterService(),
System.currentTimeMillis()
);

return services.searchService().getRewriteContext(System::currentTimeMillis, resolvedIndices, null);
}

public Set<String> indexNames(LogicalPlan plan) {
Set<String> indexNames = new HashSet<>();
plan.forEachDown(EsRelation.class, esRelation -> indexNames.addAll(esRelation.index().concreteIndices()));
return indexNames;
}

private record FullTextFunctionsRewritable(LogicalPlan plan)
implements
Rewriteable<FullTextFunctionMapperPreprocessor.FullTextFunctionsRewritable> {
@Override
public FullTextFunctionsRewritable rewrite(QueryRewriteContext ctx) throws IOException {
Holder<IOException> exceptionHolder = new Holder<>();
Holder<Boolean> updated = new Holder<>(false);
LogicalPlan newPlan = plan.transformExpressionsDown(FullTextFunction.class, f -> {
QueryBuilder builder = f.queryBuilder(), initial = builder;
builder = builder == null ? f.asQuery(TranslatorHandler.TRANSLATOR_HANDLER).asBuilder() : builder;
try {
builder = builder.rewrite(ctx);
} catch (IOException e) {
exceptionHolder.trySet(e);
}
var rewrite = builder != initial;
updated.set(updated.get() || rewrite);
return rewrite ? f.replaceQueryBuilder(builder) : f;
});
if (exceptionHolder.get() != null) {
throw exceptionHolder.get();
}
return updated.get() ? new FullTextFunctionsRewritable(newPlan) : this;
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -8,41 +8,42 @@
package org.elasticsearch.xpack.esql.planner.mapper.preprocessor;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.esql.capabilities.TranslationAware;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MapperPreprocessorExecutor {

private final TransportActionServices services;
private final List<MappingPreProcessor> proprocessors = new ArrayList<>();

public MapperPreprocessorExecutor(TransportActionServices services) {
this.services = services;
}

public MapperPreprocessorExecutor addPreprocessor(MappingPreProcessor preProcessor) {
proprocessors.add(preProcessor);
return this;
}

public MapperPreprocessorExecutor addPreprocessors(Collection<MappingPreProcessor> preProcessors) {
proprocessors.addAll(preProcessors);
return this;
public void execute(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
execute(plan, queryRewriters(plan), 0, listener);
}

public void execute(LogicalPlan plan, ActionListener<LogicalPlan> listener) {
execute(plan, 0, listener);
private static List<MappingPreProcessor> queryRewriters(LogicalPlan plan) {
Set<MappingPreProcessor> queryRewriters = new HashSet<>();
plan.forEachExpressionDown(e -> {
if (e instanceof TranslationAware.QueryRewriter qr) {
queryRewriters.add(qr.queryRewriter());
}
});
return List.copyOf(queryRewriters);
}

private void execute(LogicalPlan plan, int index, ActionListener<LogicalPlan> listener) {
if (index == proprocessors.size()) {
private void execute(LogicalPlan plan, List<MappingPreProcessor> preprocessors, int index, ActionListener<LogicalPlan> listener) {
if (index == preprocessors.size()) {
listener.onResponse(plan);
} else {
proprocessors.get(index).preprocess(plan, services, listener.delegateFailureAndWrap((l, p) -> execute(p, index + 1, l)));
preprocessors.get(index)
.preprocess(plan, services, listener.delegateFailureAndWrap((l, p) -> execute(p, preprocessors, index + 1, l)));
}
}
}
Original file line number Diff line number Diff line change
@@ -73,7 +73,6 @@
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
import org.elasticsearch.xpack.esql.planner.mapper.preprocessor.FullTextFunctionMapperPreprocessor;
import org.elasticsearch.xpack.esql.planner.mapper.preprocessor.MapperPreprocessorExecutor;
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
import org.elasticsearch.xpack.esql.stats.PlanningMetrics;
@@ -145,9 +144,7 @@ public EsqlSession(
this.physicalPlanOptimizer = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(configuration));
this.planningMetrics = planningMetrics;
this.indicesExpressionGrouper = indicesExpressionGrouper;
this.mapperPreprocessorExecutor = new MapperPreprocessorExecutor(services).addPreprocessor(
new FullTextFunctionMapperPreprocessor()
);
this.mapperPreprocessorExecutor = new MapperPreprocessorExecutor(services);
}

public String sessionId() {
@@ -167,12 +164,25 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
new EsqlSessionCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) {
@Override
public void onResponse(LogicalPlan analyzedPlan) {
executeOptimizedPlan(request, executionInfo, planRunner, optimizedPlan(analyzedPlan), listener);
preMapping(request, executionInfo, planRunner, optimizedPlan(analyzedPlan), listener);
}
}
);
}

public void preMapping(
EsqlQueryRequest request,
EsqlExecutionInfo executionInfo,
PlanRunner planRunner,
LogicalPlan optimizedPlan,
ActionListener<Result> listener
) {
mapperPreprocessorExecutor.execute(optimizedPlan, listener.delegateFailureAndWrap((l, p) -> {
p.setOptimized(); // might have been updated by the preprocessor
executeOptimizedPlan(request, executionInfo, planRunner, p, listener);
}));
}

/**
* Execute an analyzed plan. Most code should prefer calling {@link #execute} but
* this is public for testing.
@@ -184,13 +194,11 @@ public void executeOptimizedPlan(
LogicalPlan optimizedPlan,
ActionListener<Result> listener
) {
mapperPreprocessorExecutor.execute(optimizedPlan, listener.delegateFailureAndWrap((l, p) -> {
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(p, request);
// TODO: this could be snuck into the underlying listener
EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
// execute any potential subplans
executeSubPlans(physicalPlan, planRunner, executionInfo, request, l);
}));
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
// TODO: this could be snuck into the underlying listener
EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
// execute any potential subplans
executeSubPlans(physicalPlan, planRunner, executionInfo, request, listener);
}

private record PlanTuple(PhysicalPlan physical, LogicalPlan logical) {}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -332,11 +332,12 @@ private static String getInferenceIdForForField(Collection<IndexMetadata> indexM
protected boolean doEquals(SemanticQueryBuilder other) {
return Objects.equals(fieldName, other.fieldName)
&& Objects.equals(query, other.query)
&& Objects.equals(inferenceResults, other.inferenceResults);
&& Objects.equals(inferenceResults, other.inferenceResults)
&& Objects.equals(inferenceResultsSupplier, other.inferenceResultsSupplier);
}

@Override
protected int doHashCode() {
return Objects.hash(fieldName, query, inferenceResults);
return Objects.hash(fieldName, query, inferenceResults, inferenceResultsSupplier);
}
}

0 comments on commit 62a5862

Please sign in to comment.