From 094aa597155dfcbf41a2490c9e462415e3824901 Mon Sep 17 00:00:00 2001 From: Misha Dmitriev Date: Thu, 26 Jul 2018 22:15:12 -0500 Subject: [PATCH] [SPARK-24801][CORE] Avoid memory waste by empty byte[] arrays in SaslEncryption$EncryptedMessage ## What changes were proposed in this pull request? Initialize SaslEncryption$EncryptedMessage.byteChannel lazily, so that empty, not yet used instances of ByteArrayWritableChannel referenced by this field don't use up memory. I analyzed a heap dump from Yarn Node Manager where this code is used, and found that there are over 40,000 of the above objects in memory, each with a big empty byte[] array. The reason they are all there is because of Netty queued up a large number of messages in memory before transferTo() is called. There is a small number of netty ChannelOutboundBuffer objects, and then collectively , via linked lists starting from their flushedEntry data fields, they end up referencing over 40K ChannelOutboundBuffer$Entry objects, which ultimately reference EncryptedMessage objects. ## How was this patch tested? Ran all the tests locally. Author: Misha Dmitriev Closes #21811 from countmdm/misha/spark-24801. --- .../org/apache/spark/network/sasl/SaslEncryption.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java b/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java index 3ac9081d78a75..d3b2a334baadd 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java +++ b/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java @@ -135,13 +135,14 @@ static class EncryptedMessage extends AbstractFileRegion { private final boolean isByteBuf; private final ByteBuf buf; private final FileRegion region; + private final int maxOutboundBlockSize; /** * A channel used to buffer input data for encryption. The channel has an upper size bound * so that if the input is larger than the allowed buffer, it will be broken into multiple - * chunks. + * chunks. Made non-final to enable lazy initialization, which saves memory. */ - private final ByteArrayWritableChannel byteChannel; + private ByteArrayWritableChannel byteChannel; private ByteBuf currentHeader; private ByteBuffer currentChunk; @@ -157,7 +158,7 @@ static class EncryptedMessage extends AbstractFileRegion { this.isByteBuf = msg instanceof ByteBuf; this.buf = isByteBuf ? (ByteBuf) msg : null; this.region = isByteBuf ? null : (FileRegion) msg; - this.byteChannel = new ByteArrayWritableChannel(maxOutboundBlockSize); + this.maxOutboundBlockSize = maxOutboundBlockSize; } /** @@ -292,6 +293,9 @@ public long transferTo(final WritableByteChannel target, final long position) } private void nextChunk() throws IOException { + if (byteChannel == null) { + byteChannel = new ByteArrayWritableChannel(maxOutboundBlockSize); + } byteChannel.reset(); if (isByteBuf) { int copied = byteChannel.write(buf.nioBuffer());