Merge branch 'master' of git://git.apache.org/spark into timeline-viewer-feature

sarutak committed Apr 8, 2015
2 parents fcdab7d + 15e0d2b commit dec85db
Showing 63 changed files with 338 additions and 289 deletions.
12 changes: 11 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1901,7 +1901,17 @@ object SparkContext extends Logging {

private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"

private[spark] val DRIVER_IDENTIFIER = "<driver>"
/**
* Executor id for the driver. In earlier versions of Spark, this was `<driver>`, but this was
* changed to `driver` because the angle brackets caused escaping issues in URLs and XML (see
* SPARK-6716 for more details).
*/
private[spark] val DRIVER_IDENTIFIER = "driver"

/**
* Legacy version of DRIVER_IDENTIFIER, retained for backwards-compatibility.
*/
private[spark] val LEGACY_DRIVER_IDENTIFIER = "<driver>"

// The following deprecated objects have already been copied to `object AccumulatorParam` to
// make the compiler find them automatically. They are duplicate codes only for backward
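Note: the hunk above renames the driver's executor id from "<driver>" to "driver" (SPARK-6716) while keeping the old value as LEGACY_DRIVER_IDENTIFIER. A minimal compatibility sketch — the isDriver helper below is an illustration, not part of this commit:

    // Hypothetical helper: accept both the new and the legacy identifier when
    // deciding whether an executor id refers to the driver.
    def isDriver(executorId: String): Boolean =
      executorId == SparkContext.DRIVER_IDENTIFIER ||
        executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER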
29 changes: 0 additions & 29 deletions core/src/main/scala/org/apache/spark/TaskContextHelper.scala

This file was deleted.

2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -89,7 +89,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)

/* Find out driver status then exit the JVM */
def pollAndReportStatus(driverId: String) {
println(s"... waiting before polling master for driver state")
println("... waiting before polling master for driver state")
Thread.sleep(5000)
println("... polling master for driver state")
val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout)
@@ -245,7 +245,7 @@ private[deploy] class StandaloneRestClient extends Logging {
}
} else {
val failMessage = Option(submitResponse.message).map { ": " + _ }.getOrElse("")
logError("Application submission failed" + failMessage)
logError(s"Application submission failed$failMessage")
}
}

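Note: the two string changes above follow the same rule from opposite sides — use the s interpolator only when a value is actually interpolated. A self-contained sketch of the StandaloneRestClient case (the message text is made up for illustration):

    object InterpolationSketch {
      def main(args: Array[String]): Unit = {
        val failMessage = Option("quota exceeded").map { ": " + _ }.getOrElse("")
        println("Application submission failed" + failMessage)   // before: concatenation
        println(s"Application submission failed$failMessage")    // after: interpolation, same output
      }
    }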
@@ -181,7 +181,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
buffer.get(bytes)
bytes.foreach(x => print(x + " "))
buffer.position(curPosition)
print(" (" + bytes.size + ")")
print(" (" + bytes.length + ")")
}

def printBuffer(buffer: ByteBuffer, position: Int, length: Int) {
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -45,7 +45,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
}
result
},
Range(0, self.partitions.size),
Range(0, self.partitions.length),
(index: Int, data: Long) => totalCount.addAndGet(data),
totalCount.get())
}
@@ -54,8 +54,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
* Returns a future for retrieving all elements of this RDD.
*/
def collectAsync(): FutureAction[Seq[T]] = {
val results = new Array[Array[T]](self.partitions.size)
self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.size),
val results = new Array[Array[T]](self.partitions.length)
self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.length),
(index, data) => results(index) = data, results.flatten.toSeq)
}
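Note: collectAsync returns a FutureAction, which can be used like an ordinary Scala Future. A usage sketch, assuming an existing SparkContext named sc and the global execution context:

    import scala.concurrent.ExecutionContext.Implicits.global
    val rdd = sc.parallelize(1 to 100, 4)
    val pending = rdd.collectAsync()            // FutureAction[Seq[Int]]
    pending.foreach(xs => println(s"collected ${xs.size} elements"))
    // pending.cancel() would attempt to cancel the underlying Spark job.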

@@ -111,15 +111,15 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
*/
def foreachAsync(f: T => Unit): FutureAction[Unit] = {
val cleanF = self.context.clean(f)
self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.size),
self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.length),
(index, data) => Unit, Unit)
}

/**
* Applies a function f to each partition of this RDD.
*/
def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.size),
self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.length),
(index, data) => Unit, Unit)
}
}
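Note: most of the mechanical changes in this commit swap .size for .length on Arrays, as in self.partitions above. On an Array, .length is the underlying JVM array length, while .size goes through the implicit ArrayOps wrapper; both return the same value, so this is a small overhead/consistency win rather than a behaviour change. A tiny sketch:

    val parts = new Array[Int](8)
    assert(parts.length == parts.size)   // identical result; .length avoids the implicit wrapper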
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -36,7 +36,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds

override def getPartitions: Array[Partition] = {
assertValid()
(0 until blockIds.size).map(i => {
(0 until blockIds.length).map(i => {
new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
}).toArray
}
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -53,11 +53,11 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
extends RDD[Pair[T, U]](sc, Nil)
with Serializable {

val numPartitionsInRdd2 = rdd2.partitions.size
val numPartitionsInRdd2 = rdd2.partitions.length

override def getPartitions: Array[Partition] = {
// create the cross product split
val array = new Array[Partition](rdd1.partitions.size * rdd2.partitions.size)
val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
val idx = s1.index * numPartitionsInRdd2 + s2.index
array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
@@ -49,7 +49,7 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
if (fs.exists(cpath)) {
val dirContents = fs.listStatus(cpath).map(_.getPath)
val partitionFiles = dirContents.filter(_.getName.startsWith("part-")).map(_.toString).sorted
val numPart = partitionFiles.size
val numPart = partitionFiles.length
if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) {
throw new SparkException("Invalid checkpoint directory: " + checkpointPath)
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -99,7 +99,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:

override def getPartitions: Array[Partition] = {
val array = new Array[Partition](part.numPartitions)
for (i <- 0 until array.size) {
for (i <- 0 until array.length) {
// Each CoGroupPartition will have a dependency per contributing RDD
array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
// Assume each RDD contributed a single dependency, and get it
@@ -120,7 +120,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
val sparkConf = SparkEnv.get.conf
val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
val split = s.asInstanceOf[CoGroupPartition]
val numRdds = split.deps.size
val numRdds = split.deps.length

// A list of (rdd iterator, dependency number) pairs
val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
@@ -166,7 +166,7 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:

// determines the tradeoff between load-balancing the partitions sizes and their locality
// e.g. balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality
val slack = (balanceSlack * prev.partitions.size).toInt
val slack = (balanceSlack * prev.partitions.length).toInt

var noLocality = true // if true if no preferredLocations exists for parent RDD

@@ -70,7 +70,7 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
@Experimental
def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
val evaluator = new MeanEvaluator(self.partitions.size, confidence)
val evaluator = new MeanEvaluator(self.partitions.length, confidence)
self.context.runApproximateJob(self, processPartition, evaluator, timeout)
}

@@ -81,7 +81,7 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
@Experimental
def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
val evaluator = new SumEvaluator(self.partitions.size, confidence)
val evaluator = new SumEvaluator(self.partitions.length, confidence)
self.context.runApproximateJob(self, processPartition, evaluator, timeout)
}

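Note: meanApprox and sumApprox above return a PartialResult[BoundedDouble] rather than a plain Double. A usage sketch, assuming an RDD[Double] named data (timeout is in milliseconds):

    val approxMean = data.meanApprox(timeout = 1000L, confidence = 0.95)
    // initialValue gives the estimate currently available; getFinalValue()
    // blocks until a final value is ready.
    println(approxMean.initialValue)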
@@ -56,7 +56,7 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
* order of the keys).
*/
// TODO: this currently doesn't work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] =
{
val part = new RangePartitioner(numPartitions, self, ascending)
@@ -823,7 +823,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* RDD will be <= us.
*/
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] =
subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.size)))
subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.length)))

/** Return an RDD with the pairs from `this` whose keys are not in `other`. */
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] =
22 changes: 11 additions & 11 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -316,7 +316,7 @@ abstract class RDD[T: ClassTag](
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
def distinct(): RDD[T] = distinct(partitions.size)
def distinct(): RDD[T] = distinct(partitions.length)

/**
* Return a new RDD that has exactly numPartitions partitions.
@@ -488,7 +488,7 @@ abstract class RDD[T: ClassTag](
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.size)
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
this.keyBy[K](f)
.sortByKey(ascending, numPartitions)
@@ -852,7 +852,7 @@ abstract class RDD[T: ClassTag](
* RDD will be &lt;= us.
*/
def subtract(other: RDD[T]): RDD[T] =
subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))
subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))

/**
* Return an RDD with the elements from `this` that are not in `other`.
@@ -986,14 +986,14 @@ abstract class RDD[T: ClassTag](
combOp: (U, U) => U,
depth: Int = 2): U = {
require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
if (partitions.size == 0) {
if (partitions.length == 0) {
return Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
}
val cleanSeqOp = context.clean(seqOp)
val cleanCombOp = context.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
var numPartitions = partiallyAggregated.partitions.size
var numPartitions = partiallyAggregated.partitions.length
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
// If creating an extra level doesn't help reduce the wall-clock time, we stop tree aggregation.
while (numPartitions > scale + numPartitions / scale) {
@@ -1026,7 +1026,7 @@ abstract class RDD[T: ClassTag](
}
result
}
val evaluator = new CountEvaluator(partitions.size, confidence)
val evaluator = new CountEvaluator(partitions.length, confidence)
sc.runApproximateJob(this, countElements, evaluator, timeout)
}

@@ -1061,7 +1061,7 @@ abstract class RDD[T: ClassTag](
}
map
}
val evaluator = new GroupedCountEvaluator[T](partitions.size, confidence)
val evaluator = new GroupedCountEvaluator[T](partitions.length, confidence)
sc.runApproximateJob(this, countPartition, evaluator, timeout)
}

@@ -1140,7 +1140,7 @@ abstract class RDD[T: ClassTag](
* the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
*/
def zipWithUniqueId(): RDD[(T, Long)] = {
val n = this.partitions.size.toLong
val n = this.partitions.length.toLong
this.mapPartitionsWithIndex { case (k, iter) =>
iter.zipWithIndex.map { case (item, i) =>
(item, i * n + k)
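Note: the expression above assigns the i-th element of partition k the id i * n + k, where n is the number of partitions, so ids are unique but not consecutive. A worked sketch, assuming a SparkContext named sc:

    val xs = sc.parallelize(Seq("a", "b", "c", "d"), 2)
    // partition 0: "a" -> 0*2+0 = 0, "b" -> 1*2+0 = 2
    // partition 1: "c" -> 0*2+1 = 1, "d" -> 1*2+1 = 3
    xs.zipWithUniqueId().collect()   // Array((a,0), (b,2), (c,1), (d,3))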
@@ -1243,7 +1243,7 @@ abstract class RDD[T: ClassTag](
queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
Iterator.single(queue)
}
if (mapRDDs.partitions.size == 0) {
if (mapRDDs.partitions.length == 0) {
Array.empty
} else {
mapRDDs.reduce { (queue1, queue2) =>
@@ -1489,7 +1489,7 @@ abstract class RDD[T: ClassTag](
}
// The first RDD in the dependency stack has no parents, so no need for a +-
def firstDebugString(rdd: RDD[_]): Seq[String] = {
val partitionStr = "(" + rdd.partitions.size + ")"
val partitionStr = "(" + rdd.partitions.length + ")"
val leftOffset = (partitionStr.length - 1) / 2
val nextPrefix = (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset))

@@ -1499,7 +1499,7 @@ abstract class RDD[T: ClassTag](
} ++ debugChildren(rdd, nextPrefix)
}
def shuffleDebugString(rdd: RDD[_], prefix: String = "", isLastChild: Boolean): Seq[String] = {
val partitionStr = "(" + rdd.partitions.size + ")"
val partitionStr = "(" + rdd.partitions.length + ")"
val leftOffset = (partitionStr.length - 1) / 2
val thisPrefix = prefix.replaceAll("\\|\\s+$", "")
val nextPrefix = (
@@ -94,10 +94,10 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
new SerializableWritable(rdd.context.hadoopConfiguration))
rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
if (newRDD.partitions.size != rdd.partitions.size) {
if (newRDD.partitions.length != rdd.partitions.length) {
throw new SparkException(
"Checkpoint RDD " + newRDD + "(" + newRDD.partitions.size + ") has different " +
"number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")")
"Checkpoint RDD " + newRDD + "(" + newRDD.partitions.length + ") has different " +
"number of partitions than original RDD " + rdd + "(" + rdd.partitions.length + ")")
}

// Change the dependencies and partitions of the RDD
@@ -76,7 +76,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](

override def getPartitions: Array[Partition] = {
val array = new Array[Partition](part.numPartitions)
for (i <- 0 until array.size) {
for (i <- 0 until array.length) {
// Each CoGroupPartition will depend on rdd1 and rdd2
array(i) = new CoGroupPartition(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
dependencies(j) match {
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -63,7 +63,7 @@ class UnionRDD[T: ClassTag](
extends RDD[T](sc, Nil) { // Nil since we implement getDependencies

override def getPartitions: Array[Partition] = {
val array = new Array[Partition](rdds.map(_.partitions.size).sum)
val array = new Array[Partition](rdds.map(_.partitions.length).sum)
var pos = 0
for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
@@ -76,8 +76,8 @@ class UnionRDD[T: ClassTag](
val deps = new ArrayBuffer[Dependency[_]]
var pos = 0
for (rdd <- rdds) {
deps += new RangeDependency(rdd, 0, pos, rdd.partitions.size)
pos += rdd.partitions.size
deps += new RangeDependency(rdd, 0, pos, rdd.partitions.length)
pos += rdd.partitions.length
}
deps
}
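Note: getDependencies above lines the parents' partitions up consecutively, using pos as a running offset into the union's partition space. A quick sketch of the resulting layout, assuming a SparkContext named sc:

    val a = sc.parallelize(1 to 10, 3)
    val b = sc.parallelize(1 to 10, 4)
    val u = a.union(b)
    println(u.partitions.length)   // 7: indices 0-2 come from a, 3-6 from b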
@@ -52,8 +52,8 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag](
if (preservesPartitioning) firstParent[Any].partitioner else None

override def getPartitions: Array[Partition] = {
val numParts = rdds.head.partitions.size
if (!rdds.forall(rdd => rdd.partitions.size == numParts)) {
val numParts = rdds.head.partitions.length
if (!rdds.forall(rdd => rdd.partitions.length == numParts)) {
throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions")
}
Array.tabulate[Partition](numParts) { i =>
@@ -41,7 +41,7 @@ class ZippedWithIndexRDD[T: ClassTag](@transient prev: RDD[T]) extends RDD[(T, L

/** The start index of each partition. */
@transient private val startIndices: Array[Long] = {
val n = prev.partitions.size
val n = prev.partitions.length
if (n == 0) {
Array[Long]()
} else if (n == 1) {