From 274dc51bc9e5cc880ba3c77c3db826d2a4943965 Mon Sep 17 00:00:00 2001 From: Gabor Szadovszky Date: Fri, 23 Feb 2024 17:11:56 +0100 Subject: [PATCH] PARQUET-2432: Use ByteBufferAllocator over hardcoded heap allocation (#1278) * PARQUET-2432: Use ByteBufferAllocator over hardcoded heap allocation * Updated BytesInput implementations to rely on a ByteBufferAllocator instance for allocating/releasing ByteBuffer objects. * Extend the usage of a ByteBufferAllocator instead of the hardcoded usage of heap (e.g. byte[], ByteBuffer.allocate etc.) * parquet-cli related code parts including ParquetRewriter and tests are not changed in this effort * Reuse temporary ByteBuffer instead of keep allocating/releasing --- .../parquet/column/ColumnWriteStore.java | 3 +- .../apache/parquet/column/ColumnWriter.java | 3 +- .../column/page/DictionaryPageReadStore.java | 7 +- .../parquet/column/page/PageWriteStore.java | 7 +- .../parquet/column/page/PageWriter.java | 7 +- .../bloomfilter/BloomFilterWriteStore.java | 7 +- .../values/bloomfilter/BloomFilterWriter.java | 7 +- .../parquet/column/page/mem/MemPageStore.java | 5 + parquet-common/pom.xml | 7 + .../parquet/bytes/ByteBufferReleaser.java | 62 +++ .../org/apache/parquet/bytes/BytesInput.java | 194 ++++++++- .../org/apache/parquet/bytes/BytesUtils.java | 12 +- .../bytes/CapacityByteArrayOutputStream.java | 19 + .../ConcatenatingByteArrayCollector.java | 15 + .../ConcatenatingByteBufferCollector.java | 105 +++++ .../bytes/ReusingByteBufferAllocator.java | 107 +++++ .../bytes/TrackingByteBufferAllocator.java | 2 + .../apache/parquet/util/AutoCloseables.java | 41 +- .../apache/parquet/bytes/TestBytesInput.java | 392 ++++++++++++++++++ .../TestConcatenatingByteBufferCollector.java | 112 +++++ .../bytes/TestReusingByteBufferAllocator.java | 124 ++++++ .../filter2/compat/RowGroupFilter.java | 5 +- .../converter/ParquetMetadataConverter.java | 23 +- .../hadoop/ColumnChunkPageReadStore.java | 32 +- .../hadoop/ColumnChunkPageWriteStore.java | 25 +- .../parquet/hadoop/DictionaryPageReader.java | 25 +- .../hadoop/InternalParquetRecordWriter.java | 27 +- .../parquet/hadoop/ParquetFileReader.java | 117 ++++-- .../parquet/hadoop/ParquetFileWriter.java | 84 +++- .../parquet/hadoop/ParquetOutputFormat.java | 6 +- .../apache/parquet/hadoop/ParquetWriter.java | 10 +- .../hadoop/TestColumnChunkPageWriteStore.java | 27 +- .../parquet/hadoop/TestDataPageChecksums.java | 23 +- .../parquet/hadoop/TestParquetFileWriter.java | 66 ++- 34 files changed, 1549 insertions(+), 159 deletions(-) create mode 100644 parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferReleaser.java create mode 100644 parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteBufferCollector.java create mode 100644 parquet-common/src/main/java/org/apache/parquet/bytes/ReusingByteBufferAllocator.java create mode 100644 parquet-common/src/test/java/org/apache/parquet/bytes/TestBytesInput.java create mode 100644 parquet-common/src/test/java/org/apache/parquet/bytes/TestConcatenatingByteBufferCollector.java create mode 100644 parquet-common/src/test/java/org/apache/parquet/bytes/TestReusingByteBufferAllocator.java diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java index 9e2edb25ad..a1836b2578 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java @@ -22,7 +22,7 
@@ * Container which can construct writers for multiple columns to be stored * together. */ -public interface ColumnWriteStore { +public interface ColumnWriteStore extends AutoCloseable { /** * @param path the column for which to create a writer * @return the column writer for the given column @@ -63,6 +63,7 @@ public interface ColumnWriteStore { /** * Close the related output stream and release any resources */ + @Override public abstract void close(); /** diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java index 833a6c92b9..539dbaf383 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java @@ -23,7 +23,7 @@ /** * writer for (repetition level, definition level, values) triplets */ -public interface ColumnWriter { +public interface ColumnWriter extends AutoCloseable { /** * writes the current value @@ -91,6 +91,7 @@ public interface ColumnWriter { * Close the underlying store. This should be called when there are no * more data to be written. */ + @Override void close(); /** diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPageReadStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPageReadStore.java index 5740303088..931fb5abbf 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPageReadStore.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPageReadStore.java @@ -23,7 +23,7 @@ /** * Interface to read dictionary pages for all the columns of a row group */ -public interface DictionaryPageReadStore { +public interface DictionaryPageReadStore extends AutoCloseable { /** * Returns a {@link DictionaryPage} for the given column descriptor. 
@@ -33,4 +33,9 @@ public interface DictionaryPageReadStore { * @return the DictionaryPage for that column, or null if there isn't one */ DictionaryPage readDictionaryPage(ColumnDescriptor descriptor); + + @Override + default void close() { + // No-op default implementation for compatibility + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java index bfe2985605..4f0044ae5a 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java @@ -23,11 +23,16 @@ /** * contains all the writers for the columns in the corresponding row group */ -public interface PageWriteStore { +public interface PageWriteStore extends AutoCloseable { /** * @param path the descriptor for the column * @return the corresponding page writer */ PageWriter getPageWriter(ColumnDescriptor path); + + @Override + default void close() { + // No-op default implementation for compatibility + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java index 92be10e623..e0016123ab 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java @@ -26,7 +26,7 @@ /** * a writer for all the pages of a given column chunk */ -public interface PageWriter { +public interface PageWriter extends AutoCloseable { /** * writes a single page @@ -120,4 +120,9 @@ void writePageV2( * @return a string presenting a summary of how memory is used */ String memUsageString(String prefix); + + @Override + default void close() { + // No-op default implementation for compatibility + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriteStore.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriteStore.java index f7e28fdf2d..a044b48cb4 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriteStore.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriteStore.java @@ -24,7 +24,7 @@ /** * Contains all writers for all columns of a row group */ -public interface BloomFilterWriteStore { +public interface BloomFilterWriteStore extends AutoCloseable { /** * Get bloom filter writer of a column * @@ -32,4 +32,9 @@ public interface BloomFilterWriteStore { * @return the corresponding Bloom filter writer */ BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path); + + @Override + default void close() { + // No-op default implementation for compatibility + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java index 00ce896366..941106d35d 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java @@ -19,11 +19,16 @@ package org.apache.parquet.column.values.bloomfilter; -public interface BloomFilterWriter { +public interface BloomFilterWriter extends AutoCloseable { /** * Write a Bloom filter * * @param bloomFilter the Bloom 
filter to write */ void writeBloomFilter(BloomFilter bloomFilter); + + @Override + default void close() { + // No-op default implementation for compatibility + } } diff --git a/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageStore.java b/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageStore.java index 1e4ad99b5e..f5b66fd88b 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageStore.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageStore.java @@ -70,6 +70,11 @@ public long getRowCount() { return rowCount; } + @Override + public void close() { + // no-op + } + public void addRowCount(long count) { rowCount += count; } diff --git a/parquet-common/pom.xml b/parquet-common/pom.xml index 61ec1d7bf6..6c2d244393 100644 --- a/parquet-common/pom.xml +++ b/parquet-common/pom.xml @@ -61,6 +61,13 @@ <version>${slf4j.version}</version> <scope>test</scope> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferReleaser.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferReleaser.java new file mode 100644 index 0000000000..754e1954ea --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferReleaser.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.bytes; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** + * Convenient class for releasing {@link java.nio.ByteBuffer} objects with the corresponding allocator. + */ +public class ByteBufferReleaser implements AutoCloseable { + + final ByteBufferAllocator allocator; + private final List<ByteBuffer> toRelease = new ArrayList<>(); + + /** + * Constructs a new {@link ByteBufferReleaser} instance with the specified {@link ByteBufferAllocator} to be used for + * releasing the buffers in {@link #close()}. + * + * @param allocator the allocator to be used for releasing the buffers + * @see #releaseLater(ByteBuffer) + * @see #close() + */ + public ByteBufferReleaser(ByteBufferAllocator allocator) { + this.allocator = allocator; + } + + /** + * Adds a {@link ByteBuffer} object to the list of buffers to be released at {@link #close()}. The specified buffer + * shall be one that was allocated by the {@link ByteBufferAllocator} of this object.
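 + * <p>
 + * A minimal usage sketch ({@code allocator} here stands for any {@link ByteBufferAllocator} implementation):
 + * <pre>{@code
 + * try (ByteBufferReleaser releaser = new ByteBufferReleaser(allocator)) {
 + *   ByteBuffer buf = allocator.allocate(1024);
 + *   releaser.releaseLater(buf);
 + *   // use buf here; it is handed back to the allocator when the releaser is closed
 + * }
 + * }</pre>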
+ * + * @param buffer the buffer to be released + */ + public void releaseLater(ByteBuffer buffer) { + toRelease.add(buffer); + } + + @Override + public void close() { + for (ByteBuffer buf : toRelease) { + allocator.release(buf); + } + toRelease.clear(); + } +} diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java index 42a2537921..88bb1da7cf 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java @@ -24,10 +24,12 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.util.Arrays; import java.util.List; +import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -194,7 +196,9 @@ public static BytesInput empty() { * @param bytesInput a BytesInput * @return a copy of the BytesInput * @throws IOException if there is an exception when reading bytes from the BytesInput + * @deprecated Use {@link #copy(ByteBufferAllocator, Consumer)} instead */ + @Deprecated public static BytesInput copy(BytesInput bytesInput) throws IOException { return from(bytesInput.toByteArray()); } @@ -207,10 +211,18 @@ public static BytesInput copy(BytesInput bytesInput) throws IOException { */ public abstract void writeAllTo(OutputStream out) throws IOException; + /** + * For internal use only. It is expected that the buffer is large enough to fit the content of this {@link BytesInput} + * object. + */ + abstract void writeInto(ByteBuffer buffer); + /** * @return a new byte array materializing the contents of this input * @throws IOException if there is an exception reading + * @deprecated Use {@link #toByteBuffer(ByteBufferAllocator, Consumer)} */ + @Deprecated public byte[] toByteArray() throws IOException { long size = size(); if (size > Integer.MAX_VALUE) { @@ -229,11 +241,93 @@ public byte[] toByteArray() throws IOException { /** * @return a new ByteBuffer materializing the contents of this input * @throws IOException if there is an exception reading + * @deprecated Use {@link #toByteBuffer(ByteBufferAllocator, Consumer)} */ + @Deprecated public ByteBuffer toByteBuffer() throws IOException { return ByteBuffer.wrap(toByteArray()); } + /** + * Copies the content of this {@link BytesInput} object to a newly created {@link ByteBuffer} and returns it wrapped + * in a {@link BytesInput} object. + * + * <p>The data content must fit into a single {@link ByteBuffer} object. (If the size of + * this {@link BytesInput} object does not fit in an {@code int}, an {@link ArithmeticException} is thrown. The + * {@code allocator} might throw an {@link OutOfMemoryError} if it is unable to allocate the required + * {@link ByteBuffer}.)
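 + * <p>
 + * A minimal usage sketch ({@code allocator} stands for any {@link ByteBufferAllocator} and {@code original} for any
 + * {@link BytesInput} instance); the callback simply registers the new buffer in a {@link ByteBufferReleaser}:
 + * <pre>{@code
 + * try (ByteBufferReleaser releaser = new ByteBufferReleaser(allocator)) {
 + *   BytesInput copied = original.copy(allocator, releaser::releaseLater);
 + *   // use copied here; its backing buffer is released when the releaser is closed
 + * }
 + * }</pre>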
+ * + * @param allocator the allocator to be used for creating the new {@link ByteBuffer} object + * @param callback the callback called with the newly created {@link ByteBuffer} object; to be used to release it + * at the proper time + * @return the newly created {@link BytesInput} object wrapping the copied content of the specified one + */ + public BytesInput copy(ByteBufferAllocator allocator, Consumer<ByteBuffer> callback) { + ByteBuffer buf = allocator.allocate(Math.toIntExact(size())); + callback.accept(buf); + writeInto(buf); + buf.flip(); + return BytesInput.from(buf); + } + + /** + * Similar to {@link #copy(ByteBufferAllocator, Consumer)} where the allocator and the callback are in the specified + * {@link ByteBufferReleaser}. + */ + public BytesInput copy(ByteBufferReleaser releaser) { + return copy(releaser.allocator, releaser::releaseLater); + } + + /** + * Returns a {@link ByteBuffer} object referencing the data behind this {@link BytesInput} object. It may create a new + * {@link ByteBuffer} object if this {@link BytesInput} is not backed by a single {@link ByteBuffer}. In the latter + * case the specified {@link ByteBufferAllocator} object will be used. In case of allocation the specified callback + * will be invoked so the newly allocated {@link ByteBuffer} object can be released at the proper time. + * + * <p>The data content must fit into a single {@link ByteBuffer} object. (If the size of + * this {@link BytesInput} object does not fit in an {@code int}, an {@link ArithmeticException} is thrown. The + * {@code allocator} might throw an {@link OutOfMemoryError} if it is unable to allocate the required + * {@link ByteBuffer}.) + * + * @param allocator the {@link ByteBufferAllocator} to be used for potentially allocating a new {@link ByteBuffer} + * object + * @param callback the callback to be called with the new {@link ByteBuffer} object potentially allocated + * @return the {@link ByteBuffer} object with the data content of this {@link BytesInput} object. (Might be a copy of + * the content or directly referencing the same memory as this {@link BytesInput} object.) + */ + public ByteBuffer toByteBuffer(ByteBufferAllocator allocator, Consumer<ByteBuffer> callback) { + ByteBuffer buf = getInternalByteBuffer(); + // The internal buffer should be direct iff the allocator is direct as well but let's be sure + if (buf == null || buf.isDirect() != allocator.isDirect()) { + buf = allocator.allocate(Math.toIntExact(size())); + callback.accept(buf); + writeInto(buf); + buf.flip(); + } + return buf; + } + + /** + * Similar to {@link #toByteBuffer(ByteBufferAllocator, Consumer)} where the allocator and the callback are in the + * specified {@link ByteBufferReleaser}. + */ + public ByteBuffer toByteBuffer(ByteBufferReleaser releaser) { + return toByteBuffer(releaser.allocator, releaser::releaseLater); + } + + /** + * For internal use only. + *
<p>
+ * Returns a {@link ByteBuffer} object referring to the internal data of this {@link BytesInput} without copying if + * applicable. If that is not possible (because there are multiple {@link ByteBuffer}s internally or the data cannot + * be exposed as a {@link ByteBuffer}), {@code null} is returned. + * + * @return the internal data of this {@link BytesInput} or {@code null} + */ + ByteBuffer getInternalByteBuffer() { + return null; + } + /** * @return a new InputStream materializing the contents of this input * @throws IOException if there is an exception reading @@ -275,6 +369,20 @@ public void writeAllTo(OutputStream out) throws IOException { out.write(this.toByteArray()); } + @Override + void writeInto(ByteBuffer buffer) { + try { + // Needs a duplicate buffer to set the correct limit (we do not want to over-read the stream) + ByteBuffer workBuf = buffer.duplicate(); + int pos = buffer.position(); + workBuf.limit(pos + byteCount); + Channels.newChannel(in).read(workBuf); + buffer.position(pos + byteCount); + } catch (IOException e) { + throw new RuntimeException("Exception occurred during reading input stream", e); + } + } + public byte[] toByteArray() throws IOException { LOG.debug("read all {} bytes", byteCount); byte[] buf = new byte[byteCount]; @@ -315,6 +423,18 @@ public void writeAllTo(OutputStream out) throws IOException { } } + @Override + void writeInto(ByteBuffer buffer) { + for (BytesInput input : inputs) { + input.writeInto(buffer); + } + } + + @Override + ByteBuffer getInternalByteBuffer() { + return inputs.size() == 1 ? inputs.get(0).getInternalByteBuffer() : null; + } + @Override public long size() { return size; @@ -334,8 +454,16 @@ public void writeAllTo(OutputStream out) throws IOException { BytesUtils.writeIntLittleEndian(out, intValue); } - public ByteBuffer toByteBuffer() throws IOException { - return ByteBuffer.allocate(4).putInt(0, intValue); + @Override + void writeInto(ByteBuffer buffer) { + buffer.order(ByteOrder.LITTLE_ENDIAN).putInt(intValue); + } + + public ByteBuffer toByteBuffer() { + ByteBuffer buf = ByteBuffer.allocate(4); + writeInto(buf); + buf.flip(); + return buf; } @Override @@ -357,9 +485,20 @@ public void writeAllTo(OutputStream out) throws IOException { BytesUtils.writeUnsignedVarInt(intValue, out); } - public ByteBuffer toByteBuffer() throws IOException { + @Override + void writeInto(ByteBuffer buffer) { + try { + BytesUtils.writeUnsignedVarInt(intValue, buffer); + } catch (IOException e) { + // It does not actually throw an I/O exception, but we cannot remove throws for compatibility + throw new RuntimeException(e); + } + } + + public ByteBuffer toByteBuffer() { ByteBuffer ret = ByteBuffer.allocate((int) size()); - BytesUtils.writeUnsignedVarInt(intValue, ret); + writeInto(ret); + ret.flip(); return ret; } @@ -383,6 +522,11 @@ public void writeAllTo(OutputStream out) throws IOException { BytesUtils.writeUnsignedVarLong(longValue, out); } + @Override + void writeInto(ByteBuffer buffer) { + BytesUtils.writeUnsignedVarLong(longValue, buffer); + } + @Override public long size() { int s = (70 - Long.numberOfLeadingZeros(longValue)) / 7; @@ -395,6 +539,11 @@ private static class EmptyBytesInput extends BytesInput { @Override public void writeAllTo(OutputStream out) throws IOException {} + @Override + void writeInto(ByteBuffer buffer) { + // no-op + } + @Override public long size() { return 0; @@ -418,6 +567,16 @@ public void writeAllTo(OutputStream out) throws IOException { arrayOut.writeTo(out); } + @Override + void writeInto(ByteBuffer buffer) { + 
arrayOut.writeInto(buffer); + } + + @Override + ByteBuffer getInternalByteBuffer() { + return arrayOut.getInternalByteBuffer(); + } + @Override public long size() { return arrayOut.size(); @@ -437,6 +596,11 @@ public void writeAllTo(OutputStream out) throws IOException { arrayOut.writeTo(out); } + @Override + void writeInto(ByteBuffer buffer) { + buffer.put(arrayOut.toByteArray()); + } + @Override public long size() { return arrayOut.size(); @@ -460,6 +624,11 @@ public void writeAllTo(OutputStream out) throws IOException { out.write(in, offset, length); } + @Override + void writeInto(ByteBuffer buffer) { + buffer.put(in, offset, length); + } + public ByteBuffer toByteBuffer() throws IOException { return java.nio.ByteBuffer.wrap(in, offset, length); } @@ -491,6 +660,13 @@ public void writeAllTo(OutputStream out) throws IOException { } } + @Override + void writeInto(ByteBuffer target) { + for (ByteBuffer buffer : buffers) { + target.put(buffer.duplicate()); + } + } + @Override public ByteBufferInputStream toInputStream() { return ByteBufferInputStream.wrap(buffers); @@ -514,6 +690,16 @@ public void writeAllTo(OutputStream out) throws IOException { Channels.newChannel(out).write(buffer.duplicate()); } + @Override + void writeInto(ByteBuffer target) { + target.put(buffer.duplicate()); + } + + @Override + ByteBuffer getInternalByteBuffer() { + return buffer.slice(); + } + @Override public ByteBufferInputStream toInputStream() { return ByteBufferInputStream.wrap(buffer); diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java index 1e11ee08aa..b8373a898d 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java @@ -233,10 +233,10 @@ public static void writeUnsignedVarInt(int value, OutputStream out) throws IOExc public static void writeUnsignedVarInt(int value, ByteBuffer dest) throws IOException { while ((value & 0xFFFFFF80) != 0L) { - dest.putInt((value & 0x7F) | 0x80); + dest.put((byte) ((value & 0x7F) | 0x80)); value >>>= 7; } - dest.putInt(value & 0x7F); + dest.put((byte) (value & 0x7F)); } public static void writeZigZagVarInt(int intValue, OutputStream out) throws IOException { @@ -276,6 +276,14 @@ public static void writeUnsignedVarLong(long value, OutputStream out) throws IOE out.write((int) (value & 0x7F)); } + public static void writeUnsignedVarLong(long value, ByteBuffer out) { + while ((value & 0xFFFFFFFFFFFFFF80L) != 0L) { + out.put((byte) ((value & 0x7F) | 0x80)); + value >>>= 7; + } + out.put((byte) (value & 0x7F)); + } + public static void writeZigZagVarLong(long longValue, OutputStream out) throws IOException { writeUnsignedVarLong((longValue << 1) ^ (longValue >> 63), out); } diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java index 2031e625ad..84d3c5b7ba 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java @@ -251,6 +251,16 @@ public void writeTo(OutputStream out) throws IOException { } } + /** + * It is expected that the buffer is large enough to fit the content of this. 
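 + * Otherwise the internal {@link java.nio.ByteBuffer#put(ByteBuffer)} calls may fail with a
 + * {@link java.nio.BufferOverflowException}.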
+ */ + void writeInto(ByteBuffer buffer) { + for (ByteBuffer slab : slabs) { + // Read through a duplicate so the slab's own position/limit stay untouched + ByteBuffer dup = (ByteBuffer) slab.duplicate().flip(); + buffer.put(dup); + } + } + /** * @return The total size in bytes of data written to this stream. */ @@ -329,6 +339,15 @@ int getSlabCount() { return slabs.size(); } + ByteBuffer getInternalByteBuffer() { + if (slabs.size() == 1) { + ByteBuffer buf = slabs.get(0).duplicate(); + buf.flip(); + return buf.slice(); + } + return null; + } + @Override public void close() { for (ByteBuffer slab : slabs) { diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java index e6e9eea3a0..e025a1bcff 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java @@ -22,10 +22,18 @@ import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +/** + * Used for collecting the content of {@link BytesInput} objects. + * + * @deprecated Use {@link ConcatenatingByteBufferCollector} instead. + */ +@Deprecated public class ConcatenatingByteArrayCollector extends BytesInput { + private final List<byte[]> slabs = new ArrayList<byte[]>(); private long size = 0; @@ -47,6 +55,13 @@ public void writeAllTo(OutputStream out) throws IOException { } } + @Override + public void writeInto(ByteBuffer buffer) { + for (byte[] slab : slabs) { + buffer.put(slab); + } + } + @Override public long size() { return size; diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteBufferCollector.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteBufferCollector.java new file mode 100644 index 0000000000..7a616e9b9d --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ConcatenatingByteBufferCollector.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.bytes; + +import static java.lang.String.format; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.ArrayList; +import java.util.List; + +/** + * Alternative to {@link ConcatenatingByteArrayCollector} but using {@link java.nio.ByteBuffer}s allocated by its + * {@link ByteBufferAllocator}.
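 + * <p>
 + * A minimal usage sketch ({@code allocator} stands for any {@link ByteBufferAllocator}, {@code input} for any
 + * {@link BytesInput} and {@code out} for any open {@link java.io.OutputStream}):
 + * <pre>{@code
 + * try (ConcatenatingByteBufferCollector collector = new ConcatenatingByteBufferCollector(allocator)) {
 + *   collector.collect(input);
 + *   // the collector itself is a BytesInput over everything collected so far
 + *   collector.writeAllTo(out);
 + * }
 + * }</pre>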
+ */ +public class ConcatenatingByteBufferCollector extends BytesInput implements AutoCloseable { + + private final ByteBufferAllocator allocator; + private final List<ByteBuffer> slabs = new ArrayList<>(); + private long size = 0; + + /** + * Constructs a new {@link ConcatenatingByteBufferCollector} instance with the specified allocator. + * + * @param allocator to be used for allocating the required {@link ByteBuffer} instances + */ + public ConcatenatingByteBufferCollector(ByteBufferAllocator allocator) { + this.allocator = allocator; + } + + /** + * Collects the content of the specified input. It allocates a new {@link ByteBuffer} instance that can contain all + * the content. + * + * @param bytesInput the input whose content is to be collected + */ + public void collect(BytesInput bytesInput) { + int inputSize = Math.toIntExact(bytesInput.size()); + ByteBuffer slab = allocator.allocate(inputSize); + bytesInput.writeInto(slab); + slab.flip(); + slabs.add(slab); + size += inputSize; + } + + @Override + public void close() { + for (ByteBuffer slab : slabs) { + allocator.release(slab); + } + slabs.clear(); + } + + @Override + public void writeAllTo(OutputStream out) throws IOException { + WritableByteChannel channel = Channels.newChannel(out); + for (ByteBuffer buffer : slabs) { + channel.write(buffer.duplicate()); + } + } + + @Override + public void writeInto(ByteBuffer buffer) { + for (ByteBuffer slab : slabs) { + buffer.put(slab.duplicate()); + } + } + + @Override + ByteBuffer getInternalByteBuffer() { + return slabs.size() == 1 ? slabs.get(0).duplicate() : null; + } + + @Override + public long size() { + return size; + } + + /** + * @param prefix a prefix to be used for every new line in the string + * @return a text representation of the memory usage of this structure + */ + public String memUsageString(String prefix) { + return format("%s %s %d slabs, %,d bytes", prefix, getClass().getSimpleName(), slabs.size(), size); + } +} diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ReusingByteBufferAllocator.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ReusingByteBufferAllocator.java new file mode 100644 index 0000000000..83b77b0ca9 --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ReusingByteBufferAllocator.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.bytes; + +import java.nio.ByteBuffer; + +/** + * A special {@link ByteBufferAllocator} implementation that keeps one {@link ByteBuffer} object and reuses it at the + * next {@link #allocate(int)} call. Call {@link #close()} when this allocator is no longer needed so that the one + * buffer is actually released.
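 + * <p>
 + * A minimal usage sketch ({@code parent} stands for any {@link ByteBufferAllocator}); note that the buffer must be
 + * released before the next {@link #allocate(int)} call:
 + * <pre>{@code
 + * try (ReusingByteBufferAllocator reusing = new ReusingByteBufferAllocator(parent)) {
 + *   for (int i = 0; i < 10; i++) {
 + *     ByteBuffer buf = reusing.allocate(1024);
 + *     // use buf here, then hand it back so the next allocate call can reuse it
 + *     reusing.release(buf);
 + *   }
 + * }
 + * }</pre>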
+ */ +public class ReusingByteBufferAllocator implements ByteBufferAllocator, AutoCloseable { + + private final ByteBufferAllocator allocator; + private final ByteBufferReleaser releaser = new ByteBufferReleaser(this); + private ByteBuffer buffer; + private ByteBuffer bufferOut; + + /** + * Constructs a new {@link ReusingByteBufferAllocator} object with the specified "parent" allocator to be used for + * allocating/releasing the one buffer. + * + * @param allocator the allocator to be used for allocating/releasing the one buffer + */ + public ReusingByteBufferAllocator(ByteBufferAllocator allocator) { + this.allocator = allocator; + } + + /** + * A convenience method to get a {@link ByteBufferReleaser} instance already created for this allocator. + * + * @return a releaser for this allocator + */ + public ByteBufferReleaser getReleaser() { + return releaser; + } + + /** + * {@inheritDoc} + * + * @throws IllegalStateException if the one buffer was not released yet + */ + @Override + public ByteBuffer allocate(int size) { + if (bufferOut != null) { + throw new IllegalStateException("The single buffer is not yet released"); + } + if (buffer == null) { + bufferOut = buffer = allocator.allocate(size); + } else if (buffer.capacity() < size) { + allocator.release(buffer); + bufferOut = buffer = allocator.allocate(size); + } else { + buffer.clear(); + buffer.limit(size); + bufferOut = buffer.slice(); + } + return bufferOut; + } + + /** + * {@inheritDoc} + * + * @throws IllegalStateException if the one has already been released or never allocated + * @throws IllegalArgumentException if the specified buffer is not the one allocated by this allocator + */ + @Override + public void release(ByteBuffer b) { + if (bufferOut == null) { + throw new IllegalStateException("The single buffer has already been released or never allocated"); + } + if (b != bufferOut) { + throw new IllegalArgumentException("The buffer to be released is not the one allocated by this allocator"); + } + bufferOut = null; + } + + @Override + public boolean isDirect() { + return allocator.isDirect(); + } + + @Override + public void close() { + if (buffer != null) { + allocator.release(buffer); + buffer = null; + bufferOut = null; + } + } +} diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/TrackingByteBufferAllocator.java b/parquet-common/src/main/java/org/apache/parquet/bytes/TrackingByteBufferAllocator.java index c6dd3431f2..d46073551d 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/TrackingByteBufferAllocator.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/TrackingByteBufferAllocator.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; +import java.util.Objects; /** * A wrapper {@link ByteBufferAllocator} implementation that tracks whether all allocated buffers are released. 
It @@ -138,6 +139,7 @@ public ByteBuffer allocate(int size) { @Override public void release(ByteBuffer b) throws ReleasingUnallocatedByteBufferException { + Objects.requireNonNull(b); if (allocated.remove(new Key(b)) == null) { throw new ReleasingUnallocatedByteBufferException(); } diff --git a/parquet-common/src/main/java/org/apache/parquet/util/AutoCloseables.java b/parquet-common/src/main/java/org/apache/parquet/util/AutoCloseables.java index 6934c40bdb..833b8f9f5c 100644 --- a/parquet-common/src/main/java/org/apache/parquet/util/AutoCloseables.java +++ b/parquet-common/src/main/java/org/apache/parquet/util/AutoCloseables.java @@ -18,6 +18,7 @@ */ package org.apache.parquet.util; +import java.util.Arrays; import org.apache.parquet.ParquetRuntimeException; /** @@ -27,24 +28,27 @@ public final class AutoCloseables { public static class ParquetCloseResourceException extends ParquetRuntimeException { - private ParquetCloseResourceException(Exception e) { + private ParquetCloseResourceException(Throwable e) { super("Unable to close resource", e); } } /** * Invokes the {@link AutoCloseable#close()} method of each specified objects in a way that guarantees that all the - * methods will be invoked even if an exception is occurred before. + * methods will be invoked even if an exception occurred before. It also gracefully handles {@code null} + * {@link AutoCloseable} instances by skipping them. * * @param autoCloseables the objects to be closed * @throws Exception the compound exception built from the exceptions thrown by the close methods */ - public static void close(Iterable<? extends AutoCloseable> autoCloseables) throws Exception { - Exception root = null; + public static void close(Iterable<? extends AutoCloseable> autoCloseables) throws Throwable { + Throwable root = null; for (AutoCloseable autoCloseable : autoCloseables) { try { - autoCloseable.close(); - } catch (Exception e) { + if (autoCloseable != null) { + autoCloseable.close(); + } + } catch (Throwable e) { if (root == null) { root = e; } else { @@ -57,17 +61,38 @@ public static void close(Iterable autoCloseables) throws Exceptio } } + /** + * Invokes the {@link AutoCloseable#close()} method of each specified object in a way that guarantees that all the + * methods will be invoked even if an exception occurred before. It also gracefully handles {@code null} + * {@link AutoCloseable} instances by skipping them. + * + * @param autoCloseables the objects to be closed + * @throws Throwable the compound exception built from the exceptions thrown by the close methods + */ + public static void close(AutoCloseable... autoCloseables) throws Throwable { + close(Arrays.asList(autoCloseables)); + } + /** * Works similarly to {@link #close(Iterable)} but it wraps the thrown exception (if any) into a * {@link ParquetCloseResourceException}. */ - public static void uncheckedClose(Iterable<? extends AutoCloseable> autoCloseables) throws ParquetCloseResourceException { + public static void uncheckedClose(Iterable<? extends AutoCloseable> autoCloseables) + throws ParquetCloseResourceException { try { close(autoCloseables); - } catch (Exception e) { + } catch (Throwable e) { throw new ParquetCloseResourceException(e); } } + /** + * Works similarly to {@link #close(Iterable)} but it wraps the thrown exception (if any) into a + * {@link ParquetCloseResourceException}. + */ + public static void uncheckedClose(AutoCloseable... 
autoCloseables) { + uncheckedClose(Arrays.asList(autoCloseables)); + } + private AutoCloseables() {} } diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestBytesInput.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestBytesInput.java new file mode 100644 index 0000000000..d2c9e82353 --- /dev/null +++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestBytesInput.java @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.bytes; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.apache.parquet.util.AutoCloseables; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.mockito.Mockito; + +/** + * Unit tests for the {@link BytesInput} class and its descendants. 
+ */ +@RunWith(Parameterized.class) +public class TestBytesInput { + + private static final Random RANDOM = new Random(2024_02_20_16_28L); + private TrackingByteBufferAllocator allocator; + private final ByteBufferAllocator innerAllocator; + + @Parameters(name = "{0}") + public static Object[][] parameters() { + return new Object[][] { + { + new HeapByteBufferAllocator() { + @Override + public String toString() { + return "heap-allocator"; + } + } + }, + { + new DirectByteBufferAllocator() { + @Override + public String toString() { + return "direct-allocator"; + } + } + } + }; + } + + public TestBytesInput(ByteBufferAllocator innerAllocator) { + this.innerAllocator = innerAllocator; + } + + @Before + public void initAllocator() { + allocator = TrackingByteBufferAllocator.wrap(innerAllocator); + } + + @After + public void closeAllocator() { + allocator.close(); + } + + @Test + public void testFromSingleByteBuffer() throws IOException { + byte[] data = new byte[1000]; + RANDOM.nextBytes(data); + Supplier factory = () -> BytesInput.from(toByteBuffer(data)); + + validate(data, factory); + + validateToByteBufferIsInternal(factory); + } + + @Test + public void testFromMultipleByteBuffers() throws IOException { + byte[] data = new byte[1000]; + RANDOM.nextBytes(data); + Supplier factory = () -> BytesInput.from( + toByteBuffer(data, 0, 250), + toByteBuffer(data, 250, 250), + toByteBuffer(data, 500, 250), + toByteBuffer(data, 750, 250)); + + validate(data, factory); + } + + @Test + public void testFromByteArray() throws IOException { + byte[] data = new byte[1000]; + RANDOM.nextBytes(data); + byte[] input = new byte[data.length + 20]; + RANDOM.nextBytes(input); + System.arraycopy(data, 0, input, 10, data.length); + Supplier factory = () -> BytesInput.from(input, 10, data.length); + + validate(data, factory); + } + + @Test + public void testFromInputStream() throws IOException { + byte[] data = new byte[1000]; + RANDOM.nextBytes(data); + byte[] input = new byte[data.length + 10]; + RANDOM.nextBytes(input); + System.arraycopy(data, 0, input, 0, data.length); + Supplier factory = () -> BytesInput.from(new ByteArrayInputStream(input), 1000); + + validate(data, factory); + } + + @Test + public void testFromByteArrayOutputStream() throws IOException { + byte[] data = new byte[1000]; + RANDOM.nextBytes(data); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + baos.write(data); + Supplier factory = () -> BytesInput.from(baos); + + validate(data, factory); + } + + @Test + public void testFromCapacityByteArrayOutputStreamOneSlab() throws IOException { + byte[] data = new byte[1000]; + RANDOM.nextBytes(data); + List toClose = new ArrayList<>(); + Supplier factory = () -> { + CapacityByteArrayOutputStream cbaos = new CapacityByteArrayOutputStream(10, 1000, allocator); + toClose.add(cbaos); + try { + cbaos.write(data); + } catch (IOException e) { + throw new RuntimeException(e); + } + return BytesInput.from(cbaos); + }; + + try { + validate(data, factory); + + validateToByteBufferIsInternal(factory); + } finally { + AutoCloseables.uncheckedClose(toClose); + } + } + + @Test + public void testFromCapacityByteArrayOutputStreamMultipleSlabs() throws IOException { + byte[] data = new byte[1000]; + RANDOM.nextBytes(data); + List toClose = new ArrayList<>(); + Supplier factory = () -> { + CapacityByteArrayOutputStream cbaos = new CapacityByteArrayOutputStream(10, 1000, allocator); + toClose.add(cbaos); + for (byte b : data) { + cbaos.write(b); + } + return BytesInput.from(cbaos); + }; + + try { + 
validate(data, factory); + } finally { + AutoCloseables.uncheckedClose(toClose); + } + } + + @Test + public void testFromInt() throws IOException { + int value = RANDOM.nextInt(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(4); + BytesUtils.writeIntLittleEndian(baos, value); + byte[] data = baos.toByteArray(); + Supplier factory = () -> BytesInput.fromInt(value); + + validate(data, factory); + } + + @Test + public void testFromUnsignedVarInt() throws IOException { + int value = RANDOM.nextInt(Short.MAX_VALUE); + ByteArrayOutputStream baos = new ByteArrayOutputStream(2); + BytesUtils.writeUnsignedVarInt(value, baos); + byte[] data = baos.toByteArray(); + Supplier factory = () -> BytesInput.fromUnsignedVarInt(value); + + validate(data, factory); + } + + @Test + public void testFromUnsignedVarLong() throws IOException { + long value = RANDOM.nextInt(Integer.MAX_VALUE); + ByteArrayOutputStream baos = new ByteArrayOutputStream(4); + BytesUtils.writeUnsignedVarLong(value, baos); + byte[] data = baos.toByteArray(); + Supplier factory = () -> BytesInput.fromUnsignedVarLong(value); + + validate(data, factory); + } + + @Test + public void testFromZigZagVarInt() throws IOException { + int value = RANDOM.nextInt() % Short.MAX_VALUE; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BytesUtils.writeZigZagVarInt(value, baos); + byte[] data = baos.toByteArray(); + Supplier factory = () -> BytesInput.fromZigZagVarInt(value); + + validate(data, factory); + } + + @Test + public void testFromZigZagVarLong() throws IOException { + long value = RANDOM.nextInt(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BytesUtils.writeZigZagVarLong(value, baos); + byte[] data = baos.toByteArray(); + Supplier factory = () -> BytesInput.fromZigZagVarLong(value); + + validate(data, factory); + } + + @Test + public void testEmpty() throws IOException { + byte[] data = new byte[0]; + Supplier factory = () -> BytesInput.empty(); + + validate(data, factory); + } + + @Test + public void testConcatenatingByteBufferCollectorOneSlab() throws IOException { + byte[] data = new byte[1000]; + RANDOM.nextBytes(data); + List toClose = new ArrayList<>(); + + Supplier factory = () -> { + ConcatenatingByteBufferCollector collector = new ConcatenatingByteBufferCollector(allocator); + toClose.add(collector); + collector.collect(BytesInput.from(toByteBuffer(data))); + return collector; + }; + + try { + validate(data, factory); + + validateToByteBufferIsInternal(factory); + } finally { + AutoCloseables.uncheckedClose(toClose); + } + } + + @Test + public void testConcatenatingByteBufferCollectorMultipleSlabs() throws IOException { + byte[] data = new byte[1000]; + RANDOM.nextBytes(data); + List toClose = new ArrayList<>(); + + Supplier factory = () -> { + ConcatenatingByteBufferCollector collector = new ConcatenatingByteBufferCollector(allocator); + toClose.add(collector); + collector.collect(BytesInput.from(toByteBuffer(data, 0, 250))); + collector.collect(BytesInput.from(toByteBuffer(data, 250, 250))); + collector.collect(BytesInput.from(toByteBuffer(data, 500, 250))); + collector.collect(BytesInput.from(toByteBuffer(data, 750, 250))); + return collector; + }; + + try { + validate(data, factory); + } finally { + AutoCloseables.uncheckedClose(toClose); + } + } + + @Test + public void testConcat() throws IOException { + byte[] data = new byte[1000]; + RANDOM.nextBytes(data); + + Supplier factory = () -> BytesInput.concat( + BytesInput.from(toByteBuffer(data, 0, 250)), + BytesInput.empty(), + 
BytesInput.from(toByteBuffer(data, 250, 250)), + BytesInput.from(data, 500, 250), + BytesInput.from(new ByteArrayInputStream(data, 750, 250), 250)); + + validate(data, factory); + } + + private ByteBuffer toByteBuffer(byte[] data) { + return toByteBuffer(data, 0, data.length); + } + + private ByteBuffer toByteBuffer(byte[] data, int offset, int length) { + ByteBuffer buf = innerAllocator.allocate(length); + buf.put(data, offset, length); + buf.flip(); + return buf; + } + + private void validate(byte[] data, Supplier factory) throws IOException { + assertEquals(data.length, factory.get().size()); + validateToByteBuffer(data, factory); + validateCopy(data, factory); + validateToInputStream(data, factory); + validateWriteAllTo(data, factory); + } + + private void validateToByteBuffer(byte[] data, Supplier factory) { + BytesInput bi = factory.get(); + try (ByteBufferReleaser releaser = new ByteBufferReleaser(allocator)) { + ByteBuffer buf = bi.toByteBuffer(releaser); + int index = 0; + while (buf.hasRemaining()) { + if (buf.get() != data[index++]) { + fail("Data mismatch at position " + index); + } + } + } + } + + private void validateCopy(byte[] data, Supplier factory) throws IOException { + BytesInput bi = factory.get(); + try (ByteBufferReleaser releaser = new ByteBufferReleaser(allocator); + InputStream is = bi.copy(releaser).toInputStream()) { + assertContentEquals(data, is); + } + } + + private void validateToInputStream(byte[] data, Supplier factory) throws IOException { + BytesInput bi = factory.get(); + try (InputStream is = bi.toInputStream()) { + assertContentEquals(data, is); + } + } + + private void assertContentEquals(byte[] expected, InputStream is) throws IOException { + byte[] actual = new byte[expected.length]; + is.read(actual); + assertArrayEquals(expected, actual); + } + + private void validateWriteAllTo(byte[] data, Supplier factory) throws IOException { + BytesInput bi = factory.get(); + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + bi.writeAllTo(baos); + assertArrayEquals(data, baos.toByteArray()); + } + } + + private void validateToByteBufferIsInternal(Supplier factory) { + ByteBufferAllocator allocatorMock = Mockito.mock(ByteBufferAllocator.class); + when(allocatorMock.isDirect()).thenReturn(innerAllocator.isDirect()); + Consumer callbackMock = Mockito.mock(Consumer.class); + factory.get().toByteBuffer(allocatorMock, callbackMock); + verify(allocatorMock, never()).allocate(anyInt()); + verify(callbackMock, never()).accept(anyObject()); + } +} diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestConcatenatingByteBufferCollector.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestConcatenatingByteBufferCollector.java new file mode 100644 index 0000000000..8b3a9cabaf --- /dev/null +++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestConcatenatingByteBufferCollector.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.bytes; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Test class of {@link ConcatenatingByteBufferCollector}. + */ +public class TestConcatenatingByteBufferCollector { + + private TrackingByteBufferAllocator allocator; + + @Before + public void initAllocator() { + allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator()); + } + + @After + public void closeAllocator() { + allocator.close(); + } + + @Test + public void test() throws IOException { + byte[] result; + try (ConcatenatingByteBufferCollector outer = new ConcatenatingByteBufferCollector(allocator); + ConcatenatingByteBufferCollector inner = new ConcatenatingByteBufferCollector(allocator)) { + outer.collect(BytesInput.concat( + BytesInput.from(byteBuffer("This"), byteBuffer(" "), byteBuffer("is")), + BytesInput.from(Arrays.asList(byteBuffer(" a"), byteBuffer(" "), byteBuffer("test"))), + BytesInput.from(inputStream(" text to blabla"), 8), + BytesInput.from(bytes(" ")), + BytesInput.from(bytes("blabla validate blabla"), 7, 9), + BytesInput.from(byteArrayOutputStream("the class ")), + BytesInput.from(capacityByteArrayOutputStream("ConcatenatingByteBufferCollector")))); + inner.collect(BytesInput.fromInt(12345)); + inner.collect(BytesInput.fromUnsignedVarInt(67891)); + inner.collect(BytesInput.fromUnsignedVarLong(2345678901L)); + inner.collect(BytesInput.fromZigZagVarInt(-234567)); + inner.collect(BytesInput.fromZigZagVarLong(-890123456789L)); + outer.collect(inner); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + outer.writeAllTo(baos); + result = baos.toByteArray(); + } + + Assert.assertEquals( + "This is a test text to validate the class ConcatenatingByteBufferCollector", + new String(result, 0, 74)); + InputStream in = new ByteArrayInputStream(result, 74, result.length - 74); + Assert.assertEquals(12345, BytesUtils.readIntLittleEndian(in)); + Assert.assertEquals(67891, BytesUtils.readUnsignedVarInt(in)); + Assert.assertEquals(2345678901L, BytesUtils.readUnsignedVarLong(in)); + Assert.assertEquals(-234567, BytesUtils.readZigZagVarInt(in)); + Assert.assertEquals(-890123456789L, BytesUtils.readZigZagVarLong(in)); + } + + private static byte[] bytes(String str) { + return str.getBytes(StandardCharsets.UTF_8); + } + + private static ByteBuffer byteBuffer(String str) { + return ByteBuffer.wrap(bytes(str)); + } + + private static InputStream inputStream(String str) { + return new ByteArrayInputStream(bytes(str)); + } + + private static ByteArrayOutputStream byteArrayOutputStream(String str) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + baos.write(bytes(str)); + return baos; + } + + private static CapacityByteArrayOutputStream capacityByteArrayOutputStream(String str) { + CapacityByteArrayOutputStream cbaos = + new 
CapacityByteArrayOutputStream(2, Integer.MAX_VALUE, new HeapByteBufferAllocator()); + for (byte b : bytes(str)) { + cbaos.write(b); + } + return cbaos; + } +} diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestReusingByteBufferAllocator.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestReusingByteBufferAllocator.java new file mode 100644 index 0000000000..cc585d7d41 --- /dev/null +++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestReusingByteBufferAllocator.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.bytes; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import java.nio.ByteBuffer; +import java.nio.InvalidMarkException; +import java.util.Random; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class TestReusingByteBufferAllocator { + + private static final Random RANDOM = new Random(2024_02_22_09_51L); + + private TrackingByteBufferAllocator allocator; + + @Parameter + public ByteBufferAllocator innerAllocator; + + @Parameters(name = "{0}") + public static Object[][] parameters() { + return new Object[][] { + { + new HeapByteBufferAllocator() { + @Override + public String toString() { + return "HEAP"; + } + } + }, + { + new DirectByteBufferAllocator() { + @Override + public String toString() { + return "DIRECT"; + } + } + } + }; + } + + @Before + public void initAllocator() { + allocator = TrackingByteBufferAllocator.wrap(innerAllocator); + } + + @After + public void closeAllocator() { + allocator.close(); + } + + @Test + public void normalUseCase() { + try (ReusingByteBufferAllocator reusingAllocator = new ReusingByteBufferAllocator(allocator)) { + assertEquals(innerAllocator.isDirect(), reusingAllocator.isDirect()); + for (int i = 0; i < 10; ++i) { + try (ByteBufferReleaser releaser = reusingAllocator.getReleaser()) { + int size = RANDOM.nextInt(1024); + ByteBuffer buf = reusingAllocator.allocate(size); + releaser.releaseLater(buf); + + assertEquals(0, buf.position()); + assertEquals(size, buf.capacity()); + assertEquals(size, buf.remaining()); + assertEquals(allocator.isDirect(), buf.isDirect()); + assertThrows(InvalidMarkException.class, buf::reset); + + // Let's see if the next allocate would clear the buffer + buf.position(buf.capacity() / 2); + buf.mark(); + buf.position(buf.limit()); + } + } + + // Check if actually releasing the buffer is independent of the release call in the reusing allocator + reusingAllocator.allocate(1025); + } + 
} + + @Test + public void validateExceptions() { + try (ByteBufferReleaser releaser = new ByteBufferReleaser(allocator); + ReusingByteBufferAllocator reusingAllocator = new ReusingByteBufferAllocator(allocator)) { + ByteBuffer fromOther = allocator.allocate(10); + releaser.releaseLater(fromOther); + + assertThrows(IllegalStateException.class, () -> reusingAllocator.release(fromOther)); + + ByteBuffer fromReusing = reusingAllocator.allocate(10); + + assertThrows(IllegalArgumentException.class, () -> reusingAllocator.release(fromOther)); + assertThrows(IllegalStateException.class, () -> reusingAllocator.allocate(10)); + + reusingAllocator.release(fromReusing); + assertThrows(IllegalStateException.class, () -> reusingAllocator.release(fromOther)); + assertThrows(IllegalStateException.class, () -> reusingAllocator.release(fromReusing)); + } + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java index 9bf21c26f1..a86590ead0 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; import java.util.Objects; +import org.apache.parquet.column.page.DictionaryPageReadStore; import org.apache.parquet.filter2.bloomfilterlevel.BloomFilterImpl; import org.apache.parquet.filter2.compat.FilterCompat.Filter; import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter; @@ -102,7 +103,9 @@ public List visit(FilterCompat.FilterPredicateCompat filterPredic } if (!drop && levels.contains(FilterLevel.DICTIONARY)) { - drop = DictionaryFilter.canDrop(filterPredicate, block.getColumns(), reader.getDictionaryReader(block)); + try (DictionaryPageReadStore dictionaryPageReadStore = reader.getDictionaryReader(block)) { + drop = DictionaryFilter.canDrop(filterPredicate, block.getColumns(), dictionaryPageReadStore); + } } if (!drop && levels.contains(FilterLevel.BLOOMFILTER)) { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 350bc37949..e2e109a871 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -25,6 +25,7 @@ import static org.apache.parquet.format.Util.writeColumnMetaData; import static org.apache.parquet.format.Util.writePageHeader; +import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -46,7 +47,6 @@ import org.apache.parquet.CorruptStatistics; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.Preconditions; -import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.EncodingStats; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.statistics.BinaryStatistics; @@ -1450,10 +1450,11 @@ private static void verifyFooterIntegrity( AesGcmEncryptor footerSigner = fileDecryptor.createSignedFooterEncryptor(); - byte[] footerAndSignature = ((ByteBufferInputStream) from).slice(0).array(); int footerSignatureLength = AesCipher.NONCE_LENGTH + AesCipher.GCM_TAG_LENGTH; byte[] serializedFooter = new 
byte[combinedFooterLength - footerSignatureLength]; - System.arraycopy(footerAndSignature, 0, serializedFooter, 0, serializedFooter.length); + // Resetting to the beginning of the footer + from.reset(); + from.read(serializedFooter); byte[] signedFooterAAD = AesCipher.createFooterAAD(fileDecryptor.getFileAAD()); byte[] encryptedFooterBytes = footerSigner.encrypt(false, serializedFooter, nonce, signedFooterAAD); @@ -1501,7 +1502,7 @@ public FileMetaDataAndRowGroupOffsetInfo( } public ParquetMetadata readParquetMetadata( - final InputStream from, + final InputStream fromInputStream, MetadataFilter filter, final InternalFileDecryptor fileDecryptor, final boolean encryptedFooter, @@ -1512,6 +1513,20 @@ public ParquetMetadata readParquetMetadata( final byte[] encryptedFooterAAD = (encryptedFooter ? AesCipher.createFooterAAD(fileDecryptor.getFileAAD()) : null); + // Mark the beginning of the footer for verifyFooterIntegrity + final InputStream from; + if (fileDecryptor != null && fileDecryptor.checkFooterIntegrity()) { + // fromInputStream should already support marking but let's be on the safe side + if (!fromInputStream.markSupported()) { + from = new BufferedInputStream(fromInputStream, combinedFooterLength); + } else { + from = fromInputStream; + } + from.mark(combinedFooterLength); + } else { + from = fromInputStream; + } + FileMetaDataAndRowGroupOffsetInfo fileMetaDataAndRowGroupInfo = filter.accept(new MetadataFilterVisitor() { @Override diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java index f5cc761624..cc20c15629 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayDeque; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -30,6 +29,7 @@ import java.util.Queue; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.bytes.ByteBufferAllocator; +import org.apache.parquet.bytes.ByteBufferReleaser; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.DataPage; @@ -79,7 +79,7 @@ static final class ColumnChunkPageReader implements PageReader { private final BlockCipher.Decryptor blockDecryptor; private final byte[] dataPageAAD; private final byte[] dictionaryPageAAD; - private final List toRelease = new ArrayList<>(); + private final ByteBufferReleaser releaser; ColumnChunkPageReader( BytesInputDecompressor decompressor, @@ -103,6 +103,7 @@ static final class ColumnChunkPageReader implements PageReader { this.offsetIndex = offsetIndex; this.rowCount = rowCount; this.options = options; + this.releaser = new ByteBufferReleaser(options.getAllocator()); this.blockDecryptor = blockDecryptor; if (null != blockDecryptor) { dataPageAAD = @@ -148,7 +149,7 @@ public DataPage visit(DataPageV1 dataPageV1) { BytesInput decompressed; if (options.getAllocator().isDirect() && options.useOffHeapDecryptBuffer()) { - ByteBuffer byteBuffer = bytes.toByteBuffer(); + ByteBuffer byteBuffer = bytes.toByteBuffer(releaser); if (!byteBuffer.isDirect()) { throw new ParquetDecodingException("Expected a direct buffer"); } @@ -159,7 +160,7 @@ public DataPage visit(DataPageV1 dataPageV1) { ByteBuffer decompressedBuffer 
= options.getAllocator().allocate(dataPageV1.getUncompressedSize()); - toRelease.add(decompressedBuffer); + releaser.releaseLater(decompressedBuffer); long start = System.nanoTime(); decompressor.decompress( byteBuffer, @@ -228,7 +229,7 @@ public DataPage visit(DataPageV2 dataPageV2) { long compressedSize; if (options.getAllocator().isDirect() && options.useOffHeapDecryptBuffer()) { - ByteBuffer byteBuffer = pageBytes.toByteBuffer(); + ByteBuffer byteBuffer = pageBytes.toByteBuffer(releaser); if (!byteBuffer.isDirect()) { throw new ParquetDecodingException("Expected a direct buffer"); } @@ -242,7 +243,7 @@ public DataPage visit(DataPageV2 dataPageV2) { - dataPageV2.getRepetitionLevels().size()); ByteBuffer decompressedBuffer = options.getAllocator().allocate(uncompressedSize); - toRelease.add(decompressedBuffer); + releaser.releaseLater(decompressedBuffer); long start = System.nanoTime(); decompressor.decompress( byteBuffer, (int) compressedSize, decompressedBuffer, uncompressedSize); @@ -334,8 +335,6 @@ public DictionaryPage readDictionaryPage() { bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dictionaryPageAAD)); } long start = System.nanoTime(); - BytesInput decompressed = - decompressor.decompress(bytes, compressedDictionaryPage.getUncompressedSize()); setDecompressMetrics(bytes, start); DictionaryPage decompressedPage = new DictionaryPage( decompressor.decompress(bytes, compressedDictionaryPage.getUncompressedSize()), @@ -351,11 +350,7 @@ public DictionaryPage readDictionaryPage() { } private void releaseBuffers() { - ByteBufferAllocator allocator = options.getAllocator(); - for (ByteBuffer buffer : toRelease) { - allocator.release(buffer); - } - toRelease.clear(); + releaser.close(); } } @@ -365,7 +360,7 @@ private void releaseBuffers() { private final long rowIndexOffset; private final RowRanges rowRanges; private ByteBufferAllocator allocator; - private List toRelease; + private ByteBufferReleaser releaser; public ColumnChunkPageReadStore(long rowCount) { this(rowCount, -1); @@ -422,9 +417,8 @@ void addColumn(ColumnDescriptor path, ColumnChunkPageReader reader) { } } - void setBuffersToRelease(ByteBufferAllocator allocator, List toRelease) { - this.allocator = allocator; - this.toRelease = toRelease; + void setReleaser(ByteBufferReleaser releaser) { + this.releaser = releaser; } @Override @@ -432,8 +426,6 @@ public void close() { for (ColumnChunkPageReader reader : readers.values()) { reader.releaseBuffers(); } - for (ByteBuffer buffer : toRelease) { - allocator.release(buffer); - } + releaser.close(); } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java index 91e6343b6e..cb8ca25cf8 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -28,8 +28,9 @@ import java.util.Set; import java.util.zip.CRC32; import org.apache.parquet.bytes.ByteBufferAllocator; +import org.apache.parquet.bytes.ByteBufferReleaser; import org.apache.parquet.bytes.BytesInput; -import org.apache.parquet.bytes.ConcatenatingByteArrayCollector; +import org.apache.parquet.bytes.ConcatenatingByteBufferCollector; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.ParquetProperties; @@ -52,6 +53,7 @@ import 
org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder; import org.apache.parquet.io.ParquetEncodingException; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.util.AutoCloseables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,7 +71,7 @@ private static final class ColumnChunkPageWriter implements PageWriter, BloomFil private final BytesInputCompressor compressor; private final ByteArrayOutputStream tempOutputStream = new ByteArrayOutputStream(); - private final ConcatenatingByteArrayCollector buf; + private final ConcatenatingByteBufferCollector buf; private DictionaryPage dictionaryPage; private long uncompressedLength; @@ -86,7 +88,7 @@ private static final class ColumnChunkPageWriter implements PageWriter, BloomFil private ColumnIndexBuilder columnIndexBuilder; private OffsetIndexBuilder offsetIndexBuilder; private Statistics totalStatistics; - private final ByteBufferAllocator allocator; + private final ByteBufferReleaser releaser; private final CRC32 crc; boolean pageWriteChecksumEnabled; @@ -113,8 +115,8 @@ private ColumnChunkPageWriter( int columnOrdinal) { this.path = path; this.compressor = compressor; - this.allocator = allocator; - this.buf = new ConcatenatingByteArrayCollector(); + this.releaser = new ByteBufferReleaser(allocator); + this.buf = new ConcatenatingByteBufferCollector(allocator); this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType(), columnIndexTruncateLength); this.offsetIndexBuilder = OffsetIndexBuilder.getBuilder(); this.pageWriteChecksumEnabled = pageWriteChecksumEnabled; @@ -428,7 +430,7 @@ public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOExceptio BytesInput.from(pageBlockEncryptor.encrypt(compressedBytes.toByteArray(), dictonaryPageAAD)); } this.dictionaryPage = new DictionaryPage( - BytesInput.copy(compressedBytes), + compressedBytes.copy(releaser), uncompressedSize, dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding()); @@ -439,6 +441,11 @@ public String memUsageString(String prefix) { return buf.memUsageString(prefix + " ColumnChunkPageWriter"); } + @Override + public void close() { + AutoCloseables.uncheckedClose(buf, releaser); + } + @Override public void writeBloomFilter(BloomFilter bloomFilter) { this.bloomFilter = bloomFilter; @@ -550,6 +557,12 @@ public PageWriter getPageWriter(ColumnDescriptor path) { return writers.get(path); } + @Override + public void close() { + AutoCloseables.uncheckedClose(writers.values()); + writers.clear(); + } + @Override public BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path) { return writers.get(path); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java index 696e407e81..e1118b6076 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java @@ -24,7 +24,8 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; -import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.ByteBufferAllocator; +import org.apache.parquet.bytes.ByteBufferReleaser; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.page.DictionaryPageReadStore; @@ -46,19 +47,21 @@ class DictionaryPageReader implements DictionaryPageReadStore { 
private final Map<String, ColumnChunkMetaData> columns; private final Map<String, Optional<DictionaryPage>> dictionaryPageCache; private ColumnChunkPageReadStore rowGroup = null; + private ByteBufferReleaser releaser; /** * Instantiate a new DictionaryPageReader. * - * @param reader The target ParquetFileReader - * @param block The target BlockMetaData - * @throws NullPointerException if {@code reader} or {@code block} is - * {@code null} + * @param reader The target ParquetFileReader + * @param block The target BlockMetaData + * @param allocator The allocator to be used for potentially allocating {@link java.nio.ByteBuffer} objects + * @throws NullPointerException if {@code reader} or {@code block} is {@code null} */ - DictionaryPageReader(ParquetFileReader reader, BlockMetaData block) { + DictionaryPageReader(ParquetFileReader reader, BlockMetaData block, ByteBufferAllocator allocator) { this.reader = Objects.requireNonNull(reader); this.columns = new HashMap<>(); this.dictionaryPageCache = new ConcurrentHashMap<>(); + releaser = new ByteBufferReleaser(allocator); for (ColumnChunkMetaData column : block.getColumns()) { columns.put(column.getPath().toDotString(), column); @@ -106,8 +109,12 @@ public DictionaryPage readDictionaryPage(ColumnDescriptor descriptor) { .orElse(null); } - private static DictionaryPage reusableCopy(DictionaryPage dict) throws IOException { - return new DictionaryPage( - BytesInput.from(dict.getBytes().toByteArray()), dict.getDictionarySize(), dict.getEncoding()); + private DictionaryPage reusableCopy(DictionaryPage dict) throws IOException { + return new DictionaryPage(dict.getBytes().copy(releaser), dict.getDictionarySize(), dict.getEncoding()); + } + + @Override + public void close() { + releaser.close(); + } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index 77bfb60992..20809089a4 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -37,6 +37,7 @@ import org.apache.parquet.io.MessageColumnIO; import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.util.AutoCloseables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,16 +126,20 @@ private void initStore() { public void close() throws IOException, InterruptedException { if (!closed) { - flushRowGroupToStore(); - FinalizedWriteContext finalWriteContext = writeSupport.finalizeWrite(); - Map<String, String> finalMetadata = new HashMap<String, String>(extraMetaData); - String modelName = writeSupport.getName(); - if (modelName != null) { - finalMetadata.put(ParquetWriter.OBJECT_MODEL_NAME_PROP, modelName); + try { + flushRowGroupToStore(); + FinalizedWriteContext finalWriteContext = writeSupport.finalizeWrite(); + Map<String, String> finalMetadata = new HashMap<String, String>(extraMetaData); + String modelName = writeSupport.getName(); + if (modelName != null) { + finalMetadata.put(ParquetWriter.OBJECT_MODEL_NAME_PROP, modelName); + } + finalMetadata.putAll(finalWriteContext.getExtraMetaData()); + parquetFileWriter.end(finalMetadata); + } finally { + AutoCloseables.uncheckedClose(columnStore, pageStore, bloomFilterWriteStore, parquetFileWriter); + closed = true; } - finalMetadata.putAll(finalWriteContext.getExtraMetaData()); - parquetFileWriter.end(finalMetadata); - closed = true; } } @@ -197,10 +202,10 @@ private void flushRowGroupToStore() throws IOException {
parquetFileWriter.endBlock(); this.nextRowGroupSize = Math.min(parquetFileWriter.getNextRowGroupSize(), rowGroupSizeThreshold); } - - columnStore.close(); + AutoCloseables.uncheckedClose(columnStore, pageStore, bloomFilterWriteStore); columnStore = null; pageStore = null; + bloomFilterWriteStore = null; } long getRowGroupSizeThreshold() { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 6bd71ee8b8..628b6dcf16 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -63,7 +63,9 @@ import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.ByteBufferReleaser; import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.ReusingByteBufferAllocator; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.DataPage; import org.apache.parquet.column.page.DataPageV1; @@ -112,6 +114,7 @@ import org.apache.parquet.io.SeekableInputStream; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.util.AutoCloseables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -127,6 +130,7 @@ public class ParquetFileReader implements Closeable { private final ParquetMetadataConverter converter; private final CRC32 crc; + private final ReusingByteBufferAllocator crcAllocator; /** * for files provided, check if there's a summary file. @@ -606,27 +610,32 @@ private static final ParquetMetadata readFooter( // Read all the footer bytes in one time to avoid multiple read operations, // since it can be pretty time consuming for a single read operation in HDFS. - ByteBuffer footerBytesBuffer = ByteBuffer.allocate(fileMetadataLength); - f.readFully(footerBytesBuffer); - LOG.debug("Finished to read all footer bytes."); - footerBytesBuffer.flip(); - InputStream footerBytesStream = ByteBufferInputStream.wrap(footerBytesBuffer); - - // Regular file, or encrypted file with plaintext footer - if (!encryptedFooterMode) { - return converter.readParquetMetadata( - footerBytesStream, options.getMetadataFilter(), fileDecryptor, false, fileMetadataLength); - } + ByteBuffer footerBytesBuffer = options.getAllocator().allocate(fileMetadataLength); + try { + f.readFully(footerBytesBuffer); + LOG.debug("Finished to read all footer bytes."); + footerBytesBuffer.flip(); + InputStream footerBytesStream = ByteBufferInputStream.wrap(footerBytesBuffer); + + // Regular file, or encrypted file with plaintext footer + if (!encryptedFooterMode) { + return converter.readParquetMetadata( + footerBytesStream, options.getMetadataFilter(), fileDecryptor, false, fileMetadataLength); + } - // Encrypted file with encrypted footer - if (null == fileDecryptor) { - throw new ParquetCryptoRuntimeException("Trying to read file with encrypted footer. No keys available"); + // Encrypted file with encrypted footer + if (null == fileDecryptor) { + throw new ParquetCryptoRuntimeException("Trying to read file with encrypted footer. 
No keys available"); + } + FileCryptoMetaData fileCryptoMetaData = readFileCryptoMetaData(footerBytesStream); + fileDecryptor.setFileCryptoMetaData( + fileCryptoMetaData.getEncryption_algorithm(), true, fileCryptoMetaData.getKey_metadata()); + // footer length is required only for signed plaintext footers + return converter.readParquetMetadata( + footerBytesStream, options.getMetadataFilter(), fileDecryptor, true, 0); + } finally { + options.getAllocator().release(footerBytesBuffer); } - FileCryptoMetaData fileCryptoMetaData = readFileCryptoMetaData(footerBytesStream); - fileDecryptor.setFileCryptoMetaData( - fileCryptoMetaData.getEncryption_algorithm(), true, fileCryptoMetaData.getKey_metadata()); - // footer length is required only for signed plaintext footers - return converter.readParquetMetadata(footerBytesStream, options.getMetadataFilter(), fileDecryptor, true, 0); } /** @@ -769,7 +778,14 @@ public ParquetFileReader( for (ColumnDescriptor col : columns) { paths.put(ColumnPath.get(col.getPath()), col); } - this.crc = options.usePageChecksumVerification() ? new CRC32() : null; + + if (options.usePageChecksumVerification()) { + this.crc = new CRC32(); + this.crcAllocator = new ReusingByteBufferAllocator(options.getAllocator()); + } else { + this.crc = null; + this.crcAllocator = null; + } } /** @@ -821,7 +837,14 @@ public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer) for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) { paths.put(ColumnPath.get(col.getPath()), col); } - this.crc = options.usePageChecksumVerification() ? new CRC32() : null; + + if (options.usePageChecksumVerification()) { + this.crc = new CRC32(); + this.crcAllocator = new ReusingByteBufferAllocator(options.getAllocator()); + } else { + this.crc = null; + this.crcAllocator = null; + } } /** @@ -853,7 +876,14 @@ public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer, for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) { paths.put(ColumnPath.get(col.getPath()), col); } - this.crc = options.usePageChecksumVerification() ? new CRC32() : null; + + if (options.usePageChecksumVerification()) { + this.crc = new CRC32(); + this.crcAllocator = new ReusingByteBufferAllocator(options.getAllocator()); + } else { + this.crc = null; + this.crcAllocator = null; + } } public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException { @@ -888,7 +918,14 @@ public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOEx for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) { paths.put(ColumnPath.get(col.getPath()), col); } - this.crc = options.usePageChecksumVerification() ? 
new CRC32() : null; + + if (options.usePageChecksumVerification()) { + this.crc = new CRC32(); + this.crcAllocator = new ReusingByteBufferAllocator(options.getAllocator()); + } else { + this.crc = null; + this.crcAllocator = null; + } } private static List listWithNulls(int size) { @@ -1057,7 +1094,7 @@ private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOE for (ConsecutivePartList consecutiveChunks : allParts) { consecutiveChunks.readAll(f, builder); } - rowGroup.setBuffersToRelease(options.getAllocator(), builder.toRelease); + rowGroup.setReleaser(builder.releaser); for (Chunk chunk : builder.build()) { readChunkPages(chunk, block, rowGroup); } @@ -1215,7 +1252,7 @@ private ColumnChunkPageReadStore internalReadFilteredRowGroup( for (ConsecutivePartList consecutiveChunks : allParts) { consecutiveChunks.readAll(f, builder); } - rowGroup.setBuffersToRelease(options.getAllocator(), builder.toRelease); + rowGroup.setReleaser(builder.releaser); for (Chunk chunk : builder.build()) { readChunkPages(chunk, block, rowGroup); } @@ -1281,6 +1318,10 @@ private boolean advanceToNextBlock() { // update the current block and instantiate a dictionary reader for it ++currentBlock; + + if (nextDictionaryReader != null) { + nextDictionaryReader.close(); + } this.nextDictionaryReader = null; return true; @@ -1304,11 +1345,11 @@ public DictionaryPageReader getDictionaryReader(int blockIndex) { if (blockIndex < 0 || blockIndex >= blocks.size()) { return null; } - return new DictionaryPageReader(this, blocks.get(blockIndex)); + return new DictionaryPageReader(this, blocks.get(blockIndex), options.getAllocator()); } public DictionaryPageReader getDictionaryReader(BlockMetaData block) { - return new DictionaryPageReader(this, block); + return new DictionaryPageReader(this, block, options.getAllocator()); } /** @@ -1385,10 +1426,7 @@ private DictionaryPage readCompressedDictionary( int uncompressedPageSize = pageHeader.getUncompressed_page_size(); int compressedPageSize = pageHeader.getCompressed_page_size(); - byte[] dictPageBytes = new byte[compressedPageSize]; - fin.readFully(dictPageBytes); - - BytesInput bin = BytesInput.from(dictPageBytes); + BytesInput bin = BytesInput.from(fin, compressedPageSize); if (null != pageDecryptor) { bin = BytesInput.from(pageDecryptor.decrypt(bin.toByteArray(), dictionaryPageAAD)); @@ -1569,6 +1607,7 @@ public void close() throws IOException { f.close(); } } finally { + AutoCloseables.uncheckedClose(nextDictionaryReader, crcAllocator); options.getCodecFactory().release(); } } @@ -1587,7 +1626,7 @@ private class ChunkData { private ChunkDescriptor lastDescriptor; private final long rowCount; private SeekableInputStream f; - private List toRelease = new ArrayList<>(); + private final ByteBufferReleaser releaser = new ByteBufferReleaser(options.getAllocator()); public ChunkListBuilder(long rowCount) { this.rowCount = rowCount; @@ -1600,7 +1639,7 @@ void add(ChunkDescriptor descriptor, List buffers, SeekableInputStre } void addBuffersToRelease(List toRelease) { - this.toRelease.addAll(toRelease); + toRelease.forEach(releaser::releaseLater); } void setOffsetIndex(ChunkDescriptor descriptor, OffsetIndex offsetIndex) { @@ -1659,9 +1698,11 @@ protected PageHeader readPageHeader(BlockCipher.Decryptor blockDecryptor, byte[] * Calculate checksum of input bytes, throw decoding exception if it does not match the provided * reference crc */ - private void verifyCrc(int referenceCrc, byte[] bytes, String exceptionMsg) { + private void verifyCrc(int referenceCrc, 
BytesInput bytes, String exceptionMsg) { crc.reset(); - crc.update(bytes); + try (ByteBufferReleaser releaser = crcAllocator.getReleaser()) { + crc.update(bytes.toByteBuffer(releaser)); + } if (crc.getValue() != ((long) referenceCrc & 0xffffffffL)) { throw new ParquetDecodingException(exceptionMsg); } @@ -1727,7 +1768,7 @@ public ColumnChunkPageReader readAllPages( if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) { verifyCrc( pageHeader.getCrc(), - pageBytes.toByteArray(), + pageBytes, "could not verify dictionary page integrity, CRC checksum verification failed"); } DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header(); @@ -1747,7 +1788,7 @@ public ColumnChunkPageReader readAllPages( if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) { verifyCrc( pageHeader.getCrc(), - pageBytes.toByteArray(), + pageBytes, "could not verify page integrity, CRC checksum verification failed"); } DataPageV1 dataPageV1 = new DataPageV1( @@ -1781,7 +1822,7 @@ public ColumnChunkPageReader readAllPages( pageBytes = BytesInput.concat(repetitionLevels, definitionLevels, values); verifyCrc( pageHeader.getCrc(), - pageBytes.toByteArray(), + pageBytes, "could not verify page integrity, CRC checksum verification failed"); } DataPageV2 dataPageV2 = new DataPageV2( diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index 57700d494a..9867964def 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -42,8 +42,12 @@ import org.apache.hadoop.fs.Path; import org.apache.parquet.Preconditions; import org.apache.parquet.Version; +import org.apache.parquet.bytes.ByteBufferAllocator; +import org.apache.parquet.bytes.ByteBufferReleaser; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.bytes.ReusingByteBufferAllocator; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.EncodingStats; @@ -93,7 +97,7 @@ /** * Internal implementation of the Parquet file writer as a block container */ -public class ParquetFileWriter { +public class ParquetFileWriter implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(ParquetFileWriter.class); private final ParquetMetadataConverter metadataConverter; @@ -165,6 +169,7 @@ public static enum Mode { private ParquetMetadata footer = null; private final CRC32 crc; + private final ReusingByteBufferAllocator crcAllocator; private final boolean pageWriteChecksumEnabled; /** @@ -354,6 +359,7 @@ public ParquetFileWriter( statisticsTruncateLength, pageWriteChecksumEnabled, null, + null, null); } @@ -378,9 +384,34 @@ public ParquetFileWriter( statisticsTruncateLength, pageWriteChecksumEnabled, encryptionProperties, + null, null); } + public ParquetFileWriter( + OutputFile file, + MessageType schema, + Mode mode, + long rowGroupSize, + int maxPaddingSize, + FileEncryptionProperties encryptionProperties, + ParquetProperties props) + throws IOException { + this( + file, + schema, + mode, + rowGroupSize, + maxPaddingSize, + props.getColumnIndexTruncateLength(), + props.getStatisticsTruncateLength(), + props.getPageWriteChecksumEnabled(), + encryptionProperties, + null, + props.getAllocator()); + } + 
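Both the reader's verifyCrc above and ParquetFileWriter's crcUpdate below now feed the CRC32 from a ByteBuffer leased out of a ReusingByteBufferAllocator instead of materializing a fresh byte[] per page, so one scratch buffer is recycled across all checksum computations. A rough standalone sketch of the pattern (the class name is invented; in the real classes crc and crcAllocator are fields initialized in the constructors shown in these hunks, and the reusing allocator wraps the configured allocator rather than a hardcoded heap one):

    import java.util.zip.CRC32;
    import org.apache.parquet.bytes.ByteBufferReleaser;
    import org.apache.parquet.bytes.BytesInput;
    import org.apache.parquet.bytes.HeapByteBufferAllocator;
    import org.apache.parquet.bytes.ReusingByteBufferAllocator;

    public class CrcSketch {
      private final CRC32 crc = new CRC32();
      private final ReusingByteBufferAllocator crcAllocator =
          new ReusingByteBufferAllocator(new HeapByteBufferAllocator());

      long checksum(BytesInput bytes) {
        crc.reset();
        // toByteBuffer(releaser) materializes the bytes into a buffer obtained from
        // crcAllocator; closing the releaser hands the buffer back for the next page
        try (ByteBufferReleaser releaser = crcAllocator.getReleaser()) {
          crc.update(bytes.toByteBuffer(releaser));
        }
        return crc.getValue();
      }
    }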
+ @Deprecated public ParquetFileWriter( OutputFile file, MessageType schema, @@ -402,7 +433,8 @@ public ParquetFileWriter( statisticsTruncateLength, pageWriteChecksumEnabled, null, - encryptor); + encryptor, + null); } private ParquetFileWriter( @@ -415,7 +447,8 @@ private ParquetFileWriter( int statisticsTruncateLength, boolean pageWriteChecksumEnabled, FileEncryptionProperties encryptionProperties, - InternalFileEncryptor encryptor) + InternalFileEncryptor encryptor, + ByteBufferAllocator allocator) throws IOException { TypeUtil.checkValidWriteSchema(schema); @@ -439,6 +472,9 @@ private ParquetFileWriter( this.columnIndexTruncateLength = columnIndexTruncateLength; this.pageWriteChecksumEnabled = pageWriteChecksumEnabled; this.crc = pageWriteChecksumEnabled ? new CRC32() : null; + this.crcAllocator = pageWriteChecksumEnabled + ? new ReusingByteBufferAllocator(allocator == null ? new HeapByteBufferAllocator() : allocator) + : null; this.metadataConverter = new ParquetMetadataConverter(statisticsTruncateLength); @@ -488,10 +524,16 @@ private ParquetFileWriter( * @param file the file to write to * @param rowAndBlockSize the row group size * @param maxPaddingSize the maximum padding + * @param allocator allocator to potentially allocate {@link java.nio.ByteBuffer} objects * @throws IOException if the file can not be created */ ParquetFileWriter( - Configuration configuration, MessageType schema, Path file, long rowAndBlockSize, int maxPaddingSize) + Configuration configuration, + MessageType schema, + Path file, + long rowAndBlockSize, + int maxPaddingSize, + ByteBufferAllocator allocator) throws IOException { FileSystem fs = file.getFileSystem(configuration); this.schema = schema; @@ -502,6 +544,9 @@ private ParquetFileWriter( this.columnIndexTruncateLength = Integer.MAX_VALUE; this.pageWriteChecksumEnabled = ParquetOutputFormat.getPageWriteChecksumEnabled(configuration); this.crc = pageWriteChecksumEnabled ? new CRC32() : null; + this.crcAllocator = pageWriteChecksumEnabled + ? new ReusingByteBufferAllocator(allocator == null ? 
new HeapByteBufferAllocator() : allocator) + : null; this.metadataConverter = new ParquetMetadataConverter(ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH); this.fileEncryptor = null; } @@ -590,10 +635,10 @@ public void writeDictionaryPage( LOG.debug("{}: write dictionary page: {} values", out.getPos(), dictionaryPage.getDictionarySize()); currentChunkDictionaryPageOffset = out.getPos(); int uncompressedSize = dictionaryPage.getUncompressedSize(); - int compressedPageSize = (int) dictionaryPage.getBytes().size(); // TODO: fix casts + int compressedPageSize = Math.toIntExact(dictionaryPage.getBytes().size()); if (pageWriteChecksumEnabled) { crc.reset(); - crc.update(dictionaryPage.getBytes().toByteArray()); + crcUpdate(dictionaryPage.getBytes()); metadataConverter.writeDictionaryPageHeader( uncompressedSize, compressedPageSize, @@ -838,7 +883,7 @@ public void writeDataPage( int compressedPageSize = (int) bytes.size(); if (pageWriteChecksumEnabled) { crc.reset(); - crc.update(bytes.toByteArray()); + crcUpdate(bytes); metadataConverter.writeDataPageV1Header( uncompressedPageSize, compressedPageSize, @@ -970,13 +1015,13 @@ public void writeDataPageV2( if (pageWriteChecksumEnabled) { crc.reset(); if (repetitionLevels.size() > 0) { - crc.update(repetitionLevels.toByteArray()); + crcUpdate(repetitionLevels); } if (definitionLevels.size() > 0) { - crc.update(definitionLevels.toByteArray()); + crcUpdate(definitionLevels); } if (compressedData.size() > 0) { - crc.update(compressedData.toByteArray()); + crcUpdate(compressedData); } metadataConverter.writeDataPageV2Header( uncompressedSize, @@ -1020,6 +1065,12 @@ public void writeDataPageV2( offsetIndexBuilder.add((int) (out.getPos() - beforeHeader), rowCount); } + private void crcUpdate(BytesInput bytes) { + try (ByteBufferReleaser releaser = crcAllocator.getReleaser()) { + crc.update(bytes.toByteBuffer(releaser)); + } + } + /** * Writes a column chunk at once * @@ -1446,7 +1497,18 @@ public void end(Map extraMetaData) throws IOException { LOG.debug("{}: end", out.getPos()); this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks); serializeFooter(footer, out, fileEncryptor, metadataConverter); - out.close(); + close(); + } + + @Override + public void close() throws IOException { + try { + out.close(); + } finally { + if (crcAllocator != null) { + crcAllocator.close(); + } + } } private static void serializeColumnIndexes( diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index 158c432458..37a551cdea 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -505,10 +505,8 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp mode, blockSize, maxPaddingSize, - props.getColumnIndexTruncateLength(), - props.getStatisticsTruncateLength(), - props.getPageWriteChecksumEnabled(), - encryptionProperties); + encryptionProperties, + props); w.start(); float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO, MemoryManager.DEFAULT_MEMORY_POOL_RATIO); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index c609a11df6..22dc7e30f0 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -393,15 +393,7 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport writeSupport } ParquetFileWriter fileWriter = new ParquetFileWriter( - file, - schema, - mode, - rowGroupSize, - maxPaddingSize, - encodingProps.getColumnIndexTruncateLength(), - encodingProps.getStatisticsTruncateLength(), - encodingProps.getPageWriteChecksumEnabled(), - encryptionProperties); + file, schema, mode, rowGroupSize, maxPaddingSize, encryptionProperties, encodingProps); fileWriter.start(); this.codecFactory = codecFactory; diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java index 58b48e0ea5..4efe4324c7 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java @@ -53,6 +53,7 @@ import org.apache.parquet.bytes.TrackingByteBufferAllocator; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.page.DataPageV2; import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.page.PageReadStore; @@ -187,13 +188,14 @@ public void test(Configuration config, ByteBufferAllocator allocator) throws Exc schema, Mode.CREATE, ParquetWriter.DEFAULT_BLOCK_SIZE, - ParquetWriter.MAX_PADDING_SIZE_DEFAULT); + ParquetWriter.MAX_PADDING_SIZE_DEFAULT, + null, + ParquetProperties.builder().withAllocator(allocator).build()); writer.start(); writer.startBlock(rowCount); pageOffset = outputFile.out().getPos(); - { - ColumnChunkPageWriteStore store = - new ColumnChunkPageWriteStore(compressor(GZIP), schema, allocator, Integer.MAX_VALUE); + try (ColumnChunkPageWriteStore store = + new ColumnChunkPageWriteStore(compressor(GZIP), schema, allocator, Integer.MAX_VALUE)) { PageWriter pageWriter = store.getPageWriter(col); pageWriter.writePageV2( rowCount, @@ -277,19 +279,20 @@ public void testColumnOrderV1() throws IOException { int fakeCount = 3; BinaryStatistics fakeStats = new BinaryStatistics(); - ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore( + try (ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore( compressor(UNCOMPRESSED), schema, allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator()), - Integer.MAX_VALUE); + Integer.MAX_VALUE)) { - for (ColumnDescriptor col : schema.getColumns()) { - PageWriter pageWriter = store.getPageWriter(col); - pageWriter.writePage(fakeData, fakeCount, fakeStats, RLE, RLE, PLAIN); - } + for (ColumnDescriptor col : schema.getColumns()) { + PageWriter pageWriter = store.getPageWriter(col); + pageWriter.writePage(fakeData, fakeCount, fakeStats, RLE, RLE, PLAIN); + } - // flush to the mock writer - store.flushToFileWriter(mockFileWriter); + // flush to the mock writer + store.flushToFileWriter(mockFileWriter); + } for (ColumnDescriptor col : schema.getColumns()) { inOrder.verify(mockFileWriter) diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java index 88dbc381d0..3ffa3806c4 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java +++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.bytes.TrackingByteBufferAllocator; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.ParquetProperties; @@ -67,6 +68,8 @@ import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; import org.apache.parquet.schema.Types; +import org.junit.After; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -79,6 +82,18 @@ public class TestDataPageChecksums { @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); + private TrackingByteBufferAllocator allocator; + + @Before + public void initAllocator() { + allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator()); + } + + @After + public void closeAllocator() { + allocator.close(); + } + private static final Statistics EMPTY_STATS_INT32 = Statistics.getBuilderForReading(Types.required(INT32).named("a")).build(); @@ -116,7 +131,12 @@ private Path writeSimpleParquetFile( } ParquetFileWriter writer = new ParquetFileWriter( - conf, schemaSimple, path, ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.MAX_PADDING_SIZE_DEFAULT); + conf, + schemaSimple, + path, + ParquetWriter.DEFAULT_BLOCK_SIZE, + ParquetWriter.MAX_PADDING_SIZE_DEFAULT, + allocator); writer.start(); writer.startBlock(numRecordsLargeFile); @@ -251,6 +271,7 @@ private Path writeNestedWithNullsSampleParquetFile( try (ParquetWriter writer = ExampleParquetWriter.builder(path) .withConf(conf) + .withAllocator(allocator) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .withCompressionCodec(compression) .withDictionaryEncoding(dictionaryEncoding) diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java index cfef7398ab..8b85b181b3 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java @@ -23,8 +23,11 @@ import static org.apache.parquet.column.Encoding.PLAIN; import static org.apache.parquet.column.Encoding.RLE_DICTIONARY; import static org.apache.parquet.format.converter.ParquetMetadataConverter.MAX_STATS_SIZE; +import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.CREATE; import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE; import static org.apache.parquet.hadoop.ParquetInputFormat.READ_SUPPORT_CLASS; +import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE; +import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT; import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.apache.parquet.schema.Type.Repetition.OPTIONAL; @@ -61,8 +64,11 @@ import org.apache.parquet.Version; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.bytes.TrackingByteBufferAllocator; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Encoding; +import 
org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.page.DataPage; import org.apache.parquet.column.page.DataPageV1; import org.apache.parquet.column.page.DataPageV2; @@ -89,6 +95,7 @@ import org.apache.parquet.hadoop.metadata.StrictKeyValueMetadataMergeStrategy; import org.apache.parquet.hadoop.util.ContextUtil; import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.hadoop.util.HadoopOutputFile; import org.apache.parquet.hadoop.util.HiddenFileFilter; import org.apache.parquet.internal.column.columnindex.BoundaryOrder; import org.apache.parquet.internal.column.columnindex.ColumnIndex; @@ -100,7 +107,9 @@ import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Types; +import org.junit.After; import org.junit.Assume; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -141,6 +150,39 @@ public class TestParquetFileWriter { @Rule public final TemporaryFolder temp = new TemporaryFolder(); + private TrackingByteBufferAllocator allocator; + + @Before + public void initAllocator() { + allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator()); + } + + @After + public void closeAllocator() { + allocator.close(); + } + + private ParquetFileWriter createWriter(Configuration conf, MessageType schema, Path path) throws IOException { + return createWriter(conf, schema, path, CREATE); + } + + private ParquetFileWriter createWriter( + Configuration conf, MessageType schema, Path path, ParquetFileWriter.Mode mode) throws IOException { + return new ParquetFileWriter( + HadoopOutputFile.fromPath(path, conf), + schema, + mode, + DEFAULT_BLOCK_SIZE, + MAX_PADDING_SIZE_DEFAULT, + null, + ParquetProperties.builder().withAllocator(allocator).build()); + } + + private ParquetFileWriter createWriter( + Configuration conf, MessageType schema, Path path, long blockSize, int maxPaddingSize) throws IOException { + return new ParquetFileWriter(conf, schema, path, blockSize, maxPaddingSize, allocator); + } + @Test public void testWriteMode() throws Exception { File testFile = temp.newFile(); @@ -152,14 +194,14 @@ public void testWriteMode() throws Exception { boolean exceptionThrown = false; Path path = new Path(testFile.toURI()); try { - writer = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.CREATE); + writer = createWriter(conf, schema, path); } catch (IOException ioe1) { exceptionThrown = true; } assertTrue(exceptionThrown); exceptionThrown = false; try { - writer = new ParquetFileWriter(conf, schema, path, OVERWRITE); + writer = createWriter(conf, schema, path, OVERWRITE); } catch (IOException ioe2) { exceptionThrown = true; } @@ -175,7 +217,7 @@ public void testWriteRead() throws Exception { Path path = new Path(testFile.toURI()); Configuration configuration = new Configuration(); - ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path); + ParquetFileWriter w = createWriter(configuration, SCHEMA, path); w.start(); w.startBlock(3); w.startColumn(C1, 5, CODEC); @@ -275,7 +317,7 @@ public void testWriteReadWithRecordReader() throws Exception { Path path = new Path(testFile.toURI()); Configuration configuration = new Configuration(); - ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path); + ParquetFileWriter w = createWriter(configuration, SCHEMA, path); w.start(); w.startBlock(3); w.startColumn(C1, 5, CODEC); @@ -355,7 +397,7 @@ public void 
testWriteEmptyBlock() throws Exception { Path path = new Path(testFile.toURI()); Configuration configuration = new Configuration(); - ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path); + ParquetFileWriter w = createWriter(configuration, SCHEMA, path); w.start(); w.startBlock(0); @@ -376,7 +418,7 @@ public void testBloomFilterWriteRead() throws Exception { String[] colPath = {"foo"}; ColumnDescriptor col = schema.getColumnDescription(colPath); BinaryStatistics stats1 = new BinaryStatistics(); - ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path); + ParquetFileWriter w = createWriter(configuration, schema, path); w.start(); w.startBlock(3); w.startColumn(col, 5, CODEC); @@ -414,7 +456,7 @@ public void testWriteReadDataPageV2() throws Exception { Path path = new Path(testFile.toURI()); Configuration configuration = new Configuration(); - ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path); + ParquetFileWriter w = createWriter(configuration, SCHEMA, path); w.start(); w.startBlock(14); @@ -530,7 +572,7 @@ public void testAlignmentWithPadding() throws Exception { conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false); // uses the test constructor - ParquetFileWriter w = new ParquetFileWriter(conf, SCHEMA, path, 120, 60); + ParquetFileWriter w = createWriter(conf, SCHEMA, path, 120, 60); w.start(); w.startBlock(3); @@ -654,7 +696,7 @@ public void testAlignmentWithNoPaddingNeeded() throws Exception { conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false); // uses the test constructor - ParquetFileWriter w = new ParquetFileWriter(conf, SCHEMA, path, 100, 50); + ParquetFileWriter w = createWriter(conf, SCHEMA, path, 100, 50); w.start(); w.startBlock(3); @@ -827,7 +869,7 @@ public void testWriteReadStatistics() throws Exception { statsB2C1P1.setMinMax(Binary.fromString("d"), Binary.fromString("e")); statsB2C2P1.setMinMax(11l, 122l); - ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path); + ParquetFileWriter w = createWriter(configuration, schema, path); w.start(); w.startBlock(3); w.startColumn(c1, 5, codec); @@ -999,7 +1041,7 @@ private void createFile(Configuration configuration, Path path, MessageType sche BinaryStatistics stats1 = new BinaryStatistics(); BinaryStatistics stats2 = new BinaryStatistics(); - ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path); + ParquetFileWriter w = createWriter(configuration, schema, path); w.start(); w.startBlock(3); w.startColumn(c1, 5, codec); @@ -1170,7 +1212,7 @@ public void testColumnIndexWriteRead() throws Exception { Path path = new Path(testFile.toURI()); Configuration configuration = new Configuration(); - ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path); + ParquetFileWriter w = createWriter(configuration, SCHEMA, path); w.start(); w.startBlock(4); w.startColumn(C1, 7, CODEC);
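Taken together, the test changes converge on a single leak-checking pattern: wrap the real allocator in a TrackingByteBufferAllocator in @Before, route it to the writer (for example through ParquetProperties.builder().withAllocator(...), as the createWriter helpers above do), and close the wrapper in @After. A condensed sketch of that skeleton, with an illustrative class name and an empty test body; it relies, as these tests do, on TrackingByteBufferAllocator.close() throwing if any buffer allocated through it was never released:

    import org.apache.parquet.bytes.HeapByteBufferAllocator;
    import org.apache.parquet.bytes.TrackingByteBufferAllocator;
    import org.apache.parquet.column.ParquetProperties;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;

    public class AllocatorLeakCheckSketch {
      private TrackingByteBufferAllocator allocator;

      @Before
      public void initAllocator() {
        allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator());
      }

      @After
      public void closeAllocator() {
        // Fails the test if anything allocated through the wrapper leaked
        allocator.close();
      }

      @Test
      public void writerReleasesItsBuffers() throws Exception {
        ParquetProperties props =
            ParquetProperties.builder().withAllocator(allocator).build();
        // Build a ParquetFileWriter with these props (as the createWriter helpers
        // above do), write and close; any leaked buffer surfaces in closeAllocator()
      }
    }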