From 5f9b4e25d3d391260d07ee7b655907cd0c3777ee Mon Sep 17 00:00:00 2001
From: Venki Korukanti
Date: Fri, 21 Jul 2023 01:58:12 -0700
Subject: [PATCH 1/4] Add examples for Delta Kernel API usage

---
 kernel/examples/run_examples.sh                    |  49 ++++
 kernel/examples/table-reader/pom.xml               |  79 +++++
 .../kernel/examples/BaseTableReader.java           | 193 ++++++++++++
 .../examples/MultiThreadedTableReader.java         | 274 ++++++++++++++++++
 .../examples/SingleThreadedTableReader.java        | 126 ++++++++
 .../delta/kernel/examples/utils/RowSerDe.java      | 152 ++++++++++
 6 files changed, 873 insertions(+)
 create mode 100755 kernel/examples/run_examples.sh
 create mode 100644 kernel/examples/table-reader/pom.xml
 create mode 100644 kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/BaseTableReader.java
 create mode 100644 kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/MultiThreadedTableReader.java
 create mode 100644 kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/SingleThreadedTableReader.java
 create mode 100644 kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/utils/RowSerDe.java

diff --git a/kernel/examples/run_examples.sh b/kernel/examples/run_examples.sh
new file mode 100755
index 00000000000..f9b289116ea
--- /dev/null
+++ b/kernel/examples/run_examples.sh
@@ -0,0 +1,49 @@
+##
+## This script runs the Delta Kernel example programs using the golden
+## tables located in the <repo-root>/connectors/golden-tables/src/main/resources/golden
+## directory.
+##
+## Make sure to run this script from <repo-root> so that the relative
+## paths used to locate the golden tables work.
+
+BASEDIR=`pwd`
+echo $BASEDIR
+GOLDEN_TABLE_DIR="${BASEDIR}/connectors/golden-tables/src/main/resources/golden/"
+
+cd kernel/examples/table-reader
+
+SINGLE_THREAD_READER="io.delta.kernel.examples.SingleThreadedTableReader"
+MULTI_THREADED_READER="io.delta.kernel.examples.MultiThreadedTableReader"
+
+declare -a tests_single_threaded=(
+    "--table=${GOLDEN_TABLE_DIR}/data-reader-primitives --columns=as_int,as_long --limit=5"
+    "--table=${GOLDEN_TABLE_DIR}/data-reader-primitives --columns=as_int,as_long,as_double,as_string --limit=20"
+    "--table=${GOLDEN_TABLE_DIR}/data-reader-partition-values --columns=as_string,as_byte,as_list_of_records,as_nested_struct --limit=20"
+)
+
+for test in "${tests_single_threaded[@]}"
+do
+    mvn package exec:java \
+        -Dexec.cleanupDaemonThreads=false \
+        -Dexec.mainClass=${SINGLE_THREAD_READER} \
+        -Dstaging.repo.url=${EXTRA_MAVEN_REPO:-"___"} \
+        -Ddelta-kernel.version=${STANDALONE_VERSION:-"3.0.0-SNAPSHOT"} \
+        -Dexec.args="${test}"
+done
+
+declare -a tests_multi_threaded=(
+    "--table=${GOLDEN_TABLE_DIR}/data-reader-primitives --columns=as_int,as_long --limit=5 --parallelism=5"
+    "--table=${GOLDEN_TABLE_DIR}/data-reader-primitives --columns=as_int,as_long,as_double,as_string --limit=20 --parallelism=20"
+    "--table=${GOLDEN_TABLE_DIR}/data-reader-partition-values --columns=as_string,as_byte,as_list_of_records,as_nested_struct --limit=20 --parallelism=2"
+)
+
+for test in "${tests_multi_threaded[@]}"
+do
+    mvn package exec:java \
+        -Dexec.cleanupDaemonThreads=false \
+        -Dexec.mainClass=${MULTI_THREADED_READER} \
+        -Dstaging.repo.url=${EXTRA_MAVEN_REPO:-"___"} \
+        -Ddelta-kernel.version=${STANDALONE_VERSION:-"3.0.0-SNAPSHOT"} \
+        -Dexec.args="${test}"
+done

diff --git a/kernel/examples/table-reader/pom.xml b/kernel/examples/table-reader/pom.xml
new file mode 100644
index 00000000000..a2825a8ae1d
--- /dev/null
+++ b/kernel/examples/table-reader/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Copyright (2023) The Delta Lake Project Authors.

+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.example</groupId>
+    <artifactId>table-reader</artifactId>
+    <version>3.0.0-SNAPSHOT</version>
+
+    <properties>
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+        <staging.repo.url>""</staging.repo.url>
+        <delta-kernel.version>3.0.0-SNAPSHOT</delta-kernel.version>
+        <hadoop.version>3.3.1</hadoop.version>
+    </properties>
+
+    <repositories>
+        <repository>
+            <id>staging-repo</id>
+            <url>${staging.repo.url}</url>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.delta</groupId>
+            <artifactId>delta-kernel-api</artifactId>
+            <version>${delta-kernel.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.delta</groupId>
+            <artifactId>delta-kernel-default</artifactId>
+            <version>${delta-kernel.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-runtime</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-api</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>1.5.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.13.5</version>
+        </dependency>
+    </dependencies>
+</project>

diff --git a/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/BaseTableReader.java b/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/BaseTableReader.java
new file mode 100644
index 00000000000..c756b10a8ed
--- /dev/null
+++ b/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/BaseTableReader.java
@@ -0,0 +1,193 @@
+/*
+ * Copyright (2023) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.examples;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+
+import io.delta.kernel.TableNotFoundException;
+import io.delta.kernel.client.DefaultTableClient;
+import io.delta.kernel.client.TableClient;
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.data.DataReadResult;
+import io.delta.kernel.data.vector.VectorUtils;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Base class for reading Delta Lake tables using the Delta Kernel APIs.
+ */
+public abstract class BaseTableReader
+{
+    public static final int DEFAULT_LIMIT = 20;
+
+    protected final String tablePath;
+    protected final TableClient tableClient;
+
+    public BaseTableReader(String tablePath)
+    {
+        this.tablePath = requireNonNull(tablePath);
+        this.tableClient = DefaultTableClient.create(new Configuration());
+    }
+
+    /**
+     * Show the given {@code limit} rows containing the given columns from the table.
+     *
+     * @param limit Max number of rows to show.
+     * @param columnsOpt If empty, show all columns in the table.
+     * @throws TableNotFoundException
+     * @throws IOException
+     */
+    public abstract void show(int limit, Optional<List<String>> columnsOpt)
+        throws TableNotFoundException, IOException;
+
+    /**
+     * Utility method to return a pruned schema that contains the given {@code columns} from
+     * {@code baseSchema}.
+     */
+    protected static StructType pruneSchema(StructType baseSchema, Optional<List<String>> columns)
+    {
+        if (!columns.isPresent()) {
+            return baseSchema;
+        }
+        List<StructField> selectedFields = columns.get().stream().map(column -> {
+            if (baseSchema.indexOf(column) == -1) {
+                throw new IllegalArgumentException(
+                    format("Column %s is not found in table", column));
+            }
+            return baseSchema.get(column);
+        }).collect(Collectors.toList());
+
+        return new StructType(selectedFields);
+    }
+
+    protected static int printData(DataReadResult dataReadResult, int maxRowsToPrint)
+    {
+        int printedRowCount = 0;
+        ColumnarBatch data = dataReadResult.getData();
+        Optional<ColumnVector> selectionVector = dataReadResult.getSelectionVector();
+        for (int rowId = 0; rowId < data.getSize(); rowId++) {
+            if (!selectionVector.isPresent() || selectionVector.get().getBoolean(rowId)) {
+                printRow(data, rowId);
+                printedRowCount++;
+                if (printedRowCount == maxRowsToPrint) {
+                    break;
+                }
+            }
+        }
+        return printedRowCount;
+    }
+
+    protected static void printSchema(StructType schema)
+    {
+        System.out.printf(formatter(schema.length()), schema.fieldNames().toArray(new String[0]));
+    }
+
+    protected static void printRow(ColumnarBatch batch, int rowId)
+    {
+        int numCols = batch.getSchema().length();
+        Object[] rowValues = IntStream.range(0, numCols).mapToObj(colOrdinal -> {
+            ColumnVector columnVector = batch.getColumnVector(colOrdinal);
+            return VectorUtils.getValueAsObject(columnVector, rowId);
+        }).toArray();
+
+        // TODO: Need to handle the Row, Map, Array, Timestamp, Date types specially to
+        // print them in the format they need. Copy this code from Spark CLI.
+
+        System.out.printf(formatter(numCols), rowValues);
+    }
+
+    private static String formatter(int length)
+    {
+        return IntStream.range(0, length)
+            .mapToObj(i -> "%20s")
+            .collect(Collectors.joining("|")) + "\n";
+    }
+
+    /**
+     * Minimum command line options for any implementation of this reader.
+     */
+    protected static Options baseOptions()
+    {
+        return new Options()
+            .addRequiredOption("t", "table", true, "Fully qualified table path")
+            .addOption("c", "columns", true,
+                "Comma separated list of columns to read from the table. " +
+                    "Ex. --columns=id,name,address")
+            .addOption(
+                Option.builder()
+                    .option("l")
+                    .longOpt("limit")
+                    .hasArg(true)
+                    .desc("Maximum number of rows to read from the table (default 20).")
+                    .type(Number.class)
+                    .build()
+            );
+    }
+
+    /**
+     * Helper method to parse the command line arguments.
+     */
+    protected static CommandLine parseArgs(Options options, String[] args)
+    {
+        CommandLineParser cliParser = new DefaultParser();
+
+        try {
+            return cliParser.parse(options, args);
+        }
+        catch (ParseException parseException) {
+            new HelpFormatter().printHelp(
+                "java " + SingleThreadedTableReader.class.getCanonicalName(),
+                options,
+                true
+            );
+        }
+        System.exit(-1);
+        return null;
+    }
+
+    protected static Optional<List<String>> parseColumnList(CommandLine cli, String optionName)
+    {
+        return Optional.ofNullable(cli.getOptionValue(optionName))
+            .map(colString -> Arrays.asList(colString.split(",[ ]*")));
+    }
+
+    protected static int parseInt(CommandLine cli, String optionName, int defaultValue)
+        throws ParseException
+    {
+        return Optional.ofNullable(cli.getParsedOptionValue(optionName))
+            .map(Number.class::cast)
+            .map(Number::intValue)
+            .orElse(defaultValue);
+    }
+}

diff --git a/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/MultiThreadedTableReader.java b/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/MultiThreadedTableReader.java
new file mode 100644
index 00000000000..7fb1d5d8cd7
--- /dev/null
+++ b/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/MultiThreadedTableReader.java
@@ -0,0 +1,274 @@
+/*
+ * Copyright (2023) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.kernel.examples;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import io.delta.kernel.Scan;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.Table;
+import io.delta.kernel.TableNotFoundException;
+import io.delta.kernel.client.TableClient;
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.data.DataReadResult;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.examples.utils.RowSerDe;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.Utils;
+
+/**
+ * Multi-threaded Delta Lake table reader using the Delta Kernel APIs. It illustrates
+ * how to use the scan file rows received from the Delta Kernel in a distributed engine.
+ * <p>
+ * For this example serialization and deserialization is not needed as the work generator and
+ * work executors share the same memory, but illustrates an example way to serialize and deserialize
+ * scan file row and scan state row in order to send it to a remote thread and deserialize in a
+ * worker thread and read data from.
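+ * <p>
+ * A minimal sketch of that round trip, using the example {@code RowSerDe} helper in this
+ * package (a real engine would substitute its own serialization; {@code scanFileRow} below
+ * stands for one row obtained from the scan files iterator):
+ * <pre>{@code
+ * // Driver side: serialize the scan state and one scan file row to JSON strings.
+ * String stateJson = RowSerDe.serializeRowToJson(scan.getScanState(tableClient));
+ * String fileJson = RowSerDe.serializeRowToJson(scanFileRow);
+ *
+ * // Worker side: deserialize both rows and read the data for that scan file.
+ * Row scanState = RowSerDe.deserializeRowFromJson(tableClient, stateJson);
+ * Row scanFile = RowSerDe.deserializeRowFromJson(tableClient, fileJson);
+ * CloseableIterator<DataReadResult> data = Scan.readData(
+ *     tableClient, scanState, Utils.singletonCloseableIterator(scanFile), Optional.empty());
+ * }</pre>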
+ * <p>
+ * Usage:
+ * java io.delta.kernel.examples.MultiThreadedTableReader [-c <columns>] [-l <limit>] [-p <parallelism>] -t <table-path>
+ *
+ * -c,--columns <arg>       Comma separated list of columns to read from the
+ *                          table. Ex. --columns=id,name,address
+ * -l,--limit <arg>         Maximum number of rows to read from the table (default 20).
+ * -p,--parallelism <arg>   Number of parallel readers to use (default 3).
+ * -t,--table <arg>         Fully qualified table path
+ *
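+ * <p>
+ * For example, to read ten rows of two columns with four reader threads (the table path
+ * below is a hypothetical placeholder):
+ * <pre>
+ * java io.delta.kernel.examples.MultiThreadedTableReader \
+ *     --table=/tmp/delta-table --columns=id,name --limit=10 --parallelism=4
+ * </pre>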
+ */
+public class MultiThreadedTableReader
+    extends BaseTableReader
+{
+    private static final int DEFAULT_NUM_THREADS = 3;
+
+    private final int numThreads;
+
+    public MultiThreadedTableReader(int numThreads, String tablePath)
+    {
+        super(tablePath);
+        this.numThreads = numThreads;
+    }
+
+    public static void main(String[] args)
+        throws Exception
+    {
+        Options cliOptions = baseOptions().addOption(
+            Option.builder()
+                .option("p")
+                .longOpt("parallelism")
+                .hasArg()
+                .desc("Number of parallel readers to use (default 3).")
+                .type(Number.class)
+                .build());
+        CommandLine commandLine = parseArgs(cliOptions, args);
+
+        String tablePath = commandLine.getOptionValue("table");
+        int limit = parseInt(commandLine, "limit", DEFAULT_LIMIT);
+        int numThreads = parseInt(commandLine, "parallelism", DEFAULT_NUM_THREADS);
+        Optional<List<String>> columns = parseColumnList(commandLine, "columns");
+
+        new MultiThreadedTableReader(numThreads, tablePath)
+            .show(limit, columns);
+    }
+
+    public void show(int limit, Optional<List<String>> columnsOpt)
+        throws TableNotFoundException
+    {
+        Table table = Table.forPath(tablePath);
+        Snapshot snapshot = table.getLatestSnapshot(tableClient);
+        StructType readSchema = pruneSchema(snapshot.getSchema(tableClient), columnsOpt);
+
+        new Reader(limit).readSnapshot(readSchema, snapshot);
+    }
+
+    private class Reader
+    {
+        private final int limit;
+        private final AtomicBoolean stopSignal = new AtomicBoolean(false);
+        private final CountDownLatch countDownLatch = new CountDownLatch(numThreads);
+        private final ExecutorService executorService =
+            Executors.newFixedThreadPool(numThreads + 1);
+        private final BlockingQueue<ScanFile> workQueue = new ArrayBlockingQueue<>(20);
+
+        private int readRecordCount; // Number of rows read so far.
+
+        Reader(int limit)
+        {
+            this.limit = limit;
+        }
+
+        /**
+         * Read the data from the given {@code snapshot}.
+         *
+         * @param readSchema Subset of columns to read from the snapshot.
+         * @param snapshot Table snapshot object
+         */
+        void readSnapshot(StructType readSchema, Snapshot snapshot)
+        {
+            Scan scan = snapshot.getScanBuilder(tableClient)
+                .withReadSchema(tableClient, readSchema)
+                .build();
+
+            printSchema(readSchema);
+            try {
+                executorService.submit(workGenerator(scan));
+                for (int i = 0; i < numThreads; i++) {
+                    executorService.submit(workConsumer(i));
+                }
+
+                countDownLatch.await();
+            }
+            catch (InterruptedException ie) {
+                System.out.println("Interrupted, exiting now...");
+                throw new RuntimeException(ie);
+            }
+            finally {
+                stopSignal.set(true);
+                executorService.shutdownNow();
+            }
+        }
+
+        private Runnable workGenerator(Scan scan)
+        {
+            return (() -> {
+                try {
+                    Row scanStateRow = scan.getScanState(tableClient);
+                    CloseableIterator<ColumnarBatch> scanFileIter = scan.getScanFiles(tableClient);
+
+                    while (scanFileIter.hasNext() && !stopSignal.get()) {
+                        ColumnarBatch scanFileBatch = scanFileIter.next();
+                        try (CloseableIterator<Row> scanFileRows = scanFileBatch.getRows()) {
+                            while (scanFileRows.hasNext() && !stopSignal.get()) {
+                                workQueue.put(new ScanFile(scanStateRow, scanFileRows.next()));
+                            }
+                        }
+                        catch (IOException ioe) {
+                            throw new RuntimeException(ioe);
+                        }
+                    }
+                    for (int i = 0; i < numThreads; i++) {
+                        // Poison pill for each worker thread, signaling it to stop.
+                        workQueue.put(ScanFile.POISON_PILL);
+                    }
+                }
+                catch (InterruptedException ie) {
+                    System.out.println("Work generator is interrupted.");
+                }
+            });
+        }
+
+        private Runnable workConsumer(int workerId)
+        {
+            return (() -> {
+                try {
+                    // Keep taking work units off the queue until the poison pill arrives.
+                    while (true) {
+                        ScanFile work = workQueue.take();
+                        if (work == ScanFile.POISON_PILL) {
+                            return; // exit as there are no more work units
+                        }
+                        try (CloseableIterator<DataReadResult> dataIter = Scan.readData(
+                            tableClient,
+                            work.getScanRow(tableClient),
+                            Utils.singletonCloseableIterator(work.getScanFileRow(tableClient)),
+                            Optional.empty())) {
+                            while (dataIter.hasNext()) {
+                                if (printDataBatch(dataIter.next())) {
+                                    // Have enough records, exit now.
+                                    return;
+                                }
+                            }
+                        }
+                    }
+                }
+                catch (IOException ioe) {
+                    throw new UncheckedIOException(ioe);
+                }
+                catch (InterruptedException ie) {
+                    System.out.printf("Worker %d is interrupted.%n", workerId);
+                }
+                finally {
+                    countDownLatch.countDown();
+                }
+            });
+        }
+
+        /** Returns true when a sufficient number of rows have been printed. */
+        private boolean printDataBatch(DataReadResult dataReadResult)
+        {
+            synchronized (this) {
+                if (readRecordCount >= limit) {
+                    return true;
+                }
+                readRecordCount += printData(dataReadResult, limit - readRecordCount);
+                return readRecordCount >= limit;
+            }
+        }
+    }
+
+    /**
+     * Work unit representing the scan state and scan file in serialized format.
+     */
+    private static class ScanFile
+    {
+        /**
+         * Special instance of the {@link ScanFile} to indicate to the worker that there are no
+         * more scan files to scan and stop the worker thread.
+         */
+        private static final ScanFile POISON_PILL = new ScanFile("", "");
+
+        final String stateJson;
+        final String fileJson;
+
+        ScanFile(Row scanStateRow, Row scanFileRow)
+        {
+            this.stateJson = RowSerDe.serializeRowToJson(scanStateRow);
+            this.fileJson = RowSerDe.serializeRowToJson(scanFileRow);
+        }
+
+        ScanFile(String stateJson, String fileJson)
+        {
+            this.stateJson = stateJson;
+            this.fileJson = fileJson;
+        }
+
+        /**
+         * Get the deserialized scan state as a {@link Row} object.
+         */
+        Row getScanRow(TableClient tableClient)
+        {
+            return RowSerDe.deserializeRowFromJson(tableClient, stateJson);
+        }
+
+        /**
+         * Get the deserialized scan file as a {@link Row} object.
+         */
+        Row getScanFileRow(TableClient tableClient)
+        {
+            return RowSerDe.deserializeRowFromJson(tableClient, fileJson);
+        }
+    }
+}

diff --git a/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/SingleThreadedTableReader.java b/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/SingleThreadedTableReader.java
new file mode 100644
index 00000000000..3ed79783840
--- /dev/null
+++ b/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/SingleThreadedTableReader.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright (2023) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package io.delta.kernel.examples; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import org.apache.commons.cli.CommandLine; + +import io.delta.kernel.Scan; +import io.delta.kernel.Snapshot; +import io.delta.kernel.Table; +import io.delta.kernel.TableNotFoundException; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.DataReadResult; +import io.delta.kernel.data.Row; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterator; + +/** + * Single threaded Delta Lake table reader using the Delta Kernel APIs. + * + *
+ * Usage: java io.delta.kernel.examples.SingleThreadedTableReader [-c <columns>] [-l <limit>] -t <table-path>
+ *
+ * -c,--columns <arg>   Comma separated list of columns to read from the
+ *                      table. Ex. --columns=id,name,address
+ * -l,--limit <arg>     Maximum number of rows to read from the table
+ *                      (default 20).
+ * -t,--table <arg>     Fully qualified table path
+ *
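+ * <p>
+ * For example, to read the first ten rows of two columns (the table path below is a
+ * hypothetical placeholder):
+ * <pre>
+ * java io.delta.kernel.examples.SingleThreadedTableReader \
+ *     --table=/tmp/delta-table --columns=id,name --limit=10
+ * </pre>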
+ */
+public class SingleThreadedTableReader
+    extends BaseTableReader
+{
+    public SingleThreadedTableReader(String tablePath)
+    {
+        super(tablePath);
+    }
+
+    public static void main(String[] args)
+        throws Exception
+    {
+        CommandLine commandLine = parseArgs(baseOptions(), args);
+
+        String tablePath = commandLine.getOptionValue("table");
+        int limit = parseInt(commandLine, "limit", DEFAULT_LIMIT);
+        Optional<List<String>> columns = parseColumnList(commandLine, "columns");
+
+        new SingleThreadedTableReader(tablePath)
+            .show(limit, columns);
+    }
+
+    @Override
+    public void show(int limit, Optional<List<String>> columnsOpt)
+        throws TableNotFoundException, IOException
+    {
+        Table table = Table.forPath(tablePath);
+        Snapshot snapshot = table.getLatestSnapshot(tableClient);
+        StructType readSchema = pruneSchema(snapshot.getSchema(tableClient), columnsOpt);
+
+        readSnapshot(readSchema, snapshot, limit);
+    }
+
+    /**
+     * Utility method to read and print the data from the given {@code snapshot}.
+     *
+     * @param readSchema Subset of columns to read from the snapshot.
+     * @param snapshot Table snapshot object
+     * @param maxRowCount Not a hard limit; stop reading new columnar batches once the rows
+     *                    read so far have reached this count.
+     * @throws IOException
+     */
+    private void readSnapshot(
+        StructType readSchema,
+        Snapshot snapshot,
+        int maxRowCount) throws IOException
+    {
+        Scan scan = snapshot.getScanBuilder(tableClient)
+            .withReadSchema(tableClient, readSchema)
+            .build();
+
+        printSchema(readSchema);
+
+        Row scanState = scan.getScanState(tableClient);
+        CloseableIterator<ColumnarBatch> scanFileIter = scan.getScanFiles(tableClient);
+
+        int readRecordCount = 0;
+        try {
+            while (scanFileIter.hasNext()) {
+                try (CloseableIterator<DataReadResult> data =
+                    Scan.readData(
+                        tableClient,
+                        scanState,
+                        scanFileIter.next().getRows(),
+                        Optional.empty())) {
+                    while (data.hasNext()) {
+                        DataReadResult dataReadResult = data.next();
+                        readRecordCount += printData(dataReadResult, maxRowCount - readRecordCount);
+                        if (readRecordCount >= maxRowCount) {
+                            return;
+                        }
+                    }
+                }
+            }
+        }
+        finally {
+            scanFileIter.close();
+        }
+    }
+}

diff --git a/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/utils/RowSerDe.java b/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/utils/RowSerDe.java
new file mode 100644
index 00000000000..92072a06437
--- /dev/null
+++ b/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/utils/RowSerDe.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright (2023) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package io.delta.kernel.examples.utils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.delta.kernel.client.TableClient; +import io.delta.kernel.data.DefaultJsonRow; +import io.delta.kernel.data.Row; +import io.delta.kernel.internal.types.TableSchemaSerDe; +import io.delta.kernel.types.ArrayType; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.ByteType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.FloatType; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.MapType; +import io.delta.kernel.types.ShortType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; + +import java.io.UncheckedIOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Utility class to serialize and deserialize {@link Row} object. + */ +public class RowSerDe +{ + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private RowSerDe() + { + } + + /** + * Utility method to serialize a {@link Row} as a JSON string + */ + public static String serializeRowToJson(Row row) + { + Map rowObject = convertRowToJsonObject(row); + try { + Map rowWithSchema = new HashMap<>(); + rowWithSchema.put("schema", TableSchemaSerDe.toJson(row.getSchema())); + rowWithSchema.put("row", rowObject); + return OBJECT_MAPPER.writeValueAsString(rowWithSchema); + } + catch (JsonProcessingException e) { + throw new UncheckedIOException(e); + } + } + + /** + * Utility method to deserialize a {@link Row} object from the JSON form. 
+ */ + public static Row deserializeRowFromJson(TableClient tableClient, String jsonRowWithSchema) + { + try { + JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonRowWithSchema); + JsonNode schemaNode = jsonNode.get("schema"); + StructType schema = TableSchemaSerDe.fromJson(tableClient.getJsonHandler(), schemaNode.asText()); + return parseRowFromJsonWithSchema((ObjectNode) jsonNode.get("row"), schema); + } + catch (JsonProcessingException ex) { + throw new UncheckedIOException(ex); + } + } + + private static Map convertRowToJsonObject(Row row) + { + StructType rowType = row.getSchema(); + Map rowObject = new HashMap<>(); + for (int fieldId = 0; fieldId < rowType.length(); fieldId++) { + StructField field = rowType.at(fieldId); + DataType fieldType = field.getDataType(); + String name = field.getName(); + + if (row.isNullAt(fieldId)) { + rowObject.put(name, null); + continue; + } + + Object value; + if (fieldType instanceof BooleanType) { + value = row.getBoolean(fieldId); + } + else if (fieldType instanceof ByteType) { + value = row.getByte(fieldId); + } + else if (fieldType instanceof ShortType) { + value = row.getShort(fieldId); + } + else if (fieldType instanceof IntegerType) { + value = row.getInt(fieldId); + } + else if (fieldType instanceof LongType) { + value = row.getLong(fieldId); + } + else if (fieldType instanceof FloatType) { + value = row.getFloat(fieldId); + } + else if (fieldType instanceof DoubleType) { + value = row.getDouble(fieldId); + } + else if (fieldType instanceof StringType) { + value = row.getString(fieldId); + } + else if (fieldType instanceof ArrayType) { + value = row.getArray(fieldId); + } + else if (fieldType instanceof MapType) { + value = row.getMap(fieldId); + } + else if (fieldType instanceof StructType) { + Row subRow = row.getStruct(fieldId); + value = convertRowToJsonObject(subRow); + } + else { + throw new UnsupportedOperationException("NYI"); + } + + rowObject.put(name, value); + } + + return rowObject; + } + + private static Row parseRowFromJsonWithSchema(ObjectNode rowJsonNode, StructType rowType) + { + return new DefaultJsonRow(rowJsonNode, rowType); + } +} From a55aca0633a62626004d265eeaa93d8c0f00ff01 Mon Sep 17 00:00:00 2001 From: Anonymous <> Date: Tue, 1 Aug 2023 14:17:31 -0700 Subject: [PATCH 2/4] update the script to python --- kernel/examples/run-kernel-examples.py | 160 +++++++++++++++++++++++++ kernel/examples/run_examples.sh | 49 -------- kernel/examples/table-reader/pom.xml | 2 +- 3 files changed, 161 insertions(+), 50 deletions(-) create mode 100644 kernel/examples/run-kernel-examples.py delete mode 100755 kernel/examples/run_examples.sh diff --git a/kernel/examples/run-kernel-examples.py b/kernel/examples/run-kernel-examples.py new file mode 100644 index 00000000000..f4c1a884be4 --- /dev/null +++ b/kernel/examples/run-kernel-examples.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python3 + +# +# Copyright (2021) The Delta Lake Project Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import os +import subprocess +from os import path +import shutil +import argparse + +def run_single_threaded_examples(version, maven_repo, examples_root_dir, golden_tables_dir): + main_class = "io.delta.kernel.examples.SingleThreadedTableReader" + test_cases = [ + f"--table={golden_tables_dir}/data-reader-primitives --columns=as_int,as_long --limit=5", + f"--table={golden_tables_dir}/data-reader-primitives --columns=as_int,as_long,as_double,as_string --limit=20", + f"--table={golden_tables_dir}/data-reader-partition-values --columns=as_string,as_byte,as_list_of_records,as_nested_struct --limit=20" + ] + project_dir = path.join(examples_root_dir, "table-reader") + + run_examples(version, maven_repo, project_dir, main_class, test_cases) + + +def run_multi_threaded_examples(version, maven_repo, examples_root_dir, golden_tables_dir): + main_class = "io.delta.kernel.examples.MultiThreadedTableReader" + test_cases = [ + f"--table={golden_tables_dir}/data-reader-primitives --columns=as_int,as_long --limit=5 --parallelism=5", + f"--table={golden_tables_dir}/data-reader-primitives --columns=as_int,as_long,as_double,as_string --limit=20 --parallelism=20", + f"--table={golden_tables_dir}/data-reader-partition-values --columns=as_string,as_byte,as_list_of_records,as_nested_struct --limit=20 --parallelism=2" + ] + project_dir = path.join(examples_root_dir, "table-reader") + + run_examples(version, maven_repo, project_dir, main_class, test_cases) + + +def run_examples(version, maven_repo, project_dir, main_class, test_cases): + with WorkingDirectory(project_dir): + for test in test_cases: + cmd = ["mvn", "package", "exec:java", f"-Dexec.mainClass={main_class}", + f"-Dstaging-repo={maven_repo}", + f"-Ddelta-kernel.version={version}", + f"-Dexec.args={test}"] + run_cmd(cmd, stream_output=True) + + +def clear_artifact_cache(): + print("Clearing Delta Kernel artifacts from ivy2 and mvn cache") + delete_if_exists(os.path.expanduser("~/.ivy2/cache/io.delta.kernel")) + delete_if_exists(os.path.expanduser("~/.ivy2/local/io.delta.kernel")) + delete_if_exists(os.path.expanduser("~/.m2/repository/io/delta/kernel/")) + + +def delete_if_exists(path): + # if path exists, delete it. + if os.path.exists(path): + shutil.rmtree(path) + print("Deleted %s " % path) + + +# pylint: disable=too-few-public-methods +class WorkingDirectory(object): + def __init__(self, working_directory): + self.working_directory = working_directory + self.old_workdir = os.getcwd() + + def __enter__(self): + os.chdir(self.working_directory) + + def __exit__(self, tpe, value, traceback): + os.chdir(self.old_workdir) + + +def run_cmd(cmd, throw_on_error=True, env=None, stream_output=False, **kwargs): + cmd_env = os.environ.copy() + if env: + cmd_env.update(env) + + if stream_output: + child = subprocess.Popen(cmd, env=cmd_env, **kwargs) + exit_code = child.wait() + if throw_on_error and exit_code != 0: + raise Exception("Non-zero exitcode: %s" % (exit_code)) + return exit_code + else: + child = subprocess.Popen( + cmd, + env=cmd_env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + **kwargs) + (stdout, stderr) = child.communicate() + exit_code = child.wait() + if throw_on_error and exit_code != 0: + raise Exception( + "Non-zero exitcode: %s\n\nSTDOUT:\n%s\n\nSTDERR:%s" % + (exit_code, stdout, stderr)) + return (exit_code, stdout, stderr) + + +if __name__ == "__main__": + """ + Script to run Delta Kernel examples which are located in the kernel/examples directory. 
+    Call it by running `python run-kernel-examples.py`. Optionally, the version can be
+    provided as a command line argument.
+    """
+
+    # Get the version of the package from version.sbt.
+    examples_root_dir = path.dirname(__file__)
+    with open(path.join(examples_root_dir, "../../version.sbt")) as fd:
+        default_version = fd.readline().split('"')[1]
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--version",
+        required=False,
+        default=default_version,
+        help="Delta Kernel version to use to run the examples")
+
+    parser.add_argument(
+        "--maven-repo",
+        required=False,
+        default=None,
+        help="Additional Maven repo to resolve staged new release artifacts")
+
+    parser.add_argument(
+        "--use-local",
+        required=False,
+        default=False,
+        action="store_true",
+        help="Generate JARs from local source code and use to run tests")
+
+    args = parser.parse_args()
+
+    if args.use_local and (args.version != default_version):
+        raise Exception("Cannot specify --use-local with a --version different from the one in version.sbt")
+
+    clear_artifact_cache()
+
+    if args.use_local:
+        run_cmd(["build/sbt", "kernelApi/publishM2", "kernelDefault/publishM2"])
+
+    golden_file_dir = path.join(
+        examples_root_dir,
+        "../../connectors/golden-tables/src/main/resources/golden/")
+
+    run_single_threaded_examples(args.version, args.maven_repo, examples_root_dir, golden_file_dir)
+    run_multi_threaded_examples(args.version, args.maven_repo, examples_root_dir, golden_file_dir)

diff --git a/kernel/examples/run_examples.sh b/kernel/examples/run_examples.sh
deleted file mode 100755
index f9b289116ea..00000000000
--- a/kernel/examples/run_examples.sh
+++ /dev/null
@@ -1,49 +0,0 @@
-##
-## This script runs the Delta Kernel example programs using the golden
-## tables located in the <repo-root>/connectors/golden-tables/src/main/resources/golden
-## directory.
-##
-## Make sure to run this script from <repo-root> so that the relative
-## paths used to locate the golden tables work.
-
-BASEDIR=`pwd`
-echo $BASEDIR
-GOLDEN_TABLE_DIR="${BASEDIR}/connectors/golden-tables/src/main/resources/golden/"
-
-cd kernel/examples/table-reader
-
-SINGLE_THREAD_READER="io.delta.kernel.examples.SingleThreadedTableReader"
-MULTI_THREADED_READER="io.delta.kernel.examples.MultiThreadedTableReader"
-
-declare -a tests_single_threaded=(
-    "--table=${GOLDEN_TABLE_DIR}/data-reader-primitives --columns=as_int,as_long --limit=5"
-    "--table=${GOLDEN_TABLE_DIR}/data-reader-primitives --columns=as_int,as_long,as_double,as_string --limit=20"
-    "--table=${GOLDEN_TABLE_DIR}/data-reader-partition-values --columns=as_string,as_byte,as_list_of_records,as_nested_struct --limit=20"
-)
-
-for test in "${tests_single_threaded[@]}"
-do
-    mvn package exec:java \
-        -Dexec.cleanupDaemonThreads=false \
-        -Dexec.mainClass=${SINGLE_THREAD_READER} \
-        -Dstaging.repo.url=${EXTRA_MAVEN_REPO:-"___"} \
-        -Ddelta-kernel.version=${STANDALONE_VERSION:-"3.0.0-SNAPSHOT"} \
-        -Dexec.args="${test}"
-done
-
-declare -a tests_multi_threaded=(
-    "--table=${GOLDEN_TABLE_DIR}/data-reader-primitives --columns=as_int,as_long --limit=5 --parallelism=5"
-    "--table=${GOLDEN_TABLE_DIR}/data-reader-primitives --columns=as_int,as_long,as_double,as_string --limit=20 --parallelism=20"
-    "--table=${GOLDEN_TABLE_DIR}/data-reader-partition-values --columns=as_string,as_byte,as_list_of_records,as_nested_struct --limit=20 --parallelism=2"
-)
-
-for test in "${tests_multi_threaded[@]}"
-do
-    mvn package exec:java \
-        -Dexec.cleanupDaemonThreads=false \
-        -Dexec.mainClass=${MULTI_THREADED_READER} \
-        -Dstaging.repo.url=${EXTRA_MAVEN_REPO:-"___"} \
-        -Ddelta-kernel.version=${STANDALONE_VERSION:-"3.0.0-SNAPSHOT"} \
-        -Dexec.args="${test}"
-done

diff --git a/kernel/examples/table-reader/pom.xml b/kernel/examples/table-reader/pom.xml
index a2825a8ae1d..1e2403ec5fa 100644
--- a/kernel/examples/table-reader/pom.xml
+++ b/kernel/examples/table-reader/pom.xml
@@ -21,7 +21,7 @@ limitations under the License.-->
 
     <groupId>org.example</groupId>
     <artifactId>table-reader</artifactId>
-    <version>3.0.0-SNAPSHOT</version>
+    <version>0.1-SNAPSHOT</version>
 
     <properties>
         <maven.compiler.source>1.8</maven.compiler.source>

From bfedfb6f12cdae1b4a6251c2a7159c3badbc9f89 Mon Sep 17 00:00:00 2001
From: Anonymous <>
Date: Tue, 1 Aug 2023 14:23:35 -0700
Subject: [PATCH 3/4] Rename readSnapshot to readData

---
 .../io/delta/kernel/examples/MultiThreadedTableReader.java  | 4 ++--
 .../io/delta/kernel/examples/SingleThreadedTableReader.java | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/MultiThreadedTableReader.java b/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/MultiThreadedTableReader.java
index 7fb1d5d8cd7..86daf7c8c5e 100644
--- a/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/MultiThreadedTableReader.java
+++ b/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/MultiThreadedTableReader.java
@@ -104,7 +104,7 @@ public void show(int limit, Optional<List<String>> columnsOpt)
         Snapshot snapshot = table.getLatestSnapshot(tableClient);
         StructType readSchema = pruneSchema(snapshot.getSchema(tableClient), columnsOpt);
 
-        new Reader(limit).readSnapshot(readSchema, snapshot);
+        new Reader(limit).readData(readSchema, snapshot);
     }
 
     private class Reader
@@ -129,7 +129,7 @@ private class Reader
          * @param readSchema Subset of columns to read from the snapshot.
         * @param snapshot Table snapshot object
         */
-        void readSnapshot(StructType readSchema, Snapshot snapshot)
+        void readData(StructType readSchema, Snapshot snapshot)
        {
            Scan scan = snapshot.getScanBuilder(tableClient)
                .withReadSchema(tableClient, readSchema)
                .build();

diff --git a/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/SingleThreadedTableReader.java b/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/SingleThreadedTableReader.java
index 3ed79783840..c94e2ba91ee 100644
--- a/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/SingleThreadedTableReader.java
+++ b/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/SingleThreadedTableReader.java
@@ -73,7 +73,7 @@ public void show(int limit, Optional<List<String>> columnsOpt)
         Snapshot snapshot = table.getLatestSnapshot(tableClient);
         StructType readSchema = pruneSchema(snapshot.getSchema(tableClient), columnsOpt);
 
-        readSnapshot(readSchema, snapshot, limit);
+        readData(readSchema, snapshot, limit);
     }
 
     /**
@@ -86,7 +86,7 @@ public void show(int limit, Optional<List<String>> columnsOpt)
      *                    read so far have reached this count.
      * @throws IOException
      */
-    private void readSnapshot(
+    private void readData(
         StructType readSchema,
         Snapshot snapshot,
         int maxRowCount) throws IOException

From 9b919786473d522cd9dd75a579882c03d1cc37d4 Mon Sep 17 00:00:00 2001
From: Anonymous <>
Date: Wed, 2 Aug 2023 09:48:56 -0700
Subject: [PATCH 4/4] Address review comments

---
 kernel/examples/run-kernel-examples.py                  | 10 ++++++----
 .../delta/kernel/examples/MultiThreadedTableReader.java | 11 ++++++---
 2 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/kernel/examples/run-kernel-examples.py b/kernel/examples/run-kernel-examples.py
index f4c1a884be4..f1e23c11b3a 100644
--- a/kernel/examples/run-kernel-examples.py
+++ b/kernel/examples/run-kernel-examples.py
@@ -31,7 +31,7 @@ def run_single_threaded_examples(version, maven_repo, examples_root_dir, golden_tables_dir):
     ]
     project_dir = path.join(examples_root_dir, "table-reader")
 
-    run_examples(version, maven_repo, project_dir, main_class, test_cases)
+    run_example(version, maven_repo, project_dir, main_class, test_cases)
 
 
 def run_multi_threaded_examples(version, maven_repo, examples_root_dir, golden_tables_dir):
@@ -43,10 +43,10 @@ def run_multi_threaded_examples(version, maven_repo, examples_root_dir, golden_tables_dir):
     ]
     project_dir = path.join(examples_root_dir, "table-reader")
 
-    run_examples(version, maven_repo, project_dir, main_class, test_cases)
+    run_example(version, maven_repo, project_dir, main_class, test_cases)
 
 
-def run_examples(version, maven_repo, project_dir, main_class, test_cases):
+def run_example(version, maven_repo, project_dir, main_class, test_cases):
     with WorkingDirectory(project_dir):
         for test in test_cases:
             cmd = ["mvn", "package", "exec:java", f"-Dexec.mainClass={main_class}",
                    f"-Dstaging.repo.url={maven_repo}",
@@ -150,7 +150,9 @@ def run_cmd(cmd, throw_on_error=True, env=None, stream_output=False, **kwargs):
     clear_artifact_cache()
 
     if args.use_local:
-        run_cmd(["build/sbt", "kernelApi/publishM2", "kernelDefault/publishM2"])
+        project_root = path.join(examples_root_dir, "../../")
+        with WorkingDirectory(project_root):
+            run_cmd([f"{project_root}/build/sbt", "kernelGroup/publishM2"], stream_output=True)
 
     golden_file_dir = path.join(
         examples_root_dir,
diff --git a/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/MultiThreadedTableReader.java b/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/MultiThreadedTableReader.java
index 86daf7c8c5e..29ada9be300 100644
---
 a/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/MultiThreadedTableReader.java
+++ b/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/MultiThreadedTableReader.java
@@ -48,9 +48,14 @@
  * how to use the scan file rows received from the Delta Kernel in a distributed engine.
  * <p>
  * For this example serialization and deserialization is not needed as the work generator and
- * work executors share the same memory, but illustrates an example way to serialize and deserialize
- * scan file row and scan state row in order to send it to a remote thread and deserialize in a
- * worker thread and read data from.
+ * work executors share the same memory, but it illustrates an example of how Delta Kernel can
+ * work in a distributed query engine. High-level steps are:
+ * - The query engine asks the Delta Kernel APIs for scan file and scan state rows at the driver
+ *   (or equivalent) node
+ * - The query engine serializes the scan file and scan state at the driver node
+ * - The driver sends the serialized bytes to remote worker node(s)
+ * - Worker nodes deserialize the scan file and scan state rows from the serialized bytes
+ * - Worker nodes read the data from the given scan file(s) and scan state using the Delta
+ *   Kernel APIs.
  * <p>
* Usage: