Skip to content

Commit

Permalink
apache#3574 Fix invalid conversion to semi-join filter with duplicate…
Browse files Browse the repository at this point in the history
…d values
  • Loading branch information
navis committed Jan 24, 2021
1 parent 84d6c98 commit 75a8f8a
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 106 deletions.
3 changes: 1 addition & 2 deletions processing/src/main/java/io/druid/query/DataSources.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package io.druid.query;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.druid.java.util.common.ISE;
import io.druid.query.Query.FilterSupport;
Expand Down Expand Up @@ -161,7 +160,7 @@ public static List<String> getInvariantColumns(Query<?> query)
} else if (query instanceof Query.ColumnsSupport) {
return ((Query.ColumnsSupport<?>) query).getColumns();
}
return ImmutableList.of();
return null;
}

public static List<String> getOutputColumns(DataSource dataSource)
Expand Down
18 changes: 16 additions & 2 deletions processing/src/main/java/io/druid/query/JoinElement.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.druid.common.guava.GuavaUtils;
import io.druid.java.util.common.ISE;
import io.druid.query.Query.ArrayOutputSupport;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.dimension.DimensionSpecs;
import io.druid.query.groupby.orderby.LimitSpec;
import io.druid.query.groupby.orderby.OrderByColumnSpec;
import io.druid.query.select.StreamQuery;
Expand Down Expand Up @@ -265,11 +268,22 @@ public boolean isRightSemiJoinable(DataSource left, DataSource right, List<Strin
return false;
}

/**
 * Tells whether duplicated key values are acceptable when a semi-join filter is applied to {@code dataSource}.
 *
 * Returns {@code true} only when {@code dataSource} is a {@link QueryDataSource} whose query has a non-empty
 * dimension list, every dimension passes {@code DimensionSpecs.isAllDefault}, and the dimension output names
 * include every entry of {@code keys}. In every other case the answer is {@code false}.
 *
 * NOTE(review): presumably such a query groups on the join keys, so repeated values cannot multiply its
 * rows - confirm against the callers in JoinQuery.
 *
 * @param dataSource data source the filter would be applied to
 * @param keys       join column names on that data source
 * @return whether duplicated values in the semi-join filter are tolerated
 */
public static boolean allowDuplication(DataSource dataSource, List<String> keys)
{
  if (!(dataSource instanceof QueryDataSource)) {
    return false;
  }
  final List<DimensionSpec> specs = BaseQuery.getDimensions(((QueryDataSource) dataSource).getQuery());
  if (specs.isEmpty() || !DimensionSpecs.isAllDefault(specs)) {
    return false;
  }
  // every join key must be one of the dimension output names
  return Sets.newHashSet(DimensionSpecs.toOutputNames(specs)).containsAll(keys);
}

