Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform] separate pivot and extract function interface #58744

Merged
merged 12 commits into from
Jul 14, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class PreviewTransformAction extends ActionType<PreviewTransformAction.Response> {
Expand Down Expand Up @@ -94,12 +93,8 @@ public static Request fromXContent(final XContentParser parser) throws IOExcepti
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (config.getPivotConfig() != null) {
for (String failure : config.getPivotConfig().aggFieldValidation()) {
validationException = addValidationError(failure, validationException);
}
}

validationException = config.validate(validationException);
validationException = SourceDestValidator.validateRequest(
validationException,
config.getDestination() != null ? config.getDestination().getIndex() : null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,8 @@ public static Request fromXContent(final XContentParser parser, final String id,
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (config.getPivotConfig() != null
&& config.getPivotConfig().getMaxPageSearchSize() != null
&& (config.getPivotConfig().getMaxPageSearchSize() < 10 || config.getPivotConfig().getMaxPageSearchSize() > 10_000)) {
validationException = addValidationError(
"pivot.max_page_search_size ["
+ config.getPivotConfig().getMaxPageSearchSize()
+ "] must be greater than 10 and less than 10,000",
validationException
);
}
for (String failure : config.getPivotConfig().aggFieldValidation()) {
validationException = addValidationError(failure, validationException);
}

validationException = config.validate(validationException);
validationException = SourceDestValidator.validateRequest(validationException, config.getDestination().getIndex());

if (TransformStrings.isValidId(config.getId()) == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package org.elasticsearch.xpack.core.transform.transforms;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -18,6 +19,7 @@
import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class SettingsConfig implements Writeable, ToXContentObject {
Expand Down Expand Up @@ -63,6 +65,18 @@ public Float getDocsPerSecond() {
return docsPerSecond;
}

public ActionRequestValidationException validate(ActionRequestValidationException validationException) {
// TODO: make this dependent on search.max_buckets
if (maxPageSearchSize != null && (maxPageSearchSize < 10 || maxPageSearchSize > 10_000)) {
validationException = addValidationError(
"settings.max_page_search_size [" + maxPageSearchSize + "] must be greater than 10 and less than 10,000",
validationException
);
}

return validationException;
}

public boolean isValid() {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.elasticsearch.xpack.core.transform.transforms;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
Expand Down Expand Up @@ -292,6 +293,15 @@ public SettingsConfig getSettings() {
return settings;
}

public ActionRequestValidationException validate(ActionRequestValidationException validationException) {
if (pivotConfig != null) {
validationException = pivotConfig.validate(validationException);
}
validationException = settings.validate(validationException);

return validationException;
}

public boolean isValid() {
if (pivotConfig != null && pivotConfig.isValid() == false) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;

import java.io.IOException;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Objects;
import java.util.Set;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

Expand Down Expand Up @@ -300,7 +297,7 @@ public ZoneId getTimeZone() {
return timeZone;
}

Rounding.Prepared getRounding() {
public Rounding.Prepared getRounding() {
return rounding;
}

Expand Down Expand Up @@ -347,19 +344,6 @@ public int hashCode() {
return Objects.hash(field, interval, timeZone);
}

@Override
public QueryBuilder getIncrementalBucketUpdateFilterQuery(
Set<String> changedBuckets,
String synchronizationField,
long synchronizationTimestamp
) {
if (synchronizationField != null && synchronizationField.equals(field) && synchronizationTimestamp > 0) {
return new RangeQueryBuilder(field).gte(rounding.round(synchronizationTimestamp)).format("epoch_millis");
} else {
return null;
}
}

@Override
public boolean supportsIncrementalBucketUpdate() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.geometry.Rectangle;
import org.elasticsearch.index.mapper.GeoShapeFieldMapper;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.GeoBoundingBoxQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;

Expand All @@ -31,7 +29,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

Expand Down Expand Up @@ -64,6 +61,7 @@ private static ConstructingObjectParser<GeoTileGroupSource, Void> createParser(b
);
return parser;
}

private final Integer precision;
private final GeoBoundingBox geoBoundingBox;

Expand Down Expand Up @@ -106,23 +104,6 @@ public static GeoTileGroupSource fromXContent(final XContentParser parser, boole
return lenient ? LENIENT_PARSER.apply(parser, null) : STRICT_PARSER.apply(parser, null);
}

@Override
public QueryBuilder getIncrementalBucketUpdateFilterQuery(
Set<String> changedBuckets,
String synchronizationField,
long synchronizationTimestamp
) {
if (changedBuckets != null && changedBuckets.isEmpty() == false) {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
changedBuckets.stream()
.map(GeoTileUtils::toBoundingBox)
.map(this::toGeoQuery)
.forEach(boolQueryBuilder::should);
return boolQueryBuilder;
}
return null;
}

@Override
public boolean supportsIncrementalBucketUpdate() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilder;

import java.io.IOException;
import java.util.Objects;
import java.util.Set;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

Expand Down Expand Up @@ -100,16 +98,6 @@ public int hashCode() {
return Objects.hash(field, interval);
}

@Override
public QueryBuilder getIncrementalBucketUpdateFilterQuery(
Set<String> changedBuckets,
String synchronizationField,
long synchronizationTimestamp
) {
// histograms are simple and cheap, so we skip this optimization
return null;
}

@Override
public boolean supportsIncrementalBucketUpdate() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package org.elasticsearch.xpack.core.transform.transforms.pivot;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -29,6 +30,7 @@
import java.util.Map.Entry;
import java.util.Objects;

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

Expand Down Expand Up @@ -175,6 +177,21 @@ public boolean isValid() {
return groups.isValid() && aggregationConfig.isValid();
}

public ActionRequestValidationException validate(ActionRequestValidationException validationException) {
if (maxPageSearchSize != null && (maxPageSearchSize < 10 || maxPageSearchSize > 10_000)) {
validationException = addValidationError(
"pivot.max_page_search_size [" + maxPageSearchSize + "] must be greater than 10 and less than 10,000",
validationException
);
}

for (String failure : aggFieldValidation()) {
validationException = addValidationError(failure, validationException);
}

return validationException;
}

public List<String> aggFieldValidation() {
if ((aggregationConfig.isValid() && groups.isValid()) == false) {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@
import org.elasticsearch.common.xcontent.AbstractObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilder;

import java.io.IOException;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

Expand Down Expand Up @@ -120,12 +118,6 @@ public void writeTo(StreamOutput out) throws IOException {

public abstract boolean supportsIncrementalBucketUpdate();

public abstract QueryBuilder getIncrementalBucketUpdateFilterQuery(
Set<String> changedBuckets,
String synchronizationField,
long synchronizationTimestamp
);

public String getField() {
return field;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,8 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;

import java.io.IOException;
import java.util.Set;

/*
* A terms aggregation source for group_by
Expand Down Expand Up @@ -53,18 +50,6 @@ public static TermsGroupSource fromXContent(final XContentParser parser, boolean
return lenient ? LENIENT_PARSER.apply(parser, null) : STRICT_PARSER.apply(parser, null);
}

@Override
public QueryBuilder getIncrementalBucketUpdateFilterQuery(
Set<String> changedBuckets,
String synchronizationField,
long synchronizationTimestamp
) {
if (changedBuckets != null && changedBuckets.isEmpty() == false) {
return new TermsQueryBuilder(field, changedBuckets);
}
return null;
}

@Override
public boolean supportsIncrementalBucketUpdate() {
return true;
Expand Down
Loading