Commit
chutium committed Jul 17, 2014
2 parents 094f773 + 1fcd5dc commit c870172
Showing 4 changed files with 120 additions and 107 deletions.
46 changes: 22 additions & 24 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -125,7 +125,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
zeroBuffer.get(zeroArray)

lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
def createZero() = cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

combineByKey[U]((v: V) => seqOp(createZero(), v), seqOp, combOp, partitioner)
}
@@ -171,7 +171,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])

// When deserializing, use a lazy val to create just one instance of the serializer per task
lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
def createZero() = cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
}
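
Both hunks above replace a nested `def createZero()` with a `val createZero = () => ...` function value: a nested def gets lifted into a name-mangled private method on PairRDDFunctions (the `...$$createZero$1` names excluded in MimaExcludes.scala below), whereas a function value stays a local anonymous function. The following is a minimal, Spark-free sketch of the per-task zero-value pattern, using plain Java serialization in place of Spark's closure serializer; all names here are illustrative.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer

object CreateZeroSketch {
  // Stand-in for Spark's closure serializer: plain Java serialization.
  private def serialize(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    bytes.toByteArray
  }

  private def deserialize[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // Serialize the zero value once, up front (Spark does this on the driver).
    val zeroArray = serialize(ArrayBuffer.empty[Int])

    // A function value instead of a nested `def`: nothing is lifted onto the
    // enclosing class, and each call rebuilds a fresh copy of the zero value.
    val createZero = () => deserialize[ArrayBuffer[Int]](zeroArray)

    val a = createZero()
    val b = createZero()
    println(a eq b) // false: mutable zero values are never shared between uses
  }
}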
@@ -214,22 +214,22 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("reduceByKeyLocally() does not support array keys")
}

def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
val reducePartition = (iter: Iterator[(K, V)]) => {
val map = new JHashMap[K, V]
iter.foreach { case (k, v) =>
val old = map.get(k)
map.put(k, if (old == null) v else func(old, v))
}
Iterator(map)
}
} : Iterator[JHashMap[K, V]]

def mergeMaps(m1: JHashMap[K, V], m2: JHashMap[K, V]): JHashMap[K, V] = {
val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
m2.foreach { case (k, v) =>
val old = m1.get(k)
m1.put(k, if (old == null) v else func(old, v))
}
m1
}
} : JHashMap[K, V]

self.mapPartitions(reducePartition).reduce(mergeMaps)
}
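
The reduceByKeyLocally hunk makes the same conversion, keeping the result types as explicit ascriptions (`: Iterator[JHashMap[K, V]]`, `: JHashMap[K, V]`) on the function bodies. Below is a Spark-free sketch of the two-phase reduce it implements, run over plain Scala iterators with made-up data.

import scala.collection.mutable

object ReduceByKeyLocallySketch {
  def main(args: Array[String]): Unit = {
    val func: (Int, Int) => Int = _ + _

    // Per-partition reduce, written as a function value as in the diff.
    val reducePartition = (iter: Iterator[(String, Int)]) => {
      val map = mutable.HashMap.empty[String, Int]
      iter.foreach { case (k, v) =>
        map.update(k, map.get(k).map(func(_, v)).getOrElse(v))
      }
      Iterator(map)
    }

    // Merge of two partial maps, also a function value.
    val mergeMaps = (m1: mutable.HashMap[String, Int], m2: mutable.HashMap[String, Int]) => {
      m2.foreach { case (k, v) =>
        m1.update(k, m1.get(k).map(func(_, v)).getOrElse(v))
      }
      m1
    }

    val partitions = Seq(
      Iterator("a" -> 1, "b" -> 2, "a" -> 3),
      Iterator("b" -> 4, "c" -> 5)
    )
    println(partitions.map(reducePartition).map(_.next()).reduce(mergeMaps)) // a -> 4, b -> 6, c -> 5
  }
}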
@@ -361,11 +361,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
def createCombiner(v: V) = ArrayBuffer(v)
def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2
val createCombiner = (v: V) => ArrayBuffer(v)
val mergeValue = (buf: ArrayBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => c1 ++ c2
val bufs = combineByKey[ArrayBuffer[V]](
createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false)
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
bufs.mapValues(_.toIterable)
}
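
Here the three groupByKey building blocks become function values and are passed to combineByKey directly, with no eta-expansion (`createCombiner _`) needed. A small sketch of the same call against a local SparkContext follows; the data, app name, and partition count are made up, and it assumes a Spark build where the pair-RDD implicits are in scope automatically (1.3 and later).

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer

object GroupByKeySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("groupByKey-sketch"))
    try {
      val pairs = sc.parallelize(Seq("a" -> 1, "a" -> 2, "b" -> 3), 2)

      // Function values, as in the diff, so nothing is lifted onto the enclosing class.
      val createCombiner = (v: Int) => ArrayBuffer(v)
      val mergeValue = (buf: ArrayBuffer[Int], v: Int) => buf += v
      val mergeCombiners = (c1: ArrayBuffer[Int], c2: ArrayBuffer[Int]) => c1 ++ c2

      // mapSideCombine = false: grouping does not shrink the shuffled data,
      // so skip the map-side hash table (see the comment in the hunk above).
      val grouped = pairs.combineByKey(
        createCombiner, mergeValue, mergeCombiners, new HashPartitioner(2), mapSideCombine = false)
      grouped.mapValues(_.toList).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}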

@@ -710,14 +710,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
self.partitioner match {
case Some(p) =>
val index = p.getPartition(key)
def process(it: Iterator[(K, V)]): Seq[V] = {
val process = (it: Iterator[(K, V)]) => {
val buf = new ArrayBuffer[V]
for ((k, v) <- it if k == key) {
buf += v
}
buf
}
val res = self.context.runJob(self, process _, Array(index), false)
} : Seq[V]
val res = self.context.runJob(self, process, Array(index), false)
res(0)
case None =>
self.filter(_._1 == key).map(_._2).collect()
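
The lookup fast path runs a job on only the one partition that can hold the key. A sketch of the same idea with illustrative data is below; it assumes a Spark version whose SparkContext.runJob accepts (rdd, func, partitions), whereas the 1.x call in the diff also passes an allowLocal flag.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer

object LookupSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("lookup-sketch"))
    try {
      val p = new HashPartitioner(4)
      val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3)).partitionBy(p)

      val key = "a"
      val index = p.getPartition(key) // the only partition that can contain the key

      // Function value with an ascribed result type, mirroring the diff.
      val process = (it: Iterator[(String, Int)]) => {
        val buf = new ArrayBuffer[Int]
        for ((k, v) <- it if k == key) {
          buf += v
        }
        buf.toSeq
      }: Seq[Int]

      val res = sc.runJob(pairs, process, Seq(index))
      println(res(0)) // values for "a", computed by scanning a single partition
    } finally {
      sc.stop()
    }
  }
}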
@@ -840,7 +840,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
jobFormat.checkOutputSpecs(job)
}

def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
Expand All @@ -861,19 +861,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val (k, v) = iter.next()
writer.write(k, v)
}
}
finally {
} finally {
writer.close(hadoopContext)
}
committer.commitTask(hadoopContext)
return 1
}
1
} : Int

val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
jobCommitter.setupJob(jobTaskContext)
self.context.runJob(self, writeShard _)
self.context.runJob(self, writeShard)
jobCommitter.commitJob(jobTaskContext)
}
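
The comment about 32-bit task attempt IDs comes down to one line of arithmetic: reduce the (possibly 64-bit) attempt ID modulo Int.MaxValue so it fits in the Int that Hadoop expects. A tiny standalone check:

object AttemptIdSketch {
  def main(args: Array[String]): Unit = {
    val attemptIds = Seq(0L, 7L, Int.MaxValue.toLong + 5L)
    attemptIds.foreach { attemptId =>
      val attemptNumber = (attemptId % Int.MaxValue).toInt // fits in a non-negative Int for non-negative input
      println(s"$attemptId -> $attemptNumber")
    }
  }
}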

@@ -912,7 +911,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val writer = new SparkHadoopWriter(hadoopConf)
writer.preSetup()

def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
Expand All @@ -921,19 +920,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.open()
try {
var count = 0
while(iter.hasNext) {
while (iter.hasNext) {
val record = iter.next()
count += 1
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
}
}
finally {
} finally {
writer.close()
}
writer.commit()
}

self.context.runJob(self, writeToFile _)
self.context.runJob(self, writeToFile)
writer.commitJob()
}
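
writeToFile follows the usual task-side discipline: open the writer, write and count every record, close in a finally block so a failure cannot leak the handle, and commit only afterwards. Below is a local-file sketch of that shape, with no Hadoop involved and all names made up.

import java.io.{BufferedWriter, File, FileWriter}

object WriteTaskSketch {
  def main(args: Array[String]): Unit = {
    val records = Iterator("a" -> 1, "b" -> 2)
    val out = File.createTempFile("write-task-sketch", ".txt")
    val writer = new BufferedWriter(new FileWriter(out))
    var count = 0
    try {
      while (records.hasNext) {
        val record = records.next()
        count += 1
        writer.write(s"${record._1}\t${record._2}\n")
      }
    } finally {
      writer.close() // always runs, mirroring the task code above
    }
    println(s"committed $count records to $out") // stand-in for writer.commit()
  }
}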

12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -328,15 +328,15 @@ abstract class RDD[T: ClassTag](
: RDD[T] = {
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
def distributePartition(index: Int, items: Iterator[T]): Iterator[(Int, T)] = {
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = (new Random(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
}
} : Iterator[(Int, T)]

// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
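
distributePartition gives each input partition a pseudo-random starting output partition (seeded by its index) and then assigns records round-robin, so a shuffle-based repartition spreads data evenly; the HashPartitioner later takes the key modulo numPartitions. A Spark-free sketch with illustrative values:

import scala.util.Random

object DistributePartitionSketch {
  def main(args: Array[String]): Unit = {
    val numPartitions = 4

    // Same shape as the diff: seed with the partition index, then round-robin.
    val distributePartition = (index: Int, items: Iterator[String]) => {
      var position = new Random(index).nextInt(numPartitions)
      items.map { t =>
        position = position + 1
        (position, t) // the HashPartitioner later takes this key mod numPartitions
      }
    }: Iterator[(Int, String)]

    distributePartition(0, Iterator("a", "b", "c", "d")).foreach { case (key, t) =>
      println(s"output partition ${key % numPartitions} <- $t")
    }
  }
}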
@@ -919,19 +919,19 @@ abstract class RDD[T: ClassTag](
throw new SparkException("countByValue() does not support arrays")
}
// TODO: This should perhaps be distributed by default.
def countPartition(iter: Iterator[T]): Iterator[OpenHashMap[T,Long]] = {
val countPartition = (iter: Iterator[T]) => {
val map = new OpenHashMap[T,Long]
iter.foreach {
t => map.changeValue(t, 1L, _ + 1L)
}
Iterator(map)
}
def mergeMaps(m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]): OpenHashMap[T,Long] = {
}: Iterator[OpenHashMap[T,Long]]
val mergeMaps = (m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]) => {
m2.foreach { case (key, value) =>
m1.changeValue(key, value, _ + value)
}
m1
}
}: OpenHashMap[T,Long]
val myResult = mapPartitions(countPartition).reduce(mergeMaps)
// Convert to a Scala mutable map
val mutableResult = scala.collection.mutable.Map[T,Long]()
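
countByValue builds one hash map per partition and then merges them, now via two function values with ascribed result types. Below is a sketch of the same two closures on plain collections, using a mutable.HashMap in place of Spark's private OpenHashMap; the data is made up.

import scala.collection.mutable

object CountByValueSketch {
  def main(args: Array[String]): Unit = {
    val countPartition = (iter: Iterator[String]) => {
      val map = mutable.HashMap.empty[String, Long].withDefaultValue(0L)
      iter.foreach { t => map(t) += 1L }
      Iterator(map)
    }: Iterator[mutable.Map[String, Long]]

    val mergeMaps = (m1: mutable.Map[String, Long], m2: mutable.Map[String, Long]) => {
      m2.foreach { case (key, value) => m1(key) = m1.getOrElse(key, 0L) + value }
      m1
    }: mutable.Map[String, Long]

    val partitions = Seq(Iterator("x", "y", "x"), Iterator("y", "z"))
    println(partitions.map(countPartition).map(_.next()).reduce(mergeMaps)) // x -> 2, y -> 2, z -> 1
  }
}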
10 changes: 5 additions & 5 deletions external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -153,15 +153,15 @@ class FlumeReceiver(

private def initServer() = {
if (enableDecompression) {
val channelFactory = new NioServerSocketChannelFactory
(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
val channelPipelieFactory = new CompressionChannelPipelineFactory()
val channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())
val channelPipelineFactory = new CompressionChannelPipelineFactory()

new NettyServer(
responder,
new InetSocketAddress(host, port),
channelFactory,
channelPipelieFactory,
channelFactory,
channelPipelineFactory,
null)
} else {
new NettyServer(responder, new InetSocketAddress(host, port))
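
When decompression is enabled, the receiver hands the Avro NettyServer a channel factory plus a pipeline factory that installs zlib codecs. A rough sketch of what such a Netty 3 pipeline factory can look like follows; treat the class name, handler names, and compression level as illustrative rather than Spark's exact code.

import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels}
import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}

// Hypothetical factory: compress outbound frames and decompress inbound ones.
class ZlibChannelPipelineFactory extends ChannelPipelineFactory {
  override def getPipeline(): ChannelPipeline = {
    val pipeline = Channels.pipeline()
    pipeline.addFirst("deflater", new ZlibEncoder(6)) // zlib level 1-9; 6 is a common default
    pipeline.addFirst("inflater", new ZlibDecoder())
    pipeline
  }
}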
159 changes: 87 additions & 72 deletions project/MimaExcludes.scala
@@ -31,76 +31,91 @@ import com.typesafe.tools.mima.core._
* MimaBuild.excludeSparkClass("graphx.util.collection.GraphXPrimitiveKeyOpenHashMap")
*/
object MimaExcludes {
def excludes(version: String) =
version match {
case v if v.startsWith("1.1") =>
Seq(
MimaBuild.excludeSparkPackage("deploy"),
MimaBuild.excludeSparkPackage("graphx")
) ++
Seq(
// Adding new method to JavaRDLike trait - we should probably mark this as a developer API.
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitions"),
// We made a mistake earlier (ed06500d3) in the Java API to use default parameter values
// for countApproxDistinct* functions, which does not work in Java. We later removed
// them, and use the following to tell Mima to not care about them.
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"),
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.api.java.JavaPairRDD.countApproxDistinct$default$1"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey$default$1"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.api.java.JavaRDD.countApproxDistinct$default$1"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.api.java.JavaRDDLike.countApproxDistinct$default$1"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.storage.MemoryStore.Entry"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$"
+ "createZero$1")
) ++
Seq(
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.FlumeReceiver.this")
) ++
Seq( // Ignore some private methods in ALS.
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateFeatures"),
ProblemFilters.exclude[MissingMethodProblem]( // The only public constructor is the one without arguments.
"org.apache.spark.mllib.recommendation.ALS.this"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$<init>$default$7")
) ++
MimaBuild.excludeSparkClass("mllib.linalg.distributed.ColumnStatisticsAggregator") ++
MimaBuild.excludeSparkClass("rdd.ZippedRDD") ++
MimaBuild.excludeSparkClass("rdd.ZippedPartition") ++
MimaBuild.excludeSparkClass("util.SerializableHyperLogLog") ++
MimaBuild.excludeSparkClass("storage.Values") ++
MimaBuild.excludeSparkClass("storage.Entry") ++
MimaBuild.excludeSparkClass("storage.MemoryStore$Entry")
case v if v.startsWith("1.0") =>
Seq(
MimaBuild.excludeSparkPackage("api.java"),
MimaBuild.excludeSparkPackage("mllib"),
MimaBuild.excludeSparkPackage("streaming")
) ++
MimaBuild.excludeSparkClass("rdd.ClassTags") ++
MimaBuild.excludeSparkClass("util.XORShiftRandom") ++
MimaBuild.excludeSparkClass("graphx.EdgeRDD") ++
MimaBuild.excludeSparkClass("graphx.VertexRDD") ++
MimaBuild.excludeSparkClass("graphx.impl.GraphImpl") ++
MimaBuild.excludeSparkClass("graphx.impl.RoutingTable") ++
MimaBuild.excludeSparkClass("graphx.util.collection.PrimitiveKeyOpenHashMap") ++
MimaBuild.excludeSparkClass("graphx.util.collection.GraphXPrimitiveKeyOpenHashMap") ++
MimaBuild.excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
MimaBuild.excludeSparkClass("mllib.optimization.SquaredGradient") ++
MimaBuild.excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++
MimaBuild.excludeSparkClass("mllib.regression.LassoWithSGD") ++
MimaBuild.excludeSparkClass("mllib.regression.LinearRegressionWithSGD")
case _ => Seq()
}

def excludes(version: String) = version match {
case v if v.startsWith("1.1") =>
Seq(
MimaBuild.excludeSparkPackage("deploy"),
MimaBuild.excludeSparkPackage("graphx")
) ++
closures.map(method => ProblemFilters.exclude[MissingMethodProblem](method)) ++
Seq(
// Adding new method to JavaRDLike trait - we should probably mark this as a developer API.
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitions"),
// We made a mistake earlier (ed06500d3) in the Java API to use default parameter values
// for countApproxDistinct* functions, which does not work in Java. We later removed
// them, and use the following to tell Mima to not care about them.
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"),
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.api.java.JavaPairRDD.countApproxDistinct$default$1"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey$default$1"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.api.java.JavaRDD.countApproxDistinct$default$1"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.api.java.JavaRDDLike.countApproxDistinct$default$1"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.storage.MemoryStore.Entry"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$"
+ "createZero$1")
) ++
Seq(
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.FlumeReceiver.this")
) ++
Seq( // Ignore some private methods in ALS.
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateFeatures"),
ProblemFilters.exclude[MissingMethodProblem]( // The only public constructor is the one without arguments.
"org.apache.spark.mllib.recommendation.ALS.this"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$<init>$default$7")
) ++
MimaBuild.excludeSparkClass("mllib.linalg.distributed.ColumnStatisticsAggregator") ++
MimaBuild.excludeSparkClass("rdd.ZippedRDD") ++
MimaBuild.excludeSparkClass("rdd.ZippedPartition") ++
MimaBuild.excludeSparkClass("util.SerializableHyperLogLog") ++
MimaBuild.excludeSparkClass("storage.Values") ++
MimaBuild.excludeSparkClass("storage.Entry") ++
MimaBuild.excludeSparkClass("storage.MemoryStore$Entry")
case v if v.startsWith("1.0") =>
Seq(
MimaBuild.excludeSparkPackage("api.java"),
MimaBuild.excludeSparkPackage("mllib"),
MimaBuild.excludeSparkPackage("streaming")
) ++
MimaBuild.excludeSparkClass("rdd.ClassTags") ++
MimaBuild.excludeSparkClass("util.XORShiftRandom") ++
MimaBuild.excludeSparkClass("graphx.EdgeRDD") ++
MimaBuild.excludeSparkClass("graphx.VertexRDD") ++
MimaBuild.excludeSparkClass("graphx.impl.GraphImpl") ++
MimaBuild.excludeSparkClass("graphx.impl.RoutingTable") ++
MimaBuild.excludeSparkClass("graphx.util.collection.PrimitiveKeyOpenHashMap") ++
MimaBuild.excludeSparkClass("graphx.util.collection.GraphXPrimitiveKeyOpenHashMap") ++
MimaBuild.excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
MimaBuild.excludeSparkClass("mllib.optimization.SquaredGradient") ++
MimaBuild.excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++
MimaBuild.excludeSparkClass("mllib.regression.LassoWithSGD") ++
MimaBuild.excludeSparkClass("mllib.regression.LinearRegressionWithSGD")
case _ => Seq()
}

private val closures = Seq(
"org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$mergeMaps$1",
"org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1",
"org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$distributePartition$1",
"org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$mergeValue$1",
"org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$writeToFile$1",
"org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$reducePartition$1",
"org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$writeShard$1",
"org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$mergeCombiners$1",
"org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$process$1",
"org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$createCombiner$1",
"org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$mergeMaps$1"
)
}
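
The new `closures` list replaces what would otherwise be eleven near-identical exclude entries: each string is the name-mangled method that a formerly nested `def` compiled to, and mapping over the list turns them all into MissingMethodProblem filters. A condensed sketch of the pattern (only two of the names shown):

import com.typesafe.tools.mima.core._

object ClosureExcludesSketch {
  // Name-mangled methods that the def-to-val changes removed (two of the eleven shown).
  private val closures = Seq(
    "org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1",
    "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$createCombiner$1"
  )

  // One MissingMethodProblem filter per entry, instead of one hand-written exclude each.
  val filters = closures.map(method => ProblemFilters.exclude[MissingMethodProblem](method))
}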
