diff --git a/contrib/storage-druid/.gitignore b/contrib/storage-druid/.gitignore new file mode 100644 index 00000000000..9341ff44dc5 --- /dev/null +++ b/contrib/storage-druid/.gitignore @@ -0,0 +1,2 @@ +# Ignore local logback test configuration used by the Druid storage plugin tests +/src/test/resources/logback-test.xml diff --git a/contrib/storage-druid/README.md b/contrib/storage-druid/README.md index 479024aa067..5b88b285a05 100644 --- a/contrib/storage-druid/README.md +++ b/contrib/storage-druid/README.md @@ -4,7 +4,7 @@ Drill druid storage plugin allows you to perform SQL queries against Druid datas This storage plugin is part of [Apache Drill](https://github.com/apache/drill) ### Tested with Druid version -[0.22.0](https://github.com/apache/druid/releases/tag/druid-0.22.0) +[30.0.0](https://github.com/apache/druid/releases/tag/druid-30.0.0) ### Druid API @@ -33,27 +33,27 @@ Following is the default registration configuration. ### Druid storage plugin developer notes. -* Building the plugin +* Building the plugin `mvn install -pl contrib/storage-druid` * Building DRILL `mvn clean install -DskipTests` - + * Start Drill In Embedded Mode (mac) ```shell script distribution/target/apache-drill-1.20.0-SNAPSHOT/apache-drill-1.20.0-SNAPSHOT/bin/drill-embedded ``` - + * Starting Druid (Docker and Docker Compose required) ``` cd contrib/storage-druid/src/test/resources/druid docker-compose up -d ``` - + * There is an `Indexing Task Json` in the same folder as the docker compose file. It can be used to ingest the wikipedia datasource. - + * Make sure the druid storage plugin is enabled in Drill. diff --git a/contrib/storage-druid/pom.xml b/contrib/storage-druid/pom.xml index 9a9d8175615..54cf88e887c 100755 --- a/contrib/storage-druid/pom.xml +++ b/contrib/storage-druid/pom.xml @@ -29,7 +29,7 @@ drill-druid-storage Drill : Contrib : Storage : Druid - **/DruidTestSuit.class + **/DruidTestSuite.class @@ -53,13 +53,6 @@ ${project.version} test - - org.assertj - assertj-core - - 3.11.1 - test - diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java new file mode 100644 index 00000000000..d9068954f62 --- /dev/null +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.drill.exec.store.druid; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.drill.common.exceptions.CustomErrorContext; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; +import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator; +import org.apache.drill.exec.physical.resultSet.ResultSetLoader; +import org.apache.drill.exec.store.druid.DruidSubScan.DruidSubScanSpec; +import org.apache.drill.exec.store.druid.common.DruidFilter; +import org.apache.drill.exec.store.druid.druid.DruidScanResponse; +import org.apache.drill.exec.store.druid.druid.ScanQuery; +import org.apache.drill.exec.store.druid.druid.ScanQueryBuilder; +import org.apache.drill.exec.store.druid.rest.DruidQueryClient; +import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl; +import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; + +public class DruidBatchRecordReader implements ManagedReader { + private static final Logger logger = LoggerFactory.getLogger(DruidBatchRecordReader.class); + private static final int BATCH_SIZE = 4096; + private static final ObjectMapper objectMapper = new ObjectMapper(); + private final DruidStoragePlugin plugin; + private final DruidSubScan.DruidSubScanSpec scanSpec; + private final List columns; + private final DruidFilter filter; + private final DruidQueryClient druidQueryClient; + private final DruidOffsetTracker offsetTracker; + private int maxRecordsToRead = -1; + private JsonLoaderBuilder jsonBuilder; + private JsonLoaderImpl jsonLoader; + private SchemaNegotiator negotiator; + private ResultSetLoader resultSetLoader; + private CustomErrorContext errorContext; + + + public DruidBatchRecordReader(DruidSubScan subScan, + DruidSubScanSpec subScanSpec, + List projectedColumns, + int maxRecordsToRead, + DruidStoragePlugin plugin, DruidOffsetTracker offsetTracker) { + this.columns = new ArrayList<>(); + this.maxRecordsToRead = maxRecordsToRead; + this.plugin = plugin; + this.scanSpec = subScanSpec; + this.filter = subScanSpec.getFilter(); + this.druidQueryClient = plugin.getDruidQueryClient(); + this.offsetTracker = offsetTracker; + } + + @Override + public boolean open(SchemaNegotiator negotiator) { + this.negotiator = negotiator; + this.errorContext = this.negotiator.parentErrorContext(); + this.negotiator.batchSize(BATCH_SIZE); + this.negotiator.setErrorContext(errorContext); + + resultSetLoader = this.negotiator.build(); + + + return true; + } + + @Override + public boolean next() { + jsonBuilder = new JsonLoaderBuilder() + .resultSetLoader(resultSetLoader) + .standardOptions(negotiator.queryOptions()) + .errorContext(errorContext); + int eventCounter = 0; + boolean result = false; + try { + String query = getQuery(); + logger.debug("Executing query: {}", query); + DruidScanResponse druidScanResponse = druidQueryClient.executeQuery(query); + setNextOffset(druidScanResponse); + + StringBuilder events = new StringBuilder(); + for (ObjectNode eventNode : druidScanResponse.getEvents()) { + events.append(eventNode); + events.append("\n"); + eventCounter++; + } + + + jsonLoader = (JsonLoaderImpl) 
jsonBuilder + .fromString(events.toString()) + .build(); + + result = jsonLoader.readBatch(); + + if (eventCounter < BATCH_SIZE) { + return false; + } else { + return result; + } + } catch (Exception e) { + throw UserException + .dataReadError(e) + .message("Failure while executing druid query: " + e.getMessage()) + .addContext(errorContext) + .build(logger); + } + } + + @Override + public void close() { + if (jsonLoader != null) { + jsonLoader.close(); + jsonLoader = null; + } + } + + private String getQuery() throws JsonProcessingException { + int queryThreshold = + maxRecordsToRead >= 0 + ? Math.min(BATCH_SIZE, maxRecordsToRead) + : BATCH_SIZE; + ScanQueryBuilder scanQueryBuilder = plugin.getScanQueryBuilder(); + ScanQuery scanQuery = + scanQueryBuilder.build( + scanSpec.dataSourceName, + columns, + filter, + offsetTracker.getOffset(), + queryThreshold, + scanSpec.getMinTime(), + scanSpec.getMaxTime() + ); + return objectMapper.writeValueAsString(scanQuery); + } + + private void setNextOffset(DruidScanResponse druidScanResponse) { + offsetTracker.setNextOffset(BigInteger.valueOf(druidScanResponse.getEvents().size())); + } +} diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java index 24dce99da6b..55b7bd9871e 100755 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java @@ -25,12 +25,14 @@ import org.apache.drill.common.PlanStringBuilder; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.metastore.MetadataProviderManager; import org.apache.drill.exec.physical.EndpointAffinity; import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.ScanStats; import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.schedule.AffinityCreator; @@ -44,6 +46,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -54,7 +57,7 @@ public class DruidGroupScan extends AbstractGroupScan { private static final long DEFAULT_TABLET_SIZE = 1000; private final DruidScanSpec scanSpec; private final DruidStoragePlugin storagePlugin; - + private final MetadataProviderManager metadataProviderManager; private List columns; private boolean filterPushedDown = false; private int maxRecordsToRead; @@ -73,19 +76,20 @@ public DruidGroupScan(@JsonProperty("userName") String userName, pluginRegistry.resolve(storagePluginConfig, DruidStoragePlugin.class), scanSpec, columns, - maxRecordsToRead); + maxRecordsToRead, null); } public DruidGroupScan(String userName, DruidStoragePlugin storagePlugin, DruidScanSpec scanSpec, List columns, - int maxRecordsToRead) { + int maxRecordsToRead, MetadataProviderManager metadataProviderManager) { super(userName); this.storagePlugin = storagePlugin; this.scanSpec = scanSpec; this.columns = columns == null || columns.size() == 0? 
ALL_COLUMNS : columns; this.maxRecordsToRead = maxRecordsToRead; + this.metadataProviderManager = metadataProviderManager; init(); } @@ -102,6 +106,7 @@ private DruidGroupScan(DruidGroupScan that) { this.filterPushedDown = that.filterPushedDown; this.druidWorkList = that.druidWorkList; this.assignments = that.assignments; + this.metadataProviderManager = that.metadataProviderManager; } @Override @@ -163,7 +168,8 @@ private void init() { getScanSpec().getFilter(), getDatasourceSize(), getDataSourceMinTime(), - getDataSourceMaxTime() + getDataSourceMaxTime(), + getSchema() ) ); druidWorkList.add(druidWork); @@ -225,12 +231,13 @@ public DruidSubScan getSpecificScan(int minorFragmentId) { druidWork.getDruidSubScanSpec().getFilter(), druidWork.getDruidSubScanSpec().getDataSourceSize(), druidWork.getDruidSubScanSpec().getMinTime(), - druidWork.getDruidSubScanSpec().getMaxTime() + druidWork.getDruidSubScanSpec().getMaxTime(), + druidWork.getDruidSubScanSpec().getSchema() ) ); } - return new DruidSubScan(getUserName(), storagePlugin, scanSpecList, this.columns, this.maxRecordsToRead); + return new DruidSubScan(getUserName(), storagePlugin, scanSpecList, this.columns, this.maxRecordsToRead, getSchema()); } @JsonIgnore @@ -283,13 +290,30 @@ public int getMaxRecordsToRead() { return maxRecordsToRead; } + @JsonIgnore + public MetadataProviderManager getMetadataProviderManager() { + return metadataProviderManager; + } + + public TupleMetadata getSchema() { + if (metadataProviderManager == null) { + return null; + } + try { + return metadataProviderManager.getSchemaProvider().read().getSchema(); + } catch (IOException | NullPointerException e) { + return null; + } + } + @Override public String toString() { return new PlanStringBuilder(this) - .field("druidScanSpec", scanSpec) - .field("columns", columns) - .field("druidStoragePlugin", storagePlugin) - .toString(); + .field("druidScanSpec", scanSpec) + .field("columns", columns) + .field("druidStoragePlugin", storagePlugin) + .field("schema", getSchema()) + .toString(); } @Override diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java new file mode 100644 index 00000000000..16604f0b494 --- /dev/null +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.druid; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; + +public class DruidOffsetTracker { + private static final Logger logger = LoggerFactory.getLogger(DruidOffsetTracker.class); + private BigInteger nextOffset; + + public DruidOffsetTracker() { + this.nextOffset = BigInteger.ZERO; + } + + public BigInteger getOffset() { + return nextOffset; + } + + public void setNextOffset(BigInteger offset) { + nextOffset = nextOffset.add(offset); + logger.debug("Incrementing offset by {}", offset); + } +} diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java index ba5ab9dc3b8..3b31f5bc307 100644 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java @@ -70,7 +70,8 @@ public void onMatch(RelOptRuleCall relOptRuleCall) { groupScan.getStoragePlugin(), newScanSpec, groupScan.getColumns(), - groupScan.getMaxRecordsToRead()); + groupScan.getMaxRecordsToRead(), + groupScan.getMetadataProviderManager()); newGroupsScan.setFilterPushedDown(true); ScanPrel newScanPrel = scan.copy(filter.getTraitSet(), newGroupsScan, filter.getRowType()); diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java deleted file mode 100755 index 5c437c3e681..00000000000 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.store.druid; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.util.JacksonUtils; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.ops.OperatorContext; -import org.apache.drill.exec.physical.impl.OutputMutator; -import org.apache.drill.exec.store.AbstractRecordReader; -import org.apache.drill.exec.store.druid.common.DruidFilter; -import org.apache.drill.exec.store.druid.druid.DruidScanResponse; -import org.apache.drill.exec.store.druid.druid.ScanQuery; -import org.apache.drill.exec.store.druid.druid.ScanQueryBuilder; -import org.apache.drill.exec.store.druid.rest.DruidQueryClient; -import org.apache.drill.exec.vector.BaseValueVector; -import org.apache.drill.exec.vector.complex.fn.JsonReader; -import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; -import com.google.common.base.Stopwatch; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Sets; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -public class DruidRecordReader extends AbstractRecordReader { - - private static final Logger logger = LoggerFactory.getLogger(DruidRecordReader.class); - private static final ObjectMapper objectMapper = JacksonUtils.createObjectMapper(); - private final DruidStoragePlugin plugin; - private final DruidSubScan.DruidSubScanSpec scanSpec; - private final List columns; - private final DruidFilter filter; - private BigInteger nextOffset = BigInteger.ZERO; - private int maxRecordsToRead = -1; - - private JsonReader jsonReader; - private VectorContainerWriter writer; - - private final FragmentContext fragmentContext; - private final DruidQueryClient druidQueryClient; - - public DruidRecordReader(DruidSubScan.DruidSubScanSpec subScanSpec, - List projectedColumns, - int maxRecordsToRead, - FragmentContext context, - DruidStoragePlugin plugin) { - columns = new ArrayList<>(); - setColumns(projectedColumns); - this.maxRecordsToRead = maxRecordsToRead; - this.plugin = plugin; - scanSpec = subScanSpec; - fragmentContext = context; - this.filter = subScanSpec.getFilter(); - this.druidQueryClient = plugin.getDruidQueryClient(); - } - - @Override - protected Collection transformColumns(Collection projectedColumns) { - Set transformed = Sets.newLinkedHashSet(); - if (isStarQuery()) { - transformed.add(SchemaPath.STAR_COLUMN); - } else { - for (SchemaPath column : projectedColumns) { - String fieldName = column.getRootSegment().getPath(); - transformed.add(column); - this.columns.add(fieldName); - } - } - return transformed; - } - - @Override - public void setup(OperatorContext context, OutputMutator output) { - this.writer = new VectorContainerWriter(output); - - this.jsonReader = - new JsonReader.Builder(fragmentContext.getManagedBuffer()) - .schemaPathColumns(ImmutableList.copyOf(getColumns())) - .skipOuterList(true) - .build(); - } - - @Override - public int next() { - writer.allocate(); - writer.reset(); - Stopwatch watch = Stopwatch.createStarted(); - try { - String query = getQuery(); - DruidScanResponse druidScanResponse 
= druidQueryClient.executeQuery(query); - setNextOffset(druidScanResponse); - - int docCount = 0; - for (ObjectNode eventNode : druidScanResponse.getEvents()) { - writer.setPosition(docCount); - jsonReader.setSource(eventNode); - try { - jsonReader.write(writer); - } catch (IOException e) { - throw UserException - .dataReadError(e) - .message("Failure while reading document") - .addContext("Failed Query", query) - .addContext("Parser was at record", eventNode.toString()) - .addContext(e.getMessage()) - .build(logger); - } - docCount++; - } - - writer.setValueCount(docCount); - logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), docCount); - return docCount; - } catch (Exception e) { - throw UserException - .dataReadError(e) - .message("Failure while executing druid query") - .addContext(e.getMessage()) - .build(logger); - } - } - - private String getQuery() throws JsonProcessingException { - int queryThreshold = - this.maxRecordsToRead >= 0 - ? Math.min(BaseValueVector.INITIAL_VALUE_ALLOCATION, this.maxRecordsToRead) - : BaseValueVector.INITIAL_VALUE_ALLOCATION; - ScanQueryBuilder scanQueryBuilder = plugin.getScanQueryBuilder(); - ScanQuery scanQuery = - scanQueryBuilder.build( - scanSpec.dataSourceName, - this.columns, - this.filter, - this.nextOffset, - queryThreshold, - scanSpec.getMinTime(), - scanSpec.getMaxTime() - ); - return objectMapper.writeValueAsString(scanQuery); - } - - private void setNextOffset(DruidScanResponse druidScanResponse) { - this.nextOffset = this.nextOffset.add(BigInteger.valueOf(druidScanResponse.getEvents().size())); - } - - @Override - public void close() throws Exception { - if (writer != null) { - writer.close(); - } - if (!this.nextOffset.equals(BigInteger.ZERO)) { - this.nextOffset = BigInteger.ZERO; - } - jsonReader = null; - } -} diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java old mode 100755 new mode 100644 index 45bac99adf5..de59cd813e2 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java @@ -15,42 +15,82 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.drill.exec.store.druid; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.impl.BatchCreator; -import org.apache.drill.exec.physical.impl.ScanBatch; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ReaderFactory; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder; +import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator; import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.store.RecordReader; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.store.druid.DruidSubScan.DruidSubScanSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Iterator; import java.util.List; public class DruidScanBatchCreator implements BatchCreator { private static final Logger logger = LoggerFactory.getLogger(DruidScanBatchCreator.class); + @Override - public CloseableRecordBatch getBatch(ExecutorFragmentContext context, DruidSubScan subScan, List children) throws ExecutionSetupException { - Preconditions.checkArgument(children.isEmpty()); - List readers = Lists.newArrayList(); - List columns; - - for (DruidSubScan.DruidSubScanSpec scanSpec : subScan.getScanSpec()) { - try { - columns = subScan.getColumns(); - readers.add(new DruidRecordReader(scanSpec, columns, subScan.getMaxRecordsToRead(), context, subScan.getStorageEngine())); - } catch (Exception ex) { - throw new ExecutionSetupException(ex); + public CloseableRecordBatch getBatch(ExecutorFragmentContext context, + DruidSubScan subScan, + List children) throws ExecutionSetupException { + try { + ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan); + return builder.buildScanOperator(context, subScan); + } catch (UserException e) { + throw e; + } + } + + private ScanFrameworkBuilder createBuilder(OptionManager options, DruidSubScan subScan) { + ScanFrameworkBuilder builder = new ScanFrameworkBuilder(); + builder.projection(subScan.getColumns()); + builder.providedSchema(subScan.getSchema()); + builder.setUserName(subScan.getUserName()); + + ReaderFactory readerFactory = new DruidReaderFactory(subScan); + builder.setReaderFactory(readerFactory); + builder.nullType(Types.optional(MinorType.VARCHAR)); + return builder; + } + + private static class DruidReaderFactory implements ReaderFactory { + private final DruidSubScan subScan; + private final DruidOffsetTracker offsetTracker; + private final Iterator scanSpecIterator; + + public DruidReaderFactory(DruidSubScan subScan) { + this.subScan = subScan; + this.scanSpecIterator = subScan.getScanSpec().listIterator(); + this.offsetTracker = new DruidOffsetTracker(); + } + + @Override + public void bind(ManagedScanFramework framework) { + + } + + @Override + public ManagedReader next() { + if (scanSpecIterator.hasNext()) { + DruidSubScanSpec scanSpec = 
scanSpecIterator.next(); + return new DruidBatchRecordReader(subScan, scanSpec, subScan.getColumns(), subScan.getMaxRecordsToRead(), subScan.getStorageEngine(), offsetTracker); } + return null; } - logger.debug("Number of record readers initialized - {}", readers.size()); - return new ScanBatch(subScan, context, readers); } } diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java index f702f1a2904..f066cde6a35 100755 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java @@ -18,11 +18,17 @@ package org.apache.drill.exec.store.druid; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableSet; import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.common.JSONOptions; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.metastore.MetadataProviderManager; import org.apache.drill.exec.ops.OptimizerRulesContext; +import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.planner.PlannerPhase; import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.SessionOptionManager; import org.apache.drill.exec.store.AbstractStoragePlugin; import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.store.StoragePluginOptimizerRule; @@ -32,9 +38,9 @@ import org.apache.drill.exec.store.druid.rest.RestClient; import org.apache.drill.exec.store.druid.rest.RestClientWrapper; import org.apache.drill.exec.store.druid.schema.DruidSchemaFactory; -import com.google.common.collect.ImmutableSet; import java.io.IOException; +import java.util.List; import java.util.Set; public class DruidStoragePlugin extends AbstractStoragePlugin { @@ -57,9 +63,36 @@ public DruidStoragePlugin(DruidStoragePluginConfig pluginConfig, DrillbitContext } @Override - public DruidGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException { - DruidScanSpec scanSpec = selection.getListWith(new TypeReference() {}); - return new DruidGroupScan(userName, this, scanSpec, null, -1); + public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, + SessionOptionManager options) throws IOException { + return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, + options, null); + } + + + @Override + public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, + SessionOptionManager options, MetadataProviderManager metadataProviderManager) throws IOException { + return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, + options, metadataProviderManager); + } + + @Override + public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, + List columns) throws IOException { + return getPhysicalScan(userName, selection, columns, null, null); + } + + @Override + public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException { + return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, null); + } + + @Override + public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List columns, SessionOptionManager options, + MetadataProviderManager 
metadataProviderManager) throws IOException { + DruidScanSpec scanSpec = selection.getListWith(new ObjectMapper(), new TypeReference() {}); + return new DruidGroupScan(userName, this, scanSpec, null, -1, metadataProviderManager); } @Override diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java index 05405126141..bdc30ca1bb2 100644 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java @@ -29,6 +29,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; import org.apache.drill.exec.physical.base.SubScan; +import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.druid.common.DruidFilter; import com.google.common.base.Preconditions; @@ -53,30 +54,36 @@ public class DruidSubScan extends AbstractBase implements SubScan { private final List columns; private final int maxRecordsToRead; + private final TupleMetadata schema; + @JsonCreator public DruidSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("userName") String userName, @JsonProperty("config") StoragePluginConfig config, @JsonProperty("scanSpec") LinkedList datasourceScanSpecList, @JsonProperty("columns") List columns, - @JsonProperty("maxRecordsToRead") int maxRecordsToRead) { + @JsonProperty("maxRecordsToRead") int maxRecordsToRead, + @JsonProperty("schema") TupleMetadata schema) { super(userName); druidStoragePlugin = registry.resolve(config, DruidStoragePlugin.class); this.scanSpec = datasourceScanSpecList; this.columns = columns; this.maxRecordsToRead = maxRecordsToRead; + this.schema = schema; } public DruidSubScan(String userName, DruidStoragePlugin plugin, List dataSourceInfoList, List columns, - int maxRecordsToRead) { + int maxRecordsToRead, + TupleMetadata schema) { super(userName); this.druidStoragePlugin = plugin; this.scanSpec = dataSourceInfoList; this.columns = columns; this.maxRecordsToRead = maxRecordsToRead; + this.schema = schema; } @Override @@ -93,6 +100,10 @@ public List getColumns() { return columns; } + public TupleMetadata getSchema() { + return schema; + } + public int getMaxRecordsToRead() { return maxRecordsToRead; } @JsonIgnore @@ -109,7 +120,7 @@ public DruidStoragePlugin getStorageEngine(){ @Override public PhysicalOperator getNewWithChildren(List children) { Preconditions.checkArgument(children.isEmpty()); - return new DruidSubScan(getUserName(), druidStoragePlugin, scanSpec, columns, maxRecordsToRead); + return new DruidSubScan(getUserName(), druidStoragePlugin, scanSpec, columns, maxRecordsToRead, schema); } @JsonIgnore @@ -131,17 +142,21 @@ public static class DruidSubScanSpec { protected final String maxTime; protected final String minTime; + protected final TupleMetadata schema; + @JsonCreator public DruidSubScanSpec(@JsonProperty("dataSourceName") String dataSourceName, @JsonProperty("filter") DruidFilter filter, @JsonProperty("dataSourceSize") long dataSourceSize, @JsonProperty("minTime") String minTime, - @JsonProperty("maxTime") String maxTime) { + @JsonProperty("maxTime") String maxTime, + @JsonProperty("schema") TupleMetadata schema) { this.dataSourceName = dataSourceName; this.filter = filter; this.dataSourceSize = dataSourceSize; this.minTime = 
minTime; this.maxTime = maxTime; + this.schema = schema; } public String getDataSourceName() { @@ -158,6 +173,10 @@ public String getMinTime() { return minTime; } + public TupleMetadata getSchema() { + return schema; + } + public String getMaxTime() { return maxTime; } @@ -170,6 +189,7 @@ public String toString() { .field("dataSourceSize", dataSourceSize) .field("minTime", minTime) .field("maxTime", maxTime) + .field("schema", schema) .toString(); } } diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidFilterBuilderTest.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidFilterBuilderTest.java index 9ca7c0ec449..5dcb42bfa8b 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidFilterBuilderTest.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidFilterBuilderTest.java @@ -28,7 +28,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -71,7 +71,7 @@ public void setup() { when(logicalExpression.accept(any(), any())).thenReturn(druidScanSpecRight); } catch (Exception ignored) { } - DruidGroupScan druidGroupScan = new DruidGroupScan("some username", null, druidScanSpecLeft, null, 5); + DruidGroupScan druidGroupScan = new DruidGroupScan("some username", null, druidScanSpecLeft, null, 5, null); druidFilterBuilder = new DruidFilterBuilder(druidGroupScan, logicalExpression); } @@ -80,7 +80,7 @@ public void parseTreeWithAndOfTwoSelectorFilters() { DruidScanSpec parsedSpec = druidFilterBuilder.parseTree(); String expectedFilterJson = "{\"type\":\"and\",\"fields\":[{\"type\":\"selector\",\"dimension\":\"some dimension\",\"value\":\"some value\"},{\"type\":\"selector\",\"dimension\":\"some other dimension\",\"value\":\"some other value\"}]}"; String actual = parsedSpec.getFilter().toJson(); - assertThat(actual).isEqualTo(expectedFilterJson); + assertEquals(expectedFilterJson, actual); } @Test @@ -99,7 +99,7 @@ public void visitBooleanOperatorWithAndOperator() { druidFilterBuilder.visitBooleanOperator(booleanOperator, null); String expectedFilterJson = "{\"type\":\"and\",\"fields\":[{\"type\":\"selector\",\"dimension\":\"some dimension\",\"value\":\"some value\"},{\"type\":\"selector\",\"dimension\":\"some other dimension\",\"value\":\"some other value\"}]}"; String actual = druidScanSpec.getFilter().toJson(); - assertThat(actual).isEqualTo(expectedFilterJson); + assertEquals(expectedFilterJson, actual); } @Test @@ -118,6 +118,6 @@ public void visitBooleanOperatorWithOrOperator() { druidFilterBuilder.visitBooleanOperator(booleanOperator, null); String expectedFilterJson = "{\"type\":\"or\",\"fields\":[{\"type\":\"selector\",\"dimension\":\"some dimension\",\"value\":\"some value\"},{\"type\":\"selector\",\"dimension\":\"some other dimension\",\"value\":\"some other value\"}]}"; String actual = druidScanSpec.getFilter().toJson(); - assertThat(actual).isEqualTo(expectedFilterJson); + assertEquals(actual, expectedFilterJson); } } diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilderTest.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilderTest.java index 6c8a4f06ce4..c2adbe80b7e 100644 --- 
a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilderTest.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilderTest.java @@ -22,8 +22,9 @@ import org.apache.drill.exec.store.druid.common.DruidConstants; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; -import static org.assertj.core.api.Assertions.assertThat; public class DruidScanSpecBuilderTest { @@ -54,8 +55,8 @@ public void buildCalledWithEqualFxShouldBuildSelectorFilter() { FunctionNames.EQ, schemaPath, SOME_VALUE); - - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":\"some value\"}"); + String actual = "{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":\"some value\"}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -70,8 +71,8 @@ public void buildCalledWithEqualFxIntervalFieldShouldBuildIntervalFilter() { FunctionNames.EQ, schemaPath, SOME_VALUE); - - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"eventInterval\":\"some value\"}"); + String actual = "{\"eventInterval\":\"some value\"}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -86,8 +87,8 @@ public void buildCalledWithNotEqualFxShouldBuildSelectorFilter() { FunctionNames.NE, schemaPath, SOME_VALUE ); - - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":\"some value\"}}"); + String actual = "{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":\"some value\"}}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -103,8 +104,8 @@ public void buildCalledWithGreaterThanOrEqualToFxShouldBuildBoundFilter() { schemaPath, SOME_VALUE ); - - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some field\",\"lower\":\"some value\",\"ordering\":\"lexicographic\"}"); + String actual = "{\"type\":\"bound\",\"dimension\":\"some field\",\"lower\":\"some value\",\"ordering\":\"lexicographic\"}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -120,8 +121,8 @@ public void buildCalledWithGreaterThanFxShouldBuildBoundFilter() { schemaPath, SOME_VALUE ); - - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some field\",\"lower\":\"some value\",\"lowerStrict\":true,\"ordering\":\"lexicographic\"}"); + String actual = "{\"type\":\"bound\",\"dimension\":\"some field\",\"lower\":\"some value\",\"lowerStrict\":true,\"ordering\":\"lexicographic\"}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -138,7 +139,8 @@ public void buildCalledWithGreaterThanFxAndNumericValueShouldBuildBoundFilter() "1" ); - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some field\",\"lower\":\"1\",\"lowerStrict\":true,\"ordering\":\"numeric\"}"); + String actual = "{\"type\":\"bound\",\"dimension\":\"some field\",\"lower\":\"1\",\"lowerStrict\":true,\"ordering\":\"numeric\"}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -154,7 +156,8 @@ public void buildCalledWithLessThanOrEqualToFxShouldBuildBoundFilter() { schemaPath, SOME_VALUE); - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some 
field\",\"upper\":\"some value\",\"ordering\":\"lexicographic\"}"); + String actual = "{\"type\":\"bound\",\"dimension\":\"some field\",\"upper\":\"some value\",\"ordering\":\"lexicographic\"}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -169,7 +172,8 @@ public void buildCalledWithLessThanFxShouldBuildBoundFilter() { schemaPath, SOME_VALUE); - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some field\",\"upper\":\"some value\",\"upperStrict\":true,\"ordering\":\"lexicographic\"}"); + String actual = "{\"type\":\"bound\",\"dimension\":\"some field\",\"upper\":\"some value\",\"upperStrict\":true,\"ordering\":\"lexicographic\"}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -184,7 +188,8 @@ public void buildCalledWithLessThanFxAndNumericValueShouldBuildBoundFilter() { schemaPath, "1"); - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some field\",\"upper\":\"1\",\"upperStrict\":true,\"ordering\":\"numeric\"}"); + String actual = "{\"type\":\"bound\",\"dimension\":\"some field\",\"upper\":\"1\",\"upperStrict\":true,\"ordering\":\"numeric\"}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -199,8 +204,9 @@ public void buildCalledWithIsNullFxShouldBuildSelectorFilter() { FunctionNames.IS_NULL, schemaPath, null); - assertThat(druidScanSpec).isNotNull(); - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":null}"); + assertNotNull(druidScanSpec); + String actual = "{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":null}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -215,8 +221,9 @@ public void buildCalledWithIsNotNullFxShouldBuildSelectorFilter() { FunctionNames.IS_NOT_NULL, schemaPath, null); - assertThat(druidScanSpec).isNotNull(); - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":null}}"); + assertNotNull(druidScanSpec); + String actual = "{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":null}}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -232,7 +239,8 @@ public void buildCalledWithLikeFxButIfValueIsPrefixedWithRegexKeywordHintShouldB schemaPath, "$regex$_some_regular_expression"); - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"regex\",\"dimension\":\"some field\",\"pattern\":\"some_regular_expression\"}"); + String actual = "{\"type\":\"regex\",\"dimension\":\"some field\",\"pattern\":\"some_regular_expression\"}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } @Test @@ -247,7 +255,7 @@ public void buildCalledWithLikeFxShouldBuildSearchFilter() { FunctionNames.LIKE, schemaPath, "some search string"); - - assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"search\",\"dimension\":\"some field\",\"query\":{\"type\":\"contains\",\"value\":\"some search string\",\"caseSensitive\":false}}"); + String actual = "{\"type\":\"search\",\"dimension\":\"some field\",\"query\":{\"type\":\"contains\",\"value\":\"some search string\",\"caseSensitive\":false}}"; + assertEquals(druidScanSpec.getFilter().toJson(), actual); } } diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java 
b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java index dd76a64ed37..027f80c8ee6 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java @@ -28,7 +28,9 @@ import java.io.IOException; import java.net.URISyntaxException; -import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; public class DruidStoragePluginConfigTest { @@ -40,11 +42,11 @@ public void testDruidStoragePluginConfigSuccessfullyParsed() Resources.getResource("bootstrap-storage-plugins.json").toURI())); DruidStoragePluginConfig druidStoragePluginConfig = mapper.treeToValue(storagePluginJson.get("storage").get("druid"), DruidStoragePluginConfig.class); - assertThat(druidStoragePluginConfig).isNotNull(); - assertThat(druidStoragePluginConfig.getBrokerAddress()).isEqualTo("http://localhost:8082"); - assertThat(druidStoragePluginConfig.getCoordinatorAddress()).isEqualTo("http://localhost:8081"); - assertThat(druidStoragePluginConfig.getAverageRowSizeBytes()).isEqualTo(200); - assertThat(druidStoragePluginConfig.isEnabled()).isFalse(); + assertNotNull(druidStoragePluginConfig); + assertEquals("http://localhost:8082", druidStoragePluginConfig.getBrokerAddress()); + assertEquals("http://localhost:8081", druidStoragePluginConfig.getCoordinatorAddress()); + assertEquals(200, druidStoragePluginConfig.getAverageRowSizeBytes()); + assertFalse(druidStoragePluginConfig.isEnabled()); } @Test @@ -59,6 +61,6 @@ public void testDefaultRowSizeUsedWhenNotProvidedInConfig() JsonNode storagePluginJson = mapper.readTree(druidConfigStr); DruidStoragePluginConfig druidStoragePluginConfig = mapper.treeToValue(storagePluginJson.get("storage").get("druid"), DruidStoragePluginConfig.class); - assertThat(druidStoragePluginConfig.getAverageRowSizeBytes()).isEqualTo(100); + assertEquals(100, druidStoragePluginConfig.getAverageRowSizeBytes()); } } diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestConstants.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestConstants.java index 78c8de00e67..24ac6dfd6ef 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestConstants.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestConstants.java @@ -25,4 +25,5 @@ public interface DruidTestConstants { String TEST_STRING_TWO_OR_EQUALS_FILTER_QUERY_TEMPLATE1 = "select * from druid.`%s` as ds where ds.user = 'Dansker' OR ds.page = '1904'"; String TEST_QUERY_PROJECT_PUSH_DOWN_TEMPLATE_1 = "SELECT ds.`comment` FROM druid.`%s` as ds"; String TEST_QUERY_COUNT_QUERY_TEMPLATE = "SELECT count(*) as mycount FROM druid.`%s` as ds"; + String TEST_STAR_QUERY = "SELECT * FROM druid.`%s`"; } diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java index deea3f1edaf..45f2267dd1e 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java @@ -20,14 +20,32 @@ 
import org.apache.drill.categories.DruidStorageTest; import org.apache.drill.categories.SlowTest; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.rowSet.RowSet; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.test.rowSet.RowSetComparison; import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; +import static org.junit.Assert.assertEquals; + + @Ignore("These tests require a running druid instance. You may start druid by using the docker-compose provide in resources/druid and enable these tests") @Category({SlowTest.class, DruidStorageTest.class}) public class TestDruidQueries extends DruidTestBase { + @Test + public void testStarQuery() throws Exception { + testBuilder() + .sqlQuery(String.format(TEST_STAR_QUERY, TEST_DATASOURCE_WIKIPEDIA)) + .unOrdered() + .expectsNumRecords(876) + .go(); + } + @Test public void testEqualsFilter() throws Exception { testBuilder() @@ -51,7 +69,7 @@ public void testTwoOrdEqualsFilter() throws Exception { testBuilder() .sqlQuery(String.format(TEST_STRING_TWO_OR_EQUALS_FILTER_QUERY_TEMPLATE1, TEST_DATASOURCE_WIKIPEDIA)) .unOrdered() - .expectsNumRecords(3) + .expectsNumRecords(1) .go(); } @@ -63,7 +81,7 @@ public void testSingleColumnProject() throws Exception { .sqlQuery(query) .unOrdered() .baselineColumns("comment") - .expectsNumRecords(24433) + .expectsNumRecords(876) .go(); } @@ -75,7 +93,38 @@ public void testCountAllRowsQuery() throws Exception { .sqlQuery(query) .unOrdered() .baselineColumns("mycount") - .baselineValues(24433L) + .baselineValues(876L) .go(); } + + @Test + public void testGroupByQuery() throws Exception { + String sql = String.format("SELECT `namespace`, COUNT(*) AS user_count FROM druid.`%s` GROUP BY `namespace` ORDER BY user_count DESC LIMIT 5",TEST_DATASOURCE_WIKIPEDIA); + RowSet results = client.queryBuilder().sql(sql).rowSet(); + + TupleMetadata expectedSchema = new SchemaBuilder() + .add("namespace", MinorType.VARCHAR, DataMode.OPTIONAL) + .add("user_count", MinorType.BIGINT) + .buildSchema(); + + RowSet expected = client.rowSetBuilder(expectedSchema) + .addRow("Main", 702) + .addRow("User talk", 29) + .addRow("Wikipedia", 26) + .addRow("Talk", 17) + .addRow("User", 12) + .build(); + + new RowSetComparison(expected).verifyAndClearAll(results); + } + + @Test + public void testSerDe() throws Exception { + // TODO Start here... 
filters are not deserializing properly + + String sql = String.format("SELECT COUNT(*) FROM druid.`%s`", TEST_DATASOURCE_WIKIPEDIA); + String plan = queryBuilder().sql(sql).explainJson(); + long cnt = queryBuilder().physical(plan).singletonLong(); + assertEquals("Counts should match", 876L, cnt); + } } diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java index bb065ab7b31..7818ee33905 100644 --- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java +++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java @@ -17,23 +17,23 @@ */ package org.apache.drill.exec.store.druid.rest; +import okhttp3.Response; +import okhttp3.ResponseBody; import org.apache.drill.exec.store.druid.druid.DruidScanResponse; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; -import okhttp3.Response; -import okhttp3.ResponseBody; - import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.assertj.core.api.Assertions.assertThat; public class DruidQueryClientTest { @@ -79,7 +79,7 @@ public void executeQueryCalledNoResponsesFoundReturnsEmptyEventList() when(httpResponseBody.byteStream()).thenReturn(inputStream); DruidScanResponse response = druidQueryClient.executeQuery(QUERY); - assertThat(response.getEvents()).isEmpty(); + assertEquals(0, response.getEvents().size()); } @Test @@ -91,9 +91,9 @@ public void executeQueryCalledSuccessfullyParseQueryResults() when(httpResponseBody.byteStream()).thenReturn(inputStream); DruidScanResponse response = druidQueryClient.executeQuery(QUERY); - assertThat(response.getEvents()).isNotEmpty(); - assertThat(response.getEvents().size()).isEqualTo(1); - assertThat(response.getEvents().get(0).get("user").textValue()).isEqualTo("Dansker"); - assertThat(response.getEvents().get(0).get("sum_deleted").intValue()).isEqualTo(133); + assertFalse(response.getEvents().isEmpty()); + assertEquals(response.getEvents().size(), 1); + assertEquals(response.getEvents().get(0).get("user").textValue(), "Dansker"); + assertEquals(response.getEvents().get(0).get("sum_deleted").intValue(), 133); } } diff --git a/contrib/storage-druid/src/test/resources/druid/docker-compose.yaml b/contrib/storage-druid/src/test/resources/druid/docker-compose.yaml index 700e9704dd8..53cca236965 100644 --- a/contrib/storage-druid/src/test/resources/druid/docker-compose.yaml +++ b/contrib/storage-druid/src/test/resources/druid/docker-compose.yaml @@ -31,7 +31,7 @@ volumes: services: postgres: container_name: postgres - image: postgres:latest + image: postgres:12 volumes: - metadata_data:/var/lib/postgresql/data environment: @@ -65,7 +65,7 @@ services: - environment.env broker: - image: apache/druid:0.22.0 + image: apache/druid:30.0.0 container_name: broker volumes: - broker_var:/opt/druid/var @@ -81,7 +81,7 @@ services: - environment.env historical: - image: apache/druid:0.22.0 + image: apache/druid:30.0.0 container_name: historical volumes: - druid_shared:/opt/shared @@ -98,7 +98,7 @@ services: - environment.env middlemanager: - image: apache/druid:0.22.0 + image: apache/druid:30.0.0 
container_name: middlemanager volumes: - druid_shared:/opt/shared @@ -116,7 +116,7 @@ services: - environment.env router: - image: apache/druid:0.22.0 + image: apache/druid:30.0.0 container_name: router volumes: - router_var:/opt/druid/var
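
The new DruidBatchRecordReader pages through Druid scan results instead of issuing a single query: each call to next() asks DruidQueryClient for up to BATCH_SIZE (4096) events starting at the offset held by the shared DruidOffsetTracker, feeds the returned events to the EVF JsonLoader, advances the offset by the number of events received, and reports end-of-data once a batch comes back short. A minimal, self-contained sketch of that paging loop; the fetchEvents helper and the 10,000-row datasource are hypothetical stand-ins for DruidQueryClient and a real Druid datasource:

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class OffsetPagingSketch {
  private static final int BATCH_SIZE = 4096;

  // Hypothetical stand-in for DruidQueryClient.executeQuery(): returns up to
  // 'limit' events starting at 'offset' from a pretend 10,000-row datasource.
  static List<String> fetchEvents(BigInteger offset, int limit) {
    List<String> events = new ArrayList<>();
    long total = 10_000;
    long start = offset.longValueExact();
    for (long i = start; i < Math.min(start + limit, total); i++) {
      events.add("{\"row\":" + i + "}");
    }
    return events;
  }

  public static void main(String[] args) {
    BigInteger offset = BigInteger.ZERO;   // plays the role of DruidOffsetTracker
    long rowsRead = 0;
    while (true) {
      List<String> batch = fetchEvents(offset, BATCH_SIZE);
      rowsRead += batch.size();            // a real reader would hand these to JsonLoader
      offset = offset.add(BigInteger.valueOf(batch.size()));
      if (batch.size() < BATCH_SIZE) {
        break;                             // short batch: nothing left to scan
      }
    }
    System.out.println("Read " + rowsRead + " rows in pages of " + BATCH_SIZE);
  }
}
```

Sharing one DruidOffsetTracker across the readers created by DruidReaderFactory is what lets consecutive batches continue from where the previous query stopped rather than re-reading the same rows.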
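getQuery() serializes a ScanQuery produced by the plugin's ScanQueryBuilder into the JSON body posted to the Druid broker. The sketch below builds a comparable body directly with Jackson so the offset/limit paging is visible; the field names follow Druid's native scan-query API and are an assumption here, since the exact JSON emitted by ScanQueryBuilder is not shown in this diff:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class ScanQueryJsonSketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    ObjectNode query = mapper.createObjectNode();
    query.put("queryType", "scan");                            // Druid native scan query
    query.put("dataSource", "wikipedia");                      // test datasource from the README
    query.putArray("intervals").add("2016-06-27/2016-06-28");  // minTime/maxTime from the scan spec
    query.putArray("columns").add("user").add("page");         // projected columns; empty means all
    query.put("offset", 0);                                    // value supplied by DruidOffsetTracker
    query.put("limit", 4096);                                  // BATCH_SIZE / maxRecordsToRead threshold
    System.out.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(query));
  }
}
```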
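The group scan, sub scan, and scan spec now carry an optional provided schema (TupleMetadata) obtained from a MetadataProviderManager, and DruidScanBatchCreator hands it to the scan framework via builder.providedSchema(). A small sketch of building such a schema programmatically, mirroring the SchemaBuilder usage in testGroupByQuery; the column names are illustrative and taken from the wikipedia test datasource, and the modes are a guess since Druid dimensions are nullable strings by default:

```java
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;

public class ProvidedSchemaSketch {
  public static void main(String[] args) {
    // Build a provided schema for a few wikipedia columns.
    TupleMetadata schema = new SchemaBuilder()
        .add("namespace", MinorType.VARCHAR, DataMode.OPTIONAL)
        .add("user", MinorType.VARCHAR, DataMode.OPTIONAL)
        .add("sum_deleted", MinorType.BIGINT, DataMode.OPTIONAL)
        .buildSchema();
    System.out.println(schema);
  }
}
```

With no provided schema the reader still works, because JsonLoader infers column types from the scan response and unprojected or unknown columns fall back to the nullable VARCHAR type set by builder.nullType().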