From 10100a98d596fa9ad95128ba42f4efc9bf29eabf Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 15 Jan 2025 08:57:48 +0100 Subject: [PATCH] [8.x] Cleanup swapSourceProvider(...) workaround (#120095) Backporting #118480 to 8.x branch. This reverts the workaround that was introduced in #117792 to avoid EOF error when an es|ql query uses multiple runtime fields that fallback to source when source mode is synthetic. This is now covered by the ReinitializingSourceProvider workaround that covers that and the concurrency problem. With this change, the main code for the required workarounds are now in isolated in ReinitializingSourceProvider. Additional another in `ReinitializingSourceProvider` was fixed, the issue was the lastSeenDoc field was reused overwritten by different threads, the latest commit moves the lastSeenDoc field to PerThreadSourceProvider so that each thread gets its own place to store the last seen docid. --- .../search/lookup/SearchLookup.java | 11 ---------- .../planner/EsPhysicalOperationProviders.java | 12 +---------- .../plugin/ReinitializingSourceProvider.java | 20 ++++++++++++------- 3 files changed, 14 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/lookup/SearchLookup.java b/server/src/main/java/org/elasticsearch/search/lookup/SearchLookup.java index 9eb0170af5efb..d899a390d8be7 100644 --- a/server/src/main/java/org/elasticsearch/search/lookup/SearchLookup.java +++ b/server/src/main/java/org/elasticsearch/search/lookup/SearchLookup.java @@ -102,14 +102,6 @@ private SearchLookup(SearchLookup searchLookup, Set fieldChain) { this.fieldLookupProvider = searchLookup.fieldLookupProvider; } - private SearchLookup(SearchLookup searchLookup, SourceProvider sourceProvider, Set fieldChain) { - this.fieldChain = Collections.unmodifiableSet(fieldChain); - this.sourceProvider = sourceProvider; - this.fieldTypeLookup = searchLookup.fieldTypeLookup; - this.fieldDataLookup = searchLookup.fieldDataLookup; - this.fieldLookupProvider = searchLookup.fieldLookupProvider; - } - /** * Creates a copy of the current {@link SearchLookup} that looks fields up in the same way, but also tracks field references * in order to detect cycles and prevent resolving fields that depend on more than {@link #MAX_FIELD_CHAIN_DEPTH} other fields. @@ -153,7 +145,4 @@ public Source getSource(LeafReaderContext ctx, int doc) throws IOException { return sourceProvider.getSource(ctx, doc); } - public SearchLookup swapSourceProvider(SourceProvider sourceProvider) { - return new SearchLookup(this, sourceProvider, fieldChain); - } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 11a599a15662f..36e2989461290 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -33,7 +33,6 @@ import org.elasticsearch.index.mapper.FieldNamesFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NestedLookup; -import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.mapper.SourceLoader; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -342,16 +341,7 @@ public MappedFieldType.FieldExtractPreference fieldExtractPreference() { @Override public SearchLookup lookup() { - boolean syntheticSource = SourceFieldMapper.isSynthetic(indexSettings()); - var searchLookup = ctx.lookup(); - if (syntheticSource) { - // in the context of scripts and when synthetic source is used the search lookup can't always be reused between - // users of SearchLookup. This is only an issue when scripts fallback to _source, but since we can't always - // accurately determine whether a script uses _source, we should do this for all script usages. - // This lookup() method is only invoked for scripts / runtime fields, so it is ok to do here. - searchLookup = searchLookup.swapSourceProvider(ctx.createSourceProvider()); - } - return searchLookup; + return ctx.lookup(); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java index 8dee3478b3b64..61ac67674b252 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ReinitializingSourceProvider.java @@ -28,10 +28,6 @@ final class ReinitializingSourceProvider implements SourceProvider { private PerThreadSourceProvider perThreadProvider; private final Supplier sourceProviderFactory; - // Keeping track of last seen doc and if current doc is before last seen doc then source provider is initialized: - // (when source mode is synthetic then _source is read from doc values and doc values don't support going backwards) - private int lastSeenDocId; - ReinitializingSourceProvider(Supplier sourceProviderFactory) { this.sourceProviderFactory = sourceProviderFactory; } @@ -40,15 +36,25 @@ final class ReinitializingSourceProvider implements SourceProvider { public Source getSource(LeafReaderContext ctx, int doc) throws IOException { var currentThread = Thread.currentThread(); PerThreadSourceProvider provider = perThreadProvider; - if (provider == null || provider.creatingThread != currentThread || doc < lastSeenDocId) { + if (provider == null || provider.creatingThread != currentThread || doc < provider.lastSeenDocId) { provider = new PerThreadSourceProvider(sourceProviderFactory.get(), currentThread); this.perThreadProvider = provider; } - lastSeenDocId = doc; + provider.lastSeenDocId = doc; return provider.source.getSource(ctx, doc); } - private record PerThreadSourceProvider(SourceProvider source, Thread creatingThread) { + private static final class PerThreadSourceProvider { + final SourceProvider source; + final Thread creatingThread; + // Keeping track of last seen doc and if current doc is before last seen doc then source provider is initialized: + // (when source mode is synthetic then _source is read from doc values and doc values don't support going backwards) + int lastSeenDocId; + + private PerThreadSourceProvider(SourceProvider source, Thread creatingThread) { + this.source = source; + this.creatingThread = creatingThread; + } } }