
Option to configure default analysis types in SegmentMetadataQuery #4259

Merged: 23 commits, May 26, 2017
Changes from 1 commit (of 23).
File: SegmentMetadataQueryConfig.java
@@ -20,19 +20,29 @@
package io.druid.query.metadata;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.joda.time.Period;
import org.joda.time.format.ISOPeriodFormat;
import org.joda.time.format.PeriodFormatter;

import java.util.EnumSet;

public class SegmentMetadataQueryConfig
{
private static final String DEFAULT_PERIOD_STRING = "P1W";
private static final PeriodFormatter ISO_FORMATTER = ISOPeriodFormat.standard();
static final EnumSet<SegmentMetadataQuery.AnalysisType> DEFAULT_ANALYSIS_TYPES = EnumSet.of(
SegmentMetadataQuery.AnalysisType.CARDINALITY,
SegmentMetadataQuery.AnalysisType.INTERVAL,
SegmentMetadataQuery.AnalysisType.MINMAX
);

@JsonProperty
private Period defaultHistory = ISO_FORMATTER.parsePeriod(DEFAULT_PERIOD_STRING);

@JsonProperty
private EnumSet<SegmentMetadataQuery.AnalysisType> defaultAnalysisType = DEFAULT_ANALYSIS_TYPES;
[Review comment, Member]: Suggested defaultAnalysisTypes (update getter and setter names too)
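A sketch of what the suggested rename might look like (names taken from the review comment, not from this commit):

    @JsonProperty
    private EnumSet<SegmentMetadataQuery.AnalysisType> defaultAnalysisTypes = DEFAULT_ANALYSIS_TYPES;

    public EnumSet<SegmentMetadataQuery.AnalysisType> getDefaultAnalysisTypes()
    {
      return defaultAnalysisTypes;
    }

    public void setDefaultAnalysisTypes(EnumSet<SegmentMetadataQuery.AnalysisType> defaultAnalysisTypes)
    {
      this.defaultAnalysisTypes = defaultAnalysisTypes;
    }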


public SegmentMetadataQueryConfig(String period)
{
defaultHistory = ISO_FORMATTER.parsePeriod(period);
@@ -46,4 +56,16 @@ public Period getDefaultHistory()
{
return defaultHistory;
}

public void setDefaultHistory(String period)
{
this.defaultHistory = ISO_FORMATTER.parsePeriod(period);
}

public EnumSet<SegmentMetadataQuery.AnalysisType> getDefaultAnalysisType() { return defaultAnalysisType; }

public void setDefaultAnalysisType(EnumSet<SegmentMetadataQuery.AnalysisType> defaultAnalysisType)
{
this.defaultAnalysisType = defaultAnalysisType;
}
}
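The new default can be overridden programmatically, as the unit test added at the end of this diff does. A minimal sketch (in a deployment this config would presumably be populated by Jackson from runtime properties, the same way defaultHistory is; the exact property prefix is not shown in this diff):

    // Override the built-in default of {CARDINALITY, INTERVAL, MINMAX}.
    SegmentMetadataQueryConfig config = new SegmentMetadataQueryConfig();
    config.setDefaultAnalysisType(EnumSet.of(SegmentMetadataQuery.AnalysisType.CARDINALITY));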
File: SegmentMetadataQueryQueryToolChest.java
@@ -59,6 +59,7 @@
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -188,7 +189,7 @@ public boolean isCacheable(SegmentMetadataQuery query, boolean willMergeRunners)
public byte[] computeCacheKey(SegmentMetadataQuery query)
{
byte[] includerBytes = query.getToInclude().getCacheKey();
byte[] analysisTypesBytes = query.getAnalysisTypesCacheKey();
byte[] analysisTypesBytes = getAnalysisTypesCacheKey(query);
return ByteBuffer.allocate(1 + includerBytes.length + analysisTypesBytes.length)
.put(SEGMENT_METADATA_CACHE_PREFIX)
.put(includerBytes)
@@ -404,4 +405,39 @@ public static SegmentAnalysis finalizeAnalysis(SegmentAnalysis analysis)
analysis.isRollup()
);
}

public EnumSet<SegmentMetadataQuery.AnalysisType> getAnalysisTypes(SegmentMetadataQuery query)
{
if (query.getAnalysisTypes() == null) {
[Review comment, Member]: Suggested Objects.firstNonNull()

return config != null ? config.getDefaultAnalysisType() : SegmentMetadataQueryConfig.DEFAULT_ANALYSIS_TYPES;
} else {
return query.getAnalysisTypes();
}
}
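For reference, the reviewer's Objects.firstNonNull() suggestion might look like the sketch below. Note it assumes config is never null, which the method above does not assume (hence its explicit null check):

    // Guava's firstNonNull returns its first argument unless that is null,
    // in which case it returns the second (which must be non-null).
    public EnumSet<SegmentMetadataQuery.AnalysisType> getAnalysisTypes(SegmentMetadataQuery query)
    {
      return com.google.common.base.Objects.firstNonNull(
          query.getAnalysisTypes(),
          config.getDefaultAnalysisType()
      );
    }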

public SegmentAnalyzer getSegmentAnalyzer(SegmentMetadataQuery query)
{
return new SegmentAnalyzer(getAnalysisTypes(query));
}

private byte[] getAnalysisTypesCacheKey(SegmentMetadataQuery query)
{
int size = 1;
final EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes = getAnalysisTypes(query);

final List<byte[]> typeBytesList = Lists.newArrayListWithExpectedSize(analysisTypes.size());
for (SegmentMetadataQuery.AnalysisType analysisType : analysisTypes) {
final byte[] bytes = analysisType.getCacheKey();
typeBytesList.add(bytes);
size += bytes.length;
}

final ByteBuffer bytes = ByteBuffer.allocate(size);
bytes.put(SegmentMetadataQuery.ANALYSIS_TYPES_CACHE_PREFIX);
for (byte[] typeBytes : typeBytesList) {
bytes.put(typeBytes);
}

return bytes.array();
}
}
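Because the effective analysis types now depend on the injected config, the cache key is computed here rather than in SegmentMetadataQuery itself; otherwise two processes with different defaults could share cache entries for the same query. A sketch using the same configs the new test defines (emptyCfg and analysisCfg):

    // The same query yields different cache keys under different configs,
    // because the resolved analysis types differ.
    SegmentMetadataQuery query = Druids.newSegmentMetadataQueryBuilder()
        .dataSource("testing")
        .build();
    byte[] defaultKey  = new SegmentMetadataQueryQueryToolChest(emptyCfg).computeCacheKey(query);
    byte[] overrideKey = new SegmentMetadataQueryQueryToolChest(analysisCfg).computeCacheKey(query);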
File: SegmentMetadataQueryRunnerFactory.java
@@ -45,12 +45,15 @@
import io.druid.query.metadata.metadata.ColumnIncluderator;
import io.druid.query.metadata.metadata.SegmentAnalysis;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;
import io.druid.query.metadata.metadata.SegmentMetadataQuery.AnalysisType;
import io.druid.segment.Metadata;
import io.druid.segment.Segment;
import org.joda.time.Interval;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
@@ -86,7 +89,8 @@ public QueryRunner<SegmentAnalysis> createRunner(final Segment segment)
public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Object> responseContext)
{
SegmentMetadataQuery query = (SegmentMetadataQuery) inQ;
final SegmentAnalyzer analyzer = new SegmentAnalyzer(query.getAnalysisTypes());
final SegmentAnalyzer analyzer = toolChest.getSegmentAnalyzer(query);
final EnumSet<SegmentMetadataQuery.AnalysisType> analysisTypes = toolChest.getAnalysisTypes(query);
final Map<String, ColumnAnalysis> analyzedColumns = analyzer.analyze(segment);
final long numRows = analyzer.numRows(segment);
long totalSize = 0;
@@ -109,11 +113,16 @@ public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Object> responseContext)
columns.put(columnName, column);
}
}
List<Interval> retIntervals = query.analyzingInterval() ? Arrays.asList(segment.getDataInterval()) : null;
List<Interval> retIntervals;
if (analysisTypes.contains(AnalysisType.INTERVAL)) {
retIntervals = Collections.singletonList(segment.getDataInterval());
} else {
retIntervals = null;
}

final Map<String, AggregatorFactory> aggregators;
Metadata metadata = null;
if (query.hasAggregators()) {
if (analysisTypes.contains(AnalysisType.AGGREGATORS)) {
metadata = segment.asStorageAdapter().getMetadata();
if (metadata != null && metadata.getAggregators() != null) {
aggregators = Maps.newHashMap();
@@ -128,7 +137,7 @@ public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Object> responseContext)
}

final TimestampSpec timestampSpec;
if (query.hasTimestampSpec()) {
if (analysisTypes.contains(AnalysisType.TIMESTAMPSPEC)) {
if (metadata == null) {
metadata = segment.asStorageAdapter().getMetadata();
}
@@ -138,7 +147,7 @@ public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Object> responseContext)
}

final Granularity queryGranularity;
if (query.hasQueryGranularity()) {
if (analysisTypes.contains(AnalysisType.QUERYGRANULARITY)) {
if (metadata == null) {
metadata = segment.asStorageAdapter().getMetadata();
}
@@ -148,7 +157,7 @@ public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Object> responseContext)
}

Boolean rollup = null;
if (query.hasRollup()) {
if (analysisTypes.contains(AnalysisType.ROLLUP)) {
if (metadata == null) {
metadata = segment.asStorageAdapter().getMetadata();
}
…
File: SegmentMetadataQuery.java
@@ -23,7 +23,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.druid.common.utils.JodaUtils;
import io.druid.query.BaseQuery;
import io.druid.query.DataSource;
@@ -36,10 +35,8 @@
import io.druid.query.spec.QuerySegmentSpec;
import org.joda.time.Interval;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;

@@ -86,12 +83,6 @@ public byte[] getCacheKey()
JodaUtils.MIN_INSTANT, JodaUtils.MAX_INSTANT
);

public static final EnumSet<AnalysisType> DEFAULT_ANALYSIS_TYPES = EnumSet.of(
AnalysisType.CARDINALITY,
AnalysisType.INTERVAL,
AnalysisType.MINMAX
);

private final ColumnIncluderator toInclude;
private final boolean merge;
private final boolean usingDefaultInterval;
@@ -125,7 +116,7 @@ public SegmentMetadataQuery(
}
this.toInclude = toInclude == null ? new AllColumnIncluderator() : toInclude;
this.merge = merge == null ? false : merge;
this.analysisTypes = (analysisTypes == null) ? DEFAULT_ANALYSIS_TYPES : analysisTypes;
this.analysisTypes = analysisTypes;
Preconditions.checkArgument(
dataSource instanceof TableDataSource || dataSource instanceof UnionDataSource,
"SegmentMetadataQuery only supports table or union datasource"
@@ -181,56 +172,6 @@ public boolean isLenientAggregatorMerge()
return lenientAggregatorMerge;
}

public boolean analyzingInterval()
{
return analysisTypes.contains(AnalysisType.INTERVAL);
}

public boolean hasAggregators()
{
return analysisTypes.contains(AnalysisType.AGGREGATORS);
}

public boolean hasTimestampSpec()
{
return analysisTypes.contains(AnalysisType.TIMESTAMPSPEC);
}

public boolean hasQueryGranularity()
{
return analysisTypes.contains(AnalysisType.QUERYGRANULARITY);
}

public boolean hasRollup()
{
return analysisTypes.contains(AnalysisType.ROLLUP);
}

public boolean hasMinMax()
{
return analysisTypes.contains(AnalysisType.MINMAX);
}

public byte[] getAnalysisTypesCacheKey()
{
int size = 1;
List<byte[]> typeBytesList = Lists.newArrayListWithExpectedSize(analysisTypes.size());
for (AnalysisType analysisType : analysisTypes) {
final byte[] bytes = analysisType.getCacheKey();
typeBytesList.add(bytes);
size += bytes.length;
}

final ByteBuffer bytes = ByteBuffer.allocate(size);
bytes.put(ANALYSIS_TYPES_CACHE_PREFIX);
for (byte[] typeBytes : typeBytesList) {
bytes.put(typeBytes);
}

return bytes.array();
}


@Override
public Query<SegmentAnalysis> withOverriddenContext(Map<String, Object> contextOverride)
{
…
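The net effect in this file: a query that omits analysisTypes now keeps it null rather than eagerly substituting a hard-coded default, deferring resolution to the toolchest and its SegmentMetadataQueryConfig. For illustration, using the same builder the tests use:

    // Explicit analysis types are honored as given.
    SegmentMetadataQuery explicit = Druids.newSegmentMetadataQueryBuilder()
        .dataSource("testing")
        .analysisTypes(SegmentMetadataQuery.AnalysisType.MINMAX)
        .build();

    // Omitted analysis types stay null; the toolchest substitutes the
    // configured (or built-in) defaults at execution and cache-key time.
    SegmentMetadataQuery deferred = Druids.newSegmentMetadataQueryBuilder()
        .dataSource("testing")
        .build();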
File: SegmentMetadataQueryTest.java
@@ -1100,4 +1100,40 @@ public void testCacheKeyWithListColumnIncluderator()

Assert.assertFalse(Arrays.equals(oneColumnQueryCacheKey, twoColumnQueryCacheKey));
}

@Test
public void testAnalysisTypesBeingSet()
{
SegmentMetadataQuery query1 = Druids.newSegmentMetadataQueryBuilder()
.dataSource("testing")
.toInclude(new ListColumnIncluderator(Arrays.asList("foo")))
.build();

SegmentMetadataQuery query2 = Druids.newSegmentMetadataQueryBuilder()
.dataSource("testing")
.toInclude(new ListColumnIncluderator(Arrays.asList("foo")))
.analysisTypes(SegmentMetadataQuery.AnalysisType.MINMAX)
.build();

SegmentMetadataQueryConfig emptyCfg = new SegmentMetadataQueryConfig();
SegmentMetadataQueryConfig analysisCfg = new SegmentMetadataQueryConfig();
analysisCfg.setDefaultAnalysisType(EnumSet.of(SegmentMetadataQuery.AnalysisType.CARDINALITY));

EnumSet<SegmentMetadataQuery.AnalysisType> analysis1 = new SegmentMetadataQueryQueryToolChest(emptyCfg).getAnalysisTypes(query1);
EnumSet<SegmentMetadataQuery.AnalysisType> analysis2 = new SegmentMetadataQueryQueryToolChest(emptyCfg).getAnalysisTypes(query2);
EnumSet<SegmentMetadataQuery.AnalysisType> analysisWCfg1 = new SegmentMetadataQueryQueryToolChest(analysisCfg).getAnalysisTypes(query1);
EnumSet<SegmentMetadataQuery.AnalysisType> analysisWCfg2 = new SegmentMetadataQueryQueryToolChest(analysisCfg).getAnalysisTypes(query2);

EnumSet<SegmentMetadataQuery.AnalysisType> expectedAnalysis1 = SegmentMetadataQueryConfig.DEFAULT_ANALYSIS_TYPES;
EnumSet<SegmentMetadataQuery.AnalysisType> expectedAnalysis2 = EnumSet.of(SegmentMetadataQuery.AnalysisType.MINMAX);
EnumSet<SegmentMetadataQuery.AnalysisType> expectedAnalysisWCfg1 = EnumSet.of(SegmentMetadataQuery.AnalysisType.CARDINALITY);
EnumSet<SegmentMetadataQuery.AnalysisType> expectedAnalysisWCfg2 = EnumSet.of(SegmentMetadataQuery.AnalysisType.MINMAX);

Assert.assertEquals(expectedAnalysis1, analysis1);
Assert.assertEquals(expectedAnalysis2, analysis2);
Assert.assertEquals(expectedAnalysisWCfg1, analysisWCfg1);
Assert.assertEquals(expectedAnalysisWCfg2, analysisWCfg2);
}

}