[SPARK-12078][Core]Fix ByteBuffer.limit misuse #10076
Conversation
```diff
@@ -253,7 +253,7 @@ private[spark] class Executor(
     val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
     val serializedDirectResult = ser.serialize(directResult)
-    val resultSize = serializedDirectResult.limit
+    val resultSize = serializedDirectResult.remaining()
```
You're right that there's an implicit assumption in some of this code that the buffer's position is 0 on return and that the entire buffer is filled with valid data. Do we have a situation where the position is not 0, though, but is correctly at the start of the data? At least, this looks like it handles that situation, but it sounds unusual. Equally, if that's an issue, are we sure the entire buffer has valid data through the end? That assumption is still present here: that the end of the data is the end of the buffer.
> Do we have a situation where the position is not 0, though, but is correctly at the start of the data?

If a `ByteBuffer` is from Netty, the position could be a non-zero value.
> Equally, if that's an issue, are we sure the entire buffer has valid data through the end? That assumption is still present here: that the end of the data is the end of the buffer.

The `ByteBuffer` may contain more data internally, but the user should only read the part between `position` and `limit`. I think that's defined in the `ByteBuffer`/`Buffer` javadoc.
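As a minimal sketch of that contract (assuming a plain heap buffer; the sizes here are arbitrary), writing into a buffer and then flipping it for reading leaves spare capacity beyond `limit` that a reader must not treat as valid data:

```scala
import java.nio.ByteBuffer

// Write 6 bytes into a 10-byte buffer, then flip it for reading.
val buf = ByteBuffer.allocate(10)
buf.put(Array[Byte](1, 2, 3, 4, 5, 6))
buf.flip()

// The backing storage is still 10 bytes, but only the region between
// position (0) and limit (6) holds valid data.
assert(buf.capacity() == 10)
assert(buf.limit() == 6)
assert(buf.remaining() == 6)
```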
I found I was wrong about the position of a `ByteBuffer` from Netty. Netty will call `ByteBuffer.slice` to reset the position to 0 before returning it: https://github.com/netty/netty/blob/0f9492c9affc528c766f9677952412564d4a3f6d/buffer/src/main/java/io/netty/buffer/PooledHeapByteBuf.java#L269

I think we don't need this patch.
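To illustrate why slicing sidesteps the problem (a sketch using a plain heap buffer in place of Netty's pooled one), `slice()` produces a view whose position is 0 and whose limit equals the remaining bytes of the original, so `limit` and `remaining` agree on the slice:

```scala
import java.nio.ByteBuffer

val backing = ByteBuffer.allocate(10)
backing.position(4) // pretend the first 4 bytes were already consumed

// slice() starts a new view at the current position: its position is 0
// and its limit equals the original buffer's remaining bytes.
val sliced = backing.slice()
assert(sliced.position() == 0)
assert(sliced.limit() == 6)
assert(sliced.limit() == sliced.remaining())
```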
Test build #46995 has finished for PR 10076 at commit
```diff
@@ -51,14 +51,14 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
     try {
       val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
         case directResult: DirectTaskResult[_] =>
-          if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
+          if (!taskSetManager.canFetchMoreResults(serializedData.remaining())) {
```
This looks like it makes a unit test fail. I think you may have to check the size before the deserializer consumes the byte buffer?
This is overall looking good but we probably have to comb through these a little more to think through the implications.
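A sketch of the hazard being pointed out (hypothetical buffer contents, not Spark's actual serializer): deserializing from a `ByteBuffer` advances its position, so `remaining()` measured afterwards reports 0 rather than the result size:

```scala
import java.nio.ByteBuffer

val serializedData = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
val size = serializedData.remaining() // capture the size first: 4

// A deserializer reads the buffer, advancing its position to the limit...
while (serializedData.hasRemaining) serializedData.get()

// ...so measuring afterwards would report 0, not the payload size.
assert(serializedData.remaining() == 0)
assert(size == 4)
```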
I want to put this one on hold until #10083 gets merged.
Test build #47281 has finished for PR 10076 at commit
Test build #47291 has finished for PR 10076 at commit
`ByteBuffer.limit` is not the remaining size of a `ByteBuffer`. `ByteBuffer.limit` is equal to `ByteBuffer.remaining` only if `ByteBuffer.position` is 0. I just went through the code and replaced misused `limit` with `remaining`.
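A minimal sketch of the distinction the description relies on (arbitrary byte values, assumed for illustration): once the position is non-zero, `limit()` overstates the readable size while `remaining()` reports it exactly:

```scala
import java.nio.ByteBuffer

val buf = ByteBuffer.wrap(Array[Byte](0, 0, 7, 8, 9))
buf.position(2) // skip a 2-byte prefix

// limit() still reports 5 (the end of the buffer), while remaining()
// reports the 3 readable bytes; the two agree only when position is 0.
assert(buf.limit() == 5)
assert(buf.remaining() == 3)
assert(buf.remaining() == buf.limit() - buf.position())
```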