From 049a47fd2dbe0362f77971035de4979b9592a90f Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 1 Mar 2017 17:03:43 -0800 Subject: [PATCH] SQL: Make row extractions extensible and add one for lookups. This is a reopening of #3989, since that PR was merged to master prematurely and accidentally. --- docs/content/querying/sql.md | 11 +- .../histogram/sql/QuantileSqlAggregator.java | 3 + .../sql/QuantileSqlAggregatorTest.java | 6 +- sql/pom.xml | 5 + .../ApproxCountDistinctSqlAggregator.java | 3 + .../calcite/aggregation/SqlAggregator.java | 3 + .../AbstractExpressionConversion.java | 57 -------- ...=> CharacterLengthExtractionOperator.java} | 27 ++-- .../expression/ExpressionConverter.java | 123 ------------------ .../sql/calcite/expression/Expressions.java | 70 +++++++--- ...on.java => ExtractExtractionOperator.java} | 22 ++-- ...sion.java => FloorExtractionOperator.java} | 28 ++-- .../expression/LookupExtractionOperator.java | 113 ++++++++++++++++ ...ersion.java => SqlExtractionOperator.java} | 27 ++-- ....java => SubstringExtractionOperator.java} | 27 ++-- .../calcite/planner/DruidConvertletTable.java | 2 +- .../calcite/planner/DruidOperatorTable.java | 40 +++++- .../io/druid/sql/calcite/planner/Rules.java | 4 +- .../sql/calcite/rule/DruidFilterRule.java | 12 +- .../druid/sql/calcite/rule/GroupByRules.java | 36 +++-- .../druid/sql/calcite/rule/SelectRules.java | 19 +-- .../java/io/druid/sql/guice/SqlBindings.java | 10 ++ .../java/io/druid/sql/guice/SqlModule.java | 30 ++++- .../druid/sql/calcite/CalciteQueryTest.java | 96 +++++++++++++- .../druid/sql/calcite/util/CalciteTests.java | 90 ++++++++++++- 25 files changed, 556 insertions(+), 308 deletions(-) delete mode 100644 sql/src/main/java/io/druid/sql/calcite/expression/AbstractExpressionConversion.java rename sql/src/main/java/io/druid/sql/calcite/expression/{CharLengthExpressionConversion.java => CharacterLengthExtractionOperator.java} (72%) delete mode 100644 sql/src/main/java/io/druid/sql/calcite/expression/ExpressionConverter.java rename sql/src/main/java/io/druid/sql/calcite/expression/{ExtractExpressionConversion.java => ExtractExtractionOperator.java} (85%) rename sql/src/main/java/io/druid/sql/calcite/expression/{FloorExpressionConversion.java => FloorExtractionOperator.java} (85%) create mode 100644 sql/src/main/java/io/druid/sql/calcite/expression/LookupExtractionOperator.java rename sql/src/main/java/io/druid/sql/calcite/expression/{ExpressionConversion.java => SqlExtractionOperator.java} (69%) rename sql/src/main/java/io/druid/sql/calcite/expression/{SubstringExpressionConversion.java => SubstringExtractionOperator.java} (76%) diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md index c662b99d8524..9e1ae3126d8f 100644 --- a/docs/content/querying/sql.md +++ b/docs/content/querying/sql.md @@ -127,6 +127,16 @@ Druid's SQL language supports a number of time operations, including: By default, time operations use the UTC time zone. You can change the time zone for time operations by setting the connection context parameter "sqlTimeZone" to the name of the time zone, like "America/Los_Angeles". +### Query-time lookups + +Druid [query-time lookups](lookups.html) can be accessed through the `LOOKUP(expression, lookupName)` function. The +"lookupName" must refer to a lookup you have registered with Druid's lookup framework. For example, the following +query can be used to perform a groupBy on looked-up values: + +```sql +SELECT LOOKUP(col, 'my_lookup') AS col_with_lookup FROM data_source GROUP BY LOOKUP(col, 'my_lookup') +``` + ### Subqueries Druid's SQL layer supports many types of subqueries, including the ones listed below. @@ -229,7 +239,6 @@ language. Some unsupported SQL features include: Additionally, some Druid features are not supported by the SQL language. Some unsupported Druid features include: - [Multi-value dimensions](multi-value-dimensions.html). -- [Query-time lookups](lookups.html). - [DataSketches](../development/extensions-core/datasketches-aggregators.html). ## Third-party SQL libraries diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java index 446944c8a531..1140eec6dda8 100644 --- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java +++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregator.java @@ -33,6 +33,7 @@ import io.druid.sql.calcite.aggregation.SqlAggregator; import io.druid.sql.calcite.expression.Expressions; import io.druid.sql.calcite.expression.RowExtraction; +import io.druid.sql.calcite.planner.DruidOperatorTable; import io.druid.sql.calcite.planner.PlannerContext; import io.druid.sql.calcite.table.RowSignature; import org.apache.calcite.rel.core.AggregateCall; @@ -64,6 +65,7 @@ public SqlAggFunction calciteFunction() public Aggregation toDruidAggregation( final String name, final RowSignature rowSignature, + final DruidOperatorTable operatorTable, final PlannerContext plannerContext, final List existingAggregations, final Project project, @@ -72,6 +74,7 @@ public Aggregation toDruidAggregation( ) { final RowExtraction rex = Expressions.toRowExtraction( + operatorTable, plannerContext, rowSignature.getRowOrder(), Expressions.fromFieldAccess( diff --git a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java index 47698cd8ea61..e4eb972d320a 100644 --- a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java +++ b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java @@ -43,6 +43,7 @@ import io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.sql.calcite.aggregation.SqlAggregator; +import io.druid.sql.calcite.expression.SqlExtractionOperator; import io.druid.sql.calcite.filtration.Filtration; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.DruidOperatorTable; @@ -130,9 +131,8 @@ public void setUp() throws Exception ) ); final DruidOperatorTable operatorTable = new DruidOperatorTable( - ImmutableSet.of( - new QuantileSqlAggregator() - ) + ImmutableSet.of(new QuantileSqlAggregator()), + ImmutableSet.of() ); plannerFactory = new PlannerFactory(rootSchema, walker, operatorTable, plannerConfig); } diff --git a/sql/pom.xml b/sql/pom.xml index 3b93e39e9fba..f1287afd4d73 100644 --- a/sql/pom.xml +++ b/sql/pom.xml @@ -71,6 +71,11 @@ test-jar test + + org.easymock + easymock + test + io.druid druid-processing diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/ApproxCountDistinctSqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/ApproxCountDistinctSqlAggregator.java index 6209d5ef758d..cd6d1b6b54e5 100644 --- a/sql/src/main/java/io/druid/sql/calcite/aggregation/ApproxCountDistinctSqlAggregator.java +++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/ApproxCountDistinctSqlAggregator.java @@ -33,6 +33,7 @@ import io.druid.sql.calcite.expression.Expressions; import io.druid.sql.calcite.expression.RowExtraction; import io.druid.sql.calcite.planner.Calcites; +import io.druid.sql.calcite.planner.DruidOperatorTable; import io.druid.sql.calcite.planner.PlannerContext; import io.druid.sql.calcite.table.RowSignature; import org.apache.calcite.rel.core.AggregateCall; @@ -63,6 +64,7 @@ public SqlAggFunction calciteFunction() public Aggregation toDruidAggregation( final String name, final RowSignature rowSignature, + final DruidOperatorTable operatorTable, final PlannerContext plannerContext, final List existingAggregations, final Project project, @@ -76,6 +78,7 @@ public Aggregation toDruidAggregation( Iterables.getOnlyElement(aggregateCall.getArgList()) ); final RowExtraction rex = Expressions.toRowExtraction( + operatorTable, plannerContext, rowSignature.getRowOrder(), rexNode diff --git a/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java b/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java index 9539762ce09d..2d333b427eac 100644 --- a/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java +++ b/sql/src/main/java/io/druid/sql/calcite/aggregation/SqlAggregator.java @@ -20,6 +20,7 @@ package io.druid.sql.calcite.aggregation; import io.druid.query.filter.DimFilter; +import io.druid.sql.calcite.planner.DruidOperatorTable; import io.druid.sql.calcite.planner.PlannerContext; import io.druid.sql.calcite.table.RowSignature; import org.apache.calcite.rel.core.AggregateCall; @@ -46,6 +47,7 @@ public interface SqlAggregator * * @param name desired output name of the aggregation * @param rowSignature signature of the rows being aggregated + * @param operatorTable Operator table that can be used to convert sub-expressions * @param plannerContext SQL planner context * @param existingAggregations existing aggregations for this query; useful for re-using aggregations. May be safely * ignored if you do not want to re-use existing aggregations. @@ -59,6 +61,7 @@ public interface SqlAggregator Aggregation toDruidAggregation( final String name, final RowSignature rowSignature, + final DruidOperatorTable operatorTable, final PlannerContext plannerContext, final List existingAggregations, final Project project, diff --git a/sql/src/main/java/io/druid/sql/calcite/expression/AbstractExpressionConversion.java b/sql/src/main/java/io/druid/sql/calcite/expression/AbstractExpressionConversion.java deleted file mode 100644 index 3633dc0b88a3..000000000000 --- a/sql/src/main/java/io/druid/sql/calcite/expression/AbstractExpressionConversion.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.sql.calcite.expression; - -import org.apache.calcite.sql.SqlKind; - -public abstract class AbstractExpressionConversion implements ExpressionConversion -{ - private final SqlKind kind; - private final String operatorName; - - public AbstractExpressionConversion(SqlKind kind) - { - this(kind, null); - } - - public AbstractExpressionConversion(SqlKind kind, String operatorName) - { - this.kind = kind; - this.operatorName = operatorName; - - if (kind == SqlKind.OTHER_FUNCTION && operatorName == null) { - throw new NullPointerException("operatorName must be non-null for kind OTHER_FUNCTION"); - } else if (kind != SqlKind.OTHER_FUNCTION && operatorName != null) { - throw new NullPointerException("operatorName must be non-null for kind " + kind); - } - } - - @Override - public SqlKind sqlKind() - { - return kind; - } - - @Override - public String operatorName() - { - return operatorName; - } -} diff --git a/sql/src/main/java/io/druid/sql/calcite/expression/CharLengthExpressionConversion.java b/sql/src/main/java/io/druid/sql/calcite/expression/CharacterLengthExtractionOperator.java similarity index 72% rename from sql/src/main/java/io/druid/sql/calcite/expression/CharLengthExpressionConversion.java rename to sql/src/main/java/io/druid/sql/calcite/expression/CharacterLengthExtractionOperator.java index 746037b66d16..1697f0b24d33 100644 --- a/sql/src/main/java/io/druid/sql/calcite/expression/CharLengthExpressionConversion.java +++ b/sql/src/main/java/io/druid/sql/calcite/expression/CharacterLengthExtractionOperator.java @@ -20,37 +20,38 @@ package io.druid.sql.calcite.expression; import io.druid.query.extraction.StrlenExtractionFn; +import io.druid.sql.calcite.planner.DruidOperatorTable; import io.druid.sql.calcite.planner.PlannerContext; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; import java.util.List; -public class CharLengthExpressionConversion extends AbstractExpressionConversion +public class CharacterLengthExtractionOperator implements SqlExtractionOperator { - private static final CharLengthExpressionConversion INSTANCE = new CharLengthExpressionConversion(); - - private CharLengthExpressionConversion() - { - super(SqlKind.OTHER_FUNCTION, "CHAR_LENGTH"); - } - - public static CharLengthExpressionConversion instance() + @Override + public SqlFunction calciteFunction() { - return INSTANCE; + return SqlStdOperatorTable.CHAR_LENGTH; } @Override public RowExtraction convert( - final ExpressionConverter converter, + final DruidOperatorTable operatorTable, final PlannerContext plannerContext, final List rowOrder, final RexNode expression ) { final RexCall call = (RexCall) expression; - final RowExtraction arg = converter.convert(plannerContext, rowOrder, call.getOperands().get(0)); + final RowExtraction arg = Expressions.toRowExtraction( + operatorTable, + plannerContext, + rowOrder, + call.getOperands().get(0) + ); if (arg == null) { return null; } diff --git a/sql/src/main/java/io/druid/sql/calcite/expression/ExpressionConverter.java b/sql/src/main/java/io/druid/sql/calcite/expression/ExpressionConverter.java deleted file mode 100644 index 411838963bb1..000000000000 --- a/sql/src/main/java/io/druid/sql/calcite/expression/ExpressionConverter.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.sql.calcite.expression; - -import com.google.common.collect.Maps; -import io.druid.java.util.common.ISE; -import io.druid.sql.calcite.planner.PlannerContext; -import org.apache.calcite.avatica.util.TimeUnitRange; -import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.type.SqlTypeName; - -import java.util.List; -import java.util.Map; - -public class ExpressionConverter -{ - private final Map kindMap; - private final Map otherFunctionMap; - - private ExpressionConverter( - Map kindMap, - Map otherFunctionMap - ) - { - this.kindMap = kindMap; - this.otherFunctionMap = otherFunctionMap; - } - - public static ExpressionConverter create(final List conversions) - { - final Map kindMap = Maps.newHashMap(); - final Map otherFunctionMap = Maps.newHashMap(); - - for (final ExpressionConversion conversion : conversions) { - if (conversion.sqlKind() != SqlKind.OTHER_FUNCTION) { - if (kindMap.put(conversion.sqlKind(), conversion) != null) { - throw new ISE("Oops, can't have two conversions for sqlKind[%s]", conversion.sqlKind()); - } - } else { - // kind is OTHER_FUNCTION - if (otherFunctionMap.put(conversion.operatorName(), conversion) != null) { - throw new ISE( - "Oops, can't have two conversions for sqlKind[%s], operatorName[%s]", - conversion.sqlKind(), - conversion.operatorName() - ); - } - } - } - - return new ExpressionConverter(kindMap, otherFunctionMap); - } - - /** - * Translate a row-expression to a Druid row extraction. Note that this signature will probably need to change - * once we support extractions from multiple columns. - * - * @param plannerContext SQL planner context - * @param rowOrder order of fields in the Druid rows to be extracted from - * @param expression expression meant to be applied on top of the table - * - * @return (columnName, extractionFn) or null - */ - public RowExtraction convert(PlannerContext plannerContext, List rowOrder, RexNode expression) - { - if (expression.getKind() == SqlKind.INPUT_REF) { - final RexInputRef ref = (RexInputRef) expression; - final String columnName = rowOrder.get(ref.getIndex()); - if (columnName == null) { - throw new ISE("WTF?! Expression referred to nonexistent index[%d]", ref.getIndex()); - } - - return RowExtraction.of(columnName, null); - } else if (expression.getKind() == SqlKind.CAST) { - final RexNode operand = ((RexCall) expression).getOperands().get(0); - if (expression.getType().getSqlTypeName() == SqlTypeName.DATE - && operand.getType().getSqlTypeName() == SqlTypeName.TIMESTAMP) { - // Handling casting TIMESTAMP to DATE by flooring to DAY. - return FloorExpressionConversion.applyTimestampFloor( - convert(plannerContext, rowOrder, operand), - TimeUnits.toQueryGranularity(TimeUnitRange.DAY, plannerContext.getTimeZone()) - ); - } else { - // Ignore other casts. - // TODO(gianm): Probably not a good idea to ignore other CASTs like this. - return convert(plannerContext, rowOrder, ((RexCall) expression).getOperands().get(0)); - } - } else { - // Try conversion using an ExpressionConversion specific to this operator. - final RowExtraction retVal; - - if (expression.getKind() == SqlKind.OTHER_FUNCTION) { - final ExpressionConversion conversion = otherFunctionMap.get(((RexCall) expression).getOperator().getName()); - retVal = conversion != null ? conversion.convert(this, plannerContext, rowOrder, expression) : null; - } else { - final ExpressionConversion conversion = kindMap.get(expression.getKind()); - retVal = conversion != null ? conversion.convert(this, plannerContext, rowOrder, expression) : null; - } - - return retVal; - } - } -} diff --git a/sql/src/main/java/io/druid/sql/calcite/expression/Expressions.java b/sql/src/main/java/io/druid/sql/calcite/expression/Expressions.java index 01992dc1b5e8..6f7733db7c69 100644 --- a/sql/src/main/java/io/druid/sql/calcite/expression/Expressions.java +++ b/sql/src/main/java/io/druid/sql/calcite/expression/Expressions.java @@ -21,7 +21,6 @@ import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -51,8 +50,10 @@ import io.druid.sql.calcite.filtration.Bounds; import io.druid.sql.calcite.filtration.Filtration; import io.druid.sql.calcite.planner.Calcites; +import io.druid.sql.calcite.planner.DruidOperatorTable; import io.druid.sql.calcite.planner.PlannerContext; import io.druid.sql.calcite.table.RowSignature; +import org.apache.calcite.avatica.util.TimeUnitRange; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.rel.core.Project; import org.apache.calcite.rex.RexCall; @@ -74,15 +75,6 @@ */ public class Expressions { - private static final ExpressionConverter EXPRESSION_CONVERTER = ExpressionConverter.create( - ImmutableList.of( - CharLengthExpressionConversion.instance(), - ExtractExpressionConversion.instance(), - FloorExpressionConversion.instance(), - SubstringExpressionConversion.instance() - ) - ); - private static final Map MATH_FUNCTIONS = ImmutableMap.builder() .put("ABS", "abs") .put("CEIL", "ceil") @@ -153,17 +145,58 @@ public static RexNode fromFieldAccess( * @return RowExtraction or null if not possible */ public static RowExtraction toRowExtraction( + final DruidOperatorTable operatorTable, final PlannerContext plannerContext, final List rowOrder, final RexNode expression ) { - return EXPRESSION_CONVERTER.convert(plannerContext, rowOrder, expression); + if (expression.getKind() == SqlKind.INPUT_REF) { + final RexInputRef ref = (RexInputRef) expression; + final String columnName = rowOrder.get(ref.getIndex()); + if (columnName == null) { + throw new ISE("WTF?! Expression referred to nonexistent index[%d]", ref.getIndex()); + } + + return RowExtraction.of(columnName, null); + } else if (expression.getKind() == SqlKind.CAST) { + final RexNode operand = ((RexCall) expression).getOperands().get(0); + if (expression.getType().getSqlTypeName() == SqlTypeName.DATE + && operand.getType().getSqlTypeName() == SqlTypeName.TIMESTAMP) { + // Handling casting TIMESTAMP to DATE by flooring to DAY. + return FloorExtractionOperator.applyTimestampFloor( + toRowExtraction(operatorTable, plannerContext, rowOrder, operand), + TimeUnits.toQueryGranularity(TimeUnitRange.DAY, plannerContext.getTimeZone()) + ); + } else { + // Ignore other casts. + // TODO(gianm): Probably not a good idea to ignore other CASTs like this. + return toRowExtraction(operatorTable, plannerContext, rowOrder, ((RexCall) expression).getOperands().get(0)); + } + } else { + // Try conversion using a SqlExtractionOperator. + final RowExtraction retVal; + + if (expression instanceof RexCall) { + final SqlExtractionOperator extractionOperator = operatorTable.lookupExtractionOperator( + expression.getKind(), + ((RexCall) expression).getOperator().getName() + ); + + retVal = extractionOperator != null + ? extractionOperator.convert(operatorTable, plannerContext, rowOrder, expression) + : null; + } else { + retVal = null; + } + + return retVal; + } } /** * Translate a Calcite row-expression to a Druid PostAggregator. One day, when possible, this could be folded - * into {@link #toRowExtraction(PlannerContext, List, RexNode)}. + * into {@link #toRowExtraction(DruidOperatorTable, PlannerContext, List, RexNode)} . * * @param name name of the PostAggregator * @param rowOrder order of fields in the Druid rows to be extracted from @@ -241,7 +274,7 @@ public static PostAggregator toPostAggregator( /** * Translate a row-expression to a Druid math expression. One day, when possible, this could be folded into - * {@link #toRowExtraction(PlannerContext, List, RexNode)}. + * {@link #toRowExtraction(DruidOperatorTable, PlannerContext, List, RexNode)}. * * @param rowOrder order of fields in the Druid rows to be extracted from * @param expression expression meant to be applied on top of the rows @@ -367,6 +400,7 @@ public static long toMillisLiteral(final RexNode literal, final DateTimeZone tim * @param expression Calcite row expression */ public static DimFilter toFilter( + final DruidOperatorTable operatorTable, final PlannerContext plannerContext, final RowSignature rowSignature, final RexNode expression @@ -377,7 +411,7 @@ public static DimFilter toFilter( || expression.getKind() == SqlKind.NOT) { final List filters = Lists.newArrayList(); for (final RexNode rexNode : ((RexCall) expression).getOperands()) { - final DimFilter nextFilter = toFilter(plannerContext, rowSignature, rexNode); + final DimFilter nextFilter = toFilter(operatorTable, plannerContext, rowSignature, rexNode); if (nextFilter == null) { return null; } @@ -394,7 +428,7 @@ public static DimFilter toFilter( } } else { // Handle filter conditions on everything else. - return toLeafFilter(plannerContext, rowSignature, expression); + return toLeafFilter(operatorTable, plannerContext, rowSignature, expression); } } @@ -407,6 +441,7 @@ public static DimFilter toFilter( * @param expression Calcite row expression */ private static DimFilter toLeafFilter( + final DruidOperatorTable operatorTable, final PlannerContext plannerContext, final RowSignature rowSignature, final RexNode expression @@ -422,7 +457,8 @@ private static DimFilter toLeafFilter( if (kind == SqlKind.LIKE) { final List operands = ((RexCall) expression).getOperands(); - final RowExtraction rex = EXPRESSION_CONVERTER.convert( + final RowExtraction rex = toRowExtraction( + operatorTable, plannerContext, rowSignature.getRowOrder(), operands.get(0) @@ -462,7 +498,7 @@ private static DimFilter toLeafFilter( } // lhs must be translatable to a RowExtraction to be filterable - final RowExtraction rex = EXPRESSION_CONVERTER.convert(plannerContext, rowSignature.getRowOrder(), lhs); + final RowExtraction rex = toRowExtraction(operatorTable, plannerContext, rowSignature.getRowOrder(), lhs); if (rex == null || !rex.isFilterable(rowSignature)) { return null; } diff --git a/sql/src/main/java/io/druid/sql/calcite/expression/ExtractExpressionConversion.java b/sql/src/main/java/io/druid/sql/calcite/expression/ExtractExtractionOperator.java similarity index 85% rename from sql/src/main/java/io/druid/sql/calcite/expression/ExtractExpressionConversion.java rename to sql/src/main/java/io/druid/sql/calcite/expression/ExtractExtractionOperator.java index a1a59fab0bac..0776a3f1244c 100644 --- a/sql/src/main/java/io/druid/sql/calcite/expression/ExtractExpressionConversion.java +++ b/sql/src/main/java/io/druid/sql/calcite/expression/ExtractExtractionOperator.java @@ -22,32 +22,28 @@ import io.druid.java.util.common.granularity.Granularity; import io.druid.query.extraction.ExtractionFn; import io.druid.query.extraction.TimeFormatExtractionFn; +import io.druid.sql.calcite.planner.DruidOperatorTable; import io.druid.sql.calcite.planner.PlannerContext; import org.apache.calcite.avatica.util.TimeUnitRange; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; import java.util.List; -public class ExtractExpressionConversion extends AbstractExpressionConversion +public class ExtractExtractionOperator implements SqlExtractionOperator { - private static final ExtractExpressionConversion INSTANCE = new ExtractExpressionConversion(); - - private ExtractExpressionConversion() - { - super(SqlKind.EXTRACT); - } - - public static ExtractExpressionConversion instance() + @Override + public SqlFunction calciteFunction() { - return INSTANCE; + return SqlStdOperatorTable.EXTRACT; } @Override public RowExtraction convert( - final ExpressionConverter converter, + final DruidOperatorTable operatorTable, final PlannerContext plannerContext, final List rowOrder, final RexNode expression @@ -59,7 +55,7 @@ public RowExtraction convert( final TimeUnitRange timeUnit = (TimeUnitRange) flag.getValue(); final RexNode expr = call.getOperands().get(1); - final RowExtraction rex = converter.convert(plannerContext, rowOrder, expr); + final RowExtraction rex = Expressions.toRowExtraction(operatorTable, plannerContext, rowOrder, expr); if (rex == null) { return null; } diff --git a/sql/src/main/java/io/druid/sql/calcite/expression/FloorExpressionConversion.java b/sql/src/main/java/io/druid/sql/calcite/expression/FloorExtractionOperator.java similarity index 85% rename from sql/src/main/java/io/druid/sql/calcite/expression/FloorExpressionConversion.java rename to sql/src/main/java/io/druid/sql/calcite/expression/FloorExtractionOperator.java index bee2dbe97406..6b803303ec0a 100644 --- a/sql/src/main/java/io/druid/sql/calcite/expression/FloorExpressionConversion.java +++ b/sql/src/main/java/io/druid/sql/calcite/expression/FloorExtractionOperator.java @@ -21,29 +21,19 @@ import io.druid.java.util.common.granularity.Granularity; import io.druid.query.extraction.BucketExtractionFn; +import io.druid.sql.calcite.planner.DruidOperatorTable; import io.druid.sql.calcite.planner.PlannerContext; import org.apache.calcite.avatica.util.TimeUnitRange; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; import java.util.List; -public class FloorExpressionConversion extends AbstractExpressionConversion +public class FloorExtractionOperator implements SqlExtractionOperator { - private static final FloorExpressionConversion INSTANCE = new FloorExpressionConversion(); - - private FloorExpressionConversion() - { - super(SqlKind.FLOOR); - } - - public static FloorExpressionConversion instance() - { - return INSTANCE; - } - public static RowExtraction applyTimestampFloor( final RowExtraction rex, final Granularity queryGranularity @@ -62,9 +52,15 @@ public static RowExtraction applyTimestampFloor( ); } + @Override + public SqlFunction calciteFunction() + { + return SqlStdOperatorTable.FLOOR; + } + @Override public RowExtraction convert( - final ExpressionConverter converter, + final DruidOperatorTable operatorTable, final PlannerContext plannerContext, final List rowOrder, final RexNode expression @@ -73,7 +69,7 @@ public RowExtraction convert( final RexCall call = (RexCall) expression; final RexNode arg = call.getOperands().get(0); - final RowExtraction rex = converter.convert(plannerContext, rowOrder, arg); + final RowExtraction rex = Expressions.toRowExtraction(operatorTable, plannerContext, rowOrder, arg); if (rex == null) { return null; } else if (call.getOperands().size() == 1) { diff --git a/sql/src/main/java/io/druid/sql/calcite/expression/LookupExtractionOperator.java b/sql/src/main/java/io/druid/sql/calcite/expression/LookupExtractionOperator.java new file mode 100644 index 000000000000..f5ff486acb9f --- /dev/null +++ b/sql/src/main/java/io/druid/sql/calcite/expression/LookupExtractionOperator.java @@ -0,0 +1,113 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.sql.calcite.expression; + +import com.google.inject.Inject; +import io.druid.query.lookup.LookupReferencesManager; +import io.druid.query.lookup.RegisteredLookupExtractionFn; +import io.druid.sql.calcite.planner.DruidOperatorTable; +import io.druid.sql.calcite.planner.PlannerContext; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.OperandTypes; +import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlTypeFamily; +import org.apache.calcite.sql.type.SqlTypeName; + +import java.util.List; + +public class LookupExtractionOperator implements SqlExtractionOperator +{ + private static final String NAME = "LOOKUP"; + private static final SqlFunction SQL_FUNCTION = new LookupSqlFunction(); + + private final LookupReferencesManager lookupReferencesManager; + + @Inject + public LookupExtractionOperator(final LookupReferencesManager lookupReferencesManager) + { + this.lookupReferencesManager = lookupReferencesManager; + } + + @Override + public SqlFunction calciteFunction() + { + return SQL_FUNCTION; + } + + @Override + public RowExtraction convert( + final DruidOperatorTable operatorTable, + final PlannerContext plannerContext, + final List rowOrder, + final RexNode expression + ) + { + final RexCall call = (RexCall) expression; + final RowExtraction rex = Expressions.toRowExtraction( + operatorTable, + plannerContext, + rowOrder, + call.getOperands().get(0) + ); + if (rex == null) { + return null; + } + + final String lookupName = RexLiteral.stringValue(call.getOperands().get(1)); + final RegisteredLookupExtractionFn extractionFn = new RegisteredLookupExtractionFn( + lookupReferencesManager, + lookupName, + false, + null, + false, + true + ); + + return RowExtraction.of( + rex.getColumn(), + ExtractionFns.compose(extractionFn, rex.getExtractionFn()) + ); + } + + private static class LookupSqlFunction extends SqlFunction + { + private static final String SIGNATURE = "'" + NAME + "(expression, lookupName)'\n"; + + LookupSqlFunction() + { + super( + NAME, + SqlKind.OTHER_FUNCTION, + ReturnTypes.explicit(SqlTypeName.VARCHAR), + null, + OperandTypes.and( + OperandTypes.sequence(SIGNATURE, OperandTypes.CHARACTER, OperandTypes.LITERAL), + OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER) + ), + SqlFunctionCategory.STRING + ); + } + } +} diff --git a/sql/src/main/java/io/druid/sql/calcite/expression/ExpressionConversion.java b/sql/src/main/java/io/druid/sql/calcite/expression/SqlExtractionOperator.java similarity index 69% rename from sql/src/main/java/io/druid/sql/calcite/expression/ExpressionConversion.java rename to sql/src/main/java/io/druid/sql/calcite/expression/SqlExtractionOperator.java index 901a1c789194..82ef3b4979d9 100644 --- a/sql/src/main/java/io/druid/sql/calcite/expression/ExpressionConversion.java +++ b/sql/src/main/java/io/druid/sql/calcite/expression/SqlExtractionOperator.java @@ -19,41 +19,36 @@ package io.druid.sql.calcite.expression; +import io.druid.sql.calcite.planner.DruidOperatorTable; import io.druid.sql.calcite.planner.PlannerContext; import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlFunction; import java.util.List; -public interface ExpressionConversion +public interface SqlExtractionOperator { /** - * SQL kind that this converter knows how to convert. + * Returns the SQL operator corresponding to this aggregation function. Should be a singleton. * - * @return sql kind + * @return operator */ - SqlKind sqlKind(); + SqlFunction calciteFunction(); /** - * Operator name, if {@link #sqlKind()} is {@code OTHER_FUNCTION}. + * Returns the Druid {@link RowExtraction} corresponding to a SQL {@code RexNode}. * - * @return operator name, or null - */ - String operatorName(); - - /** - * Translate a row-expression to a Druid column reference. Note that this signature will probably need to change - * once we support extractions from multiple columns. - * - * @param converter converter that can be used to convert sub-expressions + * @param operatorTable Operator table that can be used to convert sub-expressions * @param plannerContext SQL planner context * @param rowOrder order of fields in the Druid rows to be extracted from * @param expression expression meant to be applied on top of the table * * @return (columnName, extractionFn) or null + * + * @see ExpressionConversion#convert(ExpressionConverter, PlannerContext, List, RexNode) */ RowExtraction convert( - ExpressionConverter converter, + DruidOperatorTable operatorTable, PlannerContext plannerContext, List rowOrder, RexNode expression diff --git a/sql/src/main/java/io/druid/sql/calcite/expression/SubstringExpressionConversion.java b/sql/src/main/java/io/druid/sql/calcite/expression/SubstringExtractionOperator.java similarity index 76% rename from sql/src/main/java/io/druid/sql/calcite/expression/SubstringExpressionConversion.java rename to sql/src/main/java/io/druid/sql/calcite/expression/SubstringExtractionOperator.java index 73719badc911..4d2a7a3d9c70 100644 --- a/sql/src/main/java/io/druid/sql/calcite/expression/SubstringExpressionConversion.java +++ b/sql/src/main/java/io/druid/sql/calcite/expression/SubstringExtractionOperator.java @@ -20,38 +20,39 @@ package io.druid.sql.calcite.expression; import io.druid.query.extraction.SubstringDimExtractionFn; +import io.druid.sql.calcite.planner.DruidOperatorTable; import io.druid.sql.calcite.planner.PlannerContext; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; import java.util.List; -public class SubstringExpressionConversion extends AbstractExpressionConversion +public class SubstringExtractionOperator implements SqlExtractionOperator { - private static final SubstringExpressionConversion INSTANCE = new SubstringExpressionConversion(); - - private SubstringExpressionConversion() - { - super(SqlKind.OTHER_FUNCTION, "SUBSTRING"); - } - - public static SubstringExpressionConversion instance() + @Override + public SqlFunction calciteFunction() { - return INSTANCE; + return SqlStdOperatorTable.SUBSTRING; } @Override public RowExtraction convert( - final ExpressionConverter converter, + final DruidOperatorTable operatorTable, final PlannerContext plannerContext, final List rowOrder, final RexNode expression ) { final RexCall call = (RexCall) expression; - final RowExtraction arg = converter.convert(plannerContext, rowOrder, call.getOperands().get(0)); + final RowExtraction arg = Expressions.toRowExtraction( + operatorTable, + plannerContext, + rowOrder, + call.getOperands().get(0) + ); if (arg == null) { return null; } diff --git a/sql/src/main/java/io/druid/sql/calcite/planner/DruidConvertletTable.java b/sql/src/main/java/io/druid/sql/calcite/planner/DruidConvertletTable.java index 6d06b2ca0250..e83264498a3e 100644 --- a/sql/src/main/java/io/druid/sql/calcite/planner/DruidConvertletTable.java +++ b/sql/src/main/java/io/druid/sql/calcite/planner/DruidConvertletTable.java @@ -68,7 +68,7 @@ public SqlRexConvertlet get(SqlCall call) { if (call.getKind() == SqlKind.EXTRACT && call.getOperandList().get(1).getKind() != SqlKind.LITERAL) { // Avoid using the standard convertlet for EXTRACT(TIMEUNIT FROM col), since we want to handle it directly - // in ExtractExpressionConversion. + // in ExtractExtractionOperator. return BYPASS_CONVERTLET; } else { final SqlRexConvertlet convertlet = table.get(call.getOperator()); diff --git a/sql/src/main/java/io/druid/sql/calcite/planner/DruidOperatorTable.java b/sql/src/main/java/io/druid/sql/calcite/planner/DruidOperatorTable.java index 5db18afe4802..827eb4546ec8 100644 --- a/sql/src/main/java/io/druid/sql/calcite/planner/DruidOperatorTable.java +++ b/sql/src/main/java/io/druid/sql/calcite/planner/DruidOperatorTable.java @@ -23,8 +23,10 @@ import com.google.inject.Inject; import io.druid.java.util.common.ISE; import io.druid.sql.calcite.aggregation.SqlAggregator; +import io.druid.sql.calcite.expression.SqlExtractionOperator; import org.apache.calcite.sql.SqlFunctionCategory; import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.SqlOperatorTable; import org.apache.calcite.sql.SqlSyntax; @@ -40,17 +42,28 @@ public class DruidOperatorTable implements SqlOperatorTable private static final SqlStdOperatorTable STANDARD_TABLE = SqlStdOperatorTable.instance(); private final Map aggregators; + private final Map extractionOperators; @Inject public DruidOperatorTable( - final Set aggregators + final Set aggregators, + final Set extractionOperators ) { this.aggregators = Maps.newHashMap(); + this.extractionOperators = Maps.newHashMap(); + for (SqlAggregator aggregator : aggregators) { final String lcname = aggregator.calciteFunction().getName().toLowerCase(); if (this.aggregators.put(lcname, aggregator) != null) { - throw new ISE("Cannot have two aggregators with name[%s]", lcname); + throw new ISE("Cannot have two operators with name[%s]", lcname); + } + } + + for (SqlExtractionOperator extractionFunction : extractionOperators) { + final String lcname = extractionFunction.calciteFunction().getName().toLowerCase(); + if (this.aggregators.containsKey(lcname) || this.extractionOperators.put(lcname, extractionFunction) != null) { + throw new ISE("Cannot have two operators with name[%s]", lcname); } } } @@ -60,6 +73,16 @@ public SqlAggregator lookupAggregator(final String opName) return aggregators.get(opName.toLowerCase()); } + public SqlExtractionOperator lookupExtractionOperator(final SqlKind kind, final String opName) + { + final SqlExtractionOperator extractionOperator = extractionOperators.get(opName.toLowerCase()); + if (extractionOperator != null && extractionOperator.calciteFunction().getKind() == kind) { + return extractionOperator; + } else { + return null; + } + } + @Override public void lookupOperatorOverloads( final SqlIdentifier opName, @@ -68,12 +91,18 @@ public void lookupOperatorOverloads( final List operatorList ) { - if (opName.names.size() == 1) { + if (opName.names.size() == 1 && syntax == SqlSyntax.FUNCTION) { final SqlAggregator aggregator = aggregators.get(opName.getSimple().toLowerCase()); - if (aggregator != null && syntax == SqlSyntax.FUNCTION) { + if (aggregator != null) { operatorList.add(aggregator.calciteFunction()); } + + final SqlExtractionOperator extractionFunction = extractionOperators.get(opName.getSimple().toLowerCase()); + if (extractionFunction != null) { + operatorList.add(extractionFunction.calciteFunction()); + } } + STANDARD_TABLE.lookupOperatorOverloads(opName, category, syntax, operatorList); } @@ -84,6 +113,9 @@ public List getOperatorList() for (SqlAggregator aggregator : aggregators.values()) { retVal.add(aggregator.calciteFunction()); } + for (SqlExtractionOperator extractionFunction : extractionOperators.values()) { + retVal.add(extractionFunction.calciteFunction()); + } retVal.addAll(STANDARD_TABLE.getOperatorList()); return retVal; } diff --git a/sql/src/main/java/io/druid/sql/calcite/planner/Rules.java b/sql/src/main/java/io/druid/sql/calcite/planner/Rules.java index 475af34b085c..e93ddd0309dc 100644 --- a/sql/src/main/java/io/druid/sql/calcite/planner/Rules.java +++ b/sql/src/main/java/io/druid/sql/calcite/planner/Rules.java @@ -208,13 +208,13 @@ private static List baseRuleSet( // Druid-specific rules. rules.add(new DruidTableScanRule(queryMaker)); - rules.add(DruidFilterRule.instance()); + rules.add(new DruidFilterRule(operatorTable)); if (plannerConfig.getMaxSemiJoinRowsInMemory() > 0) { rules.add(DruidSemiJoinRule.instance()); } - rules.addAll(SelectRules.rules()); + rules.addAll(SelectRules.rules(operatorTable)); rules.addAll(GroupByRules.rules(operatorTable)); return rules.build(); diff --git a/sql/src/main/java/io/druid/sql/calcite/rule/DruidFilterRule.java b/sql/src/main/java/io/druid/sql/calcite/rule/DruidFilterRule.java index 1c8392909d80..7884d238e0ca 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rule/DruidFilterRule.java +++ b/sql/src/main/java/io/druid/sql/calcite/rule/DruidFilterRule.java @@ -21,6 +21,7 @@ import io.druid.query.filter.DimFilter; import io.druid.sql.calcite.expression.Expressions; +import io.druid.sql.calcite.planner.DruidOperatorTable; import io.druid.sql.calcite.rel.DruidRel; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; @@ -28,16 +29,12 @@ public class DruidFilterRule extends RelOptRule { - private static final DruidFilterRule INSTANCE = new DruidFilterRule(); + private final DruidOperatorTable operatorTable; - private DruidFilterRule() + public DruidFilterRule(final DruidOperatorTable operatorTable) { super(operand(Filter.class, operand(DruidRel.class, none()))); - } - - public static DruidFilterRule instance() - { - return INSTANCE; + this.operatorTable = operatorTable; } @Override @@ -53,6 +50,7 @@ public void onMatch(RelOptRuleCall call) } final DimFilter dimFilter = Expressions.toFilter( + operatorTable, druidRel.getPlannerContext(), druidRel.getSourceRowSignature(), filter.getCondition() diff --git a/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java b/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java index 4a3d304be2f0..05acca45fafe 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java +++ b/sql/src/main/java/io/druid/sql/calcite/rule/GroupByRules.java @@ -94,7 +94,7 @@ public static List rules(final DruidOperatorTable operatorTable) new DruidAggregateProjectRule(operatorTable), new DruidAggregateProjectFilterRule(operatorTable), new DruidGroupByPostAggregationRule(), - new DruidGroupByHavingRule(), + new DruidGroupByHavingRule(operatorTable), new DruidGroupByLimitRule() ); } @@ -116,12 +116,13 @@ public FieldOrExpression(String fieldName, String expression) } public static FieldOrExpression fromRexNode( + final DruidOperatorTable operatorTable, final PlannerContext plannerContext, final List rowOrder, final RexNode rexNode ) { - final RowExtraction rex = Expressions.toRowExtraction(plannerContext, rowOrder, rexNode); + final RowExtraction rex = Expressions.toRowExtraction(operatorTable, plannerContext, rowOrder, rexNode); if (rex != null && rex.getExtractionFn() == null) { // This was a simple field access. return fieldName(rex.getColumn()); @@ -302,9 +303,12 @@ public void onMatch(RelOptRuleCall call) public static class DruidGroupByHavingRule extends RelOptRule { - private DruidGroupByHavingRule() + private final DruidOperatorTable operatorTable; + + private DruidGroupByHavingRule(final DruidOperatorTable operatorTable) { super(operand(Filter.class, operand(DruidRel.class, none()))); + this.operatorTable = operatorTable; } @Override @@ -319,7 +323,7 @@ public void onMatch(RelOptRuleCall call) { final Filter postFilter = call.rel(0); final DruidRel druidRel = call.rel(1); - final DruidRel newDruidRel = GroupByRules.applyHaving(druidRel, postFilter); + final DruidRel newDruidRel = GroupByRules.applyHaving(operatorTable, druidRel, postFilter); if (newDruidRel != null) { call.transformTo(newDruidRel); } @@ -395,7 +399,12 @@ private static DruidRel applyAggregate( // Filter that should be applied before aggregating. final DimFilter filter; if (filter0 != null) { - filter = Expressions.toFilter(druidRel.getPlannerContext(), sourceRowSignature, filter0.getCondition()); + filter = Expressions.toFilter( + operatorTable, + druidRel.getPlannerContext(), + sourceRowSignature, + filter0.getCondition() + ); if (filter == null) { // Can't plan this filter. return null; @@ -435,6 +444,7 @@ private static DruidRel applyAggregate( } else { final RexNode rexNode = Expressions.fromFieldAccess(sourceRowSignature, project, i); final RowExtraction rex = Expressions.toRowExtraction( + operatorTable, druidRel.getPlannerContext(), sourceRowSignature.getRowOrder(), rexNode @@ -590,11 +600,16 @@ private static boolean canApplyHaving(final DruidRel druidRel) * * @return new rel, or null if the filter cannot be applied */ - private static DruidRel applyHaving(final DruidRel druidRel, final Filter postFilter) + private static DruidRel applyHaving( + final DruidOperatorTable operatorTable, + final DruidRel druidRel, + final Filter postFilter + ) { Preconditions.checkState(canApplyHaving(druidRel), "Cannot applyHaving."); final DimFilter dimFilter = Expressions.toFilter( + operatorTable, druidRel.getPlannerContext(), druidRel.getOutputRowSignature(), postFilter.getCondition() @@ -751,7 +766,7 @@ private static Aggregation translateAggregateCall( } final RexNode expression = project.getChildExps().get(call.filterArg); - final DimFilter filter = Expressions.toFilter(plannerContext, sourceRowSignature, expression); + final DimFilter filter = Expressions.toFilter(operatorTable, plannerContext, sourceRowSignature, expression); if (filter == null) { return null; } @@ -767,6 +782,7 @@ private static Aggregation translateAggregateCall( return approximateCountDistinct ? APPROX_COUNT_DISTINCT.toDruidAggregation( name, sourceRowSignature, + operatorTable, plannerContext, existingAggregations, project, @@ -785,7 +801,7 @@ private static Aggregation translateAggregateCall( final int inputField = Iterables.getOnlyElement(call.getArgList()); final RexNode rexNode = Expressions.fromFieldAccess(sourceRowSignature, project, inputField); - final FieldOrExpression foe = FieldOrExpression.fromRexNode(plannerContext, rowOrder, rexNode); + final FieldOrExpression foe = FieldOrExpression.fromRexNode(operatorTable, plannerContext, rowOrder, rexNode); if (foe != null) { input = foe; @@ -804,6 +820,7 @@ private static Aggregation translateAggregateCall( // Operand 1: Filter final DimFilter filter = Expressions.toFilter( + operatorTable, plannerContext, sourceRowSignature, caseCall.getOperands().get(0) @@ -831,7 +848,7 @@ private static Aggregation translateAggregateCall( input = null; } else if (RexLiteral.isNullLiteral(arg2)) { // Maybe case A - input = FieldOrExpression.fromRexNode(plannerContext, rowOrder, arg1); + input = FieldOrExpression.fromRexNode(operatorTable, plannerContext, rowOrder, arg1); if (input == null) { return null; } @@ -904,6 +921,7 @@ private static Aggregation translateAggregateCall( return sqlAggregator != null ? sqlAggregator.toDruidAggregation( name, sourceRowSignature, + operatorTable, plannerContext, existingAggregations, project, diff --git a/sql/src/main/java/io/druid/sql/calcite/rule/SelectRules.java b/sql/src/main/java/io/druid/sql/calcite/rule/SelectRules.java index 823b9efc4dee..377106e44519 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rule/SelectRules.java +++ b/sql/src/main/java/io/druid/sql/calcite/rule/SelectRules.java @@ -31,6 +31,7 @@ import io.druid.sql.calcite.expression.Expressions; import io.druid.sql.calcite.expression.RowExtraction; import io.druid.sql.calcite.planner.Calcites; +import io.druid.sql.calcite.planner.DruidOperatorTable; import io.druid.sql.calcite.rel.DruidRel; import io.druid.sql.calcite.rel.SelectProjection; import io.druid.sql.calcite.table.RowSignature; @@ -45,26 +46,27 @@ public class SelectRules { - private static final List RULES = ImmutableList.of( - new DruidSelectProjectionRule(), - new DruidSelectSortRule() - ); - private SelectRules() { // No instantiation. } - public static List rules() + public static List rules(final DruidOperatorTable operatorTable) { - return RULES; + return ImmutableList.of( + new DruidSelectProjectionRule(operatorTable), + new DruidSelectSortRule() + ); } static class DruidSelectProjectionRule extends RelOptRule { - private DruidSelectProjectionRule() + private final DruidOperatorTable operatorTable; + + public DruidSelectProjectionRule(final DruidOperatorTable operatorTable) { super(operand(Project.class, operand(DruidRel.class, none()))); + this.operatorTable = operatorTable; } @Override @@ -95,6 +97,7 @@ public void onMatch(RelOptRuleCall call) for (int i = 0; i < project.getRowType().getFieldCount(); i++) { final RexNode rexNode = project.getChildExps().get(i); final RowExtraction rex = Expressions.toRowExtraction( + operatorTable, druidRel.getPlannerContext(), sourceRowSignature.getRowOrder(), rexNode diff --git a/sql/src/main/java/io/druid/sql/guice/SqlBindings.java b/sql/src/main/java/io/druid/sql/guice/SqlBindings.java index 5ab705c044ac..c0bbc4fc74dd 100644 --- a/sql/src/main/java/io/druid/sql/guice/SqlBindings.java +++ b/sql/src/main/java/io/druid/sql/guice/SqlBindings.java @@ -22,6 +22,7 @@ import com.google.inject.Binder; import com.google.inject.multibindings.Multibinder; import io.druid.sql.calcite.aggregation.SqlAggregator; +import io.druid.sql.calcite.expression.SqlExtractionOperator; public class SqlBindings { @@ -33,4 +34,13 @@ public static void addAggregator( final Multibinder setBinder = Multibinder.newSetBinder(binder, SqlAggregator.class); setBinder.addBinding().to(aggregatorClass); } + + public static void addExtractionOperator( + final Binder binder, + final Class clazz + ) + { + final Multibinder setBinder = Multibinder.newSetBinder(binder, SqlExtractionOperator.class); + setBinder.addBinding().to(clazz); + } } diff --git a/sql/src/main/java/io/druid/sql/guice/SqlModule.java b/sql/src/main/java/io/druid/sql/guice/SqlModule.java index e285b4b4360c..a2a4aae86528 100644 --- a/sql/src/main/java/io/druid/sql/guice/SqlModule.java +++ b/sql/src/main/java/io/druid/sql/guice/SqlModule.java @@ -20,6 +20,7 @@ package io.druid.sql.guice; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Inject; import com.google.inject.Module; @@ -34,16 +35,36 @@ import io.druid.sql.avatica.AvaticaServerConfig; import io.druid.sql.avatica.DruidAvaticaHandler; import io.druid.sql.calcite.aggregation.ApproxCountDistinctSqlAggregator; +import io.druid.sql.calcite.aggregation.SqlAggregator; +import io.druid.sql.calcite.expression.CharacterLengthExtractionOperator; +import io.druid.sql.calcite.expression.ExtractExtractionOperator; +import io.druid.sql.calcite.expression.FloorExtractionOperator; +import io.druid.sql.calcite.expression.LookupExtractionOperator; +import io.druid.sql.calcite.expression.SqlExtractionOperator; +import io.druid.sql.calcite.expression.SubstringExtractionOperator; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.PlannerConfig; import io.druid.sql.calcite.schema.DruidSchema; import io.druid.sql.http.SqlResource; import org.apache.calcite.schema.SchemaPlus; +import java.util.List; import java.util.Properties; public class SqlModule implements Module { + public static final List> DEFAULT_AGGREGATOR_CLASSES = ImmutableList.>of( + ApproxCountDistinctSqlAggregator.class + ); + + public static final List> DEFAULT_EXTRACTION_OPERATOR_CLASSES = ImmutableList.>of( + CharacterLengthExtractionOperator.class, + ExtractExtractionOperator.class, + FloorExtractionOperator.class, + LookupExtractionOperator.class, + SubstringExtractionOperator.class + ); + private static final String PROPERTY_SQL_ENABLE = "druid.sql.enable"; private static final String PROPERTY_SQL_ENABLE_JSON_OVER_HTTP = "druid.sql.http.enable"; private static final String PROPERTY_SQL_ENABLE_AVATICA = "druid.sql.avatica.enable"; @@ -64,7 +85,14 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.sql.planner", PlannerConfig.class); JsonConfigProvider.bind(binder, "druid.sql.avatica", AvaticaServerConfig.class); LifecycleModule.register(binder, DruidSchema.class); - SqlBindings.addAggregator(binder, ApproxCountDistinctSqlAggregator.class); + + for (Class clazz : DEFAULT_AGGREGATOR_CLASSES) { + SqlBindings.addAggregator(binder, clazz); + } + + for (Class clazz : DEFAULT_EXTRACTION_OPERATOR_CLASSES) { + SqlBindings.addExtractionOperator(binder, clazz); + } if (isJsonOverHttpEnabled()) { Jerseys.addResource(binder, SqlResource.class); diff --git a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java index cc406ea60a75..1fd4555c1a24 100644 --- a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java @@ -68,6 +68,7 @@ import io.druid.query.groupby.having.DimFilterHavingSpec; import io.druid.query.groupby.orderby.DefaultLimitSpec; import io.druid.query.groupby.orderby.OrderByColumnSpec; +import io.druid.query.lookup.RegisteredLookupExtractionFn; import io.druid.query.ordering.StringComparator; import io.druid.query.ordering.StringComparators; import io.druid.query.select.PagingSpec; @@ -2844,7 +2845,12 @@ public void testGroupByFloorWithOrderBy() throws Exception .setGranularity(Granularities.ALL) .setDimensions( DIMS( - new ExtractionDimensionSpec("dim1", "d0", ValueType.FLOAT, new BucketExtractionFn(1.0, 0.0)) + new ExtractionDimensionSpec( + "dim1", + "d0", + ValueType.FLOAT, + new BucketExtractionFn(1.0, 0.0) + ) ) ) .setAggregatorSpecs(AGGS(new CountAggregatorFactory("a0"))) @@ -2973,6 +2979,94 @@ public void testGroupByStringLength() throws Exception ); } + @Test + public void testFilterAndGroupByLookup() throws Exception + { + final RegisteredLookupExtractionFn extractionFn = new RegisteredLookupExtractionFn( + null, + "lookyloo", + false, + null, + false, + true + ); + + testQuery( + "SELECT LOOKUP(dim1, 'lookyloo'), COUNT(*) FROM foo\n" + + "WHERE LOOKUP(dim1, 'lookyloo') <> 'xxx'\n" + + "GROUP BY LOOKUP(dim1, 'lookyloo')", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(QSS(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimFilter( + NOT(SELECTOR( + "dim1", + "xxx", + extractionFn + )) + ) + .setDimensions( + DIMS( + new ExtractionDimensionSpec( + "dim1", + "d0", + ValueType.STRING, + extractionFn + ) + ) + ) + .setAggregatorSpecs( + AGGS( + new CountAggregatorFactory("a0") + ) + ) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{"", 5L}, + new Object[]{"xabc", 1L} + ) + ); + } + + @Test + public void testCountDistinctOfLookup() throws Exception + { + final RegisteredLookupExtractionFn extractionFn = new RegisteredLookupExtractionFn( + null, + "lookyloo", + false, + null, + false, + true + ); + + testQuery( + "SELECT COUNT(DISTINCT LOOKUP(dim1, 'lookyloo')) FROM foo", + ImmutableList.of( + Druids.newTimeseriesQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(QSS(Filtration.eternity())) + .granularity(Granularities.ALL) + .aggregators(AGGS( + new CardinalityAggregatorFactory( + "a0", + ImmutableList.of(new ExtractionDimensionSpec("dim1", null, extractionFn)), + false + ) + )) + .context(TIMESERIES_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{2L} + ) + ); + } + @Test public void testTimeseries() throws Exception { diff --git a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java index 9bf1205c2af0..e65bb6b52fa9 100644 --- a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java @@ -24,7 +24,10 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; +import com.google.inject.Binder; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; import io.druid.collections.StupidPool; import io.druid.data.input.InputRow; import io.druid.data.input.impl.DimensionsSpec; @@ -42,10 +45,15 @@ import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import io.druid.query.extraction.MapLookupExtractor; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryRunnerTest; import io.druid.query.groupby.strategy.GroupByStrategySelector; +import io.druid.query.lookup.LookupExtractor; +import io.druid.query.lookup.LookupExtractorFactory; +import io.druid.query.lookup.LookupIntrospectHandler; +import io.druid.query.lookup.LookupReferencesManager; import io.druid.query.metadata.SegmentMetadataQueryConfig; import io.druid.query.metadata.SegmentMetadataQueryQueryToolChest; import io.druid.query.metadata.SegmentMetadataQueryRunnerFactory; @@ -67,19 +75,24 @@ import io.druid.segment.QueryableIndex; import io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.sql.calcite.aggregation.ApproxCountDistinctSqlAggregator; import io.druid.sql.calcite.aggregation.SqlAggregator; +import io.druid.sql.calcite.expression.SqlExtractionOperator; import io.druid.sql.calcite.planner.DruidOperatorTable; import io.druid.sql.calcite.planner.PlannerConfig; import io.druid.sql.calcite.schema.DruidSchema; +import io.druid.sql.guice.SqlModule; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.LinearShardSpec; +import org.easymock.EasyMock; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.io.File; import java.nio.ByteBuffer; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; /** * Utility functions for Calcite tests. @@ -269,7 +282,78 @@ public static SpecificSegmentsQuerySegmentWalker createMockWalker(final File tmp public static DruidOperatorTable createOperatorTable() { - return new DruidOperatorTable(ImmutableSet.of(new ApproxCountDistinctSqlAggregator())); + try { + final Injector injector = Guice.createInjector( + new Module() + { + @Override + public void configure(final Binder binder) + { + // This Module is just to get a LookupReferencesManager with a usable "lookyloo" lookup. + + final LookupReferencesManager mock = EasyMock.createMock(LookupReferencesManager.class); + EasyMock.expect(mock.get(EasyMock.eq("lookyloo"))).andReturn( + new LookupExtractorFactory() + { + @Override + public boolean start() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean close() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean replaces(@Nullable final LookupExtractorFactory other) + { + throw new UnsupportedOperationException(); + } + + @Nullable + @Override + public LookupIntrospectHandler getIntrospectHandler() + { + throw new UnsupportedOperationException(); + } + + @Override + public LookupExtractor get() + { + return new MapLookupExtractor( + ImmutableMap.of( + "a", "xa", + "abc", "xabc" + ), + false + ); + } + } + ).anyTimes(); + EasyMock.replay(mock); + binder.bind(LookupReferencesManager.class).toInstance(mock); + } + } + ); + final Set aggregators = new HashSet<>(); + final Set extractionOperators = new HashSet<>(); + + for (Class clazz : SqlModule.DEFAULT_AGGREGATOR_CLASSES) { + aggregators.add(injector.getInstance(clazz)); + } + + for (Class clazz : SqlModule.DEFAULT_EXTRACTION_OPERATOR_CLASSES) { + extractionOperators.add(injector.getInstance(clazz)); + } + + return new DruidOperatorTable(aggregators, extractionOperators); + } + catch (Exception e) { + throw Throwables.propagate(e); + } } public static DruidSchema createMockSchema(