From 342ff2feb25d9b8550a5eb6607278c806f6f6685 Mon Sep 17 00:00:00 2001 From: lixiao Date: Tue, 14 Dec 2021 15:56:27 +0800 Subject: [PATCH 1/2] ORC-1060: reduce memory usage when vectorized reading dictionary string encoding columns In old code, when dictionary string encoding columns are read by vectorized reading, 2 copy of current stripe's dictionary data and 1 copy of next stripe's dictionary data are hold in memory when reading across different stripes. That could make vectorized reading's memory usage is larger than row reading. This patch fixes this issue, and only hold 1 copy of current stripe's dictionary data. This patch logic has 3 parts: 1) Directly read data to primitive byte array, rather than using DynamicByteArray as intermediate variable. Using DynamicByteArray as intermediate variable causes 2 copy of current stripe's dictionary data are hold in memory. 2) Lazy read dictionary data until read current batch data. In previous code, RecordReaderImpl class's nextBatch method reads dictionary data of next stripe through advanceToNextRow method, then memory will hold two stripe's dictionary data. Through lazy read logic, only one stripe's dictionary data is hold in memory when reading across different stripes. 3) Before lazy read dictionary data from current stripe, remove batch data's reference to dictionary data from previous stripe. This could allow GC to clean previous stripe's dictionary data memory. --- .../apache/orc/impl/TreeReaderFactory.java | 80 ++++++++++++------- 1 file changed, 49 insertions(+), 31 deletions(-) diff --git a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java index f1002a1043..5aedca8064 100644 --- a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java +++ b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java @@ -2148,12 +2148,15 @@ public InStream getStream() { */ public static class StringDictionaryTreeReader extends TreeReader { private static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; - private DynamicByteArray dictionaryBuffer; private int[] dictionaryOffsets; protected IntegerReader reader; + private InStream lengthStream; + private InStream dictionaryStream; + private OrcProto.ColumnEncoding lengthEncoding; - private byte[] dictionaryBufferInBytesCache = null; + private byte[] dictionaryBuffer = null; private final LongColumnVector scratchlcv; + private boolean initDictionary = false; StringDictionaryTreeReader(int columnId, Context context) throws IOException { this(columnId, null, null, null, null, null, context); @@ -2167,14 +2170,10 @@ protected StringDictionaryTreeReader(int columnId, InStream present, InStream da if (data != null && encoding != null) { this.reader = createIntegerReader(encoding.getKind(), data, false, context); } - - if (dictionary != null && encoding != null) { - readDictionaryStream(dictionary); - } - - if (length != null && encoding != null) { - readDictionaryLengthStream(length, encoding); - } + lengthStream = length; + dictionaryStream = dictionary; + lengthEncoding = encoding; + initDictionary = false; } @Override @@ -2190,22 +2189,21 @@ public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { public void startStripe(StripePlanner planner, ReadPhase readPhase) throws IOException { super.startStripe(planner, readPhase); - // read the dictionary blob StreamName name = new StreamName(columnId, - OrcProto.Stream.Kind.DICTIONARY_DATA); - InStream in = planner.getStream(name); - readDictionaryStream(in); + OrcProto.Stream.Kind.DICTIONARY_DATA); + dictionaryStream = planner.getStream(name); + initDictionary = false; // read the lengths name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH); - in = planner.getStream(name); + InStream in = planner.getStream(name); OrcProto.ColumnEncoding encoding = planner.getEncoding(columnId); readDictionaryLengthStream(in, encoding); // set up the row reader name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); reader = createIntegerReader(encoding.getKind(), - planner.getStream(name), false, context); + planner.getStream(name), false, context); } private void readDictionaryLengthStream(InStream in, OrcProto.ColumnEncoding encoding) @@ -2231,10 +2229,18 @@ private void readDictionaryLengthStream(InStream in, OrcProto.ColumnEncoding enc private void readDictionaryStream(InStream in) throws IOException { if (in != null) { // Guard against empty dictionary stream. if (in.available() > 0) { - dictionaryBuffer = new DynamicByteArray(64, in.available()); - dictionaryBuffer.readAll(in); - // Since its start of strip invalidate the cache. - dictionaryBufferInBytesCache = null; + // remove reference to previous dictionary buffer + dictionaryBuffer = null; + int dictionaryBufferSize = dictionaryOffsets[dictionaryOffsets.length - 1]; + dictionaryBuffer = new byte[dictionaryBufferSize]; + int pos = 0; + int chunkSize = in.available(); + byte[] chunkBytes = new byte[chunkSize]; + while (pos < dictionaryBufferSize) { + int currentLength = in.read(chunkBytes, 0, chunkSize); + System.arraycopy(chunkBytes, 0, dictionaryBuffer, pos, currentLength); + pos += currentLength; + } } in.close(); } else { @@ -2261,6 +2267,23 @@ public void nextVector(ColumnVector previousVector, ReadPhase readPhase) throws IOException { final BytesColumnVector result = (BytesColumnVector) previousVector; + // remove reference to previous dictionary buffer + for (int i = 0; i < batchSize; i++) { + result.vector[i] = null; + } + + // lazy read dictionary buffer, + // ensure there is at most one dictionary buffer in memory when reading cross different file stripes + if (!initDictionary) { + if (lengthStream != null && lengthEncoding != null) { + readDictionaryLengthStream(lengthStream, lengthEncoding); + } + if (dictionaryStream != null) { + readDictionaryStream(dictionaryStream); + } + initDictionary = true; + } + // Read present/isNull stream super.nextVector(result, isNull, batchSize, filterContext, readPhase); readDictionaryByteArray(result, filterContext, batchSize); @@ -2274,11 +2297,6 @@ private void readDictionaryByteArray(BytesColumnVector result, if (dictionaryBuffer != null) { - // Load dictionaryBuffer into cache. - if (dictionaryBufferInBytesCache == null) { - dictionaryBufferInBytesCache = dictionaryBuffer.get(); - } - // Read string offsets scratchlcv.isRepeating = result.isRepeating; scratchlcv.noNulls = result.noNulls; @@ -2291,7 +2309,7 @@ private void readDictionaryByteArray(BytesColumnVector result, if (filterContext.isSelectedInUse()) { // Set all string values to null - offset and length is zero for (int i = 0; i < batchSize; i++) { - result.setRef(i, dictionaryBufferInBytesCache, 0, 0); + result.setRef(i, dictionaryBuffer, 0, 0); } // Read selected rows from stream for (int i = 0; i != filterContext.getSelectedSize(); i++) { @@ -2299,7 +2317,7 @@ private void readDictionaryByteArray(BytesColumnVector result, if (!scratchlcv.isNull[idx]) { offset = dictionaryOffsets[(int) scratchlcv.vector[idx]]; length = getDictionaryEntryLength((int) scratchlcv.vector[idx], offset); - result.setRef(idx, dictionaryBufferInBytesCache, offset, length); + result.setRef(idx, dictionaryBuffer, offset, length); } } } else { @@ -2307,10 +2325,10 @@ private void readDictionaryByteArray(BytesColumnVector result, if (!scratchlcv.isNull[i]) { offset = dictionaryOffsets[(int) scratchlcv.vector[i]]; length = getDictionaryEntryLength((int) scratchlcv.vector[i], offset); - result.setRef(i, dictionaryBufferInBytesCache, offset, length); + result.setRef(i, dictionaryBuffer, offset, length); } else { // If the value is null then set offset and length to zero (null string) - result.setRef(i, dictionaryBufferInBytesCache, 0, 0); + result.setRef(i, dictionaryBuffer, 0, 0); } } } @@ -2320,7 +2338,7 @@ private void readDictionaryByteArray(BytesColumnVector result, // set all the elements to the same value offset = dictionaryOffsets[(int) scratchlcv.vector[0]]; length = getDictionaryEntryLength((int) scratchlcv.vector[0], offset); - result.setRef(0, dictionaryBufferInBytesCache, offset, length); + result.setRef(0, dictionaryBuffer, offset, length); } result.isRepeating = scratchlcv.isRepeating; } else { @@ -2348,7 +2366,7 @@ int getDictionaryEntryLength(int entry, int offset) { if (entry < dictionaryOffsets.length - 1) { length = dictionaryOffsets[entry + 1] - offset; } else { - length = dictionaryBuffer.size() - offset; + length = dictionaryBuffer.length - offset; } return length; } From 2d7ca221917286e591d3691dc6f1f86674f7ec52 Mon Sep 17 00:00:00 2001 From: Yiqun Zhang Date: Thu, 16 Dec 2021 14:47:47 +0800 Subject: [PATCH 2/2] Adjust indentation to avoid committing unnecessary lines --- java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java index 5aedca8064..47ee9111d7 100644 --- a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java +++ b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java @@ -2190,7 +2190,7 @@ public void startStripe(StripePlanner planner, ReadPhase readPhase) throws IOExc super.startStripe(planner, readPhase); StreamName name = new StreamName(columnId, - OrcProto.Stream.Kind.DICTIONARY_DATA); + OrcProto.Stream.Kind.DICTIONARY_DATA); dictionaryStream = planner.getStream(name); initDictionary = false; @@ -2203,7 +2203,7 @@ public void startStripe(StripePlanner planner, ReadPhase readPhase) throws IOExc // set up the row reader name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); reader = createIntegerReader(encoding.getKind(), - planner.getStream(name), false, context); + planner.getStream(name), false, context); } private void readDictionaryLengthStream(InStream in, OrcProto.ColumnEncoding encoding)