Commit: address review

vkorukanti committed Jun 22, 2023
1 parent 4001179 commit ce05199

Showing 16 changed files with 191 additions and 207 deletions.
92 changes: 72 additions & 20 deletions kernel/kernel-api/src/main/java/io/delta/kernel/utils/Utils.java
@@ -28,30 +28,36 @@
 import io.delta.kernel.types.StringType;
 import io.delta.kernel.types.StructType;
 
-public class Utils {
+public class Utils
+{
     /**
      * Utility method to create a singleton {@link CloseableIterator}.
      *
      * @param elem Element to create iterator with.
-     * @param <T>  Element type.
+     * @param <T> Element type.
      * @return A {@link CloseableIterator} with just one element.
      */
-    public static <T> CloseableIterator<T> singletonCloseableIterator(T elem) {
-        return new CloseableIterator<T>() {
+    public static <T> CloseableIterator<T> singletonCloseableIterator(T elem)
+    {
+        return new CloseableIterator<T>()
+        {
             private boolean accessed;
 
             @Override
-            public void close() throws IOException {
+            public void close() throws IOException
+            {
                 // nothing to close
             }
 
             @Override
-            public boolean hasNext() {
+            public boolean hasNext()
+            {
                 return !accessed;
             }
 
             @Override
-            public T next() {
+            public T next()
+            {
                 accessed = true;
                 return elem;
             }
@@ -61,14 +67,17 @@ public T next() {
     /**
      * Convert a {@link Iterator} to {@link CloseableIterator}. Useful when passing normal iterators
      * for arguments that require {@link CloseableIterator} type.
+     *
      * @param iter {@link Iterator} instance
      * @param <T> Element type
      * @return A {@link CloseableIterator} wrapping the given {@link Iterator}
      */
-    public static <T> CloseableIterator<T> toCloseableIterator(Iterator<T> iter) {
-        return new CloseableIterator<T>() {
+    public static <T> CloseableIterator<T> toCloseableIterator(Iterator<T> iter)
+    {
+        return new CloseableIterator<T>()
+        {
             @Override
-            public void close() { }
+            public void close() {}
 
             @Override
             public boolean hasNext()
@@ -91,30 +100,36 @@ public T next()
      * @return A {@link ColumnVector} with a single element {@code value}
      */
     // TODO: add String to method name or make generic?
-    public static ColumnVector singletonColumnVector(String value) {
-        return new ColumnVector() {
+    public static ColumnVector singletonColumnVector(String value)
+    {
+        return new ColumnVector()
+        {
             @Override
             public DataType getDataType()
             {
                 return StringType.INSTANCE;
             }
 
             @Override
-            public int getSize() {
+            public int getSize()
+            {
                 return 1;
             }
 
             @Override
-            public void close() {
+            public void close()
+            {
             }
 
             @Override
-            public boolean isNullAt(int rowId) {
+            public boolean isNullAt(int rowId)
+            {
                 return value == null;
             }
 
             @Override
-            public String getString(int rowId) {
+            public String getString(int rowId)
+            {
                 if (rowId != 0) {
                     throw new IllegalArgumentException("Invalid row id: " + rowId);
                 }
@@ -130,7 +145,8 @@ public String getString(int rowId) {
      * @param scanState Scan state {@link Row}
      * @return Physical schema to read from the data files.
      */
-    public static StructType getPhysicalSchema(Row scanState) {
+    public static StructType getPhysicalSchema(Row scanState)
+    {
         // TODO needs io.delta.kernel.internal.data.ScanStateRow
         throw new UnsupportedOperationException("not implemented yet");
     }
@@ -142,7 +158,8 @@ public static StructType getPhysicalSchema(Row scanState) {
      * @param scanFileInfo {@link Row} representing one scan file.
      * @return a {@link FileStatus} object created from the given scan file row.
      */
-    public static FileStatus getFileStatus(Row scanFileInfo) {
+    public static FileStatus getFileStatus(Row scanFileInfo)
+    {
         String path = scanFileInfo.getString(0);
         Long size = scanFileInfo.getLong(2);
 
@@ -151,13 +168,48 @@ public static FileStatus getFileStatus(Row scanFileInfo) {
 
     /**
      * Close the iterator.
+     *
+     * @param i1
      */
-    public static void safeClose(CloseableIterator i1) {
+    public static void safeClose(CloseableIterator i1)
+    {
         try {
             i1.close();
-        } catch (IOException ioe) {
+        }
+        catch (IOException ioe) {
             throw new RuntimeException(ioe);
         }
     }
+
+    /**
+     * Close the given one or more {@link CloseableIterator}s. {@link CloseableIterator#close()}
+     * is called on all given non-null iterators. Throws an unchecked {@link RuntimeException}
+     * if an error occurs while closing. If multiple iterators cause exceptions when closing,
+     * the exceptions are added as suppressed to the main exception that is thrown.
+     *
+     * @param iters
+     */
+    public static void closeIterators(CloseableIterator<? extends Object>... iters)
+    {
+        RuntimeException exception = null;
+        for (CloseableIterator<? extends Object> iter : iters) {
+            if (iter == null) {
+                continue;
+            }
+            try {
+                iter.close();
+            }
+            catch (Exception ex) {
+                if (exception == null) {
+                    exception = new RuntimeException(ex);
+                }
+                else {
+                    exception.addSuppressed(ex);
+                }
+            }
+        }
+        if (exception != null) {
+            throw exception;
+        }
+    }
 }
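A minimal usage sketch of the helpers above, including the new closeIterators (a hypothetical call site, not part of the commit; the import paths are assumed from the package of Utils.java shown above):

import java.util.Arrays;

import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.Utils;

public class UtilsUsageSketch {
    public static void main(String[] args) {
        // Exactly one element, then exhausted.
        CloseableIterator<String> single = Utils.singletonCloseableIterator("only-element");

        // Wraps a plain Iterator; the wrapper's close() is a no-op.
        CloseableIterator<Integer> wrapped =
            Utils.toCloseableIterator(Arrays.asList(1, 2, 3).iterator());

        while (single.hasNext()) {
            System.out.println(single.next()); // prints "only-element" once
        }
        while (wrapped.hasNext()) {
            System.out.println(wrapped.next());
        }

        // Null entries are skipped; the first close() failure is rethrown as a
        // RuntimeException, with later failures attached as suppressed exceptions.
        Utils.closeIterators(single, wrapped, null);
    }
}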
DefaultKernelUtils.java
@@ -17,6 +17,8 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
@@ -38,33 +40,41 @@ private DefaultKernelUtils() {}
      * @return
      */
     public static final MessageType pruneSchema(
-        MessageType fileSchema, // parquet
+        GroupType fileSchema, // parquet
         StructType deltaType) // delta-core
     {
         return deltaType.fields().stream()
             .map(column -> {
-                Type type = findStructField(fileSchema, column);
+                Type type = findSubFieldType(fileSchema, column);
                 if (type == null) {
                     return null;
                 }
                 Type prunedSubfields = pruneSubfields(type, column.getDataType());
                 return new MessageType(column.getName(), prunedSubfields);
             })
-            .filter(type -> type != null)
+            .filter(Objects::nonNull)
             .reduce(MessageType::union)
             .get();
     }
 
-    private static Type findStructField(MessageType fileSchema, StructField column)
+    /**
+     * Search the given Parquet {@code groupType} for the subfield equivalent to the
+     * given {@code field}.
+     *
+     * @param groupType Parquet group type coming from the file schema.
+     * @param field Sub field given as Delta Kernel's {@link StructField}
+     * @return {@link Type} of the Parquet field. Returns {@code null} if not found.
+     */
+    public static Type findSubFieldType(GroupType groupType, StructField field)
     {
         // TODO: Need a way to search by id once we start supporting column mapping `id` mode.
-        final String columnName = column.getName();
-        if (fileSchema.containsField(columnName)) {
-            return fileSchema.getType(columnName);
+        final String columnName = field.getName();
+        if (groupType.containsField(columnName)) {
+            return groupType.getType(columnName);
         }
-        // Parquet is case-sensitive, but hive is not. all hive columns get converted to lowercase
-        // check for direct match above but if no match found, try case-insensitive match
-        for (org.apache.parquet.schema.Type type : fileSchema.getFields()) {
+        // Parquet is case-sensitive, but the engine that generated the parquet file may not be.
+        // Check for a direct match above; if no match is found, try a case-insensitive match.
+        for (org.apache.parquet.schema.Type type : groupType.getFields()) {
             if (type.getName().equalsIgnoreCase(columnName)) {
                 return type;
             }
@@ -81,20 +91,12 @@ private static Type pruneSubfields(Type type, DataType deltaDatatype)
         }
 
         GroupType groupType = (GroupType) type;
-        StructType deltaStructType = (StructType) deltaDatatype;
-        List<Type> newParquetSubFields = new ArrayList<>();
-        for (StructField subField : deltaStructType.fields()) {
-            String subFieldName = subField.getName();
-            Type parquetSubFieldType = groupType.getType(subFieldName);
-            if (parquetSubFieldType == null) {
-                for (org.apache.parquet.schema.Type typeTemp : groupType.getFields()) {
-                    if (typeTemp.getName().equalsIgnoreCase(subFieldName)) {
-                        parquetSubFieldType = type;
-                    }
-                }
-            }
-            newParquetSubFields.add(parquetSubFieldType);
-        }
+        List<Type> newParquetSubFields =
+            ((StructType) deltaDatatype).fields().stream()
+                .map(structField -> findSubFieldType(groupType, structField))
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
 
         return groupType.withNewFields(newParquetSubFields);
     }
@@ -158,30 +160,4 @@ public static void checkState(boolean isValid, String message)
             throw new IllegalStateException(message);
         }
     }
-
-    /**
-     * Search for the Parquet type for in the {@code groupType} for the field equilant to
-     * {@code field}.
-     *
-     * @param groupType Parquet group type coming from the file schema.
-     * @param field Sub field given as Delta Kernel's {@link StructField}
-     * @return {@link Type} of the Parquet field. Returns {@code null}, if not found.
-     */
-    public static Type findFieldType(GroupType groupType, StructField field)
-    {
-        // TODO: Need a way to search by id once we start supporting column mapping `id` mode.
-        final String columnName = field.getName();
-        if (groupType.containsField(columnName)) {
-            return groupType.getType(columnName);
-        }
-        // Parquet is case-sensitive, but hive is not. all hive columns get converted to lowercase
-        // check for direct match above but if no match found, try case-insensitive match
-        for (org.apache.parquet.schema.Type type : groupType.getFields()) {
-            if (type.getName().equalsIgnoreCase(columnName)) {
-                return type;
-            }
-        }
-
-        return null;
-    }
 }
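To illustrate the case-insensitive fallback in findSubFieldType, a standalone sketch against the Parquet schema API only (the schema and column names here are made up; the lookup body mirrors the method above):

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

public class SubFieldLookupSketch {
    // Same shape as findSubFieldType: exact match first, then case-insensitive scan.
    static Type lookup(GroupType groupType, String columnName) {
        if (groupType.containsField(columnName)) {
            return groupType.getType(columnName);
        }
        for (Type type : groupType.getFields()) {
            if (type.getName().equalsIgnoreCase(columnName)) {
                return type;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // A file schema whose writer used an upper-case column name.
        MessageType fileSchema = Types.buildMessage()
            .required(PrimitiveTypeName.INT64).named("ID")
            .named("fileSchema");

        // The exact lookup for "id" misses; the case-insensitive scan finds "ID".
        System.out.println(lookup(fileSchema, "id")); // required int64 ID
    }
}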
DefaultParquetHandler.java
@@ -61,7 +61,8 @@ public boolean hasNext()
             @Override
             public FileReadContext next()
             {
-                return () -> fileIter.next();
+                Row row = fileIter.next();
+                return () -> row;
             }
         };
     }
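The two-line change above is a behavior fix rather than style: the old () -> fileIter.next() deferred next() until the FileReadContext was consumed, so reading the context late (or more than once) could advance the shared iterator unexpectedly. A contrived sketch of the pitfall using only the JDK (hypothetical names):

import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

public class LazyCaptureSketch {
    public static void main(String[] args) {
        Iterator<String> rows = Arrays.asList("row-a", "row-b").iterator();

        // Buggy shape: every get() advances the underlying iterator.
        Supplier<String> lazy = () -> rows.next();
        System.out.println(lazy.get()); // row-a
        System.out.println(lazy.get()); // row-b -- same "context", different row

        // Fixed shape: materialize the row first, then capture the value.
        Iterator<String> rows2 = Arrays.asList("row-a", "row-b").iterator();
        String row = rows2.next();
        Supplier<String> eager = () -> row;
        System.out.println(eager.get()); // row-a
        System.out.println(eager.get()); // row-a, stable
    }
}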
}
Expand All @@ -73,19 +74,15 @@ public CloseableIterator<FileDataReadResult> readParquetFiles(
{
return new CloseableIterator<FileDataReadResult>()
{
private final ParquetBatchReader batchReader = new ParquetBatchReader(hadoopConf);
private FileReadContext currentFile;
private CloseableIterator<ColumnarBatch> currentFileReader;

@Override
public void close()
throws IOException
{
if (currentFileReader != null) {
currentFileReader.close();
}

fileIter.close();
// TODO: implement safe close of multiple closeables.
Utils.closeIterators(currentFileReader, fileIter);
}

@Override
@@ -95,10 +92,10 @@ public boolean hasNext()
                 // initialize the next file reader or return false if there are no more files to
                 // read.
                 if (currentFileReader == null || !currentFileReader.hasNext()) {
+                    Utils.closeIterators(currentFileReader);
                     if (fileIter.hasNext()) {
                         currentFile = fileIter.next();
                         FileStatus fileStatus = Utils.getFileStatus(currentFile.getScanFileRow());
-                        ParquetBatchReader batchReader = new ParquetBatchReader(hadoopConf);
                         currentFileReader = batchReader.read(fileStatus.getPath(), physicalSchema);
                     }
                     else {
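The hasNext logic above follows a common chained-reader pattern: keep at most one per-file reader open, close it once exhausted, and lazily open the next. A generic sketch of that pattern over plain iterators (hypothetical type; the real code plugs ParquetBatchReader.read in as the reader factory and uses closeable iterators):

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;

public class ChainedReader<F, T> implements Iterator<T> {
    private final Iterator<F> files;
    private final Function<F, Iterator<T>> openReader;
    private Iterator<T> current;

    public ChainedReader(Iterator<F> files, Function<F, Iterator<T>> openReader) {
        this.files = files;
        this.openReader = openReader;
    }

    @Override
    public boolean hasNext() {
        // Open the next per-file reader only when the current one is exhausted;
        // this also skips files whose readers produce no batches.
        while ((current == null || !current.hasNext()) && files.hasNext()) {
            current = openReader.apply(files.next());
        }
        return current != null && current.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }
}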
DefaultBinaryVector.java
@@ -43,11 +43,10 @@ public DefaultBinaryVector(DataType dataType, int size, byte[][] values)
     {
         super(size, dataType, Optional.empty());
         checkArgument(dataType instanceof StringType || dataType instanceof BinaryType,
-            "invalid type");
+            "invalid type for binary vector: " + dataType);
         this.values = requireNonNull(values, "values is null");
         checkArgument(values.length >= size,
             "invalid number of values (%s) for given size (%s)", values.length, size);
-        checkArgument(values.length >= 0, "invalid vector size: %s", values.length);
     }
 
     @Override
@@ -90,6 +89,9 @@ public String getString(int rowId)
     @Override
     public byte[] getBinary(int rowId)
     {
+        if (!(getDataType() instanceof BinaryType)) {
+            throw unsupportedDataAccessException("binary");
+        }
         checkValidRowId(rowId);
         return values[rowId];
     }
DefaultBooleanVector.java
@@ -41,7 +41,6 @@ public DefaultBooleanVector(int size, Optional<boolean[]> nullability, boolean[]
     {
         super(size, BooleanType.INSTANCE, nullability);
         this.values = requireNonNull(values, "values is null");
-        checkArgument(values.length >= 0, "invalid vector size: %s", values.length);
         checkArgument(values.length >= size,
             "invalid number of values (%s) for given size (%s)", values.length, size);
         if (nullability.isPresent()) {
DefaultByteVector.java
@@ -42,16 +42,8 @@ public DefaultByteVector(int size, Optional<boolean[]> nullability, byte[] value
     {
         super(size, ByteType.INSTANCE, nullability);
         this.values = requireNonNull(values, "values is null");
-        checkArgument(values.length >= 0, "invalid vector size: %s", values.length);
         checkArgument(values.length >= size,
             "invalid number of values (%s) for given size (%s)", values.length, size);
-        if (nullability.isPresent()) {
-            checkArgument(values.length == nullability.get().length,
-                "vector element components are not of same size" +
-                "value array size = %s, nullability array size = %s",
-                values.length, nullability.get().length
-            );
-        }
     }
 
     /**