[SPARK-12084][Core] Fix code that uses ByteBuffer.array incorrectly #10083
Conversation
`ByteBuffer` doesn't guarantee that all contents of `ByteBuffer.array` are valid, e.g., for a `ByteBuffer` returned by `ByteBuffer.slice`, which shares its backing array with the original buffer. We should not use the whole content of a `ByteBuffer`'s backing array unless we know that's correct. This patch fixes all places that use `ByteBuffer.array` incorrectly.
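To illustrate the problem the PR describes: a sliced heap buffer still exposes the *entire* backing array through `array()`, so code that reads the whole array sees bytes outside the buffer's valid region. Below is a minimal, self-contained sketch (the `toArray` helper is hypothetical, not Spark code) showing the wrong and right way to extract the bytes:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class SliceDemo {
    // Copies exactly the readable region of a buffer, honoring
    // position() and remaining() instead of trusting array() alone.
    // duplicate() is used so the caller's position is left untouched.
    static byte[] toArray(ByteBuffer buf) {
        byte[] out = new byte[buf.remaining()];
        buf.duplicate().get(out);
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer whole = ByteBuffer.wrap(new byte[] {1, 2, 3, 4, 5});
        whole.position(2);
        ByteBuffer slice = whole.slice(); // views {3, 4, 5}, but shares the backing array

        // slice.array() still returns the full backing array:
        System.out.println(Arrays.toString(slice.array()));   // [1, 2, 3, 4, 5]
        // honoring the buffer's window yields only the valid region:
        System.out.println(Arrays.toString(toArray(slice)));  // [3, 4, 5]
    }
}
```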
/cc @andrewor14 @JoshRosen @tdas @vanzin @srowen It's better to have more eyes review this one since it touches a lot of files.
Test build #47020 has finished for PR 10083 at commit
Test build #47022 has finished for PR 10083 at commit
```diff
@@ -79,7 +79,10 @@ object AvroConversionUtil extends Serializable {
   def unpackBytes(obj: Any): Array[Byte] = {
     val bytes: Array[Byte] = obj match {
-      case buf: java.nio.ByteBuffer => buf.array()
+      case buf: java.nio.ByteBuffer =>
```
Can't you use `bufferToArray` here too?
This is in examples, so I don't want to use a private API.
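For context, the `bufferToArray` helper being discussed is a private Spark utility, so the examples module can't call it. A hypothetical standalone analogue of such a helper might look like the sketch below (this is an illustration of the pattern, not Spark's actual implementation): return the backing array directly only when it covers exactly the readable region, otherwise copy.

```java
import java.nio.ByteBuffer;

public class BufferUtil {
    // Hypothetical bufferToArray-style helper. Zero-copy is only safe
    // when the backing array and the buffer's readable window coincide;
    // any other case (slices, offsets, partial reads) must copy.
    static byte[] bufferToArray(ByteBuffer buf) {
        if (buf.hasArray() && buf.arrayOffset() == 0 && buf.position() == 0
                && buf.array().length == buf.limit()) {
            return buf.array(); // fast path: array == readable region
        }
        byte[] out = new byte[buf.remaining()];
        buf.duplicate().get(out); // duplicate() leaves buf's position unchanged
        return out;
    }
}
```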
Looking good. Besides those comments, the changes all looked sound to me.
```diff
@@ -81,7 +81,10 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
    * seen values so to limit the number of times that decompression has to be done.
    */
   def decompress(schemaBytes: ByteBuffer): Schema = decompressCache.getOrElseUpdate(schemaBytes, {
-    val bis = new ByteArrayInputStream(schemaBytes.array())
+    val bis = new ByteArrayInputStream(
```
Because `decompressCache` uses the `ByteBuffer` as a key, this code must not change `schemaBytes`'s position, so it cannot use `ByteBufferInputStream` here.
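The constraint above (read the buffer's bytes without moving its position, since the buffer doubles as a cache key) can be met by wrapping only the readable window of the backing array, which is what the truncated diff does. A minimal sketch of that pattern, assuming an array-backed (heap) buffer:

```java
import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;

class SchemaStreamSketch {
    // Wraps only the readable window of an array-backed buffer.
    // No get() call is made, so the buffer's position (and therefore
    // its equals()/hashCode() as a cache key) is left untouched.
    static ByteArrayInputStream readableWindow(ByteBuffer schemaBytes) {
        return new ByteArrayInputStream(
            schemaBytes.array(),
            schemaBytes.arrayOffset() + schemaBytes.position(),
            schemaBytes.remaining());
    }
}
```

Note the `arrayOffset() + position()` term: for a sliced buffer, `arrayOffset()` alone is nonzero, and skipping either term would read the wrong bytes.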
```diff
@@ -307,7 +307,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
   override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
     val kryo = borrowKryo()
     try {
-      input.setBuffer(bytes.array)
+      input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
```
Not necessary for this change, but at some point it might be worth changing this to use Kryo's `ByteBufferInput`.
Kryo will use the array as an internal buffer. Why is it not necessary?
I'm saying that the change I proposed is not necessary, not that your change is not necessary.
> I'm saying that the change I proposed is not necessary, not that your change is not necessary.
Got it. Sorry for my misunderstanding.
LGTM.
Test build #47077 has finished for PR 10083 at commit
Test build #47083 has started for PR 10083 at commit
retest this please
Test build #47126 has finished for PR 10083 at commit
Ok, merging to master. I assume we don't want this in 1.6 at this point?
@srowen I think this is not currently a bug, because the existing code works as long as the buffers are created according to the assumptions being made. But this change is needed to unblock SPARK-12060, which breaks that assumption.
…lize
Merged #10051 again since #10083 is resolved. This reverts commit 328b757.
Author: Shixiong Zhu <[email protected]>
Closes #10167 from zsxwing/merge-SPARK-12060.