public boolean isLeftBroadcastable(DataSource left, DataSource right)
{
if (joinType.isRightDrivable() && DataSources.isDataNodeSourced(left) && DataSources.isDataNodeSourced(right)) {
List<String> columns = DataSources.getInvariantColumns(right);
if (columns.containsAll(rightJoinColumns)) {
if (columns != null && columns.containsAll(rightJoinColumns)) {
return true;
}
}
Expand All @@ -280,7 +294,7 @@ public boolean isRightBroadcastable(DataSource left, DataSource right)
{
if (joinType.isLeftDrivable() && DataSources.isDataNodeSourced(left) && DataSources.isDataNodeSourced(right)) {
List<String> columns = DataSources.getInvariantColumns(left);
if (columns.containsAll(leftJoinColumns)) {
if (columns != null && columns.containsAll(leftJoinColumns)) {
return true;
}
}
Expand Down
46 changes: 24 additions & 22 deletions processing/src/main/java/io/druid/query/JoinQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import io.druid.query.filter.BloomDimFilter;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.DimFilters;
import io.druid.query.filter.ValuesFilter;
import io.druid.query.filter.SemiJoinFactory;
import io.druid.query.groupby.orderby.OrderByColumnSpec;
import io.druid.query.select.StreamQuery;
import io.druid.query.spec.QuerySegmentSpec;
Expand Down Expand Up @@ -351,17 +351,18 @@ public Query rewriteQuery(QuerySegmentWalker segmentWalker, QueryConfig config)
ArrayOutputSupport array = JoinElement.toQuery(segmentWalker, right, segmentSpec, context);
List<String> rightColumns = array.estimatedOutputColumns();
if (rightColumns != null && rightColumns.containsAll(rightJoinColumns)) {
boolean allowDuplication = JoinElement.allowDuplication(left, leftJoinColumns);
int[] indices = GuavaUtils.indexOf(rightColumns, rightJoinColumns);
Supplier<List<Object[]>> fieldValues =
() -> Sequences.toList(Sequences.map(
QueryRunners.runArray(array, segmentWalker), GuavaUtils.mapper(indices)));
DataSource filtered = DataSources.applyFilterAndProjection(
left, ValuesFilter.fieldNames(leftJoinColumns, fieldValues), outputColumns
);
LOG.info("-- %s:%d (R) is merged into %s (L) as a filter", rightAlias, rightEstimated, leftAlias);
queries.add(JoinElement.toQuery(segmentWalker, filtered, segmentSpec, context));
if (leftEstimated >= 0) {
currentEstimation = resultEstimation(joinType, leftEstimated, rightEstimated);
Sequence<Object[]> fieldValues =
Sequences.map(QueryRunners.runArray(array, segmentWalker), GuavaUtils.mapper(indices));
DimFilter semijoin = SemiJoinFactory.from(leftJoinColumns, fieldValues, allowDuplication);
if (semijoin != null) {
DataSource filtered = DataSources.applyFilterAndProjection(left, semijoin, outputColumns);
LOG.info("-- %s:%d (R) is merged into %s (L) as filter on %s", rightAlias, rightEstimated, leftAlias, leftJoinColumns);
queries.add(JoinElement.toQuery(segmentWalker, filtered, segmentSpec, context));
if (leftEstimated >= 0) {
currentEstimation = resultEstimation(joinType, leftEstimated, rightEstimated);
}
}
continue;
}
Expand All @@ -370,19 +371,20 @@ public Query rewriteQuery(QuerySegmentWalker segmentWalker, QueryConfig config)
ArrayOutputSupport array = JoinElement.toQuery(segmentWalker, left, segmentSpec, context);
List<String> leftColumns = array.estimatedOutputColumns();
if (leftColumns != null && leftColumns.containsAll(leftJoinColumns)) {
boolean allowDuplication = JoinElement.allowDuplication(right, rightJoinColumns);
int[] indices = GuavaUtils.indexOf(leftColumns, leftJoinColumns);
Supplier<List<Object[]>> fieldValues =
() -> Sequences.toList(Sequences.map(
QueryRunners.runArray(array, segmentWalker), GuavaUtils.mapper(indices)));
DataSource filtered = DataSources.applyFilterAndProjection(
right, ValuesFilter.fieldNames(rightJoinColumns, fieldValues), outputColumns
);
LOG.info("-- %s:%d (L) is merged into %s (R) as a filter", leftAlias, leftEstimated, rightAlias);
queries.add(JoinElement.toQuery(segmentWalker, filtered, segmentSpec, context));
if (rightEstimated >= 0) {
currentEstimation = resultEstimation(joinType, leftEstimated, rightEstimated);
Sequence<Object[]> fieldValues =
Sequences.map(QueryRunners.runArray(array, segmentWalker), GuavaUtils.mapper(indices));
DimFilter semijoin = SemiJoinFactory.from(rightJoinColumns, fieldValues, allowDuplication);
if (semijoin != null) {
DataSource filtered = DataSources.applyFilterAndProjection(right, semijoin, outputColumns);
LOG.info("-- %s:%d (L) is merged into %s (R) as filter on %s", leftAlias, leftEstimated, rightAlias, rightJoinColumns);
queries.add(JoinElement.toQuery(segmentWalker, filtered, segmentSpec, context));
if (rightEstimated >= 0) {
currentEstimation = resultEstimation(joinType, leftEstimated, rightEstimated);
}
continue;
}
continue;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public InDimFilter(
);
}

private InDimFilter(String dimension, ExtractionFn extractionFn, List<String> values)
public InDimFilter(String dimension, ExtractionFn extractionFn, List<String> values)
{
this.dimension = Preconditions.checkNotNull(dimension, "dimension can not be null");
this.extractionFn = extractionFn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@ public InDimsFilter(
{
Preconditions.checkArgument(!GuavaUtils.isNullOrEmpty(dimensions), "dimensions can not be empty");
Preconditions.checkArgument(dimensions.size() == values.size(), "number of dimensions and values is not match");
for (List<String> value : values) {

}
Preconditions.checkArgument(!GuavaUtils.isNullOrEmpty(values), "values can not be empty");
this.dimensions = dimensions;
this.values = values;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to SK Telecom Co., LTD. (SK Telecom) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. SK Telecom licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.filter;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.druid.common.guava.Sequence;
import io.druid.common.utils.Sequences;
import io.druid.java.util.common.parsers.CloseableIterator;
import io.druid.segment.StringArray;
import org.apache.commons.io.IOUtils;

import java.util.List;
import java.util.Objects;
import java.util.Set;

/**
 * Builds the dimension filter used to replace one side of a join with a semi-join filter on the other side.
 */
public class SemiJoinFactory
{
  /**
   * Drains {@code fieldValues} and converts the distinct rows into a filter on {@code fieldNames}.
   *
   * A single field name yields an {@link InDimFilter}; otherwise an {@link InDimsFilter} is built with one
   * value list per field. Null field values are converted to the empty string. The sequence iterator is
   * always closed before returning.
   *
   * @param fieldNames       names of the columns to filter on
   * @param fieldValues      rows of values, aligned with {@code fieldNames}
   * @param allowDuplication when {@code false}, the first repeated row aborts the conversion
   * @return the filter, or {@code null} when a duplicate is found and {@code allowDuplication} is {@code false}
   */
  public static DimFilter from(List<String> fieldNames, Sequence<Object[]> fieldValues, boolean allowDuplication)
  {
    if (fieldNames.size() != 1) {
      return fromMultipleFields(fieldNames, fieldValues, allowDuplication);
    }
    return fromSingleField(fieldNames, fieldValues, allowDuplication);
  }

  // Single-column case: collects the first element of every row into a sorted set of strings.
  private static DimFilter fromSingleField(
      List<String> fieldNames,
      Sequence<Object[]> fieldValues,
      boolean allowDuplication
  )
  {
    final Set<String> distinct = Sets.newTreeSet();
    final CloseableIterator<Object[]> rows = Sequences.toIterator(fieldValues);
    try {
      while (rows.hasNext()) {
        final boolean added = distinct.add(Objects.toString(rows.next()[0], ""));
        if (added || allowDuplication) {
          continue;
        }
        return null;
      }
    }
    finally {
      IOUtils.closeQuietly(rows);
    }
    return new InDimFilter(fieldNames.get(0), null, ImmutableList.copyOf(distinct));
  }

  // Multi-column case: collects whole rows as StringArray, then transposes them into per-field value lists.
  private static DimFilter fromMultipleFields(
      List<String> fieldNames,
      Sequence<Object[]> fieldValues,
      boolean allowDuplication
  )
  {
    final Set<StringArray> distinct = Sets.newTreeSet();
    final CloseableIterator<Object[]> rows = Sequences.toIterator(fieldValues);
    try {
      while (rows.hasNext()) {
        final boolean added = distinct.add(StringArray.of(rows.next(), ""));
        if (added || allowDuplication) {
          continue;
        }
        return null;
      }
    }
    finally {
      IOUtils.closeQuietly(rows);
    }
    final int width = fieldNames.size();
    final List<List<String>> columns = Lists.newArrayList();
    for (int field = 0; field < width; field++) {
      columns.add(Lists.newArrayList());
    }
    // rows are visited in the set's iteration order, so the i-th entries of all columns form one row
    for (StringArray row : distinct) {
      for (int field = 0; field < width; field++) {
        columns.get(field).add(row.get(field));
      }
    }
    return new InDimsFilter(fieldNames, columns);
  }
}

76 changes: 0 additions & 76 deletions processing/src/main/java/io/druid/query/filter/ValuesFilter.java

This file was deleted.

11 changes: 11 additions & 0 deletions processing/src/main/java/io/druid/segment/StringArray.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,21 @@

import com.google.common.base.Preconditions;

import java.util.Objects;

/**
 * An {@link ObjectArray} specialized for {@code String} elements.
 */
public class StringArray extends ObjectArray<String>
{
/**
 * Creates a StringArray from arbitrary objects, converting each element with
 * {@link Objects#toString(Object, String)}.
 *
 * @param array     source elements; must not be null
 * @param nullValue string substituted for null elements
 * @return a new StringArray of the same length as {@code array}
 */
public static StringArray of(Object[] array, String nullValue)
{
  final String[] converted = new String[array.length];
  int index = 0;
  for (Object element : array) {
    converted[index++] = Objects.toString(element, nullValue);
  }
  return new StringArray(converted);
}

public static StringArray of(String[] array)
{
return new StringArray(Preconditions.checkNotNull(array));
Expand Down

0 comments on commit 75a8f8a

Please sign in to comment.