Address mapping and compute engine runtime field issues #117792

Merged Dec 5, 2024 · 30 commits (changes shown from 11 commits)

Commits
619c441
Avoid reusing source providers for script based block loaders.
martijnvg Nov 30, 2024
ea8d11c
spotless
martijnvg Nov 30, 2024
19fd9af
reword
martijnvg Nov 30, 2024
ad224b6
Merge remote-tracking branch 'es/main' into fix_117644
martijnvg Dec 2, 2024
754c62c
add integration test and ensure that runtime fields get added to _ign…
martijnvg Dec 2, 2024
2bee4d8
improve test so that it fails without the fix
martijnvg Dec 2, 2024
050d4b2
spotless
martijnvg Dec 2, 2024
a47dfc6
A more contained approach that resolves the issue at the block loader …
martijnvg Dec 2, 2024
ca0e1a5
iter
martijnvg Dec 2, 2024
e378530
iter2
martijnvg Dec 2, 2024
c09207b
need source
martijnvg Dec 2, 2024
fdee715
introduce StoredFieldsSpec.SCRIPT_NO_REQUIREMENTS to avoid both compu…
martijnvg Dec 2, 2024
7be360c
keep one block loader impl and fixed long script unit test
martijnvg Dec 2, 2024
848523b
Merge remote-tracking branch 'es/main' into fix_117644
martijnvg Dec 3, 2024
7529bdf
spotless
martijnvg Dec 3, 2024
ec88323
adapt test
martijnvg Dec 3, 2024
2aecb15
add new constructor to reduce changes in unrelated areas of the code …
martijnvg Dec 3, 2024
4344fdb
Merge remote-tracking branch 'es/main' into fix_117644
martijnvg Dec 3, 2024
f189cc1
Revert block loader based solution for the solution that creates a ne…
martijnvg Dec 4, 2024
130a96e
Merge remote-tracking branch 'es/main' into fix_117644
martijnvg Dec 4, 2024
611b50e
add potential fix for when compute engine executes in parallel with d…
martijnvg Dec 4, 2024
21dc0a2
spotless
martijnvg Dec 4, 2024
0d515bd
apply ReinitializingSourceProvider only for esql
martijnvg Dec 4, 2024
dc398ae
spotless
martijnvg Dec 4, 2024
1d2c878
Merge remote-tracking branch 'es/main' into fix_117644
martijnvg Dec 5, 2024
9148140
move ReinitializingSourceProvider
martijnvg Dec 5, 2024
a083d37
Update docs/changelog/117792.yaml
martijnvg Dec 5, 2024
7202757
remove duplicate copyright header
martijnvg Dec 5, 2024
809a27b
A less expensive approach to solve the concurrency issue
martijnvg Dec 5, 2024
bae95e4
update changelog entry
martijnvg Dec 5, 2024
@@ -82,6 +82,29 @@ public SortedSetDocValues ordinals(LeafReaderContext context) throws IOException
}
}

public abstract static class RowStrideOnlyDocValuesBlockLoader implements BlockLoader {

@Override
public final ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) throws IOException {
return null;
}

@Override
public boolean supportsOrdinals() {
return false;
}

@Override
public SortedSetDocValues ordinals(LeafReaderContext context) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public final StoredFieldsSpec rowStrideStoredFieldSpec() {
return StoredFieldsSpec.NEEDS_SOURCE;
martijnvg (Member, Author) commented:

This is needed because otherwise the following error occurs:

[2024-12-03T04:52:11,002][WARN ][o.e.x.e.a.EsqlResponseListener] [test-cluster-0] Request failed with status [INTERNAL_SERVER_ERROR]: java.lang.IllegalStateException: found row stride readers [[RowStrideReaderWork[reader=ScriptLongs, builder=org.elasticsearch.compute.data.LongBlockBuilder@19f6840e, loader=org.elasticsearch.index.mapper.LongScriptBlockDocValuesReader$RowStrideLongScriptBlockLoader@6f332c9, offset=1], RowStrideReaderWork[reader=ScriptLongs, builder=org.elasticsearch.compute.data.LongBlockBuilder@12a1d61f, loader=org.elasticsearch.index.mapper.LongScriptBlockDocValuesReader$RowStrideLongScriptBlockLoader@7b7a5f6c, offset=2]]] without stored fields [StoredFieldsSpec[requiresSource=false, requiresMetadata=false, requiredStoredFields=[]]]
        at org.elasticsearch.compute.lucene.ValuesSourceReaderOperator.loadFromSingleLeaf(ValuesSourceReaderOperator.java:254)
        at org.elasticsearch.compute.lucene.ValuesSourceReaderOperator.process(ValuesSourceReaderOperator.java:143)
        at org.elasticsearch.compute.operator.AbstractPageMappingOperator.getOutput(AbstractPageMappingOperator.java:76)
        at org.elasticsearch.compute.operator.Driver.runSingleLoopIteration(Driver.java:258)
        at org.elasticsearch.compute.operator.Driver.run(Driver.java:189)
        at org.elasticsearch.compute.operator.Driver$1.doRun(Driver.java:378)
        at [email protected]/org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:27)
        at [email protected]/org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:34)
        at [email protected]/org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:1023)
        at [email protected]/org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:27)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1575)

However, it isn't really required, since the SearchLookup / SourceLoader will load the stored fields, not the compute engine.

}
}

public static class LongsBlockLoader extends DocValuesBlockLoader {
private final String fieldName;

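The guard the review comment above runs into can be reduced to a small, self-contained model. This is a hypothetical sketch, not the real `ValuesSourceReaderOperator` / `StoredFieldsSpec` API: the names `Spec`, `noRequirements`, and `checkRowStrideReaders` are simplified stand-ins. The essential rule it models is that row-stride readers are rejected unless the merged stored-fields spec actually requests something, which is why the row-stride-only loader reports `NEEDS_SOURCE` even though the source is ultimately loaded elsewhere.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class StoredFieldsSpecModel {
    // Hypothetical, simplified stand-in for StoredFieldsSpec; only the fields
    // that appear in the error message above are modeled.
    record Spec(boolean requiresSource, Set<String> requiredStoredFields) {
        boolean noRequirements() {
            return requiresSource == false && requiredStoredFields.isEmpty();
        }

        Spec merge(Spec other) {
            Set<String> fields = new HashSet<>(requiredStoredFields);
            fields.addAll(other.requiredStoredFields);
            return new Spec(requiresSource || other.requiresSource, fields);
        }
    }

    // Simplified version of the guard that produced the IllegalStateException:
    // row-stride readers with no stored-field requirements at all are rejected.
    static void checkRowStrideReaders(List<Spec> rowStrideSpecs) {
        Spec merged = new Spec(false, Set.of());
        for (Spec s : rowStrideSpecs) {
            merged = merged.merge(s);
        }
        if (rowStrideSpecs.isEmpty() == false && merged.noRequirements()) {
            throw new IllegalStateException("found row stride readers without stored fields");
        }
    }

    public static void main(String[] args) {
        Spec needsSource = new Spec(true, Set.of()); // like StoredFieldsSpec.NEEDS_SOURCE
        checkRowStrideReaders(List.of(needsSource)); // passes: source is requested

        boolean threw = false;
        try {
            checkRowStrideReaders(List.of(new Spec(false, Set.of())));
        } catch (IllegalStateException e) {
            threw = true;
        }
        System.out.println(threw); // prints "true": no requirements -> rejected
    }
}
```

Under this model, a `SCRIPT_NO_REQUIREMENTS`-style spec (which one commit in this PR introduces) would amount to relaxing the guard for script readers rather than forcing `requiresSource` to true.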
@@ -941,7 +941,9 @@ public Query termQuery(Object value, SearchExecutionContext context) {
protected void parseCreateField(DocumentParserContext context) {
// Run-time fields are mapped to this mapper, so it needs to handle storing values for use in synthetic source.
// #parseValue calls this method once the run-time field is created.
if (context.dynamic() == ObjectMapper.Dynamic.RUNTIME && context.canAddIgnoredField()) {
var fieldType = context.mappingLookup().getFieldType(path);
martijnvg marked this conversation as resolved.
boolean isRuntimeField = fieldType instanceof AbstractScriptFieldType;
if ((context.dynamic() == ObjectMapper.Dynamic.RUNTIME || isRuntimeField) && context.canAddIgnoredField()) {
try {
context.addIgnoredField(
IgnoredSourceFieldMapper.NameValue.fromContext(context, path, context.encodeFlattenedToken())
@@ -36,6 +36,25 @@ public AllReader reader(LeafReaderContext context) throws IOException
}
}

static class RowStrideLongScriptBlockLoader extends RowStrideOnlyDocValuesBlockLoader {
private final LongFieldScript.LeafFactory factory;

RowStrideLongScriptBlockLoader(LongFieldScript.LeafFactory factory) {
this.factory = factory;
}

@Override
public Builder builder(BlockFactory factory, int expectedCount) {
return factory.longs(expectedCount);
}

@Override
public RowStrideReader rowStrideReader(LeafReaderContext context) throws IOException {
return new LongScriptBlockDocValuesReader(factory.newInstance(context));
}

}

private final LongFieldScript script;
private int docId;

@@ -108,7 +108,12 @@ public DocValueFormat docValueFormat(String format, ZoneId timeZone) {

@Override
public BlockLoader blockLoader(BlockLoaderContext blContext) {
return new LongScriptBlockDocValuesReader.LongScriptBlockLoader(leafFactory(blContext.lookup()));
boolean isSyntheticSource = SourceFieldMapper.isSynthetic(blContext.indexSettings());
martijnvg (Member, Author) commented:

Should we have two different implementations based on source mode?

if (isSyntheticSource) {
return new LongScriptBlockDocValuesReader.RowStrideLongScriptBlockLoader(leafFactory(blContext.lookup()));
} else {
return new LongScriptBlockDocValuesReader.LongScriptBlockLoader(leafFactory(blContext.lookup()));
}
}

@Override
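The branch on `isSyntheticSource` above can be sketched in isolation. This is a hedged sketch with hypothetical stand-in names (`Loader`, `rowStrideOnly`, `columnOrRow`, `pick`), not the Elasticsearch API; the point is only the shape of the dispatch: under synthetic source the script-backed loader opts out of column-at-a-time reading by returning `null`, mirroring `RowStrideOnlyDocValuesBlockLoader` earlier in this PR, while the regular loader supports both paths.

```java
import java.util.function.IntToLongFunction;

public class LoaderDispatchSketch {
    // Hypothetical stand-in for BlockLoader; only the question "can this
    // loader read a whole column at once?" is modeled.
    interface Loader {
        IntToLongFunction columnAtATimeReader(); // null = row-stride only
        IntToLongFunction rowStrideReader();
    }

    // Row-stride-only variant: returning null forces the driver down the
    // row-stride path, as RowStrideOnlyDocValuesBlockLoader does.
    static Loader rowStrideOnly(IntToLongFunction script) {
        return new Loader() {
            @Override
            public IntToLongFunction columnAtATimeReader() {
                return null;
            }

            @Override
            public IntToLongFunction rowStrideReader() {
                return script;
            }
        };
    }

    // Regular variant: supports both column-at-a-time and row-stride reads.
    static Loader columnOrRow(IntToLongFunction script) {
        return new Loader() {
            @Override
            public IntToLongFunction columnAtATimeReader() {
                return script;
            }

            @Override
            public IntToLongFunction rowStrideReader() {
                return script;
            }
        };
    }

    // Mirrors the branch in blockLoader(...): synthetic source -> row-stride only.
    static Loader pick(boolean isSyntheticSource, IntToLongFunction script) {
        return isSyntheticSource ? rowStrideOnly(script) : columnOrRow(script);
    }

    public static void main(String[] args) {
        IntToLongFunction script = doc -> doc * 2L;
        System.out.println(pick(true, script).columnAtATimeReader() == null);  // true
        System.out.println(pick(false, script).columnAtATimeReader() != null); // true
    }
}
```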
@@ -10,13 +10,16 @@
import org.elasticsearch.client.Request;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.hamcrest.Matchers;
import org.junit.ClassRule;

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;

@@ -108,4 +111,118 @@ public void testLogsdbSourceModeForLogsIndex() throws IOException {
assertNull(settings.get("index.mapping.source.mode"));
}

public void testEsqlRuntimeFields() throws IOException {
String mappings = """
{
"runtime": {
"message_length": {
"type": "long"
},
"log.offset": {
"type": "long"
}
},
"dynamic": false,
"properties": {
"@timestamp": {
"type": "date"
},
"log" : {
"properties": {
"level": {
"type": "keyword"
},
"file": {
"type": "keyword"
}
}
}
}
}
""";
String indexName = "test-foo";
createIndex(indexName, Settings.builder().put("index.mode", "logsdb").build(), mappings);

int numDocs = 500;
var sb = new StringBuilder();
var now = Instant.now();

var expectedMinTimestamp = now;
for (int i = 0; i < numDocs; i++) {
String level = randomBoolean() ? "info" : randomBoolean() ? "warning" : randomBoolean() ? "error" : "fatal";
String msg = randomAlphaOfLength(20);
String path = randomAlphaOfLength(8);
String messageLength = Integer.toString(msg.length());
String offset = Integer.toString(randomNonNegativeInt());
sb.append("{ \"create\": {} }").append('\n');
if (randomBoolean()) {
sb.append(
"""
{"@timestamp":"$now","message":"$msg","message_length":$l,"log":{"level":"$level","offset":$o,"file":"$path"}}
""".replace("$now", formatInstant(now))
.replace("$level", level)
.replace("$msg", msg)
.replace("$path", path)
.replace("$l", messageLength)
.replace("$o", offset)
);
} else {
sb.append("""
{"@timestamp": "$now", "message": "$msg", "message_length": $l}
""".replace("$now", formatInstant(now)).replace("$msg", msg).replace("$l", messageLength));
}
sb.append('\n');
if (i != numDocs - 1) {
now = now.plusSeconds(1);
}
}
var expectedMaxTimestamp = now;

var bulkRequest = new Request("POST", "/" + indexName + "/_bulk");
bulkRequest.setJsonEntity(sb.toString());
bulkRequest.addParameter("refresh", "true");
var bulkResponse = client().performRequest(bulkRequest);
var bulkResponseBody = responseAsMap(bulkResponse);
assertThat(bulkResponseBody, Matchers.hasEntry("errors", false));

var forceMergeRequest = new Request("POST", "/" + indexName + "/_forcemerge");
forceMergeRequest.addParameter("max_num_segments", "1");
var forceMergeResponse = client().performRequest(forceMergeRequest);
assertOK(forceMergeResponse);

String query = "FROM test-foo | STATS count(*), min(@timestamp), max(@timestamp), min(message_length), max(message_length)"
+ ", sum(message_length), avg(message_length), min(log.offset), max(log.offset) | LIMIT 1";
final Request esqlRequest = new Request("POST", "/_query");
esqlRequest.setJsonEntity("""
{
"query": "$query"
}
""".replace("$query", query));
var esqlResponse = client().performRequest(esqlRequest);
assertOK(esqlResponse);
Map<String, Object> esqlResponseBody = responseAsMap(esqlResponse);

List<?> values = (List<?>) esqlResponseBody.get("values");
assertThat(values, Matchers.not(Matchers.empty()));
var count = ((List<?>) values.getFirst()).get(0);
assertThat(count, equalTo(numDocs));
logger.warn("VALUES: {}", values);

var minTimestamp = ((List<?>) values.getFirst()).get(1);
assertThat(minTimestamp, equalTo(formatInstant(expectedMinTimestamp)));
var maxTimestamp = ((List<?>) values.getFirst()).get(2);
assertThat(maxTimestamp, equalTo(formatInstant(expectedMaxTimestamp)));

var minLength = ((List<?>) values.getFirst()).get(3);
assertThat(minLength, equalTo(20));
var maxLength = ((List<?>) values.getFirst()).get(4);
assertThat(maxLength, equalTo(20));
var sumLength = ((List<?>) values.getFirst()).get(5);
assertThat(sumLength, equalTo(20 * numDocs));
}

static String formatInstant(Instant instant) {
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
}

}