Skip to content

Commit

Permalink
Fix Long type in theschema and int32 parquet type
Browse files Browse the repository at this point in the history
  • Loading branch information
Catalin Toda committed Oct 26, 2022
1 parent 1c1eb99 commit 73f742d
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,8 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType spa
case INT32:
if (sparkType == DataTypes.IntegerType || canReadAsIntDecimal(descriptor, sparkType)) {
return new IntegerUpdater();
} else if (sparkType == DataTypes.LongType && isUnsignedIntTypeMatched(32)) {
// In `ParquetToSparkSchemaConverter`, we map parquet UINT32 to our LongType.
// For unsigned int32, it stores as plain signed int32 in Parquet when dictionary
// fallbacks. We read them as long values.
return new UnsignedIntegerUpdater();
} else if (sparkType == DataTypes.LongType) {
return new LongIntegerUpdater();
} else if (sparkType == DataTypes.ByteType) {
return new ByteUpdater();
} else if (sparkType == DataTypes.ShortType) {
Expand Down Expand Up @@ -279,14 +276,14 @@ public void decodeSingleDictionaryId(
}
}

private static class UnsignedIntegerUpdater implements ParquetVectorUpdater {
private static class LongIntegerUpdater implements ParquetVectorUpdater {
@Override
public void readValues(
int total,
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
valuesReader.readUnsignedIntegers(total, values, offset);
valuesReader.readIntegersAsLongs(total, values, offset);
}

@Override
Expand All @@ -299,7 +296,7 @@ public void readValue(
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
values.putLong(offset, Integer.toUnsignedLong(valuesReader.readInteger()));
values.putLong(offset, valuesReader.readInteger());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ public void skipIntegers(int total) {
in.skip(total * 4L);
}

@Override
public final void readIntegersAsLongs(int total, WritableColumnVector c, int rowId) {
int requiredBytes = total * 4;
ByteBuffer buffer = getBuffer(requiredBytes);
for (int i = 0; i < total; i += 1) {
c.putLong(rowId + i, buffer.getInt());
}
}


@Override
public final void readUnsignedIntegers(int total, WritableColumnVector c, int rowId) {
int requiredBytes = total * 4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public void readIntegers(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException();
}

@Override
public void readIntegersAsLongs(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException();
}

@Override
public void readIntegersWithRebase(int total, WritableColumnVector c, int rowId,
boolean failIfRebase) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,11 @@ public void readIntegers(int total, WritableColumnVector c, int rowId) {
}
}

@Override
public void readIntegersAsLongs(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException("only readInts is valid.");
}

@Override
public void readUnsignedIntegers(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException("only readInts is valid.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public interface VectorizedValuesReader {
void readShorts(int total, WritableColumnVector c, int rowId);
void readIntegers(int total, WritableColumnVector c, int rowId);
void readIntegersWithRebase(int total, WritableColumnVector c, int rowId, boolean failIfRebase);
void readIntegersAsLongs(int total, WritableColumnVector c, int rowId);
void readUnsignedIntegers(int total, WritableColumnVector c, int rowId);
void readUnsignedLongs(int total, WritableColumnVector c, int rowId);
void readLongs(int total, WritableColumnVector c, int rowId);
Expand Down

0 comments on commit 73f742d

Please sign in to comment.