[SPARK-24795][CORE] Implement barrier execution mode #21758

Closed
wants to merge 14 commits
42 changes: 42 additions & 0 deletions core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import org.apache.spark.annotation.{Experimental, Since}

/** A [[TaskContext]] with extra info and tooling for a barrier stage. */
trait BarrierTaskContext extends TaskContext {
Contributor:
Please check the generated JavaDoc. I think it becomes a Java interface with only two methods defined here. We might want to define class BarrierTaskContext directly.


/**
* :: Experimental ::
* Sets a global barrier and waits until all tasks in this stage hit this barrier. Similar to
* the MPI_Barrier function in MPI, the barrier() call blocks until all tasks in the same
* stage have reached this routine.
*/
@Experimental
@Since("2.4.0")
def barrier(): Unit

/**
* :: Experimental ::
* Returns all the task infos in this barrier stage, ordered by partitionId.
Contributor:

is there a particular reason why they must be ordered by partitionId?

Contributor Author:

The major reason is that tasks within the same barrier stage may need to communicate with each other; we order the task infos by partitionId so a task can find its peer tasks by index.

*/
@Experimental
@Since("2.4.0")
def getTaskInfos(): Array[BarrierTaskInfo]
Contributor:
what other things do you expect to be included in the future in BarrierTaskInfo? It seems overkill to have a new class for a single field (address).

Contributor:

+1

}
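
A minimal usage sketch (not part of this diff) of the two methods above, assuming the rdd.barrier().mapPartitions(...) entry point added later in this PR and an RDD[Int] named rdd:

// Sketch only: runs inside a barrier stage created via RDDBarrier.mapPartitions.
val result = rdd.barrier().mapPartitions { (iter: Iterator[Int], context: BarrierTaskContext) =>
  // Blocks until every task in this stage has reached this call, like MPI_Barrier.
  context.barrier()
  // Task infos are ordered by partitionId, so a task can look up its peers by index.
  val infos = context.getTaskInfos()
  val myAddress = infos(context.partitionId()).address
  iter
}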
49 changes: 49 additions & 0 deletions core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import java.util.Properties

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.metrics.MetricsSystem

/** A [[BarrierTaskContext]] implementation. */
private[spark] class BarrierTaskContextImpl(
override val stageId: Int,
override val stageAttemptNumber: Int,
override val partitionId: Int,
override val taskAttemptId: Long,
override val attemptNumber: Int,
override val taskMemoryManager: TaskMemoryManager,
localProperties: Properties,
@transient private val metricsSystem: MetricsSystem,
// The default value is only used in tests.
override val taskMetrics: TaskMetrics = TaskMetrics.empty)
extends TaskContextImpl(stageId, stageAttemptNumber, partitionId, taskAttemptId, attemptNumber,
taskMemoryManager, localProperties, metricsSystem, taskMetrics)
with BarrierTaskContext {

// TODO SPARK-24817 implement global barrier.
override def barrier(): Unit = {}
Contributor:
Ditto. Please provide JIRA link with TODO.


override def getTaskInfos(): Array[BarrierTaskInfo] = {
val addressesStr = localProperties.getProperty("addresses", "")
addressesStr.split(",").map(_.trim()).map(new BarrierTaskInfo(_))
}
}
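
For illustration only (not part of this diff), the same split-and-trim logic applied to a sample value of the "addresses" local property; the real value is assumed to be a comma-separated host:port list set by the scheduler before the barrier tasks launch:

val addressesStr = "10.0.0.1:40001, 10.0.0.2:40002, 10.0.0.3:40003"  // assumed sample value
val infos = addressesStr.split(",").map(_.trim()).map(new BarrierTaskInfo(_))
// infos(0).address == "10.0.0.1:40001"; entries line up with partitionId 0, 1, 2.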
31 changes: 31 additions & 0 deletions core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import org.apache.spark.annotation.{Experimental, Since}


/**
* :: Experimental ::
* Carries the task information of a barrier task.
*
* @param address the IPv4 address (host:port) of the executor that a barrier task is running on
*/
@Experimental
@Since("2.4.0")
class BarrierTaskInfo(val address: String)
Contributor:
we need param doc to say that address is an IPv4 address.

Contributor:
does this need to be a public API?

Contributor Author (@jiangxb1987, Jul 24, 2018):
We make this a public API because BarrierTaskContext.getTaskInfos() will return a list of BarrierTaskInfos, so users have to access the class.

Contributor:

Can we just bake address into TaskInfo?

Contributor:
or is TaskInfo not a public API?

Contributor Author:
If we don't mind making TaskInfo a public API then I think it's fine to just put address into TaskInfo. The major concern is that TaskInfo has been stable for a long time, and do we want to potentially make frequent changes to it? (e.g. we may add more variables useful for barrier tasks, though I don't really have an example at hand)

12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Expand Up @@ -434,6 +434,18 @@ private[spark] class MapOutputTrackerMaster(
}
}

/** Unregister all map output information of the given shuffle. */
def unregisterAllMapOutput(shuffleId: Int) {
shuffleStatuses.get(shuffleId) match {
case Some(shuffleStatus) =>
shuffleStatus.removeOutputsByFilter(x => true)
incrementEpoch()
case None =>
throw new SparkException(
s"unregisterAllMapOutput called for nonexistent shuffle ID $shuffleId.")
}
}
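
A hypothetical call-site sketch of why this whole-shuffle variant exists: a barrier stage is retried as a whole when any of its tasks fails, so all of its map output should be discarded at once rather than per mapper. The identifiers below (stage, mapOutputTracker, shuffleId, mapId, bmAddress) are placeholders, not actual DAGScheduler code from this PR:

if (stage.isBarrier) {
  // The whole barrier stage will be re-run, so forget every registered map output.
  mapOutputTracker.unregisterAllMapOutput(shuffleId)
} else {
  // A normal stage only needs to forget the output of the single failed map task.
  mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
}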

/** Unregister shuffle data */
def unregisterShuffle(shuffleId: Int) {
shuffleStatuses.remove(shuffleId).foreach { shuffleStatus =>
core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -27,7 +27,8 @@ import org.apache.spark.{Partition, TaskContext}
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false)
preservesPartitioning: Boolean = false,
isFromBarrier: Boolean = false)
Contributor:
we should explain what this flag does in classdoc

extends RDD[U](prev) {

override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
@@ -41,4 +42,7 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
super.clearDependencies()
prev = null
}

@transient protected lazy override val isBarrier_ : Boolean =
isFromBarrier || dependencies.exists(_.rdd.isBarrier())
}
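
A sketch of how the flag above propagates (assumes a SparkContext named sc and the barrier() API from this PR): the RDD produced by RDDBarrier.mapPartitions is built with isFromBarrier = true, and narrow transformations on top of it report isBarrier() == true through their dependencies, until a shuffle starts a new stage.

val base = sc.parallelize(1 to 100, numSlices = 4)
val barriered = base.barrier().mapPartitions { (iter, ctx) => iter }  // isBarrier() == true (isFromBarrier)
val mapped = barriered.map(x => (x % 10, x))                          // still true, inherited from the parent
val reduced = mapped.reduceByKey(_ + _)                               // false: ShuffledRDD starts a new stage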
24 changes: 23 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -33,7 +33,7 @@ import org.apache.hadoop.mapred.TextOutputFormat

import org.apache.spark._
import org.apache.spark.Partitioner._
import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
import org.apache.spark.partial.BoundedDouble
@@ -1647,6 +1647,14 @@ abstract class RDD[T: ClassTag](
}
}

/**
* :: Experimental ::
* Indicates that Spark must launch the tasks together for the current stage.
*/
@Experimental
@Since("2.4.0")
def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this))
Contributor (@squito, Jul 24, 2018):

scheduling seems to have a very hard requirement that the number of partitions is less than the number of available task slots. It seems really hard for users to get this right. E.g., if I just do

sc.textFile(...).barrier().mapPartitions()

the number of partitions is based on the HDFS input splits. I see lots of users getting confused by this -- it'll work sometimes, won't work other times, and they won't know why. Should there be some automatic repartitioning based on cluster resources? Or at least an API which lets users do this? Even repartition() isn't great here, because users don't want to think about cluster resources.

Contributor Author:
Em, thanks for raising this question. IMO we do require users to be aware of how many tasks they may launch for a barrier stage, and tasks may exchange internal data with each other in the middle, so users really care about the number of tasks. I agree it would be very useful to let users specify the number of tasks in a barrier stage; maybe we can have RDDBarrier.coalesce(numPartitions: Int) to enforce the number of tasks launched together in a barrier stage.
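
A hypothetical workaround sketch for the sizing concern above: repartition to an explicitly chosen task count before entering the barrier stage, so the number of barrier tasks does not depend on the input splits (the path and numTasks below are placeholders, and RDDBarrier.coalesce is not part of this PR):

val numTasks = 8  // chosen by the user to fit the cluster's free task slots
val barrierRdd = sc.textFile("hdfs:///path/to/input")
  .repartition(numTasks)
  .barrier()
  .mapPartitions { (iter, ctx) => iter }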

Contributor:
was this addressed at all? is there another jira for it?


// =======================================================================
// Other internal methods and fields
// =======================================================================
@@ -1839,6 +1847,20 @@ abstract class RDD[T: ClassTag](
def toJavaRDD() : JavaRDD[T] = {
new JavaRDD(this)(elementClassTag)
}

/**
* Whether the RDD is in a barrier stage. Spark must launch all the tasks at the same time for a
* barrier stage.
*
* An RDD is in a barrier stage if at least one of its parent RDD(s), or itself, is mapped from
* an RDDBarrier. This function always returns false for a [[ShuffledRDD]], since a
* [[ShuffledRDD]] indicates the start of a new stage.
*/
private[spark] def isBarrier(): Boolean = isBarrier_

// For performance reasons, cache the value to avoid repeatedly computing `isBarrier()` on a long
// RDD chain.
@transient protected lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier())
Contributor:
you need to explain why MapPartitionsRDD has a different isBarrier implementation.

}


52 changes: 52 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.rdd

import scala.reflect.ClassTag

import org.apache.spark.BarrierTaskContext
import org.apache.spark.TaskContext
import org.apache.spark.annotation.{Experimental, Since}

/** Represents an RDD barrier, which forces Spark to launch tasks of this stage together. */
class RDDBarrier[T: ClassTag](rdd: RDD[T]) {

/**
* :: Experimental ::
* Maps partitions together with a provided BarrierTaskContext.
*
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless `rdd` is a pair RDD and the input function doesn't modify the keys.
*/
@Experimental
@Since("2.4.0")
def mapPartitions[S: ClassTag](
Contributor:
if the only thing you can do on this is mapPartitions, is there any particular reason it's divided into two calls, barrier().mapPartitions(), instead of just barrierMapPartitions() or something? Are there more things planned here?

I can see users expecting the ability to call other functions after .barrier(), e.g. barrier().reduceByKey() or something. The compiler will help with this, but just wondering if we can make it more obvious.

Contributor Author:
RDDBarrier is actually expected to be used like a builder; we shall provide more options for the barrier stage in the future, e.g. configuring a timeout for a barrier stage.

f: (Iterator[T], BarrierTaskContext) => Iterator[S],
preservesPartitioning: Boolean = false): RDD[S] = rdd.withScope {
val cleanedF = rdd.sparkContext.clean(f)
new MapPartitionsRDD(
rdd,
(context: TaskContext, index: Int, iter: Iterator[T]) =>
cleanedF(iter, context.asInstanceOf[BarrierTaskContext]),
preservesPartitioning,
isFromBarrier = true
)
}

/** TODO: extra conf (e.g. timeout) */
}
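
A purely speculative sketch of the builder-style usage described in the review thread above; in this PR the only operation available on RDDBarrier is mapPartitions, and the timeout option shown is hypothetical:

rdd.barrier()                      // returns an RDDBarrier[T] "builder"
  // .timeout(...)                 // hypothetical future option, not implemented in this PR
  .mapPartitions { (iter, ctx) =>
    ctx.barrier()
    iter
  }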
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -110,4 +110,6 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
super.clearDependencies()
prev = null
}

private[spark] override def isBarrier(): Boolean = false
}
core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
@@ -60,4 +60,10 @@ private[spark] class ActiveJob(
val finished = Array.fill[Boolean](numPartitions)(false)

var numFinished = 0

// Mark all the partitions of the stage to be not finished.
Contributor:
use /** */ style. also the sentence is a bit awkward. perhaps

"Resets the status of all partitions in this stage so they are marked as not finished."

def resetAllPartitions(): Unit = {
(0 until numPartitions).foreach(finished.update(_, false))
numFinished = 0
}
}