From 5ecee0b8eaeda1213ddd209bbb55752072de829c Mon Sep 17 00:00:00 2001
From: xianyangliu 
Date: Mon, 12 Aug 2024 20:16:49 +0800
Subject: [PATCH 1/3] cast to int safety

---
 .../hadoop/ParquetFileWriteException.java     | 32 +++++++++++++++++
 .../parquet/hadoop/ParquetFileWriter.java     | 34 ++++++++++++-------
 2 files changed, 53 insertions(+), 13 deletions(-)
 create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriteException.java

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriteException.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriteException.java
new file mode 100644
index 0000000000..04952b9e50
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriteException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.parquet.ParquetRuntimeException;
+
+/**
+ * RuntimeException thrown while writing a Parquet file.
+ */
+public class ParquetFileWriteException extends ParquetRuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  public ParquetFileWriteException(String message) {
+    super(message);
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 814b98c50f..f7a3efff5d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -703,7 +703,7 @@ public void writeDataPage(
     columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
     long beforeHeader = out.getPos();
     LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
-    int compressedPageSize = (int) bytes.size();
+    int compressedPageSize = toIntWithCheck(bytes.size());
     metadataConverter.writeDataPageV1Header(
         uncompressedPageSize, compressedPageSize, valueCount, rlEncoding, dlEncoding, valuesEncoding, out);
     long headerSize = out.getPos() - beforeHeader;
@@ -879,7 +879,7 @@ public void writeDataPage(
         pageHeaderAAD,
         sizeStatistics);
     offsetIndexBuilder.add(
-        (int) (out.getPos() - beforeHeader),
+        toIntWithCheck(out.getPos() - beforeHeader),
         rowCount,
         sizeStatistics != null ?
sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty()); } @@ -979,7 +979,7 @@ public void writeDataPage( currentChunkFirstDataPage = beforeHeader; } LOG.debug("{}: write data page: {} values", beforeHeader, valueCount); - int compressedPageSize = (int) bytes.size(); + int compressedPageSize = toIntWithCheck(bytes.size()); if (pageWriteChecksumEnabled) { crc.reset(); crcUpdate(bytes); @@ -1209,7 +1209,7 @@ public void writeDataPageV2( BytesInput.concat(repetitionLevels, definitionLevels, compressedData).writeAllTo(out); offsetIndexBuilder.add( - (int) (out.getPos() - beforeHeader), + toIntWithCheck(out.getPos() - beforeHeader), rowCount, sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty()); } @@ -1626,8 +1626,8 @@ private static void copy(SeekableInputStream from, PositionOutputStream to, long long bytesCopied = 0; byte[] buffer = COPY_BUFFER.get(); while (bytesCopied < length) { - long bytesLeft = length - bytesCopied; - int bytesRead = from.read(buffer, 0, (buffer.length < bytesLeft ? buffer.length : (int) bytesLeft)); + int bytesLeft = Math.toIntExact(length - bytesCopied); + int bytesRead = from.read(buffer, 0, (Math.min(buffer.length, bytesLeft))); if (bytesRead < 0) { throw new IllegalArgumentException("Unexpected end of input file at " + start + bytesCopied); } @@ -1707,12 +1707,12 @@ private static void serializeColumnIndexes( } long offset = out.getPos(); Util.writeColumnIndex(columnIndex, out, columnIndexEncryptor, columnIndexAAD); - column.setColumnIndexReference(new IndexReference(offset, (int) (out.getPos() - offset))); + column.setColumnIndexReference(new IndexReference(offset, toIntWithCheck(out.getPos() - offset))); } } } - private int toIntWithCheck(long size) { + private static int toIntWithCheck(long size) { if ((int) size != size) { throw new ParquetEncodingException( "Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " + size); @@ -1720,6 +1720,14 @@ private int toIntWithCheck(long size) { return (int) size; } + private static int footerSizeToInt(long size) { + if ((int) size != size) { + throw new ParquetFileWriteException( + "Cannot footer size larger than " + Integer.MAX_VALUE + " bytes: " + size); + } + return (int) size; + } + private void mergeColumnStatistics(Statistics statistics, SizeStatistics sizeStatistics) { Preconditions.checkState(currentSizeStatistics != null, "Aggregate size statistics should not be null"); currentSizeStatistics.mergeStatistics(sizeStatistics); @@ -1787,7 +1795,7 @@ private static void serializeOffsetIndexes( out, offsetIndexEncryptor, offsetIndexAAD); - column.setOffsetIndexReference(new IndexReference(offset, (int) (out.getPos() - offset))); + column.setOffsetIndexReference(new IndexReference(offset, toIntWithCheck(out.getPos() - offset))); } } } @@ -1852,7 +1860,7 @@ private static void serializeBloomFilters( } out.write(serializedBitset); - int length = (int) (out.getPos() - offset); + int length = Math.toIntExact(out.getPos() - offset); column.setBloomFilterLength(length); } } @@ -1872,7 +1880,7 @@ private static void serializeFooter( metadataConverter.toParquetMetadata(CURRENT_VERSION, footer); writeFileMetaData(parquetMetadata, out); LOG.debug("{}: footer length = {}", out.getPos(), (out.getPos() - footerIndex)); - BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex)); + BytesUtils.writeIntLittleEndian(out, footerSizeToInt(out.getPos() - footerIndex)); out.write(MAGIC); return; } @@ -1910,7 +1918,7 @@ private static void serializeFooter( 
    out.write(serializedFooter);
     out.write(signature);
     LOG.debug("{}: footer and signature length = {}", out.getPos(), (out.getPos() - footerIndex));
-    BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
+    BytesUtils.writeIntLittleEndian(out, footerSizeToInt(out.getPos() - footerIndex));
     out.write(MAGIC);
     return;
   }
@@ -1920,7 +1928,7 @@ private static void serializeFooter(
     writeFileCryptoMetaData(fileEncryptor.getFileCryptoMetaData(), out);
     byte[] footerAAD = AesCipher.createFooterAAD(fileEncryptor.getFileAAD());
     writeFileMetaData(parquetMetadata, out, fileEncryptor.getFooterEncryptor(), footerAAD);
-    int combinedMetaDataLength = (int) (out.getPos() - cryptoFooterIndex);
+    int combinedMetaDataLength = footerSizeToInt(out.getPos() - cryptoFooterIndex);
     LOG.debug("{}: crypto metadata and footer length = {}", out.getPos(), combinedMetaDataLength);
     BytesUtils.writeIntLittleEndian(out, combinedMetaDataLength);
     out.write(EFMAGIC);

From 393b94fb0e2723c92967c4eb9ba411fe4c1269c1 Mon Sep 17 00:00:00 2001
From: xianyangliu 
Date: Tue, 13 Aug 2024 10:38:42 +0800
Subject: [PATCH 2/3] update

---
 .../main/java/org/apache/parquet/hadoop/ParquetFileWriter.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index f7a3efff5d..ddc8867901 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -1723,7 +1723,7 @@ private static int toIntWithCheck(long size) {
   private static int footerSizeToInt(long size) {
     if ((int) size != size) {
       throw new ParquetFileWriteException(
-          "Cannot footer size larger than " + Integer.MAX_VALUE + " bytes: " + size);
+          "Cannot write footer larger than " + Integer.MAX_VALUE + " bytes: " + size);
     }
     return (int) size;
   }

From 1e19b23158cdd8dedb89a7c311fa36e8c0010176 Mon Sep 17 00:00:00 2001
From: xianyangliu 
Date: Thu, 15 Aug 2024 15:02:45 +0800
Subject: [PATCH 3/3] address comments

---
 .../parquet/ParquetSizeOverflowException.java | 14 +++---
 .../parquet/hadoop/ParquetFileWriter.java     | 45 +++++++++----------
 2 files changed, 29 insertions(+), 30 deletions(-)
 rename parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriteException.java => parquet-common/src/main/java/org/apache/parquet/ParquetSizeOverflowException.java (77%)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriteException.java b/parquet-common/src/main/java/org/apache/parquet/ParquetSizeOverflowException.java
similarity index 77%
rename from parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriteException.java
rename to parquet-common/src/main/java/org/apache/parquet/ParquetSizeOverflowException.java
index 04952b9e50..9463909915 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriteException.java
+++ b/parquet-common/src/main/java/org/apache/parquet/ParquetSizeOverflowException.java
@@ -16,17 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.parquet.hadoop;
-
-import org.apache.parquet.ParquetRuntimeException;
+package org.apache.parquet;
 
 /**
- * RuntimeException thrown while writing a Parquet file.
+ * RuntimeException thrown when a size overflows.
*/ -public class ParquetFileWriteException extends ParquetRuntimeException { +public class ParquetSizeOverflowException extends ParquetRuntimeException { private static final long serialVersionUID = 1L; - public ParquetFileWriteException(String message) { + public ParquetSizeOverflowException() { + super(); + } + + public ParquetSizeOverflowException(String message) { super(message); } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index ddc8867901..f0a912f599 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.parquet.ParquetSizeOverflowException; import org.apache.parquet.Preconditions; import org.apache.parquet.Version; import org.apache.parquet.bytes.ByteBufferAllocator; @@ -703,7 +704,7 @@ public void writeDataPage( columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder(); long beforeHeader = out.getPos(); LOG.debug("{}: write data page: {} values", beforeHeader, valueCount); - int compressedPageSize = toIntWithCheck(bytes.size()); + int compressedPageSize = toIntWithCheck(bytes.size(), "page"); metadataConverter.writeDataPageV1Header( uncompressedPageSize, compressedPageSize, valueCount, rlEncoding, dlEncoding, valuesEncoding, out); long headerSize = out.getPos() - beforeHeader; @@ -879,7 +880,7 @@ public void writeDataPage( pageHeaderAAD, sizeStatistics); offsetIndexBuilder.add( - toIntWithCheck(out.getPos() - beforeHeader), + toIntWithCheck(out.getPos() - beforeHeader, "page"), rowCount, sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty()); } @@ -979,7 +980,7 @@ public void writeDataPage( currentChunkFirstDataPage = beforeHeader; } LOG.debug("{}: write data page: {} values", beforeHeader, valueCount); - int compressedPageSize = toIntWithCheck(bytes.size()); + int compressedPageSize = toIntWithCheck(bytes.size(), "page"); if (pageWriteChecksumEnabled) { crc.reset(); crcUpdate(bytes); @@ -1146,12 +1147,14 @@ public void writeDataPageV2( SizeStatistics sizeStatistics) throws IOException { state = state.write(); - int rlByteLength = toIntWithCheck(repetitionLevels.size()); - int dlByteLength = toIntWithCheck(definitionLevels.size()); + int rlByteLength = toIntWithCheck(repetitionLevels.size(), "page repetition levels"); + int dlByteLength = toIntWithCheck(definitionLevels.size(), "page definition levels"); - int compressedSize = toIntWithCheck(compressedData.size() + repetitionLevels.size() + definitionLevels.size()); + int compressedSize = + toIntWithCheck(compressedData.size() + repetitionLevels.size() + definitionLevels.size(), "page"); - int uncompressedSize = toIntWithCheck(uncompressedDataSize + repetitionLevels.size() + definitionLevels.size()); + int uncompressedSize = + toIntWithCheck(uncompressedDataSize + repetitionLevels.size() + definitionLevels.size(), "page"); long beforeHeader = out.getPos(); if (currentChunkFirstDataPage < 0) { @@ -1209,7 +1212,7 @@ public void writeDataPageV2( BytesInput.concat(repetitionLevels, definitionLevels, compressedData).writeAllTo(out); offsetIndexBuilder.add( - toIntWithCheck(out.getPos() - beforeHeader), + toIntWithCheck(out.getPos() - beforeHeader, "page"), rowCount, sizeStatistics != null ? 
sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty());
   }
 
@@ -1707,23 +1710,16 @@ private static void serializeColumnIndexes(
       }
       long offset = out.getPos();
       Util.writeColumnIndex(columnIndex, out, columnIndexEncryptor, columnIndexAAD);
-      column.setColumnIndexReference(new IndexReference(offset, toIntWithCheck(out.getPos() - offset)));
+      column.setColumnIndexReference(
+          new IndexReference(offset, toIntWithCheck(out.getPos() - offset, "page")));
     }
   }
 }
 
-  private static int toIntWithCheck(long size) {
+  private static int toIntWithCheck(long size, String obj) {
     if ((int) size != size) {
-      throw new ParquetEncodingException(
-          "Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " + size);
-    }
-    return (int) size;
-  }
-
-  private static int footerSizeToInt(long size) {
-    if ((int) size != size) {
-      throw new ParquetFileWriteException(
-          "Cannot write footer larger than " + Integer.MAX_VALUE + " bytes: " + size);
+      throw new ParquetSizeOverflowException(
+          String.format("Cannot write %s larger than %s bytes: %s", obj, Integer.MAX_VALUE, size));
     }
     return (int) size;
   }
@@ -1795,7 +1791,8 @@ private static void serializeOffsetIndexes(
           out,
           offsetIndexEncryptor,
           offsetIndexAAD);
-      column.setOffsetIndexReference(new IndexReference(offset, toIntWithCheck(out.getPos() - offset)));
+      column.setOffsetIndexReference(
+          new IndexReference(offset, toIntWithCheck(out.getPos() - offset, "page")));
     }
   }
 }
@@ -1880,7 +1877,7 @@ private static void serializeFooter(
       metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
       writeFileMetaData(parquetMetadata, out);
       LOG.debug("{}: footer length = {}", out.getPos(), (out.getPos() - footerIndex));
-      BytesUtils.writeIntLittleEndian(out, footerSizeToInt(out.getPos() - footerIndex));
+      BytesUtils.writeIntLittleEndian(out, toIntWithCheck(out.getPos() - footerIndex, "footer"));
       out.write(MAGIC);
       return;
     }
@@ -1918,7 +1915,7 @@ private static void serializeFooter(
     out.write(serializedFooter);
     out.write(signature);
     LOG.debug("{}: footer and signature length = {}", out.getPos(), (out.getPos() - footerIndex));
-    BytesUtils.writeIntLittleEndian(out, footerSizeToInt(out.getPos() - footerIndex));
+    BytesUtils.writeIntLittleEndian(out, toIntWithCheck(out.getPos() - footerIndex, "footer"));
     out.write(MAGIC);
     return;
   }
@@ -1928,7 +1925,7 @@ private static void serializeFooter(
     writeFileCryptoMetaData(fileEncryptor.getFileCryptoMetaData(), out);
     byte[] footerAAD = AesCipher.createFooterAAD(fileEncryptor.getFileAAD());
     writeFileMetaData(parquetMetadata, out, fileEncryptor.getFooterEncryptor(), footerAAD);
-    int combinedMetaDataLength = footerSizeToInt(out.getPos() - cryptoFooterIndex);
+    int combinedMetaDataLength = toIntWithCheck(out.getPos() - cryptoFooterIndex, "footer");
     LOG.debug("{}: crypto metadata and footer length = {}", out.getPos(), combinedMetaDataLength);
     BytesUtils.writeIntLittleEndian(out, combinedMetaDataLength);
     out.write(EFMAGIC);
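
Taken together, the three patches replace every silent (int) narrowing of a byte count with a checked conversion: Math.toIntExact where a generic failure is enough (the copy buffer and bloom filter lengths), and toIntWithCheck(long, String), which throws ParquetSizeOverflowException with a message naming the oversized object, for page, index, and footer sizes. The standalone sketch below contrasts the three behaviors on a value one past Integer.MAX_VALUE; the SizeCheckDemo class, its main harness, and the use of IllegalStateException in place of ParquetSizeOverflowException are stand-ins so the snippet compiles outside the Parquet tree.

// SizeCheckDemo.java -- illustrative sketch only; not part of the patch series.
public class SizeCheckDemo {

  // Mirrors the toIntWithCheck(long, String) helper the series adds to
  // ParquetFileWriter; IllegalStateException stands in for
  // ParquetSizeOverflowException so this file compiles on its own.
  private static int toIntWithCheck(long size, String obj) {
    if ((int) size != size) {
      throw new IllegalStateException(
          String.format("Cannot write %s larger than %s bytes: %s", obj, Integer.MAX_VALUE, size));
    }
    return (int) size;
  }

  public static void main(String[] args) {
    long size = Integer.MAX_VALUE + 1L; // 2147483648, one past the int range

    // Old behavior: the unchecked cast wraps around silently, so a bogus
    // negative length would have been written into the file tail.
    System.out.println("unchecked cast yields " + (int) size); // -2147483648

    // Math.toIntExact (used for buffer copies and bloom filter lengths)
    // fails loudly, but with a generic message.
    try {
      Math.toIntExact(size);
    } catch (ArithmeticException e) {
      System.out.println("Math.toIntExact threw: " + e.getMessage()); // "integer overflow"
    }

    // The patch's checked conversion also fails loudly, and names what
    // overflowed, matching the message format added above.
    try {
      toIntWithCheck(size, "footer");
    } catch (IllegalStateException e) {
      System.out.println("toIntWithCheck threw: " + e.getMessage());
    }
  }
}

Run standalone, the sketch prints the wrapped value -2147483648 for the unchecked cast, then the two exception messages, which is the practical difference the custom helper buys: the error names the page or footer that overflowed instead of a bare "integer overflow".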