diff --git a/distribution/pom.xml b/distribution/pom.xml
index 1c472144c8bd..dea41117e70f 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -257,6 +257,8 @@
org.apache.druid.extensions:druid-kubernetes-extensions
-c
org.apache.druid.extensions:druid-catalog
+ -c
+ org.apache.druid.extensions:druid-deltalake-extensions
${druid.distribution.pulldeps.opts}
@@ -453,6 +455,8 @@
-c
org.apache.druid.extensions:druid-iceberg-extensions
-c
+ org.apache.druid.extensions:druid-deltalake-extensions
+ -c
org.apache.druid.extensions.contrib:druid-spectator-histogram
diff --git a/docs/development/extensions-contrib/delta-lake.md b/docs/development/extensions-contrib/delta-lake.md
new file mode 100644
index 000000000000..8b1de4849718
--- /dev/null
+++ b/docs/development/extensions-contrib/delta-lake.md
@@ -0,0 +1,44 @@
+---
+id: delta-lake
+title: "Delta Lake extension"
+---
+
+
+
+## Delta Lake extension
+
+
+Delta Lake is an open source storage framework that enables building a
+Lakehouse architecture with various compute engines. [DeltaLakeInputSource](../../ingestion/input-sources.md#delta-lake-input-source) lets
+you ingest data stored in a Delta Lake table into Apache Druid. To use the Delta Lake extension, add `druid-deltalake-extensions` to the list of loaded extensions.
+See [Loading extensions](../../configuration/extensions.md#loading-extensions) for more information.
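+
+For example, a minimal sketch of the `druid.extensions.loadList` entry in `common.runtime.properties` (the other extension shown is a placeholder for whatever you already load):
+
+```properties
+druid.extensions.loadList=["druid-hdfs-storage", "druid-deltalake-extensions"]
+```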
+
+The Delta input source reads the configured Delta Lake table and extracts all the underlying delta files in the table's latest snapshot.
+These Delta Lake files are versioned Parquet files.
+
+## Version support
+
+The Delta Lake extension uses the Delta Kernel introduced in Delta Lake 3.0.0, which is compatible with Apache Spark 3.5.x.
+Older versions are unsupported, so consider upgrading to Delta Lake 3.0.x or higher to use this extension.
+
+## Known limitations
+
+- This extension relies on the Delta Kernel API and can only read from the latest Delta table snapshot.
+- Column filtering isn't supported. The extension reads all columns in the configured table.
\ No newline at end of file
diff --git a/docs/ingestion/input-sources.md b/docs/ingestion/input-sources.md
index 8e70ed8b0ef7..635f0b7c5b1f 100644
--- a/docs/ingestion/input-sources.md
+++ b/docs/ingestion/input-sources.md
@@ -823,6 +823,13 @@ rolled-up datasource `wikipedia_rollup` by grouping on hour, "countryName", and
to `true` to enable a compatibility mode where the timestampSpec is ignored.
:::
+The [secondary partitioning method](native-batch.md#partitionsspec) determines the requisite number of concurrent worker tasks that run in parallel to complete ingestion with the Combining input source.
+Set this value in `maxNumConcurrentSubTasks` in `tuningConfig` based on the secondary partitioning method:
+- `range` or `single_dim` partitioning: greater than or equal to 1
+- `hashed` or `dynamic` partitioning: greater than or equal to 2
+
+For more information on the `maxNumConcurrentSubTasks` field, see [Implementation considerations](native-batch.md#implementation-considerations).
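+
+For example, a minimal sketch of a `tuningConfig` that uses `range` partitioning (the partition dimension, row target, and task count are placeholders):
+
+```json
+"tuningConfig": {
+  "type": "index_parallel",
+  "partitionsSpec": {
+    "type": "range",
+    "partitionDimensions": ["countryName"],
+    "targetRowsPerSegment": 5000000
+  },
+  "maxNumConcurrentSubTasks": 4
+}
+```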
+
## SQL input source
The SQL input source is used to read data directly from RDBMS.
@@ -925,7 +932,7 @@ The following is an example of a Combining input source spec:
## Iceberg input source
:::info
- To use the Iceberg input source, add the `druid-iceberg-extensions` extension.
+To use the Iceberg input source, load the extension [`druid-iceberg-extensions`](../development/extensions-contrib/iceberg.md).
:::
You use the Iceberg input source to read data stored in the Iceberg table format. For a given table, the input source scans up to the latest Iceberg snapshot from the configured Hive catalog. Druid ingests the underlying live data files using the existing input source formats.
@@ -1114,11 +1121,31 @@ This input source provides the following filters: `and`, `equals`, `interval`, a
|type|Set this value to `not`.|yes|
|filter|The iceberg filter on which logical NOT is applied|yes|
+## Delta Lake input source
+:::info
+To use the Delta Lake input source, load the extension [`druid-deltalake-extensions`](../development/extensions-contrib/delta-lake.md).
+:::
-The [secondary partitioning method](native-batch.md#partitionsspec) determines the requisite number of concurrent worker tasks that run in parallel to complete ingestion with the Combining input source.
-Set this value in `maxNumConcurrentSubTasks` in `tuningConfig` based on the secondary partitioning method:
-- `range` or `single_dim` partitioning: greater than or equal to 1
-- `hashed` or `dynamic` partitioning: greater than or equal to 2
+You can use the Delta input source to read data stored in a Delta Lake table. For a given table, the input source scans
+the latest snapshot of the configured table. Druid ingests the underlying delta files from the table.
+
+The following is a sample spec:
+
+```json
+...
+ "ioConfig": {
+ "type": "index_parallel",
+ "inputSource": {
+ "type": "delta",
+ "tablePath": "/delta-table/directory"
+ },
+ }
+}
+```
+
+|Property|Description|Required|
+|--------|-----------|--------|
+|type|Set this value to `delta`.|yes|
+|tablePath|The location of the Delta table.|yes|
-For more information on the `maxNumConcurrentSubTasks` field, see [Implementation considerations](native-batch.md#implementation-considerations).
diff --git a/extensions-contrib/druid-deltalake-extensions/pom.xml b/extensions-contrib/druid-deltalake-extensions/pom.xml
new file mode 100644
index 000000000000..267d21936928
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/pom.xml
@@ -0,0 +1,156 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <groupId>org.apache.druid.extensions</groupId>
+  <artifactId>druid-deltalake-extensions</artifactId>
+  <name>druid-deltalake-extensions</name>
+  <description>Delta Lake connector for Druid</description>
+
+  <parent>
+    <artifactId>druid</artifactId>
+    <groupId>org.apache.druid</groupId>
+    <version>30.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <properties>
+    <delta-kernel.version>3.0.0</delta-kernel.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>io.delta</groupId>
+      <artifactId>delta-kernel-api</artifactId>
+      <version>${delta-kernel.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.delta</groupId>
+      <artifactId>delta-kernel-defaults</artifactId>
+      <version>${delta-kernel.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-api</artifactId>
+      <version>${hadoop.compile.version}</version>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-processing</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.inject</groupId>
+      <artifactId>guice</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>2.12.7.1</version>
+    </dependency>
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil-core</artifactId>
+      <version>8.5.4</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <version>${hadoop.compile.version}</version>
+      <scope>runtime</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.amazonaws</groupId>
+          <artifactId>aws-java-sdk-bundle</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-processing</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.owasp</groupId>
+        <artifactId>dependency-check-maven</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/common/DeltaLakeDruidModule.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/common/DeltaLakeDruidModule.java
new file mode 100644
index 000000000000..70c2b6ff655b
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/common/DeltaLakeDruidModule.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.common;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.inject.Binder;
+import org.apache.druid.delta.input.DeltaInputSource;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.util.Collections;
+import java.util.List;
+
+public class DeltaLakeDruidModule implements DruidModule
+{
+ @Override
+ public List<? extends Module> getJacksonModules()
+ {
+ return Collections.singletonList(
+ new SimpleModule("DeltaLakeDruidModule")
+ .registerSubtypes(
+ new NamedType(DeltaInputSource.class, DeltaInputSource.TYPE_KEY)
+ )
+ );
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+ final Configuration conf = new Configuration();
+ conf.setClassLoader(getClass().getClassLoader());
+
+ ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
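+ // Eagerly initialize the Hadoop FileSystem so its implementation classes are loaded with the extension classloader.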
+ FileSystem.get(conf);
+ }
+ catch (Exception ex) {
+ throw DruidException.forPersona(DruidException.Persona.DEVELOPER)
+ .ofCategory(DruidException.Category.UNCATEGORIZED)
+ .build(ex, "Problem during fileSystem class level initialization");
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(currCtxCl);
+ }
+ }
+
+}
+
diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputRow.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputRow.java
new file mode 100644
index 000000000000..acf909452a0b
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputRow.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import io.delta.kernel.types.BinaryType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampType;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.error.InvalidInput;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Encodes the row and schema information from a Delta Lake table.
+ */
+public class DeltaInputRow implements InputRow
+{
+ private final io.delta.kernel.data.Row row;
+ private final StructType schema;
+ private final Object2IntMap<String> fieldNameToOrdinal = new Object2IntOpenHashMap<>();
+ private final InputRow delegateRow;
+
+ public DeltaInputRow(io.delta.kernel.data.Row row, InputRowSchema inputRowSchema)
+ {
+ this.row = row;
+ this.schema = row.getSchema();
+ List<String> fieldNames = this.schema.fieldNames();
+ for (int i = 0; i < fieldNames.size(); ++i) {
+ fieldNameToOrdinal.put(fieldNames.get(i), i);
+ }
+ fieldNameToOrdinal.defaultReturnValue(-1);
+
+ Map<String, Object> theMap = new HashMap<>();
+ for (String fieldName : fieldNames) {
+ theMap.put(fieldName, _getRaw(fieldName));
+ }
+ delegateRow = MapInputRowParser.parse(inputRowSchema, theMap);
+ }
+
+ @Override
+ public List<String> getDimensions()
+ {
+ return delegateRow.getDimensions();
+ }
+
+ @Override
+ public long getTimestampFromEpoch()
+ {
+ return delegateRow.getTimestampFromEpoch();
+ }
+
+ @Override
+ public DateTime getTimestamp()
+ {
+ return delegateRow.getTimestamp();
+ }
+
+ @Override
+ public List<String> getDimension(String dimension)
+ {
+ return delegateRow.getDimension(dimension);
+ }
+
+ @Nullable
+ @Override
+ public Object getRaw(String dimension)
+ {
+ return delegateRow.getRaw(dimension);
+ }
+
+ @Nullable
+ @Override
+ public Number getMetric(String metric)
+ {
+ return delegateRow.getMetric(metric);
+ }
+
+ @Override
+ public int compareTo(Row o)
+ {
+ return this.getTimestamp().compareTo(o.getTimestamp());
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ return o instanceof DeltaInputRow && compareTo((DeltaInputRow) o) == 0;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(row, schema, fieldNameToOrdinal, delegateRow);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "DeltaInputRow{" +
+ "row=" + row +
+ ", schema=" + schema +
+ ", fieldNameToOrdinal=" + fieldNameToOrdinal +
+ ", delegateRow=" + delegateRow +
+ '}';
+ }
+
+ public Map<String, Object> getRawRowAsMap()
+ {
+ return RowSerde.convertRowToJsonObject(row);
+ }
+
+ @Nullable
+ private Object _getRaw(String dimension)
+ {
+ StructField field = schema.get(dimension);
+ if (field == null || field.isMetadataColumn()) {
+ return null;
+ }
+
+ int ordinal = fieldNameToOrdinal.getInt(dimension);
+ if (ordinal < 0) {
+ return null;
+ }
+ return getValue(field.getDataType(), row, ordinal);
+ }
+
+ @Nullable
+ private static Object getValue(DataType dataType, io.delta.kernel.data.Row dataRow, int columnOrdinal)
+ {
+ if (dataRow.isNullAt(columnOrdinal)) {
+ return null;
+ } else if (dataType instanceof BooleanType) {
+ return dataRow.getBoolean(columnOrdinal);
+ } else if (dataType instanceof ByteType) {
+ return dataRow.getByte(columnOrdinal);
+ } else if (dataType instanceof ShortType) {
+ return dataRow.getShort(columnOrdinal);
+ } else if (dataType instanceof IntegerType) {
+ return dataRow.getInt(columnOrdinal);
+ } else if (dataType instanceof DateType) {
+ return DeltaTimeUtils.getSecondsFromDate(dataRow.getInt(columnOrdinal));
+ } else if (dataType instanceof LongType) {
+ return dataRow.getLong(columnOrdinal);
+ } else if (dataType instanceof TimestampType) {
+ return DeltaTimeUtils.getMillisFromTimestamp(dataRow.getLong(columnOrdinal));
+ } else if (dataType instanceof FloatType) {
+ return dataRow.getFloat(columnOrdinal);
+ } else if (dataType instanceof DoubleType) {
+ return dataRow.getDouble(columnOrdinal);
+ } else if (dataType instanceof StringType) {
+ return dataRow.getString(columnOrdinal);
+ } else if (dataType instanceof BinaryType) {
+ final byte[] arr = dataRow.getBinary(columnOrdinal);
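+ // Widen each byte to a char so the binary value can be carried through as a String.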
+ final char[] charArray = new char[arr.length];
+ for (int i = 0; i < arr.length; i++) {
+ charArray[i] = (char) (arr[i]);
+ }
+ return String.valueOf(charArray);
+ } else if (dataType instanceof DecimalType) {
+ return dataRow.getDecimal(columnOrdinal).longValue();
+ } else {
+ throw InvalidInput.exception(
+ "Unsupported data type[%s] for fieldName[%s].",
+ dataType,
+ dataRow.getSchema().fieldNames().get(columnOrdinal)
+ );
+ }
+ }
+}
diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java
new file mode 100644
index 000000000000..936e2dd70d6b
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Iterators;
+import com.google.common.primitives.Ints;
+import io.delta.kernel.Scan;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.Table;
+import io.delta.kernel.TableNotFoundException;
+import io.delta.kernel.client.TableClient;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.client.DefaultTableClient;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.utils.Streams;
+import org.apache.hadoop.conf.Configuration;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Input source to ingest data from a Delta Lake.
+ * This input source reads the latest snapshot from a Delta table specified by {@code tablePath} parameter.
+ * We leverage the Delta Kernel APIs to interact with a Delta table. The Kernel API abstracts away the
+ * complexities of the Delta protocol itself.
+ * Note: currently, the Kernel table API only supports reading from the latest snapshot.
+ */
+public class DeltaInputSource implements SplittableInputSource<DeltaSplit>
+{
+ public static final String TYPE_KEY = "delta";
+
+ @JsonProperty
+ private final String tablePath;
+
+ @JsonProperty
+ @Nullable
+ private final DeltaSplit deltaSplit;
+
+ @JsonCreator
+ public DeltaInputSource(
+ @JsonProperty("tablePath") String tablePath,
+ @JsonProperty("deltaSplit") @Nullable DeltaSplit deltaSplit
+ )
+ {
+ if (tablePath == null) {
+ throw InvalidInput.exception("tablePath cannot be null.");
+ }
+ this.tablePath = tablePath;
+ this.deltaSplit = deltaSplit;
+ }
+
+ @Override
+ public boolean needsFormat()
+ {
+ // Only support Parquet
+ return false;
+ }
+
+ /**
+ * Instantiates a {@link DeltaInputSourceReader} to read the Delta table rows. If a {@link DeltaSplit} is supplied,
+ * the Delta files and schema are obtained from it to instantiate the reader. Otherwise, a Delta table client is
+ * instantiated with the supplied configuration to read the table.
+ *
+ * @param inputRowSchema schema for {@link org.apache.druid.data.input.InputRow}
+ * @param inputFormat unused parameter. The input format is always parquet
+ * @param temporaryDirectory unused parameter
+ */
+ @Override
+ public InputSourceReader reader(
+ InputRowSchema inputRowSchema,
+ @Nullable InputFormat inputFormat,
+ File temporaryDirectory
+ )
+ {
+ final TableClient tableClient = createTableClient();
+ try {
+ final Row scanState;
+ final List<Row> scanRowList;
+
+ if (deltaSplit != null) {
+ scanState = deserialize(tableClient, deltaSplit.getStateRow());
+ scanRowList = deltaSplit.getFiles()
+ .stream()
+ .map(row -> deserialize(tableClient, row))
+ .collect(Collectors.toList());
+ } else {
+ final Table table = Table.forPath(tableClient, tablePath);
+ final Snapshot latestSnapshot = table.getLatestSnapshot(tableClient);
+ final StructType prunedSchema = pruneSchema(
+ latestSnapshot.getSchema(tableClient),
+ inputRowSchema.getColumnsFilter()
+ );
+ final Scan scan = latestSnapshot.getScanBuilder(tableClient).withReadSchema(tableClient, prunedSchema).build();
+ final CloseableIterator<FilteredColumnarBatch> scanFiles = scan.getScanFiles(tableClient);
+
+ scanState = scan.getScanState(tableClient);
+ scanRowList = new ArrayList<>();
+
+ while (scanFiles.hasNext()) {
+ final FilteredColumnarBatch scanFileBatch = scanFiles.next();
+ final CloseableIterator<Row> scanFileRows = scanFileBatch.getRows();
+ scanFileRows.forEachRemaining(scanRowList::add);
+ }
+ }
+ return new DeltaInputSourceReader(
+ Scan.readData(
+ tableClient,
+ scanState,
+ Utils.toCloseableIterator(scanRowList.iterator()),
+ Optional.empty()
+ ),
+ inputRowSchema
+ );
+ }
+ catch (TableNotFoundException e) {
+ throw InvalidInput.exception(e, "tablePath[%s] not found.", tablePath);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Stream<InputSplit<DeltaSplit>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
+ {
+ if (deltaSplit != null) {
+ // can't split a split
+ return Stream.of(new InputSplit<>(deltaSplit));
+ }
+
+ final TableClient tableClient = createTableClient();
+ final Snapshot latestSnapshot;
+ try {
+ final Table table = Table.forPath(tableClient, tablePath);
+ latestSnapshot = table.getLatestSnapshot(tableClient);
+ }
+ catch (TableNotFoundException e) {
+ throw InvalidInput.exception(e, "tablePath[%s] not found.", tablePath);
+ }
+ final Scan scan = latestSnapshot.getScanBuilder(tableClient).build();
+ // scan files iterator for the current snapshot
+ final CloseableIterator<FilteredColumnarBatch> scanFilesIterator = scan.getScanFiles(tableClient);
+
+ final Row scanState = scan.getScanState(tableClient);
+ final String scanStateStr = RowSerde.serializeRowToJson(scanState);
+
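+ // Create one DeltaSplit per batch of scan files; each split carries the serialized scan state plus its serialized file rows.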
+ Iterator<DeltaSplit> deltaSplitIterator = Iterators.transform(
+ scanFilesIterator,
+ scanFile -> {
+ final CloseableIterator<Row> rows = scanFile.getRows();
+ final List<String> fileRows = new ArrayList<>();
+ while (rows.hasNext()) {
+ fileRows.add(RowSerde.serializeRowToJson(rows.next()));
+ }
+ return new DeltaSplit(scanStateStr, fileRows);
+ }
+ );
+
+ return Streams.sequentialStreamFrom(deltaSplitIterator).map(InputSplit::new);
+ }
+
+ @Override
+ public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
+ {
+ return Ints.checkedCast(createSplits(inputFormat, splitHintSpec).count());
+ }
+
+ @Override
+ public InputSource withSplit(InputSplit<DeltaSplit> split)
+ {
+ return new DeltaInputSource(
+ tablePath,
+ split.get()
+ );
+ }
+
+ private Row deserialize(TableClient tableClient, String row)
+ {
+ return RowSerde.deserializeRowFromJson(tableClient, row);
+ }
+
+ /**
+ * Utility method to return a pruned schema that contains the given {@code columns} from
+ * {@code baseSchema} applied by {@code columnsFilter}. This will serve as an optimization
+ * for table scans if we're interested in reading only a subset of columns from the Delta Lake table.
+ */
+ private StructType pruneSchema(final StructType baseSchema, final ColumnsFilter columnsFilter)
+ {
+ final List<String> columnNames = baseSchema.fieldNames();
+ final List<String> filteredColumnNames = columnNames
+ .stream()
+ .filter(columnsFilter::apply)
+ .collect(Collectors.toList());
+
+ if (filteredColumnNames.equals(columnNames)) {
+ return baseSchema;
+ }
+ final List<StructField> selectedFields = filteredColumnNames
+ .stream()
+ .map(baseSchema::get)
+ .collect(Collectors.toList());
+ return new StructType(selectedFields);
+ }
+
+ /**
+ * @return a table client where the client is initialized with {@link Configuration} class that uses the class's
+ * class loader instead of the context classloader. The latter by default doesn't know about the extension classes,
+ * so the table client cannot load runtime classes resulting in {@link ClassNotFoundException}.
+ */
+ private TableClient createTableClient()
+ {
+ final ClassLoader currCtxClassloader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ final Configuration conf = new Configuration();
+ return DefaultTableClient.create(conf);
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(currCtxClassloader);
+ }
+ }
+}
diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java
new file mode 100644
index 000000000000..d0fc4780d001
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputStats;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * A reader for the Delta Lake input source. It initializes an iterator {@link DeltaInputSourceIterator}
+ * for a subset of Delta records given by {@link FilteredColumnarBatch} and schema {@link InputRowSchema}.
+ *
+ */
+public class DeltaInputSourceReader implements InputSourceReader
+{
+ private final io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch> filteredColumnarBatchCloseableIterator;
+ private final InputRowSchema inputRowSchema;
+
+ public DeltaInputSourceReader(
+ io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch> filteredColumnarBatchCloseableIterator,
+ InputRowSchema inputRowSchema
+ )
+ {
+ this.filteredColumnarBatchCloseableIterator = filteredColumnarBatchCloseableIterator;
+ this.inputRowSchema = inputRowSchema;
+ }
+
+ @Override
+ public CloseableIterator<InputRow> read()
+ {
+ return new DeltaInputSourceIterator(filteredColumnarBatchCloseableIterator, inputRowSchema);
+ }
+
+ @Override
+ public CloseableIterator<InputRow> read(InputStats inputStats)
+ {
+ return new DeltaInputSourceIterator(filteredColumnarBatchCloseableIterator, inputRowSchema);
+ }
+
+ @Override
+ public CloseableIterator<InputRowListPlusRawValues> sample()
+ {
+ CloseableIterator<InputRow> inner = read();
+ return new CloseableIterator<InputRowListPlusRawValues>()
+ {
+ @Override
+ public void close() throws IOException
+ {
+ inner.close();
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return inner.hasNext();
+ }
+
+ @Override
+ public InputRowListPlusRawValues next()
+ {
+ DeltaInputRow deltaInputRow = (DeltaInputRow) inner.next();
+ return InputRowListPlusRawValues.of(deltaInputRow, deltaInputRow.getRawRowAsMap());
+ }
+ };
+ }
+
+ private static class DeltaInputSourceIterator implements CloseableIterator<InputRow>
+ {
+ private final io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch> filteredColumnarBatchCloseableIterator;
+
+ private io.delta.kernel.utils.CloseableIterator<Row> currentBatch = null;
+ private final InputRowSchema inputRowSchema;
+
+ public DeltaInputSourceIterator(
+ io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch> filteredColumnarBatchCloseableIterator,
+ InputRowSchema inputRowSchema
+ )
+ {
+ this.filteredColumnarBatchCloseableIterator = filteredColumnarBatchCloseableIterator;
+ this.inputRowSchema = inputRowSchema;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ while (currentBatch == null || !currentBatch.hasNext()) {
+ if (!filteredColumnarBatchCloseableIterator.hasNext()) {
+ return false; // No more batches or records to read!
+ }
+ currentBatch = filteredColumnarBatchCloseableIterator.next().getRows();
+ }
+ return true;
+ }
+
+ @Override
+ public InputRow next()
+ {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ Row dataRow = currentBatch.next();
+ return new DeltaInputRow(dataRow, inputRowSchema);
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ filteredColumnarBatchCloseableIterator.close();
+ }
+ }
+}
diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaSplit.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaSplit.java
new file mode 100644
index 000000000000..7ab52cef0897
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaSplit.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/**
+ * An input split of a Delta table containing the following information:
+ * <ul>
+ * <li>{@code stateRow} represents the canonical JSON representation of the latest snapshot of the Delta table.</li>
+ * <li>{@code files} represents the list of files from the latest snapshot.</li>
+ * </ul>
+ */
+public class DeltaSplit
+{
+ private final String stateRow;
+ private final List<String> files;
+
+ @JsonCreator
+ public DeltaSplit(
+ @JsonProperty("state") final String stateRow,
+ @JsonProperty("files") final List files
+ )
+ {
+ this.stateRow = stateRow;
+ this.files = files;
+ }
+
+ @JsonProperty("state")
+ public String getStateRow()
+ {
+ return stateRow;
+ }
+
+ @JsonProperty("files")
+ public List<String> getFiles()
+ {
+ return files;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "DeltaSplit{" +
+ "stateRow=" + stateRow +
+ ", files=" + files +
+ "}";
+ }
+}
diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaTimeUtils.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaTimeUtils.java
new file mode 100644
index 000000000000..f47428944027
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaTimeUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+
+public class DeltaTimeUtils
+{
+ private static final ZoneId ZONE_ID = ZoneId.systemDefault();
+
+ /**
+ * {@link io.delta.kernel.types.TimestampType} data in Delta Lake tables is stored internally as the number of
+ * microseconds since epoch.
+ *
+ * @param microSecsSinceEpochUTC microseconds since epoch
+ * @return Datetime millis corresponding to {@code microSecsSinceEpochUTC}
+ */
+ public static long getMillisFromTimestamp(final long microSecsSinceEpochUTC)
+ {
+ final LocalDateTime dateTime = LocalDateTime.ofEpochSecond(
+ microSecsSinceEpochUTC / 1_000_000 /* epochSecond */,
+ (int) (1_000 * (microSecsSinceEpochUTC % 1_000_000)) /* nanoOfSecond */,
+ ZoneOffset.UTC
+ );
+ return dateTime.atZone(ZONE_ID).toInstant().toEpochMilli();
+ }
+
+ /**
+ * {@link io.delta.kernel.types.DateType} data in Delta Lake tables is stored internally as the number of
+ * days since epoch.
+ *
+ * @param daysSinceEpochUTC number of days since epoch
+ * @return number of seconds corresponding to {@code daysSinceEpochUTC}.
+ */
+ public static long getSecondsFromDate(final int daysSinceEpochUTC)
+ {
+ return LocalDate.ofEpochDay(daysSinceEpochUTC).atStartOfDay(ZONE_ID).toEpochSecond();
+ }
+}
diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java
new file mode 100644
index 000000000000..f10ac0574e9f
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.delta.kernel.client.TableClient;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.internal.data.DefaultJsonRow;
+import io.delta.kernel.internal.types.TableSchemaSerDe;
+import io.delta.kernel.internal.util.VectorUtils;
+import io.delta.kernel.types.ArrayType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.MapType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampType;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.jackson.DefaultObjectMapper;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility class to serialize and deserialize {@link Row} objects.
+ * Code borrowed from Delta Kernel's
+ * RowSerde.java. The only differences between the two classes are the code style and exception handling in
+ * {@link #convertRowToJsonObject}, where we use {@link org.apache.druid.error.DruidException} instead of
+ * {@link UnsupportedOperationException}.
+ *
+ */
+public class RowSerde
+{
+ private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
+
+ private RowSerde()
+ {
+ }
+
+ /**
+ * Utility method to serialize a {@link Row} as a JSON string
+ */
+ public static String serializeRowToJson(Row row)
+ {
+ Map<String, Object> rowObject = convertRowToJsonObject(row);
+ try {
+ Map<String, Object> rowWithSchema = new HashMap<>();
+ rowWithSchema.put("schema", TableSchemaSerDe.toJson(row.getSchema()));
+ rowWithSchema.put("row", rowObject);
+ return OBJECT_MAPPER.writeValueAsString(rowWithSchema);
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Utility method to deserialize a {@link Row} object from the JSON form.
+ */
+ public static Row deserializeRowFromJson(TableClient tableClient, String jsonRowWithSchema)
+ {
+ try {
+ JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonRowWithSchema);
+ JsonNode schemaNode = jsonNode.get("schema");
+ StructType schema =
+ TableSchemaSerDe.fromJson(tableClient.getJsonHandler(), schemaNode.asText());
+ return parseRowFromJsonWithSchema((ObjectNode) jsonNode.get("row"), schema);
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static Map<String, Object> convertRowToJsonObject(Row row)
+ {
+ StructType rowType = row.getSchema();
+ Map<String, Object> rowObject = new HashMap<>();
+ for (int fieldId = 0; fieldId < rowType.length(); fieldId++) {
+ StructField field = rowType.at(fieldId);
+ DataType fieldType = field.getDataType();
+ String name = field.getName();
+
+ if (row.isNullAt(fieldId)) {
+ rowObject.put(name, null);
+ continue;
+ }
+
+ Object value;
+ if (fieldType instanceof BooleanType) {
+ value = row.getBoolean(fieldId);
+ } else if (fieldType instanceof ByteType) {
+ value = row.getByte(fieldId);
+ } else if (fieldType instanceof ShortType) {
+ value = row.getShort(fieldId);
+ } else if (fieldType instanceof IntegerType) {
+ value = row.getInt(fieldId);
+ } else if (fieldType instanceof LongType) {
+ value = row.getLong(fieldId);
+ } else if (fieldType instanceof FloatType) {
+ value = row.getFloat(fieldId);
+ } else if (fieldType instanceof DoubleType) {
+ value = row.getDouble(fieldId);
+ } else if (fieldType instanceof DateType) {
+ value = DeltaTimeUtils.getSecondsFromDate(row.getInt(fieldId));
+ } else if (fieldType instanceof TimestampType) {
+ value = DeltaTimeUtils.getMillisFromTimestamp(row.getLong(fieldId));
+ } else if (fieldType instanceof StringType) {
+ value = row.getString(fieldId);
+ } else if (fieldType instanceof ArrayType) {
+ value = VectorUtils.toJavaList(row.getArray(fieldId));
+ } else if (fieldType instanceof MapType) {
+ value = VectorUtils.toJavaMap(row.getMap(fieldId));
+ } else if (fieldType instanceof StructType) {
+ Row subRow = row.getStruct(fieldId);
+ value = convertRowToJsonObject(subRow);
+ } else {
+ throw InvalidInput.exception("Unsupported fieldType[%s] for fieldName[%s]", fieldType, name);
+ }
+
+ rowObject.put(name, value);
+ }
+
+ return rowObject;
+ }
+
+ private static Row parseRowFromJsonWithSchema(ObjectNode rowJsonNode, StructType rowType)
+ {
+ return new DefaultJsonRow(rowJsonNode, rowType);
+ }
+}
diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-contrib/druid-deltalake-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
new file mode 100644
index 000000000000..b2752bd862c6
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.druid.delta.common.DeltaLakeDruidModule
\ No newline at end of file
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java
new file mode 100644
index 000000000000..6f68774dbd61
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import io.delta.kernel.Scan;
+import io.delta.kernel.TableNotFoundException;
+import io.delta.kernel.client.TableClient;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.client.DefaultTableClient;
+import io.delta.kernel.utils.CloseableIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+
+public class DeltaInputRowTest
+{
+ @Test
+ public void testDeltaInputRow() throws TableNotFoundException, IOException
+ {
+ final TableClient tableClient = DefaultTableClient.create(new Configuration());
+ final Scan scan = DeltaTestUtils.getScan(tableClient);
+
+ CloseableIterator<FilteredColumnarBatch> scanFileIter = scan.getScanFiles(tableClient);
+ int totalRecordCount = 0;
+ while (scanFileIter.hasNext()) {
+ try (CloseableIterator<FilteredColumnarBatch> data =
+ Scan.readData(
+ tableClient,
+ scan.getScanState(tableClient),
+ scanFileIter.next().getRows(),
+ Optional.empty()
+ )) {
+ while (data.hasNext()) {
+ FilteredColumnarBatch dataReadResult = data.next();
+ Row next = dataReadResult.getRows().next();
+ DeltaInputRow deltaInputRow = new DeltaInputRow(
+ next,
+ DeltaTestUtils.FULL_SCHEMA
+ );
+ Assert.assertNotNull(deltaInputRow);
+ Assert.assertEquals(DeltaTestUtils.DIMENSIONS, deltaInputRow.getDimensions());
+
+ Map<String, Object> expectedRow = DeltaTestUtils.EXPECTED_ROWS.get(totalRecordCount);
+ for (String key : expectedRow.keySet()) {
+ if (DeltaTestUtils.FULL_SCHEMA.getTimestampSpec().getTimestampColumn().equals(key)) {
+ final long expectedMillis = ((Long) expectedRow.get(key)) * 1000;
+ Assert.assertEquals(expectedMillis, deltaInputRow.getTimestampFromEpoch());
+ } else {
+ Assert.assertEquals(expectedRow.get(key), deltaInputRow.getRaw(key));
+ }
+ }
+ totalRecordCount += 1;
+ }
+ }
+ }
+ Assert.assertEquals(DeltaTestUtils.EXPECTED_ROWS.size(), totalRecordCount);
+ }
+}
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java
new file mode 100644
index 000000000000..24b1096abe1e
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.hamcrest.MatcherAssert;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class DeltaInputSourceTest
+{
+ @Before
+ public void setUp()
+ {
+ System.setProperty("user.timezone", "UTC");
+ }
+
+ @Test
+ public void testSampleDeltaTable() throws IOException
+ {
+ final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
+ final InputSourceReader inputSourceReader = deltaInputSource.reader(DeltaTestUtils.FULL_SCHEMA, null, null);
+
+ List<InputRowListPlusRawValues> actualSampledRows = sampleAllRows(inputSourceReader);
+ Assert.assertEquals(DeltaTestUtils.EXPECTED_ROWS.size(), actualSampledRows.size());
+
+ for (int idx = 0; idx < DeltaTestUtils.EXPECTED_ROWS.size(); idx++) {
+ Map<String, Object> expectedRow = DeltaTestUtils.EXPECTED_ROWS.get(idx);
+ InputRowListPlusRawValues actualSampledRow = actualSampledRows.get(idx);
+ Assert.assertNull(actualSampledRow.getParseException());
+
+ Map<String, Object> actualSampledRawVals = actualSampledRow.getRawValues();
+ Assert.assertNotNull(actualSampledRawVals);
+ Assert.assertNotNull(actualSampledRow.getRawValuesList());
+ Assert.assertEquals(1, actualSampledRow.getRawValuesList().size());
+
+ for (String key : expectedRow.keySet()) {
+ if (DeltaTestUtils.FULL_SCHEMA.getTimestampSpec().getTimestampColumn().equals(key)) {
+ final long expectedMillis = (Long) expectedRow.get(key);
+ Assert.assertEquals(expectedMillis, actualSampledRawVals.get(key));
+ } else {
+ Assert.assertEquals(expectedRow.get(key), actualSampledRawVals.get(key));
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testReadAllDeltaTable() throws IOException
+ {
+ final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
+ final InputSourceReader inputSourceReader = deltaInputSource.reader(
+ DeltaTestUtils.FULL_SCHEMA,
+ null,
+ null
+ );
+ final List<InputRow> actualReadRows = readAllRows(inputSourceReader);
+ validateRows(DeltaTestUtils.EXPECTED_ROWS, actualReadRows, DeltaTestUtils.FULL_SCHEMA);
+ }
+
+ @Test
+ public void testReadAllDeltaTableSubSchema1() throws IOException
+ {
+ final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
+ final InputSourceReader inputSourceReader = deltaInputSource.reader(
+ DeltaTestUtils.SCHEMA_1,
+ null,
+ null
+ );
+ final List<InputRow> actualReadRows = readAllRows(inputSourceReader);
+ validateRows(DeltaTestUtils.EXPECTED_ROWS, actualReadRows, DeltaTestUtils.SCHEMA_1);
+ }
+
+ @Test
+ public void testReadAllDeltaTableWithSubSchema2() throws IOException
+ {
+ final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
+ final InputSourceReader inputSourceReader = deltaInputSource.reader(
+ DeltaTestUtils.SCHEMA_2,
+ null,
+ null
+ );
+ final List<InputRow> actualReadRows = readAllRows(inputSourceReader);
+ validateRows(DeltaTestUtils.EXPECTED_ROWS, actualReadRows, DeltaTestUtils.SCHEMA_2);
+ }
+
+ @Test
+ public void testDeltaLakeWithCreateSplits()
+ {
+ final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
+ final List<InputSplit<DeltaSplit>> splits = deltaInputSource.createSplits(null, null)
+ .collect(Collectors.toList());
+ Assert.assertEquals(DeltaTestUtils.SPLIT_TO_EXPECTED_ROWS.size(), splits.size());
+
+ for (InputSplit<DeltaSplit> split : splits) {
+ final DeltaSplit deltaSplit = split.get();
+ final DeltaInputSource deltaInputSourceWithSplit = new DeltaInputSource(
+ DeltaTestUtils.DELTA_TABLE_PATH,
+ deltaSplit
+ );
+ List<InputSplit<DeltaSplit>> splitsResult = deltaInputSourceWithSplit.createSplits(null, null)
+ .collect(Collectors.toList());
+ Assert.assertEquals(1, splitsResult.size());
+ Assert.assertEquals(deltaSplit, splitsResult.get(0).get());
+ }
+ }
+
+ @Test
+ public void testDeltaLakeWithReadSplits() throws IOException
+ {
+ final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
+ final List<InputSplit<DeltaSplit>> splits = deltaInputSource.createSplits(null, null)
+ .collect(Collectors.toList());
+ Assert.assertEquals(DeltaTestUtils.SPLIT_TO_EXPECTED_ROWS.size(), splits.size());
+
+ for (int idx = 0; idx < splits.size(); idx++) {
+ final InputSplit<DeltaSplit> split = splits.get(idx);
+ final DeltaSplit deltaSplit = split.get();
+ final DeltaInputSource deltaInputSourceWithSplit = new DeltaInputSource(
+ DeltaTestUtils.DELTA_TABLE_PATH,
+ deltaSplit
+ );
+ final InputSourceReader inputSourceReader = deltaInputSourceWithSplit.reader(
+ DeltaTestUtils.FULL_SCHEMA,
+ null,
+ null
+ );
+ final List<InputRow> actualRowsInSplit = readAllRows(inputSourceReader);
+ final List<Map<String, Object>> expectedRows = DeltaTestUtils.SPLIT_TO_EXPECTED_ROWS.get(idx);
+ validateRows(expectedRows, actualRowsInSplit, DeltaTestUtils.FULL_SCHEMA);
+ }
+ }
+ case 'delta':
+ return <p>Load data from Delta Lake.</p>;
+
 case 'hdfs':
 return <p>Load text based, avro, orc, or parquet data from HDFS.</p>;
diff --git a/website/.spelling b/website/.spelling
index e878cd2f9a88..0d4aeaf09a8e 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -63,6 +63,7 @@ DRUIDVERSION
DataSketches
DateTime
DateType
+DeltaLakeInputSource
dimensionsSpec
DimensionSpec
DimensionSpecs
@@ -79,6 +80,7 @@ downsamples
downsampling
Dropwizard
dropwizard
+druid-deltalake-extensions
DruidInputSource
DruidSQL
DynamicConfigProvider
@@ -161,6 +163,7 @@ Kerberos
KeyStores
Kinesis
Kubernetes
+Lakehouse
LDAPS
LRU
LZ4
@@ -525,6 +528,7 @@ SVG
symlink
syntaxes
systemFields
+tablePath
tiering
timeseries
Timeseries
@@ -571,6 +575,7 @@ varchar
vectorizable
vectorize
vectorizeVirtualColumns
+versioned
versioning
virtualColumns
w.r.t.
@@ -795,9 +800,11 @@ multi-server
BasicDataSource
LeaderLatch
2.x
-28.x
+28.x
+3.0.x
3.5.x
3.4.x
+3.5.x.
AllowAll
AuthenticationResult
AuthorizationLoadingLookupTest