Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address mapping and compute engine runtime field issues #117792

Merged
merged 30 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
619c441
Avoid reusing source providers for script based block loaders.
martijnvg Nov 30, 2024
ea8d11c
spotless
martijnvg Nov 30, 2024
19fd9af
reword
martijnvg Nov 30, 2024
ad224b6
Merge remote-tracking branch 'es/main' into fix_117644
martijnvg Dec 2, 2024
754c62c
add integration test and ensure that runtime fields get added to _ign…
martijnvg Dec 2, 2024
2bee4d8
improve test so that it fails without the fix
martijnvg Dec 2, 2024
050d4b2
spotless
martijnvg Dec 2, 2024
a47dfc6
A more contained approach that resolves the issue at the block loader …
martijnvg Dec 2, 2024
ca0e1a5
iter
martijnvg Dec 2, 2024
e378530
iter2
martijnvg Dec 2, 2024
c09207b
need source
martijnvg Dec 2, 2024
fdee715
introduce StoredFieldsSpec.SCRIPT_NO_REQUIREMENTS to avoid both compu…
martijnvg Dec 2, 2024
7be360c
keep one block loader impl and fixed long script unit test
martijnvg Dec 2, 2024
848523b
Merge remote-tracking branch 'es/main' into fix_117644
martijnvg Dec 3, 2024
7529bdf
spotless
martijnvg Dec 3, 2024
ec88323
adapt test
martijnvg Dec 3, 2024
2aecb15
add new constructor to reduce changes in unrelated areas of the code …
martijnvg Dec 3, 2024
4344fdb
Merge remote-tracking branch 'es/main' into fix_117644
martijnvg Dec 3, 2024
f189cc1
Revert block loader based solution for the solution that creates a ne…
martijnvg Dec 4, 2024
130a96e
Merge remote-tracking branch 'es/main' into fix_117644
martijnvg Dec 4, 2024
611b50e
add potential fix for when compute engine executes in parallel with d…
martijnvg Dec 4, 2024
21dc0a2
spotless
martijnvg Dec 4, 2024
0d515bd
apply ReinitializingSourceProvider only for esql
martijnvg Dec 4, 2024
dc398ae
spotless
martijnvg Dec 4, 2024
1d2c878
Merge remote-tracking branch 'es/main' into fix_117644
martijnvg Dec 5, 2024
9148140
move ReinitializingSourceProvider
martijnvg Dec 5, 2024
a083d37
Update docs/changelog/117792.yaml
martijnvg Dec 5, 2024
7202757
remove duplicate copyright header
martijnvg Dec 5, 2024
809a27b
A less expensive approach to solve the concurrency issue
martijnvg Dec 5, 2024
bae95e4
update changelog entry
martijnvg Dec 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,9 @@ public Query termQuery(Object value, SearchExecutionContext context) {
protected void parseCreateField(DocumentParserContext context) {
// Run-time fields are mapped to this mapper, so it needs to handle storing values for use in synthetic source.
// #parseValue calls this method once the run-time field is created.
if (context.dynamic() == ObjectMapper.Dynamic.RUNTIME && context.canAddIgnoredField()) {
var fieldType = context.mappingLookup().getFieldType(path);
martijnvg marked this conversation as resolved.
Show resolved Hide resolved
boolean isRuntimeField = fieldType instanceof AbstractScriptFieldType;
if ((context.dynamic() == ObjectMapper.Dynamic.RUNTIME || isRuntimeField) && context.canAddIgnoredField()) {
try {
context.addIgnoredField(
IgnoredSourceFieldMapper.NameValue.fromContext(context, path, context.encodeFlattenedToken())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,14 +493,18 @@ public boolean containsBrokenAnalysis(String field) {
*/
public SearchLookup lookup() {
// Lazy initialisation: the providers are only set up when the field is still null.
if (this.lookup == null) {
// NOTE(review): this hunk is rendered without +/- markers; the inline ternary below and the
// createSourceProvider() call after it both declare `sourceProvider` — presumably old vs. new side of the diff.
SourceProvider sourceProvider = isSourceSynthetic()
? SourceProvider.fromSyntheticSource(mappingLookup.getMapping(), mapperMetrics.sourceFieldMetrics())
: SourceProvider.fromStoredFields();
var sourceProvider = createSourceProvider();
// NOTE(review): presumably setLookupProviders assigns this.lookup — its body is not visible here, confirm.
setLookupProviders(sourceProvider, LeafFieldLookupProvider.fromStoredFields());
}
return this.lookup;
}

/**
 * Builds a new {@link SourceProvider} for this context.
 *
 * @return a provider created by {@code SourceProvider.fromSyntheticSource} when {@link #isSourceSynthetic()} is true,
 *         otherwise one created by {@code SourceProvider.fromStoredFields}
 */
public SourceProvider createSourceProvider() {
    if (isSourceSynthetic() == false) {
        return SourceProvider.fromStoredFields();
    }
    return SourceProvider.fromSyntheticSource(mappingLookup.getMapping(), mapperMetrics.sourceFieldMetrics());
}

/**
* Replace the standard source provider and field lookup provider on the SearchLookup
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.search.lookup;

import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;

import java.io.IOException;
import java.util.Map;
import java.util.function.Supplier;

/**
* This is a workaround for when compute engine executes concurrently with data partitioning by docid.
*/
public class ReinitializingSourceProvider implements SourceProvider {

private final Supplier<SourceProvider> sourceProviderFactory;
private final Map<Long, SourceProvider> map = ConcurrentCollections.newConcurrentMap();

public ReinitializingSourceProvider(Supplier<SourceProvider> sourceProviderFactory) {
this.sourceProviderFactory = sourceProviderFactory;
}

@Override
public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
var currentThread = Thread.currentThread();
var sourceProvider = map.computeIfAbsent(currentThread.threadId(), (key) -> sourceProviderFactory.get());
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
return sourceProvider.getSource(ctx, doc);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ private SearchLookup(SearchLookup searchLookup, Set<String> fieldChain) {
this.fieldLookupProvider = searchLookup.fieldLookupProvider;
}

/**
 * Copy constructor that takes every lookup from {@code searchLookup} except the source provider, which is replaced
 * by {@code sourceProvider}. The given field chain is stored behind an unmodifiable view.
 */
private SearchLookup(SearchLookup searchLookup, SourceProvider sourceProvider, Set<String> fieldChain) {
    // Carried over unchanged from the lookup being copied.
    this.fieldTypeLookup = searchLookup.fieldTypeLookup;
    this.fieldDataLookup = searchLookup.fieldDataLookup;
    this.fieldLookupProvider = searchLookup.fieldLookupProvider;
    // Supplied by the caller.
    this.sourceProvider = sourceProvider;
    this.fieldChain = Collections.unmodifiableSet(fieldChain);
}

/**
* Creates a copy of the current {@link SearchLookup} that looks fields up in the same way, but also tracks field references
* in order to detect cycles and prevent resolving fields that depend on more than {@link #MAX_FIELD_CHAIN_DEPTH} other fields.
Expand Down Expand Up @@ -144,4 +152,8 @@ public IndexFieldData<?> getForField(MappedFieldType fieldType, MappedFieldType.
public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
    // Pure delegation to the source provider held by this lookup.
    final Source source = sourceProvider.getSource(ctx, doc);
    return source;
}

/**
 * Creates a new {@link SearchLookup} based on this one, with the same field chain, that uses the given source provider.
 *
 * @param sourceProvider the source provider the returned lookup should use
 * @return a new lookup instance; this instance is not modified
 */
public SearchLookup swapSourceProvider(SourceProvider sourceProvider) {
    final SearchLookup copy = new SearchLookup(this, sourceProvider, fieldChain);
    return copy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.client.internal.ClusterAdminClient;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -1648,6 +1649,44 @@ public void testMaxTruncationSizeSetting() {
}
}

/**
 * Creates an index with two {@code long} runtime fields ({@code k1}, {@code k2}) and a mapped {@code double} field,
 * indexes ten documents and checks the three columns returned by an ES|QL query sorted on {@code k1}.
 * The source mode ({@code stored} or {@code synthetic}) is picked at random.
 */
public void testScriptField() throws Exception {
    final String indexName = "test-script";

    XContentBuilder mapping = JsonXContent.contentBuilder();
    mapping.startObject();
    {
        // Runtime section: both fields are declared as long.
        mapping.startObject("runtime");
        mapping.startObject("k1").field("type", "long").endObject();
        mapping.startObject("k2").field("type", "long").endObject();
        mapping.endObject();

        // Regular mapped properties.
        mapping.startObject("properties");
        mapping.startObject("meter").field("type", "double").endObject();
        mapping.endObject();
    }
    mapping.endObject();

    final String sourceMode;
    if (randomBoolean()) {
        sourceMode = "stored";
    } else {
        sourceMode = "synthetic";
    }
    Settings.Builder settings = indexSettings(1, 0).put(indexSettings()).put("index.mapping.source.mode", sourceMode);
    client().admin().indices().prepareCreate(indexName).setMapping(mapping).setSettings(settings).get();

    // k2 is indexed as a string ("b-<i>") even though the runtime field is typed long.
    for (int docId = 0; docId < 10; docId++) {
        Map<String, Object> doc = Map.of("k1", docId, "k2", "b-" + docId, "meter", 10000 * docId);
        index(indexName, Integer.toString(docId), doc);
    }
    refresh(indexName);

    try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 | LIMIT 10")) {
        List<Object> k1Values = Iterators.toList(resp.column(0));
        assertThat(k1Values, contains(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L));

        List<Object> k2Values = Iterators.toList(resp.column(1));
        assertThat(k2Values, contains(null, null, null, null, null, null, null, null, null, null));

        List<Object> meterValues = Iterators.toList(resp.column(2));
        assertThat(meterValues, contains(0.0, 10000.0, 20000.0, 30000.0, 40000.0, 50000.0, 60000.0, 70000.0, 80000.0, 90000.0));
    }
}

private void clearPersistentSettings(Setting<?>... settings) {
Settings.Builder clearedSettings = Settings.builder();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.index.mapper.FieldNamesFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NestedLookup;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.SourceLoader;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand Down Expand Up @@ -348,7 +349,16 @@ public MappedFieldType.FieldExtractPreference fieldExtractPreference() {

@Override
public SearchLookup lookup() {
// NOTE(review): this hunk is rendered without +/- markers; the bare return below is presumably the removed
// pre-change body, and the statements after it the replacement.
return ctx.lookup();
boolean syntheticSource = SourceFieldMapper.isSynthetic(indexSettings());
var searchLookup = ctx.lookup();
if (syntheticSource) {
// In the context of scripts, and when synthetic source is used, the search lookup can't always be reused between
// users of SearchLookup. This is only an issue when scripts fall back to _source, but since we can't always
// accurately determine whether a script uses _source, we should do this for all script usages.
// This lookup() method is only invoked for scripts / runtime fields, so it is ok to do here.
searchLookup = searchLookup.swapSourceProvider(ctx.createSourceProvider());
}
return searchLookup;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.lookup.ReinitializingSourceProvider;
import org.elasticsearch.search.lookup.SourceProvider;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
Expand Down Expand Up @@ -87,6 +89,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME;

Expand Down Expand Up @@ -471,12 +474,17 @@ void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan,
List<EsPhysicalOperationProviders.ShardContext> contexts = new ArrayList<>(context.searchContexts.size());
for (int i = 0; i < context.searchContexts.size(); i++) {
SearchContext searchContext = context.searchContexts.get(i);
var searchExecutionContext = new SearchExecutionContext(searchContext.getSearchExecutionContext()) {

@Override
public SourceProvider createSourceProvider() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neat!

final Supplier<SourceProvider> supplier = () -> super.createSourceProvider();
return new ReinitializingSourceProvider(supplier);

}
};
contexts.add(
new EsPhysicalOperationProviders.DefaultShardContext(
i,
searchContext.getSearchExecutionContext(),
searchContext.request().getAliasFilter()
)
new EsPhysicalOperationProviders.DefaultShardContext(i, searchExecutionContext, searchContext.request().getAliasFilter())
);
}
final List<Driver> drivers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@
import org.elasticsearch.client.Request;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.hamcrest.Matchers;
import org.junit.ClassRule;

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -108,4 +111,118 @@ public void testLogsdbSourceModeForLogsIndex() throws IOException {
assertNull(settings.get("index.mapping.source.mode"));
}

/**
 * Bulk-indexes documents into a {@code logsdb} index whose mapping declares {@code message_length} and
 * {@code log.offset} as {@code long} runtime fields, force-merges to one segment, and then verifies the count,
 * timestamp range and {@code message_length} aggregates returned by an ES|QL {@code STATS} query.
 */
public void testEsqlRuntimeFields() throws IOException {
    final String indexName = "test-foo";
    final String mappings = """
        {
        "runtime": {
        "message_length": {
        "type": "long"
        },
        "log.offset": {
        "type": "long"
        }
        },
        "dynamic": false,
        "properties": {
        "@timestamp": {
        "type": "date"
        },
        "log" : {
        "properties": {
        "level": {
        "type": "keyword"
        },
        "file": {
        "type": "keyword"
        }
        }
        }
        }
        }
        """;
    createIndex(indexName, Settings.builder().put("index.mode", "logsdb").build(), mappings);

    final int numDocs = 500;
    final StringBuilder bulkBody = new StringBuilder();
    Instant timestamp = Instant.now();
    final Instant expectedMinTimestamp = timestamp;

    for (int docIndex = 0; docIndex < numDocs; docIndex++) {
        // Randomised values are drawn in a fixed order: level, message, path, offset, then the document shape.
        final String level;
        if (randomBoolean()) {
            level = "info";
        } else if (randomBoolean()) {
            level = "warning";
        } else if (randomBoolean()) {
            level = "error";
        } else {
            level = "fatal";
        }
        final String msg = randomAlphaOfLength(20);
        final String path = randomAlphaOfLength(8);
        final String messageLength = Integer.toString(msg.length());
        final String offset = Integer.toString(randomNonNegativeInt());
        final String formattedTimestamp = formatInstant(timestamp);

        bulkBody.append("{ \"create\": {} }").append('\n');

        final String document;
        if (randomBoolean()) {
            // "$level" has to be substituted before "$l", because "$l" is a prefix of "$level".
            // NOTE(review): the template contains no "$o" placeholder (offset is hard-coded to 5), so the last
            // replace is a no-op; the object is also keyed "file" rather than "log" — confirm whether intended.
            document = """
                {"@timestamp":"$now","message":"$msg","message_length":$l,"file":{"level":"$level","offset":5,"file":"$path"}}
                """.replace("$now", formattedTimestamp)
                .replace("$level", level)
                .replace("$msg", msg)
                .replace("$path", path)
                .replace("$l", messageLength)
                .replace("$o", offset);
        } else {
            document = """
                {"@timestamp": "$now", "message": "$msg", "message_length": $l}
                """.replace("$now", formattedTimestamp).replace("$msg", msg).replace("$l", messageLength);
        }
        bulkBody.append(document);
        bulkBody.append('\n');

        // Advance the clock between documents, but not after the last one, so it ends on the max timestamp.
        if (docIndex < numDocs - 1) {
            timestamp = timestamp.plusSeconds(1);
        }
    }
    final Instant expectedMaxTimestamp = timestamp;

    // Index everything in a single bulk call and make it searchable immediately.
    final Request bulkRequest = new Request("POST", "/" + indexName + "/_bulk");
    bulkRequest.setJsonEntity(bulkBody.toString());
    bulkRequest.addParameter("refresh", "true");
    final Map<String, Object> bulkResponseBody = responseAsMap(client().performRequest(bulkRequest));
    assertThat(bulkResponseBody, Matchers.hasEntry("errors", false));

    final Request forceMergeRequest = new Request("POST", "/" + indexName + "/_forcemerge");
    forceMergeRequest.addParameter("max_num_segments", "1");
    assertOK(client().performRequest(forceMergeRequest));

    String query = "FROM test-foo | STATS count(*), min(@timestamp), max(@timestamp), min(message_length), max(message_length)"
        + " ,sum(message_length), avg(message_length), min(log.offset), max(log.offset) | LIMIT 1";
    final Request esqlRequest = new Request("POST", "/_query");
    esqlRequest.setJsonEntity("""
        {
        "query": "$query"
        }
        """.replace("$query", query));
    var esqlResponse = client().performRequest(esqlRequest);
    assertOK(esqlResponse);
    Map<String, Object> esqlResponseBody = responseAsMap(esqlResponse);

    List<?> values = (List<?>) esqlResponseBody.get("values");
    assertThat(values, Matchers.not(Matchers.empty()));
    // Only the first row is inspected; columns follow the order of the STATS expressions in the query.
    final List<?> firstRow = (List<?>) values.getFirst();

    Object count = firstRow.get(0);
    assertThat(count, equalTo(numDocs));
    // NOTE(review): WARN-level dump of the result rows; looks like leftover debugging output.
    logger.warn("VALUES: {}", values);

    Object minTimestamp = firstRow.get(1);
    assertThat(minTimestamp, equalTo(formatInstant(expectedMinTimestamp)));
    Object maxTimestamp = firstRow.get(2);
    assertThat(maxTimestamp, equalTo(formatInstant(expectedMaxTimestamp)));

    // Every message is 20 characters long, so min and max are both 20 and the sum is 20 per document.
    Object minLength = firstRow.get(3);
    assertThat(minLength, equalTo(20));
    Object maxLength = firstRow.get(4);
    assertThat(maxLength, equalTo(20));
    Object sumLength = firstRow.get(5);
    assertThat(sumLength, equalTo(20 * numDocs));
}

/**
 * Renders an instant with the formatter obtained for the pattern named by {@code FormatNames.STRICT_DATE_OPTIONAL_TIME}.
 *
 * @param instant the instant to format
 * @return the formatted timestamp string
 */
static String formatInstant(Instant instant) {
    final String patternName = FormatNames.STRICT_DATE_OPTIONAL_TIME.getName();
    final DateFormatter formatter = DateFormatter.forPattern(patternName);
    return formatter.format(instant);
}

}