Skip to content

Commit

Permalink
Allow adding calcite rules from extensions (#12715)
Browse files Browse the repository at this point in the history
* Allow adding calcite rules from extensions

* fixup! Allow adding calcite rules from extensions

* Move Rules to CalciteRulesManager

* fixup! Move Rules to CalciteRulesManager
  • Loading branch information
rohangarg authored Jul 6, 2022
1 parent 49fefff commit d732de9
Show file tree
Hide file tree
Showing 14 changed files with 155 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
Expand All @@ -39,6 +40,7 @@
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.expression.builtin.QueryLookupOperatorConversion;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.DruidPlanner;
Expand Down Expand Up @@ -431,7 +433,8 @@ public void setup()
plannerConfig,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
Expand All @@ -37,6 +38,7 @@
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.sql.calcite.SqlVectorizedExpressionSanityTest;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.DruidPlanner;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
Expand Down Expand Up @@ -317,7 +319,8 @@ public void setup()
plannerConfig,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.benchmark.query;

import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
Expand All @@ -38,6 +39,7 @@
import org.apache.druid.segment.generator.SegmentGenerator;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.DruidPlanner;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
Expand Down Expand Up @@ -121,7 +123,8 @@ public void setup()
plannerConfig,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
);
groupByQuery = GroupByQuery
.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.multibindings.Multibinder;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.sql.calcite.rule.ExtensionCalciteRuleProvider;
import org.apache.druid.sql.calcite.run.NativeQueryMakerFactory;
import org.apache.druid.sql.calcite.run.QueryMakerFactory;

Expand Down Expand Up @@ -54,5 +56,6 @@ public void configure(Binder binder)

