[SPARK-24386][SS] coalesce(1) aggregates in continuous processing #21560
Conversation
Test build #91817 has finished for PR 21560 at commit
Test build #91816 has finished for PR 21560 at commit
 * RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
 * continuous shuffle, and then reads them in the task thread using `reader`.
 */
class ContinuousCoalesceRDD(var reader: ContinuousShuffleReadRDD, var prev: RDD[InternalRow])
Why are `reader` and `prev` both `var` here?
They are vars to make this RDD checkpointable by making them clearable. This raises a good point: I don't think we should make this checkpointable. Rather, I suggest making these simple vals (well, just removing the modifier), and in clearDependencies, just throwing an error saying "Checkpointing this RDD is not supported".
We should do this for all the continuous shuffle RDDs.
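A minimal sketch of what that suggestion amounts to, based on the class header quoted above and the error later in this PR's diff (other constructor parameters and members are elided):

```scala
// Sketch only: drop the `var` modifiers and refuse checkpointing.
class ContinuousCoalesceRDD(reader: ContinuousShuffleReadRDD, prev: RDD[InternalRow] /*, ... */)
  extends RDD[InternalRow](prev.context, Nil) {

  // getPartitions / compute unchanged, elided here

  override def clearDependencies(): Unit = {
    throw new IllegalStateException("Checkpointing this RDD is not supported")
  }
}
```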
case Repartition(1, false, child) =>
  val isContinuous = child.collectFirst {
    case StreamingDataSourceV2Relation(_, _, _, r: ContinuousReader) => r
  }.isDefined
Could the check for whether the plan is continuous be factored out into a separate method so that other places can use it?
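A sketch of the suggested extraction, assuming it lives next to the existing rule so the referenced types are already in scope (the helper name is made up here):

```scala
// Hypothetical helper; the pattern match is lifted from the rule quoted above.
private def isContinuousPlan(plan: LogicalPlan): Boolean = {
  plan.collectFirst {
    case StreamingDataSourceV2Relation(_, _, _, r: ContinuousReader) => r
  }.isDefined
}

// The rule above would then become:
// case Repartition(1, false, child) =>
//   val isContinuous = isContinuousPlan(child)
```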
retest this please
Test build #91882 has finished for PR 21560 at commit
Overall, this almost looks good. My only high-level comment is that the ContinuousCoalesceRDD should not take ContinuousShuffleReadRDD as a parameter. That is very confusing. It needs a ContinuousShuffleReadRDD only because it is reusing the ContinuousShuffleReadRDD code for coalescing, which is an internal implementation detail, and that code should not be in ContinuousCoalesceExec. So move the creation of ContinuousShuffleReadRDD inside the ContinuousCoalesceRDD.
The rest are relatively minor changes.
    queueSize: Int,
    numShuffleWriters: Int,
    epochIntervalMs: Long)
  extends Partition {
Unnecessary
@@ -350,7 +350,14 @@ object UnsupportedOperationChecker {
           _: TypedFilter) =>
      case node if node.nodeName == "StreamingRelationV2" =>
      case node =>
        throwError(s"Continuous processing does not support ${node.nodeName} operations.")
        val aboveSinglePartitionCoalesce = node.find {
Will this allow kafkaStreamDF.coalesce(1).select(...).filter(...).agg(...)?
And are we allowing all stateful operations after this?
It will allow the first one, and I've added a test to verify.
It ought to allow the second one, but for some reason streaming deduplicate insists on inserting a shuffle above the coalesce(1). I will address this in a separate PR, since this seems like suboptimal behavior that isn't only restricted to continuous processing. For now I tweaked the condition to only allow aggregates.
        }.isDefined

        if (!aboveSinglePartitionCoalesce) {
          throwError(s"Continuous processing does not support ${node.nodeName} operations.")
It would be nice if this error statement said what is supported, i.e. that you can rewrite the query with coalesce(1).
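For example (the exact wording is just a suggestion; it mirrors the quoted check):

```scala
if (!aboveSinglePartitionCoalesce) {
  throwError(s"Continuous processing does not support ${node.nodeName} operations. " +
    "Consider rewriting the query with coalesce(1) so it runs on a single partition.")
}
```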
    val childRdd = child.execute()
    val endpointName = s"RPCContinuousShuffleReader-${UUID.randomUUID()}"
    val reader = new ContinuousShuffleReadRDD(
super nit: rename to readerRDD to avoid confusion with other v2 reader classes.
  }

  val threads = prev.partitions.map { prevSplit =>
    new Thread() {
Maybe use a thread pool (using org...spark.util.ThreadUtils) with a name to track threads. Then the cached threads in the thread pool can be reused across epochs.
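A sketch of that suggestion using Spark's ThreadUtils (the pool name and the runnable wiring are illustrative):

```scala
import org.apache.spark.util.ThreadUtils

// A named, cached daemon pool whose threads can be reused across epochs,
// instead of spawning a fresh Thread per parent partition on every epoch.
val threadPool = ThreadUtils.newDaemonCachedThreadPool("continuous-coalesce-writer")

// Instead of `new Thread() { ... }` per parent partition:
// runnables.foreach(threadPool.execute)   // as a later revision of this PR does
```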
import org.apache.spark.sql.execution.streaming.continuous.shuffle._

case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
  private[continuous] var writersInitialized: Boolean = false
Add docs on what this means.
  }

  override def clearDependencies() {
    super.clearDependencies()
As commented above, this should actually throw an exception so that this is never checkpointed.
@@ -51,7 +51,7 @@ class ContinuousDataSourceRDD(
    sc: SparkContext,
    dataQueueSize: Int,
    epochPollIntervalMs: Long,
    @transient private val readerFactories: Seq[InputPartition[UnsafeRow]])
    private val readerFactories: Seq[InputPartition[UnsafeRow]])
Since all the partitions do not need all the factories, the right thing to do is to put each partition's factory in its partition object, so that all the factories are not serialized for every task.
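A sketch of what that looks like; the getPreferredLocations hunk later in this thread already reads an inputPartition field off the partition, so the shape is roughly the following (imports shown as of the v2 reader API at the time, as an assumption):

```scala
import org.apache.spark.Partition
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.sources.v2.reader.InputPartition

// Each partition carries only its own InputPartition, so the full readerFactories
// sequence does not have to be serialized into every task.
case class ContinuousDataSourceRDDPartition(
    index: Int,
    inputPartition: InputPartition[UnsafeRow])
  extends Partition
```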
We need to be able to generate the full list of partitions from within a single task in order for coalesce to work.
Wait. I don't see the readerFactories object used anywhere other than in getPartitions, where they are saved as part of ContinuousDataSourceRDDPartition objects. And RDD.compute() seems to be picking it up from the ContinuousDataSourceRDDPartition objects, not from readerFactories. So I don't think readerFactories needs to be serialized.
At the very least, rename readerFactories to readerInputPartitions for consistency.
We list the partitions when computing the coalesce RDD. Should we instead be packing the partitions into the partitions of the coalesce RDD? I'd assumed it was valid to expect that rdd.partitions would work on executors, but maybe it's not.
  override def getDependencies: Seq[Dependency[_]] = {
    Seq(new NarrowDependency(prev) {
      def getParents(id: Int): Seq[Int] = Seq(0)
Shouldn't the one partition of this class depend on all parent RDD partitions, not just partition 0?
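To make the suggestion concrete, a one-partition-depends-on-all-parents dependency could look like this (sketch only; the helper name is made up):

```scala
import org.apache.spark.{Dependency, NarrowDependency}
import org.apache.spark.rdd.RDD

// The single coalesced output partition lists every parent partition as a parent.
def allParentsDependency[T](prev: RDD[T]): Seq[Dependency[_]] = Seq(
  new NarrowDependency(prev) {
    override def getParents(partitionId: Int): Seq[Int] = 0 until prev.getNumPartitions
  })
```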
Yeah, I confused myself when looking at the normal coalesce RDD. The default dependency handling is correct here.
Test build #92146 has finished for PR 21560 at commit
Looks great overall. (FYI I'm mostly concerned about functional correctness while reviewing.)
Left some comments, but they're minor (mostly for completeness, and this patch is an intermediate state), so you can skip addressing them.
@@ -61,12 +63,14 @@ class ContinuousShuffleReadRDD(
    numPartitions: Int,
    queueSize: Int = 1024,
    numShuffleWriters: Int = 1,
    epochIntervalMs: Long = 1000)
    epochIntervalMs: Long = 1000,
    val endpointNames: Seq[String] = Seq(s"RPCContinuousShuffleReader-${UUID.randomUUID()}"))
Same here: if possible it might be better to have complete code rather than just working with such an assumption.
This is just a default argument to make tests less wordy. I can remove it if you think that's best, but it doesn't impose a restriction.
    prev: RDD[InternalRow])
  extends RDD[InternalRow](context, Nil) {

  override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
We are addressing only the specific case where the number of partitions is 1, but we could have an assertion for that and try to write complete code so that we don't have to modify it again.
Agree. And since there's an assert(numPartitions == 1) in ContinuousCoalesceExec, we can probably just create an array of numPartitions partitions here.
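That is, something along these lines (sketch; it assumes numPartitions is threaded into the RDD as a parameter):

```scala
// Generalized getPartitions; with the assert(numPartitions == 1) in
// ContinuousCoalesceExec this still produces exactly one partition today.
override def getPartitions: Array[Partition] =
  (0 until numPartitions).map(i => ContinuousCoalesceRDDPartition(i)).toArray
```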
I've made some changes to try to restrict the assumption that the number of partitions is 1 to two places:
- ContinuousCoalesceExec
- The output partitioner in ContinuousCoalesceRDD, since it's not obvious to me what the right strategy to get this would be in the general case. If you have ideas I'm open to removing this too.
  extends RDD[UnsafeRow](sc, Nil) {

  override protected def getPartitions: Array[Partition] = {
    (0 until numPartitions).map { partIndex =>
      ContinuousShuffleReadPartition(partIndex, queueSize, numShuffleWriters, epochIntervalMs)
      ContinuousShuffleReadPartition(
        partIndex, endpointNames(partIndex), queueSize, numShuffleWriters, epochIntervalMs)
This is effectively asserting numPartitions to be 1; otherwise it will throw an exception.
      case Repartition(1, false, _) =>
      case node: Aggregate =>
        val aboveSinglePartitionCoalesce = node.find {
          case Repartition(1, false, _) => true
What if we have multiple repartitions, where one meets this case and the others don't? I'm not sure we are restricting the repartition operation to appear only once.
I don't think there's any particular reason we need to. There's no reason we couldn't execute multiple repartitions if the optimizer isn't smart enough to combine them.
Oh wait, I see what you mean. Repartition(5, ...) would never be matched by this rule, since it only applies to Aggregate.
@@ -98,6 +98,10 @@ class ContinuousDataSourceRDD(
  override def getPreferredLocations(split: Partition): Seq[String] = {
    split.asInstanceOf[ContinuousDataSourceRDDPartition].inputPartition.preferredLocations()
  }

  override def clearDependencies(): Unit = {
    throw new IllegalStateException("Continuous RDDs cannot be checkpointed")
I'm wondering whether this method can be called in a normal situation, e.g. when a continuous query is gracefully terminated.
I don't know, I'm unfamiliar with this method. @tdas
@HeartSaVioR No, this method is not intended to be called in normal circumstances. And there's even less reason to call it on an internally generated RDD.
Looks good overall. Left a few comments.
          case _ => false
        }.isDefined

        if (!aboveSinglePartitionCoalesce) {
What if there was only a single partition to begin with? Then there's no need for Repartition(1) and this check should be skipped.
I agree that it wouldn't be needed, but partitioning information is not always available during analysis. So I don't think we can write the more granular check suggested here.
          case _ => false
        }.isDefined

        if (!aboveSinglePartitionCoalesce) {
Also, if there's a single parent partition and there's a Repartition(1), that node should probably be removed. Not sure if this is already being done.
(same comment as above applies here - we don't have partitioning information in analysis)
    if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
      val rpcEnv = SparkEnv.get.rpcEnv
      val outputPartitioner = new HashPartitioner(1)
Maybe I am missing something. Is this more like a repartition (just shuffles) than a coalesce?
Repartition would normally imply distributed execution, which isn't happening here.
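For reference, HashPartitioner(1) maps every key to partition 0, so the "shuffle" here is a local fan-in to the one coalesced partition rather than a distributed exchange:

```scala
import org.apache.spark.HashPartitioner

// With a single partition, every key hashes to partition 0.
val outputPartitioner = new HashPartitioner(1)
assert(outputPartitioner.getPartition("some key") == 0)
assert(outputPartitioner.getPartition(42) == 0)
```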
      runnables.foreach(threadPool.execute)
    }

    readerRDD.compute(readerRDD.partitions(split.index), context)
The writer.write and readerRDD.compute() are going to execute as separate tasks (but concurrently, since there are no stage boundaries), correct?
No, they'll be in the same task. Just different threads.
If it's the same task, then do we need the RPC mechanism to pass the rows around?
There is a queue inside the ContinuousShuffleReadRDD that is buffering all the records being sent out by the RPCContinuousShuffleWriter, and the compute function is returning data from that queue.
As I commented above, we don't really need the ContinuousShuffleReadRDD, just the ContinuousShuffleReader.
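A stripped-down picture of that handoff (the types and names here are illustrative, not the actual shuffle classes): writer threads push into a bounded queue inside the same task, and the iterator returned to the task thread drains it until the epoch marker arrives.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Illustrative message types; the real implementation passes UnsafeRows via an RPC endpoint.
sealed trait ShuffleMessage
case class Record(value: Int) extends ShuffleMessage
case object EpochMarker extends ShuffleMessage

val queue = new ArrayBlockingQueue[ShuffleMessage](1024)

// Writer side (runs on the writer threads inside the same task).
def write(value: Int): Unit = queue.put(Record(value))
def finishEpoch(): Unit = queue.put(EpochMarker)

// Reader side (roughly what compute() hands back to the task thread).
def epochIterator(): Iterator[Int] =
  Iterator.continually(queue.take())
    .takeWhile(_ != EpochMarker)
    .collect { case Record(v) => v }
```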
Agree. Also, the 2 * numShuffle partition threads here look like an overhead. Maybe OK for now, but the CoalesceRDD iterator could just iterate over the parent RDD partitions, tracking the epochs, returning the rows, and terminating when the epoch marker is received from all its parents.
Yeah, it could be made more efficient. Part of the goal here is to ensure that the shuffling does indeed work end-to-end, so we can work on both the shuffle framework and distributed repartitioning in parallel.
Test build #92310 has finished for PR 21560 at commit
Very close. Just one major refactoring comment.
  override def doExecute(): RDD[InternalRow] = {
    assert(numPartitions == 1)

    val childRdd = child.execute()
nit: Don't need this variable. And remove the excess empty lines.
s"ContinuousCoalesceRDD-part$i-${UUID.randomUUID()}" | ||
} | ||
|
||
val readerRDD = new ContinuousShuffleReadRDD( |
private
Also, honestly, you don't need the RDD here. You only need the shuffle reading code, which is the ContinuousShuffleReader and endpoint. So you can just instantiate that in the compute function. It's very confusing to have an RDD inside another RDD that is not hooked into the dependency chain.
      .coalesce(1)
      .select('value as 'copy, 'value)
      .where('copy =!= 2)
      .agg(max('value))
test transformations both before and after coalesce.
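Something along these lines, adapted from the quoted test (hypothetical; `input` and `spark` come from the test harness, and the test actually added in the PR may differ):

```scala
import org.apache.spark.sql.functions.max
import spark.implicits._   // for the 'symbol column syntax

// A projection before the coalesce and a filter plus aggregate after it.
val query = input.toDF()
  .select('value as 'copy, 'value)   // transformation before coalesce
  .coalesce(1)
  .where('copy =!= 2)                // transformation after coalesce
  .agg(max('value))
```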
You missed this comment.
Test build #92360 has finished for PR 21560 at commit
Sorry, that wasn't meant to be a complete push. Added the tests now.
LGTM assuming tests pass.
Test build #92387 has finished for PR 21560 at commit
What changes were proposed in this pull request?
Provide a continuous processing implementation of coalesce(1), as well as allowing aggregates on top of it.
The changes in ContinuousQueuedDataReader and related classes are to use split.index (the ID of the partition within the RDD currently being compute()d) rather than context.partitionId() (the partition ID of the scheduled task within the Spark job, that is, the post-coalesce writer). In the absence of a narrow dependency, these values were previously always the same, so there was no need to distinguish them.
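The distinction in code terms (a sketch; the names follow the standard RDD compute() signature):

```scala
import org.apache.spark.{Partition, TaskContext}

// Inside an RDD's compute(split, context):
//   split.index            -> partition of the RDD currently being computed
//   context.partitionId()  -> partition of the scheduled task (the post-coalesce side)
// With coalesce(1), a single task (partitionId 0) computes many parent splits, so the
// two values diverge and the reader must key its state off split.index.
def whichPartition(split: Partition, context: TaskContext): (Int, Int) =
  (split.index, context.partitionId())
```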
How was this patch tested?
new unit test