Skip to content

Commit

Permalink
Reduce scope of some variables in Exchange
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed May 8, 2015
1 parent 899e1d7 commit f305ff3
Showing 1 changed file with 5 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,6 @@ case class Exchange(

override def output: Seq[Attribute] = child.output

private val sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]

private val bypassMergeThreshold =
child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)

private val serializeMapOutputs =
child.sqlContext.sparkContext.conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true)

/**
* Determines whether records must be defensively copied before being sent to the shuffle.
* Several of Spark's shuffle components will buffer deserialized Java objects in memory. The
Expand All @@ -91,7 +83,11 @@ case class Exchange(
// Note: even though we only use the partitioner's `numPartitions` field, we require it to be
// passed instead of directly passing the number of partitions in order to guard against
// corner-cases where a partitioner constructed with `numPartitions` partitions may output
// fewer partitions (like RangeParittioner, for example).
// fewer partitions (like RangePartitioner, for example).
val conf = child.sqlContext.sparkContext.conf
val sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
val serializeMapOutputs = conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true)
if (newOrdering.nonEmpty) {
// If a new ordering is required, then records will be sorted with Spark's `ExternalSorter`,
// which requires a defensive copy.
Expand Down

0 comments on commit f305ff3

Please sign in to comment.