diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java index ba38938236637..6e3870da77210 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java @@ -36,7 +36,6 @@ import java.util.Map; import java.util.Objects; -import static org.elasticsearch.action.ValidateActions.addValidationError; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; public class PreviewTransformAction extends ActionType { @@ -94,12 +93,8 @@ public static Request fromXContent(final XContentParser parser) throws IOExcepti @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; - if (config.getPivotConfig() != null) { - for (String failure : config.getPivotConfig().aggFieldValidation()) { - validationException = addValidationError(failure, validationException); - } - } + validationException = config.validate(validationException); validationException = SourceDestValidator.validateRequest( validationException, config.getDestination() != null ? config.getDestination().getIndex() : null diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PutTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PutTransformAction.java index 5199ca0601e6a..1a239330996ba 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PutTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PutTransformAction.java @@ -69,20 +69,8 @@ public static Request fromXContent(final XContentParser parser, final String id, @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; - if (config.getPivotConfig() != null - && config.getPivotConfig().getMaxPageSearchSize() != null - && (config.getPivotConfig().getMaxPageSearchSize() < 10 || config.getPivotConfig().getMaxPageSearchSize() > 10_000)) { - validationException = addValidationError( - "pivot.max_page_search_size [" - + config.getPivotConfig().getMaxPageSearchSize() - + "] must be greater than 10 and less than 10,000", - validationException - ); - } - for (String failure : config.getPivotConfig().aggFieldValidation()) { - validationException = addValidationError(failure, validationException); - } + validationException = config.validate(validationException); validationException = SourceDestValidator.validateRequest(validationException, config.getDestination().getIndex()); if (TransformStrings.isValidId(config.getId()) == false) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfig.java index 6a37b149c9d7d..56807f7ea6a1b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfig.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.transform.transforms; +import 
org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -18,6 +19,7 @@ import java.io.IOException; import java.util.Objects; +import static org.elasticsearch.action.ValidateActions.addValidationError; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; public class SettingsConfig implements Writeable, ToXContentObject { @@ -63,6 +65,18 @@ public Float getDocsPerSecond() { return docsPerSecond; } + public ActionRequestValidationException validate(ActionRequestValidationException validationException) { + // TODO: make this dependent on search.max_buckets + if (maxPageSearchSize != null && (maxPageSearchSize < 10 || maxPageSearchSize > 10_000)) { + validationException = addValidationError( + "settings.max_page_search_size [" + maxPageSearchSize + "] must be greater than 10 and less than 10,000", + validationException + ); + } + + return validationException; + } + public boolean isValid() { return true; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java index 5970e33c212f3..ccad6f54cf3cb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.transform.transforms; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; @@ -292,6 +293,15 @@ public SettingsConfig getSettings() { return settings; } + public ActionRequestValidationException validate(ActionRequestValidationException validationException) { + if (pivotConfig != null) { + validationException = pivotConfig.validate(validationException); + } + validationException = settings.validate(validationException); + + return validationException; + } + public boolean isValid() { if (pivotConfig != null && pivotConfig.isValid() == false) { return false; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java index 08277310e76a8..7a496bea2ede8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java @@ -17,8 +17,6 @@ import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; @@ -26,7 +24,6 @@ import java.time.ZoneId; import java.time.ZoneOffset; import java.util.Objects; -import 
java.util.Set; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; @@ -300,7 +297,7 @@ public ZoneId getTimeZone() { return timeZone; } - Rounding.Prepared getRounding() { + public Rounding.Prepared getRounding() { return rounding; } @@ -347,19 +344,6 @@ public int hashCode() { return Objects.hash(field, interval, timeZone); } - @Override - public QueryBuilder getIncrementalBucketUpdateFilterQuery( - Set changedBuckets, - String synchronizationField, - long synchronizationTimestamp - ) { - if (synchronizationField != null && synchronizationField.equals(field) && synchronizationTimestamp > 0) { - return new RangeQueryBuilder(field).gte(rounding.round(synchronizationTimestamp)).format("epoch_millis"); - } else { - return null; - } - } - @Override public boolean supportsIncrementalBucketUpdate() { return false; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GeoTileGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GeoTileGroupSource.java index 0f50aa6cd05a3..32c988662253d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GeoTileGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GeoTileGroupSource.java @@ -19,9 +19,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.geometry.Rectangle; import org.elasticsearch.index.mapper.GeoShapeFieldMapper; -import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.GeoBoundingBoxQueryBuilder; -import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils; @@ -31,7 +29,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.Set; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; @@ -64,6 +61,7 @@ private static ConstructingObjectParser createParser(b ); return parser; } + private final Integer precision; private final GeoBoundingBox geoBoundingBox; @@ -106,23 +104,6 @@ public static GeoTileGroupSource fromXContent(final XContentParser parser, boole return lenient ? 
LENIENT_PARSER.apply(parser, null) : STRICT_PARSER.apply(parser, null); } - @Override - public QueryBuilder getIncrementalBucketUpdateFilterQuery( - Set changedBuckets, - String synchronizationField, - long synchronizationTimestamp - ) { - if (changedBuckets != null && changedBuckets.isEmpty() == false) { - BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); - changedBuckets.stream() - .map(GeoTileUtils::toBoundingBox) - .map(this::toGeoQuery) - .forEach(boolQueryBuilder::should); - return boolQueryBuilder; - } - return null; - } - @Override public boolean supportsIncrementalBucketUpdate() { return true; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java index 4f62faa7cb09b..6cb65bea0756c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java @@ -11,11 +11,9 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.index.query.QueryBuilder; import java.io.IOException; import java.util.Objects; -import java.util.Set; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; @@ -100,16 +98,6 @@ public int hashCode() { return Objects.hash(field, interval); } - @Override - public QueryBuilder getIncrementalBucketUpdateFilterQuery( - Set changedBuckets, - String synchronizationField, - long synchronizationTimestamp - ) { - // histograms are simple and cheap, so we skip this optimization - return null; - } - @Override public boolean supportsIncrementalBucketUpdate() { return false; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/PivotConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/PivotConfig.java index 427d3e2d16ac7..e5225cb4ab9f3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/PivotConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/PivotConfig.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.transform.transforms.pivot; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -29,6 +30,7 @@ import java.util.Map.Entry; import java.util.Objects; +import static org.elasticsearch.action.ValidateActions.addValidationError; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; @@ -175,6 +177,21 @@ public boolean isValid() { return groups.isValid() && aggregationConfig.isValid(); } + public ActionRequestValidationException validate(ActionRequestValidationException validationException) { + if (maxPageSearchSize != null && (maxPageSearchSize < 10 || maxPageSearchSize > 10_000)) { + validationException = addValidationError( + "pivot.max_page_search_size [" + maxPageSearchSize + "] must be greater than 10 and 
less than 10,000", + validationException + ); + } + + for (String failure : aggFieldValidation()) { + validationException = addValidationError(failure, validationException); + } + + return validationException; + } + public List aggFieldValidation() { if ((aggregationConfig.isValid() && groups.isValid()) == false) { return Collections.emptyList(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java index 6f5fd1b8ecc28..c08b0e6c1d2b9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java @@ -16,12 +16,10 @@ import org.elasticsearch.common.xcontent.AbstractObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.query.QueryBuilder; import java.io.IOException; import java.util.Locale; import java.util.Objects; -import java.util.Set; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; @@ -120,12 +118,6 @@ public void writeTo(StreamOutput out) throws IOException { public abstract boolean supportsIncrementalBucketUpdate(); - public abstract QueryBuilder getIncrementalBucketUpdateFilterQuery( - Set changedBuckets, - String synchronizationField, - long synchronizationTimestamp - ); - public String getField() { return field; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java index 17a173836cb95..a5657556d1bbf 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java @@ -9,11 +9,8 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.TermsQueryBuilder; import java.io.IOException; -import java.util.Set; /* * A terms aggregation source for group_by @@ -53,18 +50,6 @@ public static TermsGroupSource fromXContent(final XContentParser parser, boolean return lenient ? 
LENIENT_PARSER.apply(parser, null) : STRICT_PARSER.apply(parser, null); } - @Override - public QueryBuilder getIncrementalBucketUpdateFilterQuery( - Set changedBuckets, - String synchronizationField, - long synchronizationTimestamp - ) { - if (changedBuckets != null && changedBuckets.isEmpty() == false) { - return new TermsQueryBuilder(field, changedBuckets); - } - return null; - } - @Override public boolean supportsIncrementalBucketUpdate() { return true; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GroupConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GroupConfigTests.java index c1cba8f60215d..5cf4261a0ba2b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GroupConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GroupConfigTests.java @@ -29,7 +29,7 @@ public class GroupConfigTests extends AbstractSerializingTestCase { // array of illegal characters, see {@link AggregatorFactories#VALID_AGG_NAME} - private static final char[] ILLEGAL_FIELD_NAME_CHARACTERS = {'[', ']', '>'}; + private static final char[] ILLEGAL_FIELD_NAME_CHARACTERS = { '[', ']', '>' }; public static GroupConfig randomGroupConfig() { Map source = new LinkedHashMap<>(); @@ -40,23 +40,24 @@ public static GroupConfig randomGroupConfig() { for (int i = 0; i < randomIntBetween(1, 20); ++i) { String targetFieldName = randomAlphaOfLengthBetween(1, 20); if (names.add(targetFieldName)) { - SingleGroupSource groupBy; + SingleGroupSource groupBy = null; Type type = randomFrom(SingleGroupSource.Type.values()); switch (type) { - case TERMS: - groupBy = TermsGroupSourceTests.randomTermsGroupSource(); - break; - case HISTOGRAM: - groupBy = HistogramGroupSourceTests.randomHistogramGroupSource(); - break; - case DATE_HISTOGRAM: - groupBy = DateHistogramGroupSourceTests.randomDateHistogramGroupSource(); - break; - case GEOTILE_GRID: - default: - groupBy = GeoTileGroupSourceTests.randomGeoTileGroupSource(); + case TERMS: + groupBy = TermsGroupSourceTests.randomTermsGroupSource(); + break; + case HISTOGRAM: + groupBy = HistogramGroupSourceTests.randomHistogramGroupSource(); + break; + case DATE_HISTOGRAM: + groupBy = DateHistogramGroupSourceTests.randomDateHistogramGroupSource(); + break; + case GEOTILE_GRID: + groupBy = GeoTileGroupSourceTests.randomGeoTileGroupSource(); + break; + default: + fail("unknown group source type, please implement tests and add support here"); } - source.put(targetFieldName, Collections.singletonMap(type.value(), getSource(groupBy))); groups.put(targetFieldName, groupBy); } @@ -97,18 +98,19 @@ public void testEmptyGroupBy() throws IOException { public void testInvalidGroupByNames() throws IOException { - String invalidName = randomAlphaOfLengthBetween(0, 5) - + ILLEGAL_FIELD_NAME_CHARACTERS[randomIntBetween(0, ILLEGAL_FIELD_NAME_CHARACTERS.length - 1)] - + randomAlphaOfLengthBetween(0, 5); + String invalidName = randomAlphaOfLengthBetween(0, 5) + ILLEGAL_FIELD_NAME_CHARACTERS[randomIntBetween( + 0, + ILLEGAL_FIELD_NAME_CHARACTERS.length - 1 + )] + randomAlphaOfLengthBetween(0, 5); XContentBuilder source = JsonXContent.contentBuilder() - .startObject() - .startObject(invalidName) - .startObject("terms") - .field("field", "user") - .endObject() - .endObject() - .endObject(); + .startObject() + .startObject(invalidName) + .startObject("terms") + .field("field", "user") + .endObject() + 
.endObject() + .endObject(); // lenient, passes but reports invalid try (XContentParser parser = createParser(source)) { diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformProgressIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformProgressIT.java index 7b15813ea7593..559bc510ceaca 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformProgressIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformProgressIT.java @@ -6,10 +6,13 @@ package org.elasticsearch.xpack.transform.integration; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; @@ -19,12 +22,13 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xpack.core.transform.transforms.DestConfig; -import org.elasticsearch.xpack.core.transform.transforms.QueryConfig; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; @@ -32,11 +36,15 @@ import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig; import org.elasticsearch.xpack.core.transform.transforms.pivot.HistogramGroupSource; import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig; -import org.elasticsearch.xpack.transform.transforms.TransformProgressGatherer; +import org.elasticsearch.xpack.transform.transforms.Function; +import org.elasticsearch.xpack.transform.transforms.pivot.Pivot; import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.transform.integration.TransformRestTestCase.REVIEWS_INDEX_NAME; @@ -117,6 +125,7 @@ protected void createReviewsIndex() throws Exception { } public void testGetProgress() throws Exception { + String transformId = "get_progress_transform"; createReviewsIndex(); SourceConfig sourceConfig = new SourceConfig(REVIEWS_INDEX_NAME); DestConfig destConfig = new DestConfig("unnecessary", null); @@ -128,40 +137,20 @@ public void testGetProgress() throws Exception { 
aggs.addAggregator(AggregationBuilders.avg("avg_rating").field("stars")); AggregationConfig aggregationConfig = new AggregationConfig(Collections.emptyMap(), aggs); PivotConfig pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig, null); - TransformConfig config = new TransformConfig( - "get_progress_transform", - sourceConfig, - destConfig, - null, - null, - null, - pivotConfig, - null, - null - ); + TransformConfig config = new TransformConfig(transformId, sourceConfig, destConfig, null, null, null, pivotConfig, null, null); - final RestHighLevelClient restClient = new TestRestHighLevelClient(); - SearchResponse response = restClient.search( - TransformProgressGatherer.getSearchRequest(config, config.getSource().getQueryConfig().getQuery()), - RequestOptions.DEFAULT - ); + Pivot pivot = new Pivot(pivotConfig, transformId); - TransformProgress progress = TransformProgressGatherer.searchResponseToTransformProgressFunction().apply(response); + TransformProgress progress = getProgress(pivot, getProgressQuery(pivot, config.getSource().getIndex(), null)); assertThat(progress.getTotalDocs(), equalTo(1000L)); assertThat(progress.getDocumentsProcessed(), equalTo(0L)); assertThat(progress.getPercentComplete(), equalTo(0.0)); - QueryConfig queryConfig = new QueryConfig(Collections.emptyMap(), QueryBuilders.termQuery("user_id", "user_26")); - pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig, null); - sourceConfig = new SourceConfig(new String[] { REVIEWS_INDEX_NAME }, queryConfig); - config = new TransformConfig("get_progress_transform", sourceConfig, destConfig, null, null, null, pivotConfig, null, null); - - response = restClient.search( - TransformProgressGatherer.getSearchRequest(config, config.getSource().getQueryConfig().getQuery()), - RequestOptions.DEFAULT + progress = getProgress( + pivot, + getProgressQuery(pivot, config.getSource().getIndex(), QueryBuilders.termQuery("user_id", "user_26")) ); - progress = TransformProgressGatherer.searchResponseToTransformProgressFunction().apply(response); assertThat(progress.getTotalDocs(), equalTo(35L)); assertThat(progress.getDocumentsProcessed(), equalTo(0L)); @@ -172,13 +161,12 @@ public void testGetProgress() throws Exception { Collections.singletonMap("every_50", new HistogramGroupSource("missing_field", null, 50.0)) ); pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig, null); - config = new TransformConfig("get_progress_transform", sourceConfig, destConfig, null, null, null, pivotConfig, null, null); + pivot = new Pivot(pivotConfig, transformId); - response = restClient.search( - TransformProgressGatherer.getSearchRequest(config, config.getSource().getQueryConfig().getQuery()), - RequestOptions.DEFAULT + progress = getProgress( + pivot, + getProgressQuery(pivot, config.getSource().getIndex(), QueryBuilders.termQuery("user_id", "user_26")) ); - progress = TransformProgressGatherer.searchResponseToTransformProgressFunction().apply(response); assertThat(progress.getTotalDocs(), equalTo(0L)); assertThat(progress.getDocumentsProcessed(), equalTo(0L)); @@ -194,9 +182,46 @@ protected Settings restClientSettings() { return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build(); } + private TransformProgress getProgress(Function function, SearchRequest searchRequest) throws Exception { + CountDownLatch latch = new CountDownLatch(1); + final AtomicReference progressHolder = new AtomicReference<>(); + final AtomicReference exceptionHolder = new AtomicReference<>(); + + try 
(RestHighLevelClient restClient = new TestRestHighLevelClient()) { + SearchResponse response = restClient.search(searchRequest, RequestOptions.DEFAULT); + + function.getInitialProgressFromResponse( + response, + new LatchedActionListener<>( + ActionListener.wrap(progressHolder::set, e -> { exceptionHolder.set(e); }), + latch + ) + ); + } + + assertTrue("timed out after 20s", latch.await(20, TimeUnit.SECONDS)); + if (exceptionHolder.get() != null) { + throw exceptionHolder.get(); + } + + return progressHolder.get(); + } + private class TestRestHighLevelClient extends RestHighLevelClient { TestRestHighLevelClient() { super(client(), restClient -> {}, Collections.emptyList()); } } + + private static SearchRequest getProgressQuery(Function function, String[] source, QueryBuilder query) { + SearchRequest searchRequest = new SearchRequest(source); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + function.buildSearchQueryForInitialProgress(searchSourceBuilder); + + if (query != null) { + searchSourceBuilder.query(QueryBuilders.boolQuery().filter(query).filter(searchSourceBuilder.query())); + } + searchRequest.allowPartialSearchResults(false).source(searchSourceBuilder); + return searchRequest; + } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java index f585eeebbef5e..5d8e3cd9ec4e2 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java @@ -9,12 +9,10 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.lucene.util.SetOnce; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ingest.SimulatePipelineAction; import org.elasticsearch.action.ingest.SimulatePipelineRequest; import org.elasticsearch.action.ingest.SimulatePipelineResponse; -import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.Client; @@ -35,9 +33,6 @@ import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.license.XPackLicenseState; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.search.aggregations.Aggregations; -import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -45,15 +40,13 @@ import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.common.validation.SourceDestValidator; import org.elasticsearch.xpack.core.transform.TransformField; -import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformDestIndexSettings; -import 
org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.transform.persistence.TransformIndex; -import org.elasticsearch.xpack.transform.transforms.pivot.AggregationResultUtils; -import org.elasticsearch.xpack.transform.transforms.pivot.Pivot; +import org.elasticsearch.xpack.transform.transforms.Function; +import org.elasticsearch.xpack.transform.transforms.FunctionFactory; import org.elasticsearch.xpack.transform.utils.SourceDestValidations; import java.time.Clock; @@ -64,7 +57,6 @@ import java.util.stream.Collectors; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.xpack.transform.transforms.TransformIndexer.COMPOSITE_AGGREGATION_NAME; public class TransportPreviewTransformAction extends HandledTransportAction< PreviewTransformAction.Request, @@ -146,39 +138,18 @@ protected void doExecute(Task task, PreviewTransformAction.Request request, Acti config.getDestination().getIndex(), SourceDestValidations.PREVIEW_VALIDATIONS, ActionListener.wrap(r -> { - - Pivot pivot = new Pivot(config.getPivotConfig()); - try { - pivot.validateConfig(); - } catch (ElasticsearchStatusException e) { - listener.onFailure( - new ElasticsearchStatusException( - TransformMessages.REST_PUT_TRANSFORM_FAILED_TO_VALIDATE_CONFIGURATION, - e.status(), - e - ) - ); - return; - } catch (Exception e) { - listener.onFailure( - new ElasticsearchStatusException( - TransformMessages.REST_PUT_TRANSFORM_FAILED_TO_VALIDATE_CONFIGURATION, - RestStatus.INTERNAL_SERVER_ERROR, - e - ) + // create the function for validation + final Function function = FunctionFactory.create(config); + function.validateConfig(ActionListener.wrap(functionValidationResponse -> { + getPreview( + config.getId(), // note: @link{PreviewTransformAction} sets an id, so this is never null + function, + config.getSource(), + config.getDestination().getPipeline(), + config.getDestination().getIndex(), + listener ); - return; - } - - getPreview( - config.getId(), // note: @link{PreviewTransformAction} sets an id, so this is never null - pivot, - config.getSource(), - config.getDestination().getPipeline(), - config.getDestination().getIndex(), - listener - ); - + }, listener::onFailure)); }, listener::onFailure) ); } @@ -186,7 +157,7 @@ protected void doExecute(Task task, PreviewTransformAction.Request request, Acti @SuppressWarnings("unchecked") private void getPreview( String transformId, - Pivot pivot, + Function function, SourceConfig source, String pipeline, String dest, @@ -203,77 +174,59 @@ private void getPreview( docs.add((Map) XContentMapValues.extractValue("doc._source", tempMap)); } } - TransformDestIndexSettings generateddestIndexSettings = TransformIndex.createTransformDestIndexSettings( + TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings( mappings.get(), transformId, Clock.systemUTC() ); - listener.onResponse(new PreviewTransformAction.Response(docs, generateddestIndexSettings)); + listener.onResponse(new PreviewTransformAction.Response(docs, generatedDestIndexSettings)); }, listener::onFailure); - pivot.deduceMappings(client, source, ActionListener.wrap(deducedMappings -> { + function.deduceMappings(client, source, ActionListener.wrap(deducedMappings -> { mappings.set(deducedMappings); - ClientHelper.executeWithHeadersAsync( - threadPool.getThreadContext().getHeaders(), - ClientHelper.TRANSFORM_ORIGIN, + function.preview( client, - SearchAction.INSTANCE, - pivot.buildSearchRequest(source, 
null, NUMBER_OF_PREVIEW_BUCKETS), - ActionListener.wrap(r -> { - try { - final Aggregations aggregations = r.getAggregations(); - if (aggregations == null) { - listener.onFailure( - new ElasticsearchStatusException("Source indices have been deleted or closed.", RestStatus.BAD_REQUEST) - ); - return; - } - final CompositeAggregation agg = aggregations.get(COMPOSITE_AGGREGATION_NAME); - TransformIndexerStats stats = new TransformIndexerStats(); - // remove all internal fields - - if (pipeline == null) { - List> docs = pivot.extractResults(agg, deducedMappings, stats) - .peek(doc -> doc.keySet().removeIf(k -> k.startsWith("_"))) - .collect(Collectors.toList()); - - TransformDestIndexSettings generateddestIndexSettings = TransformIndex.createTransformDestIndexSettings( - mappings.get(), - transformId, - Clock.systemUTC() + threadPool.getThreadContext().getHeaders(), + source, + deducedMappings, + NUMBER_OF_PREVIEW_BUCKETS, + ActionListener.wrap(docs -> { + if (pipeline == null) { + TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings( + mappings.get(), + transformId, + Clock.systemUTC() + ); + + listener.onResponse(new PreviewTransformAction.Response(docs, generatedDestIndexSettings)); + } else { + List> results = docs.stream().map(doc -> { + Map src = new HashMap<>(); + String id = (String) doc.get(TransformField.DOCUMENT_ID_FIELD); + src.put("_source", doc); + src.put("_id", id); + src.put("_index", dest); + return src; + }).collect(Collectors.toList()); + + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + builder.field("docs", results); + builder.endObject(); + var pipelineRequest = new SimulatePipelineRequest(BytesReference.bytes(builder), XContentType.JSON); + pipelineRequest.setId(pipeline); + ClientHelper.executeAsyncWithOrigin( + client, + ClientHelper.TRANSFORM_ORIGIN, + SimulatePipelineAction.INSTANCE, + pipelineRequest, + pipelineResponseActionListener ); - - listener.onResponse(new PreviewTransformAction.Response(docs, generateddestIndexSettings)); - } else { - List> results = pivot.extractResults(agg, deducedMappings, stats).map(doc -> { - Map src = new HashMap<>(); - String id = (String) doc.get(TransformField.DOCUMENT_ID_FIELD); - doc.keySet().removeIf(k -> k.startsWith("_")); - src.put("_source", doc); - src.put("_id", id); - src.put("_index", dest); - return src; - }).collect(Collectors.toList()); - try (XContentBuilder builder = jsonBuilder()) { - builder.startObject(); - builder.field("docs", results); - builder.endObject(); - var pipelineRequest = new SimulatePipelineRequest(BytesReference.bytes(builder), XContentType.JSON); - pipelineRequest.setId(pipeline); - ClientHelper.executeAsyncWithOrigin( - client, - ClientHelper.TRANSFORM_ORIGIN, - SimulatePipelineAction.INSTANCE, - pipelineRequest, - pipelineResponseActionListener - ); - } } - } catch (AggregationResultUtils.AggregationExtractionException extractionException) { - listener.onFailure(new ElasticsearchStatusException(extractionException.getMessage(), RestStatus.BAD_REQUEST)); } }, listener::onFailure) ); + }, listener::onFailure)); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java index 66c6dd5f8e2f1..32f92b4d796e2 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java +++ 
b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java @@ -56,7 +56,8 @@ import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; -import org.elasticsearch.xpack.transform.transforms.pivot.Pivot; +import org.elasticsearch.xpack.transform.transforms.Function; +import org.elasticsearch.xpack.transform.transforms.FunctionFactory; import org.elasticsearch.xpack.transform.utils.SourceDestValidations; import java.io.IOException; @@ -211,12 +212,7 @@ protected void masterOperation(Task task, Request request, ClusterState clusterS XPackPlugin.checkReadyForXPackCustomMetadata(clusterState); // set headers to run transform as calling user - Map filteredHeaders = threadPool.getThreadContext() - .getHeaders() - .entrySet() - .stream() - .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + Map filteredHeaders = ClientHelper.filterSecurityHeaders(threadPool.getThreadContext().getHeaders()); TransformConfig config = request.getConfig().setHeaders(filteredHeaders).setCreateTime(Instant.now()).setVersion(Version.CURRENT); @@ -288,7 +284,8 @@ private void handlePrivsResponse( private void putTransform(Request request, ActionListener listener) { final TransformConfig config = request.getConfig(); - final Pivot pivot = new Pivot(config.getPivotConfig()); + // create the function for validation + final Function function = FunctionFactory.create(config); // <3> Return to the listener ActionListener putTransformConfigurationListener = ActionListener.wrap(putTransformConfigurationResult -> { @@ -298,7 +295,7 @@ private void putTransform(Request request, ActionListener }, listener::onFailure); // <2> Put our transform - ActionListener pivotValidationListener = ActionListener.wrap( + ActionListener validationListener = ActionListener.wrap( validationResult -> transformConfigManager.putTransformConfiguration(config, putTransformConfigurationListener), validationException -> { if (validationException instanceof ElasticsearchStatusException) { @@ -321,38 +318,27 @@ private void putTransform(Request request, ActionListener } ); - try { - pivot.validateConfig(); - } catch (ElasticsearchStatusException e) { - listener.onFailure( - new ElasticsearchStatusException(TransformMessages.REST_PUT_TRANSFORM_FAILED_TO_VALIDATE_CONFIGURATION, e.status(), e) - ); - return; - } catch (Exception e) { - listener.onFailure( - new ElasticsearchStatusException( - TransformMessages.REST_PUT_TRANSFORM_FAILED_TO_VALIDATE_CONFIGURATION, - RestStatus.INTERNAL_SERVER_ERROR, - e - ) - ); - return; - } - - if (request.isDeferValidation()) { - pivotValidationListener.onResponse(true); - } else { - if (config.getDestination().getPipeline() != null) { - if (ingestService.getPipeline(config.getDestination().getPipeline()) == null) { - listener.onFailure(new ElasticsearchStatusException( - TransformMessages.getMessage(TransformMessages.PIPELINE_MISSING, config.getDestination().getPipeline()), - RestStatus.BAD_REQUEST - ) - ); - return; + function.validateConfig(ActionListener.wrap(r2 -> { + if (request.isDeferValidation()) { + validationListener.onResponse(true); + } else { + if (config.getDestination().getPipeline() != null) { + if (ingestService.getPipeline(config.getDestination().getPipeline()) == null) { + listener.onFailure( + new 
ElasticsearchStatusException( + TransformMessages.getMessage(TransformMessages.PIPELINE_MISSING, config.getDestination().getPipeline()), + RestStatus.BAD_REQUEST + ) + ); + return; + } + } + if (request.isDeferValidation()) { + validationListener.onResponse(true); + } else { + function.validateQuery(client, config.getSource(), validationListener); } } - pivot.validateQuery(client, config.getSource(), pivotValidationListener); - } + }, listener::onFailure)); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java index 9466d66cd9d27..3095ad0e33703 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java @@ -52,7 +52,8 @@ import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import org.elasticsearch.xpack.transform.persistence.TransformIndex; -import org.elasticsearch.xpack.transform.transforms.pivot.Pivot; +import org.elasticsearch.xpack.transform.transforms.Function; +import org.elasticsearch.xpack.transform.transforms.FunctionFactory; import org.elasticsearch.xpack.transform.utils.SourceDestValidations; import java.io.IOException; @@ -297,7 +298,7 @@ protected void masterOperation( private void createDestinationIndex(final TransformConfig config, final ActionListener listener) { - final Pivot pivot = new Pivot(config.getPivotConfig()); + final Function function = FunctionFactory.create(config); ActionListener> deduceMappingsListener = ActionListener.wrap(mappings -> { TransformDestIndexSettings generateddestIndexSettings = TransformIndex.createTransformDestIndexSettings( @@ -312,7 +313,7 @@ private void createDestinationIndex(final TransformConfig config, final ActionLi ) ); - pivot.deduceMappings(client, config.getSource(), deduceMappingsListener); + function.deduceMappings(client, config.getSource(), deduceMappingsListener); } @Override diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java index 9935076a10066..70549e2d822ae 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java @@ -62,8 +62,9 @@ import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import org.elasticsearch.xpack.transform.persistence.TransformIndex; +import org.elasticsearch.xpack.transform.transforms.Function; +import org.elasticsearch.xpack.transform.transforms.FunctionFactory; import org.elasticsearch.xpack.transform.transforms.TransformTask; -import org.elasticsearch.xpack.transform.transforms.pivot.Pivot; import org.elasticsearch.xpack.transform.utils.SourceDestValidations; import java.time.Clock; @@ -181,12 +182,7 @@ protected void doExecute(Task task, Request request, ActionListener li return; } // set headers to run transform as calling user - Map filteredHeaders = 
threadPool.getThreadContext() - .getHeaders() - .entrySet() - .stream() - .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + Map filteredHeaders = ClientHelper.filterSecurityHeaders(threadPool.getThreadContext().getHeaders()); TransformConfigUpdate update = request.getUpdate(); update.setHeaders(filteredHeaders); @@ -266,7 +262,6 @@ protected Response newResponse( List taskOperationFailures, List failedNodeExceptions ) { - // there should be only 1 response, todo: check return tasks.get(0); } @@ -328,8 +323,7 @@ private void updateTransform( ClusterState clusterState, ActionListener listener ) { - - final Pivot pivot = new Pivot(config.getPivotConfig()); + final Function function = FunctionFactory.create(config); // <3> Return to the listener ActionListener putTransformConfigurationListener = ActionListener.wrap(putTransformConfigurationResult -> { @@ -358,7 +352,7 @@ private void updateTransform( ); // <1> Create destination index if necessary - ActionListener pivotValidationListener = ActionListener.wrap(validationResult -> { + ActionListener functionValidationListener = ActionListener.wrap(validationResult -> { String[] dest = indexNameExpressionResolver.concreteIndexNames( clusterState, IndicesOptions.lenientExpandOpen(), @@ -376,7 +370,7 @@ private void updateTransform( // we allow source indices to disappear. If the source and destination indices do not exist, don't do anything // the transform will just have to dynamically create the destination index without special mapping. && src.length > 0) { - createDestination(pivot, config, createDestinationListener); + createDestination(function, config, createDestinationListener); } else { createDestinationListener.onResponse(null); } @@ -400,33 +394,21 @@ private void updateTransform( } }); - try { - pivot.validateConfig(); - } catch (ElasticsearchStatusException e) { - listener.onFailure( - new ElasticsearchStatusException(TransformMessages.REST_PUT_TRANSFORM_FAILED_TO_VALIDATE_CONFIGURATION, e.status(), e) - ); - return; - } catch (Exception e) { - listener.onFailure( - new ElasticsearchStatusException( - TransformMessages.REST_PUT_TRANSFORM_FAILED_TO_VALIDATE_CONFIGURATION, - RestStatus.INTERNAL_SERVER_ERROR, - e - ) - ); - return; - } - - // <0> Validate the pivot if necessary - if (request.isDeferValidation()) { - pivotValidationListener.onResponse(true); - } else { - pivot.validateQuery(client, config.getSource(), pivotValidationListener); - } + function.validateConfig(ActionListener.wrap(r2 -> { + if (request.isDeferValidation()) { + functionValidationListener.onResponse(true); + } else { + // TODO: it seems we are not validating ingest pipelines, consider to share code with PUT + if (request.isDeferValidation()) { + functionValidationListener.onResponse(true); + } else { + function.validateQuery(client, config.getSource(), functionValidationListener); + } + } + }, listener::onFailure)); } - private void createDestination(Pivot pivot, TransformConfig config, ActionListener listener) { + private void createDestination(Function function, TransformConfig config, ActionListener listener) { ActionListener> deduceMappingsListener = ActionListener.wrap(mappings -> { TransformDestIndexSettings generateddestIndexSettings = TransformIndex.createTransformDestIndexSettings( mappings, @@ -446,7 +428,7 @@ private void createDestination(Pivot pivot, TransformConfig config, ActionListen ) ); - pivot.deduceMappings(client, config.getSource(), 
deduceMappingsListener);
+        function.deduceMappings(client, config.getSource(), deduceMappingsListener);
     }
 }
diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java
index 35651e49ae271..3c2a0d632d2f7 100644
--- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java
+++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java
@@ -60,7 +60,6 @@ class ClientTransformIndexer extends TransformIndexer {
         String executorName,
         TransformConfigManager transformsConfigManager,
         CheckpointProvider checkpointProvider,
-        TransformProgressGatherer progressGatherer,
         AtomicReference<IndexerState> initialState,
         TransformIndexerPosition initialPosition,
         Client client,
@@ -80,7 +79,6 @@ class ClientTransformIndexer extends TransformIndexer {
             executorName,
             transformsConfigManager,
             checkpointProvider,
-            progressGatherer,
             auditor,
             transformConfig,
             fieldMappings,
@@ -225,6 +223,18 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next
         );
     }
 
+    @Override
+    void doGetInitialProgress(SearchRequest request, ActionListener<SearchResponse> responseListener) {
+        ClientHelper.executeWithHeadersAsync(
+            transformConfig.getHeaders(),
+            ClientHelper.TRANSFORM_ORIGIN,
+            client,
+            SearchAction.INSTANCE,
+            request,
+            responseListener
+        );
+    }
+
     @Override
     protected void doSaveState(IndexerState indexerState, TransformIndexerPosition position, Runnable next) {
         if (context.getTaskState() == TransformTaskState.FAILED) {
diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java
index c3739dfd7619a..221da065c40e0 100644
--- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java
+++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java
@@ -51,7 +51,6 @@ ClientTransformIndexer build(ThreadPool threadPool, String executorName, Transfo
             executorName,
             transformsConfigManager,
             checkpointProvider,
-            new TransformProgressGatherer(parentTaskClient),
             new AtomicReference<>(this.indexerState),
             initialPosition,
             parentTaskClient,
diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java
new file mode 100644
index 0000000000000..97d349ed251c7
--- /dev/null
+++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java
@@ -0,0 +1,216 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */ + +package org.elasticsearch.xpack.transform.transforms; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; +import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; + +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +/** + * Interface for transform functions (e.g. pivot) + * + * The function interface abstracts: + * - mapping deduction + * - data preview + * - validation + * - collection of changes (finding the minimal update) + * - querying the source index + * - processing search results in order to write them to dest + * - access to the cursor (for resilience and pausing a transform) + */ +public interface Function { + + /** + * Change collector + * + * The purpose of the change collector is minimizing the update required for continuous transforms. + * + * The change collector is stateful, changes are stored inside. For scaling the change collector has a + * cursor and can run in iterations. + * + * In a nutshell the algorithm works like this: + * 1. check and collect what needs to be updated, but only up to the page size limit + * 2. apply the collected changes as filter query and search/process them + * 3. in case phase 1 could not collect all changes, move the collector cursor, collect changes and continue with step 2 + */ + public interface ChangeCollector { + + /** + * Build the search query to gather the changes between 2 checkpoints. + * + * @param searchSourceBuilder a searchsource builder instance + * @param position the position of the change collector + * @param pageSize the pageSize configured by the function, used as upper boundary, a lower page size might be used + * @return the searchSource, expanded with the relevant parts + */ + SearchSourceBuilder buildChangesQuery(SearchSourceBuilder searchSourceBuilder, Map position, int pageSize); + + /** + * Process the search response of the changes query and remember the changes. + * + * TODO: replace the boolean with a more descriptive enum. + * + * @param searchResponse the response after querying for changes + * @return true in case of no more changed buckets, false in case changes buckets have been collected + */ + boolean processSearchResponse(SearchResponse searchResponse); + + /** + * Build the filter query to narrow the result set given the previously collected changes. + * + * TODO: it might be useful to have the full checkpoint data. + * + * @param lastCheckpointTimestamp the timestamp of the last checkpoint + * @param nextcheckpointTimestamp the timestamp of the next (in progress) checkpoint + * @return a filter query, null in case of no filter + */ + QueryBuilder buildFilterQuery(long lastCheckpointTimestamp, long nextcheckpointTimestamp); + + /** + * Clear the internal state to free up memory. + */ + void clear(); + + /** + * Get the bucket position of the changes collector. + * + * @return the position, null in case the collector is exhausted + */ + Map getBucketPosition(); + } + + /** + * Deduce mappings based on the input mappings and the known configuration. 
+     *
+     * @param client a client instance for querying the source mappings
+     * @param sourceConfig the source configuration
+     * @param listener listener to take the deduced mappings
+     */
+    void deduceMappings(Client client, SourceConfig sourceConfig, ActionListener<Map<String, String>> listener);
+
+    /**
+     * Create a preview of the function.
+     *
+     * @param client a client instance for querying
+     * @param headers headers to be used to query only for what the caller is allowed to
+     * @param sourceConfig the source configuration
+     * @param fieldTypeMap mapping of field types
+     * @param numberOfRows number of rows to produce for the preview
+     * @param listener listener that takes a list, where every entry corresponds to one row/doc in the preview
+     */
+    void preview(
+        Client client,
+        Map<String, String> headers,
+        SourceConfig sourceConfig,
+        Map<String, String> fieldTypeMap,
+        int numberOfRows,
+        ActionListener<List<Map<String, Object>>> listener
+    );
+
+    /**
+     * Get the search query for querying for initial (first checkpoint) progress.
+     *
+     * @param searchSourceBuilder a search source builder instance
+     * @return the search source, expanded with the relevant parts
+     */
+    SearchSourceBuilder buildSearchQueryForInitialProgress(SearchSourceBuilder searchSourceBuilder);
+
+    /**
+     * Process the search response from the progress search call and return progress information.
+     *
+     * @param response the search response
+     * @param progressListener listener that takes the progress information as a callback
+     */
+    void getInitialProgressFromResponse(SearchResponse response, ActionListener<TransformProgress> progressListener);
+
+    /**
+     * Validate the configuration.
+     *
+     * @param listener the result listener
+     */
+    void validateConfig(ActionListener<Boolean> listener);
+
+    /**
+     * Runtime validation by querying the source and checking whether source and config fit.
+     *
+     * @param client a client instance for querying the source
+     * @param sourceConfig the source configuration
+     * @param listener the result listener
+     */
+    void validateQuery(Client client, SourceConfig sourceConfig, ActionListener<Boolean> listener);
+
+    /**
+     * Create a change collector instance and return it.
+     *
+     * @param synchronizationField the field used for synchronizing (continuous mode)
+     * @return a change collector instance
+     */
+    ChangeCollector buildChangeCollector(String synchronizationField);
+
+    /**
+     * Get the initial page size for this function.
+     *
+     * The page size is the main parameter for adjusting memory consumption. Memory consumption mainly depends on
+     * the page size, the type of aggregations, and the data. As the page size is the number of buckets returned
+     * per page, it acts as a multiplier for the cost of aggregating a single bucket.
+     *
+     * In the future we might inspect the configuration and base the initial size on the aggregations used.
+     *
+     * @return the page size
+     */
+    int getInitialPageSize();
+
+    /**
+     * Whether this function, given its configuration, supports the incremental bucket update used in continuous mode.
+     *
+     * If so, the indexer uses the change collector to update the continuous transform.
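+     * <p>
+     * Sketch of the assumed indexer-side usage, not part of this change: {@code synchronizationField}
+     * is assumed to come from the transform's sync configuration.
+     * <pre>{@code
+     * ChangeCollector changeCollector = null;
+     * if (function.supportsIncrementalBucketUpdate()) {
+     *     changeCollector = function.buildChangeCollector(synchronizationField);
+     * }
+     * }</pre>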
+ + /** + * Whether this function - given its configuration - supports incremental bucket update used in continuous mode. + * + * If so, the indexer uses the change collector to update the continuous transform. + * + * TODO: simplify and remove this method if possible + * + * @return true if incremental bucket update is supported + */ + boolean supportsIncrementalBucketUpdate(); + + /** + * Build the query for the next iteration. + * + * @param searchSourceBuilder a search source builder instance + * @param position current position (cursor/page) + * @param pageSize the page size, defining how much data to request + * @return the searchSource, expanded with the relevant parts + */ + SearchSourceBuilder buildSearchQuery(SearchSourceBuilder searchSourceBuilder, Map<String, Object> position, int pageSize); + + /** + * Process the search response and return a stream of index requests as well as the cursor. + * + * @param searchResponse the search response + * @param destinationIndex the destination index + * @param destinationPipeline the destination pipeline + * @param fieldMappings field mappings for the destination + * @param stats a stats object to record/collect stats + * @return a tuple with the stream of index requests and the cursor + */ + Tuple<Stream<IndexRequest>, Map<String, Object>> processSearchResponse( + SearchResponse searchResponse, + String destinationIndex, + String destinationPipeline, + Map<String, String> fieldMappings, + TransformIndexerStats stats + ); +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/FunctionFactory.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/FunctionFactory.java new file mode 100644 index 0000000000000..549f7b2793f0e --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/FunctionFactory.java @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License.
+ */ + +package org.elasticsearch.xpack.transform.transforms; + +import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; +import org.elasticsearch.xpack.transform.transforms.pivot.Pivot; + +/** + * Factory for creating the runtime instance for a function given the configuration + */ +public final class FunctionFactory { + + private FunctionFactory() {} + + /** + * Creates the function instance given the transform configuration + * + * @param config the transform configuration + * @return the instance of the function + */ + public static Function create(TransformConfig config) { + if (config.getPivotConfig() != null) { + return new Pivot(config.getPivotConfig(), config.getId()); + } else { + throw new IllegalArgumentException("unknown transform function"); + } + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index d26902e8bf8dc..645581cad45ca 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -16,21 +16,17 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreakingException; -import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.script.ScriptException; -import org.elasticsearch.search.aggregations.Aggregations; -import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; -import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IterationResult; -import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -43,22 +39,18 @@ import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; -import org.elasticsearch.xpack.transform.transforms.pivot.Pivot; +import org.elasticsearch.xpack.transform.transforms.Function.ChangeCollector; import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder; -import java.io.IOException; -import java.io.UncheckedIOException; import java.time.Instant; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; -import static 
org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; - public abstract class TransformIndexer extends AsyncTwoPhaseIndexer { /** @@ -66,15 +58,14 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer fieldMappings; - private Pivot pivot; + // the function of the transform, e.g. pivot + private Function function; + + // collects changes for continuous mode + private ChangeCollector changeCollector; + private volatile Integer initialConfiguredPageSize; private volatile int pageSize = 0; private long logEvery = 1; @@ -111,10 +106,6 @@ private enum RunState { private volatile String lastAuditedExceptionMessage = null; private volatile RunState runState; - // hold information for continuous mode (partial updates) - private volatile Map> changedBuckets = Collections.emptyMap(); - private volatile Map changedBucketsAfterKey; - private volatile long lastCheckpointCleanup = 0L; public TransformIndexer( @@ -122,7 +113,6 @@ public TransformIndexer( String executorName, TransformConfigManager transformsConfigManager, CheckpointProvider checkpointProvider, - TransformProgressGatherer progressGatherer, TransformAuditor auditor, TransformConfig transformConfig, Map fieldMappings, @@ -137,7 +127,6 @@ public TransformIndexer( super(threadPool, executorName, initialState, initialPosition, jobStats); this.transformsConfigManager = ExceptionsHelper.requireNonNull(transformsConfigManager, "transformsConfigManager"); this.checkpointProvider = ExceptionsHelper.requireNonNull(checkpointProvider, "checkpointProvider"); - this.progressGatherer = ExceptionsHelper.requireNonNull(progressGatherer, "progressGatherer"); this.auditor = ExceptionsHelper.requireNonNull(auditor, "auditor"); this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig"); this.fieldMappings = ExceptionsHelper.requireNonNull(fieldMappings, "fieldMappings"); @@ -147,13 +136,15 @@ public TransformIndexer( this.context = ExceptionsHelper.requireNonNull(context, "context"); // give runState a default - this.runState = RunState.APPLY_BUCKET_RESULTS; + this.runState = RunState.APPLY_RESULTS; if (transformConfig.getSettings() != null && transformConfig.getSettings().getDocsPerSecond() != null) { docsPerSecond = transformConfig.getSettings().getDocsPerSecond(); } } + abstract void doGetInitialProgress(SearchRequest request, ActionListener responseListener); + public int getPageSize() { return pageSize; } @@ -238,8 +229,6 @@ protected void onStart(long now, ActionListener listener) { ActionListener finalListener = ActionListener.wrap(r -> { try { - pivot = new Pivot(getConfig().getPivotConfig()); - // if we haven't set the page size yet, if it is set we might have reduced it after running into an out of memory if (pageSize == 0) { configurePageSize(getConfig().getSettings().getMaxPageSearchSize()); @@ -257,6 +246,8 @@ protected void onStart(long now, ActionListener listener) { // Since multiple checkpoints can be executed in the task while it is running on the same node, we need to gather // the progress here, and not in the executor. 
ActionListener updateConfigListener = ActionListener.wrap(updateConfigResponse -> { + initializeFunction(); + if (initialRun()) { createCheckpoint(ActionListener.wrap(cp -> { nextCheckpoint = cp; @@ -267,10 +258,28 @@ protected void onStart(long now, ActionListener listener) { finalListener.onResponse(null); return; } - progressGatherer.getInitialProgress(buildFilterQuery(), getConfig(), ActionListener.wrap(newProgress -> { - logger.trace("[{}] reset the progress from [{}] to [{}].", getJobId(), progress, newProgress); - progress = newProgress; - finalListener.onResponse(null); + + // get progress information + SearchRequest request = new SearchRequest(transformConfig.getSource().getIndex()); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + + function.buildSearchQueryForInitialProgress(searchSourceBuilder); + searchSourceBuilder.query(QueryBuilders.boolQuery().filter(buildFilterQuery()).filter(searchSourceBuilder.query())); + request.allowPartialSearchResults(false).source(searchSourceBuilder); + + doGetInitialProgress(request, ActionListener.wrap(response -> { + function.getInitialProgressFromResponse(response, ActionListener.wrap(newProgress -> { + logger.trace("[{}] reset the progress from [{}] to [{}].", getJobId(), progress, newProgress); + progress = newProgress; + finalListener.onResponse(null); + }, failure -> { + progress = null; + logger.warn( + new ParameterizedMessage("[{}] unable to load progress information for task.", getJobId()), + failure + ); + finalListener.onResponse(null); + })); }, failure -> { progress = null; logger.warn(new ParameterizedMessage("[{}] unable to load progress information for task.", getJobId()), failure); @@ -332,6 +341,15 @@ protected void onStart(long now, ActionListener listener) { } } + protected void initializeFunction() { + // create the function + function = FunctionFactory.create(getConfig()); + + if (isContinuous()) { + changeCollector = function.buildChangeCollector(getConfig().getSyncConfig().getField()); + } + } + protected boolean initialRun() { return getPosition() == null; } @@ -350,9 +368,11 @@ protected void onFinish(ActionListener listener) { } // reset the page size, so we do not memorize a low page size forever - pageSize = pivot.getInitialPageSize(); + pageSize = function.getInitialPageSize(); // reset the changed bucket to free memory - changedBuckets = Collections.emptyMap(); + if (isContinuous()) { + changeCollector.clear(); + } long checkpoint = context.getAndIncrementCheckpoint(); lastCheckpoint = getNextCheckpoint(); @@ -404,30 +424,11 @@ protected void onFinish(ActionListener listener) { @Override protected IterationResult doProcess(SearchResponse searchResponse) { - final Aggregations aggregations = searchResponse.getAggregations(); - // Treat this as a "we reached the end". - // This should only happen when all underlying indices have gone away. Consequently, there is no more data to read. - if (aggregations == null) { - logger.info( - "[{}] unexpected null aggregations in search response. " + "Source indices have been deleted or closed.", - getJobId() - ); - auditor.info( - getJobId(), - "Source indices have been deleted or closed. " - + "Please verify that these indices exist and are open [" - + Strings.arrayToCommaDelimitedString(getConfig().getSource().getIndex()) - + "]." 
- ); - return new IterationResult<>(Collections.emptyList(), null, true); - } - final CompositeAggregation agg = aggregations.get(COMPOSITE_AGGREGATION_NAME); - switch (runState) { - case APPLY_BUCKET_RESULTS: - return processBuckets(agg); + case APPLY_RESULTS: + return processBuckets(searchResponse); case IDENTIFY_CHANGES: - return processChangedBuckets(agg); + return processChangedBuckets(searchResponse); default: // Any other state is a bug, should not happen @@ -603,37 +604,45 @@ private void sourceHasChanged(ActionListener hasChangedListener) { })); } - private IterationResult processBuckets(final CompositeAggregation agg) { - // we reached the end - if (agg.getBuckets().isEmpty()) { - if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false || pivot.supportsIncrementalBucketUpdate() == false) { + private IterationResult processBuckets(final SearchResponse searchResponse) { + long docsBeforeProcess = getStats().getNumDocuments(); + + Tuple, Map> indexRequestStreamAndCursor = function.processSearchResponse( + searchResponse, + getConfig().getDestination().getIndex(), + getConfig().getDestination().getPipeline(), + getFieldMappings(), + getStats() + ); + + if (indexRequestStreamAndCursor == null || indexRequestStreamAndCursor.v1() == null) { + if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false || function.supportsIncrementalBucketUpdate() == false) { return new IterationResult<>(Collections.emptyList(), null, true); } // cleanup changed Buckets - changedBuckets = Collections.emptyMap(); + changeCollector.clear(); // reset the runState to fetch changed buckets runState = RunState.IDENTIFY_CHANGES; // advance the cursor for changed bucket detection - return new IterationResult<>(Collections.emptyList(), new TransformIndexerPosition(null, changedBucketsAfterKey), false); - + return new IterationResult<>( + Collections.emptyList(), + new TransformIndexerPosition(null, changeCollector.getBucketPosition()), + false + ); } - long docsBeforeProcess = getStats().getNumDocuments(); - + Stream indexRequestStream = indexRequestStreamAndCursor.v1(); TransformIndexerPosition oldPosition = getPosition(); TransformIndexerPosition newPosition = new TransformIndexerPosition( - agg.afterKey(), + indexRequestStreamAndCursor.v2(), oldPosition != null ? getPosition().getBucketsPosition() : null ); - IterationResult result = new IterationResult<>( - processBucketsToIndexRequests(agg).collect(Collectors.toList()), - newPosition, - agg.getBuckets().isEmpty() - ); + List indexRequests = indexRequestStream.collect(Collectors.toList()); + IterationResult result = new IterationResult<>(indexRequests, newPosition, indexRequests.isEmpty()); // NOTE: progress is also mutated in onFinish if (progress != null) { @@ -644,83 +653,26 @@ private IterationResult processBuckets(final Composite return result; } - private IterationResult processChangedBuckets(final CompositeAggregation agg) { - // initialize the map of changed buckets, the map might be empty if source do not require/implement - // changed bucket detection - changedBuckets = pivot.initialIncrementalBucketUpdateMap(); - - // reached the end? 
- if (agg.getBuckets().isEmpty()) { - // reset everything and return the end marker - changedBuckets = Collections.emptyMap(); - changedBucketsAfterKey = null; + private IterationResult processChangedBuckets(final SearchResponse searchResponse) { + if (changeCollector.processSearchResponse(searchResponse)) { + changeCollector.clear(); return new IterationResult<>(Collections.emptyList(), null, true); } - // else - - // collect all buckets that require the update - agg.getBuckets().stream().forEach(bucket -> { bucket.getKey().forEach((k, v) -> { changedBuckets.get(k).add(v.toString()); }); }); - - // remember the after key but do not store it in the state yet (in the failure we need to retrieve it again) - changedBucketsAfterKey = agg.afterKey(); // reset the runState to fetch the partial updates next - runState = RunState.APPLY_BUCKET_RESULTS; + runState = RunState.APPLY_RESULTS; return new IterationResult<>(Collections.emptyList(), getPosition(), false); } - /* - * Parses the result and creates a stream of indexable documents - * - * Implementation decisions: - * - * Extraction uses generic maps as intermediate exchange format in order to hook in ingest pipelines/processors - * in later versions, see {@link IngestDocument). - */ - private Stream processBucketsToIndexRequests(CompositeAggregation agg) { - final TransformConfig transformConfig = getConfig(); - String indexName = transformConfig.getDestination().getIndex(); - - return pivot.extractResults(agg, getFieldMappings(), getStats()).map(document -> { - String id = (String) document.get(TransformField.DOCUMENT_ID_FIELD); - - if (id == null) { - throw new RuntimeException("Expected a document id but got null."); - } - - XContentBuilder builder; - try { - builder = jsonBuilder(); - builder.startObject(); - for (Map.Entry value : document.entrySet()) { - // skip all internal fields - if (value.getKey().startsWith("_") == false) { - builder.field(value.getKey(), value.getValue()); - } - } - builder.endObject(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - - IndexRequest request = new IndexRequest(indexName).source(builder).id(id); - if (transformConfig.getDestination().getPipeline() != null) { - request.setPipeline(transformConfig.getDestination().getPipeline()); - } - return request; - }); - } - protected QueryBuilder buildFilterQuery() { assert nextCheckpoint != null; - QueryBuilder pivotQueryBuilder = getConfig().getSource().getQueryConfig().getQuery(); + QueryBuilder queryBuilder = getConfig().getSource().getQueryConfig().getQuery(); TransformConfig config = getConfig(); if (this.isContinuous()) { - - BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(pivotQueryBuilder); + BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(queryBuilder); if (lastCheckpoint != null) { filteredQuery.filter(config.getSyncConfig().getRangeQuery(lastCheckpoint, nextCheckpoint)); @@ -730,7 +682,7 @@ protected QueryBuilder buildFilterQuery() { return filteredQuery; } - return pivotQueryBuilder; + return queryBuilder; } @Override @@ -739,10 +691,10 @@ protected SearchRequest buildSearchRequest(long waitTimeInNanos) { SearchRequest searchRequest = new SearchRequest(getConfig().getSource().getIndex()).allowPartialSearchResults(false) .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(0); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // .size(0); switch (runState) { - case APPLY_BUCKET_RESULTS: + case APPLY_RESULTS: 
buildUpdateQuery(sourceBuilder); break; case IDENTIFY_CHANGES: @@ -763,16 +715,15 @@ private SearchSourceBuilder buildChangedBucketsQuery(SearchSourceBuilder sourceB TransformIndexerPosition position = getPosition(); - CompositeAggregationBuilder changesAgg = pivot.buildIncrementalBucketUpdateAggregation(pageSize); - changesAgg.aggregateAfter(position != null ? position.getBucketsPosition() : null); - sourceBuilder.aggregation(changesAgg); + changeCollector.buildChangesQuery(sourceBuilder, position != null ? position.getBucketsPosition() : null, pageSize); - QueryBuilder pivotQueryBuilder = getConfig().getSource().getQueryConfig().getQuery(); + QueryBuilder queryBuilder = getConfig().getSource().getQueryConfig().getQuery(); TransformConfig config = getConfig(); - BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(pivotQueryBuilder) + BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(queryBuilder) .filter(config.getSyncConfig().getRangeQuery(lastCheckpoint, nextCheckpoint)); + // TODO: if buildChangesQuery changes the query it gets overwritten sourceBuilder.query(filteredQuery); logger.trace("running changes query {}", sourceBuilder); @@ -782,29 +733,26 @@ private SearchSourceBuilder buildChangedBucketsQuery(SearchSourceBuilder sourceB private SearchSourceBuilder buildUpdateQuery(SearchSourceBuilder sourceBuilder) { TransformIndexerPosition position = getPosition(); - sourceBuilder.aggregation(pivot.buildAggregation(position != null ? position.getIndexerPosition() : null, pageSize)); TransformConfig config = getConfig(); + QueryBuilder queryBuilder = config.getSource().getQueryConfig().getQuery(); - QueryBuilder pivotQueryBuilder = config.getSource().getQueryConfig().getQuery(); + function.buildSearchQuery(sourceBuilder, position != null ?
position.getIndexerPosition() : null, pageSize); // if its either the 1st run or not continuous, do not apply extra filters if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false) { - sourceBuilder.query(pivotQueryBuilder); + sourceBuilder.query(queryBuilder); logger.trace("running query: {}", sourceBuilder); return sourceBuilder; } - BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(pivotQueryBuilder) + BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(queryBuilder) .filter(config.getSyncConfig().getRangeQuery(nextCheckpoint)); - QueryBuilder pivotFilter = pivot.filterBuckets( - changedBuckets, - config.getSyncConfig().getField(), - lastCheckpoint.getTimeUpperBound() - ); - if (pivotFilter != null) { - filteredQuery.filter(pivotFilter); + QueryBuilder filter = changeCollector.buildFilterQuery(lastCheckpoint.getTimeUpperBound(), nextCheckpoint.getTimeUpperBound()); + + if (filter != null) { + filteredQuery.filter(filter); } sourceBuilder.query(filteredQuery); @@ -902,12 +850,12 @@ protected boolean shouldAuditOnFinish(long completedCheckpoint) { private RunState determineRunStateAtStart() { // either 1st run or not a continuous transform if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false) { - return RunState.APPLY_BUCKET_RESULTS; + return RunState.APPLY_RESULTS; } // if incremental update is not supported, do a normal run - if (pivot.supportsIncrementalBucketUpdate() == false) { - return RunState.APPLY_BUCKET_RESULTS; + if (function.supportsIncrementalBucketUpdate() == false) { + return RunState.APPLY_RESULTS; } // continuous mode: we need to get the changed buckets first @@ -921,7 +869,7 @@ private void configurePageSize(Integer newPageSize) { if (initialConfiguredPageSize != null && initialConfiguredPageSize > 0) { pageSize = initialConfiguredPageSize; } else { - pageSize = pivot.getInitialPageSize(); + pageSize = function.getInitialPageSize(); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformProgressGatherer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformProgressGatherer.java deleted file mode 100644 index 42f6258e82fe6..0000000000000 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformProgressGatherer.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. 
- */ - -package org.elasticsearch.xpack.transform.transforms; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.SearchAction; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.index.query.BoolQueryBuilder; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.xpack.core.ClientHelper; -import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; -import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; - -import java.util.function.Function; - -/** - * Utility class to gather the progress information for a given config and its cursor position - */ -public final class TransformProgressGatherer { - - private Client client; - - TransformProgressGatherer(Client client) { - this.client = client; - } - - /** - * This gathers the total docs given the config and search - * - * @param filterQuery The adapted filter that can optionally take into account checkpoint information - * @param config The transform config containing headers, source, pivot, etc. information - * @param progressListener The listener to notify when progress object has been created - */ - public void getInitialProgress(QueryBuilder filterQuery, TransformConfig config, ActionListener progressListener) { - SearchRequest request = getSearchRequest(config, filterQuery); - - ActionListener searchResponseActionListener = ActionListener.wrap( - searchResponse -> progressListener.onResponse(searchResponseToTransformProgressFunction().apply(searchResponse)), - progressListener::onFailure - ); - ClientHelper.executeWithHeadersAsync( - config.getHeaders(), - ClientHelper.TRANSFORM_ORIGIN, - client, - SearchAction.INSTANCE, - request, - searchResponseActionListener - ); - } - - public static SearchRequest getSearchRequest(TransformConfig config, QueryBuilder filteredQuery) { - SearchRequest request = new SearchRequest(config.getSource().getIndex()); - request.allowPartialSearchResults(false); - BoolQueryBuilder existsClauses = QueryBuilders.boolQuery(); - config.getPivotConfig() - .getGroupConfig() - .getGroups() - .values() - // TODO change once we allow missing_buckets - .forEach(src -> { - if (src.getField() != null) { - existsClauses.must(QueryBuilders.existsQuery(src.getField())); - } - }); - - request.source( - new SearchSourceBuilder().size(0) - .trackTotalHits(true) - .query(QueryBuilders.boolQuery().filter(filteredQuery).filter(existsClauses)) - ); - return request; - } - - public static Function searchResponseToTransformProgressFunction() { - return searchResponse -> searchResponse != null - ? new TransformProgress(searchResponse.getHits().getTotalHits().value, 0L, 0L) - : null; - } -} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollector.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollector.java new file mode 100644 index 0000000000000..10a7f0432dc3d --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollector.java @@ -0,0 +1,416 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.transform.transforms.pivot; + +import org.apache.lucene.search.BooleanQuery; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.Rounding; +import org.elasticsearch.common.geo.GeoPoint; +import org.elasticsearch.geometry.Rectangle; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.GeoBoundingBoxQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.query.TermsQueryBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation.Bucket; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.core.transform.transforms.pivot.DateHistogramGroupSource; +import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource; +import org.elasticsearch.xpack.transform.transforms.Function.ChangeCollector; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +/** + * Utility class to collect bucket changes + */ +public class CompositeBucketsChangeCollector implements ChangeCollector { + + private final Map<String, FieldCollector> fieldCollectors; + private final CompositeAggregationBuilder compositeAggregation; + private Map<String, Object> afterKey = null; + + /** + * Collector for collecting changes from 1 group_by field. + * + * Every field collector instance is stateful; it implements the query logic and result collection, + * and also stores the collected changes in its state. + */ + interface FieldCollector { + + /** + * Get the maximum page size supported by this field collector. + * + * Note: this page size is only about change collection, not the indexer page size. + * + * @return the maximum allowed page size, or Integer.MAX_VALUE for unlimited. + */ + int getMaxPageSize(); + + /** + * Allows the field collector to add aggregations to the changes query. + * + * @return aggregations specific for this field collector or null. + */ + AggregationBuilder aggregateChanges(); + + /** + * Collects the changes from the search response, e.g. stores the terms that have changed. + * + * @param buckets buckets from the search result. + * @return true if changes have been found and collected, false otherwise. + */ + boolean collectChanges(Collection<? extends Bucket> buckets); + + /** + * Apply the collected changes to the query that updates the transform destination. + * + * @param lastCheckpointTimestamp the last (complete) checkpoint timestamp + * @param nextCheckpointTimestamp the next (currently running) checkpoint timestamp. + * @return a query builder instance with added filters to narrow the search + */ + QueryBuilder filterByChanges(long lastCheckpointTimestamp, long nextCheckpointTimestamp); + + /** + * Clear the field collector, e.g. the collected changes, to free up memory. + */ + void clear(); + }
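As a concrete illustration of how the per-field filters compose (a sketch with made-up field names and values, not code from this change): for a transform grouped by a terms field user_id and a date_histogram on the synchronization field timestamp, buildFilterQuery would combine the collectors' contributions roughly like this:

    // Hypothetical combined filter: the TermsFieldCollector saw the keys
    // "alice" and "bob" change, and the DateHistogramFieldCollector applies
    // its rounding to the last checkpoint timestamp (value made up).
    BoolQueryBuilder filter = new BoolQueryBuilder()
        .filter(new TermsQueryBuilder("user_id", Arrays.asList("alice", "bob")))
        .filter(new RangeQueryBuilder("timestamp").gte(1585000800000L).format("epoch_millis"));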
+ + static class TermsFieldCollector implements FieldCollector { + + private final String sourceFieldName; + private final String targetFieldName; + private final Set<String> changedTerms; + + TermsFieldCollector(final String sourceFieldName, final String targetFieldName) { + this.sourceFieldName = sourceFieldName; + this.targetFieldName = targetFieldName; + this.changedTerms = new HashSet<>(); + } + + @Override + public int getMaxPageSize() { + // TODO: base this on index.max_terms_count; however, that is per index, which we don't have access to here. + // Because the page size is limited to 64k anyhow, return 64k. + return 65536; + } + + @Override + public boolean collectChanges(Collection<? extends Bucket> buckets) { + changedTerms.clear(); + + for (Bucket b : buckets) { + Object term = b.getKey().get(targetFieldName); + if (term != null) { + changedTerms.add(term.toString()); + } + } + + return true; + } + + @Override + public QueryBuilder filterByChanges(long lastCheckpointTimestamp, long nextCheckpointTimestamp) { + if (changedTerms.isEmpty() == false) { + return new TermsQueryBuilder(sourceFieldName, changedTerms); + } + return null; + } + + @Override + public void clear() { + changedTerms.clear(); + } + + @Override + public AggregationBuilder aggregateChanges() { + return null; + } + } + + static class DateHistogramFieldCollector implements FieldCollector { + + private final String sourceFieldName; + private final String targetFieldName; + private final boolean isSynchronizationField; + private final Rounding.Prepared rounding; + + DateHistogramFieldCollector( + final String sourceFieldName, + final String targetFieldName, + final Rounding.Prepared rounding, + final boolean isSynchronizationField + ) { + this.sourceFieldName = sourceFieldName; + this.targetFieldName = targetFieldName; + this.rounding = rounding; + this.isSynchronizationField = isSynchronizationField; + } + + @Override + public int getMaxPageSize() { + return Integer.MAX_VALUE; + } + + @Override + public boolean collectChanges(Collection<? extends Bucket> buckets) { + // TODO: implementation for isSynchronizationField == false + return false; + } + + @Override + public QueryBuilder filterByChanges(long lastCheckpointTimestamp, long nextCheckpointTimestamp) { + if (isSynchronizationField && lastCheckpointTimestamp > 0) { + return new RangeQueryBuilder(sourceFieldName).gte(rounding.round(lastCheckpointTimestamp)).format("epoch_millis"); + } + + // TODO: implementation for isSynchronizationField == false + + return null; + } + + @Override + public void clear() {} + + @Override + public AggregationBuilder aggregateChanges() { + return null; + } + } + + static class HistogramFieldCollector implements FieldCollector { + + private final String sourceFieldName; + private final String targetFieldName; + + HistogramFieldCollector(final String sourceFieldName, final String targetFieldName) { + this.sourceFieldName = sourceFieldName; + this.targetFieldName = targetFieldName; + } + + @Override + public int getMaxPageSize() { + return Integer.MAX_VALUE; + } + + @Override + public boolean collectChanges(Collection<? extends Bucket> buckets) { + return false; + } + + @Override + public QueryBuilder filterByChanges(long lastCheckpointTimestamp, long nextCheckpointTimestamp) { + return null; + } + + @Override + public void clear() {} + + @Override + public AggregationBuilder aggregateChanges() { + return null; + } + } + + static class GeoTileFieldCollector implements FieldCollector { + + private final String sourceFieldName; + private final String targetFieldName; + private
final Set changedBuckets; + + GeoTileFieldCollector(final String sourceFieldName, final String targetFieldName) { + this.sourceFieldName = sourceFieldName; + this.targetFieldName = targetFieldName; + this.changedBuckets = new HashSet<>(); + } + + @Override + public int getMaxPageSize() { + // this collector is limited by indices.query.bool.max_clause_count, default 1024 + return BooleanQuery.getMaxClauseCount(); + } + + @Override + public boolean collectChanges(Collection buckets) { + changedBuckets.clear(); + + for (Bucket b : buckets) { + Object bucket = b.getKey().get(targetFieldName); + if (bucket != null) { + changedBuckets.add(bucket.toString()); + } + } + + return true; + } + + @Override + public QueryBuilder filterByChanges(long lastCheckpointTimestamp, long nextcheckpointTimestamp) { + if (changedBuckets != null && changedBuckets.isEmpty() == false) { + BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); + changedBuckets.stream().map(GeoTileUtils::toBoundingBox).map(this::toGeoQuery).forEach(boolQueryBuilder::should); + return boolQueryBuilder; + } + return null; + } + + @Override + public void clear() {} + + @Override + public AggregationBuilder aggregateChanges() { + return null; + } + + private GeoBoundingBoxQueryBuilder toGeoQuery(Rectangle rectangle) { + return QueryBuilders.geoBoundingBoxQuery(sourceFieldName) + .setCorners( + new GeoPoint(rectangle.getMaxLat(), rectangle.getMinLon()), + new GeoPoint(rectangle.getMinLat(), rectangle.getMaxLon()) + ); + } + } + + public CompositeBucketsChangeCollector(CompositeAggregationBuilder compositeAggregation, Map fieldCollectors) { + this.compositeAggregation = compositeAggregation; + this.fieldCollectors = fieldCollectors; + } + + @Override + public SearchSourceBuilder buildChangesQuery(SearchSourceBuilder sourceBuilder, Map position, int pageSize) { + + sourceBuilder.size(0); + for (FieldCollector fieldCollector : fieldCollectors.values()) { + AggregationBuilder aggregationForField = fieldCollector.aggregateChanges(); + + if (aggregationForField != null) { + sourceBuilder.aggregation(aggregationForField); + } + pageSize = Math.min(pageSize, fieldCollector.getMaxPageSize()); + } + + CompositeAggregationBuilder changesAgg = this.compositeAggregation; + changesAgg.size(pageSize).aggregateAfter(position); + sourceBuilder.aggregation(changesAgg); + + return sourceBuilder; + } + + @Override + public QueryBuilder buildFilterQuery(long lastCheckpointTimestamp, long nextcheckpointTimestamp) { + // shortcut for only 1 element + if (fieldCollectors.size() == 1) { + return fieldCollectors.values().iterator().next().filterByChanges(lastCheckpointTimestamp, nextcheckpointTimestamp); + } + + BoolQueryBuilder filteredQuery = new BoolQueryBuilder(); + + for (FieldCollector fieldCollector : fieldCollectors.values()) { + QueryBuilder filter = fieldCollector.filterByChanges(lastCheckpointTimestamp, nextcheckpointTimestamp); + if (filter != null) { + filteredQuery.filter(filter); + } + } + + return filteredQuery; + } + + @Override + public boolean processSearchResponse(final SearchResponse searchResponse) { + final Aggregations aggregations = searchResponse.getAggregations(); + if (aggregations == null) { + return true; + } + + final CompositeAggregation agg = aggregations.get(compositeAggregation.getName()); + + Collection buckets = agg.getBuckets(); + afterKey = agg.afterKey(); + + if (buckets.isEmpty()) { + return true; + } + + for (FieldCollector fieldCollector : fieldCollectors.values()) { + fieldCollector.collectChanges(buckets); + } + 
+ return false; + } + + @Override + public void clear() { + fieldCollectors.forEach((k, c) -> c.clear()); + } + + @Override + public Map getBucketPosition() { + return afterKey; + } + + public static ChangeCollector buildChangeCollector( + CompositeAggregationBuilder compositeAggregationBuilder, + Map groups, + String synchronizationField + ) { + Map fieldCollectors = createFieldCollectors(groups, synchronizationField); + return new CompositeBucketsChangeCollector(compositeAggregationBuilder, fieldCollectors); + } + + static Map createFieldCollectors(Map groups, String synchronizationField) { + Map fieldCollectors = new HashMap<>(); + + for (Entry entry : groups.entrySet()) { + switch (entry.getValue().getType()) { + case TERMS: + fieldCollectors.put( + entry.getKey(), + new CompositeBucketsChangeCollector.TermsFieldCollector(entry.getValue().getField(), entry.getKey()) + ); + break; + case HISTOGRAM: + fieldCollectors.put( + entry.getKey(), + new CompositeBucketsChangeCollector.HistogramFieldCollector(entry.getValue().getField(), entry.getKey()) + ); + break; + case DATE_HISTOGRAM: + fieldCollectors.put( + entry.getKey(), + new CompositeBucketsChangeCollector.DateHistogramFieldCollector( + entry.getValue().getField(), + entry.getKey(), + ((DateHistogramGroupSource) entry.getValue()).getRounding(), + entry.getKey().equals(synchronizationField) + ) + ); + break; + case GEOTILE_GRID: + fieldCollectors.put( + entry.getKey(), + new CompositeBucketsChangeCollector.GeoTileFieldCollector(entry.getValue().getField(), entry.getKey()) + ); + break; + default: + throw new IllegalArgumentException("unknown type"); + } + } + return fieldCollectors; + } + +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java index 0e798733383a5..54afac5ea0f85 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java @@ -8,61 +8,71 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; import 
org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; +import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig; import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig; import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource; import org.elasticsearch.xpack.transform.Transform; +import org.elasticsearch.xpack.transform.transforms.Function; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -public class Pivot { +public class Pivot implements Function { public static final int TEST_QUERY_PAGE_SIZE = 50; private static final String COMPOSITE_AGGREGATION_NAME = "_transform"; private static final Logger logger = LogManager.getLogger(Pivot.class); private final PivotConfig config; + private final String transformId; private final boolean supportsIncrementalBucketUpdate; // objects for re-using private final CompositeAggregationBuilder cachedCompositeAggregation; - public Pivot(PivotConfig config) { + public Pivot(PivotConfig config, String transformId) { this.config = config; + this.transformId = transformId; this.cachedCompositeAggregation = createCompositeAggregation(config); boolean supportsIncrementalBucketUpdate = false; @@ -73,14 +83,21 @@ public Pivot(PivotConfig config) { this.supportsIncrementalBucketUpdate = supportsIncrementalBucketUpdate; } - public void validateConfig() { + @Override + public void validateConfig(ActionListener listener) { for (AggregationBuilder agg : config.getAggregationConfig().getAggregatorFactories()) { - if (Aggregations.isSupportedByTransform(agg.getType()) == false) { - throw new ElasticsearchStatusException("Unsupported aggregation type [" + agg.getType() + "]", RestStatus.BAD_REQUEST); + if (TransformAggregations.isSupportedByTransform(agg.getType()) == false) { + // todo: change to ValidationException + listener.onFailure( + new ElasticsearchStatusException("Unsupported aggregation type [" + agg.getType() + "]", RestStatus.BAD_REQUEST) + ); + return; } } + listener.onResponse(true); } + @Override public void validateQuery(Client client, SourceConfig sourceConfig, final ActionListener listener) { SearchRequest searchRequest = buildSearchRequest(sourceConfig, null, TEST_QUERY_PAGE_SIZE); @@ -110,10 +127,51 @@ public void validateQuery(Client client, SourceConfig sourceConfig, final Action })); } + @Override public void deduceMappings(Client client, SourceConfig sourceConfig, final ActionListener> listener) { SchemaUtil.deduceMappings(client, config, sourceConfig.getIndex(), listener); } + @Override + public void preview( + Client 
client, + Map headers, + SourceConfig sourceConfig, + Map fieldTypeMap, + int numberOfBuckets, + ActionListener>> listener + ) { + ClientHelper.executeWithHeadersAsync( + headers, + ClientHelper.TRANSFORM_ORIGIN, + client, + SearchAction.INSTANCE, + buildSearchRequest(sourceConfig, null, numberOfBuckets), + ActionListener.wrap(r -> { + try { + final Aggregations aggregations = r.getAggregations(); + if (aggregations == null) { + listener.onFailure( + new ElasticsearchStatusException("Source indices have been deleted or closed.", RestStatus.BAD_REQUEST) + ); + return; + } + final CompositeAggregation agg = aggregations.get(COMPOSITE_AGGREGATION_NAME); + TransformIndexerStats stats = new TransformIndexerStats(); + // remove all internal fields + + List> docs = extractResults(agg, fieldTypeMap, stats).peek( + doc -> doc.keySet().removeIf(k -> k.startsWith("_")) + ).collect(Collectors.toList()); + + listener.onResponse(docs); + } catch (AggregationResultUtils.AggregationExtractionException extractionException) { + listener.onFailure(new ElasticsearchStatusException(extractionException.getMessage(), RestStatus.BAD_REQUEST)); + } + }, listener::onFailure) + ); + } + /** * Get the initial page size for this pivot. * @@ -128,6 +186,7 @@ public void deduceMappings(Client client, SourceConfig sourceConfig, final Actio * * @return the page size */ + @Override public int getInitialPageSize() { return config.getMaxPageSearchSize() == null ? Transform.DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE : config.getMaxPageSearchSize(); } @@ -137,8 +196,7 @@ public SearchRequest buildSearchRequest(SourceConfig sourceConfig, Map position, int pageSize) { + @Override + public SearchSourceBuilder buildSearchQuery(SearchSourceBuilder builder, Map position, int pageSize) { cachedCompositeAggregation.aggregateAfter(position); cachedCompositeAggregation.size(pageSize); - return cachedCompositeAggregation; - } - - public CompositeAggregationBuilder buildIncrementalBucketUpdateAggregation(int pageSize) { - - CompositeAggregationBuilder compositeAgg = createCompositeAggregationSources(config, true); - compositeAgg.size(pageSize); - - return compositeAgg; + return builder.size(0).aggregation(cachedCompositeAggregation); } - public Map> initialIncrementalBucketUpdateMap() { - - Map> changedBuckets = new HashMap<>(); - for (Entry entry : config.getGroupConfig().getGroups().entrySet()) { - if (entry.getValue().supportsIncrementalBucketUpdate()) { - changedBuckets.put(entry.getKey(), new HashSet<>()); - } - } - - return changedBuckets; + @Override + public ChangeCollector buildChangeCollector(String synchronizationField) { + return CompositeBucketsChangeCollector.buildChangeCollector( + createCompositeAggregationSources(config, true), + config.getGroupConfig().getGroups(), + synchronizationField + ); } + @Override public boolean supportsIncrementalBucketUpdate() { return supportsIncrementalBucketUpdate; } @@ -197,34 +246,98 @@ public Stream> extractResults( ); } - public QueryBuilder filterBuckets( - Map> changedBuckets, - String synchronizationField, - long lastSynchronizationCheckpoint + @Override + public Tuple, Map> processSearchResponse( + final SearchResponse searchResponse, + final String destinationIndex, + final String destinationPipeline, + final Map fieldMappings, + final TransformIndexerStats stats ) { - assert changedBuckets != null; - - if (config.getGroupConfig().getGroups().size() == 1) { - Entry entry = config.getGroupConfig().getGroups().entrySet().iterator().next(); - logger.trace(() -> new 
ParameterizedMessage("filter by bucket: {}/{}", entry.getKey(), entry.getValue().getField())); - Set changedBucketsByGroup = changedBuckets.get(entry.getKey()); - return entry.getValue() - .getIncrementalBucketUpdateFilterQuery(changedBucketsByGroup, synchronizationField, lastSynchronizationCheckpoint); + final Aggregations aggregations = searchResponse.getAggregations(); + + // Treat this as a "we reached the end". + // This should only happen when all underlying indices have gone away. Consequently, there is no more data to read. + if (aggregations == null) { + return null; } - // else: more than 1 group by, need to nest it - BoolQueryBuilder filteredQuery = new BoolQueryBuilder(); - for (Entry entry : config.getGroupConfig().getGroups().entrySet()) { - Set changedBucketsByGroup = changedBuckets.get(entry.getKey()); - QueryBuilder sourceQueryFilter = entry.getValue() - .getIncrementalBucketUpdateFilterQuery(changedBucketsByGroup, synchronizationField, lastSynchronizationCheckpoint); - // the source might not define a filter optimization - if (sourceQueryFilter != null) { - filteredQuery.filter(sourceQueryFilter); - } + final CompositeAggregation compositeAgg = aggregations.get(COMPOSITE_AGGREGATION_NAME); + if (compositeAgg == null || compositeAgg.getBuckets().isEmpty()) { + return null; } - return filteredQuery; + return new Tuple<>( + processBucketsToIndexRequests(compositeAgg, destinationIndex, destinationPipeline, fieldMappings, stats), + compositeAgg.afterKey() + ); + } + + @Override + public SearchSourceBuilder buildSearchQueryForInitialProgress(SearchSourceBuilder searchSourceBuilder) { + BoolQueryBuilder existsClauses = QueryBuilders.boolQuery(); + + config.getGroupConfig() + .getGroups() + .values() + // TODO change once we allow missing_buckets + .forEach(src -> { + if (src.getField() != null) { + existsClauses.must(QueryBuilders.existsQuery(src.getField())); + } + }); + + return searchSourceBuilder.query(existsClauses).size(0).trackTotalHits(true); + } + + @Override + public void getInitialProgressFromResponse(SearchResponse response, ActionListener progressListener) { + progressListener.onResponse(new TransformProgress(response.getHits().getTotalHits().value, 0L, 0L)); + } + + /* + * Parses the result and creates a stream of indexable documents + * + * Implementation decisions: + * + * Extraction uses generic maps as intermediate exchange format in order to hook in ingest pipelines/processors + * in later versions, see {@link IngestDocument). 
+ */ + private Stream processBucketsToIndexRequests( + CompositeAggregation agg, + String destinationIndex, + String destinationPipeline, + Map fieldMappings, + TransformIndexerStats stats + ) { + return extractResults(agg, fieldMappings, stats).map(document -> { + String id = (String) document.get(TransformField.DOCUMENT_ID_FIELD); + + if (id == null) { + throw new RuntimeException("Expected a document id but got null."); + } + + XContentBuilder builder; + try { + builder = jsonBuilder(); + builder.startObject(); + for (Map.Entry value : document.entrySet()) { + // skip all internal fields + if (value.getKey().startsWith("_") == false) { + builder.field(value.getKey(), value.getValue()); + } + } + builder.endObject(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + IndexRequest request = new IndexRequest(destinationIndex).source(builder).id(id); + if (destinationPipeline != null) { + request.setPipeline(destinationPipeline); + } + return request; + }); } private static CompositeAggregationBuilder createCompositeAggregation(PivotConfig config) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java index 44d15a201dc88..32ad8fd704237 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java @@ -75,18 +75,21 @@ public static void deduceMappings( config.getGroupConfig() .getGroups() - .forEach((destinationFieldName, group) -> { - // We will always need the field name for the grouping to create the mapping - fieldNamesForGrouping.put(destinationFieldName, group.getField()); - // Sometimes the group config will supply a desired mapping as well - if (group.getMappingType() != null) { - fieldTypesForGrouping.put(destinationFieldName, group.getMappingType()); + .forEach( + (destinationFieldName, group) -> { + // We will always need the field name for the grouping to create the mapping + fieldNamesForGrouping.put(destinationFieldName, group.getField()); + // Sometimes the group config will supply a desired mapping as well + if (group.getMappingType() != null) { + fieldTypesForGrouping.put(destinationFieldName, group.getMappingType()); + } } - }); - + ); for (AggregationBuilder agg : config.getAggregationConfig().getAggregatorFactories()) { - Tuple, Map> inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes(agg); + Tuple, Map> inputAndOutputTypes = TransformAggregations.getAggregationInputAndOutputTypes( + agg + ); aggregationSourceFieldNames.putAll(inputAndOutputTypes.v1()); aggregationTypes.putAll(inputAndOutputTypes.v2()); } @@ -157,21 +160,25 @@ private static Map resolveMappings( aggregationTypes.forEach((targetFieldName, aggregationName) -> { String sourceFieldName = aggregationSourceFieldNames.get(targetFieldName); String sourceMapping = sourceFieldName == null ? 
null : sourceMappings.get(sourceFieldName); - String destinationMapping = Aggregations.resolveTargetMapping(aggregationName, sourceMapping); + String destinationMapping = TransformAggregations.resolveTargetMapping(aggregationName, sourceMapping); - logger.debug(() -> new ParameterizedMessage( - "Deduced mapping for: [{}], agg type [{}] to [{}]", - targetFieldName, - aggregationName, - destinationMapping - )); - - if (Aggregations.isDynamicMapping(destinationMapping)) { - logger.debug(() -> new ParameterizedMessage( - "Dynamic target mapping set for field [{}] and aggregation [{}]", + logger.debug( + () -> new ParameterizedMessage( + "Deduced mapping for: [{}], agg type [{}] to [{}]", targetFieldName, - aggregationName - )); + aggregationName, + destinationMapping + ) + ); + + if (TransformAggregations.isDynamicMapping(destinationMapping)) { + logger.debug( + () -> new ParameterizedMessage( + "Dynamic target mapping set for field [{}] and aggregation [{}]", + targetFieldName, + aggregationName + ) + ); } else if (destinationMapping != null) { targetMapping.put(targetFieldName, destinationMapping); } else { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/TransformAggregations.java similarity index 99% rename from x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java rename to x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/TransformAggregations.java index 9251941ee0b95..288366ad9fff8 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/TransformAggregations.java @@ -23,7 +23,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -public final class Aggregations { +public final class TransformAggregations { // the field mapping should not explicitly be set and allow ES to dynamically determine mapping via the data. private static final String DYNAMIC = "_dynamic"; @@ -82,7 +82,7 @@ public final class Aggregations { "variable_width_histogram" // https://github.com/elastic/elasticsearch/issues/58140 ); - private Aggregations() {} + private TransformAggregations() {} /** * Supported aggregation by transform and corresponding meta information. 
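The renamed TransformAggregations keeps the same lookup contract used above: given an aggregation type and the mapping of its input field, it resolves the mapping for the destination field. Reduced to its essentials, the per-aggregation step in resolveMappings works like this (a sketch; "OrderValue" and "price" are made-up names):

    // Sketch of one iteration of the resolution loop shown above.
    String aggregationName = "avg";                      // aggregation type behind target field "OrderValue"
    String sourceMapping = sourceMappings.get("price");  // mapping of the input field, e.g. "double"
    String destinationMapping = TransformAggregations.resolveTargetMapping(aggregationName, sourceMapping);
    if (TransformAggregations.isDynamicMapping(destinationMapping)) {
        // leave the field out and let Elasticsearch determine the mapping dynamically
    } else if (destinationMapping != null) {
        targetMapping.put("OrderValue", destinationMapping);
    }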
diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java
index 22ef2fc68e208..f2f81e1e83011 100644
--- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java
+++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java
@@ -41,7 +41,6 @@ public void testAudiOnFinishFrequency() {
             ThreadPool.Names.GENERIC,
             mock(IndexBasedTransformConfigManager.class),
             mock(CheckpointProvider.class),
-            new TransformProgressGatherer(mock(Client.class)),
             new AtomicReference<>(IndexerState.STOPPED),
             null,
             mock(Client.class),
diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java
index 1e212349251fc..c5802ff3663b3 100644
--- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java
+++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java
@@ -66,7 +66,6 @@
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.matchesRegex;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.matches;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -91,7 +90,6 @@ class MockedTransformIndexer extends TransformIndexer {
             String executorName,
             IndexBasedTransformConfigManager transformsConfigManager,
             CheckpointProvider checkpointProvider,
-            TransformProgressGatherer progressGatherer,
             TransformConfig transformConfig,
             Map<String, String> fieldMappings,
             TransformAuditor auditor,
@@ -108,7 +106,6 @@ class MockedTransformIndexer extends TransformIndexer {
                 executorName,
                 transformsConfigManager,
                 checkpointProvider,
-                progressGatherer,
                 auditor,
                 transformConfig,
                 fieldMappings,
@@ -125,6 +122,10 @@ class MockedTransformIndexer extends TransformIndexer {
             this.failureConsumer = failureConsumer;
         }
 
+        public void initialize() {
+            this.initializeFunction();
+        }
+
         public CountDownLatch newLatch(int count) {
             return latch = new CountDownLatch(count);
         }
@@ -212,6 +213,31 @@ protected void failIndexer(String message) {
             }
         }
 
+        @Override
+        void doGetInitialProgress(SearchRequest request, ActionListener<SearchResponse> responseListener) {
+            responseListener.onResponse(
+                new SearchResponse(
+                    new InternalSearchResponse(
+                        new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 0.0f),
+                        // Simulate completely null aggs
+                        null,
+                        new Suggest(Collections.emptyList()),
+                        new SearchProfileShardResults(Collections.emptyMap()),
+                        false,
+                        false,
+                        1
+                    ),
+                    "",
+                    1,
+                    1,
+                    0,
+                    0,
+                    ShardSearchFailure.EMPTY_ARRAY,
+                    SearchResponse.Clusters.EMPTY
+                )
+            );
+        }
+
     }
 
     @Before
@@ -349,7 +375,6 @@ public void testDoProcessAggNullCheck() {
         assertThat(newPosition.getToIndex(), is(empty()));
         assertThat(newPosition.getPosition(), is(nullValue()));
         assertThat(newPosition.isDone(), is(true));
-        verify(auditor, times(1)).info(anyString(), anyString());
     }
 
     public void testScriptError() throws Exception {
@@ -442,12 +467,11 @@ private MockedTransformIndexer createMockIndexer(
         TransformAuditor auditor,
         TransformContext context
     ) {
-        return new MockedTransformIndexer(
+        MockedTransformIndexer indexer = new MockedTransformIndexer(
             threadPool,
             executorName,
             mock(IndexBasedTransformConfigManager.class),
             mock(CheckpointProvider.class),
-            new TransformProgressGatherer(client),
             config,
             Collections.emptyMap(),
             auditor,
@@ -459,6 +483,9 @@ private MockedTransformIndexer createMockIndexer(
             bulkFunction,
             failureConsumer
         );
+
+        indexer.initialize();
+        return indexer;
     }
 }
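With TransformProgressGatherer gone, the indexer itself now issues the initial progress search, so the mocked indexer overrides doGetInitialProgress to feed back a response whose aggregations are null. The null-aggregations path that testDoProcessAggNullCheck pins down amounts to a guard of roughly the following shape; this is a minimal sketch, not the actual TransformIndexer code, and the class and method names are illustrative:

```java
// Minimal sketch of the null-aggregations guard exercised by testDoProcessAggNullCheck:
// a search response with null aggregations must end the checkpoint cleanly instead of
// throwing a NullPointerException.
import java.util.Collections;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.xpack.core.indexing.IterationResult;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;

class NullAggsGuard {
    static IterationResult<TransformIndexerPosition> process(SearchResponse response) {
        Aggregations aggs = response.getAggregations();
        if (aggs == null || aggs.get("_transform") == null) {
            // nothing to index and no position to advance to: this checkpoint is done
            return new IterationResult<>(Collections.emptyList(), null, true);
        }
        // the real code would walk the composite aggregation buckets here
        throw new UnsupportedOperationException("omitted in this sketch");
    }
}
```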
diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollectorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollectorTests.java
new file mode 100644
index 0000000000000..5a24a542634e0
--- /dev/null
+++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollectorTests.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.transform.transforms.pivot;
+
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchResponseSections;
+import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.TermsQueryBuilder;
+import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
+import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.transform.transforms.pivot.GeoTileGroupSourceTests;
+import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig;
+import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfigTests;
+import org.elasticsearch.xpack.core.transform.transforms.pivot.HistogramGroupSourceTests;
+import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource;
+import org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSource;
+import org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSourceTests;
+import org.elasticsearch.xpack.transform.transforms.Function.ChangeCollector;
+import org.elasticsearch.xpack.transform.transforms.pivot.CompositeBucketsChangeCollector.FieldCollector;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CompositeBucketsChangeCollectorTests extends ESTestCase {
+
+    /**
+     * Simple unit test to check that a field collector is implemented for every single group source type.
+     */
+    public void testCreateFieldCollector() {
+        GroupConfig groupConfig = GroupConfigTests.randomGroupConfig();
+
+        Map<String, FieldCollector> changeCollector = CompositeBucketsChangeCollector.createFieldCollectors(
+            groupConfig.getGroups(),
+            randomBoolean() ? randomAlphaOfLengthBetween(3, 10) : null
+        );
+
+        assertNotNull(changeCollector);
+    }
+
+    public void testPageSize() throws IOException {
+        Map<String, SingleGroupSource> groups = new LinkedHashMap<>();
+
+        // a histogram group_by has no limits
+        SingleGroupSource histogramGroupBy = HistogramGroupSourceTests.randomHistogramGroupSource();
+        groups.put("hist", histogramGroupBy);
+
+        ChangeCollector collector = CompositeBucketsChangeCollector.buildChangeCollector(getCompositeAggregation(groups), groups, null);
+        assertEquals(1_000, getCompositeAggregationBuilder(collector.buildChangesQuery(new SearchSourceBuilder(), null, 1_000)).size());
+        assertEquals(100_000, getCompositeAggregationBuilder(collector.buildChangesQuery(new SearchSourceBuilder(), null, 100_000)).size());
+        assertEquals(10, getCompositeAggregationBuilder(collector.buildChangesQuery(new SearchSourceBuilder(), null, 10)).size());
+
+        // a terms group_by is limited by terms query
+        SingleGroupSource termsGroupBy = TermsGroupSourceTests.randomTermsGroupSource();
+        groups.put("terms", termsGroupBy);
+
+        collector = CompositeBucketsChangeCollector.buildChangeCollector(getCompositeAggregation(groups), groups, null);
+        assertEquals(1_000, getCompositeAggregationBuilder(collector.buildChangesQuery(new SearchSourceBuilder(), null, 1_000)).size());
+        assertEquals(10_000, getCompositeAggregationBuilder(collector.buildChangesQuery(new SearchSourceBuilder(), null, 10_000)).size());
+        assertEquals(10, getCompositeAggregationBuilder(collector.buildChangesQuery(new SearchSourceBuilder(), null, 10)).size());
+        assertEquals(65536, getCompositeAggregationBuilder(collector.buildChangesQuery(new SearchSourceBuilder(), null, 100_000)).size());
+        assertEquals(
+            65536,
+            getCompositeAggregationBuilder(collector.buildChangesQuery(new SearchSourceBuilder(), null, Integer.MAX_VALUE)).size()
+        );
+
+        // a geo tile group_by is limited by query clauses
+        SingleGroupSource geoGroupBy = GeoTileGroupSourceTests.randomGeoTileGroupSource();
+        groups.put("geo_tile", geoGroupBy);
+
+        collector = CompositeBucketsChangeCollector.buildChangeCollector(getCompositeAggregation(groups), groups, null);
+        assertEquals(1_000, getCompositeAggregationBuilder(collector.buildChangesQuery(new SearchSourceBuilder(), null, 1_000)).size());
+        assertEquals(1_024, getCompositeAggregationBuilder(collector.buildChangesQuery(new SearchSourceBuilder(), null, 10_000)).size());
+    }
+
+    public void testTermsFieldCollector() throws IOException {
+        Map<String, SingleGroupSource> groups = new LinkedHashMap<>();
+
+        // a terms group_by is limited by terms query
+        SingleGroupSource termsGroupBy = new TermsGroupSource("id", null);
+        groups.put("id", termsGroupBy);
+
+        ChangeCollector collector = CompositeBucketsChangeCollector.buildChangeCollector(getCompositeAggregation(groups), groups, null);
+
+        CompositeAggregation composite = mock(CompositeAggregation.class);
+        when(composite.getName()).thenReturn("_transform");
+        when(composite.getBuckets()).thenAnswer((Answer<List<CompositeAggregation.Bucket>>) invocationOnMock -> {
+            List<CompositeAggregation.Bucket> compositeBuckets = new ArrayList<>();
+            CompositeAggregation.Bucket bucket = mock(CompositeAggregation.Bucket.class);
+            when(bucket.getKey()).thenReturn(Collections.singletonMap("id", "id1"));
+            compositeBuckets.add(bucket);
+
+            bucket = mock(CompositeAggregation.Bucket.class);
+            when(bucket.getKey()).thenReturn(Collections.singletonMap("id", "id2"));
+            compositeBuckets.add(bucket);
+
+            bucket = mock(CompositeAggregation.Bucket.class);
+            when(bucket.getKey()).thenReturn(Collections.singletonMap("id", "id3"));
+            compositeBuckets.add(bucket);
+
+            return compositeBuckets;
+        });
+        Aggregations aggs = new Aggregations(Collections.singletonList(composite));
+
+        SearchResponseSections sections = new SearchResponseSections(null, aggs, null, false, null, null, 1);
+        SearchResponse response = new SearchResponse(sections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, null);
+
+        collector.processSearchResponse(response);
+
+        QueryBuilder queryBuilder = collector.buildFilterQuery(0, 0);
+        assertNotNull(queryBuilder);
+        assertThat(queryBuilder, instanceOf(TermsQueryBuilder.class));
+        assertThat(((TermsQueryBuilder) queryBuilder).values(), containsInAnyOrder("id1", "id2", "id3"));
+    }
+
+    private static CompositeAggregationBuilder getCompositeAggregation(Map<String, SingleGroupSource> groups) throws IOException {
+        CompositeAggregationBuilder compositeAggregation;
+        try (XContentBuilder builder = jsonBuilder()) {
+            builder.startObject();
+            builder.field(CompositeAggregationBuilder.SOURCES_FIELD_NAME.getPreferredName());
+            builder.startArray();
+            for (Entry<String, SingleGroupSource> groupBy : groups.entrySet()) {
+                builder.startObject();
+                builder.startObject(groupBy.getKey());
+                builder.field(groupBy.getValue().getType().value(), groupBy.getValue());
+                builder.endObject();
+                builder.endObject();
+            }
+            builder.endArray();
+            builder.endObject(); // sources
+
+            XContentParser parser = builder.generator()
+                .contentType()
+                .xContent()
+                .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput());
+            compositeAggregation = CompositeAggregationBuilder.PARSER.parse(parser, "_transform");
+        }
+
+        return compositeAggregation;
+    }
+
+    private static CompositeAggregationBuilder getCompositeAggregationBuilder(SearchSourceBuilder builder) {
+        return (CompositeAggregationBuilder) builder.aggregations().getAggregatorFactories().iterator().next();
+    }
+}
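The page-size assertions in testPageSize encode per-group upper bounds: a histogram collector imposes none, a terms collector is capped by the maximum number of values a terms query may carry (65,536 in the assertions), and a geo_tile collector by the maximum number of query clauses (1,024). The composite "changes" aggregation then runs at the smallest bound any field collector reports. A minimal sketch of that reduction, with FieldCollector and getMaxPageSize() as stand-ins for the real CompositeBucketsChangeCollector internals:

```java
import java.util.Collection;

// Illustrative sketch (not the actual collector code) of the page-size clamping
// that testPageSize pins down: the requested page size is reduced to the smallest
// upper bound any field collector reports.
final class PageSizeClamp {
    interface FieldCollector {
        // e.g. Integer.MAX_VALUE for histogram, 65_536 for terms, 1_024 for geo_tile
        int getMaxPageSize();
    }

    static int clamp(int requestedPageSize, Collection<FieldCollector> collectors) {
        int pageSize = requestedPageSize;
        for (FieldCollector collector : collectors) {
            pageSize = Math.min(pageSize, collector.getMaxPageSize());
        }
        return pageSize;
    }
}
```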
diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java
index 8c19e04d586d7..bdccea45eed14 100644
--- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java
+++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java
@@ -34,7 +34,8 @@
 import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfigTests;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
 import org.elasticsearch.xpack.transform.Transform;
-import org.elasticsearch.xpack.transform.transforms.pivot.Aggregations.AggregationType;
+import org.elasticsearch.xpack.transform.transforms.Function;
+import org.elasticsearch.xpack.transform.transforms.pivot.TransformAggregations.AggregationType;
 import org.junit.After;
 import org.junit.Before;
@@ -49,7 +50,9 @@
 import java.util.stream.Stream;
 
 import static java.util.Collections.emptyList;
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.notNullValue;
@@ -90,14 +93,14 @@ protected NamedXContentRegistry xContentRegistry() {
 
     public void testValidateExistingIndex() throws Exception {
         SourceConfig source = new SourceConfig(new String[] { "existing_source_index" }, QueryConfig.matchAll());
-        Pivot pivot = new Pivot(getValidPivotConfig());
+        Function pivot = new Pivot(getValidPivotConfig(), randomAlphaOfLength(10));
 
         assertValidTransform(client, source, pivot);
     }
 
     public void testValidateNonExistingIndex() throws Exception {
         SourceConfig source = new SourceConfig(new String[] { "non_existing_source_index" }, QueryConfig.matchAll());
-        Pivot pivot = new Pivot(getValidPivotConfig());
+        Function pivot = new Pivot(getValidPivotConfig(), randomAlphaOfLength(10));
 
         assertInvalidTransform(client, source, pivot);
     }
@@ -105,10 +108,16 @@ public void testValidateNonExistingIndex() throws Exception {
     public void testInitialPageSize() throws Exception {
         int expectedPageSize = 1000;
 
-        Pivot pivot = new Pivot(new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig(), expectedPageSize));
+        Function pivot = new Pivot(
+            new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig(), expectedPageSize),
+            randomAlphaOfLength(10)
+        );
         assertThat(pivot.getInitialPageSize(), equalTo(expectedPageSize));
 
-        pivot = new Pivot(new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig(), null));
+        pivot = new Pivot(
+            new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig(), null),
+            randomAlphaOfLength(10)
+        );
         assertThat(pivot.getInitialPageSize(), equalTo(Transform.DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE));
 
         assertWarnings("[max_page_search_size] is deprecated inside pivot please use settings instead");
@@ -119,7 +128,7 @@ public void testSearchFailure() throws Exception {
         // search has failures although they might just be temporary
         SourceConfig source = new SourceConfig(new String[] { "existing_source_index_with_failing_shards" }, QueryConfig.matchAll());
 
-        Pivot pivot = new Pivot(getValidPivotConfig());
+        Function pivot = new Pivot(getValidPivotConfig(), randomAlphaOfLength(10));
 
         assertInvalidTransform(client, source, pivot);
     }
@@ -129,7 +138,7 @@ public void testValidateAllSupportedAggregations() throws Exception {
             AggregationConfig aggregationConfig = getAggregationConfig(agg);
             SourceConfig source = new SourceConfig(new String[] { "existing_source" }, QueryConfig.matchAll());
 
-            Pivot pivot = new Pivot(getValidPivotConfig(aggregationConfig));
+            Function pivot = new Pivot(getValidPivotConfig(aggregationConfig), randomAlphaOfLength(10));
 
             assertValidTransform(client, source, pivot);
         }
     }
@@ -138,9 +147,12 @@ public void testValidateAllUnsupportedAggregations() throws Exception {
         for (String agg : unsupportedAggregations) {
             AggregationConfig aggregationConfig = getAggregationConfig(agg);
 
-            Pivot pivot = new Pivot(getValidPivotConfig(aggregationConfig));
-            ElasticsearchException ex = expectThrows(ElasticsearchException.class, pivot::validateConfig);
-            assertThat("expected aggregations to be unsupported, but they were", ex, is(notNullValue()));
+            Function pivot = new Pivot(getValidPivotConfig(aggregationConfig), randomAlphaOfLength(10));
+
+            pivot.validateConfig(ActionListener.wrap(r -> { fail("expected an exception but got a response"); }, e -> {
+                assertThat(e, anyOf(instanceOf(ElasticsearchException.class)));
+                assertThat("expected aggregations to be unsupported, but they were", e, is(notNullValue()));
+            }));
         }
     }
@@ -273,15 +285,15 @@ private AggregationConfig parseAggregations(String json) throws IOException {
         return AggregationConfig.fromXContent(parser, false);
     }
 
-    private static void assertValidTransform(Client client, SourceConfig source, Pivot pivot) throws Exception {
+    private static void assertValidTransform(Client client, SourceConfig source, Function pivot) throws Exception {
         validate(client, source, pivot, true);
     }
 
-    private static void assertInvalidTransform(Client client, SourceConfig source, Pivot pivot) throws Exception {
+    private static void assertInvalidTransform(Client client, SourceConfig source, Function pivot) throws Exception {
         validate(client, source, pivot, false);
     }
 
-    private static void validate(Client client, SourceConfig source, Pivot pivot, boolean expectValid) throws Exception {
+    private static void validate(Client client, SourceConfig source, Function pivot, boolean expectValid) throws Exception {
         CountDownLatch latch = new CountDownLatch(1);
         final AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
         pivot.validateQuery(client, source, ActionListener.wrap(validity -> {
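PivotTests now programs against the Function interface, and validateConfig has moved from a synchronous, exception-throwing call to an ActionListener callback, so failure expectations are asserted in the onFailure branch. A test that needs to block until the callback fires can bridge with a latch, roughly like this (listener wiring only; `pivot` is a Function as constructed in the tests above, and the variable names are illustrative):

```java
// Sketch of blocking on the asynchronous validateConfig contract in a test.
// Assumes java.util.concurrent.{CountDownLatch, TimeUnit},
// java.util.concurrent.atomic.AtomicReference and
// org.elasticsearch.action.ActionListener are imported.
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> error = new AtomicReference<>();
pivot.validateConfig(ActionListener.wrap(r -> latch.countDown(), e -> {
    error.set(e); // failures now arrive here instead of being thrown
    latch.countDown();
}));
assertTrue("listener never completed", latch.await(10, TimeUnit.SECONDS));
```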
+ assertEquals("int", TransformAggregations.resolveTargetMapping("max", "int")); + assertEquals("double", TransformAggregations.resolveTargetMapping("max", "double")); + assertEquals("half_float", TransformAggregations.resolveTargetMapping("max", "half_float")); + assertEquals("float", TransformAggregations.resolveTargetMapping("max", "scaled_float")); // min - assertEquals("int", Aggregations.resolveTargetMapping("min", "int")); - assertEquals("double", Aggregations.resolveTargetMapping("min", "double")); - assertEquals("half_float", Aggregations.resolveTargetMapping("min", "half_float")); - assertEquals("float", Aggregations.resolveTargetMapping("min", "scaled_float")); + assertEquals("int", TransformAggregations.resolveTargetMapping("min", "int")); + assertEquals("double", TransformAggregations.resolveTargetMapping("min", "double")); + assertEquals("half_float", TransformAggregations.resolveTargetMapping("min", "half_float")); + assertEquals("float", TransformAggregations.resolveTargetMapping("min", "scaled_float")); // sum - assertEquals("double", Aggregations.resolveTargetMapping("sum", "double")); - assertEquals("double", Aggregations.resolveTargetMapping("sum", "half_float")); - assertEquals("double", Aggregations.resolveTargetMapping("sum", null)); + assertEquals("double", TransformAggregations.resolveTargetMapping("sum", "double")); + assertEquals("double", TransformAggregations.resolveTargetMapping("sum", "half_float")); + assertEquals("double", TransformAggregations.resolveTargetMapping("sum", null)); // geo_centroid - assertEquals("geo_point", Aggregations.resolveTargetMapping("geo_centroid", "geo_point")); - assertEquals("geo_point", Aggregations.resolveTargetMapping("geo_centroid", null)); + assertEquals("geo_point", TransformAggregations.resolveTargetMapping("geo_centroid", "geo_point")); + assertEquals("geo_point", TransformAggregations.resolveTargetMapping("geo_centroid", null)); // geo_bounds - assertEquals("geo_shape", Aggregations.resolveTargetMapping("geo_bounds", "geo_shape")); - assertEquals("geo_shape", Aggregations.resolveTargetMapping("geo_bounds", null)); + assertEquals("geo_shape", TransformAggregations.resolveTargetMapping("geo_bounds", "geo_shape")); + assertEquals("geo_shape", TransformAggregations.resolveTargetMapping("geo_bounds", null)); // scripted_metric - assertEquals("_dynamic", Aggregations.resolveTargetMapping("scripted_metric", null)); - assertEquals("_dynamic", Aggregations.resolveTargetMapping("scripted_metric", "int")); + assertEquals("_dynamic", TransformAggregations.resolveTargetMapping("scripted_metric", null)); + assertEquals("_dynamic", TransformAggregations.resolveTargetMapping("scripted_metric", "int")); // bucket_script - assertEquals("_dynamic", Aggregations.resolveTargetMapping("bucket_script", null)); - assertEquals("_dynamic", Aggregations.resolveTargetMapping("bucket_script", "int")); + assertEquals("_dynamic", TransformAggregations.resolveTargetMapping("bucket_script", null)); + assertEquals("_dynamic", TransformAggregations.resolveTargetMapping("bucket_script", "int")); // bucket_selector - assertEquals("_dynamic", Aggregations.resolveTargetMapping("bucket_selector", null)); - assertEquals("_dynamic", Aggregations.resolveTargetMapping("bucket_selector", "int")); + assertEquals("_dynamic", TransformAggregations.resolveTargetMapping("bucket_selector", null)); + assertEquals("_dynamic", TransformAggregations.resolveTargetMapping("bucket_selector", "int")); // weighted_avg - assertEquals("_dynamic", 
Aggregations.resolveTargetMapping("weighted_avg", null)); - assertEquals("_dynamic", Aggregations.resolveTargetMapping("weighted_avg", "double")); + assertEquals("_dynamic", TransformAggregations.resolveTargetMapping("weighted_avg", null)); + assertEquals("_dynamic", TransformAggregations.resolveTargetMapping("weighted_avg", "double")); // percentile - assertEquals("double", Aggregations.resolveTargetMapping("percentiles", null)); - assertEquals("double", Aggregations.resolveTargetMapping("percentiles", "int")); + assertEquals("double", TransformAggregations.resolveTargetMapping("percentiles", null)); + assertEquals("double", TransformAggregations.resolveTargetMapping("percentiles", "int")); // filter - assertEquals("long", Aggregations.resolveTargetMapping("filter", null)); - assertEquals("long", Aggregations.resolveTargetMapping("filter", "long")); - assertEquals("long", Aggregations.resolveTargetMapping("filter", "double")); + assertEquals("long", TransformAggregations.resolveTargetMapping("filter", null)); + assertEquals("long", TransformAggregations.resolveTargetMapping("filter", "long")); + assertEquals("long", TransformAggregations.resolveTargetMapping("filter", "double")); // terms - assertEquals("flattened", Aggregations.resolveTargetMapping("terms", null)); - assertEquals("flattened", Aggregations.resolveTargetMapping("terms", "keyword")); - assertEquals("flattened", Aggregations.resolveTargetMapping("terms", "text")); + assertEquals("flattened", TransformAggregations.resolveTargetMapping("terms", null)); + assertEquals("flattened", TransformAggregations.resolveTargetMapping("terms", "keyword")); + assertEquals("flattened", TransformAggregations.resolveTargetMapping("terms", "text")); // rare_terms - assertEquals("flattened", Aggregations.resolveTargetMapping("rare_terms", null)); - assertEquals("flattened", Aggregations.resolveTargetMapping("rare_terms", "text")); - assertEquals("flattened", Aggregations.resolveTargetMapping("rare_terms", "keyword")); + assertEquals("flattened", TransformAggregations.resolveTargetMapping("rare_terms", null)); + assertEquals("flattened", TransformAggregations.resolveTargetMapping("rare_terms", "text")); + assertEquals("flattened", TransformAggregations.resolveTargetMapping("rare_terms", "keyword")); // corner case: source type null - assertEquals(null, Aggregations.resolveTargetMapping("min", null)); + assertEquals(null, TransformAggregations.resolveTargetMapping("min", null)); } public void testAggregationsVsTransforms() { @@ -122,10 +122,11 @@ public void testAggregationsVsTransforms() { + "please open an issue to add transform support for it. Afterwards add \"" + aggregationName + "\" to the list in " - + Aggregations.class.getName() + + TransformAggregations.class.getName() + ". 
Thanks!", - Aggregations.isSupportedByTransform(aggregationName) || Aggregations.isUnSupportedByTransform(aggregationName) + TransformAggregations.isSupportedByTransform(aggregationName) + || TransformAggregations.isUnSupportedByTransform(aggregationName) ); } } @@ -133,7 +134,7 @@ public void testAggregationsVsTransforms() { public void testGetAggregationOutputTypesPercentiles() { AggregationBuilder percentialAggregationBuilder = new PercentilesAggregationBuilder("percentiles", new double[] { 1, 5, 10 }, null); - Tuple, Map> inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes( + Tuple, Map> inputAndOutputTypes = TransformAggregations.getAggregationInputAndOutputTypes( percentialAggregationBuilder ); assertTrue(inputAndOutputTypes.v1().isEmpty()); @@ -146,7 +147,7 @@ public void testGetAggregationOutputTypesPercentiles() { // note: using the constructor, omits validation, in reality this test might fail percentialAggregationBuilder = new PercentilesAggregationBuilder("percentiles", new double[] { 1, 5, 5, 10 }, null); - inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes(percentialAggregationBuilder); + inputAndOutputTypes = TransformAggregations.getAggregationInputAndOutputTypes(percentialAggregationBuilder); assertTrue(inputAndOutputTypes.v1().isEmpty()); outputTypes = inputAndOutputTypes.v2(); @@ -159,7 +160,7 @@ public void testGetAggregationOutputTypesPercentiles() { public void testGetAggregationOutputTypesSubAggregations() { AggregationBuilder filterAggregationBuilder = new FilterAggregationBuilder("filter_1", new TermQueryBuilder("type", "cat")); - Tuple, Map> inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes( + Tuple, Map> inputAndOutputTypes = TransformAggregations.getAggregationInputAndOutputTypes( filterAggregationBuilder ); assertTrue(inputAndOutputTypes.v1().isEmpty()); @@ -170,7 +171,7 @@ public void testGetAggregationOutputTypesSubAggregations() { AggregationBuilder subFilterAggregationBuilder = new FilterAggregationBuilder("filter_2", new TermQueryBuilder("subtype", "siam")); filterAggregationBuilder.subAggregation(subFilterAggregationBuilder); - inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes(filterAggregationBuilder); + inputAndOutputTypes = TransformAggregations.getAggregationInputAndOutputTypes(filterAggregationBuilder); assertTrue(inputAndOutputTypes.v1().isEmpty()); outputTypes = inputAndOutputTypes.v2(); @@ -178,7 +179,7 @@ public void testGetAggregationOutputTypesSubAggregations() { assertEquals("filter", outputTypes.get("filter_1.filter_2")); filterAggregationBuilder.subAggregation(new MaxAggregationBuilder("max_2").field("max_field")); - inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes(filterAggregationBuilder); + inputAndOutputTypes = TransformAggregations.getAggregationInputAndOutputTypes(filterAggregationBuilder); assertEquals(1, inputAndOutputTypes.v1().size()); Map inputTypes = inputAndOutputTypes.v1(); assertEquals("max_field", inputTypes.get("filter_1.max_2")); @@ -189,7 +190,7 @@ public void testGetAggregationOutputTypesSubAggregations() { assertEquals("max", outputTypes.get("filter_1.max_2")); subFilterAggregationBuilder.subAggregation(new FilterAggregationBuilder("filter_3", new TermQueryBuilder("color", "white"))); - inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes(filterAggregationBuilder); + inputAndOutputTypes = TransformAggregations.getAggregationInputAndOutputTypes(filterAggregationBuilder); assertEquals(1, 
inputAndOutputTypes.v1().size()); outputTypes = inputAndOutputTypes.v2(); @@ -198,7 +199,7 @@ public void testGetAggregationOutputTypesSubAggregations() { assertEquals("max", outputTypes.get("filter_1.max_2")); subFilterAggregationBuilder.subAggregation(new MinAggregationBuilder("min_3").field("min_field")); - inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes(filterAggregationBuilder); + inputAndOutputTypes = TransformAggregations.getAggregationInputAndOutputTypes(filterAggregationBuilder); assertEquals(2, inputAndOutputTypes.v1().size()); inputTypes = inputAndOutputTypes.v1(); assertEquals("max_field", inputTypes.get("filter_1.max_2")); @@ -213,7 +214,7 @@ public void testGetAggregationOutputTypesSubAggregations() { subFilterAggregationBuilder.subAggregation( new PercentilesAggregationBuilder("percentiles", new double[] { 33.3, 44.4, 88.8, 99.5 }, null) ); - inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes(filterAggregationBuilder); + inputAndOutputTypes = TransformAggregations.getAggregationInputAndOutputTypes(filterAggregationBuilder); assertEquals(2, inputAndOutputTypes.v1().size()); outputTypes = inputAndOutputTypes.v2();
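The renamed TransformAggregations class keeps the mapping deduction these assertions document: given an aggregation type and the source field's mapping type, resolveTargetMapping returns the destination mapping type, with "_dynamic" signalling that the destination index should fall back to dynamic mapping. In use, grounded in the assertions above (assuming the caller sits on the same classpath and package as these tests):

```java
// Return values taken directly from the testResolveTargetMapping assertions.
String avgType = TransformAggregations.resolveTargetMapping("avg", "int");              // "double"
String termsType = TransformAggregations.resolveTargetMapping("terms", "keyword");      // "flattened"
String scripted = TransformAggregations.resolveTargetMapping("scripted_metric", null);  // "_dynamic"
```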