
[SPARK-24801][CORE] Avoid memory waste by empty byte[] arrays in SaslEncryption$EncryptedMessage #21811

Closed
wants to merge 1 commit

Conversation

@countmdm commented Jul 18, 2018

What changes were proposed in this pull request?

Initialize SaslEncryption$EncryptedMessage.byteChannel lazily,
so that empty, not yet used instances of ByteArrayWritableChannel
referenced by this field don't use up memory.

I analyzed a heap dump from a Yarn Node Manager where this code is used, and found over 40,000 of the above objects in memory, each with a big empty byte[] array. The reason they are all there is that Netty queued up a large number of messages in memory before transferTo() was called. There is a small number of netty ChannelOutboundBuffer objects which, collectively, via linked lists starting from their flushedEntry data fields, end up referencing over 40K ChannelOutboundBuffer$Entry objects, which ultimately reference EncryptedMessage objects.
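
For illustration, here is a minimal, self-contained Java sketch of the lazy-initialization idea. Only `byteChannel`, `ByteArrayWritableChannel` and `transferTo()` come from the discussion; the surrounding structure and the `bufferSize` parameter are simplifications, not the exact Spark code.

```java
// Minimal sketch of the lazy-initialization approach; class structure simplified.
class ByteArrayWritableChannelSketch {
  final byte[] data;
  ByteArrayWritableChannelSketch(int size) { this.data = new byte[size]; }
}

class EncryptedMessageSketch {
  private final int bufferSize;
  // Before the fix: allocated eagerly in the constructor, so every message
  // queued in Netty's ChannelOutboundBuffer carried a large, still-empty byte[].
  private ByteArrayWritableChannelSketch byteChannel;

  EncryptedMessageSketch(int bufferSize) {
    this.bufferSize = bufferSize;
  }

  // After the fix: the buffer exists only once transferTo() actually needs it.
  void transferTo() {
    if (byteChannel == null) {
      byteChannel = new ByteArrayWritableChannelSketch(bufferSize);
    }
    // ... encrypt into byteChannel.data and write it out ...
  }
}
```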

How was this patch tested?

Ran all the tests locally.

@countmdm (Author) commented Jul 18, 2018 via email

@squito (Contributor) commented Jul 19, 2018

Ok to test

@squito (Contributor) commented Jul 19, 2018

Can you add something to the PR description about how this is important because sometimes many of these messages queue up in netty's ChannelOutboundBuffer before transferTo() is called? It's discussed in the JIRA, but it's good to have here too.

@squito (Contributor) commented Jul 19, 2018

@zsxwing @jerryshao @Victsm you might be interested in this

@felixcheung (Member) left a comment

I like it, but this will still create the byte channel right? is there a way to reuse it?

@kiszk (Member) commented Jul 22, 2018

Does it make sense to release byteChannel at deallocate()?

if (byteChannel != null) {
  byteChannel = null;
}

@squito (Contributor) commented Jul 23, 2018

> I like it, but this will still create the byte channel right? is there a way to reuse it?

We could create a pool, though management becomes a bit more complex. Would you ever shrink the pool, or would it always stay the same size? I guess it shouldn't grow by more than one byte array per IO thread.

I'd rather get this simple fix in first before doing anything more complicated ...
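
(For reference, the kind of reuse being floated here might look roughly like a per-IO-thread buffer. The sketch below is purely hypothetical and not something proposed in this PR; the class name and buffer size are illustrative.)

```java
// Purely hypothetical sketch of per-IO-thread buffer reuse, as discussed above.
class BufferPoolSketch {
  private static final int BUFFER_SIZE = 32 * 1024;

  // One buffer per Netty IO thread, reused across the messages that thread writes.
  private static final ThreadLocal<byte[]> PER_THREAD_BUFFER =
      ThreadLocal.withInitial(() -> new byte[BUFFER_SIZE]);

  static byte[] acquire() {
    return PER_THREAD_BUFFER.get();
  }
}
```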

@squito (Contributor) commented Jul 23, 2018

> Does it make sense to release byteChannel at deallocate()?

You could, just to let GC kick in a bit earlier, but I don't think it's going to make a big difference. (Netty's ByteBufs must be released, since they may be owned by netty's internal pools and never gc'ed.)

@SparkQA commented Jul 23, 2018

Test build #4221 has finished for PR 21811 at commit 8b46534.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk (Member) commented Jul 23, 2018

I see. SGTM.
Would it be possible to attach heap profiling (allocated size for each type) before and after this PR to record the difference?

@countmdm (Author) commented

Thank you very much for your responses, @squito. I agree with all you said.

@kiszk the heap dump that prompted me to make this change was obtained from a customer, who probably ran into some unusual situation. So, not being a Spark expert, I am not sure how to reproduce this problem. Of course, normally I am all for measuring the app's memory before and after the change to make sure it worked.

@kiszk (Member) commented Jul 23, 2018

@countmdm I see. We may be interested in the ratio of byte[] to all allocated memory before and after, not in other objects (e.g. an object containing a customer's name).

@countmdm (Author) commented

@kiszk the situation "before" is well understood. In the respective SPARK-24801 ticket I present a fragment from the analysis of this heap dump by jxray (www.jxray.com). It shows that ~2.5GB of memory, or 64% of the used heap size, is wasted by the ~40.5 thousand empty byte[] arrays in question:

2,597,946K (64.1%): byte[]: 40583 / 100% of empty 2,597,946K (64.1%)
↖org.apache.spark.network.util.ByteArrayWritableChannel.data
↖org.apache.spark.network.sasl.SaslEncryption$EncryptedMessage.byteChannel
↖io.netty.channel.ChannelOutboundBuffer$Entry.msg
...

However, we don't, and probably cannot, get the real "after" evidence. That's because, as I said, I don't know how to reproduce the situation in house, and I think it's very unlikely that the customer can easily reproduce it either, let alone accept our patched code and collect the necessary data before and after the fix. That said, I believe this fix is simple and obvious enough that we can be pretty sure that with it there would simply be no problematic byte[] arrays in the above situation, and memory consumption would be ~64% smaller.

@kiszk (Member) commented Jul 24, 2018

@countmdm Sorry for overlooking the JIRA description. I understand the situation now.
While a memory pool could be an option, it is too complex.

LGTM

@felixcheung (Member) commented Jul 24, 2018 via email

@squito (Contributor) commented Jul 24, 2018

lgtm

will wait a bit for any more comments before merging

@dongjoon-hyun (Member) commented

+1, LGTM, too.

@squito (Contributor) commented Jul 27, 2018

merged to master, thanks!

@asfgit closed this in 094aa59 Jul 27, 2018
dongjoon-hyun pushed a commit that referenced this pull request Jul 10, 2022
…edMessage by using a shared byteRawChannel

### What changes were proposed in this pull request?

This patch aims to reduce the memory overhead of `TransportCipher$EncryptedMessage`. In the current code, the `EncryptedMessage` constructor eagerly initializes a `ByteArrayWritableChannel byteRawChannel` (which consumes ~32kb of memory). If there are many `EncryptedMessage` instances on the heap (e.g. because there is a long queue of outgoing messages on a channel) then this overhead adds up and can cause OOMs or GC problems.

SPARK-24801 / #21811 fixed a similar issue in `SaslEncryption`. There, the fix was to lazily initialize the buffer: the buffer isn't actually accessed before `transferTo()` is called (and is only used there), so lazily initializing it there reduces memory requirements for queued outgoing messages.

In principle we could apply a similar lazy initialization fix here. In this PR, however, I have taken a different approach: I construct a single shared `ByteArrayWritableChannel byteRawChannel` in `TransportCipher$EncryptionHandler` and pass that shared instance to the `EncryptedMessage` constructor. I believe that this is safe because we are already doing this for the `byteEncChannel` channel buffer: that shared `byteEncChannel` gets `reset()` when `EncryptedMessage.deallocate()` is called. If we assume that the existing sharing is correct, then I think it's okay to apply similar sharing to the `byteRawChannel` buffer, because its scope of use and lifecycle are similar.
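
As a rough illustration of that sharing pattern, the handler owns one buffer and hands it to every message it creates. The sketch below uses hypothetical names and a heavily simplified structure; it is not the actual Spark code.

```java
// Hypothetical, simplified sketch of the shared-buffer pattern described above.
class SharedWritableChannel {
  private final byte[] data = new byte[32 * 1024];  // ~32kb buffer, allocated once
  private int written = 0;

  void write(byte[] src) {
    int n = Math.min(src.length, data.length - written);
    System.arraycopy(src, 0, data, written, n);
    written += n;
  }

  void reset() { written = 0; }
}

class EncryptionHandlerSketch {
  // One buffer per handler (i.e. per channel), instead of one per queued message.
  private final SharedWritableChannel byteRawChannel = new SharedWritableChannel();

  CipherMessageSketch newMessage(byte[] payload) {
    return new CipherMessageSketch(byteRawChannel, payload);
  }
}

class CipherMessageSketch {
  private final SharedWritableChannel byteRawChannel;
  private final byte[] payload;

  CipherMessageSketch(SharedWritableChannel shared, byte[] payload) {
    this.byteRawChannel = shared;  // no per-message buffer allocation
    this.payload = payload;
  }

  void transferTo() {
    // Messages on a channel are written out one at a time, so reusing the
    // handler's single buffer here is safe.
    byteRawChannel.write(payload);
  }

  void deallocate() {
    // Mirrors how the shared byteEncChannel is reset() when a message is deallocated.
    byteRawChannel.reset();
  }
}
```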

### Why are the changes needed?

Improve performance and reduce a source of OOMs when encryption is enabled.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

**Correctness**: Existing unit tests.

**Performance**: Observed memory usage and performance improvements by running an artificial workload that significantly stresses the shuffle sending path. On a two-host Spark Standalone cluster where each host had an external shuffle service (with a 1gb heap) and a 64-core executor, I ran the following code:

```scala
val numMapTasks = 25000
val numReduceTasks = 256
val random = new java.util.Random()
val inputData = spark.range(1, numMapTasks * numReduceTasks, 1, numMapTasks).map { x =>
  val bytes = new Array[Byte](10 * 1024)
  random.nextBytes(bytes)
  bytes
}
inputData.repartition(numReduceTasks).write.mode("overwrite").format("noop").save()
```

Prior to this patch, this job reliably failed because the Worker (where the shuffle service runs) would fill its heap and go into long GC pauses, eventually causing it to become disassociated from the Master. After this patch's changes, this job smoothly runs to completion.

Closes #37110 from JoshRosen/TransportCipher-memory-usage-improvements.

Authored-by: Josh Rosen <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>