diff --git a/distribution/pom.xml b/distribution/pom.xml
index 1c472144c8bd..dea41117e70f 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -257,6 +257,8 @@
                         org.apache.druid.extensions:druid-kubernetes-extensions
                         -c
                         org.apache.druid.extensions:druid-catalog
+                        -c
+                        org.apache.druid.extensions:druid-deltalake-extensions
                         ${druid.distribution.pulldeps.opts}
@@ -453,6 +455,8 @@
                         -c
                         org.apache.druid.extensions:druid-iceberg-extensions
                         -c
+                        org.apache.druid.extensions:druid-deltalake-extensions
+                        -c
                         org.apache.druid.extensions.contrib:druid-spectator-histogram
diff --git a/docs/development/extensions-contrib/delta-lake.md b/docs/development/extensions-contrib/delta-lake.md
new file mode 100644
index 000000000000..8b1de4849718
--- /dev/null
+++ b/docs/development/extensions-contrib/delta-lake.md
@@ -0,0 +1,44 @@
+---
+id: delta-lake
+title: "Delta Lake extension"
+---
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+## Delta Lake extension
+
+Delta Lake is an open source storage framework that enables building a
+Lakehouse architecture with various compute engines. [DeltaLakeInputSource](../../ingestion/input-sources.md#delta-lake-input-source) lets
+you ingest data stored in a Delta Lake table into Apache Druid. To use the Delta Lake extension, add the `druid-deltalake-extensions` extension to the list of loaded extensions.
+See [Loading extensions](../../configuration/extensions.md#loading-extensions) for more information.
+
+The Delta input source reads the configured Delta Lake table and extracts all the underlying Delta files in the table's latest snapshot.
+These Delta Lake files are versioned Parquet files.
+
+## Version support
+
+The Delta Lake extension uses the Delta Kernel introduced in Delta Lake 3.0.0, which is compatible with Apache Spark 3.5.x.
+Older versions are unsupported, so consider upgrading to Delta Lake 3.0.x or higher to use this extension.
+
+## Known limitations
+
+- This extension relies on the Delta Kernel API and can only read from the latest Delta table snapshot.
+- Column filtering isn't supported. The extension reads all columns in the configured table.
\ No newline at end of file
diff --git a/docs/ingestion/input-sources.md b/docs/ingestion/input-sources.md
index 8e70ed8b0ef7..635f0b7c5b1f 100644
--- a/docs/ingestion/input-sources.md
+++ b/docs/ingestion/input-sources.md
@@ -823,6 +823,13 @@ rolled-up datasource `wikipedia_rollup` by grouping on hour, "countryName", and
 to `true` to enable a compatibility mode where the timestampSpec is ignored.
 :::
 
+The [secondary partitioning method](native-batch.md#partitionsspec) determines the requisite number of concurrent worker tasks that run in parallel to complete ingestion with the Combining input source.
+Set this value in `maxNumConcurrentSubTasks` in `tuningConfig` based on the secondary partitioning method:
+- `range` or `single_dim` partitioning: greater than or equal to 1
+- `hashed` or `dynamic` partitioning: greater than or equal to 2
+
+For more information on the `maxNumConcurrentSubTasks` field, see [Implementation considerations](native-batch.md#implementation-considerations).
+
 ## SQL input source
 
 The SQL input source is used to read data directly from RDBMS.
@@ -925,7 +932,7 @@ The following is an example of a Combining input source spec:
 ## Iceberg input source
 
 :::info
- To use the Iceberg input source, add the `druid-iceberg-extensions` extension.
+To use the Iceberg input source, load the extension [`druid-iceberg-extensions`](../development/extensions-contrib/iceberg.md).
 :::
 
 You use the Iceberg input source to read data stored in the Iceberg table format. For a given table, the input source scans up to the latest Iceberg snapshot from the configured Hive catalog. Druid ingests the underlying live data files using the existing input source formats.
@@ -1114,11 +1121,31 @@ This input source provides the following filters: `and`, `equals`, `interval`, a
 |type|Set this value to `not`.|yes|
 |filter|The iceberg filter on which logical NOT is applied|yes|
 
+## Delta Lake input source
+
+:::info
+To use the Delta Lake input source, load the extension [`druid-deltalake-extensions`](../development/extensions-contrib/delta-lake.md).
+:::
 
-The [secondary partitioning method](native-batch.md#partitionsspec) determines the requisite number of concurrent worker tasks that run in parallel to complete ingestion with the Combining input source.
-Set this value in `maxNumConcurrentSubTasks` in `tuningConfig` based on the secondary partitioning method:
-- `range` or `single_dim` partitioning: greater than or equal to 1
-- `hashed` or `dynamic` partitioning: greater than or equal to 2
+You can use the Delta input source to read data stored in a Delta Lake table. For a given table, the input source scans
+the latest snapshot of the configured table. Druid ingests the underlying Delta files from the table.
+
+The following is a sample spec:
+
+```json
+...
+  "ioConfig": {
+    "type": "index_parallel",
+    "inputSource": {
+      "type": "delta",
+      "tablePath": "/delta-table/directory"
+    }
+  },
+...
+```
+
+|Property|Description|Required|
+|--------|-----------|--------|
+|type|Set this value to `delta`.|yes|
+|tablePath|The location of the Delta table.|yes|
-
-For more information on the `maxNumConcurrentSubTasks` field, see [Implementation considerations](native-batch.md#implementation-considerations).
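
As a concrete illustration of the loading step described in `delta-lake.md` above: extensions are enabled through the
standard `druid.extensions.loadList` property in `common.runtime.properties`. A minimal sketch, assuming a deployment
that loads only this extension (real deployments keep their existing entries in the list):

```properties
druid.extensions.loadList=["druid-deltalake-extensions"]
```

Note that the sample ingestion spec above carries no `inputFormat` next to the `inputSource`: the Delta input source
reads the table's versioned Parquet files through the Delta Kernel itself, so no input format needs to be configured.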
diff --git a/extensions-contrib/druid-deltalake-extensions/pom.xml b/extensions-contrib/druid-deltalake-extensions/pom.xml
new file mode 100644
index 000000000000..267d21936928
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/pom.xml
@@ -0,0 +1,156 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <groupId>org.apache.druid.extensions</groupId>
+  <artifactId>druid-deltalake-extensions</artifactId>
+  <name>druid-deltalake-extensions</name>
+  <description>Delta Lake connector for Druid</description>
+
+  <parent>
+    <artifactId>druid</artifactId>
+    <groupId>org.apache.druid</groupId>
+    <version>30.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <properties>
+    <delta-kernel.version>3.0.0</delta-kernel.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>io.delta</groupId>
+      <artifactId>delta-kernel-api</artifactId>
+      <version>${delta-kernel.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.delta</groupId>
+      <artifactId>delta-kernel-defaults</artifactId>
+      <version>${delta-kernel.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-api</artifactId>
+      <version>${hadoop.compile.version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-processing</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.inject</groupId>
+      <artifactId>guice</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>2.12.7.1</version>
+    </dependency>
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil-core</artifactId>
+      <version>8.5.4</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <version>${hadoop.compile.version}</version>
+      <scope>runtime</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.amazonaws</groupId>
+          <artifactId>aws-java-sdk-bundle</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- Tests -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-processing</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.owasp</groupId>
+        <artifactId>dependency-check-maven</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/common/DeltaLakeDruidModule.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/common/DeltaLakeDruidModule.java
new file mode 100644
index 000000000000..70c2b6ff655b
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/common/DeltaLakeDruidModule.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.common;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.inject.Binder;
+import org.apache.druid.delta.input.DeltaInputSource;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.util.Collections;
+import java.util.List;
+
+public class DeltaLakeDruidModule implements DruidModule
+{
+  @Override
+  public List<? extends Module> getJacksonModules()
+  {
+    return Collections.singletonList(
+        new SimpleModule("DeltaLakeDruidModule")
+            .registerSubtypes(
+                new NamedType(DeltaInputSource.class, DeltaInputSource.TYPE_KEY)
+            )
+    );
+  }
+
+  @Override
+  public void configure(Binder binder)
+  {
+    // Eagerly initialize the Hadoop FileSystem machinery with the extension's classloader so that
+    // filesystem implementations bundled with this extension are discoverable at runtime.
+    final Configuration conf = new Configuration();
+    conf.setClassLoader(getClass().getClassLoader());
+
+    ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+      FileSystem.get(conf);
+    }
+    catch (Exception ex) {
+      throw DruidException.forPersona(DruidException.Persona.DEVELOPER)
+                          .ofCategory(DruidException.Category.UNCATEGORIZED)
+                          .build(ex, "Problem during fileSystem class level initialization");
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(currCtxCl);
+    }
+  }
+}
diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputRow.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputRow.java
new file mode 100644
index 000000000000..acf909452a0b
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputRow.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.druid.delta.input; + +import io.delta.kernel.types.BinaryType; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.ByteType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DateType; +import io.delta.kernel.types.DecimalType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.FloatType; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.ShortType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; +import io.delta.kernel.types.TimestampType; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.Row; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.error.InvalidInput; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Encodes the row and schema information from the Delta Lake. + */ +public class DeltaInputRow implements InputRow +{ + private final io.delta.kernel.data.Row row; + private final StructType schema; + private final Object2IntMap fieldNameToOrdinal = new Object2IntOpenHashMap<>(); + private final InputRow delegateRow; + + public DeltaInputRow(io.delta.kernel.data.Row row, InputRowSchema inputRowSchema) + { + this.row = row; + this.schema = row.getSchema(); + List fieldNames = this.schema.fieldNames(); + for (int i = 0; i < fieldNames.size(); ++i) { + fieldNameToOrdinal.put(fieldNames.get(i), i); + } + fieldNameToOrdinal.defaultReturnValue(-1); + + Map theMap = new HashMap<>(); + for (String fieldName : fieldNames) { + theMap.put(fieldName, _getRaw(fieldName)); + } + delegateRow = MapInputRowParser.parse(inputRowSchema, theMap); + } + + @Override + public List getDimensions() + { + return delegateRow.getDimensions(); + } + + @Override + public long getTimestampFromEpoch() + { + return delegateRow.getTimestampFromEpoch(); + } + + @Override + public DateTime getTimestamp() + { + return delegateRow.getTimestamp(); + } + + @Override + public List getDimension(String dimension) + { + return delegateRow.getDimension(dimension); + } + + @Nullable + @Override + public Object getRaw(String dimension) + { + return delegateRow.getRaw(dimension); + } + + @Nullable + @Override + public Number getMetric(String metric) + { + return delegateRow.getMetric(metric); + } + + @Override + public int compareTo(Row o) + { + return this.getTimestamp().compareTo(o.getTimestamp()); + } + + @Override + public boolean equals(Object o) + { + return o instanceof DeltaInputRow && compareTo((DeltaInputRow) o) == 0; + } + + @Override + public int hashCode() + { + return Objects.hash(row, schema, fieldNameToOrdinal, delegateRow); + } + + @Override + public String toString() + { + return "DeltaInputRow{" + + "row=" + row + + ", schema=" + schema + + ", fieldNameToOrdinal=" + fieldNameToOrdinal + + ", delegateRow=" + delegateRow + + '}'; + } + + public Map getRawRowAsMap() + { + return RowSerde.convertRowToJsonObject(row); + } + + @Nullable + private Object _getRaw(String dimension) + { + StructField field = schema.get(dimension); + if (field == null || field.isMetadataColumn()) { + return null; + } + + int ordinal = 
fieldNameToOrdinal.getInt(dimension); + if (ordinal < 0) { + return null; + } + return getValue(field.getDataType(), row, ordinal); + } + + @Nullable + private static Object getValue(DataType dataType, io.delta.kernel.data.Row dataRow, int columnOrdinal) + { + if (dataRow.isNullAt(columnOrdinal)) { + return null; + } else if (dataType instanceof BooleanType) { + return dataRow.getBoolean(columnOrdinal); + } else if (dataType instanceof ByteType) { + return dataRow.getByte(columnOrdinal); + } else if (dataType instanceof ShortType) { + return dataRow.getShort(columnOrdinal); + } else if (dataType instanceof IntegerType) { + return dataRow.getInt(columnOrdinal); + } else if (dataType instanceof DateType) { + return DeltaTimeUtils.getSecondsFromDate(dataRow.getInt(columnOrdinal)); + } else if (dataType instanceof LongType) { + return dataRow.getLong(columnOrdinal); + } else if (dataType instanceof TimestampType) { + return DeltaTimeUtils.getMillisFromTimestamp(dataRow.getLong(columnOrdinal)); + } else if (dataType instanceof FloatType) { + return dataRow.getFloat(columnOrdinal); + } else if (dataType instanceof DoubleType) { + return dataRow.getDouble(columnOrdinal); + } else if (dataType instanceof StringType) { + return dataRow.getString(columnOrdinal); + } else if (dataType instanceof BinaryType) { + final byte[] arr = dataRow.getBinary(columnOrdinal); + final char[] charArray = new char[arr.length]; + for (int i = 0; i < arr.length; i++) { + charArray[i] = (char) (arr[i]); + } + return String.valueOf(charArray); + } else if (dataType instanceof DecimalType) { + return dataRow.getDecimal(columnOrdinal).longValue(); + } else { + throw InvalidInput.exception( + "Unsupported data type[%s] for fieldName[%s].", + dataType, + dataRow.getSchema().fieldNames().get(columnOrdinal) + ); + } + } +} diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java new file mode 100644 index 000000000000..936e2dd70d6b --- /dev/null +++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.delta.input; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Iterators; +import com.google.common.primitives.Ints; +import io.delta.kernel.Scan; +import io.delta.kernel.Snapshot; +import io.delta.kernel.Table; +import io.delta.kernel.TableNotFoundException; +import io.delta.kernel.client.TableClient; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.client.DefaultTableClient; +import io.delta.kernel.internal.util.Utils; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterator; +import org.apache.druid.data.input.ColumnsFilter; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSource; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; +import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.utils.Streams; +import org.apache.hadoop.conf.Configuration; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Input source to ingest data from a Delta Lake. + * This input source reads the latest snapshot from a Delta table specified by {@code tablePath} parameter. + * We leverage the Delta Kernel APIs to interact with a Delta table. The Kernel API abstracts away the + * complexities of the Delta protocol itself. + * Note: currently, the Kernel table API only supports reading from the latest snapshot. + */ +public class DeltaInputSource implements SplittableInputSource +{ + public static final String TYPE_KEY = "delta"; + + @JsonProperty + private final String tablePath; + + @JsonProperty + @Nullable + private final DeltaSplit deltaSplit; + + @JsonCreator + public DeltaInputSource( + @JsonProperty("tablePath") String tablePath, + @JsonProperty("deltaSplit") @Nullable DeltaSplit deltaSplit + ) + { + if (tablePath == null) { + throw InvalidInput.exception("tablePath cannot be null."); + } + this.tablePath = tablePath; + this.deltaSplit = deltaSplit; + } + + @Override + public boolean needsFormat() + { + // Only support Parquet + return false; + } + + /** + * Instantiates a {@link DeltaInputSourceReader} to read the Delta table rows. If a {@link DeltaSplit} is supplied, + * the Delta files and schema are obtained from it to instantiate the reader. Otherwise, a Delta table client is + * instantiated with the supplied configuration to read the table. + * + * @param inputRowSchema schema for {@link org.apache.druid.data.input.InputRow} + * @param inputFormat unused parameter. 
The input format is always parquet + * @param temporaryDirectory unused parameter + */ + @Override + public InputSourceReader reader( + InputRowSchema inputRowSchema, + @Nullable InputFormat inputFormat, + File temporaryDirectory + ) + { + final TableClient tableClient = createTableClient(); + try { + final Row scanState; + final List scanRowList; + + if (deltaSplit != null) { + scanState = deserialize(tableClient, deltaSplit.getStateRow()); + scanRowList = deltaSplit.getFiles() + .stream() + .map(row -> deserialize(tableClient, row)) + .collect(Collectors.toList()); + } else { + final Table table = Table.forPath(tableClient, tablePath); + final Snapshot latestSnapshot = table.getLatestSnapshot(tableClient); + final StructType prunedSchema = pruneSchema( + latestSnapshot.getSchema(tableClient), + inputRowSchema.getColumnsFilter() + ); + final Scan scan = latestSnapshot.getScanBuilder(tableClient).withReadSchema(tableClient, prunedSchema).build(); + final CloseableIterator scanFiles = scan.getScanFiles(tableClient); + + scanState = scan.getScanState(tableClient); + scanRowList = new ArrayList<>(); + + while (scanFiles.hasNext()) { + final FilteredColumnarBatch scanFileBatch = scanFiles.next(); + final CloseableIterator scanFileRows = scanFileBatch.getRows(); + scanFileRows.forEachRemaining(scanRowList::add); + } + } + return new DeltaInputSourceReader( + Scan.readData( + tableClient, + scanState, + Utils.toCloseableIterator(scanRowList.iterator()), + Optional.empty() + ), + inputRowSchema + ); + } + catch (TableNotFoundException e) { + throw InvalidInput.exception(e, "tablePath[%s] not found.", tablePath); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Stream> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + { + if (deltaSplit != null) { + // can't split a split + return Stream.of(new InputSplit<>(deltaSplit)); + } + + final TableClient tableClient = createTableClient(); + final Snapshot latestSnapshot; + try { + final Table table = Table.forPath(tableClient, tablePath); + latestSnapshot = table.getLatestSnapshot(tableClient); + } + catch (TableNotFoundException e) { + throw InvalidInput.exception(e, "tablePath[%s] not found.", tablePath); + } + final Scan scan = latestSnapshot.getScanBuilder(tableClient).build(); + // scan files iterator for the current snapshot + final CloseableIterator scanFilesIterator = scan.getScanFiles(tableClient); + + final Row scanState = scan.getScanState(tableClient); + final String scanStateStr = RowSerde.serializeRowToJson(scanState); + + Iterator deltaSplitIterator = Iterators.transform( + scanFilesIterator, + scanFile -> { + final CloseableIterator rows = scanFile.getRows(); + final List fileRows = new ArrayList<>(); + while (rows.hasNext()) { + fileRows.add(RowSerde.serializeRowToJson(rows.next())); + } + return new DeltaSplit(scanStateStr, fileRows); + } + ); + + return Streams.sequentialStreamFrom(deltaSplitIterator).map(InputSplit::new); + } + + @Override + public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + { + return Ints.checkedCast(createSplits(inputFormat, splitHintSpec).count()); + } + + @Override + public InputSource withSplit(InputSplit split) + { + return new DeltaInputSource( + tablePath, + split.get() + ); + } + + private Row deserialize(TableClient tableClient, String row) + { + return RowSerde.deserializeRowFromJson(tableClient, row); + } + + /** + * Utility method to return a pruned schema that contains the given {@code 
columns} from
+   * {@code baseSchema}, as selected by {@code columnsFilter}. This serves as an optimization
+   * for table scans when only a subset of columns from the Delta Lake table needs to be read.
+   */
+  private StructType pruneSchema(final StructType baseSchema, final ColumnsFilter columnsFilter)
+  {
+    final List<String> columnNames = baseSchema.fieldNames();
+    final List<String> filteredColumnNames = columnNames
+        .stream()
+        .filter(columnsFilter::apply)
+        .collect(Collectors.toList());
+
+    if (filteredColumnNames.equals(columnNames)) {
+      return baseSchema;
+    }
+    final List<StructField> selectedFields = filteredColumnNames
+        .stream()
+        .map(baseSchema::get)
+        .collect(Collectors.toList());
+    return new StructType(selectedFields);
+  }
+
+  /**
+   * @return a table client initialized with a {@link Configuration} that uses this class's class loader
+   * instead of the context classloader. The latter by default doesn't know about the extension classes,
+   * so the table client cannot load runtime classes, resulting in {@link ClassNotFoundException}.
+   */
+  private TableClient createTableClient()
+  {
+    final ClassLoader currCtxClassloader = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+      final Configuration conf = new Configuration();
+      return DefaultTableClient.create(conf);
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(currCtxClassloader);
+    }
+  }
+}
diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java
new file mode 100644
index 000000000000..d0fc4780d001
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputStats;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * A reader for the Delta Lake input source. It initializes an iterator {@link DeltaInputSourceIterator}
+ * for a subset of Delta records given by {@link FilteredColumnarBatch} and schema {@link InputRowSchema}.
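+ * <p>
+ * Each {@link FilteredColumnarBatch} exposes an iterator of Delta kernel rows. The reader walks the batches in
+ * order and wraps every kernel row in a {@link DeltaInputRow}, so callers see one flat iterator of Druid input rows.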
+ * + */ +public class DeltaInputSourceReader implements InputSourceReader +{ + private final io.delta.kernel.utils.CloseableIterator filteredColumnarBatchCloseableIterator; + private final InputRowSchema inputRowSchema; + + public DeltaInputSourceReader( + io.delta.kernel.utils.CloseableIterator filteredColumnarBatchCloseableIterator, + InputRowSchema inputRowSchema + ) + { + this.filteredColumnarBatchCloseableIterator = filteredColumnarBatchCloseableIterator; + this.inputRowSchema = inputRowSchema; + } + + @Override + public CloseableIterator read() + { + return new DeltaInputSourceIterator(filteredColumnarBatchCloseableIterator, inputRowSchema); + } + + @Override + public CloseableIterator read(InputStats inputStats) + { + return new DeltaInputSourceIterator(filteredColumnarBatchCloseableIterator, inputRowSchema); + } + + @Override + public CloseableIterator sample() + { + + CloseableIterator inner = read(); + return new CloseableIterator() + { + @Override + public void close() throws IOException + { + inner.close(); + } + + @Override + public boolean hasNext() + { + return inner.hasNext(); + } + + @Override + public InputRowListPlusRawValues next() + { + DeltaInputRow deltaInputRow = (DeltaInputRow) inner.next(); + return InputRowListPlusRawValues.of(deltaInputRow, deltaInputRow.getRawRowAsMap()); + } + }; + } + + private static class DeltaInputSourceIterator implements CloseableIterator + { + private final io.delta.kernel.utils.CloseableIterator filteredColumnarBatchCloseableIterator; + + private io.delta.kernel.utils.CloseableIterator currentBatch = null; + private final InputRowSchema inputRowSchema; + + public DeltaInputSourceIterator( + io.delta.kernel.utils.CloseableIterator filteredColumnarBatchCloseableIterator, + InputRowSchema inputRowSchema + ) + { + this.filteredColumnarBatchCloseableIterator = filteredColumnarBatchCloseableIterator; + this.inputRowSchema = inputRowSchema; + } + + @Override + public boolean hasNext() + { + while (currentBatch == null || !currentBatch.hasNext()) { + if (!filteredColumnarBatchCloseableIterator.hasNext()) { + return false; // No more batches or records to read! + } + currentBatch = filteredColumnarBatchCloseableIterator.next().getRows(); + } + return true; + } + + @Override + public InputRow next() + { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + Row dataRow = currentBatch.next(); + return new DeltaInputRow(dataRow, inputRowSchema); + } + + @Override + public void close() throws IOException + { + filteredColumnarBatchCloseableIterator.close(); + } + } +} diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaSplit.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaSplit.java new file mode 100644 index 000000000000..7ab52cef0897 --- /dev/null +++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaSplit.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.delta.input; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +/** + * An input split of a Delta table containing the following information: + *
 * <ul>
 * <li>
 * {@code stateRow} represents the canonical json representation of the latest snapshot of the Delta table.
 * </li>
 * <li>
 * {@code files} represents the list of files from the latest snapshot.
 * </li>
+ * </ul>
+ */
+public class DeltaSplit
+{
+  private final String stateRow;
+  private final List<String> files;
+
+  @JsonCreator
+  public DeltaSplit(
+      @JsonProperty("state") final String stateRow,
+      @JsonProperty("files") final List<String> files
+  )
+  {
+    this.stateRow = stateRow;
+    this.files = files;
+  }
+
+  @JsonProperty("state")
+  public String getStateRow()
+  {
+    return stateRow;
+  }
+
+  @JsonProperty("files")
+  public List<String> getFiles()
+  {
+    return files;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DeltaSplit{" +
+           "stateRow=" + stateRow +
+           ", files=" + files +
+           "}";
+  }
+}
diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaTimeUtils.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaTimeUtils.java
new file mode 100644
index 000000000000..f47428944027
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaTimeUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+
+public class DeltaTimeUtils
+{
+  private static final ZoneId ZONE_ID = ZoneId.systemDefault();
+
+  /**
+   * {@link io.delta.kernel.types.TimestampType} data in Delta Lake tables is stored internally as the number of
+   * microseconds since epoch.
+   *
+   * @param microSecsSinceEpochUTC microseconds since epoch
+   * @return Datetime millis corresponding to {@code microSecsSinceEpochUTC}
+   */
+  public static long getMillisFromTimestamp(final long microSecsSinceEpochUTC)
+  {
+    final LocalDateTime dateTime = LocalDateTime.ofEpochSecond(
+        microSecsSinceEpochUTC / 1_000_000 /* epochSecond */,
+        (int) (1_000 * (microSecsSinceEpochUTC % 1_000_000)) /* nanoOfSecond: sub-second micros scaled to nanos */,
+        ZoneOffset.UTC
+    );
+    return dateTime.atZone(ZONE_ID).toInstant().toEpochMilli();
+  }
+
+  /**
+   * {@link io.delta.kernel.types.DateType} data in Delta Lake tables is stored internally as the number of
+   * days since epoch.
+   *
+   * @param daysSinceEpochUTC number of days since epoch
+   * @return number of seconds corresponding to {@code daysSinceEpochUTC}.
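+   * For example, assuming the system time zone is UTC, {@code daysSinceEpochUTC = 1} maps to 86,400 seconds;
+   * with a non-UTC {@code ZONE_ID}, the result shifts by that zone's offset at the start of that day.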
+ */ + public static long getSecondsFromDate(final int daysSinceEpochUTC) + { + return LocalDate.ofEpochDay(daysSinceEpochUTC).atStartOfDay(ZONE_ID).toEpochSecond(); + } +} diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java new file mode 100644 index 000000000000..f10ac0574e9f --- /dev/null +++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.delta.input; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.delta.kernel.client.TableClient; +import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.internal.data.DefaultJsonRow; +import io.delta.kernel.internal.types.TableSchemaSerDe; +import io.delta.kernel.internal.util.VectorUtils; +import io.delta.kernel.types.ArrayType; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.ByteType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DateType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.FloatType; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.MapType; +import io.delta.kernel.types.ShortType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; +import io.delta.kernel.types.TimestampType; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.jackson.DefaultObjectMapper; + +import java.util.HashMap; +import java.util.Map; + +/** + * Utility class to serialize and deserialize {@link Row} object. + * Code borrowed from + * RowSerde.java. The only differences between the two classes are the code style and exception handling in + * {@link #convertRowToJsonObject}, where we use {@link org.apache.druid.error.DruidException} instead of + * {@link UnsupportedOperationException}. 
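+ * <p>
+ * A row serialized with {@link #serializeRowToJson} carries both the table schema and the row values in a single
+ * JSON document, which is what allows a {@link DeltaSplit} to be shipped to another worker and deserialized there
+ * without re-reading the Delta log.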
+ * + */ +public class RowSerde +{ + private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper(); + + private RowSerde() + { + } + + /** + * Utility method to serialize a {@link Row} as a JSON string + */ + public static String serializeRowToJson(Row row) + { + Map rowObject = convertRowToJsonObject(row); + try { + Map rowWithSchema = new HashMap<>(); + rowWithSchema.put("schema", TableSchemaSerDe.toJson(row.getSchema())); + rowWithSchema.put("row", rowObject); + return OBJECT_MAPPER.writeValueAsString(rowWithSchema); + } + catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + /** + * Utility method to deserialize a {@link Row} object from the JSON form. + */ + public static Row deserializeRowFromJson(TableClient tableClient, String jsonRowWithSchema) + { + try { + JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonRowWithSchema); + JsonNode schemaNode = jsonNode.get("schema"); + StructType schema = + TableSchemaSerDe.fromJson(tableClient.getJsonHandler(), schemaNode.asText()); + return parseRowFromJsonWithSchema((ObjectNode) jsonNode.get("row"), schema); + } + catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + public static Map convertRowToJsonObject(Row row) + { + StructType rowType = row.getSchema(); + Map rowObject = new HashMap<>(); + for (int fieldId = 0; fieldId < rowType.length(); fieldId++) { + StructField field = rowType.at(fieldId); + DataType fieldType = field.getDataType(); + String name = field.getName(); + + if (row.isNullAt(fieldId)) { + rowObject.put(name, null); + continue; + } + + Object value; + if (fieldType instanceof BooleanType) { + value = row.getBoolean(fieldId); + } else if (fieldType instanceof ByteType) { + value = row.getByte(fieldId); + } else if (fieldType instanceof ShortType) { + value = row.getShort(fieldId); + } else if (fieldType instanceof IntegerType) { + value = row.getInt(fieldId); + } else if (fieldType instanceof LongType) { + value = row.getLong(fieldId); + } else if (fieldType instanceof FloatType) { + value = row.getFloat(fieldId); + } else if (fieldType instanceof DoubleType) { + value = row.getDouble(fieldId); + } else if (fieldType instanceof DateType) { + value = DeltaTimeUtils.getSecondsFromDate(row.getInt(fieldId)); + } else if (fieldType instanceof TimestampType) { + value = DeltaTimeUtils.getMillisFromTimestamp(row.getLong(fieldId)); + } else if (fieldType instanceof StringType) { + value = row.getString(fieldId); + } else if (fieldType instanceof ArrayType) { + value = VectorUtils.toJavaList(row.getArray(fieldId)); + } else if (fieldType instanceof MapType) { + value = VectorUtils.toJavaMap(row.getMap(fieldId)); + } else if (fieldType instanceof StructType) { + Row subRow = row.getStruct(fieldId); + value = convertRowToJsonObject(subRow); + } else { + throw InvalidInput.exception("Unsupported fieldType[%s] for fieldName[%s]", fieldType, name); + } + + rowObject.put(name, value); + } + + return rowObject; + } + + private static Row parseRowFromJsonWithSchema(ObjectNode rowJsonNode, StructType rowType) + { + return new DefaultJsonRow(rowJsonNode, rowType); + } +} diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-contrib/druid-deltalake-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule new file mode 100644 index 000000000000..b2752bd862c6 --- /dev/null +++ 
b/extensions-contrib/druid-deltalake-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.druid.delta.common.DeltaLakeDruidModule \ No newline at end of file diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java new file mode 100644 index 000000000000..6f68774dbd61 --- /dev/null +++ b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.delta.input; + +import io.delta.kernel.Scan; +import io.delta.kernel.TableNotFoundException; +import io.delta.kernel.client.TableClient; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.client.DefaultTableClient; +import io.delta.kernel.utils.CloseableIterator; +import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Map; +import java.util.Optional; + +public class DeltaInputRowTest +{ + @Test + public void testDeltaInputRow() throws TableNotFoundException, IOException + { + final TableClient tableClient = DefaultTableClient.create(new Configuration()); + final Scan scan = DeltaTestUtils.getScan(tableClient); + + CloseableIterator scanFileIter = scan.getScanFiles(tableClient); + int totalRecordCount = 0; + while (scanFileIter.hasNext()) { + try (CloseableIterator data = + Scan.readData( + tableClient, + scan.getScanState(tableClient), + scanFileIter.next().getRows(), + Optional.empty() + )) { + while (data.hasNext()) { + FilteredColumnarBatch dataReadResult = data.next(); + Row next = dataReadResult.getRows().next(); + DeltaInputRow deltaInputRow = new DeltaInputRow( + next, + DeltaTestUtils.FULL_SCHEMA + ); + Assert.assertNotNull(deltaInputRow); + Assert.assertEquals(DeltaTestUtils.DIMENSIONS, deltaInputRow.getDimensions()); + + Map expectedRow = DeltaTestUtils.EXPECTED_ROWS.get(totalRecordCount); + for (String key : expectedRow.keySet()) { + if (DeltaTestUtils.FULL_SCHEMA.getTimestampSpec().getTimestampColumn().equals(key)) { + final long expectedMillis = ((Long) expectedRow.get(key)) * 1000; + Assert.assertEquals(expectedMillis, deltaInputRow.getTimestampFromEpoch()); + } else { + Assert.assertEquals(expectedRow.get(key), deltaInputRow.getRaw(key)); + } + } + totalRecordCount += 1; + } + } + } + Assert.assertEquals(DeltaTestUtils.EXPECTED_ROWS.size(), totalRecordCount); + } +} diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java new file mode 100644 index 000000000000..24b1096abe1e --- /dev/null +++ b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.delta.input; + +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.DruidExceptionMatcher; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.hamcrest.MatcherAssert; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class DeltaInputSourceTest +{ + @Before + public void setUp() + { + System.setProperty("user.timezone", "UTC"); + } + + @Test + public void testSampleDeltaTable() throws IOException + { + final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null); + final InputSourceReader inputSourceReader = deltaInputSource.reader(DeltaTestUtils.FULL_SCHEMA, null, null); + + List actualSampledRows = sampleAllRows(inputSourceReader); + Assert.assertEquals(DeltaTestUtils.EXPECTED_ROWS.size(), actualSampledRows.size()); + + for (int idx = 0; idx < DeltaTestUtils.EXPECTED_ROWS.size(); idx++) { + Map expectedRow = DeltaTestUtils.EXPECTED_ROWS.get(idx); + InputRowListPlusRawValues actualSampledRow = actualSampledRows.get(idx); + Assert.assertNull(actualSampledRow.getParseException()); + + Map actualSampledRawVals = actualSampledRow.getRawValues(); + Assert.assertNotNull(actualSampledRawVals); + Assert.assertNotNull(actualSampledRow.getRawValuesList()); + Assert.assertEquals(1, actualSampledRow.getRawValuesList().size()); + + for (String key : expectedRow.keySet()) { + if (DeltaTestUtils.FULL_SCHEMA.getTimestampSpec().getTimestampColumn().equals(key)) { + final long expectedMillis = (Long) expectedRow.get(key); + Assert.assertEquals(expectedMillis, actualSampledRawVals.get(key)); + } else { + Assert.assertEquals(expectedRow.get(key), actualSampledRawVals.get(key)); + } + } + } + } + + @Test + public void testReadAllDeltaTable() throws IOException + { + final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null); + final InputSourceReader inputSourceReader = deltaInputSource.reader( + DeltaTestUtils.FULL_SCHEMA, + null, + null + ); + final List actualReadRows = readAllRows(inputSourceReader); + validateRows(DeltaTestUtils.EXPECTED_ROWS, actualReadRows, DeltaTestUtils.FULL_SCHEMA); + } + + @Test + public void testReadAllDeltaTableSubSchema1() throws IOException + { + final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null); + final InputSourceReader inputSourceReader = deltaInputSource.reader( + DeltaTestUtils.SCHEMA_1, + null, + null + ); + final List actualReadRows = readAllRows(inputSourceReader); + validateRows(DeltaTestUtils.EXPECTED_ROWS, actualReadRows, DeltaTestUtils.SCHEMA_1); + } + + @Test + public void testReadAllDeltaTableWithSubSchema2() throws IOException + { + final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null); + final InputSourceReader inputSourceReader = deltaInputSource.reader( + DeltaTestUtils.SCHEMA_2, + null, + null + ); + final List actualReadRows = readAllRows(inputSourceReader); + validateRows(DeltaTestUtils.EXPECTED_ROWS, 
actualReadRows, DeltaTestUtils.SCHEMA_2); + } + + @Test + public void testDeltaLakeWithCreateSplits() + { + final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null); + final List> splits = deltaInputSource.createSplits(null, null) + .collect(Collectors.toList()); + Assert.assertEquals(DeltaTestUtils.SPLIT_TO_EXPECTED_ROWS.size(), splits.size()); + + for (InputSplit split : splits) { + final DeltaSplit deltaSplit = split.get(); + final DeltaInputSource deltaInputSourceWithSplit = new DeltaInputSource( + DeltaTestUtils.DELTA_TABLE_PATH, + deltaSplit + ); + List> splitsResult = deltaInputSourceWithSplit.createSplits(null, null) + .collect(Collectors.toList()); + Assert.assertEquals(1, splitsResult.size()); + Assert.assertEquals(deltaSplit, splitsResult.get(0).get()); + } + } + + @Test + public void testDeltaLakeWithReadSplits() throws IOException + { + final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null); + final List> splits = deltaInputSource.createSplits(null, null) + .collect(Collectors.toList()); + Assert.assertEquals(DeltaTestUtils.SPLIT_TO_EXPECTED_ROWS.size(), splits.size()); + + for (int idx = 0; idx < splits.size(); idx++) { + final InputSplit split = splits.get(idx); + final DeltaSplit deltaSplit = split.get(); + final DeltaInputSource deltaInputSourceWithSplit = new DeltaInputSource( + DeltaTestUtils.DELTA_TABLE_PATH, + deltaSplit + ); + final InputSourceReader inputSourceReader = deltaInputSourceWithSplit.reader( + DeltaTestUtils.FULL_SCHEMA, + null, + null + ); + final List actualRowsInSplit = readAllRows(inputSourceReader); + final List> expectedRowsInSplit = DeltaTestUtils.SPLIT_TO_EXPECTED_ROWS.get(idx); + validateRows(expectedRowsInSplit, actualRowsInSplit, DeltaTestUtils.FULL_SCHEMA); + } + } + + @Test + public void testNullTable() + { + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> new DeltaInputSource(null, null) + ), + DruidExceptionMatcher.invalidInput().expectMessageIs( + "tablePath cannot be null." + ) + ); + } + + @Test + public void testSplitNonExistentTable() + { + final DeltaInputSource deltaInputSource = new DeltaInputSource("non-existent-table", null); + + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> deltaInputSource.createSplits(null, null) + ), + DruidExceptionMatcher.invalidInput().expectMessageIs( + "tablePath[non-existent-table] not found." + ) + ); + } + + @Test + public void testReadNonExistentTable() + { + final DeltaInputSource deltaInputSource = new DeltaInputSource("non-existent-table", null); + + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> deltaInputSource.reader(null, null, null) + ), + DruidExceptionMatcher.invalidInput().expectMessageIs( + "tablePath[non-existent-table] not found." 
+ ) + ); + } + + private List sampleAllRows(InputSourceReader reader) throws IOException + { + List rows = new ArrayList<>(); + try (CloseableIterator iterator = reader.sample()) { + iterator.forEachRemaining(rows::add); + } + return rows; + } + + private List readAllRows(InputSourceReader reader) throws IOException + { + final List rows = new ArrayList<>(); + try (CloseableIterator iterator = reader.read()) { + iterator.forEachRemaining(rows::add); + } + return rows; + } + + private void validateRows( + final List> expectedRows, + final List actualReadRows, + final InputRowSchema schema + ) + { + Assert.assertEquals(expectedRows.size(), actualReadRows.size()); + + for (int idx = 0; idx < expectedRows.size(); idx++) { + final Map expectedRow = expectedRows.get(idx); + final InputRow actualInputRow = actualReadRows.get(idx); + for (String key : expectedRow.keySet()) { + if (!schema.getColumnsFilter().apply(key)) { + Assert.assertNull(actualInputRow.getRaw(key)); + } else { + if (schema.getTimestampSpec().getTimestampColumn().equals(key)) { + final long expectedMillis = (Long) expectedRow.get(key) * 1000; + Assert.assertEquals(expectedMillis, actualInputRow.getTimestampFromEpoch()); + Assert.assertEquals(DateTimes.utc(expectedMillis), actualInputRow.getTimestamp()); + } else { + Assert.assertEquals(expectedRow.get(key), actualInputRow.getRaw(key)); + } + } + } + } + } +} diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaTestUtils.java b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaTestUtils.java new file mode 100644 index 000000000000..180adaefcfb5 --- /dev/null +++ b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaTestUtils.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.druid.delta.input;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import io.delta.kernel.Scan;
+import io.delta.kernel.ScanBuilder;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.Table;
+import io.delta.kernel.TableNotFoundException;
+import io.delta.kernel.client.TableClient;
+import io.delta.kernel.types.StructType;
+import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.DoubleDimensionSchema;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Refer to extensions-contrib/druid-deltalake-extensions/src/test/resources/README.md to generate the
+ * sample Delta Lake table used in the unit tests.
+ */
+public class DeltaTestUtils
+{
+  /**
+   * The Delta table path used by unit tests.
+   */
+  public static final String DELTA_TABLE_PATH = "src/test/resources/employee-delta-table";
+
+  /**
+   * The list of dimensions in the Delta table {@link #DELTA_TABLE_PATH}.
+   */
+  public static final List<String> DIMENSIONS = ImmutableList.of(
+      "id",
+      "birthday",
+      "name",
+      "age",
+      "salary",
+      "bonus",
+      "yoe",
+      "is_fulltime",
+      "last_vacation_time"
+  );
+
+  /**
+   * The expected set of rows from the first checkpoint file {@code DELTA_TABLE_PATH/_delta_log/00000000000000000000.json}.
+   */
+  private static final List<Map<String, Object>> SPLIT_0_EXPECTED_ROWS = new ArrayList<>(
+      ImmutableList.of(
+          ImmutableMap.of(
+              "birthday", 1057881600L,
+              "name", "Employee1",
+              "id", 867799346L,
+              "salary", 87642.55209817083,
+              "age", (short) 20,
+              "yoe", 4
+          ),
+          ImmutableMap.of(
+              "birthday", 1035417600L,
+              "is_fulltime", false,
+              "name", "Employee2",
+              "id", 9963151889L,
+              "salary", 79404.63969727767,
+              "age", (short) 21,
+              "yoe", 2
+          ),
+          ImmutableMap.of(
+              "birthday", 890179200L,
+              "name", "Employee3",
+              "id", 2766777393L,
+              "salary", 92418.21424435009,
+              "age", (short) 25,
+              "yoe", 9
+          ),
+          ImmutableMap.of(
+              "birthday", 1073001600L,
+              "name", "Employee4",
+              "id", 6320361986L,
+              "salary", 97907.76612488469,
+              "age", (short) 20,
+              "yoe", 3
+          ),
+          ImmutableMap.of(
+              "birthday", 823996800L,
+              "is_fulltime", true,
+              "bonus", 4982.215f,
+              "name", "Employee5",
+              "id", 7068152260L,
+              "salary", 79037.77202099308,
+              "last_vacation_time", 1706256972000L,
+              "age", (short) 27,
+              "yoe", 9
+          )
+      )
+  );
+
+  /**
+   * The expected rows from the second checkpoint file {@code DELTA_TABLE_PATH/_delta_log/00000000000000000001.json}.
+   */
+  private static final List<Map<String, Object>> SPLIT_1_EXPECTED_ROWS = new ArrayList<>(
+      ImmutableList.of(
+          ImmutableMap.of(
+              "birthday", 937526400L,
+              "is_fulltime", false,
+              "name", "Employee1",
+              "id", 4693651733L,
+              "salary", 83845.11357786917,
+              "age", (short) 24,
+              "yoe", 3
+          ),
+          ImmutableMap.of(
+              "birthday", 810777600L,
+              "is_fulltime", false,
+              "name", "Employee2",
+              "id", 7132772589L,
+              "salary", 90140.44051385639,
+              "age", (short) 28,
+              "yoe", 8
+          ),
+          ImmutableMap.of(
+              "birthday", 1104969600L,
+              "is_fulltime", true,
+              "bonus", 3699.0881f,
+              "name", "Employee3",
+              "id", 6627278510L,
+              "salary", 58857.27649436368,
+              "last_vacation_time", 1706458554000L,
+              "age", (short) 19,
+              "yoe", 4
+          ),
+          ImmutableMap.of(
+              "birthday", 763257600L,
+              "is_fulltime", true,
+              "bonus", 2334.6675f,
+              "name", "Employee4",
+              "id", 4786204912L,
+              "salary", 93646.81222022788,
+              "last_vacation_time", 1706390154000L,
+              "age", (short) 29,
+              "yoe", 5
+          ),
+          ImmutableMap.of(
+              "birthday", 1114646400L,
+              "name", "Employee5",
+              "id", 2773939764L,
+              "salary", 66300.05339373322,
+              "age", (short) 18,
+              "yoe", 3
+          ),
+          ImmutableMap.of(
+              "birthday", 913334400L,
+              "is_fulltime", false,
+              "name", "Employee6",
+              "id", 8333438088L,
+              "salary", 59219.5257906128,
+              "age", (short) 25,
+              "yoe", 4
+          ),
+          ImmutableMap.of(
+              "birthday", 893894400L,
+              "is_fulltime", false,
+              "name", "Employee7",
+              "id", 8397454007L,
+              "salary", 61909.733851830584,
+              "age", (short) 25,
+              "yoe", 8
+          ),
+          ImmutableMap.of(
+              "birthday", 1038873600L,
+              "is_fulltime", true,
+              "bonus", 3000.0154f,
+              "name", "Employee8",
+              "id", 8925359945L,
+              "salary", 76588.05471316943,
+              "last_vacation_time", 1706195754000L,
+              "age", (short) 21,
+              "yoe", 1
+          ),
+          ImmutableMap.of(
+              "birthday", 989798400L,
+              "is_fulltime", true,
+              "bonus", 4463.3833f,
+              "name", "Employee9",
+              "id", 8154788551L,
+              "salary", 59787.98539015684,
+              "last_vacation_time", 1706181354000L,
+              "age", (short) 22,
+              "yoe", 4
+          ),
+          ImmutableMap.of(
+              "birthday", 912297600L,
+              "is_fulltime", false,
+              "name", "Employee10",
+              "id", 5884382356L,
+              "salary", 51565.91965119349,
+              "age", (short) 25,
+              "yoe", 9
+          )
+      )
+  );
+
+  /**
+   * Mapping of checkpoint file identifier to the list of expected rows in that checkpoint.
+   */
+  public static final Map<Integer, List<Map<String, Object>>> SPLIT_TO_EXPECTED_ROWS = new HashMap<>(
+      ImmutableMap.of(
+          0, SPLIT_0_EXPECTED_ROWS,
+          1, SPLIT_1_EXPECTED_ROWS
+      )
+  );
+
+  /**
+   * Complete set of expected rows across all checkpoint files for {@link #DELTA_TABLE_PATH}.
+   */
+  public static final List<Map<String, Object>> EXPECTED_ROWS = SPLIT_TO_EXPECTED_ROWS.values()
+                                                                                      .stream()
+                                                                                      .flatMap(List::stream)
+                                                                                      .collect(Collectors.toList());
+
+  /**
+   * The Druid schema used for ingestion of {@link #DELTA_TABLE_PATH}.
+   */
+  public static final InputRowSchema FULL_SCHEMA = new InputRowSchema(
+      new TimestampSpec("birthday", "posix", null),
+      new DimensionsSpec(
+          ImmutableList.of(
+              new LongDimensionSchema("id"),
+              new LongDimensionSchema("birthday"),
+              new StringDimensionSchema("name"),
+              new LongDimensionSchema("age"),
+              new DoubleDimensionSchema("salary"),
+              new FloatDimensionSchema("bonus"),
+              new LongDimensionSchema("yoe"),
+              new StringDimensionSchema("is_fulltime"),
+              new LongDimensionSchema("last_vacation_time")
+          )
+      ),
+      ColumnsFilter.all()
+  );
+
+  /**
+   * Similar to {@link #FULL_SCHEMA}, but with a smaller set of columns with an inclusion filter applied.
+   */
+  public static final InputRowSchema SCHEMA_1 = new InputRowSchema(
+      new TimestampSpec("birthday", "posix", null),
+      new DimensionsSpec(
+          ImmutableList.of(
+              new LongDimensionSchema("id"),
+              new LongDimensionSchema("birthday"),
+              new StringDimensionSchema("name"),
+              new LongDimensionSchema("age"),
+              new DoubleDimensionSchema("salary"),
+              new FloatDimensionSchema("bonus"),
+              new LongDimensionSchema("yoe"),
+              new StringDimensionSchema("is_fulltime"),
+              new LongDimensionSchema("last_vacation_time")
+          )
+      ),
+      ColumnsFilter.inclusionBased(ImmutableSet.of("id", "birthday", "name", "is_fulltime"))
+  );
+
+  /**
+   * Similar to {@link #FULL_SCHEMA}, but with a smaller set of columns with an exclusion filter applied.
+   * A non-existent column is added to the exclusion filter - it should silently get thrown away.
+   */
+  public static final InputRowSchema SCHEMA_2 = new InputRowSchema(
+      new TimestampSpec("birthday", "posix", null),
+      new DimensionsSpec(
+          ImmutableList.of(
+              new LongDimensionSchema("id"),
+              new LongDimensionSchema("birthday"),
+              new StringDimensionSchema("name"),
+              new LongDimensionSchema("age"),
+              new DoubleDimensionSchema("salary"),
+              new FloatDimensionSchema("bonus"),
+              new LongDimensionSchema("yoe"),
+              new StringDimensionSchema("is_fulltime"),
+              new LongDimensionSchema("last_vacation_time")
+          )
+      ),
+      ColumnsFilter.exclusionBased(ImmutableSet.of("last_vacation_time", "bonus", "non_existent_column"))
+  );
+
+  /**
+   * A simple wrapper that builds the table scan for {@link #DELTA_TABLE_PATH}, meant to be used in tests.
+   */
+  public static Scan getScan(final TableClient tableClient) throws TableNotFoundException
+  {
+    final Table table = Table.forPath(tableClient, DELTA_TABLE_PATH);
+    final Snapshot snapshot = table.getLatestSnapshot(tableClient);
+    final StructType readSchema = snapshot.getSchema(tableClient);
+    final ScanBuilder scanBuilder = snapshot.getScanBuilder(tableClient)
+                                            .withReadSchema(tableClient, readSchema);
+    return scanBuilder.build();
+  }
+}
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaTimeUtilsTest.java b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaTimeUtilsTest.java
new file mode 100644
index 000000000000..de78ed97aa30
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaTimeUtilsTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import org.apache.druid.java.util.common.Intervals;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Instant;
+
+public class DeltaTimeUtilsTest
+{
+  @Before
+  public void setUp()
+  {
+    System.setProperty("user.timezone", "UTC");
+  }
+
+  @Test
+  public void testTimestampValue()
+  {
+    Assert.assertEquals(
+        Instant.parse("2018-02-02T00:28:02.000Z"),
+        Instant.ofEpochMilli(
+            DeltaTimeUtils.getMillisFromTimestamp(
+                Instant.parse("2018-02-02T00:28:02.000Z").toEpochMilli() * 1_000
+            )
+        )
+    );
+
+    Assert.assertEquals(
+        Instant.parse("2024-01-31T00:58:03.000Z"),
+        Instant.ofEpochMilli(
+            DeltaTimeUtils.getMillisFromTimestamp(
+                Instant.parse("2024-01-31T00:58:03.002Z").toEpochMilli() * 1_000
+            )
+        )
+    );
+  }
+
+  @Test
+  public void testDateTimeValue()
+  {
+    Assert.assertEquals(
+        Instant.parse("2020-02-01T00:00:00.000Z"),
+        Instant.ofEpochSecond(
+            DeltaTimeUtils.getSecondsFromDate(
+                (int) Intervals.of("1970-01-01/2020-02-01").toDuration().getStandardDays()
+            )
+        )
+    );
+
+    Assert.assertEquals(
+        Instant.parse("2024-01-01T00:00:00.000Z"),
+        Instant.ofEpochSecond(
+            DeltaTimeUtils.getSecondsFromDate(
+                (int) Intervals.of("1970-01-01/2024-01-01T02:23:00").toDuration().getStandardDays()
+            )
+        )
+    );
+  }
+}
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/RowSerdeTest.java b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/RowSerdeTest.java
new file mode 100644
index 000000000000..eb06f532a021
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/RowSerdeTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import io.delta.kernel.Scan;
+import io.delta.kernel.TableNotFoundException;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.client.DefaultTableClient;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RowSerdeTest
+{
+  @Test
+  public void testSerializeDeserializeRoundtrip() throws TableNotFoundException
+  {
+    final DefaultTableClient tableClient = DefaultTableClient.create(new Configuration());
+    final Scan scan = DeltaTestUtils.getScan(tableClient);
+    final Row scanState = scan.getScanState(tableClient);
+
+    final String rowJson = RowSerde.serializeRowToJson(scanState);
+    final Row row = RowSerde.deserializeRowFromJson(tableClient, rowJson);
+
+    Assert.assertEquals(scanState.getSchema(), row.getSchema());
+  }
+}
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/README.md b/extensions-contrib/druid-deltalake-extensions/src/test/resources/README.md
new file mode 100644
index 000000000000..c524adec4b58
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/test/resources/README.md
+
+
+### Generate Delta Table for Unit Tests
+
+To test Delta Lake ingestion, use the Python script `create_delta_table.py` to generate a sample Delta table.
+Create a conda environment `delta_test` with all the requirements specified in `requirements.txt`:
+
+```shell
+conda create --name delta_test --file requirements.txt
+```
+
+To activate the environment:
+
+```shell
+conda activate delta_test
+```
+
+From the conda environment, you can run the Python script:
+
+```shell
+python3 create_delta_table.py
+```
+
+By default, the script uses `overwrite` mode to generate 10 random records and writes the
+Delta table to `resources/employee-delta-table`. You can override the defaults by supplying command-line arguments:
+
+```shell
+python3 create_delta_table.py -h
+
+usage: create_delta_table.py [-h] [--save_mode {append,overwrite}] [--save_path SAVE_PATH] [--num_records NUM_RECORDS]
+
+Script to write a Delta Lake table.
+
+optional arguments:
+  -h, --help            show this help message and exit
+  --save_mode {append,overwrite}
+                        Specify write mode (append/overwrite) (default: overwrite)
+  --save_path SAVE_PATH
+                        Save path for Delta table (default: /druid/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table)
+  --num_records NUM_RECORDS
+                        Specify number of Delta records to write (default: 10)
+```
+
+The test data in `resources/employee-delta-table` was generated by:
+
+```shell
+python3 create_delta_table.py
+python3 create_delta_table.py --num_records=5 --save_mode=append
+```
+
+This creates a total of 15 Delta records across two transactional commits. The resulting Delta table is checked
+into the repo. The expected rows in `DeltaTestUtils.java` are updated accordingly.
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/create_delta_table.py b/extensions-contrib/druid-deltalake-extensions/src/test/resources/create_delta_table.py
new file mode 100755
index 000000000000..ab9ec87fb005
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/test/resources/create_delta_table.py
@@ -0,0 +1,122 @@
+#!/usr/bin/env python3
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+import argparse
+import delta
+import pyspark
+from pyspark.sql.types import StructType, StructField, ShortType, StringType, TimestampType, LongType, IntegerType, DoubleType, FloatType, DateType, BooleanType
+from datetime import datetime, timedelta
+import random
+
+
+def config_spark_with_delta_lake():
+    builder = (
+        pyspark.sql.SparkSession.builder.appName("DeltaLakeApp")
+        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+        .config(
+            "spark.sql.catalog.spark_catalog",
+            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
+        )
+    )
+    spark = delta.configure_spark_with_delta_pip(builder).getOrCreate()
+    spark.sparkContext.setLogLevel("ERROR")
+    return spark
+
+
+def create_dataset(num_records):
+    """
+    Generate a mock employee dataset with different datatypes for testing purposes.
+
+    Parameters:
+    - num_records (int): Number of records to generate.
+
+    Returns:
+    - Tuple: A tuple containing a list of records and the corresponding schema.
+      - List of Records: Each record is a tuple representing a row of data.
+      - StructType: The schema defining the structure of the records.
+
+    Example:
+    ```python
+    data, schema = create_dataset(10)
+    ```
+    """
+    schema = StructType([
+        StructField("id", LongType(), False),
+        StructField("birthday", DateType(), False),
+        StructField("name", StringType(), True),
+        StructField("age", ShortType(), True),
+        StructField("salary", DoubleType(), True),
+        StructField("bonus", FloatType(), True),
+        StructField("yoe", IntegerType(), True),
+        StructField("is_fulltime", BooleanType(), True),
+        StructField("last_vacation_time", TimestampType(), True)
+    ])
+
+    data = []
+    current_date = datetime.now()
+
+    for i in range(num_records):
+        birthday = current_date - timedelta(days=random.randint(365 * 18, 365 * 30))
+        age = (current_date - birthday).days // 365
+        is_fulltime = random.choice([True, False, None])
+        record = (
+            random.randint(1, 10000000000),
+            birthday,
+            f"Employee{i+1}",
+            age,
+            random.uniform(50000, 100000),
+            random.uniform(1000, 5000) if is_fulltime else None,
+            random.randint(1, min(20, age - 15)),
+            is_fulltime,
+            datetime.now() - timedelta(hours=random.randint(1, 90)) if is_fulltime else None,
+        )
+        data.append(record)
+    return data, schema
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Script to write a Delta Lake table.",
+                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+
+    parser.add_argument('--save_mode', choices=('append', 'overwrite'), default="overwrite",
+                        help="Specify write mode (append/overwrite)")
+    parser.add_argument('--save_path', default=os.path.join(os.getcwd(), "employee-delta-table"),
+                        help="Save path for Delta table")
+    parser.add_argument('--num_records', type=int, default=10,
+                        help="Specify number of Delta records to write")
+
+    args = parser.parse_args()
+
+    save_mode = args.save_mode
+    save_path = args.save_path
+    num_records = args.num_records
+
+    spark = config_spark_with_delta_lake()
+
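+    # Generate the mock employee dataset and write it out as a Delta table in the
+    # configured save mode ("overwrite" by default; "append" adds another commit).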
+    data, schema = create_dataset(num_records=num_records)
+    df = spark.createDataFrame(data, schema=schema)
+    df.write.format("delta").mode(save_mode).save(save_path)
+
+    df.show()
+
+    print(f"Generated Delta records to {save_path} in {save_mode} mode with {num_records} records.")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00000-b17c520a-0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00000-b17c520a-0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet.crc
new file mode 100644
index 000000000000..33d75bafdc5f
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00000-b17c520a-0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00000-f0224389-c0df-4dbc-90e5-de1d6a5b5ac6-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00000-f0224389-c0df-4dbc-90e5-de1d6a5b5ac6-c000.snappy.parquet.crc
new file mode 100644
index 000000000000..3bdcd64c34e3
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00000-f0224389-c0df-4dbc-90e5-de1d6a5b5ac6-c000.snappy.parquet.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00001-e18dc7d0-db98-40f2-9185-45237f51b9bf-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00001-e18dc7d0-db98-40f2-9185-45237f51b9bf-c000.snappy.parquet.crc
new file mode 100644
index 000000000000..f7e53b3258b1
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00001-e18dc7d0-db98-40f2-9185-45237f51b9bf-c000.snappy.parquet.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00001-f5c4b19d-a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00001-f5c4b19d-a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet.crc
new file mode 100644
index 000000000000..7de25e042fca
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00001-f5c4b19d-a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00002-b2249397-0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00002-b2249397-0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet.crc
new file mode 100644
index 000000000000..e6c6dc8bf9e7
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00002-b2249397-0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00003-648766cd-8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet.crc
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00003-648766cd-8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet.crc new file mode 100644 index 000000000000..693057b91264 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00003-648766cd-8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00003-fa1d35b8-bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00003-fa1d35b8-bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet.crc new file mode 100644 index 000000000000..81f3070f0233 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00003-fa1d35b8-bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00004-d580234a-54cb-43b7-87db-448c67a315df-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00004-d580234a-54cb-43b7-87db-448c67a315df-c000.snappy.parquet.crc new file mode 100644 index 000000000000..3a8e19d5f789 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00004-d580234a-54cb-43b7-87db-448c67a315df-c000.snappy.parquet.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00005-963e7ff5-c414-444c-8984-6baecf6987ee-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00005-963e7ff5-c414-444c-8984-6baecf6987ee-c000.snappy.parquet.crc new file mode 100644 index 000000000000..16b0da7f9b25 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00005-963e7ff5-c414-444c-8984-6baecf6987ee-c000.snappy.parquet.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00005-9ff9b585-5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00005-9ff9b585-5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet.crc new file mode 100644 index 000000000000..2f8bfc23a921 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00005-9ff9b585-5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00006-78cd057e-faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00006-78cd057e-faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet.crc new file mode 100644 index 000000000000..b11e9d379d1b Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00006-78cd057e-faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00007-00eb0d30-e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet.crc 
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00007-00eb0d30-e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet.crc new file mode 100644 index 000000000000..a0daef3f2b65 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00007-00eb0d30-e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00007-15147217-b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00007-15147217-b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet.crc new file mode 100644 index 000000000000..96e2c291bf25 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00007-15147217-b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00008-9f526a56-2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00008-9f526a56-2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet.crc new file mode 100644 index 000000000000..ba21f2ab4603 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00008-9f526a56-2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00009-e21486a5-e177-4c02-b256-bc890fadce7e-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00009-e21486a5-e177-4c02-b256-bc890fadce7e-c000.snappy.parquet.crc new file mode 100644 index 000000000000..c55ee1b40bc9 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00009-e21486a5-e177-4c02-b256-bc890fadce7e-c000.snappy.parquet.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00009-ee9dd918-aaec-4f80-bd63-e369c6335699-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00009-ee9dd918-aaec-4f80-bd63-e369c6335699-c000.snappy.parquet.crc new file mode 100644 index 000000000000..57a765a96ee6 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00009-ee9dd918-aaec-4f80-bd63-e369c6335699-c000.snappy.parquet.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/.00000000000000000000.json.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/.00000000000000000000.json.crc new file mode 100644 index 000000000000..75a1c6db2e35 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/.00000000000000000000.json.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/.00000000000000000001.json.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/.00000000000000000001.json.crc new file mode 100644 index 000000000000..e026fad26ba6 
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/.00000000000000000001.json.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/00000000000000000000.json b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/00000000000000000000.json new file mode 100644 index 000000000000..4a07c503c436 --- /dev/null +++ b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/00000000000000000000.json @@ -0,0 +1,13 @@ +{"commitInfo":{"timestamp":1706498159640,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"10","numOutputRows":"10","numOutputBytes":"23710"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.0.0","txnId":"5557f0e2-6cdc-4249-bd19-368bec96751d"}} +{"metaData":{"id":"c29741f1-4649-43a8-b2d9-9d46c42c58e1","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthday\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"salary\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"bonus\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"yoe\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"is_fulltime\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}},{\"name\":\"last_vacation_time\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1706498156768}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"add":{"path":"part-00000-b17c520a-0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet","partitionValues":{},"size":2316,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4693651733,\"birthday\":\"1999-09-17\",\"name\":\"Employee1\",\"age\":24,\"salary\":83845.11357786917,\"yoe\":3},\"maxValues\":{\"id\":4693651733,\"birthday\":\"1999-09-17\",\"name\":\"Employee1\",\"age\":24,\"salary\":83845.11357786917,\"yoe\":3},\"nullCount\":{\"id\":0,\"birthday\":0,\"name\":0,\"age\":0,\"salary\":0,\"bonus\":1,\"yoe\":0,\"is_fulltime\":0,\"last_vacation_time\":1}}"}} +{"add":{"path":"part-00001-e18dc7d0-db98-40f2-9185-45237f51b9bf-c000.snappy.parquet","partitionValues":{},"size":2316,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":7132772589,\"birthday\":\"1995-09-11\",\"name\":\"Employee2\",\"age\":28,\"salary\":90140.44051385639,\"yoe\":8},\"maxValues\":{\"id\":7132772589,\"birthday\":\"1995-09-11\",\"name\":\"Employee2\",\"age\":28,\"salary\":90140.44051385639,\"yoe\":8},\"nullCount\":{\"id\":0,\"birthday\":0,\"name\":0,\"age\":0,\"salary\":0,\"bonus\":1,\"yoe\":0,\"is_fulltime\":0,\"last_vacation_time\":1}}"}} 
+{"add":{"path":"part-00002-b2249397-0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet","partitionValues":{},"size":2455,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":6627278510,\"birthday\":\"2005-01-06\",\"name\":\"Employee3\",\"age\":19,\"salary\":58857.27649436368,\"bonus\":3699.0881,\"yoe\":4,\"last_vacation_time\":\"2024-01-28T08:15:54.648-08:00\"},\"maxValues\":{\"id\":6627278510,\"birthday\":\"2005-01-06\",\"name\":\"Employee3\",\"age\":19,\"salary\":58857.27649436368,\"bonus\":3699.0881,\"yoe\":4,\"last_vacation_time\":\"2024-01-28T08:15:54.648-08:00\"},\"nullCount\":{\"id\":0,\"birthday\":0,\"name\":0,\"age\":0,\"salary\":0,\"bonus\":0,\"yoe\":0,\"is_fulltime\":0,\"last_vacation_time\":0}}"}} +{"add":{"path":"part-00003-fa1d35b8-bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet","partitionValues":{},"size":2454,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4786204912,\"birthday\":\"1994-03-10\",\"name\":\"Employee4\",\"age\":29,\"salary\":93646.81222022788,\"bonus\":2334.6675,\"yoe\":5,\"last_vacation_time\":\"2024-01-27T13:15:54.648-08:00\"},\"maxValues\":{\"id\":4786204912,\"birthday\":\"1994-03-10\",\"name\":\"Employee4\",\"age\":29,\"salary\":93646.81222022788,\"bonus\":2334.6675,\"yoe\":5,\"last_vacation_time\":\"2024-01-27T13:15:54.648-08:00\"},\"nullCount\":{\"id\":0,\"birthday\":0,\"name\":0,\"age\":0,\"salary\":0,\"bonus\":0,\"yoe\":0,\"is_fulltime\":0,\"last_vacation_time\":0}}"}} +{"add":{"path":"part-00004-d580234a-54cb-43b7-87db-448c67a315df-c000.snappy.parquet","partitionValues":{},"size":2302,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2773939764,\"birthday\":\"2005-04-28\",\"name\":\"Employee5\",\"age\":18,\"salary\":66300.05339373322,\"yoe\":3},\"maxValues\":{\"id\":2773939764,\"birthday\":\"2005-04-28\",\"name\":\"Employee5\",\"age\":18,\"salary\":66300.05339373322,\"yoe\":3},\"nullCount\":{\"id\":0,\"birthday\":0,\"name\":0,\"age\":0,\"salary\":0,\"bonus\":1,\"yoe\":0,\"is_fulltime\":1,\"last_vacation_time\":1}}"}} +{"add":{"path":"part-00005-963e7ff5-c414-444c-8984-6baecf6987ee-c000.snappy.parquet","partitionValues":{},"size":2316,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":8333438088,\"birthday\":\"1998-12-11\",\"name\":\"Employee6\",\"age\":25,\"salary\":59219.5257906128,\"yoe\":4},\"maxValues\":{\"id\":8333438088,\"birthday\":\"1998-12-11\",\"name\":\"Employee6\",\"age\":25,\"salary\":59219.5257906128,\"yoe\":4},\"nullCount\":{\"id\":0,\"birthday\":0,\"name\":0,\"age\":0,\"salary\":0,\"bonus\":1,\"yoe\":0,\"is_fulltime\":0,\"last_vacation_time\":1}}"}} +{"add":{"path":"part-00006-78cd057e-faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet","partitionValues":{},"size":2317,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":8397454007,\"birthday\":\"1998-04-30\",\"name\":\"Employee7\",\"age\":25,\"salary\":61909.733851830584,\"yoe\":8},\"maxValues\":{\"id\":8397454007,\"birthday\":\"1998-04-30\",\"name\":\"Employee7\",\"age\":25,\"salary\":61909.733851830584,\"yoe\":8},\"nullCount\":{\"id\":0,\"birthday\":0,\"name\":0,\"age\":0,\"salary\":0,\"bonus\":1,\"yoe\":0,\"is_fulltime\":0,\"last_vacation_time\":1}}"}} 
+{"add":{"path":"part-00007-00eb0d30-e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet","partitionValues":{},"size":2455,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":8925359945,\"birthday\":\"2002-12-03\",\"name\":\"Employee8\",\"age\":21,\"salary\":76588.05471316943,\"bonus\":3000.0154,\"yoe\":1,\"last_vacation_time\":\"2024-01-25T07:15:54.648-08:00\"},\"maxValues\":{\"id\":8925359945,\"birthday\":\"2002-12-03\",\"name\":\"Employee8\",\"age\":21,\"salary\":76588.05471316943,\"bonus\":3000.0154,\"yoe\":1,\"last_vacation_time\":\"2024-01-25T07:15:54.648-08:00\"},\"nullCount\":{\"id\":0,\"birthday\":0,\"name\":0,\"age\":0,\"salary\":0,\"bonus\":0,\"yoe\":0,\"is_fulltime\":0,\"last_vacation_time\":0}}"}} +{"add":{"path":"part-00008-9f526a56-2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet","partitionValues":{},"size":2455,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":8154788551,\"birthday\":\"2001-05-14\",\"name\":\"Employee9\",\"age\":22,\"salary\":59787.98539015684,\"bonus\":4463.3833,\"yoe\":4,\"last_vacation_time\":\"2024-01-25T03:15:54.648-08:00\"},\"maxValues\":{\"id\":8154788551,\"birthday\":\"2001-05-14\",\"name\":\"Employee9\",\"age\":22,\"salary\":59787.98539015684,\"bonus\":4463.3833,\"yoe\":4,\"last_vacation_time\":\"2024-01-25T03:15:54.648-08:00\"},\"nullCount\":{\"id\":0,\"birthday\":0,\"name\":0,\"age\":0,\"salary\":0,\"bonus\":0,\"yoe\":0,\"is_fulltime\":0,\"last_vacation_time\":0}}"}} +{"add":{"path":"part-00009-ee9dd918-aaec-4f80-bd63-e369c6335699-c000.snappy.parquet","partitionValues":{},"size":2324,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":5884382356,\"birthday\":\"1998-11-29\",\"name\":\"Employee10\",\"age\":25,\"salary\":51565.91965119349,\"yoe\":9},\"maxValues\":{\"id\":5884382356,\"birthday\":\"1998-11-29\",\"name\":\"Employee10\",\"age\":25,\"salary\":51565.91965119349,\"yoe\":9},\"nullCount\":{\"id\":0,\"birthday\":0,\"name\":0,\"age\":0,\"salary\":0,\"bonus\":1,\"yoe\":0,\"is_fulltime\":0,\"last_vacation_time\":1}}"}} diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/00000000000000000001.json b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/00000000000000000001.json new file mode 100644 index 000000000000..188f2417e6a9 --- /dev/null +++ b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/00000000000000000001.json @@ -0,0 +1,6 @@ +{"commitInfo":{"timestamp":1706498178162,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"6","numOutputRows":"5","numOutputBytes":"12655"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.0.0","txnId":"38049998-65f7-4ed6-8d48-e51c5da502c6"}} 
+{"add":{"path":"part-00001-f5c4b19d-a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet","partitionValues":{},"size":2301,"modificationTime":1706498176452,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":867799346,\"birthday\":\"2003-07-11\",\"name\":\"Employee1\",\"age\":20,\"salary\":87642.55209817083,\"yoe\":4},\"maxValues\":{\"id\":867799346,\"birthday\":\"2003-07-11\",\"name\":\"Employee1\",\"age\":20,\"salary\":87642.55209817083,\"yoe\":4},\"nullCount\":{\"id\":0,\"birthday\":0,\"name\":0,\"age\":0,\"salary\":0,\"bonus\":1,\"yoe\":0,\"is_fulltime\":1,\"last_vacation_time\":1}}"}} +{"add":{"path":"part-00003-648766cd-8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet","partitionValues":{},"size":2317,"modificationTime":1706498176452,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":9963151889,\"birthday\":\"2002-10-24\",\"name\":\"Employee2\",\"age\":21,\"salary\":79404.63969727767,\"yoe\":2},\"maxValues\":{\"id\":9963151889,\"birthday\":\"2002-10-24\",\"name\":\"Employee2\",\"age\":21,\"salary\":79404.63969727767,\"yoe\":2},\"nullCount\":{\"id\":0,\"birthday\":0,\"name\":0,\"age\":0,\"salary\":0,\"bonus\":1,\"yoe\":0,\"is_fulltime\":0,\"last_vacation_time\":1}}"}} +{"add":{"path":"part-00005-9ff9b585-5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet","partitionValues":{},"size":2302,"modificationTime":1706498176452,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2766777393,\"birthday\":\"1998-03-18\",\"name\":\"Employee3\",\"age\":25,\"salary\":92418.21424435009,\"yoe\":9},\"maxValues\":{\"id\":2766777393,\"birthday\":\"1998-03-18\",\"name\":\"Employee3\",\"age\":25,\"salary\":92418.21424435009,\"yoe\":9},\"nullCount\":{\"id\":0,\"birthday\":0,\"name\":0,\"age\":0,\"salary\":0,\"bonus\":1,\"yoe\":0,\"is_fulltime\":1,\"last_vacation_time\":1}}"}} +{"add":{"path":"part-00007-15147217-b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet","partitionValues":{},"size":2302,"modificationTime":1706498176452,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":6320361986,\"birthday\":\"2004-01-02\",\"name\":\"Employee4\",\"age\":20,\"salary\":97907.76612488469,\"yoe\":3},\"maxValues\":{\"id\":6320361986,\"birthday\":\"2004-01-02\",\"name\":\"Employee4\",\"age\":20,\"salary\":97907.76612488469,\"yoe\":3},\"nullCount\":{\"id\":0,\"birthday\":0,\"name\":0,\"age\":0,\"salary\":0,\"bonus\":1,\"yoe\":0,\"is_fulltime\":1,\"last_vacation_time\":1}}"}} +{"add":{"path":"part-00009-e21486a5-e177-4c02-b256-bc890fadce7e-c000.snappy.parquet","partitionValues":{},"size":2454,"modificationTime":1706498176452,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":7068152260,\"birthday\":\"1996-02-11\",\"name\":\"Employee5\",\"age\":27,\"salary\":79037.77202099308,\"bonus\":4982.215,\"yoe\":9,\"last_vacation_time\":\"2024-01-26T00:16:12.196-08:00\"},\"maxValues\":{\"id\":7068152260,\"birthday\":\"1996-02-11\",\"name\":\"Employee5\",\"age\":27,\"salary\":79037.77202099308,\"bonus\":4982.215,\"yoe\":9,\"last_vacation_time\":\"2024-01-26T00:16:12.196-08:00\"},\"nullCount\":{\"id\":0,\"birthday\":0,\"name\":0,\"age\":0,\"salary\":0,\"bonus\":0,\"yoe\":0,\"is_fulltime\":0,\"last_vacation_time\":0}}"}} diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00000-b17c520a-0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00000-b17c520a-0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet new file mode 100644 index 
000000000000..d88822158908 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00000-b17c520a-0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00000-f0224389-c0df-4dbc-90e5-de1d6a5b5ac6-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00000-f0224389-c0df-4dbc-90e5-de1d6a5b5ac6-c000.snappy.parquet new file mode 100644 index 000000000000..f007f82b3b6e Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00000-f0224389-c0df-4dbc-90e5-de1d6a5b5ac6-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00001-e18dc7d0-db98-40f2-9185-45237f51b9bf-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00001-e18dc7d0-db98-40f2-9185-45237f51b9bf-c000.snappy.parquet new file mode 100644 index 000000000000..549b403d1535 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00001-e18dc7d0-db98-40f2-9185-45237f51b9bf-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00001-f5c4b19d-a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00001-f5c4b19d-a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet new file mode 100644 index 000000000000..006b8482ac8e Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00001-f5c4b19d-a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00002-b2249397-0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00002-b2249397-0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet new file mode 100644 index 000000000000..d3acfdf32fe0 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00002-b2249397-0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00003-648766cd-8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00003-648766cd-8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet new file mode 100644 index 000000000000..d76352414b3e Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00003-648766cd-8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00003-fa1d35b8-bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00003-fa1d35b8-bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet new file mode 100644 index 000000000000..19fc8efe4f42 Binary files /dev/null and 
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00003-fa1d35b8-bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00004-d580234a-54cb-43b7-87db-448c67a315df-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00004-d580234a-54cb-43b7-87db-448c67a315df-c000.snappy.parquet new file mode 100644 index 000000000000..3490b5662134 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00004-d580234a-54cb-43b7-87db-448c67a315df-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00005-963e7ff5-c414-444c-8984-6baecf6987ee-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00005-963e7ff5-c414-444c-8984-6baecf6987ee-c000.snappy.parquet new file mode 100644 index 000000000000..bdf5e95f1684 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00005-963e7ff5-c414-444c-8984-6baecf6987ee-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00005-9ff9b585-5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00005-9ff9b585-5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet new file mode 100644 index 000000000000..3cbe3b65a878 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00005-9ff9b585-5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00006-78cd057e-faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00006-78cd057e-faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet new file mode 100644 index 000000000000..ec97497ff599 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00006-78cd057e-faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00007-00eb0d30-e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00007-00eb0d30-e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet new file mode 100644 index 000000000000..6145827b330f Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00007-00eb0d30-e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00007-15147217-b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00007-15147217-b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet new file mode 100644 index 000000000000..858a8a10b183 Binary files /dev/null and 
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00007-15147217-b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00008-9f526a56-2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00008-9f526a56-2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet new file mode 100644 index 000000000000..fa13a5f9eaf2 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00008-9f526a56-2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00009-e21486a5-e177-4c02-b256-bc890fadce7e-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00009-e21486a5-e177-4c02-b256-bc890fadce7e-c000.snappy.parquet new file mode 100644 index 000000000000..13863ff97312 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00009-e21486a5-e177-4c02-b256-bc890fadce7e-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00009-ee9dd918-aaec-4f80-bd63-e369c6335699-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00009-ee9dd918-aaec-4f80-bd63-e369c6335699-c000.snappy.parquet new file mode 100644 index 000000000000..1f3145483862 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00009-ee9dd918-aaec-4f80-bd63-e369c6335699-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/requirements.txt b/extensions-contrib/druid-deltalake-extensions/src/test/resources/requirements.txt new file mode 100644 index 000000000000..8a846d26385d --- /dev/null +++ b/extensions-contrib/druid-deltalake-extensions/src/test/resources/requirements.txt @@ -0,0 +1,2 @@ +delta-spark==3.0.0 +pyspark==3.5.0 \ No newline at end of file diff --git a/pom.xml b/pom.xml index 795123799855..0a0924cfd952 100644 --- a/pom.xml +++ b/pom.xml @@ -230,7 +230,9 @@ extensions-contrib/opentelemetry-emitter extensions-contrib/kubernetes-overlord-extensions extensions-contrib/druid-iceberg-extensions + extensions-contrib/druid-deltalake-extensions extensions-contrib/spectator-histogram + distribution diff --git a/web-console/assets/delta.png b/web-console/assets/delta.png new file mode 100644 index 000000000000..db535506c7b9 Binary files /dev/null and b/web-console/assets/delta.png differ diff --git a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx index 41f77593d28a..6bf10232e89a 100644 --- a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx +++ b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx @@ -98,6 +98,7 @@ export type IngestionComboType = | 'index_parallel:inline' | 'index_parallel:s3' | 'index_parallel:azure' + | 'index_parallel:delta' | 'index_parallel:google' | 'index_parallel:hdfs'; @@ -136,6 +137,7 @@ export function getIngestionComboType( switch (inputSource.type) { case 'local': case 'http': + case 'delta': case 'druid': case 'inline': case 's3': @@ -170,6 +172,9 @@ export function 
getIngestionTitle(ingestionType: IngestionComboTypeWithExtra): s case 'index_parallel:azure': return 'Azure Data Lake'; + case 'index_parallel:delta': + return 'Delta Lake'; + case 'index_parallel:google': return 'Google Cloud Storage'; @@ -462,7 +467,7 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F name: 'inputSource.type', label: 'Source type', type: 'string', - suggestions: ['local', 'http', 'inline', 's3', 'azure', 'google', 'hdfs'], + suggestions: ['local', 'http', 'inline', 'delta', 's3', 'azure', 'google', 'hdfs'], info: (

    Druid connects to raw data through{' '} @@ -895,6 +900,18 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F inputSourceFilter, ]; + case 'index_parallel:delta': + return [ + inputSourceType, + { + name: 'inputSource.tablePath', + label: 'Delta table path', + type: 'string', + placeholder: '/path/to/deltaTable', + required: true, + }, + ]; + case 'index_parallel:hdfs': return [ inputSourceType, @@ -1085,6 +1102,7 @@ export function getIoConfigTuningFormFields( case 'index_parallel:s3': case 'index_parallel:azure': case 'index_parallel:google': + case 'index_parallel:delta': case 'index_parallel:hdfs': return [ { diff --git a/web-console/src/druid-models/input-source/input-source.tsx b/web-console/src/druid-models/input-source/input-source.tsx index 4e0647320e50..01b111b957f9 100644 --- a/web-console/src/druid-models/input-source/input-source.tsx +++ b/web-console/src/druid-models/input-source/input-source.tsx @@ -55,6 +55,9 @@ export interface InputSource { // inline data?: string; + // delta + tablePath?: string; + // hdfs paths?: string | string[]; @@ -111,6 +114,10 @@ export type InputSourceDesc = type: 'hdfs'; paths?: string | string[]; } + | { + type: 'delta'; + tablePath?: string; + } | { type: 'sql'; database: any; @@ -158,6 +165,12 @@ export function issueWithInputSource(inputSource: InputSource | undefined): stri } return; + case 'delta': + if (!inputSource.tablePath) { + return 'must have tablePath'; + } + return; + case 'hdfs': if (!inputSource.paths) { return 'must have paths'; @@ -169,7 +182,18 @@ export function issueWithInputSource(inputSource: InputSource | undefined): stri } } -const KNOWN_TYPES = ['inline', 'druid', 'http', 'local', 's3', 'azure', 'google', 'hdfs', 'sql']; +const KNOWN_TYPES = [ + 'inline', + 'druid', + 'http', + 'local', + 's3', + 'azure', + 'delta', + 'google', + 'hdfs', + 'sql', +]; export const INPUT_SOURCE_FIELDS: Field[] = [ // inline @@ -574,6 +598,16 @@ export const INPUT_SOURCE_FIELDS: Field[] = [ required: true, }, + // delta lake + { + name: 'tablePath', + label: 'Delta table path', + type: 'string', + placeholder: '/path/to/deltaTable', + defined: typeIsKnown(KNOWN_TYPES, 'delta'), + required: true, + }, + // sql { name: 'database.type', diff --git a/web-console/src/views/load-data-view/__snapshots__/load-data-view.spec.tsx.snap b/web-console/src/views/load-data-view/__snapshots__/load-data-view.spec.tsx.snap index f5956786afd9..b0a81a1800ac 100644 --- a/web-console/src/views/load-data-view/__snapshots__/load-data-view.spec.tsx.snap +++ b/web-console/src/views/load-data-view/__snapshots__/load-data-view.spec.tsx.snap @@ -202,6 +202,20 @@ exports[`LoadDataView matches snapshot batch 1`] = ` Google Cloud Storage
+  [snapshot markup not recoverable here: the added lines render a new ingestion tile with aria-label "Ingestion tile for index_parallel:delta" and the title "Delta Lake"]
diff --git a/web-console/src/views/load-data-view/load-data-view.tsx b/web-console/src/views/load-data-view/load-data-view.tsx
@@ ... @@
           );
+        case 'index_parallel:delta':
+          return (
+            <>
+              <p>Load data from Delta Lake.</p>
+              <p>Data must be stored in the Delta Lake format.</p>
+            </>
+          );
+
         case 'index_parallel:druid':
           return (
             <>
@@ -1009,6 +1018,7 @@ export class LoadDataView extends React.PureComponent
         return <p>Load text based, avro, orc, or parquet data from the Google Blobstore.</p>;
+      case 'delta':
+        return <p>Load data from Delta Lake.</p>;
       case 'hdfs':
         return <p>Load text based, avro, orc, or parquet data from HDFS.</p>;
diff --git a/website/.spelling b/website/.spelling
index e878cd2f9a88..0d4aeaf09a8e 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -63,6 +63,7 @@ DRUIDVERSION
 DataSketches
 DateTime
 DateType
+DeltaLakeInputSource
 dimensionsSpec
 DimensionSpec
 DimensionSpecs
@@ -79,6 +80,7 @@ downsamples
 downsampling
 Dropwizard
 dropwizard
+druid-deltalake-extensions
 DruidInputSource
 DruidSQL
 DynamicConfigProvider
@@ -161,6 +163,7 @@ Kerberos
 KeyStores
 Kinesis
 Kubernetes
+Lakehouse
 LDAPS
 LRU
 LZ4
@@ -525,6 +528,7 @@ SVG
 symlink
 syntaxes
 systemFields
+tablePath
 tiering
 timeseries
 Timeseries
@@ -571,6 +575,7 @@ varchar
 vectorizable
 vectorize
 vectorizeVirtualColumns
+versioned
 versioning
 virtualColumns
 w.r.t.
@@ -795,9 +800,11 @@ multi-server
 BasicDataSource
 LeaderLatch
 2.x
-28.x
+28.x
+3.0.x
 3.5.x
 3.4.x
+3.5.x.
 AllowAll
 AuthenticationResult
 AuthorizationLoadingLookupTest
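As a companion to the test-resources README above, the snippet below sketches one way to sanity-check a regenerated `employee-delta-table` fixture before committing it. It is not part of the change set: it assumes the `delta_test` environment from `requirements.txt` (`delta-spark==3.0.0`, `pyspark==3.5.0`), is run from the directory containing the table, and the app name and table path are illustrative.

```python
import delta
import pyspark
from delta.tables import DeltaTable

# Same Spark/Delta session wiring as create_delta_table.py.
builder = (
    pyspark.sql.SparkSession.builder.appName("DeltaTableCheck")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)
spark = delta.configure_spark_with_delta_pip(builder).getOrCreate()

# Read the latest snapshot of the table.
df = spark.read.format("delta").load("employee-delta-table")
df.printSchema()
print(f"row count: {df.count()}")

# Each transactional commit appears as one version in the table history.
DeltaTable.forPath(spark, "employee-delta-table").history().select(
    "version", "operation", "operationMetrics"
).show(truncate=False)
```

After the two commands listed in the README, the history should show two versions (an overwrite followed by an append) and a total row count of 15, matching the expected rows in `DeltaTestUtils.java`.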