diff --git a/docs/changelog/117792.yaml b/docs/changelog/117792.yaml new file mode 100644 index 0000000000000..2d7ddda1ace40 --- /dev/null +++ b/docs/changelog/117792.yaml @@ -0,0 +1,6 @@ +pr: 117792 +summary: Address mapping and compute engine runtime field issues +area: Mapping +type: bug +issues: + - 117644 diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java index c9e035f9d3f20..fe2c4dc7f2c5d 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java @@ -946,7 +946,9 @@ public Query termQuery(Object value, SearchExecutionContext context) { protected void parseCreateField(DocumentParserContext context) { // Run-time fields are mapped to this mapper, so it needs to handle storing values for use in synthetic source. // #parseValue calls this method once the run-time field is created. - if (context.dynamic() == ObjectMapper.Dynamic.RUNTIME && context.canAddIgnoredField()) { + var fieldType = context.mappingLookup().getFieldType(path); + boolean isRuntimeField = fieldType instanceof AbstractScriptFieldType; + if ((context.dynamic() == ObjectMapper.Dynamic.RUNTIME || isRuntimeField) && context.canAddIgnoredField()) { try { context.addIgnoredField( IgnoredSourceFieldMapper.NameValue.fromContext(context, path, context.encodeFlattenedToken()) diff --git a/server/src/main/java/org/elasticsearch/index/query/SearchExecutionContext.java b/server/src/main/java/org/elasticsearch/index/query/SearchExecutionContext.java index b07112440d3c2..d5e48a6a54daa 100644 --- a/server/src/main/java/org/elasticsearch/index/query/SearchExecutionContext.java +++ b/server/src/main/java/org/elasticsearch/index/query/SearchExecutionContext.java @@ -493,14 +493,18 @@ public boolean containsBrokenAnalysis(String field) { */ public SearchLookup lookup() { if (this.lookup == null) { - SourceProvider sourceProvider = isSourceSynthetic() - ? SourceProvider.fromSyntheticSource(mappingLookup.getMapping(), mapperMetrics.sourceFieldMetrics()) - : SourceProvider.fromStoredFields(); + var sourceProvider = createSourceProvider(); setLookupProviders(sourceProvider, LeafFieldLookupProvider.fromStoredFields()); } return this.lookup; } + public SourceProvider createSourceProvider() { + return isSourceSynthetic() + ? SourceProvider.fromSyntheticSource(mappingLookup.getMapping(), mapperMetrics.sourceFieldMetrics()) + : SourceProvider.fromStoredFields(); + } + /** * Replace the standard source provider and field lookup provider on the SearchLookup * diff --git a/server/src/main/java/org/elasticsearch/search/lookup/SearchLookup.java b/server/src/main/java/org/elasticsearch/search/lookup/SearchLookup.java index f7f8cee30ee15..9eb0170af5efb 100644 --- a/server/src/main/java/org/elasticsearch/search/lookup/SearchLookup.java +++ b/server/src/main/java/org/elasticsearch/search/lookup/SearchLookup.java @@ -102,6 +102,14 @@ private SearchLookup(SearchLookup searchLookup, Set fieldChain) { this.fieldLookupProvider = searchLookup.fieldLookupProvider; } + private SearchLookup(SearchLookup searchLookup, SourceProvider sourceProvider, Set fieldChain) { + this.fieldChain = Collections.unmodifiableSet(fieldChain); + this.sourceProvider = sourceProvider; + this.fieldTypeLookup = searchLookup.fieldTypeLookup; + this.fieldDataLookup = searchLookup.fieldDataLookup; + this.fieldLookupProvider = searchLookup.fieldLookupProvider; + } + /** * Creates a copy of the current {@link SearchLookup} that looks fields up in the same way, but also tracks field references * in order to detect cycles and prevent resolving fields that depend on more than {@link #MAX_FIELD_CHAIN_DEPTH} other fields. @@ -144,4 +152,8 @@ public IndexFieldData getForField(MappedFieldType fieldType, MappedFieldType. public Source getSource(LeafReaderContext ctx, int doc) throws IOException { return sourceProvider.getSource(ctx, doc); } + + public SearchLookup swapSourceProvider(SourceProvider sourceProvider) { + return new SearchLookup(this, sourceProvider, fieldChain); + } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java index 147b13b36c44b..00f53d31165b1 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java @@ -18,6 +18,7 @@ import org.elasticsearch.client.internal.ClusterAdminClient; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; @@ -1648,6 +1649,44 @@ public void testMaxTruncationSizeSetting() { } } + public void testScriptField() throws Exception { + XContentBuilder mapping = JsonXContent.contentBuilder(); + mapping.startObject(); + { + mapping.startObject("runtime"); + { + mapping.startObject("k1"); + mapping.field("type", "long"); + mapping.endObject(); + mapping.startObject("k2"); + mapping.field("type", "long"); + mapping.endObject(); + } + mapping.endObject(); + { + mapping.startObject("properties"); + mapping.startObject("meter").field("type", "double").endObject(); + mapping.endObject(); + } + } + mapping.endObject(); + String sourceMode = randomBoolean() ? "stored" : "synthetic"; + Settings.Builder settings = indexSettings(1, 0).put(indexSettings()).put("index.mapping.source.mode", sourceMode); + client().admin().indices().prepareCreate("test-script").setMapping(mapping).setSettings(settings).get(); + for (int i = 0; i < 10; i++) { + index("test-script", Integer.toString(i), Map.of("k1", i, "k2", "b-" + i, "meter", 10000 * i)); + } + refresh("test-script"); + try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 | LIMIT 10")) { + List k1Column = Iterators.toList(resp.column(0)); + assertThat(k1Column, contains(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L)); + List k2Column = Iterators.toList(resp.column(1)); + assertThat(k2Column, contains(null, null, null, null, null, null, null, null, null, null)); + List meterColumn = Iterators.toList(resp.column(2)); + assertThat(meterColumn, contains(0.0, 10000.0, 20000.0, 30000.0, 40000.0, 50000.0, 60000.0, 70000.0, 80000.0, 90000.0)); + } + } + private void clearPersistentSettings(Setting... settings) { Settings.Builder clearedSettings = Settings.builder(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index ab0d68b152262..6afa30ca7feb0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -38,6 +38,7 @@ import org.elasticsearch.index.mapper.FieldNamesFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NestedLookup; +import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.mapper.SourceLoader; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -340,7 +341,16 @@ public MappedFieldType.FieldExtractPreference fieldExtractPreference() { @Override public SearchLookup lookup() { - return ctx.lookup(); + boolean syntheticSource = SourceFieldMapper.isSynthetic(indexSettings()); + var searchLookup = ctx.lookup(); + if (syntheticSource) { + // in the context of scripts and when synthetic source is used the search lookup can't always be reused between + // users of SearchLookup. This is only an issue when scripts fallback to _source, but since we can't always + // accurately determine whether a script uses _source, we should do this for all script usages. + // This lookup() method is only invoked for scripts / runtime fields, so it is ok to do here. + searchLookup = searchLookup.swapSourceProvider(ctx.createSourceProvider()); + } + return searchLookup; } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index eeed811674f60..5d58d4a0f2c3a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -45,6 +45,7 @@ import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchRequest; +import org.elasticsearch.search.lookup.SourceProvider; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; @@ -83,6 +84,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME; @@ -416,12 +418,17 @@ void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, List contexts = new ArrayList<>(context.searchContexts.size()); for (int i = 0; i < context.searchContexts.size(); i++) { SearchContext searchContext = context.searchContexts.get(i); + var searchExecutionContext = new SearchExecutionContext(searchContext.getSearchExecutionContext()) { + + @Override + public SourceProvider createSourceProvider() { + final Supplier supplier = () -> super.createSourceProvider(); + return new ReinitializingSourceProvider(supplier); + + } + }; contexts.add( - new EsPhysicalOperationProviders.DefaultShardContext( - i, - searchContext.getSearchExecutionContext(), - searchContext.request().getAliasFilter() - ) + new EsPhysicalOperationProviders.DefaultShardContext(i, searchExecutionContext, searchContext.request().getAliasFilter()) ); } final List drivers; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java new file mode 100644 index 0000000000000..b6b2c6dfec755 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plugin; + +import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.search.lookup.Source; +import org.elasticsearch.search.lookup.SourceProvider; + +import java.io.IOException; +import java.util.function.Supplier; + +/** + * This is a workaround for when compute engine executes concurrently with data partitioning by docid. + */ +final class ReinitializingSourceProvider implements SourceProvider { + + private PerThreadSourceProvider perThreadProvider; + private final Supplier sourceProviderFactory; + + ReinitializingSourceProvider(Supplier sourceProviderFactory) { + this.sourceProviderFactory = sourceProviderFactory; + } + + @Override + public Source getSource(LeafReaderContext ctx, int doc) throws IOException { + var currentThread = Thread.currentThread(); + PerThreadSourceProvider provider = perThreadProvider; + if (provider == null || provider.creatingThread != currentThread) { + provider = new PerThreadSourceProvider(sourceProviderFactory.get(), currentThread); + this.perThreadProvider = provider; + } + return perThreadProvider.source.getSource(ctx, doc); + } + + private record PerThreadSourceProvider(SourceProvider source, Thread creatingThread) { + + } +} diff --git a/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java b/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java index 2bf8b00cf551c..d42c1aa240a64 100644 --- a/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java +++ b/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java @@ -10,6 +10,8 @@ import org.elasticsearch.client.Request; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.common.time.FormatNames; import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.elasticsearch.test.cluster.local.distribution.DistributionType; import org.elasticsearch.test.rest.ESRestTestCase; @@ -17,6 +19,7 @@ import org.junit.ClassRule; import java.io.IOException; +import java.time.Instant; import java.util.List; import java.util.Map; @@ -108,4 +111,118 @@ public void testLogsdbSourceModeForLogsIndex() throws IOException { assertNull(settings.get("index.mapping.source.mode")); } + public void testEsqlRuntimeFields() throws IOException { + String mappings = """ + { + "runtime": { + "message_length": { + "type": "long" + }, + "log.offset": { + "type": "long" + } + }, + "dynamic": false, + "properties": { + "@timestamp": { + "type": "date" + }, + "log" : { + "properties": { + "level": { + "type": "keyword" + }, + "file": { + "type": "keyword" + } + } + } + } + } + """; + String indexName = "test-foo"; + createIndex(indexName, Settings.builder().put("index.mode", "logsdb").build(), mappings); + + int numDocs = 500; + var sb = new StringBuilder(); + var now = Instant.now(); + + var expectedMinTimestamp = now; + for (int i = 0; i < numDocs; i++) { + String level = randomBoolean() ? "info" : randomBoolean() ? "warning" : randomBoolean() ? "error" : "fatal"; + String msg = randomAlphaOfLength(20); + String path = randomAlphaOfLength(8); + String messageLength = Integer.toString(msg.length()); + String offset = Integer.toString(randomNonNegativeInt()); + sb.append("{ \"create\": {} }").append('\n'); + if (randomBoolean()) { + sb.append( + """ + {"@timestamp":"$now","message":"$msg","message_length":$l,"file":{"level":"$level","offset":5,"file":"$path"}} + """.replace("$now", formatInstant(now)) + .replace("$level", level) + .replace("$msg", msg) + .replace("$path", path) + .replace("$l", messageLength) + .replace("$o", offset) + ); + } else { + sb.append(""" + {"@timestamp": "$now", "message": "$msg", "message_length": $l} + """.replace("$now", formatInstant(now)).replace("$msg", msg).replace("$l", messageLength)); + } + sb.append('\n'); + if (i != numDocs - 1) { + now = now.plusSeconds(1); + } + } + var expectedMaxTimestamp = now; + + var bulkRequest = new Request("POST", "/" + indexName + "/_bulk"); + bulkRequest.setJsonEntity(sb.toString()); + bulkRequest.addParameter("refresh", "true"); + var bulkResponse = client().performRequest(bulkRequest); + var bulkResponseBody = responseAsMap(bulkResponse); + assertThat(bulkResponseBody, Matchers.hasEntry("errors", false)); + + var forceMergeRequest = new Request("POST", "/" + indexName + "/_forcemerge"); + forceMergeRequest.addParameter("max_num_segments", "1"); + var forceMergeResponse = client().performRequest(forceMergeRequest); + assertOK(forceMergeResponse); + + String query = "FROM test-foo | STATS count(*), min(@timestamp), max(@timestamp), min(message_length), max(message_length)" + + " ,sum(message_length), avg(message_length), min(log.offset), max(log.offset) | LIMIT 1"; + final Request esqlRequest = new Request("POST", "/_query"); + esqlRequest.setJsonEntity(""" + { + "query": "$query" + } + """.replace("$query", query)); + var esqlResponse = client().performRequest(esqlRequest); + assertOK(esqlResponse); + Map esqlResponseBody = responseAsMap(esqlResponse); + + List values = (List) esqlResponseBody.get("values"); + assertThat(values, Matchers.not(Matchers.empty())); + var count = ((List) values.get(0)).get(0); + assertThat(count, equalTo(numDocs)); + logger.warn("VALUES: {}", values); + + var minTimestamp = ((List) values.get(0)).get(1); + assertThat(minTimestamp, equalTo(formatInstant(expectedMinTimestamp))); + var maxTimestamp = ((List) values.get(0)).get(2); + assertThat(maxTimestamp, equalTo(formatInstant(expectedMaxTimestamp))); + + var minLength = ((List) values.get(0)).get(3); + assertThat(minLength, equalTo(20)); + var maxLength = ((List) values.get(0)).get(4); + assertThat(maxLength, equalTo(20)); + var sumLength = ((List) values.get(0)).get(5); + assertThat(sumLength, equalTo(20 * numDocs)); + } + + static String formatInstant(Instant instant) { + return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant); + } + }