binder.bind(PlannerFactory.class).in(LazySingleton.class);
binder.bind(DruidOperatorTable.class).in(LazySingleton.class);
Multibinder.newSetBinder(binder, ExtensionCalciteRuleProvider.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.sql.calcite.planner;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.calcite.interpreter.Bindables;
import org.apache.calcite.plan.RelOptLattice;
import org.apache.calcite.plan.RelOptMaterialization;
Expand Down Expand Up @@ -78,13 +79,15 @@
import org.apache.druid.sql.calcite.rule.DruidRelToDruidRule;
import org.apache.druid.sql.calcite.rule.DruidRules;
import org.apache.druid.sql.calcite.rule.DruidTableScanRule;
import org.apache.druid.sql.calcite.rule.ExtensionCalciteRuleProvider;
import org.apache.druid.sql.calcite.rule.FilterJoinExcludePushToChildRule;
import org.apache.druid.sql.calcite.rule.ProjectAggregatePruneUnusedCallRule;
import org.apache.druid.sql.calcite.rule.SortCollapseRule;

import java.util.List;
import java.util.Set;

public class Rules
public class CalciteRulesManager
{
public static final int DRUID_CONVENTION_RULES = 0;
public static final int BINDABLE_CONVENTION_RULES = 1;
Expand All @@ -95,7 +98,7 @@ public class Rules
// Calcite 1.23.0 fixes this issue by not consider expression as reduced if this case happens. However, while
// we are still using Calcite 1.21.0, a workaround is to limit the number of pattern matches to avoid infinite loop.
private static final String HEP_DEFAULT_MATCH_LIMIT_CONFIG_STRING = "druid.sql.planner.hepMatchLimit";
private static final int HEP_DEFAULT_MATCH_LIMIT = Integer.valueOf(
private final int HEP_DEFAULT_MATCH_LIMIT = Integer.valueOf(
System.getProperty(HEP_DEFAULT_MATCH_LIMIT_CONFIG_STRING, "1200")
);

Expand All @@ -107,7 +110,7 @@ public class Rules
// functions).
// 3) JoinCommuteRule (we don't support reordering joins yet).
// 4) JoinPushThroughJoinRule (we don't support reordering joins yet).
private static final List<RelOptRule> BASE_RULES =
private final List<RelOptRule> BASE_RULES =
ImmutableList.of(
AggregateStarTableRule.INSTANCE,
AggregateStarTableRule.INSTANCE2,
Expand All @@ -130,7 +133,7 @@ public class Rules
);

// Rules for scanning via Bindable, embedded directly in RelOptUtil's registerDefaultRules.
private static final List<RelOptRule> DEFAULT_BINDABLE_RULES =
private final List<RelOptRule> DEFAULT_BINDABLE_RULES =
ImmutableList.of(
Bindables.BINDABLE_TABLE_SCAN_RULE,
ProjectTableScanRule.INSTANCE,
Expand All @@ -142,7 +145,7 @@ public class Rules
// 1) ReduceExpressionsRule.JOIN_INSTANCE
// Removed by https://github.com/apache/druid/pull/9941 due to issue in https://github.com/apache/druid/issues/9942
// TODO: Re-enable when https://github.com/apache/druid/issues/9942 is fixed
private static final List<RelOptRule> REDUCTION_RULES =
private final List<RelOptRule> REDUCTION_RULES =
ImmutableList.of(
ReduceExpressionsRule.PROJECT_INSTANCE,
ReduceExpressionsRule.FILTER_INSTANCE,
Expand All @@ -158,7 +161,7 @@ public class Rules
// Omit DateRangeRules due to https://issues.apache.org/jira/browse/CALCITE-1601
// Omit UnionMergeRule since it isn't very effective given how Druid unions currently operate and is potentially
// expensive in terms of planning time.
private static final List<RelOptRule> ABSTRACT_RULES =
private final List<RelOptRule> ABSTRACT_RULES =
ImmutableList.of(
AggregateProjectPullUpConstantsRule.INSTANCE2,
UnionPullUpConstantsRule.INSTANCE,
Expand Down Expand Up @@ -186,7 +189,7 @@ public class Rules
// 4) FilterJoinRule.FILTER_ON_JOIN and FilterJoinRule.JOIN
// Removed by https://github.com/apache/druid/pull/9773 due to issue in https://github.com/apache/druid/issues/9843
// TODO: Re-enable when https://github.com/apache/druid/issues/9843 is fixed
private static final List<RelOptRule> ABSTRACT_RELATIONAL_RULES =
private final List<RelOptRule> ABSTRACT_RELATIONAL_RULES =
ImmutableList.of(
AbstractConverter.ExpandConversionRule.INSTANCE,
AggregateRemoveRule.INSTANCE,
Expand All @@ -198,15 +201,21 @@ public class Rules
SortRemoveRule.INSTANCE
);

private Rules()
private final Set<ExtensionCalciteRuleProvider> extensionCalciteRuleProviderSet;

/**
* Manages the rules for planning of SQL queries via Calcite. Also provides methods for extensions to provide custom
* rules for planning.
* @param extensionCalciteRuleProviderSet the set of custom rules coming from extensions
*/
@Inject
public CalciteRulesManager(final Set<ExtensionCalciteRuleProvider> extensionCalciteRuleProviderSet)
{
// No instantiation.
this.extensionCalciteRuleProviderSet = extensionCalciteRuleProviderSet;
}

public static List<Program> programs(final PlannerContext plannerContext)
public List<Program> programs(final PlannerContext plannerContext)
{


// Program that pre-processes the tree before letting the full-on VolcanoPlanner loose.
final Program preProgram =
Programs.sequence(
Expand All @@ -221,10 +230,12 @@ public static List<Program> programs(final PlannerContext plannerContext)
);
}

private static Program buildHepProgram(Iterable<? extends RelOptRule> rules,
boolean noDag,
RelMetadataProvider metadataProvider,
int matchLimit)
public Program buildHepProgram(
final Iterable<? extends RelOptRule> rules,
final boolean noDag,
final RelMetadataProvider metadataProvider,
final int matchLimit
)
{
final HepProgramBuilder builder = HepProgram.builder();
builder.addMatchLimit(matchLimit);
Expand All @@ -234,7 +245,7 @@ private static Program buildHepProgram(Iterable<? extends RelOptRule> rules,
return Programs.of(builder.build(), noDag, metadataProvider);
}

private static List<RelOptRule> druidConventionRuleSet(final PlannerContext plannerContext)
public List<RelOptRule> druidConventionRuleSet(final PlannerContext plannerContext)
{
final ImmutableList.Builder<RelOptRule> retVal = ImmutableList
.<RelOptRule>builder()
Expand All @@ -245,10 +256,13 @@ private static List<RelOptRule> druidConventionRuleSet(final PlannerContext plan
.add(new ExternalTableScanRule(plannerContext))
.addAll(DruidRules.rules(plannerContext));

for (ExtensionCalciteRuleProvider extensionCalciteRuleProvider : extensionCalciteRuleProviderSet) {
retVal.add(extensionCalciteRuleProvider.getRule(plannerContext));
}
return retVal.build();
}

private static List<RelOptRule> bindableConventionRuleSet(final PlannerContext plannerContext)
public List<RelOptRule> bindableConventionRuleSet(final PlannerContext plannerContext)
{
return ImmutableList.<RelOptRule>builder()
.addAll(baseRuleSet(plannerContext))
Expand All @@ -258,7 +272,7 @@ private static List<RelOptRule> bindableConventionRuleSet(final PlannerContext p
.build();
}

private static List<RelOptRule> baseRuleSet(final PlannerContext plannerContext)
public List<RelOptRule> baseRuleSet(final PlannerContext plannerContext)
{
final PlannerConfig plannerConfig = plannerContext.getPlannerConfig();
final ImmutableList.Builder<RelOptRule> rules = ImmutableList.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ private PlannerResult planWithDruidConvention(

RelNode parameterized = rewriteRelDynamicParameters(possiblyLimitedRoot.rel);
final DruidRel<?> druidRel = (DruidRel<?>) planner.transform(
Rules.DRUID_CONVENTION_RULES,
CalciteRulesManager.DRUID_CONVENTION_RULES,
planner.getEmptyTraitSet()
.replace(DruidConvention.instance())
.plus(root.collation),
Expand Down Expand Up @@ -362,7 +362,7 @@ private PlannerResult planWithBindableConvention(
) throws RelConversionException
{
BindableRel bindableRel = (BindableRel) planner.transform(
Rules.BINDABLE_CONVENTION_RULES,
CalciteRulesManager.BINDABLE_CONVENTION_RULES,
planner.getEmptyTraitSet().replace(BindableConvention.INSTANCE).plus(root.collation),
root.rel
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class PlannerFactory
private final ObjectMapper jsonMapper;
private final AuthorizerMapper authorizerMapper;
private final String druidSchemaName;
private final CalciteRulesManager calciteRuleManager;

@Inject
public PlannerFactory(
Expand All @@ -82,7 +83,8 @@ public PlannerFactory(
final PlannerConfig plannerConfig,
final AuthorizerMapper authorizerMapper,
final @Json ObjectMapper jsonMapper,
final @DruidSchemaName String druidSchemaName
final @DruidSchemaName String druidSchemaName,
final CalciteRulesManager calciteRuleManager
)
{
this.rootSchema = rootSchema;
Expand All @@ -93,6 +95,7 @@ public PlannerFactory(
this.authorizerMapper = authorizerMapper;
this.jsonMapper = jsonMapper;
this.druidSchemaName = druidSchemaName;
this.calciteRuleManager = calciteRuleManager;
}

/**
Expand Down Expand Up @@ -163,7 +166,7 @@ private FrameworkConfig buildFrameworkConfig(PlannerContext plannerContext)
.traitDefs(ConventionTraitDef.INSTANCE, RelCollationTraitDef.INSTANCE)
.convertletTable(new DruidConvertletTable(plannerContext))
.operatorTable(operatorTable)
.programs(Rules.programs(plannerContext))
.programs(calciteRuleManager.programs(plannerContext))
.executor(new DruidRexExecutor(plannerContext))
.typeSystem(DruidTypeSystem.INSTANCE)
.defaultSchema(rootSchema.getSubSchema(druidSchemaName))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.sql.calcite.rule;

import org.apache.calcite.plan.RelOptRule;
import org.apache.druid.guice.annotations.UnstableApi;
import org.apache.druid.sql.calcite.planner.PlannerContext;

/**
* This interface provides a way to supply custom calcite planning rules from extensions. All the custom rules are
* collected and supplied to the planner which invokes {@link ExtensionCalciteRuleProvider#getRule(PlannerContext)}
* for each of the rule provider per query.
*/
@UnstableApi
public interface ExtensionCalciteRuleProvider
{
RelOptRule getRule(PlannerContext plannerContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.druid.server.security.AuthenticatorMapper;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
Expand Down Expand Up @@ -219,6 +220,7 @@ public void configure(Binder binder)
.in(LazySingleton.class);
binder.bind(QueryMakerFactory.class).to(NativeQueryMakerFactory.class);
binder.bind(new TypeLiteral<Supplier<DefaultQueryConfig>>(){}).toInstance(Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of())));
binder.bind(CalciteRulesManager.class).toInstance(new CalciteRulesManager(ImmutableSet.of()));
}
}
)
Expand Down Expand Up @@ -906,7 +908,8 @@ public int getMaxRowsPerFrame()
plannerConfig,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
)
),
smallFrameConfig,
Expand Down Expand Up @@ -996,7 +999,8 @@ public int getMinRowsPerFrame()
plannerConfig,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
CalciteTests.getJsonMapper(),
CalciteTests.DRUID_SCHEMA_NAME
CalciteTests.DRUID_SCHEMA_NAME,
new CalciteRulesManager(ImmutableSet.of())
)
),
smallFrameConfig,
Expand Down
Loading

0 comments on commit d732de9

Please sign in to comment.