From 1d413ce877a67379a0a74afefba071c018b0ca70 Mon Sep 17 00:00:00 2001
From: Doris Xin
Date: Mon, 9 Jun 2014 14:14:26 -0700
Subject: [PATCH] fixed checkstyle issues

---
 .../scala/org/apache/spark/rdd/PairRDDFunctions.scala |  8 ++++----
 core/src/main/scala/org/apache/spark/rdd/RDD.scala    | 11 +++++++----
 2 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 53219d1a94b79..5fa4ce751b545 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -266,10 +266,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     }
 
     val combOp = (r1: Result, r2: Result) => {
-      //take union of both key sets in case one partion doesn't contain all keys
+      // take union of both key sets in case one partion doesn't contain all keys
       val keyUnion = r1.resultMap.keys.toSet.union(r2.resultMap.keys.toSet)
 
-      //Use r2 to keep the combined result since r1 is usual empty
+      // Use r2 to keep the combined result since r1 is usual empty
       for (key <- keyUnion) {
         val entry1 = r1.resultMap.get(key)
         val entry2 = r2.resultMap.get(key)
@@ -286,7 +286,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
     val zeroU = new Result(Map[K, Stratum]())
 
-    //determine threshold for each stratum and resample
+    // determine threshold for each stratum and resample
     val finalResult = self.aggregateWithContext(zeroU)(seqOp, combOp).resultMap
     val thresholdByKey = new mutable.HashMap[K, Double]()
     for ((key, stratum) <- finalResult) {
@@ -330,7 +330,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       // Bernoulli sampler
      self.mapPartitionsWithIndex((idx: Int, iter: Iterator[(K, V)]) => {
        val random = new RandomDataGenerator
-        random.reSeed(seed+idx)
+        random.reSeed(seed + idx)
        iter.filter(t => random.nextUniform(0.0, 1.0) < thresholdByKey.get(t._1).get)
      }, preservesPartitioning = true)
    }
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 76418283486aa..41889c96dd28c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -884,15 +884,18 @@ abstract class RDD[T: ClassTag](
    * A version of {@link #aggregate()} that passes the TaskContext to the function that does
    * aggregation for each partition.
    */
-  def aggregateWithContext[U: ClassTag](zeroValue: U)(seqOp: ((TaskContext, U), T) => U, combOp: (U, U) => U): U = {
+  def aggregateWithContext[U: ClassTag](zeroValue: U)(seqOp: ((TaskContext, U), T) => U,
+      combOp: (U, U) => U): U = {
     // Clone the zero value since we will also be serializing it as part of tasks
     var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
-    //pad seqOp and combOp with taskContext to conform to aggregate's signature in TraversableOnce
+    // pad seqOp and combOp with taskContext to conform to aggregate's signature in TraversableOnce
     val paddedSeqOp = (arg1: (TaskContext, U), item: T) => (arg1._1, seqOp(arg1, item))
-    val paddedcombOp = (arg1 : (TaskContext, U), arg2: (TaskContext, U)) => (arg1._1, combOp(arg1._2, arg1._2))
+    val paddedcombOp = (arg1 : (TaskContext, U), arg2: (TaskContext, U)) =>
+      (arg1._1, combOp(arg1._2, arg1._2))
     val cleanSeqOp = sc.clean(paddedSeqOp)
     val cleanCombOp = sc.clean(paddedcombOp)
-    val aggregatePartition = (tc: TaskContext, it: Iterator[T]) => (it.aggregate(tc, zeroValue)(cleanSeqOp, cleanCombOp))._2
+    val aggregatePartition = (tc: TaskContext, it: Iterator[T]) =>
+      (it.aggregate(tc, zeroValue)(cleanSeqOp, cleanCombOp))._2
     val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
     sc.runJob(this, aggregatePartition, mergeResult)
     jobResult
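A minimal sketch of how the reformatted aggregateWithContext signature could be exercised, for illustration only and not part of the patch. It assumes a live SparkContext named sc and an RDD[Int] named nums (both hypothetical), and that TaskContext exposes partitionId; the (TaskContext, U) accumulator threading mirrors the paddedSeqOp/paddedcombOp wrappers in the RDD.scala hunk above.

    import org.apache.spark.TaskContext

    // Hypothetical input RDD with four partitions.
    val nums = sc.parallelize(1 to 100, 4)

    // seqOp receives the running (TaskContext, sum) pair plus the next element and
    // returns only the new sum; aggregateWithContext re-attaches the context itself.
    // acc._1 is the TaskContext of the running task (e.g. acc._1.partitionId).
    val seqOp = (acc: (TaskContext, Int), x: Int) => acc._2 + x

    // combOp merges two per-partition sums into one.
    val combOp = (a: Int, b: Int) => a + b

    val total = nums.aggregateWithContext(0)(seqOp, combOp)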