Skip to content

Commit

Permalink
apache#3421 Add more validation applying bloom filter
Browse files Browse the repository at this point in the history
  • Loading branch information
navis committed Sep 29, 2020
1 parent 81cedb9 commit 65bf792
Show file tree
Hide file tree
Showing 13 changed files with 461 additions and 87 deletions.
4 changes: 1 addition & 3 deletions processing/src/main/java/io/druid/query/BaseQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -569,8 +568,7 @@ public String getId()
@Override
public Query<T> withId(String id)
{
Preconditions.checkNotNull(id, "'id' should not be null");
return withOverriddenContext(ImmutableMap.<String, Object>of(QUERYID, id));
return withOverriddenContext(GuavaUtils.<String, Object>mutableMap(QUERYID, id));
}

@Override
Expand Down
12 changes: 9 additions & 3 deletions processing/src/main/java/io/druid/query/JoinQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -522,14 +522,20 @@ public JoinDelegate withSchema(Supplier<RowSignature> schema)
return this;
}

@Override
public String getType()
{
// Identifies this delegate as a JOIN query; Query.JOIN is a type constant declared on the Query interface.
return Query.JOIN;
}

@Override
public String toString()
{
// Optional parts (prefixAliases, timeColumnName, limit) are included only when set,
// keeping the debug string compact. Note: the stripped diff left both the old
// unconditional concatenations and the new conditional ones in place; only the
// conditional form (the committed version) is kept here.
return "JoinDelegate{" +
       "queries=" + getQueries() +
       (prefixAliases == null ? "" : ", prefixAliases=" + getPrefixAliases()) +
       (timeColumnName == null ? "" : ", timeColumnName=" + getTimeColumnName()) +
       (getLimit() > 0 ? ", limit=" + getLimit() : "") +
       '}';
}

Expand Down
5 changes: 4 additions & 1 deletion processing/src/main/java/io/druid/query/ViewDataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.druid.common.guava.GuavaUtils;
import io.druid.query.filter.DimFilter;
import io.druid.segment.VirtualColumn;

