Skip to content

Commit

Permalink
PARQUET-2431: Handle ByteBufferAllocator gracefully (#1274)
Browse files Browse the repository at this point in the history
  • Loading branch information
gszadovszky authored Feb 19, 2024
1 parent 61bdf44 commit d839608
Show file tree
Hide file tree
Showing 27 changed files with 936 additions and 479 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public boolean apply(@Nullable ColumnDescriptor input) {
pages != null;
pages = reader.readNextRowGroup()) {
validator.validate(columns, pages);
pages.close();
}
} catch (BadStatsException e) {
return e.getMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public int run() throws IOException {
}
}
rowGroupNum += 1;
pageStore.close();
}

// TODO: Show total column size and overall size per value in the column summary line
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int

if (CorruptDeltaByteArrays.requiresSequentialReads(writerVersion, dataEncoding)
&& previousReader != null
&& previousReader instanceof RequiresPreviousReader) {
&& dataColumn instanceof RequiresPreviousReader) {
// previous reader can only be set if reading sequentially
((RequiresPreviousReader) dataColumn).setPreviousReader(previousReader);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* <p>
* TODO: rename to RowGroup?
*/
public interface PageReadStore {
public interface PageReadStore extends AutoCloseable {

/**
* @param descriptor the descriptor of the column
Expand Down Expand Up @@ -58,4 +58,9 @@ default Optional<Long> getRowIndexOffset() {
default Optional<PrimitiveIterator.OfLong> getRowIndexes() {
return Optional.empty();
}

@Override
default void close() {
// No-op default implementation for compatibility
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
/**
* base class to implement an encoding for a given column
*/
public abstract class ValuesWriter {
public abstract class ValuesWriter implements AutoCloseable {

/**
* used to decide if we want to work to the next page
Expand Down Expand Up @@ -58,6 +58,7 @@ public abstract class ValuesWriter {
* Called to close the values writer. Any output stream is closed and can no longer be used.
* All resources are released.
*/
@Override
public void close() {}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
import org.apache.parquet.io.ParquetEncodingException;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.util.AutoCloseables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -98,7 +99,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
protected ByteBufferAllocator allocator;
/* Track the list of writers used so they can be appropriately closed when necessary
(currently used for off-heap memory which is not garbage collected) */
private List<RunLengthBitPackingHybridEncoder> encoders = new ArrayList<>();
private List<AutoCloseable> toClose = new ArrayList<>();

protected DictionaryValuesWriter(
int maxDictionaryByteSize,
Expand All @@ -114,7 +115,7 @@ protected DictionaryValuesWriter(
protected DictionaryPage dictPage(ValuesWriter dictPageWriter) {
DictionaryPage ret =
new DictionaryPage(dictPageWriter.getBytes(), lastUsedDictionarySize, encodingForDictionaryPage);
dictPageWriter.close();
toClose.add(dictPageWriter);
return ret;
}

Expand Down Expand Up @@ -164,7 +165,7 @@ public BytesInput getBytes() {

RunLengthBitPackingHybridEncoder encoder =
new RunLengthBitPackingHybridEncoder(bitWidth, initialSlabSize, maxDictionaryByteSize, this.allocator);
encoders.add(encoder);
toClose.add(encoder);
IntIterator iterator = encodedValues.iterator();
try {
while (iterator.hasNext()) {
Expand Down Expand Up @@ -198,10 +199,8 @@ public void reset() {
@Override
public void close() {
encodedValues = null;
for (RunLengthBitPackingHybridEncoder encoder : encoders) {
encoder.close();
}
encoders.clear();
AutoCloseables.uncheckedClose(toClose);
toClose.clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
* <p>
* Only supports positive values (including 0) // TODO: is that ok? Should we make a signed version?
*/
public class RunLengthBitPackingHybridEncoder {
public class RunLengthBitPackingHybridEncoder implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(RunLengthBitPackingHybridEncoder.class);

private final BytePacker packer;
Expand Down Expand Up @@ -279,6 +279,7 @@ public void reset() {
reset(true);
}

@Override
public void close() {
reset(false);
baos.close();
Expand Down
Loading

0 comments on commit d839608

Please sign in to comment.