diff --git a/processing/src/main/java/io/druid/query/DataSources.java b/processing/src/main/java/io/druid/query/DataSources.java index f4ba835a7b78..b8fc9dac3a46 100644 --- a/processing/src/main/java/io/druid/query/DataSources.java +++ b/processing/src/main/java/io/druid/query/DataSources.java @@ -20,7 +20,6 @@ package io.druid.query; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import io.druid.java.util.common.ISE; import io.druid.query.Query.FilterSupport; @@ -161,7 +160,7 @@ public static List getInvariantColumns(Query query) } else if (query instanceof Query.ColumnsSupport) { return ((Query.ColumnsSupport) query).getColumns(); } - return ImmutableList.of(); + return null; } public static List getOutputColumns(DataSource dataSource) diff --git a/processing/src/main/java/io/druid/query/JoinElement.java b/processing/src/main/java/io/druid/query/JoinElement.java index 097cf9592220..70cd52b1f614 100644 --- a/processing/src/main/java/io/druid/query/JoinElement.java +++ b/processing/src/main/java/io/druid/query/JoinElement.java @@ -25,9 +25,12 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import io.druid.common.guava.GuavaUtils; import io.druid.java.util.common.ISE; import io.druid.query.Query.ArrayOutputSupport; +import io.druid.query.dimension.DimensionSpec; +import io.druid.query.dimension.DimensionSpecs; import io.druid.query.groupby.orderby.LimitSpec; import io.druid.query.groupby.orderby.OrderByColumnSpec; import io.druid.query.select.StreamQuery; @@ -265,11 +268,22 @@ public boolean isRightSemiJoinable(DataSource left, DataSource right, List keys) + { + if (dataSource instanceof QueryDataSource) { + List dimensions = BaseQuery.getDimensions(((QueryDataSource) dataSource).getQuery()); + if (!dimensions.isEmpty() && 
DimensionSpecs.isAllDefault(dimensions)) { + return Sets.newHashSet(DimensionSpecs.toOutputNames(dimensions)).containsAll(keys); + } + } + return false; + } + public boolean isLeftBroadcastable(DataSource left, DataSource right) { if (joinType.isRightDrivable() && DataSources.isDataNodeSourced(left) && DataSources.isDataNodeSourced(right)) { List columns = DataSources.getInvariantColumns(right); - if (columns.containsAll(rightJoinColumns)) { + if (columns != null && columns.containsAll(rightJoinColumns)) { return true; } } @@ -280,7 +294,7 @@ public boolean isRightBroadcastable(DataSource left, DataSource right) { if (joinType.isLeftDrivable() && DataSources.isDataNodeSourced(left) && DataSources.isDataNodeSourced(right)) { List columns = DataSources.getInvariantColumns(left); - if (columns.containsAll(leftJoinColumns)) { + if (columns != null && columns.containsAll(leftJoinColumns)) { return true; } } diff --git a/processing/src/main/java/io/druid/query/JoinQuery.java b/processing/src/main/java/io/druid/query/JoinQuery.java index 318252f5a0b7..7c9734af1eaa 100644 --- a/processing/src/main/java/io/druid/query/JoinQuery.java +++ b/processing/src/main/java/io/druid/query/JoinQuery.java @@ -56,7 +56,7 @@ import io.druid.query.filter.BloomDimFilter; import io.druid.query.filter.DimFilter; import io.druid.query.filter.DimFilters; -import io.druid.query.filter.ValuesFilter; +import io.druid.query.filter.SemiJoinFactory; import io.druid.query.groupby.orderby.OrderByColumnSpec; import io.druid.query.select.StreamQuery; import io.druid.query.spec.QuerySegmentSpec; @@ -351,17 +351,18 @@ public Query rewriteQuery(QuerySegmentWalker segmentWalker, QueryConfig config) ArrayOutputSupport array = JoinElement.toQuery(segmentWalker, right, segmentSpec, context); List rightColumns = array.estimatedOutputColumns(); if (rightColumns != null && rightColumns.containsAll(rightJoinColumns)) { + boolean allowDuplication = JoinElement.allowDuplication(left, leftJoinColumns); int[] indices = 
GuavaUtils.indexOf(rightColumns, rightJoinColumns); - Supplier> fieldValues = - () -> Sequences.toList(Sequences.map( - QueryRunners.runArray(array, segmentWalker), GuavaUtils.mapper(indices))); - DataSource filtered = DataSources.applyFilterAndProjection( - left, ValuesFilter.fieldNames(leftJoinColumns, fieldValues), outputColumns - ); - LOG.info("-- %s:%d (R) is merged into %s (L) as a filter", rightAlias, rightEstimated, leftAlias); - queries.add(JoinElement.toQuery(segmentWalker, filtered, segmentSpec, context)); - if (leftEstimated >= 0) { - currentEstimation = resultEstimation(joinType, leftEstimated, rightEstimated); + Sequence fieldValues = + Sequences.map(QueryRunners.runArray(array, segmentWalker), GuavaUtils.mapper(indices)); + DimFilter semijoin = SemiJoinFactory.from(leftJoinColumns, fieldValues, allowDuplication); + if (semijoin != null) { + DataSource filtered = DataSources.applyFilterAndProjection(left, semijoin, outputColumns); + LOG.info("-- %s:%d (R) is merged into %s (L) as filter on %s", rightAlias, rightEstimated, leftAlias, leftJoinColumns); + queries.add(JoinElement.toQuery(segmentWalker, filtered, segmentSpec, context)); + if (leftEstimated >= 0) { + currentEstimation = resultEstimation(joinType, leftEstimated, rightEstimated); + } + continue; /* skip the regular join only when the semi-join filter was actually built; a null filter must fall through instead of dropping this join element */ } - continue; } @@ -370,19 +371,20 @@ public Query rewriteQuery(QuerySegmentWalker segmentWalker, QueryConfig config) ArrayOutputSupport array = JoinElement.toQuery(segmentWalker, left, segmentSpec, context); List leftColumns = array.estimatedOutputColumns(); if (leftColumns != null && leftColumns.containsAll(leftJoinColumns)) { + boolean allowDuplication = JoinElement.allowDuplication(right, rightJoinColumns); int[] indices = GuavaUtils.indexOf(leftColumns, leftJoinColumns); - Supplier> fieldValues = - () -> Sequences.toList(Sequences.map( - QueryRunners.runArray(array, segmentWalker), GuavaUtils.mapper(indices))); - DataSource filtered = DataSources.applyFilterAndProjection( - right, 
ValuesFilter.fieldNames(rightJoinColumns, fieldValues), outputColumns - ); - LOG.info("-- %s:%d (L) is merged into %s (R) as a filter", leftAlias, leftEstimated, rightAlias); - queries.add(JoinElement.toQuery(segmentWalker, filtered, segmentSpec, context)); - if (rightEstimated >= 0) { - currentEstimation = resultEstimation(joinType, leftEstimated, rightEstimated); + Sequence fieldValues = + Sequences.map(QueryRunners.runArray(array, segmentWalker), GuavaUtils.mapper(indices)); + DimFilter semijoin = SemiJoinFactory.from(rightJoinColumns, fieldValues, allowDuplication); + if (semijoin != null) { + DataSource filtered = DataSources.applyFilterAndProjection(right, semijoin, outputColumns); + LOG.info("-- %s:%d (L) is merged into %s (R) as filter on %s", leftAlias, leftEstimated, rightAlias, rightJoinColumns); + queries.add(JoinElement.toQuery(segmentWalker, filtered, segmentSpec, context)); + if (rightEstimated >= 0) { + currentEstimation = resultEstimation(joinType, leftEstimated, rightEstimated); + } + continue; } - continue; } } } diff --git a/processing/src/main/java/io/druid/query/filter/InDimFilter.java b/processing/src/main/java/io/druid/query/filter/InDimFilter.java index 7790c7abdca9..2802fd24ca6d 100644 --- a/processing/src/main/java/io/druid/query/filter/InDimFilter.java +++ b/processing/src/main/java/io/druid/query/filter/InDimFilter.java @@ -69,7 +69,7 @@ public InDimFilter( ); } - private InDimFilter(String dimension, ExtractionFn extractionFn, List values) + public InDimFilter(String dimension, ExtractionFn extractionFn, List values) { this.dimension = Preconditions.checkNotNull(dimension, "dimension can not be null"); this.extractionFn = extractionFn; diff --git a/processing/src/main/java/io/druid/query/filter/InDimsFilter.java b/processing/src/main/java/io/druid/query/filter/InDimsFilter.java index 72b61f72af4f..7e26a39a8e52 100644 --- a/processing/src/main/java/io/druid/query/filter/InDimsFilter.java +++ 
b/processing/src/main/java/io/druid/query/filter/InDimsFilter.java @@ -55,9 +55,6 @@ public InDimsFilter( { Preconditions.checkArgument(!GuavaUtils.isNullOrEmpty(dimensions), "dimensions can not be empty"); Preconditions.checkArgument(dimensions.size() == values.size(), "number of dimensions and values is not match"); - for (List value : values) { - - } Preconditions.checkArgument(!GuavaUtils.isNullOrEmpty(values), "values can not be empty"); this.dimensions = dimensions; this.values = values; diff --git a/processing/src/main/java/io/druid/query/filter/SemiJoinFactory.java b/processing/src/main/java/io/druid/query/filter/SemiJoinFactory.java new file mode 100644 index 000000000000..1c28ed00eff1 --- /dev/null +++ b/processing/src/main/java/io/druid/query/filter/SemiJoinFactory.java @@ -0,0 +1,79 @@ +/* + * Licensed to SK Telecom Co., LTD. (SK Telecom) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. SK Telecom licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.filter; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import io.druid.common.guava.Sequence; +import io.druid.common.utils.Sequences; +import io.druid.java.util.common.parsers.CloseableIterator; +import io.druid.segment.StringArray; +import org.apache.commons.io.IOUtils; + +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** Builds an {@link InDimFilter} (one field) or {@link InDimsFilter} (several fields) from a sequence of key rows. */ public class SemiJoinFactory +{ + /** Drains {@code fieldValues} into a filter on {@code fieldNames}. Each row is read positionally: with one field name only {@code row[0]} is used, otherwise the whole row is converted (assumes row length matches {@code fieldNames.size()} - TODO confirm). Null values are stored as the empty string. @param allowDuplication when false, the first repeated key aborts the build. @return the filter, or {@code null} when a repeated key was seen and duplication is not allowed; callers must handle the null. */ public static DimFilter from(List fieldNames, Sequence fieldValues, boolean allowDuplication) + { + if (fieldNames.size() == 1) { + final Set set = Sets.newTreeSet(); /* tree set: drops repeated values and keeps them ordered */ + final CloseableIterator iterator = Sequences.toIterator(fieldValues); + try { + while (iterator.hasNext()) { + if (!set.add(Objects.toString(iterator.next()[0], "")) && !allowDuplication) { /* add() returning false means the value was already present */ + return null; + } + } + } + finally { + IOUtils.closeQuietly(iterator); /* runs on the early return above as well */ + } + return new InDimFilter(fieldNames.get(0), null, ImmutableList.copyOf(set)); + } else { + final Set set = Sets.newTreeSet(); /* presumably ordered by StringArray's own comparison - verify */ + final CloseableIterator iterator = Sequences.toIterator(fieldValues); + try { + while (iterator.hasNext()) { + if (!set.add(StringArray.of(iterator.next(), "")) && !allowDuplication) { + return null; + } + } + } + finally { + IOUtils.closeQuietly(iterator); + } + List> valuesList = Lists.newArrayList(); /* column-major: one value list per field name */ + for (int i = 0; i < fieldNames.size(); i++) { + valuesList.add(Lists.newArrayList()); + } + for (StringArray array : set) { + for (int i = 0; i < fieldNames.size(); i++) { + valuesList.get(i).add(array.get(i)); /* i-th key component goes to the i-th field's list */ + } + } + return new InDimsFilter(fieldNames, valuesList); + } + } +} + diff --git a/processing/src/main/java/io/druid/query/filter/ValuesFilter.java b/processing/src/main/java/io/druid/query/filter/ValuesFilter.java deleted file mode 100644 index 1ed5fd5a00f1..000000000000 --- a/processing/src/main/java/io/druid/query/filter/ValuesFilter.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to SK Telecom Co., 
LTD. (SK Telecom) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. SK Telecom licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.query.filter; - -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.collect.Lists; -import io.druid.common.guava.GuavaUtils; -import io.druid.query.Query; -import io.druid.query.QuerySegmentWalker; - -import java.util.List; -import java.util.Objects; - -public class ValuesFilter extends DimFilter.FilterFactory implements DimFilter.Rewriting -{ - public static ValuesFilter fieldNames(List fieldNames, Supplier> fieldValues) - { - return new ValuesFilter(fieldNames, fieldValues); - } - - private final List fieldNames; - private final Supplier> fieldValues; - - public ValuesFilter(List fieldNames, Supplier> fieldValues) - { - this.fieldNames = fieldNames; - this.fieldValues = fieldValues; - Preconditions.checkArgument(fieldNames.size() > 0, "'fieldNames' is empty"); - } - - @Override - public DimFilter rewrite(QuerySegmentWalker walker, Query parent) - { - final List values = fieldValues.get(); - if (fieldNames.size() == 1) { - return new InDimFilter( - fieldNames.get(0), GuavaUtils.transform(values, array -> Objects.toString(array[0], null)), null - ); - } else { - List> valuesList = Lists.newArrayList(); - for (int i = 
0; i < fieldNames.size(); i++) { - valuesList.add(Lists.newArrayList()); - } - for (Object[] array : values) { - for (int i = 0; i < fieldNames.size(); i++) { - valuesList.get(i).add(Objects.toString(array[i], null)); - } - } - return new InDimsFilter(fieldNames, valuesList); - } - } - - @Override - public String toString() - { - return "ValuesFilter{fieldNames=" + fieldNames + ", fieldValues=" + fieldValues + '}'; - } -} diff --git a/processing/src/main/java/io/druid/segment/StringArray.java b/processing/src/main/java/io/druid/segment/StringArray.java index 0aa08ac7bd0d..129337269648 100644 --- a/processing/src/main/java/io/druid/segment/StringArray.java +++ b/processing/src/main/java/io/druid/segment/StringArray.java @@ -21,10 +21,21 @@ import com.google.common.base.Preconditions; +import java.util.Objects; + /** */ public class StringArray extends ObjectArray { + public static StringArray of(Object[] array, String nullValue) /* copies array into a new String[] via Objects.toString; null elements become nullValue */ + { + final String[] strings = new String[array.length]; /* NOTE(review): unlike of(String[]) below, array is not null-checked here and a null array fails on array.length */ + for (int i = 0; i < array.length; i++) { + strings[i] = Objects.toString(array[i], nullValue); + } + return new StringArray(strings); + } + public static StringArray of(String[] array) { return new StringArray(Preconditions.checkNotNull(array));