Skip to content

Commit

Permalink
[Kernel] Refactor the Array and Map return types in ColumnVector and …
Browse files Browse the repository at this point in the history
…Row (delta-io#2087)

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull request title
For example: [Spark] Title of my pull request
-->

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [x] Kernel
- [ ] Other (fill in here)

## Description

BASED OFF OF delta-io#2069; to review changes in this PR select the last 4 commits ([here](https://github.com/delta-io/delta/pull/2087/files/a5073b4c14496314bae1ca97163f9e7bbc67bc6d..d160915717d683ebfd6087a93ccb224a57216d82))

Refactors the ColumnVector and Row `getArray` and `getMap` APIs to return wrapper objects. These wrappers provide APIs to retrieve column vector views of the elements or keys/values of the map/array. 

## How was this patch tested?

Supporting the new APIs in some of the java testing infrastructure was non-trivial, so this PR also moves some tests from TestDeltaTableReads to DeltaTableReadsSuite and adds the complex types to the scala testing infrastructure.

Existing tests have been modified for this PR as well as a few additional checks/tests added.
  • Loading branch information
allisonport-db authored and vkorukanti committed Oct 3, 2023
1 parent 497eaa0 commit 081ee55
Show file tree
Hide file tree
Showing 39 changed files with 1,215 additions and 516 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.data;

/**
* Abstraction to represent a single array value in a {@link ColumnVector}.
*/
public interface ArrayValue {
/**
* The number of elements in the array
*/
int getSize();

/**
* A {@link ColumnVector} containing the array elements with exactly
* {@link ArrayValue#getSize()} elements.
*/
ColumnVector getElements();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package io.delta.kernel.data;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.types.DataType;
Expand Down Expand Up @@ -162,15 +160,10 @@ default BigDecimal getDecimal(int rowId) {
}

/**
* Return the map type value located at {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
*
* @param rowId
* @param <K> Return map key type
* @param <V> Return map value type
* @return
* Return the map value located at {@code rowId}. Returns null if the slot for {@code rowId}
* is null
*/
default <K, V> Map<K, V> getMap(int rowId) {
default MapValue getMap(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
}

Expand All @@ -186,14 +179,10 @@ default Row getStruct(int rowId) {
}

/**
* Return the array value located at {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
*
* @param rowId
* @param <T> Array element type
* @return
* Return the array value located at {@code rowId}. Returns null if the slot for {@code rowId}
* is null
*/
default <T> List<T> getArray(int rowId) {
default ArrayValue getArray(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
}

Expand Down
40 changes: 40 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/data/MapValue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.data;

/**
* Abstraction to represent a single map value in a {@link ColumnVector}.
*/
public interface MapValue {
/**
* The number of elements in the map
*/
int getSize();

/**
* A {@link ColumnVector} containing the keys. There are exactly {@link MapValue#getSize()} keys
* in the vector, and each key maps one-to-one to the value at the same index in
* {@link MapValue#getValues()}.
*/
ColumnVector getKeys();

/**
* A {@link ColumnVector} containing the values. There are exactly {@link MapValue#getSize()}
* values in the vector, and maps one-to-one to the keys in {@link MapValue#getKeys()}
*/
ColumnVector getValues();

}
6 changes: 2 additions & 4 deletions kernel/kernel-api/src/main/java/io/delta/kernel/data/Row.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package io.delta.kernel.data;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.types.StructType;
Expand Down Expand Up @@ -112,11 +110,11 @@ public interface Row {
* Return array value of the column located at the given ordinal.
* Throws error if the column at given ordinal is not of array type,
*/
<T> List<T> getArray(int ordinal);
ArrayValue getArray(int ordinal);

/**
* Return map value of the column located at the given ordinal.
* Throws error if the column at given ordinal is not of map type,
*/
<K, V> Map<K, V> getMap(int ordinal);
MapValue getMap(int ordinal);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
* <li>Name: <code>element_at</code>
* <ul>
* <li>Semantic: <code>element_at(map, key)</code>. Return the value of given <i>key</i>
* from the <i>map</i> type input. Ex: `element_at(map(1, 'a', 2, 'b'), 2)` returns 'b'</li>
* from the <i>map</i> type input. Returns <i>null</i> if the given <i>key</i> is not in
* the <i>map</i> Ex: `element_at(map(1, 'a', 2, 'b'), 2)` returns 'b'</li>
* <li>Since version: 3.0.0</li>
* </ul>
* </li>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.VectorUtils;

import io.delta.kernel.internal.actions.AddFile;
import io.delta.kernel.internal.actions.DeletionVectorDescriptor;
Expand Down Expand Up @@ -111,7 +112,7 @@ public static FileStatus getAddFileStatus(Row scanFileInfo) {
*/
public static Map<String, String> getPartitionValues(Row scanFileInfo) {
Row addFile = getAddFileEntry(scanFileInfo);
return addFile.getMap(ADD_FILE_PARTITION_VALUES_ORDINAL);
return VectorUtils.toJavaMap(addFile.getMap(ADD_FILE_PARTITION_VALUES_ORDINAL));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.delta.kernel.types.TimestampType;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.Tuple2;
import io.delta.kernel.utils.VectorUtils;

import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
Expand Down Expand Up @@ -86,7 +87,8 @@ public Scan build() {
// TODO: support timestamp type partition columns
// Timestamp partition columns have complicated semantics related to timezones so block this
// for now
List<String> partitionCols = protocolAndMetadata.get()._2.getPartitionColumns();
List<String> partitionCols = VectorUtils.toJavaList(
protocolAndMetadata.get()._2.getPartitionColumns());
for (String colName : partitionCols) {
if (readSchema.indexOf(colName) >= 0 &&
readSchema.get(colName).getDataType() instanceof TimestampType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public Row getScanState(TableClient tableClient) {
readSchema,
snapshotSchema,
protocolAndMetadata.get()._2.getConfiguration()
.getOrDefault("delta.columnMapping.mode", "none")
.getOrDefault("delta.columnMapping.mode", "none")
)
),
dataPath.toUri().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,23 @@
package io.delta.kernel.internal.actions;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.ArrayValue;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.data.Row;
import io.delta.kernel.types.ArrayType;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.MapType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.VectorUtils;
import static io.delta.kernel.utils.Utils.requireNonNull;

import io.delta.kernel.internal.lang.Lazy;
import io.delta.kernel.internal.types.TableSchemaSerDe;

public class Metadata {
Expand Down Expand Up @@ -66,17 +69,18 @@ public static Metadata fromRow(Row row, TableClient tableClient) {
.add("createdTime", LongType.INSTANCE, true /* contains null */)
.add("configuration",
new MapType(StringType.INSTANCE, StringType.INSTANCE, false),
false /* contains null */);
false /* nullable */);

private final String id;
private final Optional<String> name;
private final Optional<String> description;
private final Format format;
private final String schemaString;
private final StructType schema;
private final List<String> partitionColumns;
private final ArrayValue partitionColumns;
private final Optional<Long> createdTime;
private final Map<String, String> configuration;
private final MapValue configurationMapValue;
private final Lazy<Map<String, String>> configuration;

public Metadata(
String id,
Expand All @@ -85,19 +89,19 @@ public Metadata(
Format format,
String schemaString,
StructType schema,
List<String> partitionColumns,
ArrayValue partitionColumns,
Optional<Long> createdTime,
Map<String, String> configuration) {
MapValue configurationMapValue) {
this.id = requireNonNull(id, "id is null");
this.name = name;
this.description = requireNonNull(description, "description is null");
this.format = requireNonNull(format, "format is null");
this.schemaString = requireNonNull(schemaString, "schemaString is null");
this.schema = schema;
this.partitionColumns =
partitionColumns == null ? Collections.emptyList() : partitionColumns;
this.partitionColumns = requireNonNull(partitionColumns, "partitionColumns is null");
this.createdTime = createdTime;
this.configuration = configuration == null ? Collections.emptyMap() : configuration;
this.configurationMapValue = requireNonNull(configurationMapValue, "configuration is null");
this.configuration = new Lazy<>(() -> VectorUtils.toJavaMap(configurationMapValue));
}

public String getSchemaString() {
Expand All @@ -108,7 +112,7 @@ public StructType getSchema() {
return schema;
}

public List<String> getPartitionColumns() {
public ArrayValue getPartitionColumns() {
return partitionColumns;
}

Expand All @@ -132,7 +136,11 @@ public Optional<Long> getCreatedTime() {
return createdTime;
}

public MapValue getConfigurationMapValue() {
return configurationMapValue;
}

public Map<String, String> getConfiguration() {
return configuration;
return Collections.unmodifiableMap(configuration.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.VectorUtils;

public class Protocol {
public static Protocol fromRow(Row row) {
Expand All @@ -32,8 +33,10 @@ public static Protocol fromRow(Row row) {
return new Protocol(
row.getInt(0),
row.getInt(1),
row.isNullAt(2) ? Collections.emptyList() : row.getArray(2),
row.isNullAt(3) ? Collections.emptyList() : row.getArray(3));
row.isNullAt(2) ? Collections.emptyList() :
VectorUtils.toJavaList(row.getArray(2)),
row.isNullAt(3) ? Collections.emptyList() :
VectorUtils.toJavaList(row.getArray(3)));
}

public static final StructType READ_SCHEMA = new StructType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,9 @@
package io.delta.kernel.internal.data;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.data.*;
import io.delta.kernel.types.StructType;

/**
Expand Down Expand Up @@ -104,12 +100,12 @@ public Row getStruct(int ordinal) {
}

@Override
public <T> List<T> getArray(int ordinal) {
public ArrayValue getArray(int ordinal) {
return columnVector(ordinal).getArray(rowId);
}

@Override
public <K, V> Map<K, V> getMap(int ordinal) {
public MapValue getMap(int ordinal) {
return columnVector(ordinal).getMap(rowId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
package io.delta.kernel.internal.data;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.data.ArrayValue;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.data.Row;
import io.delta.kernel.types.*;

Expand All @@ -31,6 +32,13 @@ public class GenericRow implements Row {
private final StructType schema;
private final Map<Integer, Object> ordinalToValue;


/**
* @param schema the schema of the row
* @param ordinalToValue a mapping of column ordinal to objects; for each column the object
* must be of the return type corresponding to the data type's getter
* method in the Row interface
*/
public GenericRow(StructType schema, Map<Integer, Object> ordinalToValue) {
this.schema = requireNonNull(schema, "schema is null");
this.ordinalToValue = requireNonNull(ordinalToValue, "ordinalToValue is null");
Expand Down Expand Up @@ -113,17 +121,17 @@ public Row getStruct(int ordinal) {
}

@Override
public <T> List<T> getArray(int ordinal) {
public ArrayValue getArray(int ordinal) {
// TODO: not sufficient check, also need to check the element type
throwIfUnsafeAccess(ordinal, ArrayType.class, "array");
return (List<T>) getValue(ordinal);
return (ArrayValue) getValue(ordinal);
}

@Override
public <K, V> Map<K, V> getMap(int ordinal) {
public MapValue getMap(int ordinal) {
// TODO: not sufficient check, also need to check the element types
throwIfUnsafeAccess(ordinal, MapType.class, "map");
return (Map<K, V>) getValue(ordinal);
return (MapValue) getValue(ordinal);
}

private Object getValue(int ordinal) {
Expand Down
Loading

0 comments on commit 081ee55

Please sign in to comment.