Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC-1060: Reduce memory usage when vectorized reading dictionary string encoding columns #971

Merged
merged 2 commits into from
Dec 25, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 47 additions & 29 deletions java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -2148,12 +2148,15 @@ public InStream getStream() {
*/
public static class StringDictionaryTreeReader extends TreeReader {
private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
private DynamicByteArray dictionaryBuffer;
private int[] dictionaryOffsets;
protected IntegerReader reader;
private InStream lengthStream;
private InStream dictionaryStream;
private OrcProto.ColumnEncoding lengthEncoding;

private byte[] dictionaryBufferInBytesCache = null;
private byte[] dictionaryBuffer = null;
private final LongColumnVector scratchlcv;
private boolean initDictionary = false;

StringDictionaryTreeReader(int columnId, Context context) throws IOException {
this(columnId, null, null, null, null, null, context);
Expand All @@ -2167,14 +2170,10 @@ protected StringDictionaryTreeReader(int columnId, InStream present, InStream da
if (data != null && encoding != null) {
this.reader = createIntegerReader(encoding.getKind(), data, false, context);
}

if (dictionary != null && encoding != null) {
readDictionaryStream(dictionary);
}

if (length != null && encoding != null) {
readDictionaryLengthStream(length, encoding);
}
lengthStream = length;
dictionaryStream = dictionary;
lengthEncoding = encoding;
initDictionary = false;
}

@Override
Expand All @@ -2190,15 +2189,14 @@ public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
public void startStripe(StripePlanner planner, ReadPhase readPhase) throws IOException {
super.startStripe(planner, readPhase);

// read the dictionary blob
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DICTIONARY_DATA);
InStream in = planner.getStream(name);
readDictionaryStream(in);
dictionaryStream = planner.getStream(name);
initDictionary = false;

// read the lengths
name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH);
in = planner.getStream(name);
InStream in = planner.getStream(name);
OrcProto.ColumnEncoding encoding = planner.getEncoding(columnId);
readDictionaryLengthStream(in, encoding);

Expand Down Expand Up @@ -2231,10 +2229,18 @@ private void readDictionaryLengthStream(InStream in, OrcProto.ColumnEncoding enc
private void readDictionaryStream(InStream in) throws IOException {
if (in != null) { // Guard against empty dictionary stream.
if (in.available() > 0) {
dictionaryBuffer = new DynamicByteArray(64, in.available());
dictionaryBuffer.readAll(in);
// Since it's the start of a stripe, invalidate the cache.
dictionaryBufferInBytesCache = null;
// remove reference to previous dictionary buffer
dictionaryBuffer = null;
int dictionaryBufferSize = dictionaryOffsets[dictionaryOffsets.length - 1];
dictionaryBuffer = new byte[dictionaryBufferSize];
int pos = 0;
int chunkSize = in.available();
byte[] chunkBytes = new byte[chunkSize];
while (pos < dictionaryBufferSize) {
int currentLength = in.read(chunkBytes, 0, chunkSize);
System.arraycopy(chunkBytes, 0, dictionaryBuffer, pos, currentLength);
pos += currentLength;
}
}
in.close();
} else {
Expand All @@ -2261,6 +2267,23 @@ public void nextVector(ColumnVector previousVector,
ReadPhase readPhase) throws IOException {
final BytesColumnVector result = (BytesColumnVector) previousVector;

// remove reference to previous dictionary buffer
for (int i = 0; i < batchSize; i++) {
result.vector[i] = null;
}

// lazily read the dictionary buffer,
// ensuring there is at most one dictionary buffer in memory when reading across different file stripes
if (!initDictionary) {
if (lengthStream != null && lengthEncoding != null) {
readDictionaryLengthStream(lengthStream, lengthEncoding);
}
if (dictionaryStream != null) {
readDictionaryStream(dictionaryStream);
}
initDictionary = true;
}

// Read present/isNull stream
super.nextVector(result, isNull, batchSize, filterContext, readPhase);
readDictionaryByteArray(result, filterContext, batchSize);
Expand All @@ -2274,11 +2297,6 @@ private void readDictionaryByteArray(BytesColumnVector result,

if (dictionaryBuffer != null) {

// Load dictionaryBuffer into cache.
if (dictionaryBufferInBytesCache == null) {
dictionaryBufferInBytesCache = dictionaryBuffer.get();
}

// Read string offsets
scratchlcv.isRepeating = result.isRepeating;
scratchlcv.noNulls = result.noNulls;
Expand All @@ -2291,26 +2309,26 @@ private void readDictionaryByteArray(BytesColumnVector result,
if (filterContext.isSelectedInUse()) {
// Set all string values to null - offset and length is zero
for (int i = 0; i < batchSize; i++) {
result.setRef(i, dictionaryBufferInBytesCache, 0, 0);
result.setRef(i, dictionaryBuffer, 0, 0);
}
// Read selected rows from stream
for (int i = 0; i != filterContext.getSelectedSize(); i++) {
int idx = filterContext.getSelected()[i];
if (!scratchlcv.isNull[idx]) {
offset = dictionaryOffsets[(int) scratchlcv.vector[idx]];
length = getDictionaryEntryLength((int) scratchlcv.vector[idx], offset);
result.setRef(idx, dictionaryBufferInBytesCache, offset, length);
result.setRef(idx, dictionaryBuffer, offset, length);
}
}
} else {
for (int i = 0; i < batchSize; i++) {
if (!scratchlcv.isNull[i]) {
offset = dictionaryOffsets[(int) scratchlcv.vector[i]];
length = getDictionaryEntryLength((int) scratchlcv.vector[i], offset);
result.setRef(i, dictionaryBufferInBytesCache, offset, length);
result.setRef(i, dictionaryBuffer, offset, length);
} else {
// If the value is null then set offset and length to zero (null string)
result.setRef(i, dictionaryBufferInBytesCache, 0, 0);
result.setRef(i, dictionaryBuffer, 0, 0);
}
}
}
Expand All @@ -2320,7 +2338,7 @@ private void readDictionaryByteArray(BytesColumnVector result,
// set all the elements to the same value
offset = dictionaryOffsets[(int) scratchlcv.vector[0]];
length = getDictionaryEntryLength((int) scratchlcv.vector[0], offset);
result.setRef(0, dictionaryBufferInBytesCache, offset, length);
result.setRef(0, dictionaryBuffer, offset, length);
}
result.isRepeating = scratchlcv.isRepeating;
} else {
Expand Down Expand Up @@ -2348,7 +2366,7 @@ int getDictionaryEntryLength(int entry, int offset) {
if (entry < dictionaryOffsets.length - 1) {
length = dictionaryOffsets[entry + 1] - offset;
} else {
length = dictionaryBuffer.size() - offset;
length = dictionaryBuffer.length - offset;
}
return length;
}
Expand Down