[SPARK-6986][SQL] Use Serializer2 in more cases. #5849
Conversation
Merged build triggered.
Merged build started.
Test build #31612 has started for PR 5849 at commit
Test build #31612 has finished for PR 5849 at commit
Merged build finished. Test FAILed.
Test FAILed.
Merged build triggered.
Merged build started.
Test build #31657 has started for PR 5849 at commit
Test build #31657 timed out for PR 5849 at commit
Merged build finished. Test FAILed.
Test FAILed.
It seems the reason for those test failures is that we are buffering records on the reader side of the shuffle, and we are currently using mutable rows, which require an explicit copy before buffering. @sryza Is there any place in the sort-based shuffle where we buffer key-value pairs?
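To make the buffering problem concrete, here is a minimal standalone sketch (hypothetical code, not Spark's): when a reader buffers a reused mutable row, every buffered entry aliases the same object, so later mutations clobber earlier records.

```scala
import scala.collection.mutable.ArrayBuffer

// A single reused "mutable row" (here just an array) and a buffer that
// stands in for the sorter's record buffer.
val mutableRow = new Array[Int](1)
val buffer = ArrayBuffer.empty[Array[Int]]

for (v <- Seq(1, 2, 3)) {
  mutableRow(0) = v
  buffer += mutableRow // stores a reference, not a snapshot
}

// Every buffered entry now reads 3; the values 1 and 2 are lost.
// Copying the row before buffering (e.g. mutableRow.clone()) avoids this.
```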
Merged build triggered.
Merged build started.
Test build #32025 has started for PR 5849 at commit
Test build #32025 has finished for PR 5849 at commit
Merged build finished. Test FAILed.
Test FAILed.
The stack trace is:
@sryza Is there any particular reason that
Merged build triggered.
Merged build started.
Test build #32084 has started for PR 5849 at commit
Test build #32084 has finished for PR 5849 at commit
Merged build finished. Test PASSed.
Test PASSed.
```diff
 override def readKey[T: ClassTag](): T = {
-  readKeyFunc()
-  key.asInstanceOf[T]
+  readKeyFunc().asInstanceOf[T]
 }
```
Does it make a performance difference if we move the cast to the line where we define `readKeyFunc`? If we did that, I think we'd be doing one cast vs. casting on each record.
Ah, I guess we don't have the class tag for T when we create the deserialization function, so this approach looks fine to me.
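The point about the class tag can be illustrated with a small sketch (hypothetical code, not the actual `SparkSqlSerializer2` internals): the deserialization function is created once, before any type parameter is in scope, so the best it can do is return `Any`, and the cast to `T` has to happen at the call site on each record.

```scala
import scala.reflect.ClassTag

// The deserialization function is built once, with no type parameter
// in scope, so it can only return Any.
val readKeyFunc: () => Any = () => "some key"

// T (and its ClassTag) is only in scope here, inside readKey, so the
// asInstanceOf cast happens per call rather than once where
// readKeyFunc is defined.
def readKey[T: ClassTag](): T = readKeyFunc().asInstanceOf[T]

val key: String = readKey[String]()
```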
```scala
() => {
  // If the schema is null, the returned function does nothing when it gets called.
  if (schema != null) {
    var i = 0
    val mutableRow = new GenericMutableRow(schema.length)
```
@yhuai and I chatted about this offline. The reason that we need to perform this copy is that this patch allows SqlSerializer2 to be used in cases where the shuffle performs a sort. In HashShuffleReader, Spark ends up passing the iterator returned from this deserializer to ExternalSorter, which buffers rows because it needs to sort them based on their contents.

I think that we only need to copy the row in cases where we're shuffling with a key ordering. To avoid unnecessary copying in other cases, I think that we can extend `SparkSqlSerializer2`'s constructor to accept a boolean flag that indicates whether we should copy, and thread that flag all the way down to here. In `Exchange`, where we create the serializer, we can check whether the shuffle will use a keyOrdering; if it does, then we'll enable copying. Avoiding this copy in other cases should provide a nice performance boost for aggregation queries.
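The flag-threading idea above can be sketched as follows (class and method names here are illustrative, not Spark's actual API): the serializer reuses one mutable row for speed, and only hands out defensive copies when the caller says downstream code will buffer rows for sorting.

```scala
// Stand-in for a row; Spark's real row classes differ.
final class SketchRow(val values: Array[Int]) {
  def copyRow(): SketchRow = new SketchRow(values.clone())
}

// copyRows would be decided where the serializer is created: true iff
// the shuffle has a key ordering and will buffer rows for sorting.
class Serializer2Sketch(schemaLength: Int, copyRows: Boolean) {
  private val mutableRow = new SketchRow(new Array[Int](schemaLength))

  def deserializeRow(input: Array[Int]): SketchRow = {
    Array.copy(input, 0, mutableRow.values, 0, schemaLength)
    // Only pay for a copy when downstream code buffers rows.
    if (copyRows) mutableRow.copyRow() else mutableRow
  }
}
```

With `copyRows = false` every call returns the same object (fast, but unsafe to buffer); with `copyRows = true` each call returns an independent snapshot.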
Merged build triggered.
Merged build started.
Test build #32158 has started for PR 5849 at commit
Test build #32158 has finished for PR 5849 at commit
Merged build finished. Test PASSed.
Test PASSed.
LGTM overall, especially since this code seems to be well covered by tests.
Thanks for reviewing it. I am merging it to master and branch 1.4. |
With 0a2b15c, the serialization stream and deserialization stream have enough information to determine whether they are handling a key-value pair, a key, or a value. It is safe to use `SparkSqlSerializer2` in more cases.

Author: Yin Huai <[email protected]>

Closes #5849 from yhuai/serializer2MoreCases and squashes the following commits:

53a5eaa [Yin Huai] Josh's comments.
487f540 [Yin Huai] Use BufferedOutputStream.
8385f95 [Yin Huai] Always create a new row at the deserialization side to work with sort merge join.
c7e2129 [Yin Huai] Update tests.
4513d13 [Yin Huai] Use Serializer2 in more places.

(cherry picked from commit 3af423c)
Signed-off-by: Yin Huai <[email protected]>