Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel] Refactor the Array and Map return types in ColumnVector and Row #2087

Merged
merged 10 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ lazy val kernelApi = (project in file("kernel/kernel-api"))

"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5" % "test",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"junit" % "junit" % "4.11" % "test",
"junit" % "junit" % "4.13" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test"
),

Expand Down Expand Up @@ -255,7 +255,7 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults"))
"org.apache.parquet" % "parquet-hadoop" % "1.12.3",

"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"junit" % "junit" % "4.11" % "test",
"junit" % "junit" % "4.13" % "test",
"commons-io" % "commons-io" % "2.8.0" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test"
),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.data;

/**
* Abstraction to represent a single array value in a {@link ColumnVector}.
* <p>
* The elements of the array are exposed as a {@link ColumnVector} through
* {@link #getElements()}, and their count through {@link #getSize()}.
*/
public interface ArrayValue {
/**
* Returns the number of elements in the array.
*
* @return the element count of this array value
*/
int getSize();

/**
* Returns a {@link ColumnVector} containing the array elements. The vector holds exactly
* {@link ArrayValue#getSize()} elements.
*
* @return the vector of array elements
*/
ColumnVector getElements();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package io.delta.kernel.data;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.types.DataType;
Expand Down Expand Up @@ -162,15 +160,10 @@ default BigDecimal getDecimal(int rowId) {
}

/**
* Return the map type value located at {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
*
* @param rowId
* @param <K> Return map key type
* @param <V> Return map value type
* @return
* Return the map value located at {@code rowId}. Returns null if the slot for {@code rowId}
* is null.
*/
default <K, V> Map<K, V> getMap(int rowId) {
default MapValue getMap(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
}

Expand All @@ -186,14 +179,10 @@ default Row getStruct(int rowId) {
}

/**
* Return the array value located at {@code rowId}. The return value is undefined and can be
* anything, if the slot for {@code rowId} is null.
*
* @param rowId
* @param <T> Array element type
* @return
* Return the array value located at {@code rowId}. Returns null if the slot for {@code rowId}
* is null.
*/
default <T> List<T> getArray(int rowId) {
default ArrayValue getArray(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
}

Expand Down
40 changes: 40 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/data/MapValue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.data;

/**
* Abstraction to represent a single map value in a {@link ColumnVector}.
* <p>
* Keys and values are exposed as two parallel {@link ColumnVector}s: the key at a given index
* of {@link #getKeys()} maps to the value at the same index of {@link #getValues()}.
*/
public interface MapValue {
/**
* Returns the number of elements in the map.
*
* @return the number of key-value entries in this map value
*/
int getSize();

/**
* Returns a {@link ColumnVector} containing the keys. There are exactly
* {@link MapValue#getSize()} keys in the vector, and each key maps one-to-one to the value at
* the same index in {@link MapValue#getValues()}.
*
* @return the vector of map keys
*/
ColumnVector getKeys();

/**
* Returns a {@link ColumnVector} containing the values. There are exactly
* {@link MapValue#getSize()} values in the vector, and each value maps one-to-one to the key
* at the same index in {@link MapValue#getKeys()}.
*
* @return the vector of map values
*/
ColumnVector getValues();

}
6 changes: 2 additions & 4 deletions kernel/kernel-api/src/main/java/io/delta/kernel/data/Row.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package io.delta.kernel.data;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.types.StructType;
Expand Down Expand Up @@ -112,11 +110,11 @@ public interface Row {
* Return array value of the column located at the given ordinal.
* Throws error if the column at given ordinal is not of array type.
*/
<T> List<T> getArray(int ordinal);
ArrayValue getArray(int ordinal);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would getArrayValue and getMapValue be better APIs?

Since getMap makes me think it will return a java.util.Map

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I necessarily agree with

getMap makes me think it will return a java.util.Map

But I'm relatively indifferent about changing this if everyone is in agreement. Noting that Spark's method is ColumnVector::getMap --> ColumnarMap.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use getArrayValue etc. Map and Array ubiquitously refer to java.util.Map and the java array type

Copy link
Collaborator Author

@allisonport-db allisonport-db Oct 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's follow up on this when we do the API review (easy to change in a follow-up PR)


/**
* Return map value of the column located at the given ordinal.
* Throws error if the column at given ordinal is not of map type.
*/
<K, V> Map<K, V> getMap(int ordinal);
MapValue getMap(int ordinal);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
* <li>Name: <code>element_at</code>
* <ul>
* <li>Semantic: <code>element_at(map, key)</code>. Return the value of given <i>key</i>
* from the <i>map</i> type input. Ex: `element_at(map(1, 'a', 2, 'b'), 2)` returns 'b'</li>
* from the <i>map</i> type input. Returns <i>null</i> if the given <i>key</i> is not in
* the <i>map</i>. Ex: `element_at(map(1, 'a', 2, 'b'), 2)` returns 'b'</li>
* <li>Since version: 3.0.0</li>
* </ul>
* </li>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.VectorUtils;

import io.delta.kernel.internal.actions.AddFile;
import io.delta.kernel.internal.actions.DeletionVectorDescriptor;
Expand Down Expand Up @@ -111,7 +112,7 @@ public static FileStatus getAddFileStatus(Row scanFileInfo) {
*/
public static Map<String, String> getPartitionValues(Row scanFileInfo) {
Row addFile = getAddFileEntry(scanFileInfo);
return addFile.getMap(ADD_FILE_PARTITION_VALUES_ORDINAL);
return VectorUtils.toJavaMap(addFile.getMap(ADD_FILE_PARTITION_VALUES_ORDINAL));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.delta.kernel.types.TimestampType;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.Tuple2;
import io.delta.kernel.utils.VectorUtils;

import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
Expand Down Expand Up @@ -86,7 +87,8 @@ public Scan build() {
// TODO: support timestamp type partition columns
// Timestamp partition columns have complicated semantics related to timezones so block this
// for now
List<String> partitionCols = protocolAndMetadata.get()._2.getPartitionColumns();
List<String> partitionCols = VectorUtils.toJavaList(
protocolAndMetadata.get()._2.getPartitionColumns());
for (String colName : partitionCols) {
if (readSchema.indexOf(colName) >= 0 &&
readSchema.get(colName).getDataType() instanceof TimestampType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public Row getScanState(TableClient tableClient) {
readSchema,
snapshotSchema,
protocolAndMetadata.get()._2.getConfiguration()
.getOrDefault("delta.columnMapping.mode", "none")
.getOrDefault("delta.columnMapping.mode", "none")
)
),
dataPath.toUri().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,23 @@
package io.delta.kernel.internal.actions;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.ArrayValue;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.data.Row;
import io.delta.kernel.types.ArrayType;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.MapType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.VectorUtils;
import static io.delta.kernel.utils.Utils.requireNonNull;

import io.delta.kernel.internal.lang.Lazy;
import io.delta.kernel.internal.types.TableSchemaSerDe;

public class Metadata {
Expand Down Expand Up @@ -66,17 +69,18 @@ public static Metadata fromRow(Row row, TableClient tableClient) {
.add("createdTime", LongType.INSTANCE, true /* contains null */)
.add("configuration",
new MapType(StringType.INSTANCE, StringType.INSTANCE, false),
false /* contains null */);
false /* nullable */);

private final String id;
private final Optional<String> name;
private final Optional<String> description;
private final Format format;
private final String schemaString;
private final StructType schema;
private final List<String> partitionColumns;
private final ArrayValue partitionColumns;
private final Optional<Long> createdTime;
private final Map<String, String> configuration;
private final MapValue configurationMapValue;
private final Lazy<Map<String, String>> configuration;

public Metadata(
String id,
Expand All @@ -85,19 +89,19 @@ public Metadata(
Format format,
String schemaString,
StructType schema,
List<String> partitionColumns,
ArrayValue partitionColumns,
Optional<Long> createdTime,
Map<String, String> configuration) {
MapValue configurationMapValue) {
this.id = requireNonNull(id, "id is null");
this.name = name;
this.description = requireNonNull(description, "description is null");
this.format = requireNonNull(format, "format is null");
this.schemaString = requireNonNull(schemaString, "schemaString is null");
this.schema = schema;
this.partitionColumns =
partitionColumns == null ? Collections.emptyList() : partitionColumns;
this.partitionColumns = requireNonNull(partitionColumns, "partitionColumns is null");
this.createdTime = createdTime;
this.configuration = configuration == null ? Collections.emptyMap() : configuration;
this.configurationMapValue = requireNonNull(configurationMapValue, "configuration is null");
this.configuration = new Lazy<>(() -> VectorUtils.toJavaMap(configurationMapValue));
}

public String getSchemaString() {
Expand All @@ -108,7 +112,7 @@ public StructType getSchema() {
return schema;
}

public List<String> getPartitionColumns() {
public ArrayValue getPartitionColumns() {
return partitionColumns;
}

Expand All @@ -132,7 +136,11 @@ public Optional<Long> getCreatedTime() {
return createdTime;
}

public MapValue getConfigurationMapValue() {
return configurationMapValue;
}

public Map<String, String> getConfiguration() {
return configuration;
return Collections.unmodifiableMap(configuration.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.VectorUtils;

public class Protocol {
public static Protocol fromRow(Row row) {
Expand All @@ -32,8 +33,10 @@ public static Protocol fromRow(Row row) {
return new Protocol(
row.getInt(0),
row.getInt(1),
row.isNullAt(2) ? Collections.emptyList() : row.getArray(2),
row.isNullAt(3) ? Collections.emptyList() : row.getArray(3));
row.isNullAt(2) ? Collections.emptyList() :
VectorUtils.toJavaList(row.getArray(2)),
row.isNullAt(3) ? Collections.emptyList() :
VectorUtils.toJavaList(row.getArray(3)));
}

public static final StructType READ_SCHEMA = new StructType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,9 @@
package io.delta.kernel.internal.data;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.data.*;
import io.delta.kernel.types.StructType;

/**
Expand Down Expand Up @@ -104,12 +100,12 @@ public Row getStruct(int ordinal) {
}

@Override
public <T> List<T> getArray(int ordinal) {
public ArrayValue getArray(int ordinal) {
return columnVector(ordinal).getArray(rowId);
}

@Override
public <K, V> Map<K, V> getMap(int ordinal) {
public MapValue getMap(int ordinal) {
return columnVector(ordinal).getMap(rowId);
}

Expand Down
Loading