Skip to content

Commit

Permalink
[SPARK-24801][CORE] Avoid memory waste by empty byte[] arrays in Sasl…
Browse files Browse the repository at this point in the history
…Encryption$EncryptedMessage

## What changes were proposed in this pull request?

Initialize SaslEncryption$EncryptedMessage.byteChannel lazily,
so that empty, not yet used instances of ByteArrayWritableChannel
referenced by this field don't use up memory.

I analyzed a heap dump from Yarn Node Manager where this code is used, and found that there are over 40,000 of the above objects in memory, each with a big empty byte[] array. The reason they are all there is because of Netty queued up a large number of messages in memory  before transferTo() is called. There is a small number of netty ChannelOutboundBuffer objects, and then collectively , via linked lists starting from their flushedEntry data fields, they end up referencing over 40K ChannelOutboundBuffer$Entry objects, which ultimately reference EncryptedMessage objects.

## How was this patch tested?

Ran all the tests locally.

Author: Misha Dmitriev <[email protected]>

Closes #21811 from countmdm/misha/spark-24801.
  • Loading branch information
misha-cloudera authored and squito committed Jul 27, 2018
1 parent fa09d91 commit 094aa59
Showing 1 changed file with 7 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,14 @@ static class EncryptedMessage extends AbstractFileRegion {
private final boolean isByteBuf;
private final ByteBuf buf;
private final FileRegion region;
private final int maxOutboundBlockSize;

/**
* A channel used to buffer input data for encryption. The channel has an upper size bound
* so that if the input is larger than the allowed buffer, it will be broken into multiple
* chunks.
* chunks. Made non-final to enable lazy initialization, which saves memory.
*/
private final ByteArrayWritableChannel byteChannel;
private ByteArrayWritableChannel byteChannel;

private ByteBuf currentHeader;
private ByteBuffer currentChunk;
Expand All @@ -157,7 +158,7 @@ static class EncryptedMessage extends AbstractFileRegion {
this.isByteBuf = msg instanceof ByteBuf;
this.buf = isByteBuf ? (ByteBuf) msg : null;
this.region = isByteBuf ? null : (FileRegion) msg;
this.byteChannel = new ByteArrayWritableChannel(maxOutboundBlockSize);
this.maxOutboundBlockSize = maxOutboundBlockSize;
}

/**
Expand Down Expand Up @@ -292,6 +293,9 @@ public long transferTo(final WritableByteChannel target, final long position)
}

private void nextChunk() throws IOException {
if (byteChannel == null) {
byteChannel = new ByteArrayWritableChannel(maxOutboundBlockSize);
}
byteChannel.reset();
if (isByteBuf) {
int copied = byteChannel.write(buf.nioBuffer());
Expand Down

0 comments on commit 094aa59

Please sign in to comment.