From b8afe64eb637535691a66d0696296e90202b2350 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 5 Dec 2024 12:02:16 +0100 Subject: [PATCH] [8.x] Address mapping and compute engine runtime field issues (#117792) (#118049) * Address mapping and compute engine runtime field issues (#117792) This change addresses the following issues: Fields mapped as runtime fields not getting stored if source mode is synthetic. Address java.io.EOFException when an es|ql query uses multiple runtime fields that fallback to source when source mode is synthetic. (1) Address concurrency issue when runtime fields get pushed down to Lucene. (2) 1: ValueSourceOperator can read values in row striding or columnar fashion. When values are read in columnar fashion and multiple runtime fields synthetize source then this can cause the same SourceProvider evaluation the same range of docs ids multiple times. This can then result in unexpected io errors at the codec level. This is because the same doc value instances are used by SourceProvider. Re-evaluating the same docids is in violation of the contract of the DocIdSetIterator#advance(...) / DocIdSetIterator#advanceExact(...) methods, which documents that unexpected behaviour can occur if target docid is lower than current docid position. Note that this is only an issue for synthetic source loader and not for stored source loader. And not when executing in row stride fashion which sometimes happen in compute engine and always happen in _search api. 2: The concurrency issue that arrises with source provider if source operator executes in parallel with data portioning set to DOC. The same SourceProvider instance then gets access by multiple threads concurrently. SourceProviders implementations are not designed to handle concurrent access. Closes #117644 --- docs/changelog/117792.yaml | 6 + .../index/mapper/DocumentParser.java | 4 +- .../index/query/SearchExecutionContext.java | 10 +- .../search/lookup/SearchLookup.java | 12 ++ .../xpack/esql/action/EsqlActionIT.java | 39 ++++++ .../planner/EsPhysicalOperationProviders.java | 12 +- .../xpack/esql/plugin/ComputeService.java | 17 ++- .../plugin/ReinitializingSourceProvider.java | 43 +++++++ .../xpack/logsdb/LogsdbRestIT.java | 117 ++++++++++++++++++ 9 files changed, 250 insertions(+), 10 deletions(-) create mode 100644 docs/changelog/117792.yaml create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java diff --git a/docs/changelog/117792.yaml b/docs/changelog/117792.yaml new file mode 100644 index 0000000000000..2d7ddda1ace40 --- /dev/null +++ b/docs/changelog/117792.yaml @@ -0,0 +1,6 @@ +pr: 117792 +summary: Address mapping and compute engine runtime field issues +area: Mapping +type: bug +issues: + - 117644 diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java index c9e035f9d3f20..fe2c4dc7f2c5d 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java @@ -946,7 +946,9 @@ public Query termQuery(Object value, SearchExecutionContext context) { protected void parseCreateField(DocumentParserContext context) { // Run-time fields are mapped to this mapper, so it needs to handle storing values for use in synthetic source. // #parseValue calls this method once the run-time field is created. - if (context.dynamic() == ObjectMapper.Dynamic.RUNTIME && context.canAddIgnoredField()) { + var fieldType = context.mappingLookup().getFieldType(path); + boolean isRuntimeField = fieldType instanceof AbstractScriptFieldType; + if ((context.dynamic() == ObjectMapper.Dynamic.RUNTIME || isRuntimeField) && context.canAddIgnoredField()) { try { context.addIgnoredField( IgnoredSourceFieldMapper.NameValue.fromContext(context, path, context.encodeFlattenedToken()) diff --git a/server/src/main/java/org/elasticsearch/index/query/SearchExecutionContext.java b/server/src/main/java/org/elasticsearch/index/query/SearchExecutionContext.java index b07112440d3c2..d5e48a6a54daa 100644 --- a/server/src/main/java/org/elasticsearch/index/query/SearchExecutionContext.java +++ b/server/src/main/java/org/elasticsearch/index/query/SearchExecutionContext.java @@ -493,14 +493,18 @@ public boolean containsBrokenAnalysis(String field) { */ public SearchLookup lookup() { if (this.lookup == null) { - SourceProvider sourceProvider = isSourceSynthetic() - ? SourceProvider.fromSyntheticSource(mappingLookup.getMapping(), mapperMetrics.sourceFieldMetrics()) - : SourceProvider.fromStoredFields(); + var sourceProvider = createSourceProvider(); setLookupProviders(sourceProvider, LeafFieldLookupProvider.fromStoredFields()); } return this.lookup; } + public SourceProvider createSourceProvider() { + return isSourceSynthetic() + ? SourceProvider.fromSyntheticSource(mappingLookup.getMapping(), mapperMetrics.sourceFieldMetrics()) + : SourceProvider.fromStoredFields(); + } + /** * Replace the standard source provider and field lookup provider on the SearchLookup * diff --git a/server/src/main/java/org/elasticsearch/search/lookup/SearchLookup.java b/server/src/main/java/org/elasticsearch/search/lookup/SearchLookup.java index f7f8cee30ee15..9eb0170af5efb 100644 --- a/server/src/main/java/org/elasticsearch/search/lookup/SearchLookup.java +++ b/server/src/main/java/org/elasticsearch/search/lookup/SearchLookup.java @@ -102,6 +102,14 @@ private SearchLookup(SearchLookup searchLookup, Set fieldChain) { this.fieldLookupProvider = searchLookup.fieldLookupProvider; } + private SearchLookup(SearchLookup searchLookup, SourceProvider sourceProvider, Set fieldChain) { + this.fieldChain = Collections.unmodifiableSet(fieldChain); + this.sourceProvider = sourceProvider; + this.fieldTypeLookup = searchLookup.fieldTypeLookup; + this.fieldDataLookup = searchLookup.fieldDataLookup; + this.fieldLookupProvider = searchLookup.fieldLookupProvider; + } + /** * Creates a copy of the current {@link SearchLookup} that looks fields up in the same way, but also tracks field references * in order to detect cycles and prevent resolving fields that depend on more than {@link #MAX_FIELD_CHAIN_DEPTH} other fields. @@ -144,4 +152,8 @@ public IndexFieldData getForField(MappedFieldType fieldType, MappedFieldType. public Source getSource(LeafReaderContext ctx, int doc) throws IOException { return sourceProvider.getSource(ctx, doc); } + + public SearchLookup swapSourceProvider(SourceProvider sourceProvider) { + return new SearchLookup(this, sourceProvider, fieldChain); + } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java index 147b13b36c44b..00f53d31165b1 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java @@ -18,6 +18,7 @@ import org.elasticsearch.client.internal.ClusterAdminClient; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; @@ -1648,6 +1649,44 @@ public void testMaxTruncationSizeSetting() { } } + public void testScriptField() throws Exception { + XContentBuilder mapping = JsonXContent.contentBuilder(); + mapping.startObject(); + { + mapping.startObject("runtime"); + { + mapping.startObject("k1"); + mapping.field("type", "long"); + mapping.endObject(); + mapping.startObject("k2"); + mapping.field("type", "long"); + mapping.endObject(); + } + mapping.endObject(); + { + mapping.startObject("properties"); + mapping.startObject("meter").field("type", "double").endObject(); + mapping.endObject(); + } + } + mapping.endObject(); + String sourceMode = randomBoolean() ? "stored" : "synthetic"; + Settings.Builder settings = indexSettings(1, 0).put(indexSettings()).put("index.mapping.source.mode", sourceMode); + client().admin().indices().prepareCreate("test-script").setMapping(mapping).setSettings(settings).get(); + for (int i = 0; i < 10; i++) { + index("test-script", Integer.toString(i), Map.of("k1", i, "k2", "b-" + i, "meter", 10000 * i)); + } + refresh("test-script"); + try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 | LIMIT 10")) { + List k1Column = Iterators.toList(resp.column(0)); + assertThat(k1Column, contains(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L)); + List k2Column = Iterators.toList(resp.column(1)); + assertThat(k2Column, contains(null, null, null, null, null, null, null, null, null, null)); + List meterColumn = Iterators.toList(resp.column(2)); + assertThat(meterColumn, contains(0.0, 10000.0, 20000.0, 30000.0, 40000.0, 50000.0, 60000.0, 70000.0, 80000.0, 90000.0)); + } + } + private void clearPersistentSettings(Setting... settings) { Settings.Builder clearedSettings = Settings.builder(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 7bf7d0e2d08eb..39e2a3bc1d5af 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -39,6 +39,7 @@ import org.elasticsearch.index.mapper.FieldNamesFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NestedLookup; +import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.mapper.SourceLoader; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -348,7 +349,16 @@ public MappedFieldType.FieldExtractPreference fieldExtractPreference() { @Override public SearchLookup lookup() { - return ctx.lookup(); + boolean syntheticSource = SourceFieldMapper.isSynthetic(indexSettings()); + var searchLookup = ctx.lookup(); + if (syntheticSource) { + // in the context of scripts and when synthetic source is used the search lookup can't always be reused between + // users of SearchLookup. This is only an issue when scripts fallback to _source, but since we can't always + // accurately determine whether a script uses _source, we should do this for all script usages. + // This lookup() method is only invoked for scripts / runtime fields, so it is ok to do here. + searchLookup = searchLookup.swapSourceProvider(ctx.createSourceProvider()); + } + return searchLookup; } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 9aea1577a4137..aa3123fc2f636 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -45,6 +45,7 @@ import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchRequest; +import org.elasticsearch.search.lookup.SourceProvider; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; @@ -82,6 +83,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME; @@ -428,12 +430,17 @@ void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, List contexts = new ArrayList<>(context.searchContexts.size()); for (int i = 0; i < context.searchContexts.size(); i++) { SearchContext searchContext = context.searchContexts.get(i); + var searchExecutionContext = new SearchExecutionContext(searchContext.getSearchExecutionContext()) { + + @Override + public SourceProvider createSourceProvider() { + final Supplier supplier = () -> super.createSourceProvider(); + return new ReinitializingSourceProvider(supplier); + + } + }; contexts.add( - new EsPhysicalOperationProviders.DefaultShardContext( - i, - searchContext.getSearchExecutionContext(), - searchContext.request().getAliasFilter() - ) + new EsPhysicalOperationProviders.DefaultShardContext(i, searchExecutionContext, searchContext.request().getAliasFilter()) ); } final List drivers; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java new file mode 100644 index 0000000000000..b6b2c6dfec755 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plugin; + +import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.search.lookup.Source; +import org.elasticsearch.search.lookup.SourceProvider; + +import java.io.IOException; +import java.util.function.Supplier; + +/** + * This is a workaround for when compute engine executes concurrently with data partitioning by docid. + */ +final class ReinitializingSourceProvider implements SourceProvider { + + private PerThreadSourceProvider perThreadProvider; + private final Supplier sourceProviderFactory; + + ReinitializingSourceProvider(Supplier sourceProviderFactory) { + this.sourceProviderFactory = sourceProviderFactory; + } + + @Override + public Source getSource(LeafReaderContext ctx, int doc) throws IOException { + var currentThread = Thread.currentThread(); + PerThreadSourceProvider provider = perThreadProvider; + if (provider == null || provider.creatingThread != currentThread) { + provider = new PerThreadSourceProvider(sourceProviderFactory.get(), currentThread); + this.perThreadProvider = provider; + } + return perThreadProvider.source.getSource(ctx, doc); + } + + private record PerThreadSourceProvider(SourceProvider source, Thread creatingThread) { + + } +} diff --git a/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java b/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java index 2bf8b00cf551c..d42c1aa240a64 100644 --- a/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java +++ b/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java @@ -10,6 +10,8 @@ import org.elasticsearch.client.Request; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.common.time.FormatNames; import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.elasticsearch.test.cluster.local.distribution.DistributionType; import org.elasticsearch.test.rest.ESRestTestCase; @@ -17,6 +19,7 @@ import org.junit.ClassRule; import java.io.IOException; +import java.time.Instant; import java.util.List; import java.util.Map; @@ -108,4 +111,118 @@ public void testLogsdbSourceModeForLogsIndex() throws IOException { assertNull(settings.get("index.mapping.source.mode")); } + public void testEsqlRuntimeFields() throws IOException { + String mappings = """ + { + "runtime": { + "message_length": { + "type": "long" + }, + "log.offset": { + "type": "long" + } + }, + "dynamic": false, + "properties": { + "@timestamp": { + "type": "date" + }, + "log" : { + "properties": { + "level": { + "type": "keyword" + }, + "file": { + "type": "keyword" + } + } + } + } + } + """; + String indexName = "test-foo"; + createIndex(indexName, Settings.builder().put("index.mode", "logsdb").build(), mappings); + + int numDocs = 500; + var sb = new StringBuilder(); + var now = Instant.now(); + + var expectedMinTimestamp = now; + for (int i = 0; i < numDocs; i++) { + String level = randomBoolean() ? "info" : randomBoolean() ? "warning" : randomBoolean() ? "error" : "fatal"; + String msg = randomAlphaOfLength(20); + String path = randomAlphaOfLength(8); + String messageLength = Integer.toString(msg.length()); + String offset = Integer.toString(randomNonNegativeInt()); + sb.append("{ \"create\": {} }").append('\n'); + if (randomBoolean()) { + sb.append( + """ + {"@timestamp":"$now","message":"$msg","message_length":$l,"file":{"level":"$level","offset":5,"file":"$path"}} + """.replace("$now", formatInstant(now)) + .replace("$level", level) + .replace("$msg", msg) + .replace("$path", path) + .replace("$l", messageLength) + .replace("$o", offset) + ); + } else { + sb.append(""" + {"@timestamp": "$now", "message": "$msg", "message_length": $l} + """.replace("$now", formatInstant(now)).replace("$msg", msg).replace("$l", messageLength)); + } + sb.append('\n'); + if (i != numDocs - 1) { + now = now.plusSeconds(1); + } + } + var expectedMaxTimestamp = now; + + var bulkRequest = new Request("POST", "/" + indexName + "/_bulk"); + bulkRequest.setJsonEntity(sb.toString()); + bulkRequest.addParameter("refresh", "true"); + var bulkResponse = client().performRequest(bulkRequest); + var bulkResponseBody = responseAsMap(bulkResponse); + assertThat(bulkResponseBody, Matchers.hasEntry("errors", false)); + + var forceMergeRequest = new Request("POST", "/" + indexName + "/_forcemerge"); + forceMergeRequest.addParameter("max_num_segments", "1"); + var forceMergeResponse = client().performRequest(forceMergeRequest); + assertOK(forceMergeResponse); + + String query = "FROM test-foo | STATS count(*), min(@timestamp), max(@timestamp), min(message_length), max(message_length)" + + " ,sum(message_length), avg(message_length), min(log.offset), max(log.offset) | LIMIT 1"; + final Request esqlRequest = new Request("POST", "/_query"); + esqlRequest.setJsonEntity(""" + { + "query": "$query" + } + """.replace("$query", query)); + var esqlResponse = client().performRequest(esqlRequest); + assertOK(esqlResponse); + Map esqlResponseBody = responseAsMap(esqlResponse); + + List values = (List) esqlResponseBody.get("values"); + assertThat(values, Matchers.not(Matchers.empty())); + var count = ((List) values.get(0)).get(0); + assertThat(count, equalTo(numDocs)); + logger.warn("VALUES: {}", values); + + var minTimestamp = ((List) values.get(0)).get(1); + assertThat(minTimestamp, equalTo(formatInstant(expectedMinTimestamp))); + var maxTimestamp = ((List) values.get(0)).get(2); + assertThat(maxTimestamp, equalTo(formatInstant(expectedMaxTimestamp))); + + var minLength = ((List) values.get(0)).get(3); + assertThat(minLength, equalTo(20)); + var maxLength = ((List) values.get(0)).get(4); + assertThat(maxLength, equalTo(20)); + var sumLength = ((List) values.get(0)).get(5); + assertThat(sumLength, equalTo(20 * numDocs)); + } + + static String formatInstant(Instant instant) { + return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant); + } + }