[SPARK-28571][CORE][SHUFFLE] Use the shuffle writer plugin for the SortShuffleWriter #25342
Conversation
Test build #108591 has finished for PR 25342 at commit
Retest this please.
cc @jerryshao
Test build #108635 has finished for PR 25342 at commit
 *
 * @param blockId block ID to write to. The index file will be blockId.name + ".index".
 * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
 * TODO remove this, as this is only used by UnsafeRowSerializerSuite in the SQL project.
nit. Could you file a JIRA and make this an IDed TODO, please?
 * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
 */
def writePartitionedMapOutput(
    shuffleId: Int, mapId: Int, mapOutputWriter: ShuffleMapOutputWriter): Array[Long] = {
nit: multi-line arg style
}
override def close(): Unit = {
  if (isOpen) {
Minor, but if there's an error in open() (e.g. when initializing wrappedStream), this will leave the underlying partitionStream open.
Maybe this flag isn't needed and you can just check whether the fields are initialized?
The worry is unnecessary, because wrappedStream and objOut must have been initialized successfully if partitionStream was opened as an OutputStream without an exception.
And I think the isOpen flag makes the code easier to understand.
because wrappedStream and objOut must have been initialized successfully
That's not necessarily a valid assumption. Compression codecs, e.g., may throw exceptions if the file is corrupt.
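To make that failure mode concrete, here is a minimal, self-contained sketch (not the actual ShufflePartitionPairsWriter code; the constructor parameters and the wrap function are stand-ins for the block-manager and codec wiring) of the ordering being discussed: if wrapping throws, partitionStream is already open but isOpen is never set, so a close() guarded by isOpen skips cleanup:
import java.io.OutputStream

// Illustrative sketch only - mirrors the stream-wrapping order from this thread.
class PairsWriterSketch(
    openPartitionStream: () => OutputStream,
    wrap: OutputStream => OutputStream) {

  private var partitionStream: OutputStream = _
  private var wrappedStream: OutputStream = _
  private var isOpen = false

  def write(b: Int): Unit = {
    if (!isOpen) {
      open()
    }
    wrappedStream.write(b)
  }

  private def open(): Unit = {
    partitionStream = openPartitionStream()  // opened first
    wrappedStream = wrap(partitionStream)    // a compression codec can throw here
    isOpen = true                            // never reached if wrap() throws
  }

  def close(): Unit = {
    if (isOpen) {            // if open() failed midway, this guard skips cleanup,
      wrappedStream.close()  // leaving the already-open partitionStream leaked
      isOpen = false
    }
  }
}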
Will spill be supported in the series of PRs? @mccheah
  curNumBytesWritten = numBytesWritten
}

private class CloseShieldOutputStream(delegate: OutputStream)
What is the usage of this class? Sorry, I can only see the definition here.
@gczsjdy No, spill still goes to local disk. Trying to generalize local spills was explicitly out of scope for now.
Thanks @squito
package org.apache.spark.util.collection

private[spark] trait PairsWriter {
nit: add docs on where this can be used?
Also done
 * A key-value writer inspired by {@link DiskBlockObjectWriter} that pushes the bytes to an
 * arbitrary partition writer instead of writing to local disk through the block manager.
 */
private[spark] class ShufflePartitionPairsWriter(
This should instead be in the o.a.s.s (org.apache.spark.shuffle) package?
@@ -46,7 +47,8 @@ private[spark] class DiskBlockObjectWriter(
     writeMetrics: ShuffleWriteMetricsReporter,
     val blockId: BlockId = null)
   extends OutputStream
-  with Logging {
+  with Logging
+  with PairsWriter {
nit: add override to one function
Think this should be done now
good job
Addressed comments.
Test build #109240 has finished for PR 25342 at commit
@vanzin @squito or @dongjoon-hyun - is this good to merge?
core/src/main/scala/org/apache/spark/shuffle/ShufflePartitionPairsWriter.scala
Test build #109760 has finished for PR 25342 at commit
retest this please
Test build #109766 has finished for PR 25342 at commit
other than marcelo's comment, looks good
 *
 * @param blockId block ID to write to. The index file will be blockId.name + ".index".
 * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
 * TODO(SPARK-28764): remove this, as this is only used by UnsafeRowSerializerSuite in the SQL
Can't that test just call sorter.writePartitionedMapOutput(..., new LocalDiskMapOutputWriter(...))? Anyway, fine to leave it for the follow-up JIRA.
        presentPrev
      }).orElse(Some(e))
    }
    resolvedException
I can't think of anything wrong here, but it seems safer to be using finally. Kind of a stretch, but if some (badly implemented) stream throws a RuntimeException instead of an IOException, you wouldn't clean up properly this way. The nesting gets a bit ugly, but you could do this:
def closeIfNonNull[T <: Closeable](x: T): T = {
  if (x != null) x.close()
  null.asInstanceOf[T]
}

Utils.tryWithSafeFinally {
  objOut = closeIfNonNull(objOut)
} {
  // normally closing objOut would close the inner streams as well, but just in case there was
  // an error in initialization etc. we make sure we clean the other streams up too
  Utils.tryWithSafeFinally {
    wrappedStream = closeIfNonNull(wrappedStream)
  } {
    partitionStream = closeIfNonNull(partitionStream)
  }
}
I also prefer Imran's approach. I'm just a tiny bit worried about bad stream implementations that don't have an idempotent close(), since both your code and Imran's are calling it multiple times on certain streams.
Probably ok not to deal with that though.
override def write(key: Any, value: Any): Unit = {
  if (!isOpen) {
    open()
    isOpen = true
Speaking of being nitpicky about error handling, this flag has weird semantics. If you call write and it fails to initialize the streams, and then you call write again, you'll potentially dereference still-open streams.
val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
  dep.shuffleId, mapId, context.taskAttemptId(), dep.partitioner.numPartitions)
val partitionLengths = sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
mapOutputWriter.commitAllPartitions()
Just checking that you don't need any changes here? Given your other change that made commitAllPartitions return the partition lengths.
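For reference, a rough sketch of how this call site could rely on commitAllPartitions() instead of the sorter's return value, assuming (per the SPARK-28607 commit message quoted later on this page) that commitAllPartitions() reports the per-partition lengths; the MapStatus line is illustrative and not part of the diff shown above:
val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
  dep.shuffleId, mapId, context.taskAttemptId(), dep.partitioner.numPartitions)
sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
// the plugin, not the sorter, reports the lengths when the map output is committed
val partitionLengths = mapOutputWriter.commitAllPartitions()
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)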
Test build #109825 has finished for PR 25342 at commit
  null.asInstanceOf[T]
}

private def tryCloseOrAddSuppressed(
Not used anymore.
      partitionStream = closeIfNonNull(partitionStream)
    }
  }
  isOpen = false
One last comment about error handling. I'll just quote the AutoCloseable documentation instead:
    It is strongly advised to relinquish the underlying resources and to internally
    mark the resource as closed, prior to throwing the exception.
Meaning, track whether you've closed the object, not whether it's opened. (isOpen can be replaced with objOut != null.) Then in close() do nothing if the stream has already been closed.
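A short sketch of the pattern being suggested (illustrative only; isClosed is a hypothetical field name, and closeIfNonNull is the helper from Imran's earlier suggestion): mark the writer as closed before releasing the streams, and make a second close() a no-op:
override def close(): Unit = {
  if (!isClosed) {
    isClosed = true  // mark closed first, per the AutoCloseable advice above
    Utils.tryWithSafeFinally {
      objOut = closeIfNonNull(objOut)
    } {
      Utils.tryWithSafeFinally {
        wrappedStream = closeIfNonNull(wrappedStream)
      } {
        partitionStream = closeIfNonNull(partitionStream)
      }
    }
  }
}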
        partitionPairsWriter.write(elem._1, elem._2)
      }
    }
    var threwException = false
Shadow variable. But I wonder if tryWithSafeFinally isn't better here (and in the "mirror" block above for the no-spill case).
I think this mirrors how UnsafeShuffleWriter and BypassMergeSortShuffleWriter approach these cases - but those are written in Java, so it's harder to use tryWithSafeFinally from there.
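For comparison, a rough sketch of the spill-write block using Utils.tryWithSafeFinally instead of a threwException flag (the elements iterator name and the surrounding setup are assumed here, not taken from the diff):
Utils.tryWithSafeFinally {
  // write every record of this partition through the pairs writer
  while (elements.hasNext) {
    val elem = elements.next()
    partitionPairsWriter.write(elem._1, elem._2)
  }
} {
  // runs whether or not the writes above threw, so the writer is always closed
  if (partitionPairsWriter != null) {
    partitionPairsWriter.close()
  }
}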
Test build #109829 has finished for PR 25342 at commit
Test build #109835 has finished for PR 25342 at commit
Test build #109840 has finished for PR 25342 at commit
Looks ok with a minor thing. Will wait a bit to see if others have any comments.
  if (partitionPairsWriter != null) {
    partitionPairsWriter.close()
  }
  if (partitionWriter != null) {
Dead code? Or missing code?
Test build #109884 has finished for PR 25342 at commit
Alright, no more comments, so merging to master.
…rtShuffleWriter

Use the shuffle writer APIs introduced in SPARK-28209 in the sort shuffle writer.

Existing unit tests were changed to use the plugin instead, and they used the local disk version to ensure that there were no regressions.

Closes apache#25342 from mccheah/shuffle-writer-refactor-sort-shuffle-writer.

Lead-authored-by: mcheah <[email protected]>
Co-authored-by: mccheah <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
* Bring implementation into closer alignment with upstream. Step to ease merge conflict resolution and build failure problems when we pull in changes from upstream.
* Cherry-pick BypassMergeSortShuffleWriter changes and shuffle writer API changes
* [SPARK-28607][CORE][SHUFFLE] Don't store partition lengths twice
  The shuffle writer API introduced in SPARK-28209 has a flaw that leads to a memory usage regression - we ended up tracking the partition lengths in two places. Here, we modify the API slightly to avoid redundant tracking. The implementation of the shuffle writer plugin is now responsible for tracking the lengths of partitions, and propagating this back up to the higher shuffle writer as part of the commitAllPartitions API.
  Existing unit tests.
  Closes apache#25341 from mccheah/dont-redundantly-store-part-lengths.
  Authored-by: mcheah <[email protected]>
  Signed-off-by: Marcelo Vanzin <[email protected]>
* [SPARK-28571][CORE][SHUFFLE] Use the shuffle writer plugin for the SortShuffleWriter
  Use the shuffle writer APIs introduced in SPARK-28209 in the sort shuffle writer.
  Existing unit tests were changed to use the plugin instead, and they used the local disk version to ensure that there were no regressions.
  Closes apache#25342 from mccheah/shuffle-writer-refactor-sort-shuffle-writer.
  Lead-authored-by: mcheah <[email protected]>
  Co-authored-by: mccheah <[email protected]>
  Signed-off-by: Marcelo Vanzin <[email protected]>
* [SPARK-28570][CORE][SHUFFLE] Make UnsafeShuffleWriter use the new API.
* Resolve build issues and remaining semantic conflicts
* More build fixes
* More build fixes
* Attempt to fix build
* More build fixes
* [SPARK-29072] Put back usage of TimeTrackingOutputStream for UnsafeShuffleWriter and ShufflePartitionPairsWriter.
* Address comments
* Import ordering
* Fix stream reference
@@ -157,7 +157,8 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
       metrics,
       shuffleExecutorComponents)
     case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
-      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
+      new SortShuffleWriter(
+        shuffleBlockResolver, other, mapId, context, shuffleExecutorComponents)
shuffleBlockResolver is not needed.
What changes were proposed in this pull request?
Use the shuffle writer APIs introduced in SPARK-28209 in the sort shuffle writer.
How was this patch tested?
Existing unit tests were changed to use the plugin instead, and they used the local disk version to ensure that there were no regressions.