Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.17] Address mapping and compute engine runtime field issues (#117792) #118048

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/117792.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 117792
summary: Address mapping and compute engine runtime field issues
area: Mapping
type: bug
issues:
- 117644
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,9 @@ public Query termQuery(Object value, SearchExecutionContext context) {
protected void parseCreateField(DocumentParserContext context) {
// Run-time fields are mapped to this mapper, so it needs to handle storing values for use in synthetic source.
// #parseValue calls this method once the run-time field is created.
if (context.dynamic() == ObjectMapper.Dynamic.RUNTIME && context.canAddIgnoredField()) {
var fieldType = context.mappingLookup().getFieldType(path);
boolean isRuntimeField = fieldType instanceof AbstractScriptFieldType;
if ((context.dynamic() == ObjectMapper.Dynamic.RUNTIME || isRuntimeField) && context.canAddIgnoredField()) {
try {
context.addIgnoredField(
IgnoredSourceFieldMapper.NameValue.fromContext(context, path, context.encodeFlattenedToken())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,14 +493,18 @@ public boolean containsBrokenAnalysis(String field) {
*/
public SearchLookup lookup() {
if (this.lookup == null) {
SourceProvider sourceProvider = isSourceSynthetic()
? SourceProvider.fromSyntheticSource(mappingLookup.getMapping(), mapperMetrics.sourceFieldMetrics())
: SourceProvider.fromStoredFields();
var sourceProvider = createSourceProvider();
setLookupProviders(sourceProvider, LeafFieldLookupProvider.fromStoredFields());
}
return this.lookup;
}

public SourceProvider createSourceProvider() {
return isSourceSynthetic()
? SourceProvider.fromSyntheticSource(mappingLookup.getMapping(), mapperMetrics.sourceFieldMetrics())
: SourceProvider.fromStoredFields();
}

/**
* Replace the standard source provider and field lookup provider on the SearchLookup
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ private SearchLookup(SearchLookup searchLookup, Set<String> fieldChain) {
this.fieldLookupProvider = searchLookup.fieldLookupProvider;
}

private SearchLookup(SearchLookup searchLookup, SourceProvider sourceProvider, Set<String> fieldChain) {
this.fieldChain = Collections.unmodifiableSet(fieldChain);
this.sourceProvider = sourceProvider;
this.fieldTypeLookup = searchLookup.fieldTypeLookup;
this.fieldDataLookup = searchLookup.fieldDataLookup;
this.fieldLookupProvider = searchLookup.fieldLookupProvider;
}

/**
* Creates a copy of the current {@link SearchLookup} that looks fields up in the same way, but also tracks field references
* in order to detect cycles and prevent resolving fields that depend on more than {@link #MAX_FIELD_CHAIN_DEPTH} other fields.
Expand Down Expand Up @@ -144,4 +152,8 @@ public IndexFieldData<?> getForField(MappedFieldType fieldType, MappedFieldType.
public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
return sourceProvider.getSource(ctx, doc);
}

public SearchLookup swapSourceProvider(SourceProvider sourceProvider) {
return new SearchLookup(this, sourceProvider, fieldChain);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.client.internal.ClusterAdminClient;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -1648,6 +1649,44 @@ public void testMaxTruncationSizeSetting() {
}
}

public void testScriptField() throws Exception {
XContentBuilder mapping = JsonXContent.contentBuilder();
mapping.startObject();
{
mapping.startObject("runtime");
{
mapping.startObject("k1");
mapping.field("type", "long");
mapping.endObject();
mapping.startObject("k2");
mapping.field("type", "long");
mapping.endObject();
}
mapping.endObject();
{
mapping.startObject("properties");
mapping.startObject("meter").field("type", "double").endObject();
mapping.endObject();
}
}
mapping.endObject();
String sourceMode = randomBoolean() ? "stored" : "synthetic";
Settings.Builder settings = indexSettings(1, 0).put(indexSettings()).put("index.mapping.source.mode", sourceMode);
client().admin().indices().prepareCreate("test-script").setMapping(mapping).setSettings(settings).get();
for (int i = 0; i < 10; i++) {
index("test-script", Integer.toString(i), Map.of("k1", i, "k2", "b-" + i, "meter", 10000 * i));
}
refresh("test-script");
try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 | LIMIT 10")) {
List<Object> k1Column = Iterators.toList(resp.column(0));
assertThat(k1Column, contains(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L));
List<Object> k2Column = Iterators.toList(resp.column(1));
assertThat(k2Column, contains(null, null, null, null, null, null, null, null, null, null));
List<Object> meterColumn = Iterators.toList(resp.column(2));
assertThat(meterColumn, contains(0.0, 10000.0, 20000.0, 30000.0, 40000.0, 50000.0, 60000.0, 70000.0, 80000.0, 90000.0));
}
}

private void clearPersistentSettings(Setting<?>... settings) {
Settings.Builder clearedSettings = Settings.builder();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.index.mapper.FieldNamesFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NestedLookup;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.SourceLoader;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand Down Expand Up @@ -340,7 +341,16 @@ public MappedFieldType.FieldExtractPreference fieldExtractPreference() {

@Override
public SearchLookup lookup() {
return ctx.lookup();
boolean syntheticSource = SourceFieldMapper.isSynthetic(indexSettings());
var searchLookup = ctx.lookup();
if (syntheticSource) {
// in the context of scripts and when synthetic source is used the search lookup can't always be reused between
// users of SearchLookup. This is only an issue when scripts fallback to _source, but since we can't always
// accurately determine whether a script uses _source, we should do this for all script usages.
// This lookup() method is only invoked for scripts / runtime fields, so it is ok to do here.
searchLookup = searchLookup.swapSourceProvider(ctx.createSourceProvider());
}
return searchLookup;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.lookup.SourceProvider;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
Expand Down Expand Up @@ -83,6 +84,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME;

Expand Down Expand Up @@ -416,12 +418,17 @@ void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan,
List<EsPhysicalOperationProviders.ShardContext> contexts = new ArrayList<>(context.searchContexts.size());
for (int i = 0; i < context.searchContexts.size(); i++) {
SearchContext searchContext = context.searchContexts.get(i);
var searchExecutionContext = new SearchExecutionContext(searchContext.getSearchExecutionContext()) {

@Override
public SourceProvider createSourceProvider() {
final Supplier<SourceProvider> supplier = () -> super.createSourceProvider();
return new ReinitializingSourceProvider(supplier);

}
};
contexts.add(
new EsPhysicalOperationProviders.DefaultShardContext(
i,
searchContext.getSearchExecutionContext(),
searchContext.request().getAliasFilter()
)
new EsPhysicalOperationProviders.DefaultShardContext(i, searchExecutionContext, searchContext.request().getAliasFilter())
);
}
final List<Driver> drivers;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.plugin;

import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.search.lookup.Source;
import org.elasticsearch.search.lookup.SourceProvider;

import java.io.IOException;
import java.util.function.Supplier;

/**
* This is a workaround for when compute engine executes concurrently with data partitioning by docid.
*/
final class ReinitializingSourceProvider implements SourceProvider {

private PerThreadSourceProvider perThreadProvider;
private final Supplier<SourceProvider> sourceProviderFactory;

ReinitializingSourceProvider(Supplier<SourceProvider> sourceProviderFactory) {
this.sourceProviderFactory = sourceProviderFactory;
}

@Override
public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
var currentThread = Thread.currentThread();
PerThreadSourceProvider provider = perThreadProvider;
if (provider == null || provider.creatingThread != currentThread) {
provider = new PerThreadSourceProvider(sourceProviderFactory.get(), currentThread);
this.perThreadProvider = provider;
}
return perThreadProvider.source.getSource(ctx, doc);
}

private record PerThreadSourceProvider(SourceProvider source, Thread creatingThread) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@
import org.elasticsearch.client.Request;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.hamcrest.Matchers;
import org.junit.ClassRule;

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -108,4 +111,118 @@ public void testLogsdbSourceModeForLogsIndex() throws IOException {
assertNull(settings.get("index.mapping.source.mode"));
}

public void testEsqlRuntimeFields() throws IOException {
String mappings = """
{
"runtime": {
"message_length": {
"type": "long"
},
"log.offset": {
"type": "long"
}
},
"dynamic": false,
"properties": {
"@timestamp": {
"type": "date"
},
"log" : {
"properties": {
"level": {
"type": "keyword"
},
"file": {
"type": "keyword"
}
}
}
}
}
""";
String indexName = "test-foo";
createIndex(indexName, Settings.builder().put("index.mode", "logsdb").build(), mappings);

int numDocs = 500;
var sb = new StringBuilder();
var now = Instant.now();

var expectedMinTimestamp = now;
for (int i = 0; i < numDocs; i++) {
String level = randomBoolean() ? "info" : randomBoolean() ? "warning" : randomBoolean() ? "error" : "fatal";
String msg = randomAlphaOfLength(20);
String path = randomAlphaOfLength(8);
String messageLength = Integer.toString(msg.length());
String offset = Integer.toString(randomNonNegativeInt());
sb.append("{ \"create\": {} }").append('\n');
if (randomBoolean()) {
sb.append(
"""
{"@timestamp":"$now","message":"$msg","message_length":$l,"file":{"level":"$level","offset":5,"file":"$path"}}
""".replace("$now", formatInstant(now))
.replace("$level", level)
.replace("$msg", msg)
.replace("$path", path)
.replace("$l", messageLength)
.replace("$o", offset)
);
} else {
sb.append("""
{"@timestamp": "$now", "message": "$msg", "message_length": $l}
""".replace("$now", formatInstant(now)).replace("$msg", msg).replace("$l", messageLength));
}
sb.append('\n');
if (i != numDocs - 1) {
now = now.plusSeconds(1);
}
}
var expectedMaxTimestamp = now;

var bulkRequest = new Request("POST", "/" + indexName + "/_bulk");
bulkRequest.setJsonEntity(sb.toString());
bulkRequest.addParameter("refresh", "true");
var bulkResponse = client().performRequest(bulkRequest);
var bulkResponseBody = responseAsMap(bulkResponse);
assertThat(bulkResponseBody, Matchers.hasEntry("errors", false));

var forceMergeRequest = new Request("POST", "/" + indexName + "/_forcemerge");
forceMergeRequest.addParameter("max_num_segments", "1");
var forceMergeResponse = client().performRequest(forceMergeRequest);
assertOK(forceMergeResponse);

String query = "FROM test-foo | STATS count(*), min(@timestamp), max(@timestamp), min(message_length), max(message_length)"
+ " ,sum(message_length), avg(message_length), min(log.offset), max(log.offset) | LIMIT 1";
final Request esqlRequest = new Request("POST", "/_query");
esqlRequest.setJsonEntity("""
{
"query": "$query"
}
""".replace("$query", query));
var esqlResponse = client().performRequest(esqlRequest);
assertOK(esqlResponse);
Map<String, Object> esqlResponseBody = responseAsMap(esqlResponse);

List<?> values = (List<?>) esqlResponseBody.get("values");
assertThat(values, Matchers.not(Matchers.empty()));
var count = ((List<?>) values.get(0)).get(0);
assertThat(count, equalTo(numDocs));
logger.warn("VALUES: {}", values);

var minTimestamp = ((List<?>) values.get(0)).get(1);
assertThat(minTimestamp, equalTo(formatInstant(expectedMinTimestamp)));
var maxTimestamp = ((List<?>) values.get(0)).get(2);
assertThat(maxTimestamp, equalTo(formatInstant(expectedMaxTimestamp)));

var minLength = ((List<?>) values.get(0)).get(3);
assertThat(minLength, equalTo(20));
var maxLength = ((List<?>) values.get(0)).get(4);
assertThat(maxLength, equalTo(20));
var sumLength = ((List<?>) values.get(0)).get(5);
assertThat(sumLength, equalTo(20 * numDocs));
}

static String formatInstant(Instant instant) {
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
}

}