diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index 4ed59b08a467c..cb51cb499eede 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -221,15 +221,21 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column, if (column.dataType() == DataTypes.IntegerType || DecimalType.is32BitDecimalType(column.dataType())) { for (int i = rowId; i < rowId + num; ++i) { - column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i))); + if (!column.isNullAt(i)) { + column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i))); + } } } else if (column.dataType() == DataTypes.ByteType) { for (int i = rowId; i < rowId + num; ++i) { - column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getDictId(i))); + if (!column.isNullAt(i)) { + column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getDictId(i))); + } } } else if (column.dataType() == DataTypes.ShortType) { for (int i = rowId; i < rowId + num; ++i) { - column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getDictId(i))); + if (!column.isNullAt(i)) { + column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getDictId(i))); + } } } else { throw new UnsupportedOperationException("Unimplemented type: " + column.dataType()); @@ -240,7 +246,9 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column, if (column.dataType() == DataTypes.LongType || DecimalType.is64BitDecimalType(column.dataType())) { for (int i = rowId; i < rowId + num; ++i) { - column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i))); + if (!column.isNullAt(i)) { + column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i))); + } } } else { throw new UnsupportedOperationException("Unimplemented type: " + column.dataType()); @@ -249,21 +257,27 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column, case FLOAT: for (int i = rowId; i < rowId + num; ++i) { - column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getDictId(i))); + if (!column.isNullAt(i)) { + column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getDictId(i))); + } } break; case DOUBLE: for (int i = rowId; i < rowId + num; ++i) { - column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getDictId(i))); + if (!column.isNullAt(i)) { + column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getDictId(i))); + } } break; case INT96: if (column.dataType() == DataTypes.TimestampType) { for (int i = rowId; i < rowId + num; ++i) { // TODO: Convert dictionary of Binaries to dictionary of Longs - Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); - column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v)); + if (!column.isNullAt(i)) { + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); + column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v)); + } } } else { throw new UnsupportedOperationException(); @@ -275,26 +289,34 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column, // and reuse it across batches. This should mean adding a ByteArray would just update // the length and offset. for (int i = rowId; i < rowId + num; ++i) { - Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); - column.putByteArray(i, v.getBytes()); + if (!column.isNullAt(i)) { + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); + column.putByteArray(i, v.getBytes()); + } } break; case FIXED_LEN_BYTE_ARRAY: // DecimalType written in the legacy mode if (DecimalType.is32BitDecimalType(column.dataType())) { for (int i = rowId; i < rowId + num; ++i) { - Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); - column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v)); + if (!column.isNullAt(i)) { + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); + column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v)); + } } } else if (DecimalType.is64BitDecimalType(column.dataType())) { for (int i = rowId; i < rowId + num; ++i) { - Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); - column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v)); + if (!column.isNullAt(i)) { + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); + column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v)); + } } } else if (DecimalType.isByteArrayDecimalType(column.dataType())) { for (int i = rowId; i < rowId + num; ++i) { - Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); - column.putByteArray(i, v.getBytes()); + if (!column.isNullAt(i)) { + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); + column.putByteArray(i, v.getBytes()); + } } } else { throw new UnsupportedOperationException();