diff --git a/contrib/storage-druid/.gitignore b/contrib/storage-druid/.gitignore
new file mode 100644
index 00000000000..9341ff44dc5
--- /dev/null
+++ b/contrib/storage-druid/.gitignore
@@ -0,0 +1,2 @@
+# Ignore any local logback test configuration used when running Druid storage plugin tests
+/src/test/resources/logback-test.xml
diff --git a/contrib/storage-druid/README.md b/contrib/storage-druid/README.md
index 479024aa067..5b88b285a05 100644
--- a/contrib/storage-druid/README.md
+++ b/contrib/storage-druid/README.md
@@ -4,7 +4,7 @@ Drill druid storage plugin allows you to perform SQL queries against Druid datas
This storage plugin is part of [Apache Drill](https://github.com/apache/drill)
### Tested with Druid version
-[0.22.0](https://github.com/apache/druid/releases/tag/druid-0.22.0)
+[30.0.0](https://github.com/apache/druid/releases/tag/druid-30.0.0)
### Druid API
@@ -33,27 +33,27 @@ Following is the default registration configuration.
### Druid storage plugin developer notes.
-* Building the plugin
+* Building the plugin
`mvn install -pl contrib/storage-druid`
* Building DRILL
`mvn clean install -DskipTests`
-
+
* Start Drill In Embedded Mode (mac)
```shell script
distribution/target/apache-drill-1.20.0-SNAPSHOT/apache-drill-1.20.0-SNAPSHOT/bin/drill-embedded
```
-
+
* Starting Druid (Docker and Docker Compose required)
```
cd contrib/storage-druid/src/test/resources/druid
docker-compose up -d
```
-
+
* There is an `Indexing Task Json` in the same folder as the docker compose file. It can be used to ingest the wikipedia datasource.
-
+
* Make sure the druid storage plugin is enabled in Drill.
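For reference, the indexing task can be posted to Druid's task endpoint once the containers are up. A minimal Java sketch (the router port 8888 matches the standard Druid docker-compose setup; the task file name below is a hypothetical placeholder):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

public class SubmitIndexingTask {
  public static void main(String[] args) throws Exception {
    // Assumed: Druid router reachable on localhost:8888; the task file name is hypothetical.
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8888/druid/indexer/v1/task"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofFile(
            Path.of("contrib/storage-druid/src/test/resources/druid/wikipedia-index.json")))
        .build();
    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    // A successful submission returns a JSON body containing the task id.
    System.out.println(response.statusCode() + " " + response.body());
  }
}
```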
diff --git a/contrib/storage-druid/pom.xml b/contrib/storage-druid/pom.xml
index 9a9d8175615..54cf88e887c 100755
--- a/contrib/storage-druid/pom.xml
+++ b/contrib/storage-druid/pom.xml
@@ -29,7 +29,7 @@
drill-druid-storage
Drill : Contrib : Storage : Druid
- **/DruidTestSuit.class
+ **/DruidTestSuite.class
@@ -53,13 +53,6 @@
${project.version}
test
- <dependency>
-   <groupId>org.assertj</groupId>
-   <artifactId>assertj-core</artifactId>
-   <version>3.11.1</version>
-   <scope>test</scope>
- </dependency>
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java
new file mode 100644
index 00000000000..d9068954f62
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidBatchRecordReader.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.store.druid.DruidSubScan.DruidSubScanSpec;
+import org.apache.drill.exec.store.druid.common.DruidFilter;
+import org.apache.drill.exec.store.druid.druid.DruidScanResponse;
+import org.apache.drill.exec.store.druid.druid.ScanQuery;
+import org.apache.drill.exec.store.druid.druid.ScanQueryBuilder;
+import org.apache.drill.exec.store.druid.rest.DruidQueryClient;
+import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
+import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DruidBatchRecordReader implements ManagedReader<SchemaNegotiator> {
+ private static final Logger logger = LoggerFactory.getLogger(DruidBatchRecordReader.class);
+ private static final int BATCH_SIZE = 4096;
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+ private final DruidStoragePlugin plugin;
+ private final DruidSubScan.DruidSubScanSpec scanSpec;
+ private final List<String> columns;
+ private final DruidFilter filter;
+ private final DruidQueryClient druidQueryClient;
+ private final DruidOffsetTracker offsetTracker;
+ private int maxRecordsToRead = -1;
+ private JsonLoaderBuilder jsonBuilder;
+ private JsonLoaderImpl jsonLoader;
+ private SchemaNegotiator negotiator;
+ private ResultSetLoader resultSetLoader;
+ private CustomErrorContext errorContext;
+
+
+ public DruidBatchRecordReader(DruidSubScan subScan,
+ DruidSubScanSpec subScanSpec,
+ List<SchemaPath> projectedColumns,
+ int maxRecordsToRead,
+ DruidStoragePlugin plugin, DruidOffsetTracker offsetTracker) {
+ this.columns = new ArrayList<>();
+ this.maxRecordsToRead = maxRecordsToRead;
+ this.plugin = plugin;
+ this.scanSpec = subScanSpec;
+ this.filter = subScanSpec.getFilter();
+ this.druidQueryClient = plugin.getDruidQueryClient();
+ this.offsetTracker = offsetTracker;
+ }
+
+ @Override
+ public boolean open(SchemaNegotiator negotiator) {
+ this.negotiator = negotiator;
+ this.errorContext = this.negotiator.parentErrorContext();
+ this.negotiator.batchSize(BATCH_SIZE);
+ this.negotiator.setErrorContext(errorContext);
+
+ resultSetLoader = this.negotiator.build();
+
+
+ return true;
+ }
+
+ @Override
+ public boolean next() {
+ jsonBuilder = new JsonLoaderBuilder()
+ .resultSetLoader(resultSetLoader)
+ .standardOptions(negotiator.queryOptions())
+ .errorContext(errorContext);
+ int eventCounter = 0;
+ boolean result = false;
+ try {
+ String query = getQuery();
+ logger.debug("Executing query: {}", query);
+ DruidScanResponse druidScanResponse = druidQueryClient.executeQuery(query);
+ setNextOffset(druidScanResponse);
+
+ StringBuilder events = new StringBuilder();
+ for (ObjectNode eventNode : druidScanResponse.getEvents()) {
+ events.append(eventNode);
+ events.append("\n");
+ eventCounter++;
+ }
+
+
+ jsonLoader = (JsonLoaderImpl) jsonBuilder
+ .fromString(events.toString())
+ .build();
+
+ result = jsonLoader.readBatch();
+
+ if (eventCounter < BATCH_SIZE) {
+ return false;
+ } else {
+ return result;
+ }
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failure while executing druid query: " + e.getMessage())
+ .addContext(errorContext)
+ .build(logger);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (jsonLoader != null) {
+ jsonLoader.close();
+ jsonLoader = null;
+ }
+ }
+
+ private String getQuery() throws JsonProcessingException {
+ int queryThreshold =
+ maxRecordsToRead >= 0
+ ? Math.min(BATCH_SIZE, maxRecordsToRead)
+ : BATCH_SIZE;
+ ScanQueryBuilder scanQueryBuilder = plugin.getScanQueryBuilder();
+ ScanQuery scanQuery =
+ scanQueryBuilder.build(
+ scanSpec.dataSourceName,
+ columns,
+ filter,
+ offsetTracker.getOffset(),
+ queryThreshold,
+ scanSpec.getMinTime(),
+ scanSpec.getMaxTime()
+ );
+ return objectMapper.writeValueAsString(scanQuery);
+ }
+
+ private void setNextOffset(DruidScanResponse druidScanResponse) {
+ offsetTracker.setNextOffset(BigInteger.valueOf(druidScanResponse.getEvents().size()));
+ }
+}
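For context, the paging contract between `next()` and the shared `DruidOffsetTracker` is: each call issues a scan query limited to `BATCH_SIZE` rows at the current offset, advances the offset by the number of events returned, and reports EOF once a short page comes back. A simplified sketch of that loop (the `fetchPage` callback is a hypothetical stand-in for `DruidQueryClient.executeQuery()`):

```java
import java.math.BigInteger;
import java.util.List;
import java.util.function.BiFunction;

public class DruidPagingSketch {
  static final int BATCH_SIZE = 4096;

  // fetchPage stands in for DruidQueryClient.executeQuery(): given (offset, limit)
  // it returns the events of one Druid scan query. The signature is hypothetical.
  static long readAll(BiFunction<BigInteger, Integer, List<String>> fetchPage) {
    BigInteger offset = BigInteger.ZERO;                      // DruidOffsetTracker starts at zero
    long total = 0;
    while (true) {
      List<String> events = fetchPage.apply(offset, BATCH_SIZE);
      total += events.size();
      offset = offset.add(BigInteger.valueOf(events.size())); // setNextOffset()
      if (events.size() < BATCH_SIZE) {                       // short page: next() reports EOF
        return total;
      }
    }
  }
}
```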
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java
index 24dce99da6b..55b7bd9871e 100755
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java
@@ -25,12 +25,14 @@
import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.schedule.AffinityCreator;
@@ -44,6 +46,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -54,7 +57,7 @@ public class DruidGroupScan extends AbstractGroupScan {
private static final long DEFAULT_TABLET_SIZE = 1000;
private final DruidScanSpec scanSpec;
private final DruidStoragePlugin storagePlugin;
-
+ private final MetadataProviderManager metadataProviderManager;
private List<SchemaPath> columns;
private boolean filterPushedDown = false;
private int maxRecordsToRead;
@@ -73,19 +76,20 @@ public DruidGroupScan(@JsonProperty("userName") String userName,
pluginRegistry.resolve(storagePluginConfig, DruidStoragePlugin.class),
scanSpec,
columns,
- maxRecordsToRead);
+ maxRecordsToRead, null);
}
public DruidGroupScan(String userName,
DruidStoragePlugin storagePlugin,
DruidScanSpec scanSpec,
List<SchemaPath> columns,
- int maxRecordsToRead) {
+ int maxRecordsToRead, MetadataProviderManager metadataProviderManager) {
super(userName);
this.storagePlugin = storagePlugin;
this.scanSpec = scanSpec;
this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : columns;
this.maxRecordsToRead = maxRecordsToRead;
+ this.metadataProviderManager = metadataProviderManager;
init();
}
@@ -102,6 +106,7 @@ private DruidGroupScan(DruidGroupScan that) {
this.filterPushedDown = that.filterPushedDown;
this.druidWorkList = that.druidWorkList;
this.assignments = that.assignments;
+ this.metadataProviderManager = that.metadataProviderManager;
}
@Override
@@ -163,7 +168,8 @@ private void init() {
getScanSpec().getFilter(),
getDatasourceSize(),
getDataSourceMinTime(),
- getDataSourceMaxTime()
+ getDataSourceMaxTime(),
+ getSchema()
)
);
druidWorkList.add(druidWork);
@@ -225,12 +231,13 @@ public DruidSubScan getSpecificScan(int minorFragmentId) {
druidWork.getDruidSubScanSpec().getFilter(),
druidWork.getDruidSubScanSpec().getDataSourceSize(),
druidWork.getDruidSubScanSpec().getMinTime(),
- druidWork.getDruidSubScanSpec().getMaxTime()
+ druidWork.getDruidSubScanSpec().getMaxTime(),
+ druidWork.getDruidSubScanSpec().getSchema()
)
);
}
- return new DruidSubScan(getUserName(), storagePlugin, scanSpecList, this.columns, this.maxRecordsToRead);
+ return new DruidSubScan(getUserName(), storagePlugin, scanSpecList, this.columns, this.maxRecordsToRead, getSchema());
}
@JsonIgnore
@@ -283,13 +290,30 @@ public int getMaxRecordsToRead() {
return maxRecordsToRead;
}
+ @JsonIgnore
+ public MetadataProviderManager getMetadataProviderManager() {
+ return metadataProviderManager;
+ }
+
+ public TupleMetadata getSchema() {
+ if (metadataProviderManager == null) {
+ return null;
+ }
+ try {
+ return metadataProviderManager.getSchemaProvider().read().getSchema();
+ } catch (IOException | NullPointerException e) {
+ return null;
+ }
+ }
+
@Override
public String toString() {
return new PlanStringBuilder(this)
- .field("druidScanSpec", scanSpec)
- .field("columns", columns)
- .field("druidStoragePlugin", storagePlugin)
- .toString();
+ .field("druidScanSpec", scanSpec)
+ .field("columns", columns)
+ .field("druidStoragePlugin", storagePlugin)
+ .field("schema", getSchema())
+ .toString();
}
@Override
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java
new file mode 100644
index 00000000000..16604f0b494
--- /dev/null
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidOffsetTracker.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+
+public class DruidOffsetTracker {
+ private static final Logger logger = LoggerFactory.getLogger(DruidOffsetTracker.class);
+ private BigInteger nextOffset;
+
+ public DruidOffsetTracker() {
+ this.nextOffset = BigInteger.ZERO;
+ }
+
+ public BigInteger getOffset() {
+ return nextOffset;
+ }
+
+ public void setNextOffset(BigInteger offset) {
+ nextOffset = nextOffset.add(offset);
+ logger.debug("Incrementing offset by {}", offset);
+ }
+}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java
index ba5ab9dc3b8..3b31f5bc307 100644
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java
@@ -70,7 +70,8 @@ public void onMatch(RelOptRuleCall relOptRuleCall) {
groupScan.getStoragePlugin(),
newScanSpec,
groupScan.getColumns(),
- groupScan.getMaxRecordsToRead());
+ groupScan.getMaxRecordsToRead(),
+ groupScan.getMetadataProviderManager());
newGroupsScan.setFilterPushedDown(true);
ScanPrel newScanPrel = scan.copy(filter.getTraitSet(), newGroupsScan, filter.getRowType());
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java
deleted file mode 100755
index 5c437c3e681..00000000000
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.druid;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.util.JacksonUtils;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.druid.common.DruidFilter;
-import org.apache.drill.exec.store.druid.druid.DruidScanResponse;
-import org.apache.drill.exec.store.druid.druid.ScanQuery;
-import org.apache.drill.exec.store.druid.druid.ScanQueryBuilder;
-import org.apache.drill.exec.store.druid.rest.DruidQueryClient;
-import org.apache.drill.exec.vector.BaseValueVector;
-import org.apache.drill.exec.vector.complex.fn.JsonReader;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-public class DruidRecordReader extends AbstractRecordReader {
-
- private static final Logger logger = LoggerFactory.getLogger(DruidRecordReader.class);
- private static final ObjectMapper objectMapper = JacksonUtils.createObjectMapper();
- private final DruidStoragePlugin plugin;
- private final DruidSubScan.DruidSubScanSpec scanSpec;
- private final List<String> columns;
- private final DruidFilter filter;
- private BigInteger nextOffset = BigInteger.ZERO;
- private int maxRecordsToRead = -1;
-
- private JsonReader jsonReader;
- private VectorContainerWriter writer;
-
- private final FragmentContext fragmentContext;
- private final DruidQueryClient druidQueryClient;
-
- public DruidRecordReader(DruidSubScan.DruidSubScanSpec subScanSpec,
- List<SchemaPath> projectedColumns,
- int maxRecordsToRead,
- FragmentContext context,
- DruidStoragePlugin plugin) {
- columns = new ArrayList<>();
- setColumns(projectedColumns);
- this.maxRecordsToRead = maxRecordsToRead;
- this.plugin = plugin;
- scanSpec = subScanSpec;
- fragmentContext = context;
- this.filter = subScanSpec.getFilter();
- this.druidQueryClient = plugin.getDruidQueryClient();
- }
-
- @Override
- protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) {
- Set<SchemaPath> transformed = Sets.newLinkedHashSet();
- if (isStarQuery()) {
- transformed.add(SchemaPath.STAR_COLUMN);
- } else {
- for (SchemaPath column : projectedColumns) {
- String fieldName = column.getRootSegment().getPath();
- transformed.add(column);
- this.columns.add(fieldName);
- }
- }
- return transformed;
- }
-
- @Override
- public void setup(OperatorContext context, OutputMutator output) {
- this.writer = new VectorContainerWriter(output);
-
- this.jsonReader =
- new JsonReader.Builder(fragmentContext.getManagedBuffer())
- .schemaPathColumns(ImmutableList.copyOf(getColumns()))
- .skipOuterList(true)
- .build();
- }
-
- @Override
- public int next() {
- writer.allocate();
- writer.reset();
- Stopwatch watch = Stopwatch.createStarted();
- try {
- String query = getQuery();
- DruidScanResponse druidScanResponse = druidQueryClient.executeQuery(query);
- setNextOffset(druidScanResponse);
-
- int docCount = 0;
- for (ObjectNode eventNode : druidScanResponse.getEvents()) {
- writer.setPosition(docCount);
- jsonReader.setSource(eventNode);
- try {
- jsonReader.write(writer);
- } catch (IOException e) {
- throw UserException
- .dataReadError(e)
- .message("Failure while reading document")
- .addContext("Failed Query", query)
- .addContext("Parser was at record", eventNode.toString())
- .addContext(e.getMessage())
- .build(logger);
- }
- docCount++;
- }
-
- writer.setValueCount(docCount);
- logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), docCount);
- return docCount;
- } catch (Exception e) {
- throw UserException
- .dataReadError(e)
- .message("Failure while executing druid query")
- .addContext(e.getMessage())
- .build(logger);
- }
- }
-
- private String getQuery() throws JsonProcessingException {
- int queryThreshold =
- this.maxRecordsToRead >= 0
- ? Math.min(BaseValueVector.INITIAL_VALUE_ALLOCATION, this.maxRecordsToRead)
- : BaseValueVector.INITIAL_VALUE_ALLOCATION;
- ScanQueryBuilder scanQueryBuilder = plugin.getScanQueryBuilder();
- ScanQuery scanQuery =
- scanQueryBuilder.build(
- scanSpec.dataSourceName,
- this.columns,
- this.filter,
- this.nextOffset,
- queryThreshold,
- scanSpec.getMinTime(),
- scanSpec.getMaxTime()
- );
- return objectMapper.writeValueAsString(scanQuery);
- }
-
- private void setNextOffset(DruidScanResponse druidScanResponse) {
- this.nextOffset = this.nextOffset.add(BigInteger.valueOf(druidScanResponse.getEvents().size()));
- }
-
- @Override
- public void close() throws Exception {
- if (writer != null) {
- writer.close();
- }
- if (!this.nextOffset.equals(BigInteger.ZERO)) {
- this.nextOffset = BigInteger.ZERO;
- }
- jsonReader = null;
- }
-}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java
old mode 100755
new mode 100644
index 45bac99adf5..de59cd813e2
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java
@@ -15,42 +15,82 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.drill.exec.store.druid;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.RecordReader;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.druid.DruidSubScan.DruidSubScanSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Iterator;
import java.util.List;
public class DruidScanBatchCreator implements BatchCreator<DruidSubScan> {
private static final Logger logger = LoggerFactory.getLogger(DruidScanBatchCreator.class);
+
@Override
- public CloseableRecordBatch getBatch(ExecutorFragmentContext context, DruidSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
- Preconditions.checkArgument(children.isEmpty());
- List<RecordReader> readers = Lists.newArrayList();
- List<SchemaPath> columns;
-
- for (DruidSubScan.DruidSubScanSpec scanSpec : subScan.getScanSpec()) {
- try {
- columns = subScan.getColumns();
- readers.add(new DruidRecordReader(scanSpec, columns, subScan.getMaxRecordsToRead(), context, subScan.getStorageEngine()));
- } catch (Exception ex) {
- throw new ExecutionSetupException(ex);
+ public CloseableRecordBatch getBatch(ExecutorFragmentContext context,
+ DruidSubScan subScan,
+ List<RecordBatch> children) throws ExecutionSetupException {
+ try {
+ ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan);
+ return builder.buildScanOperator(context, subScan);
+ } catch (UserException e) {
+ throw e;
+ }
+ }
+
+ private ScanFrameworkBuilder createBuilder(OptionManager options, DruidSubScan subScan) {
+ ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
+ builder.projection(subScan.getColumns());
+ builder.providedSchema(subScan.getSchema());
+ builder.setUserName(subScan.getUserName());
+
+ ReaderFactory readerFactory = new DruidReaderFactory(subScan);
+ builder.setReaderFactory(readerFactory);
+ builder.nullType(Types.optional(MinorType.VARCHAR));
+ return builder;
+ }
+
+ private static class DruidReaderFactory implements ReaderFactory {
+ private final DruidSubScan subScan;
+ private final DruidOffsetTracker offsetTracker;
+ private final Iterator<DruidSubScanSpec> scanSpecIterator;
+
+ public DruidReaderFactory(DruidSubScan subScan) {
+ this.subScan = subScan;
+ this.scanSpecIterator = subScan.getScanSpec().listIterator();
+ this.offsetTracker = new DruidOffsetTracker();
+ }
+
+ @Override
+ public void bind(ManagedScanFramework framework) {
+
+ }
+
+ @Override
+ public ManagedReader<? extends SchemaNegotiator> next() {
+ if (scanSpecIterator.hasNext()) {
+ DruidSubScanSpec scanSpec = scanSpecIterator.next();
+ return new DruidBatchRecordReader(subScan, scanSpec, subScan.getColumns(), subScan.getMaxRecordsToRead(), subScan.getStorageEngine(), offsetTracker);
}
+ return null;
}
- logger.debug("Number of record readers initialized - {}", readers.size());
- return new ScanBatch(subScan, context, readers);
}
}
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java
index f702f1a2904..f066cde6a35 100755
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java
@@ -18,11 +18,17 @@
package org.apache.drill.exec.store.druid;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.planner.PlannerPhase;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SessionOptionManager;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
@@ -32,9 +38,9 @@
import org.apache.drill.exec.store.druid.rest.RestClient;
import org.apache.drill.exec.store.druid.rest.RestClientWrapper;
import org.apache.drill.exec.store.druid.schema.DruidSchemaFactory;
-import com.google.common.collect.ImmutableSet;
import java.io.IOException;
+import java.util.List;
import java.util.Set;
public class DruidStoragePlugin extends AbstractStoragePlugin {
@@ -57,9 +63,36 @@ public DruidStoragePlugin(DruidStoragePluginConfig pluginConfig, DrillbitContext
}
@Override
- public DruidGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
- DruidScanSpec scanSpec = selection.getListWith(new TypeReference<DruidScanSpec>() {});
- return new DruidGroupScan(userName, this, scanSpec, null, -1);
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
+ SessionOptionManager options) throws IOException {
+ return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS,
+ options, null);
+ }
+
+
+ @Override
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
+ SessionOptionManager options, MetadataProviderManager metadataProviderManager) throws IOException {
+ return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS,
+ options, metadataProviderManager);
+ }
+
+ @Override
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection,
+ List<SchemaPath> columns) throws IOException {
+ return getPhysicalScan(userName, selection, columns, null, null);
+ }
+
+ @Override
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+ return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, null);
+ }
+
+ @Override
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options,
+ MetadataProviderManager metadataProviderManager) throws IOException {
+ DruidScanSpec scanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<DruidScanSpec>() {});
+ return new DruidGroupScan(userName, this, scanSpec, null, -1, metadataProviderManager);
}
@Override
diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java
index 05405126141..bdc30ca1bb2 100644
--- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java
+++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java
@@ -29,6 +29,7 @@
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.druid.common.DruidFilter;
import com.google.common.base.Preconditions;
@@ -53,30 +54,36 @@ public class DruidSubScan extends AbstractBase implements SubScan {
private final List<SchemaPath> columns;
private final int maxRecordsToRead;
+ private final TupleMetadata schema;
+
@JsonCreator
public DruidSubScan(@JacksonInject StoragePluginRegistry registry,
@JsonProperty("userName") String userName,
@JsonProperty("config") StoragePluginConfig config,
@JsonProperty("scanSpec") LinkedList datasourceScanSpecList,
@JsonProperty("columns") List columns,
- @JsonProperty("maxRecordsToRead") int maxRecordsToRead) {
+ @JsonProperty("maxRecordsToRead") int maxRecordsToRead,
+ @JsonProperty("schema") TupleMetadata schema) {
super(userName);
druidStoragePlugin = registry.resolve(config, DruidStoragePlugin.class);
this.scanSpec = datasourceScanSpecList;
this.columns = columns;
this.maxRecordsToRead = maxRecordsToRead;
+ this.schema = schema;
}
public DruidSubScan(String userName,
DruidStoragePlugin plugin,
List<DruidSubScanSpec> dataSourceInfoList,
List<SchemaPath> columns,
- int maxRecordsToRead) {
+ int maxRecordsToRead,
+ TupleMetadata schema) {
super(userName);
this.druidStoragePlugin = plugin;
this.scanSpec = dataSourceInfoList;
this.columns = columns;
this.maxRecordsToRead = maxRecordsToRead;
+ this.schema = schema;
}
@Override
@@ -93,6 +100,10 @@ public List getColumns() {
return columns;
}
+ public TupleMetadata getSchema() {
+ return schema;
+ }
+
public int getMaxRecordsToRead() { return maxRecordsToRead; }
@JsonIgnore
@@ -109,7 +120,7 @@ public DruidStoragePlugin getStorageEngine(){
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
- return new DruidSubScan(getUserName(), druidStoragePlugin, scanSpec, columns, maxRecordsToRead);
+ return new DruidSubScan(getUserName(), druidStoragePlugin, scanSpec, columns, maxRecordsToRead, schema);
}
@JsonIgnore
@@ -131,17 +142,21 @@ public static class DruidSubScanSpec {
protected final String maxTime;
protected final String minTime;
+ protected final TupleMetadata schema;
+
@JsonCreator
public DruidSubScanSpec(@JsonProperty("dataSourceName") String dataSourceName,
@JsonProperty("filter") DruidFilter filter,
@JsonProperty("dataSourceSize") long dataSourceSize,
@JsonProperty("minTime") String minTime,
- @JsonProperty("maxTime") String maxTime) {
+ @JsonProperty("maxTime") String maxTime,
+ @JsonProperty("schema") TupleMetadata schema) {
this.dataSourceName = dataSourceName;
this.filter = filter;
this.dataSourceSize = dataSourceSize;
this.minTime = minTime;
this.maxTime = maxTime;
+ this.schema = schema;
}
public String getDataSourceName() {
@@ -158,6 +173,10 @@ public String getMinTime() {
return minTime;
}
+ public TupleMetadata getSchema() {
+ return schema;
+ }
+
public String getMaxTime() {
return maxTime;
}
@@ -170,6 +189,7 @@ public String toString() {
.field("dataSourceSize", dataSourceSize)
.field("minTime", minTime)
.field("maxTime", maxTime)
+ .field("schema", schema)
.toString();
}
}
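Both `DruidGroupScan` and `DruidSubScan` now carry an optional provided `TupleMetadata` schema that rides along in the plan. A minimal sketch of building such a schema programmatically (the column names are illustrative, not taken from the plugin):

```java
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;

public class ProvidedSchemaSketch {
  // Illustrative columns only; a real provided schema would describe the actual Druid datasource.
  public static TupleMetadata wikipediaSchema() {
    return new SchemaBuilder()
        .add("channel", MinorType.VARCHAR, DataMode.OPTIONAL)
        .add("page", MinorType.VARCHAR, DataMode.OPTIONAL)
        .add("added", MinorType.BIGINT, DataMode.OPTIONAL)
        .buildSchema();
  }
}
```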
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidFilterBuilderTest.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidFilterBuilderTest.java
index 9ca7c0ec449..5dcb42bfa8b 100644
--- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidFilterBuilderTest.java
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidFilterBuilderTest.java
@@ -28,7 +28,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -71,7 +71,7 @@ public void setup() {
when(logicalExpression.accept(any(), any())).thenReturn(druidScanSpecRight);
} catch (Exception ignored) { }
- DruidGroupScan druidGroupScan = new DruidGroupScan("some username", null, druidScanSpecLeft, null, 5);
+ DruidGroupScan druidGroupScan = new DruidGroupScan("some username", null, druidScanSpecLeft, null, 5, null);
druidFilterBuilder = new DruidFilterBuilder(druidGroupScan, logicalExpression);
}
@@ -80,7 +80,7 @@ public void parseTreeWithAndOfTwoSelectorFilters() {
DruidScanSpec parsedSpec = druidFilterBuilder.parseTree();
String expectedFilterJson = "{\"type\":\"and\",\"fields\":[{\"type\":\"selector\",\"dimension\":\"some dimension\",\"value\":\"some value\"},{\"type\":\"selector\",\"dimension\":\"some other dimension\",\"value\":\"some other value\"}]}";
String actual = parsedSpec.getFilter().toJson();
- assertThat(actual).isEqualTo(expectedFilterJson);
+ assertEquals(expectedFilterJson, actual);
}
@Test
@@ -99,7 +99,7 @@ public void visitBooleanOperatorWithAndOperator() {
druidFilterBuilder.visitBooleanOperator(booleanOperator, null);
String expectedFilterJson = "{\"type\":\"and\",\"fields\":[{\"type\":\"selector\",\"dimension\":\"some dimension\",\"value\":\"some value\"},{\"type\":\"selector\",\"dimension\":\"some other dimension\",\"value\":\"some other value\"}]}";
String actual = druidScanSpec.getFilter().toJson();
- assertThat(actual).isEqualTo(expectedFilterJson);
+ assertEquals(expectedFilterJson, actual);
}
@Test
@@ -118,6 +118,6 @@ public void visitBooleanOperatorWithOrOperator() {
druidFilterBuilder.visitBooleanOperator(booleanOperator, null);
String expectedFilterJson = "{\"type\":\"or\",\"fields\":[{\"type\":\"selector\",\"dimension\":\"some dimension\",\"value\":\"some value\"},{\"type\":\"selector\",\"dimension\":\"some other dimension\",\"value\":\"some other value\"}]}";
String actual = druidScanSpec.getFilter().toJson();
- assertThat(actual).isEqualTo(expectedFilterJson);
+ assertEquals(expectedFilterJson, actual);
}
}
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilderTest.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilderTest.java
index 6c8a4f06ce4..c2adbe80b7e 100644
--- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilderTest.java
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilderTest.java
@@ -22,8 +22,9 @@
import org.apache.drill.exec.store.druid.common.DruidConstants;
import org.junit.Before;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
-import static org.assertj.core.api.Assertions.assertThat;
public class DruidScanSpecBuilderTest {
@@ -54,8 +55,8 @@ public void buildCalledWithEqualFxShouldBuildSelectorFilter() {
FunctionNames.EQ,
schemaPath,
SOME_VALUE);
-
- assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":\"some value\"}");
+ String actual = "{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":\"some value\"}";
+ assertEquals(druidScanSpec.getFilter().toJson(), actual);
}
@Test
@@ -70,8 +71,8 @@ public void buildCalledWithEqualFxIntervalFieldShouldBuildIntervalFilter() {
FunctionNames.EQ,
schemaPath,
SOME_VALUE);
-
- assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"eventInterval\":\"some value\"}");
+ String actual = "{\"eventInterval\":\"some value\"}";
+ assertEquals(druidScanSpec.getFilter().toJson(), actual);
}
@Test
@@ -86,8 +87,8 @@ public void buildCalledWithNotEqualFxShouldBuildSelectorFilter() {
FunctionNames.NE,
schemaPath, SOME_VALUE
);
-
- assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":\"some value\"}}");
+ String actual = "{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":\"some value\"}}";
+ assertEquals(druidScanSpec.getFilter().toJson(), actual);
}
@Test
@@ -103,8 +104,8 @@ public void buildCalledWithGreaterThanOrEqualToFxShouldBuildBoundFilter() {
schemaPath,
SOME_VALUE
);
-
- assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some field\",\"lower\":\"some value\",\"ordering\":\"lexicographic\"}");
+ String actual = "{\"type\":\"bound\",\"dimension\":\"some field\",\"lower\":\"some value\",\"ordering\":\"lexicographic\"}";
+ assertEquals(druidScanSpec.getFilter().toJson(), actual);
}
@Test
@@ -120,8 +121,8 @@ public void buildCalledWithGreaterThanFxShouldBuildBoundFilter() {
schemaPath,
SOME_VALUE
);
-
- assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some field\",\"lower\":\"some value\",\"lowerStrict\":true,\"ordering\":\"lexicographic\"}");
+ String actual = "{\"type\":\"bound\",\"dimension\":\"some field\",\"lower\":\"some value\",\"lowerStrict\":true,\"ordering\":\"lexicographic\"}";
+ assertEquals(druidScanSpec.getFilter().toJson(), actual);
}
@Test
@@ -138,7 +139,8 @@ public void buildCalledWithGreaterThanFxAndNumericValueShouldBuildBoundFilter()
"1"
);
- assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some field\",\"lower\":\"1\",\"lowerStrict\":true,\"ordering\":\"numeric\"}");
+ String actual = "{\"type\":\"bound\",\"dimension\":\"some field\",\"lower\":\"1\",\"lowerStrict\":true,\"ordering\":\"numeric\"}";
+ assertEquals(druidScanSpec.getFilter().toJson(), actual);
}
@Test
@@ -154,7 +156,8 @@ public void buildCalledWithLessThanOrEqualToFxShouldBuildBoundFilter() {
schemaPath,
SOME_VALUE);
- assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some field\",\"upper\":\"some value\",\"ordering\":\"lexicographic\"}");
+ String actual = "{\"type\":\"bound\",\"dimension\":\"some field\",\"upper\":\"some value\",\"ordering\":\"lexicographic\"}";
+ assertEquals(druidScanSpec.getFilter().toJson(), actual);
}
@Test
@@ -169,7 +172,8 @@ public void buildCalledWithLessThanFxShouldBuildBoundFilter() {
schemaPath,
SOME_VALUE);
- assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some field\",\"upper\":\"some value\",\"upperStrict\":true,\"ordering\":\"lexicographic\"}");
+ String actual = "{\"type\":\"bound\",\"dimension\":\"some field\",\"upper\":\"some value\",\"upperStrict\":true,\"ordering\":\"lexicographic\"}";
+ assertEquals(druidScanSpec.getFilter().toJson(), actual);
}
@Test
@@ -184,7 +188,8 @@ public void buildCalledWithLessThanFxAndNumericValueShouldBuildBoundFilter() {
schemaPath,
"1");
- assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"bound\",\"dimension\":\"some field\",\"upper\":\"1\",\"upperStrict\":true,\"ordering\":\"numeric\"}");
+ String actual = "{\"type\":\"bound\",\"dimension\":\"some field\",\"upper\":\"1\",\"upperStrict\":true,\"ordering\":\"numeric\"}";
+ assertEquals(druidScanSpec.getFilter().toJson(), actual);
}
@Test
@@ -199,8 +204,9 @@ public void buildCalledWithIsNullFxShouldBuildSelectorFilter() {
FunctionNames.IS_NULL,
schemaPath,
null);
- assertThat(druidScanSpec).isNotNull();
- assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":null}");
+ assertNotNull(druidScanSpec);
+ String actual = "{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":null}";
+ assertEquals(druidScanSpec.getFilter().toJson(), actual);
}
@Test
@@ -215,8 +221,9 @@ public void buildCalledWithIsNotNullFxShouldBuildSelectorFilter() {
FunctionNames.IS_NOT_NULL,
schemaPath,
null);
- assertThat(druidScanSpec).isNotNull();
- assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":null}}");
+ assertNotNull(druidScanSpec);
+ String actual = "{\"type\":\"not\",\"field\":{\"type\":\"selector\",\"dimension\":\"some field\",\"value\":null}}";
+ assertEquals(druidScanSpec.getFilter().toJson(), actual);
}
@Test
@@ -232,7 +239,8 @@ public void buildCalledWithLikeFxButIfValueIsPrefixedWithRegexKeywordHintShouldB
schemaPath,
"$regex$_some_regular_expression");
- assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"regex\",\"dimension\":\"some field\",\"pattern\":\"some_regular_expression\"}");
+ String actual = "{\"type\":\"regex\",\"dimension\":\"some field\",\"pattern\":\"some_regular_expression\"}";
+ assertEquals(druidScanSpec.getFilter().toJson(), actual);
}
@Test
@@ -247,7 +255,7 @@ public void buildCalledWithLikeFxShouldBuildSearchFilter() {
FunctionNames.LIKE,
schemaPath,
"some search string");
-
- assertThat(druidScanSpec.getFilter().toJson()).isEqualTo("{\"type\":\"search\",\"dimension\":\"some field\",\"query\":{\"type\":\"contains\",\"value\":\"some search string\",\"caseSensitive\":false}}");
+ String actual = "{\"type\":\"search\",\"dimension\":\"some field\",\"query\":{\"type\":\"contains\",\"value\":\"some search string\",\"caseSensitive\":false}}";
+ assertEquals(druidScanSpec.getFilter().toJson(), actual);
}
}
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java
index dd76a64ed37..027f80c8ee6 100644
--- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java
@@ -28,7 +28,9 @@
import java.io.IOException;
import java.net.URISyntaxException;
-import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
public class DruidStoragePluginConfigTest {
@@ -40,11 +42,11 @@ public void testDruidStoragePluginConfigSuccessfullyParsed()
Resources.getResource("bootstrap-storage-plugins.json").toURI()));
DruidStoragePluginConfig druidStoragePluginConfig =
mapper.treeToValue(storagePluginJson.get("storage").get("druid"), DruidStoragePluginConfig.class);
- assertThat(druidStoragePluginConfig).isNotNull();
- assertThat(druidStoragePluginConfig.getBrokerAddress()).isEqualTo("http://localhost:8082");
- assertThat(druidStoragePluginConfig.getCoordinatorAddress()).isEqualTo("http://localhost:8081");
- assertThat(druidStoragePluginConfig.getAverageRowSizeBytes()).isEqualTo(200);
- assertThat(druidStoragePluginConfig.isEnabled()).isFalse();
+ assertNotNull(druidStoragePluginConfig);
+ assertEquals("http://localhost:8082", druidStoragePluginConfig.getBrokerAddress());
+ assertEquals("http://localhost:8081", druidStoragePluginConfig.getCoordinatorAddress());
+ assertEquals(200, druidStoragePluginConfig.getAverageRowSizeBytes());
+ assertFalse(druidStoragePluginConfig.isEnabled());
}
@Test
@@ -59,6 +61,6 @@ public void testDefaultRowSizeUsedWhenNotProvidedInConfig()
JsonNode storagePluginJson = mapper.readTree(druidConfigStr);
DruidStoragePluginConfig druidStoragePluginConfig =
mapper.treeToValue(storagePluginJson.get("storage").get("druid"), DruidStoragePluginConfig.class);
- assertThat(druidStoragePluginConfig.getAverageRowSizeBytes()).isEqualTo(100);
+ assertEquals(100, druidStoragePluginConfig.getAverageRowSizeBytes());
}
}
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestConstants.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestConstants.java
index 78c8de00e67..24ac6dfd6ef 100644
--- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestConstants.java
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidTestConstants.java
@@ -25,4 +25,5 @@ public interface DruidTestConstants {
String TEST_STRING_TWO_OR_EQUALS_FILTER_QUERY_TEMPLATE1 = "select * from druid.`%s` as ds where ds.user = 'Dansker' OR ds.page = '1904'";
String TEST_QUERY_PROJECT_PUSH_DOWN_TEMPLATE_1 = "SELECT ds.`comment` FROM druid.`%s` as ds";
String TEST_QUERY_COUNT_QUERY_TEMPLATE = "SELECT count(*) as mycount FROM druid.`%s` as ds";
+ String TEST_STAR_QUERY = "SELECT * FROM druid.`%s`";
}
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java
index deea3f1edaf..45f2267dd1e 100644
--- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/TestDruidQueries.java
@@ -20,14 +20,32 @@
import org.apache.drill.categories.DruidStorageTest;
import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.RowSetComparison;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import static org.junit.Assert.assertEquals;
+
+
@Ignore("These tests require a running druid instance. You may start druid by using the docker-compose provide in resources/druid and enable these tests")
@Category({SlowTest.class, DruidStorageTest.class})
public class TestDruidQueries extends DruidTestBase {
+ @Test
+ public void testStarQuery() throws Exception {
+ testBuilder()
+ .sqlQuery(String.format(TEST_STAR_QUERY, TEST_DATASOURCE_WIKIPEDIA))
+ .unOrdered()
+ .expectsNumRecords(876)
+ .go();
+ }
+
@Test
public void testEqualsFilter() throws Exception {
testBuilder()
@@ -51,7 +69,7 @@ public void testTwoOrdEqualsFilter() throws Exception {
testBuilder()
.sqlQuery(String.format(TEST_STRING_TWO_OR_EQUALS_FILTER_QUERY_TEMPLATE1, TEST_DATASOURCE_WIKIPEDIA))
.unOrdered()
- .expectsNumRecords(3)
+ .expectsNumRecords(1)
.go();
}
@@ -63,7 +81,7 @@ public void testSingleColumnProject() throws Exception {
.sqlQuery(query)
.unOrdered()
.baselineColumns("comment")
- .expectsNumRecords(24433)
+ .expectsNumRecords(876)
.go();
}
@@ -75,7 +93,38 @@ public void testCountAllRowsQuery() throws Exception {
.sqlQuery(query)
.unOrdered()
.baselineColumns("mycount")
- .baselineValues(24433L)
+ .baselineValues(876L)
.go();
}
+
+ @Test
+ public void testGroupByQuery() throws Exception {
+ String sql = String.format("SELECT `namespace`, COUNT(*) AS user_count FROM druid.`%s` GROUP BY `namespace` ORDER BY user_count DESC LIMIT 5",TEST_DATASOURCE_WIKIPEDIA);
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("namespace", MinorType.VARCHAR, DataMode.OPTIONAL)
+ .add("user_count", MinorType.BIGINT)
+ .buildSchema();
+
+ RowSet expected = client.rowSetBuilder(expectedSchema)
+ .addRow("Main", 702)
+ .addRow("User talk", 29)
+ .addRow("Wikipedia", 26)
+ .addRow("Talk", 17)
+ .addRow("User", 12)
+ .build();
+
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testSerDe() throws Exception {
+ // TODO Start here... filters are not deserializing properly
+
+ String sql = String.format("SELECT COUNT(*) FROM druid.`%s`", TEST_DATASOURCE_WIKIPEDIA);
+ String plan = queryBuilder().sql(sql).explainJson();
+ long cnt = queryBuilder().physical(plan).singletonLong();
+ assertEquals("Counts should match", 876L, cnt);
+ }
}
diff --git a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java
index bb065ab7b31..7818ee33905 100644
--- a/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java
+++ b/contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/rest/DruidQueryClientTest.java
@@ -17,23 +17,23 @@
*/
package org.apache.drill.exec.store.druid.rest;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
import org.apache.drill.exec.store.druid.druid.DruidScanResponse;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
-import okhttp3.Response;
-import okhttp3.ResponseBody;
-
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import static org.assertj.core.api.Assertions.assertThat;
public class DruidQueryClientTest {
@@ -79,7 +79,7 @@ public void executeQueryCalledNoResponsesFoundReturnsEmptyEventList()
when(httpResponseBody.byteStream()).thenReturn(inputStream);
DruidScanResponse response = druidQueryClient.executeQuery(QUERY);
- assertThat(response.getEvents()).isEmpty();
+ assertEquals(0, response.getEvents().size());
}
@Test
@@ -91,9 +91,9 @@ public void executeQueryCalledSuccessfullyParseQueryResults()
when(httpResponseBody.byteStream()).thenReturn(inputStream);
DruidScanResponse response = druidQueryClient.executeQuery(QUERY);
- assertThat(response.getEvents()).isNotEmpty();
- assertThat(response.getEvents().size()).isEqualTo(1);
- assertThat(response.getEvents().get(0).get("user").textValue()).isEqualTo("Dansker");
- assertThat(response.getEvents().get(0).get("sum_deleted").intValue()).isEqualTo(133);
+ assertFalse(response.getEvents().isEmpty());
+ assertEquals(1, response.getEvents().size());
+ assertEquals("Dansker", response.getEvents().get(0).get("user").textValue());
+ assertEquals(133, response.getEvents().get(0).get("sum_deleted").intValue());
}
}
diff --git a/contrib/storage-druid/src/test/resources/druid/docker-compose.yaml b/contrib/storage-druid/src/test/resources/druid/docker-compose.yaml
index 700e9704dd8..53cca236965 100644
--- a/contrib/storage-druid/src/test/resources/druid/docker-compose.yaml
+++ b/contrib/storage-druid/src/test/resources/druid/docker-compose.yaml
@@ -31,7 +31,7 @@ volumes:
services:
postgres:
container_name: postgres
- image: postgres:latest
+ image: postgres:12
volumes:
- metadata_data:/var/lib/postgresql/data
environment:
@@ -65,7 +65,7 @@ services:
- environment.env
broker:
- image: apache/druid:0.22.0
+ image: apache/druid:30.0.0
container_name: broker
volumes:
- broker_var:/opt/druid/var
@@ -81,7 +81,7 @@ services:
- environment.env
historical:
- image: apache/druid:0.22.0
+ image: apache/druid:30.0.0
container_name: historical
volumes:
- druid_shared:/opt/shared
@@ -98,7 +98,7 @@ services:
- environment.env
middlemanager:
- image: apache/druid:0.22.0
+ image: apache/druid:30.0.0
container_name: middlemanager
volumes:
- druid_shared:/opt/shared
@@ -116,7 +116,7 @@ services:
- environment.env
router:
- image: apache/druid:0.22.0
+ image: apache/druid:30.0.0
container_name: router
volumes:
- router_var:/opt/druid/var
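Once the containers are up, the wikipedia datasource has been ingested, and the plugin is enabled, Drill can be queried over JDBC. A minimal sketch, assuming drill-embedded is running locally and the drill-jdbc driver is on the classpath (the `channel` column is assumed from Druid's wikipedia sample):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class QueryDruidViaDrill {
  public static void main(String[] args) throws Exception {
    // Connects to a local drillbit and runs a query against the druid storage plugin.
    try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(
             "SELECT `channel`, COUNT(*) AS edits FROM druid.`wikipedia` GROUP BY `channel` LIMIT 5")) {
      while (rs.next()) {
        System.out.println(rs.getString(1) + " -> " + rs.getLong(2));
      }
    }
  }
}
```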