Skip to content

Commit

Permalink
Rebase & respond to some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
allisonport-db committed Sep 28, 2023
1 parent d5b7d30 commit f72d5fb
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 35 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ lazy val kernelApi = (project in file("kernel/kernel-api"))

"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5" % "test",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"junit" % "junit" % "4.11" % "test",
"junit" % "junit" % "4.13" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test"
),

Expand Down Expand Up @@ -255,7 +255,7 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults"))
"org.apache.parquet" % "parquet-hadoop" % "1.12.3",

"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"junit" % "junit" % "4.11" % "test",
"junit" % "junit" % "4.13" % "test",
"commons-io" % "commons-io" % "2.8.0" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test"
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.VectorUtils;

import io.delta.kernel.internal.actions.AddFile;
import io.delta.kernel.internal.actions.DeletionVectorDescriptor;
Expand Down Expand Up @@ -111,7 +112,7 @@ public static FileStatus getAddFileStatus(Row scanFileInfo) {
*/
public static Map<String, String> getPartitionValues(Row scanFileInfo) {
Row addFile = getAddFileEntry(scanFileInfo);
return addFile.getMap(ADD_FILE_PARTITION_VALUES_ORDINAL);
return VectorUtils.toJavaMap(addFile.getMap(ADD_FILE_PARTITION_VALUES_ORDINAL));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ public Scan build() {
// TODO: support timestamp type partition columns
// Timestamp partition columns have complicated semantics related to timezones so block this
// for now
List<String> partitionCols = VectorUtils.toJavaList(protocolAndMetadata.get()._2
.getPartitionColumns());
List<String> partitionCols = VectorUtils.toJavaList(
protocolAndMetadata.get()._2.getPartitionColumns());
for (String colName : partitionCols) {
if (readSchema.indexOf(colName) >= 0 &&
readSchema.get(colName).getDataType() instanceof TimestampType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public Row getScanState(TableClient tableClient) {
InternalSchemaUtils.convertToPhysicalSchema(
readSchema,
snapshotSchema,
protocolAndMetadata.get()._2.getColumnMappingMode()
protocolAndMetadata.get()._2.getConfiguration()
.getOrDefault("delta.columnMapping.mode", "none")
)
),
dataPath.toUri().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.delta.kernel.internal.actions;

import java.util.Map;
import java.util.Optional;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -77,8 +78,8 @@ public static Metadata fromRow(Row row, TableClient tableClient) {
private final StructType schema;
private final ArrayValue partitionColumns;
private final Optional<Long> createdTime;
private final MapValue configuration;
private final Lazy<String> columnMappingMode;
private final MapValue configurationMapValue;
private final Lazy<Map<String, String>> configuration;

public Metadata(
String id,
Expand All @@ -89,7 +90,7 @@ public Metadata(
StructType schema,
ArrayValue partitionColumns,
Optional<Long> createdTime,
MapValue configuration) {
MapValue configurationMapValue) {
this.id = requireNonNull(id, "id is null");
this.name = name;
this.description = requireNonNull(description, "description is null");
Expand All @@ -98,12 +99,8 @@ public Metadata(
this.schema = schema;
this.partitionColumns = requireNonNull(partitionColumns, "partitionColumns is null");
this.createdTime = createdTime;
this.configuration = requireNonNull(configuration, "configuration is null");
this.columnMappingMode = new Lazy<>(() ->
VectorUtils.<String, String>toJavaMap(configuration)
.getOrDefault("delta.columnMapping.mode", "none")
);

this.configurationMapValue = requireNonNull(configurationMapValue, "configuration is null");
this.configuration = new Lazy<>(() -> VectorUtils.toJavaMap(configurationMapValue));
}

public String getSchemaString() {
Expand Down Expand Up @@ -138,11 +135,11 @@ public Optional<Long> getCreatedTime() {
return createdTime;
}

public MapValue getConfiguration() {
return configuration;
public MapValue getConfigurationMapValue() {
return configurationMapValue;
}

public String getColumnMappingMode() {
return columnMappingMode.get();
public Map<String, String> getConfiguration() {
return configuration.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static ScanStateRow of(
String readSchemaPhysicalJson,
String tablePath) {
HashMap<Integer, Object> valueMap = new HashMap<>();
valueMap.put(COL_NAME_TO_ORDINAL.get("configuration"), metadata.getConfiguration());
valueMap.put(COL_NAME_TO_ORDINAL.get("configuration"), metadata.getConfigurationMapValue());
valueMap.put(COL_NAME_TO_ORDINAL.get("logicalSchemaString"), readSchemaLogicalJson);
valueMap.put(COL_NAME_TO_ORDINAL.get("physicalSchemaString"), readSchemaPhysicalJson);
valueMap.put(COL_NAME_TO_ORDINAL.get("partitionColumns"), metadata.getPartitionColumns());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ private void validateSupportedTable(Protocol protocol, Metadata metadata) {

private void verifySupportedColumnMappingMode(Metadata metadata) {
// Check if the mode is name. Id mode is not yet supported
String cmMode = metadata.getColumnMappingMode();
String cmMode = metadata.getConfiguration()
.getOrDefault("delta.columnMapping.mode", "none");
if (!"none".equalsIgnoreCase(cmMode) &&
!"name".equalsIgnoreCase(cmMode)) {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,11 @@ private static Object getValueAsObject(
} else if (dataType instanceof ShortType) {
return columnVector.getShort(rowId);
} else if (dataType instanceof IntegerType || dataType instanceof DateType) {
// DateType data is stored internally as the number of days since 1970-01-01
return columnVector.getInt(rowId);
} else if (dataType instanceof LongType || dataType instanceof TimestampType) {
// TimestampType data is stored internally as the number of microseconds since the unix
// epoch
return columnVector.getLong(rowId);
} else if (dataType instanceof FloatType) {
return columnVector.getFloat(rowId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static java.lang.String.format;

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.expressions.Expression;
import io.delta.kernel.expressions.ScalarExpression;
import io.delta.kernel.types.DataType;
Expand Down Expand Up @@ -73,7 +74,7 @@ static ColumnVector eval(ColumnVector map, ColumnVector lookupKey) {
// The general pattern is call `isNullAt(rowId)` followed by `getString`.
// So the cache of one value is enough.
private int lastLookupRowId = -1;
private Object lastLookupValue = null;
private String lastLookupValue = null;

@Override
public DataType getDataType() {
Expand Down Expand Up @@ -101,20 +102,31 @@ public boolean isNullAt(int rowId) {
@Override
public String getString(int rowId) {
lookupValue(rowId);
return lastLookupValue == null ? null : (String) lastLookupValue;
return lastLookupValue == null ? null : lastLookupValue;
}

private Object lookupValue(int rowId) {
if (rowId == lastLookupRowId) {
return lastLookupValue;
}
// TODO: this needs to be updated after the new way of accessing the complex
// types is merged.
lastLookupRowId = rowId;
String keyValue = lookupKey.getString(rowId);
lastLookupValue = map.getMap(rowId).get(keyValue);
lastLookupValue = findValueForKey(map.getMap(rowId), keyValue);
return lastLookupValue;
}

/**
 * Linearly scans the entries of {@code mapValue} for the first key equal to {@code key}
 * and returns the string value stored at the same position.
 *
 * A null {@code key} matches a null key entry; otherwise keys are compared with
 * {@link String#equals}.
 *
 * @param mapValue map whose key and value vectors are read as strings
 * @param key      key to search for; may be null
 * @return the value paired with the first matching key, or null when that value is null
 *         or when no key matches
 */
private String findValueForKey(MapValue mapValue, String key) {
    ColumnVector keyVector = mapValue.getKeys();
    // The size does not change during the scan, so read it once instead of per iteration.
    int numEntries = mapValue.getSize();
    for (int i = 0; i < numEntries; i++) {
        boolean entryKeyIsNull = keyVector.isNullAt(i);
        boolean isMatch = entryKeyIsNull ? key == null : keyVector.getString(i).equals(key);
        if (isMatch) {
            // Fetch the value vector once, and only when a match is found.
            ColumnVector valueVector = mapValue.getValues();
            return valueVector.isNullAt(i) ? null : valueVector.getString(i);
        }
    }
    // TODO what is the behavior for element_at if not in the map?
    // NOTE(review): returning null here presumably mirrors SQL element_at semantics for a
    // missing key - confirm against the intended specification.
    return null;
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import io.delta.kernel.data.*;
Expand Down Expand Up @@ -568,8 +569,8 @@ private static <T> void checkArrayValue(
assertEquals(elementVector.getDataType(), expDataType);
// Check the elements are correct
assertEquals(expList, VectorUtils.toJavaList(arrayValue));
// TODO check that you cannot access later elements (can we upgrade JUnit to 4.13?)
// assertThrows(DefaultKernelTestUtils.getValueAsObject(elementVector, size + 1));
assertThrows(IllegalArgumentException.class,
() -> DefaultKernelTestUtils.getValueAsObject(elementVector, size + 1));
}

private static <K, V> void checkMapValue(
Expand All @@ -586,8 +587,9 @@ private static <K, V> void checkMapValue(
assertEquals(valueVector.getDataType(), valueDataType);
// Check the elements are correct
assertEquals(expMap, VectorUtils.toJavaMap(mapValue));
// TODO check that you cannot access later elements (can we upgrade JUnit to 4.13?)
// assertThrows(DefaultKernelTestUtils.getValueAsObject(keyVector, size + 1));
// assertThrows(DefaultKernelTestUtils.getValueAsObject(valueVector, size + 1));
assertThrows(IllegalArgumentException.class,
() -> DefaultKernelTestUtils.getValueAsObject(keyVector, size + 1));
assertThrows(IllegalArgumentException.class,
() -> DefaultKernelTestUtils.getValueAsObject(valueVector, size + 1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import java.util
import java.util.Optional

import org.scalatest.funsuite.AnyFunSuite

import io.delta.kernel.data.{ColumnarBatch, ColumnVector}
import io.delta.kernel.data.{ColumnarBatch, ColumnVector, MapValue}
import io.delta.kernel.defaults.internal.data.DefaultColumnarBatch
import io.delta.kernel.defaults.internal.data.vector.{DefaultIntVector, DefaultStructVector}
import io.delta.kernel.defaults.utils.DefaultKernelTestUtils.getValueAsObject
Expand Down Expand Up @@ -414,9 +413,45 @@ class DefaultExpressionEvaluatorSuite extends AnyFunSuite with ExpressionSuiteBa

override def isNullAt(rowId: Int): Boolean = testMapValues(rowId) == null

override def getMap[K, V](rowId: Int): util.Map[K, V] =
testMapValues(rowId).asInstanceOf[util.Map[K, V]]
override def getMap(rowId: Int): MapValue = {
  // Backing map for this row, plus its keys and the values in matching order.
  val entries = testMapValues(rowId)
  val keySeq = entries.keySet().asScala.toSeq
  val valueSeq = keySeq.map(k => entries.get(k))

  // Wraps a sequence of strings as a string-typed ColumnVector; used for both the key
  // vector and the value vector of the returned MapValue.
  // TODO make this a test utils somewhere?
  def stringVector(elems: Seq[String]): ColumnVector = new ColumnVector {

    override def getDataType: DataType = StringType.INSTANCE

    override def getSize: Int = entries.size()

    override def close(): Unit = {}

    override def isNullAt(rowId: Int): Boolean = elems(rowId) == null

    override def getString(rowId: Int): String = elems(rowId)
  }

  new MapValue() {

    override def getSize: Int = entries.size()

    override def getKeys: ColumnVector = stringVector(keySeq)

    override def getValues: ColumnVector = stringVector(valueSeq)
  }
}
}

val inputBatch = new DefaultColumnarBatch(
testMapVector.getSize,
new StructType().add("partitionValues", testMapVector.getDataType),
Expand Down

0 comments on commit f72d5fb

Please sign in to comment.