Expand Down Expand Up @@ -168,6 +169,8 @@ public int hashCode()
@Override
public String toString()
{
// The mangled diff retained the pre-commit return ahead of the new one, which is an
// unreachable statement (a compile error in Java). Only the committed version remains:
// virtualColumns and filter segments are appended only when present.
return "$view:" + name + columns +
       (GuavaUtils.isNullOrEmpty(virtualColumns) ? "" : "(" + virtualColumns + ")") +
       (filter == null ? "" : "(" + filter + ")");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,16 @@ public class BloomFilterAggregatorFactory extends AggregatorFactory
{
/**
 * Static factory for the common case: build a bloom filter over {@code fieldNames}
 * with no predicate, an empty grouping set, byRow=true and round=false.
 *
 * Note: the stripped diff left two consecutive return statements (old and new);
 * only the committed form — passing GroupingSetSpec.EMPTY instead of null — is kept.
 */
public static BloomFilterAggregatorFactory of(String name, List<String> fieldNames, int maxNumEntries)
{
return new BloomFilterAggregatorFactory(
    name,
    fieldNames,
    null,
    GroupingSetSpec.EMPTY,
    null,
    true,
    maxNumEntries,
    false
);
}

private static final byte CACHE_TYPE_ID = 0x25;
Expand Down Expand Up @@ -312,8 +321,8 @@ public boolean equals(Object o)
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (fieldNames != null ? fieldNames.hashCode() : 0);
result = 31 * result + (fields != null ? fields.hashCode() : 0);
result = 31 * result + Objects.hashCode(fieldNames);
result = 31 * result + Objects.hashCode(fields);
result = 31 * result + Objects.hashCode(groupingSets);
result = 31 * result + Objects.hashCode(predicate);
result = 31 * result + (byRow ? 1 : 0);
Expand All @@ -326,10 +335,10 @@ public String toString()
{
return "BloomFilterAggregatorFactory{" +
"name='" + name + '\'' +
", fieldNames='" + fieldNames + '\'' +
", fields=" + fields +
", groupingSets=" + groupingSets +
", predicate='" + predicate + '\'' +
(fieldNames == null ? "" : ", fieldNames=" + fieldNames) +
(fields == null ? "" : ", fields=" + fields) +
(groupingSets == null ? "" : ", groupingSets=" + groupingSets) +
(predicate == null ? "" : ", predicate='" + predicate + '\'') +
", byRow=" + byRow +
", maxNumEntries=" + maxNumEntries +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ public KeyBuilder getCacheKey(KeyBuilder builder)
return builder.append(CACHE_TYPE_ID)
.append(fieldNames)
.append(fields)
.append(groupingSets)
.append(predicate)
.append(byRow, round);
}

Expand Down Expand Up @@ -304,8 +306,8 @@ public boolean equals(Object o)
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (fieldNames != null ? fieldNames.hashCode() : 0);
result = 31 * result + (fields != null ? fields.hashCode() : 0);
result = 31 * result + Objects.hashCode(fieldNames);
result = 31 * result + Objects.hashCode(fields);
result = 31 * result + Objects.hashCode(predicate);
result = 31 * result + (byRow ? 1 : 0);
result = 31 * result + (round ? 1 : 0);
Expand All @@ -317,10 +319,10 @@ public String toString()
{
return "CardinalityAggregatorFactory{" +
"name='" + name + '\'' +
", fieldNames='" + fieldNames + '\'' +
", fields=" + fields +
", groupingSets=" + groupingSets +
", predicate='" + predicate + '\'' +
(fieldNames == null ? "" : ", fieldNames=" + fieldNames) +
(fields == null ? "" : ", fields=" + fields) +
(groupingSets == null ? "" : ", groupingSets=" + groupingSets) +
(predicate == null ? "" : ", predicate=" + predicate) +
", byRow=" + byRow +
", round=" + round +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,13 @@ public int hashCode()
result = 31 * result + end;
return result;
}

@Override
public String toString()
{
// Debug representation listing the extraction window [index, end).
final StringBuilder repr = new StringBuilder("SubstringDimExtractionFn{");
repr.append("index=").append(index);
repr.append(", end=").append(end);
repr.append('}');
return repr.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,11 @@ public byte[] getBloomFilter()
@Override
public String toString()
{
// fieldNames and fields are mutually exclusive inputs; report whichever is configured.
// The stripped diff retained the old unconditional concatenation before the new
// branch — only the committed if/else form is kept here.
if (fieldNames != null) {
  return "BloomFilter{fieldNames=" + fieldNames + ", groupingSets=" + groupingSets + '}';
} else {
  return "BloomFilter{fields=" + fields + ", groupingSets=" + groupingSets + '}';
}
}

@JsonTypeName("bloom.factory")
Expand Down Expand Up @@ -293,10 +293,10 @@ public DimFilter rewrite(QuerySegmentWalker walker, Query parent)
public String toString()
{
// bloomSource and maxNumEntries are always printed; the optional members are
// included only when non-null. The garbled diff duplicated the old unconditional
// lines alongside the new conditional ones; only the committed form survives.
return "BloomDimFilter.Factory{" +
       "bloomSource=" + bloomSource +
       (fieldNames == null ? "" : ", fieldNames=" + fieldNames) +
       (fields == null ? "" : ", fields=" + fields) +
       (groupingSets == null ? "" : ", groupingSets=" + groupingSets) +
       ", maxNumEntries=" + maxNumEntries +
       '}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@
})
public interface GroupingSetSpec extends Cacheable
{
GroupingSetSpec EMPTY = new GroupingSetSpec.Indices(null);
GroupingSetSpec EMPTY = new GroupingSetSpec.Indices(null)
{
@Override
public String toString() { return "Noop";}
};

void validate(List<String> dimensions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public boolean isNoop()
@Override
public String toString()
{
// Committed version shortens the representation to "Noop" (matching
// GroupingSetSpec.EMPTY's toString in the same commit); the stale pre-commit
// return left by the stripped diff is removed — two returns would not compile.
return "Noop";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ public Row apply(Map<String, Object> input)
public String toString()
{
StringBuilder builder = new StringBuilder(64)
.append(getType()).append('{')
.append("StreamQuery{")
.append("dataSource='").append(getDataSource()).append('\'');

if (getQuerySegmentSpec() != null) {
Expand Down
43 changes: 0 additions & 43 deletions processing/src/test/java/io/druid/query/TestTpchQuery.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;

/**
*/
Expand All @@ -99,6 +100,8 @@ public class TestQuerySegmentWalker implements ForwardingSegmentWalker, QueryToo
private final PopulatingMap timeLines;
private final ForwardHandler handler;

private final Consumer<Query<?>> hook;

@Override
public StorageHandler getHandler(String scheme)
{
Expand Down Expand Up @@ -178,15 +181,16 @@ public void addPopulator(String key, Supplier<List<Pair<DataSegment, Segment>>>

/**
 * Convenience constructor: direct executor, default JSON mapper, fresh timeline map,
 * and a no-op query hook ({@code q -> {}}). The stripped diff left both the old
 * five-argument and the new six-argument delegation lines; only the committed
 * six-argument form is kept.
 */
public TestQuerySegmentWalker(QueryRunnerFactoryConglomerate conglomerate, QueryConfig config)
{
this(TestHelper.JSON_MAPPER, conglomerate, Execs.newDirectExecutorService(), config, new PopulatingMap(), q -> {});
}

private TestQuerySegmentWalker(
ObjectMapper objectMapper,
QueryRunnerFactoryConglomerate conglomerate,
ExecutorService executor,
QueryConfig queryConfig,
PopulatingMap timeLines
PopulatingMap timeLines,
Consumer<Query<?>> hook
)
{
this.objectMapper = objectMapper;
Expand All @@ -201,6 +205,7 @@ private TestQuerySegmentWalker(
ImmutableMap.<String, StorageHandler>of("file", new LocalStorageHandler(objectMapper)),
this
);
this.hook = hook;
}

public TestQuerySegmentWalker withConglomerate(QueryRunnerFactoryConglomerate conglomerate)
Expand All @@ -210,7 +215,8 @@ public TestQuerySegmentWalker withConglomerate(QueryRunnerFactoryConglomerate co
conglomerate,
executor,
queryConfig,
timeLines
timeLines,
hook
);
}

Expand All @@ -221,7 +227,8 @@ public TestQuerySegmentWalker withObjectMapper(ObjectMapper objectMapper)
conglomerate,
executor,
queryConfig,
timeLines
timeLines,
hook
);
}

Expand All @@ -232,7 +239,8 @@ public TestQuerySegmentWalker withExecutor(ExecutorService executor)
conglomerate,
executor,
queryConfig,
timeLines
timeLines,
hook
);
}

Expand All @@ -243,7 +251,20 @@ public TestQuerySegmentWalker withQueryConfig(QueryConfig queryConfig)
conglomerate,
executor,
queryConfig,
timeLines
timeLines,
hook
);
}

/**
 * Returns a copy of this walker that invokes {@code hook} with each query passed to
 * makeQueryRunner, before the query is prepared and run. All other collaborators
 * (mapper, conglomerate, executor, config, timelines) are carried over unchanged.
 */
public TestQuerySegmentWalker withQueryHook(Consumer<Query<?>> hook)
{
return new TestQuerySegmentWalker(
objectMapper,
conglomerate,
executor,
queryConfig,
timeLines,
hook
);
}

Expand All @@ -259,7 +280,8 @@ public TestQuerySegmentWalker duplicate()
conglomerate,
executor,
queryConfig,
duplicate
duplicate,
hook
);
}

Expand Down Expand Up @@ -330,6 +352,7 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<Seg

private <T> QueryRunner<T> makeQueryRunner(Query<T> query)
{
hook.accept(query);
final Query<T> prepared = prepareQuery(query);
final QueryRunner<T> runner = toQueryRunner(prepared);
return new QueryRunner<T>()
Expand Down
Loading

0 comments on commit 65bf792

Please sign in to comment.