
[SPARK-24386][SS] coalesce(1) aggregates in continuous processing #21560

Closed
wants to merge 38 commits

Conversation

jose-torres
Contributor

What changes were proposed in this pull request?

Provide a continuous processing implementation of coalesce(1), as well as allowing aggregates on top of it.

The changes in ContinuousQueuedDataReader and related classes are to use split.index (the ID of the partition within the RDD currently being computed) rather than context.partitionId() (the partition ID of the scheduled task within the Spark job, that is, the post-coalesce writer). In the absence of a narrow dependency these values were always the same, so there was previously no need to distinguish them.
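To illustrate the distinction (a minimal sketch, not code from this PR; the object and method names are hypothetical):

import org.apache.spark.{Partition, TaskContext}

object PartitionIdSketch {
  // `split.index` is the partition of the RDD being computed, while
  // `context.partitionId()` is the partition of the scheduled task. After
  // coalesce(1) the task partition ID is always 0, but the upstream reader
  // still needs its own index.
  def partitionIds(split: Partition, context: TaskContext): (Int, Int) =
    (split.index, context.partitionId())
}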

How was this patch tested?

new unit test


@SparkQA

SparkQA commented Jun 14, 2018

Test build #91817 has finished for PR 21560 at commit 252f5c9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 14, 2018

Test build #91816 has finished for PR 21560 at commit 03cc20d.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

* RDD for continuous coalescing. Asynchronously writes all partitions of `prev` into a local
* continuous shuffle, and then reads them in the task thread using `reader`.
*/
class ContinuousCoalesceRDD(var reader: ContinuousShuffleReadRDD, var prev: RDD[InternalRow])
Member

Why are both reader and prev declared as var here?

Contributor

They are vars to make this RDD checkpointable, by making them clearable. This raises a good point: I don't think we should make this checkpointable. Rather, I suggest making them simple vals (well, just removing the modifier) and, in clearDependencies, throwing an error saying "Checkpointing this RDD is not supported".

We should do this for all the continuous shuffle RDDs.
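A minimal sketch of that suggestion (illustrative only; the real constructor and compute logic are more involved):

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

// Sketch: plain constructor parameters (no `var`), and clearDependencies()
// throws so the RDD can never be checkpointed. compute() is a placeholder.
class NonCheckpointableCoalesceRDDSketch(sc: SparkContext, prev: RDD[InternalRow])
  extends RDD[InternalRow](sc, Nil) {

  override def getPartitions: Array[Partition] = prev.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] =
    prev.iterator(split, context)

  override def clearDependencies(): Unit = {
    throw new IllegalStateException("Continuous RDDs cannot be checkpointed")
  }
}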

case Repartition(1, false, child) =>
val isContinuous = child.collectFirst {
case StreamingDataSourceV2Relation(_, _, _, r: ContinuousReader) => r
}.isDefined
Member

Could the check for whether the plan is continuous be extracted into a separate method, so that other places can reuse it?
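A hedged sketch of such a helper (its name and location are hypothetical; it reuses the pattern match from the diff above and assumes StreamingDataSourceV2Relation and ContinuousReader are already in scope in that file):

// Hypothetical helper, intended to sit next to the existing check.
def isContinuousPlan(plan: LogicalPlan): Boolean = {
  plan.collectFirst {
    case StreamingDataSourceV2Relation(_, _, _, _: ContinuousReader) => true
  }.isDefined
}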

@xuanyuanking
Member

retest this please

@SparkQA

SparkQA commented Jun 15, 2018

Test build #91882 has finished for PR 21560 at commit 252f5c9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@tdas tdas left a comment

Overall, this almost looks good. My only high-level comment is that ContinuousCoalesceRDD should not take a ContinuousShuffleReadRDD as a parameter. That is very confusing. It needs a ContinuousShuffleReadRDD only because it is reusing that RDD's code for coalescing, which is an internal implementation detail, and that code should not be in ContinuousCoalesceExec. So move the creation of the ContinuousShuffleReadRDD inside ContinuousCoalesceRDD.

The rest are relatively minor changes.
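A hedged sketch of the suggested shape (parameter names are simplified, and the ContinuousShuffleReadRDD signature is taken from the diff later in this review; this is not the final implementation):

import java.util.UUID
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming.continuous.shuffle.ContinuousShuffleReadRDD

// Sketch: the exec node passes only the child RDD and sizing parameters; the
// shuffle-read machinery is created here, as an internal implementation detail.
class CoalesceRDDSketch(
    sc: SparkContext,
    numPartitions: Int,
    readerQueueSize: Int,
    epochIntervalMs: Long,
    prev: RDD[InternalRow])
  extends RDD[InternalRow](sc, Nil) {

  private val readerRDD = new ContinuousShuffleReadRDD(
    sc,
    numPartitions,
    readerQueueSize,
    prev.getNumPartitions,
    epochIntervalMs,
    Seq.fill(numPartitions)(s"ContinuousCoalesceRDD-${UUID.randomUUID()}"))

  override def getPartitions: Array[Partition] = readerRDD.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    // (the writer threads that feed the local shuffle are omitted from this sketch)
    readerRDD.compute(readerRDD.partitions(split.index), context)
  }
}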

queueSize: Int,
numShuffleWriters: Int,
epochIntervalMs: Long)
extends Partition {

Contributor

Unnecessary

@@ -350,7 +350,14 @@ object UnsupportedOperationChecker {
_: TypedFilter) =>
case node if node.nodeName == "StreamingRelationV2" =>
case node =>
throwError(s"Continuous processing does not support ${node.nodeName} operations.")
val aboveSinglePartitionCoalesce = node.find {
Contributor

Will this allow kafkaStreamDF.coalesce(1).select(...).filter(...).agg(...)?

Contributor

And are we allowing all stateful operations after this?

Contributor Author

It will allow the first one, and I've added a test to verify.

It ought to allow the second one, but for some reason streaming deduplicate insists on inserting a shuffle above the coalesce(1). I will address this in a separate PR, since this seems like suboptimal behavior that isn't only restricted to continuous processing. For now I tweaked the condition to only allow aggregates.

}.isDefined

if (!aboveSinglePartitionCoalesce) {
throwError(s"Continuous processing does not support ${node.nodeName} operations.")
Contributor

It would be nice if this error statement said what is supported: that you can rewrite the query with coalesce(1).
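For instance, wording along these lines (a sketch of the message text only, not the actual implementation):

object ContinuousErrorsSketch {
  // Illustrative: name what IS supported, per the review comment above.
  def unsupportedMsg(nodeName: String): String =
    s"Continuous processing does not support $nodeName operations. " +
      "Only aggregates directly on top of coalesce(1) are supported; " +
      "consider rewriting the query with coalesce(1)."
}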


val childRdd = child.execute()
val endpointName = s"RPCContinuousShuffleReader-${UUID.randomUUID()}"
val reader = new ContinuousShuffleReadRDD(
Contributor

super nit: rename to readerRDD to avoid confusion with other v2 reader classes.

}

val threads = prev.partitions.map { prevSplit =>
new Thread() {
Contributor

Maybe use a thread pool (via org.apache.spark.util.ThreadUtils) with a name so the threads are easy to track. Then the cached threads in the pool can be reused across epochs.
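A hedged sketch of that suggestion (the write body is omitted and the pool name is illustrative):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.util.ThreadUtils

object CoalesceWriterPoolSketch {
  // A named, cached daemon pool whose threads can be reused across epochs,
  // instead of creating a bare `new Thread()` per parent partition.
  private val threadPool =
    ThreadUtils.newDaemonCachedThreadPool("continuous-coalesce-writer")

  def startWriters(prev: RDD[InternalRow]): Unit = {
    prev.partitions.foreach { prevSplit =>
      threadPool.execute(new Runnable {
        override def run(): Unit = {
          // write all rows of partition `prevSplit` into the local continuous shuffle
        }
      })
    }
  }
}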


import org.apache.spark.sql.execution.streaming.continuous.shuffle._

case class ContinuousCoalesceRDDPartition(index: Int) extends Partition {
private[continuous] var writersInitialized: Boolean = false
Contributor

Add docs on what this means.

}

override def clearDependencies() {
super.clearDependencies()
Contributor

As commented above, this should actually throw an exception so that the RDD is never checkpointed.

@@ -51,7 +51,7 @@ class ContinuousDataSourceRDD(
sc: SparkContext,
dataQueueSize: Int,
epochPollIntervalMs: Long,
@transient private val readerFactories: Seq[InputPartition[UnsafeRow]])
private val readerFactories: Seq[InputPartition[UnsafeRow]])
Contributor

Since all the partitions do not need all the factories, the right thing to do is to put each partition's factory in its partition object, so that all the factories are not serialized for every task.

Contributor Author

We need to be able to generate the full list of partitions from within a single task in order for coalesce to work.

Contributor

Wait. I don't see the readerFactories object being used anywhere other than in getPartitions, where they are saved as part of the ContinuousDataSourceRDDPartition objects. And RDD.compute() seems to be picking it up from the ContinuousDataSourceRDDPartition objects, not from readerFactories. So I don't think readerFactories needs to be serialized.

At the very least, rename readerFactories to readerInputPartitions for consistency.

Contributor Author

We list the partitions when computing the coalesce RDD. Should we instead be packing the partitions into the partitions of the coalesce RDD? I'd assumed it was valid to expect that rdd.partitions would work on executors, but maybe it's not.


override def getDependencies: Seq[Dependency[_]] = {
Seq(new NarrowDependency(prev) {
def getParents(id: Int): Seq[Int] = Seq(0)
Contributor

Shouldn't the single partition of this class depend on all parent RDD partitions, and not just partition 0?
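A hedged sketch of the dependency shape being suggested (everything except getDependencies is stubbed out):

import org.apache.spark.{Dependency, NarrowDependency, Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

// Sketch: the single coalesced partition narrowly depends on every partition
// of `prev`, not just partition 0. compute() is a placeholder.
class AllParentsDependencySketch(sc: SparkContext, prev: RDD[InternalRow])
  extends RDD[InternalRow](sc, Nil) {

  override def getPartitions: Array[Partition] =
    Array(new Partition { override val index: Int = 0 })

  override def getDependencies: Seq[Dependency[_]] = Seq(
    new NarrowDependency(prev) {
      override def getParents(partitionId: Int): Seq[Int] = 0 until prev.getNumPartitions
    })

  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] =
    Iterator.empty
}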

Contributor Author

Yeah, I confused myself when looking at the normal coalesce RDD. The default dependency handling is correct here.

@SparkQA

SparkQA commented Jun 21, 2018

Test build #92146 has finished for PR 21560 at commit c0f769e.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

Contributor

@HeartSaVioR HeartSaVioR left a comment

Looks great overall. (FYI I'm mostly concerned about functional correctness while reviewing.)

Left some comments, but they're minor (mostly for completeness; this patch is an intermediate state), so you can skip addressing them.

@@ -61,12 +63,14 @@ class ContinuousShuffleReadRDD(
numPartitions: Int,
queueSize: Int = 1024,
numShuffleWriters: Int = 1,
epochIntervalMs: Long = 1000)
epochIntervalMs: Long = 1000,
val endpointNames: Seq[String] = Seq(s"RPCContinuousShuffleReader-${UUID.randomUUID()}"))
Contributor

Same here: if possible, it might be better to have complete code rather than just working under such an assumption.

Contributor Author

This is just a default argument to make tests less wordy. I can remove it if you think that's best, but it doesn't impose a restriction.

prev: RDD[InternalRow])
extends RDD[InternalRow](context, Nil) {

override def getPartitions: Array[Partition] = Array(ContinuousCoalesceRDDPartition(0))
Contributor

We are addressing only the specific case where the number of partitions is 1, but we could add an assertion for that and try to write complete code so that we don't have to modify it again.

Contributor

Agree. And since there's an assert(numPartitions == 1) in ContinuousCoalesceExec, we can probably create an array of numPartitions partitions here.

Contributor Author

I've made some changes to try to restrict the assumption that the number of partitions is 1 to two places:

  • ContinuousCoalesceExec
  • The output partitioner in ContinuousCoalesceRDD, since it's not obvious to me what the right strategy to get this would be in the general case. If you have ideas I'm open to removing this too.

extends RDD[UnsafeRow](sc, Nil) {

override protected def getPartitions: Array[Partition] = {
(0 until numPartitions).map { partIndex =>
ContinuousShuffleReadPartition(partIndex, queueSize, numShuffleWriters, epochIntervalMs)
ContinuousShuffleReadPartition(
partIndex, endpointNames(partIndex), queueSize, numShuffleWriters, epochIntervalMs)
Contributor

This effectively asserts that numPartitions is 1; otherwise it will throw an exception.
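A hedged sketch of how the default could be made complete (the naming mirrors the per-partition endpoint names used later in this PR, but is illustrative):

import java.util.UUID

object ShuffleEndpointNamesSketch {
  // One endpoint name per read partition, so endpointNames(partIndex) is always defined.
  def defaultEndpointNames(numPartitions: Int): Seq[String] =
    Seq.tabulate(numPartitions) { i =>
      s"RPCContinuousShuffleReader-part$i-${UUID.randomUUID()}"
    }
}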

case Repartition(1, false, _) =>
case node: Aggregate =>
val aboveSinglePartitionCoalesce = node.find {
case Repartition(1, false, _) => true
Contributor

What if we have multiple repartitions, where one meets this case and the others don't? I'm not sure we are restricting repartition operations to appear only once.

Contributor Author

I don't think there's any particular reason we need to. There's no reason we couldn't execute multiple repartitions if the optimizer isn't smart enough to combine them.

Contributor Author

Oh wait, I see what you mean. Repartition(5, ...) would never be matched by this rule, since it only applies to Aggregate.

@@ -98,6 +98,10 @@ class ContinuousDataSourceRDD(
override def getPreferredLocations(split: Partition): Seq[String] = {
split.asInstanceOf[ContinuousDataSourceRDDPartition].inputPartition.preferredLocations()
}

override def clearDependencies(): Unit = {
throw new IllegalStateException("Continuous RDDs cannot be checkpointed")
Contributor

@HeartSaVioR HeartSaVioR Jun 21, 2018

I'm wondering whether this method can be called in a normal situation: when a continuous query is gracefully terminated.

Contributor Author

I don't know, I'm unfamiliar with this method. @tdas

Contributor

@HeartSaVioR No, this method is not intended to be called under normal circumstances. And there's even less reason to call it on an internally generated RDD.

Contributor

@arunmahadevan arunmahadevan left a comment

Looks good overall. Left a few comments.

case _ => false
}.isDefined

if (!aboveSinglePartitionCoalesce) {
Contributor

What if there was only a single partition to begin with? Then there's no need for Repartition(1), and this check should be skipped.

Contributor Author

I agree that it wouldn't be needed, but partitioning information is not always available during analysis. So I don't think we can write the more granular check suggested here.

case _ => false
}.isDefined

if (!aboveSinglePartitionCoalesce) {
Contributor

Also, if there's a single parent partition and there's a Repartition(1), that node should probably be removed. Not sure if this is already being done.

Contributor Author

(same comment as above applies here - we don't have partitioning information in analysis)



if (!split.asInstanceOf[ContinuousCoalesceRDDPartition].writersInitialized) {
val rpcEnv = SparkEnv.get.rpcEnv
val outputPartitioner = new HashPartitioner(1)
Contributor

Maybe I am missing something. Is this more like a repartition (it just shuffles) than a coalesce?

Contributor Author

Repartition would normally imply distributed execution, which isn't happening here.

runnables.foreach(threadPool.execute)
}

readerRDD.compute(readerRDD.partitions(split.index), context)
Contributor

The writer.write and readerRDD.compute() are going to execute as separate tasks (but concurrently, since there are no stage boundaries), correct?

Contributor Author

@jose-torres jose-torres Jun 25, 2018

No, they'll be in the same task. Just different threads.

Contributor

If it's the same task, then do we need the RPC mechanism to pass the rows around?

Contributor

There is a queue inside the ContinuousShuffleReadRDD that buffers all the records sent out by the RPCContinuousShuffleWriter, and the compute function returns data from that queue.

As I commented above, we don't really need the ContinuousShuffleReadRDD, just the ContinuousShuffleReader.

Contributor

Agree. Also, the 2 * num-shuffle-partitions threads here look like overhead. Maybe OK for now, but the CoalesceRDD iterator could just iterate over the parent RDD partitions, tracking the epochs, returning the rows, and terminating once the epoch marker has been received from all its parents.

Contributor Author

Yeah, it could be made more efficient. Part of the goal here is to ensure that the shuffling does indeed work end-to-end, so we can work on both the shuffle framework and distributed repartitioning in parallel.
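A generic sketch of the buffering pattern discussed in this thread (these are stand-in classes, not Spark's actual shuffle types): writer threads enqueue rows into a bounded local queue, and the task thread's iterator drains it, finishing the epoch once every writer has sent its end-of-epoch marker.

import java.util.concurrent.ArrayBlockingQueue

final case class Record(value: Long)  // stand-in for UnsafeRow
case object EpochMarker               // stand-in for the end-of-epoch message

class LocalShuffleQueueSketch(numWriters: Int, capacity: Int) {
  private val queue = new ArrayBlockingQueue[Any](capacity)

  // Called from writer threads.
  def put(msg: Any): Unit = queue.put(msg)

  // Called from the single task thread; take() blocks until data arrives.
  def epochIterator(): Iterator[Record] = new Iterator[Record] {
    private var markersSeen = 0
    private var buffered: Option[Record] = None

    private def fill(): Unit = {
      while (buffered.isEmpty && markersSeen < numWriters) {
        queue.take() match {
          case r: Record   => buffered = Some(r)
          case EpochMarker => markersSeen += 1
        }
      }
    }

    override def hasNext: Boolean = { fill(); buffered.isDefined }

    override def next(): Record = {
      fill()
      val r = buffered.getOrElse(throw new NoSuchElementException("epoch finished"))
      buffered = None
      r
    }
  }
}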

@SparkQA

SparkQA commented Jun 26, 2018

Test build #92310 has finished for PR 21560 at commit 71a3568.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

Contributor

@tdas tdas left a comment

Very close. Just one major refactoring comment.


override def doExecute(): RDD[InternalRow] = {
assert(numPartitions == 1)

val childRdd = child.execute()
Contributor

nit: Don't need this variable. And remove excess empty lines.

s"ContinuousCoalesceRDD-part$i-${UUID.randomUUID()}"
}

val readerRDD = new ContinuousShuffleReadRDD(
Contributor

private

Contributor

Also, honestly, you don't need the RDD here. You only need the shuffle-reading code, which is the ContinuousShuffleReader and its endpoint. So you can just instantiate that in the compute function. It's very confusing to have an RDD inside another RDD that is not hooked into the dependency chain.


.coalesce(1)
.select('value as 'copy, 'value)
.where('copy =!= 2)
.agg(max('value))
Contributor

test transformations both before and after coalesce.

Contributor

You missed this comment.
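A hedged sketch of the kind of query such a test would cover (not the PR's test code; it assumes a continuous-capable source such as rate and relies on the coalesce(1)-plus-aggregate support this PR adds):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object CoalesceAggSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("sketch").getOrCreate()
    import spark.implicits._

    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .select($"value" % 10 as "copy", $"value")  // transformation before coalesce
      .coalesce(1)
      .where($"copy" =!= 2)                       // transformation after coalesce
      .agg(max($"value"))

    val query = df.writeStream
      .format("console")
      .outputMode("complete")
      .trigger(Trigger.Continuous("1 second"))
      .start()

    query.awaitTermination(30000)
    query.stop()
  }
}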

@SparkQA

SparkQA commented Jun 27, 2018

Test build #92360 has finished for PR 21560 at commit 468f134.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ContinuousCoalesceRDDPartition(

@jose-torres
Contributor Author

Sorry, that wasn't meant to be a complete push. Added the tests now.

@tdas
Contributor

tdas commented Jun 27, 2018

LGTM assuming tests pass.

@SparkQA

SparkQA commented Jun 27, 2018

Test build #92387 has finished for PR 21560 at commit f77b12b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in f6e6899 Jun 28, 2018