apache#2994 Cache schema for SQL
navis committed Dec 24, 2019
1 parent 743e599 commit 0d4f2cc
Showing 7 changed files with 75 additions and 102 deletions.
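
This commit adds a time-bounded schema cache to the SQL layer. `DruidSchema` now keeps resolved table schemas in a concurrent map for up to 20 seconds (`CACHE_VALID_MSEC`) instead of building a fresh memoizing `Supplier` on every lookup; `DruidTable` holds a materialized `RowSignature` and row count rather than a `Supplier<Pair<RowSignature, Long>>`; and `DruidPlanner.plan` drops its unused `HttpServletRequest` parameter, which ripples through the Avatica, HTTP, view, and test callers below.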
2 changes: 1 addition & 1 deletion sql/src/main/java/io/druid/sql/avatica/DruidStatement.java

@@ -161,7 +161,7 @@ public DruidStatement prepare(
     try (final DruidPlanner planner = plannerFactory.createPlanner(queryContext)) {
       synchronized (lock) {
         ensure(State.NEW);
-        this.plannerResult = planner.plan(query, null, null);
+        this.plannerResult = planner.plan(query, null);
         this.maxRowCount = maxRowCount;
         this.query = query;
         this.signature = Meta.Signature.create(
61 changes: 10 additions & 51 deletions sql/src/main/java/io/druid/sql/calcite/planner/DruidPlanner.java

@@ -72,18 +72,12 @@
 import org.apache.calcite.tools.ValidationException;
 import org.apache.calcite.util.Pair;

-import javax.servlet.http.HttpServletRequest;
 import java.io.Closeable;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;

 public class DruidPlanner implements Closeable, ForwardConstants
 {
@@ -98,11 +92,7 @@ public DruidPlanner(Planner planner, PlannerContext plannerContext)
     this.plannerContext = plannerContext;
   }

-  public PlannerResult plan(
-      final String sql,
-      final BrokerServerView brokerServerView,
-      final HttpServletRequest request
-  )
+  public PlannerResult plan(final String sql, final BrokerServerView brokerServerView)
       throws SqlParseException, ValidationException, RelConversionException
   {
     final SqlNode source = planner.parse(sql);
@@ -123,13 +113,13 @@ public PlannerResult plan(
     final RelRoot root = planner.rel(validated);

     try {
-      return planWithDruidConvention(source, root, brokerServerView, request);
+      return planWithDruidConvention(source, root);
     }
     catch (RelOptPlanner.CannotPlanException e) {
       // Try again with BINDABLE convention. Used for querying Values, metadata tables, and fallback.
       try {
         if (!SqlKind.DML.contains(source.getKind())) {
-          return planWithBindableConvention(source, root, request);
+          return planWithBindableConvention(source, root);
         }
       }
       catch (Exception e2) {
@@ -151,12 +141,7 @@ public void close()
     planner.close();
   }

-  private PlannerResult planWithDruidConvention(
-      final SqlNode source,
-      final RelRoot root,
-      final BrokerServerView brokerServerView,
-      final HttpServletRequest request
-  ) throws RelConversionException
+  private PlannerResult planWithDruidConvention(final SqlNode source, final RelRoot root) throws RelConversionException
   {
     final DruidRel<?> druidRel = (DruidRel<?>) planner.transform(
         Rules.DRUID_CONVENTION_RULES,
@@ -169,9 +154,9 @@ private PlannerResult planWithDruidConvention(
     if (source.getKind() == SqlKind.EXPLAIN) {
       return handleExplain(druidRel, (SqlExplain) source);
     } else if (source.getKind() == SqlKind.CREATE_TABLE) {
-      return handleCTAS(Utils.getFieldNames(root), druidRel, (SqlCreateTable) source, brokerServerView);
+      return handleCTAS(Utils.getFieldNames(root), druidRel, (SqlCreateTable) source);
     } else if (source instanceof SqlInsertDirectory) {
-      return handleInsertDirectory(Utils.getFieldNames(root), druidRel, (SqlInsertDirectory) source, brokerServerView);
+      return handleInsertDirectory(Utils.getFieldNames(root), druidRel, (SqlInsertDirectory) source);
     }

     final QueryMaker queryMaker = druidRel.getQueryMaker();
@@ -211,11 +196,8 @@ public Object[] apply(final Object[] input)
     return new PlannerResult(resultsSupplier, root.validatedRowType);
   }

-  private PlannerResult planWithBindableConvention(
-      final SqlNode source,
-      final RelRoot root,
-      final HttpServletRequest request
-  ) throws RelConversionException
+  private PlannerResult planWithBindableConvention(final SqlNode source, final RelRoot root)
+      throws RelConversionException
   {
     BindableRel bindableRel = (BindableRel) planner.transform(
         Rules.BINDABLE_CONVENTION_RULES,
@@ -321,8 +303,7 @@ private PlannerResult handleExplain(final RelNode rel, final SqlExplain explain)
   private PlannerResult handleCTAS(
       final List<String> mappedColumns,
       final DruidRel<?> druidRel,
-      final SqlCreateTable source,
-      final BrokerServerView brokerServerView
+      final SqlCreateTable source
   )
   {
     boolean temporary = source.isTemporary();
@@ -384,8 +365,7 @@ private PlannerResult handleCTAS(
   private PlannerResult handleInsertDirectory(
       final List<String> mappedColumns,
       final DruidRel<?> druidRel,
-      final SqlInsertDirectory source,
-      final BrokerServerView brokerServerView
+      final SqlInsertDirectory source
   )
   {
     QueryMaker queryMaker = druidRel.getQueryMaker();
@@ -457,25 +437,4 @@ private PlannerResult makeResult(final List<String> names, final List values)
     RelDataType dataType = typeFactory.createStructType(relTypes, names);
     return new PlannerResult(Suppliers.ofInstance(Sequences.<Object[]>of(values.toArray())), dataType);
   }
-
-  public static void main(String[] args) throws Exception
-  {
-    String url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/";
-
-    // Set any connection context parameters you need here (see "Connection context" below).
-    // Or leave empty for default behavior.
-    Properties connectionProperties = new Properties();
-
-    try (Connection connection = DriverManager.getConnection(url, connectionProperties)) {
-      try (
-          final Statement statement = connection.createStatement();
-          final ResultSet resultSet = statement.executeQuery("select * from lineitem")
-      ) {
-        int x = 0;
-        while (resultSet.next() && x++ < 10000) {
-          // Do something
-        }
-      }
-    }
-  }
 }
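
The deleted `main` method at the bottom of `DruidPlanner` was an ad-hoc Avatica JDBC smoke test (it connected to a local broker and scanned up to 10,000 rows of `lineitem`); it was unrelated to the planner's responsibilities and goes away along with the `java.sql.*` and `Properties` imports it pulled in.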
68 changes: 36 additions & 32 deletions sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java

@@ -21,19 +21,17 @@

 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import io.druid.client.TimelineServerView;
 import io.druid.common.utils.Sequences;
 import io.druid.data.ValueDesc;
 import io.druid.guice.ManageLifecycle;
-import io.druid.java.util.common.Pair;
 import io.druid.query.Query;
 import io.druid.query.QueryRunners;
 import io.druid.query.QuerySegmentWalker;
@@ -42,7 +40,7 @@
 import io.druid.query.metadata.metadata.SegmentAnalysis;
 import io.druid.query.metadata.metadata.SegmentMetadataQuery;
 import io.druid.query.metadata.metadata.SegmentMetadataQuery.AnalysisType;
-import io.druid.sql.calcite.table.DruidTable;
+import io.druid.sql.calcite.table.DruidTable.WithTimestamp;
 import io.druid.sql.calcite.table.RowSignature;
 import io.druid.sql.calcite.view.DruidViewMacro;
 import io.druid.sql.calcite.view.ViewManager;
@@ -54,15 +52,19 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.function.BiFunction;

 @ManageLifecycle
 public class DruidSchema extends AbstractSchema
+    implements BiFunction<String, WithTimestamp, WithTimestamp>
 {
   public static final String NAME = "druid";
+  public static final long CACHE_VALID_MSEC = 20_000; // 20 sec

   private final QuerySegmentWalker segmentWalker;
   private final TimelineServerView serverView;
   private final ViewManager viewManager;
+  private final Map<String, WithTimestamp> cached;

   @Inject
   public DruidSchema(
@@ -74,6 +76,7 @@ public DruidSchema(
     this.segmentWalker = Preconditions.checkNotNull(segmentWalker, "segmentWalker");
     this.serverView = Preconditions.checkNotNull(serverView, "serverView");
     this.viewManager = Preconditions.checkNotNull(viewManager, "viewManager");
+    this.cached = Maps.newConcurrentMap();
   }

   @Override
@@ -90,34 +93,7 @@ public Set<String> keySet()
   @Override
   public Table get(Object key)
   {
-    final TableDataSource dataSource = TableDataSource.of((String) key);
-    if (serverView.getTimeline(dataSource) == null) {
-      return null;
-    }
-    Supplier<Pair<RowSignature, Long>> supplier = Suppliers.memoize(new Supplier<Pair<RowSignature, Long>>()
-    {
-      @Override
-      public Pair<RowSignature, Long> get()
-      {
-        Query<SegmentAnalysis> metaQuery = SegmentMetadataQuery.of(dataSource.getName(), AnalysisType.INTERVAL)
-                                                               .withId(UUID.randomUUID().toString());
-        List<SegmentAnalysis> schemas = Sequences.toList(QueryRunners.run(metaQuery, segmentWalker));
-
-        long numRows = 0;
-        Set<String> columns = Sets.newHashSet();
-        RowSignature.Builder builder = RowSignature.builder();
-        for (SegmentAnalysis schema : Lists.reverse(schemas)) {
-          for (Map.Entry<String, ColumnAnalysis> entry : schema.getColumns().entrySet()) {
-            if (columns.add(entry.getKey())) {
-              builder.add(entry.getKey(), ValueDesc.of(entry.getValue().getType()));
-            }
-          }
-          numRows += schema.getNumRows();
-        }
-        return Pair.of(builder.sort().build(), numRows);
-      }
-    });
-    return new DruidTable(dataSource, supplier);
+    return cached.compute((String) key, DruidSchema.this);
   }

   @Override
@@ -137,4 +113,32 @@ protected Multimap<String, org.apache.calcite.schema.Function> getFunctionMultimap()
     }
     return builder.build();
   }
+
+  @Override
+  public WithTimestamp apply(String tableName, WithTimestamp prev)
+  {
+    if (prev != null && prev.getTimestamp() + CACHE_VALID_MSEC > System.currentTimeMillis()) {
+      return prev;
+    }
+    TableDataSource dataSource = TableDataSource.of(tableName);
+    if (serverView.getTimeline(dataSource) == null) {
+      return null;
+    }
+    Query<SegmentAnalysis> metaQuery = SegmentMetadataQuery.of(dataSource.getName(), AnalysisType.INTERVAL)
+                                                           .withId(UUID.randomUUID().toString());
+    List<SegmentAnalysis> schemas = Sequences.toList(QueryRunners.run(metaQuery, segmentWalker));
+
+    long numRows = 0;
+    Set<String> columns = Sets.newHashSet();
+    RowSignature.Builder builder = RowSignature.builder();
+    for (SegmentAnalysis schema : Lists.reverse(schemas)) {
+      for (Map.Entry<String, ColumnAnalysis> entry : schema.getColumns().entrySet()) {
+        if (columns.add(entry.getKey())) {
+          builder.add(entry.getKey(), ValueDesc.of(entry.getValue().getType()));
+        }
+      }
+      numRows += schema.getNumRows();
+    }
+    return new WithTimestamp(dataSource, builder.sort().build(), numRows);
+  }
 }
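
The interesting piece above is `cached.compute((String) key, DruidSchema.this)`: `DruidSchema` registers itself as the `BiFunction` that `Map.compute` invokes atomically per key, so a stale entry is refreshed by at most one caller at a time while fresh entries are returned untouched. A minimal standalone sketch of the same pattern follows; the names `SchemaCache`, `Entry`, `TTL_MSEC`, and `loadSchema` are illustrative stand-ins, not Druid APIs.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Sketch of a TTL cache built from ConcurrentMap.compute(key, remappingFunction).
public class SchemaCache implements BiFunction<String, SchemaCache.Entry, SchemaCache.Entry>
{
  static final long TTL_MSEC = 20_000; // mirrors CACHE_VALID_MSEC above

  static class Entry
  {
    final String schema;    // stand-in for the real RowSignature
    final long timestamp;   // creation time, like DruidTable.WithTimestamp

    Entry(String schema)
    {
      this.schema = schema;
      this.timestamp = System.currentTimeMillis();
    }
  }

  private final Map<String, Entry> cached = new ConcurrentHashMap<>();

  public Entry get(String tableName)
  {
    // compute() runs the remapping function atomically per key, so at most
    // one caller refreshes a given table's schema at a time.
    return cached.compute(tableName, this);
  }

  @Override
  public Entry apply(String tableName, Entry prev)
  {
    if (prev != null && prev.timestamp + TTL_MSEC > System.currentTimeMillis()) {
      return prev; // still fresh: keep the cached entry
    }
    return new Entry(loadSchema(tableName)); // stale or absent: rebuild
  }

  private String loadSchema(String tableName)
  {
    // In DruidSchema this is a SegmentMetadataQuery against the table.
    return "schema-of-" + tableName;
  }

  public static void main(String[] args)
  {
    SchemaCache cache = new SchemaCache();
    Entry first = cache.get("lineitem");
    Entry second = cache.get("lineitem"); // within the TTL: same entry
    System.out.println(first == second);  // prints true
  }
}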
40 changes: 25 additions & 15 deletions sql/src/main/java/io/druid/sql/calcite/table/DruidTable.java

@@ -20,10 +20,7 @@
 package io.druid.sql.calcite.table;

 import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
-import io.druid.java.util.common.Pair;
 import io.druid.query.DataSource;
 import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.plan.RelOptTable;
@@ -43,18 +40,14 @@
 public class DruidTable implements TranslatableTable
 {
   private final DataSource dataSource;
-  private final Supplier<Pair<RowSignature, Long>> supplier;
+  private final RowSignature signature;
+  private final long rowNum;

-  public DruidTable(DataSource dataSource, Supplier<Pair<RowSignature, Long>> supplier)
+  public DruidTable(DataSource dataSource, RowSignature signature, long rowNum)
   {
     this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
-    this.supplier = Preconditions.checkNotNull(supplier, "supplier");
-  }
-
-  public DruidTable(DataSource dataSource, RowSignature rowSignature, long numRows)
-  {
-    this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
-    this.supplier = Suppliers.ofInstance(Pair.of(Preconditions.checkNotNull(rowSignature, "rowSignature"), numRows));
+    this.signature = Preconditions.checkNotNull(signature, "signature");
+    this.rowNum = rowNum;
   }

   public DataSource getDataSource()
@@ -64,7 +57,7 @@ public DataSource getDataSource()

   public RowSignature getRowSignature()
   {
-    return supplier.get().lhs;
+    return signature;
   }

   @Override
@@ -76,7 +69,7 @@ public Schema.TableType getJdbcTableType()
   @Override
   public Statistic getStatistic()
   {
-    return Statistics.of(supplier.get().rhs, ImmutableList.of());
+    return Statistics.of(rowNum, ImmutableList.of());
   }

   @Override
@@ -132,7 +125,24 @@ public String toString()
   {
     return "DruidTable{" +
            "dataSource=" + dataSource +
-           ", rowSignature=" + getRowSignature() +
+           ", rowSignature=" + signature +
+           ", rowNum=" + rowNum +
            '}';
   }
+
+  public static class WithTimestamp extends DruidTable
+  {
+    private final long timestamp;
+
+    public WithTimestamp(DataSource dataSource, RowSignature signature, long rowNum)
+    {
+      super(dataSource, signature, rowNum);
+      this.timestamp = System.currentTimeMillis();
+    }
+
+    public long getTimestamp()
+    {
+      return timestamp;
+    }
+  }
 }
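
Design note: making `WithTimestamp` a `DruidTable` subclass that stamps `System.currentTimeMillis()` at construction lets `DruidSchema` store the table itself as the cache value, so no separate wrapper entry or eviction thread is needed; staleness is checked lazily on the next lookup in `apply`.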
@@ -51,7 +51,7 @@ public TranslatableTable apply(final List<Object> arguments)
     try (final DruidPlanner planner = plannerFactory.createPlanner(null)) {
       // Using an escalator here is a hack, but it's currently needed to get the row type. Ideally, some
       // later refactoring would make this unnecessary, since there is no actual query going out here.
-      rowType = planner.plan(viewSql, null, null).rowType();
+      rowType = planner.plan(viewSql, null).rowType();
     }
     catch (Exception e) {
       throw Throwables.propagate(e);
2 changes: 1 addition & 1 deletion sql/src/main/java/io/druid/sql/http/SqlResource.java

@@ -122,7 +122,7 @@ private Response execute(
     final String query = sqlQuery.getQuery();
     final long start = System.currentTimeMillis();
     try (final DruidPlanner planner = plannerFactory.createPlanner(context)) {
-      plannerResult = planner.plan(query, brokerServerView, req);
+      plannerResult = planner.plan(query, brokerServerView);
       timeZone = planner.getPlannerContext().getTimeZone();

       // Remember which columns are time-typed, so we can emit ISO8601 instead of millis values.
@@ -6926,7 +6926,7 @@ private List<Object[]> getResults(
     );

     try (DruidPlanner planner = plannerFactory.createPlanner(queryContext)) {
-      final PlannerResult plan = planner.plan(sql, null, null);
+      final PlannerResult plan = planner.plan(sql, null);
       List<Object[]> results = Sequences.toList(plan.run(), Lists.newArrayList());
       log.info("result schema " + plan.rowType());
       return results;
