Skip to content

Commit

Permalink
Re-enable guarding of commit coordination with the spark.speculation setting.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Feb 4, 2015
1 parent ede7590 commit 3969f5f
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 7 deletions.
7 changes: 1 addition & 6 deletions core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
// attempts, which should only occur if speculation is enabled
val speculationEnabled = sparkConf.getBoolean("spark.speculation", false)
// This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
// sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)

// TODO: revert this before merging the PR; this is enabled so this code path is exercised
// by more tests (even though it might not be _necessary_, it should be _safe_ to do the
// extra coordination)
true
sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
}
if (shouldCoordinateWithDriver) {
val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ class OutputCommitCoordinatorSuite extends FunSuite with BeforeAndAfter {

before {
tempDir = Utils.createTempDir()
sc = new SparkContext("local[4]", classOf[OutputCommitCoordinatorSuite].getSimpleName) {
val conf = new SparkConf()
.setMaster("local[4]")
.setAppName(classOf[OutputCommitCoordinatorSuite].getSimpleName)
.set("spark.speculation", "true")
sc = new SparkContext(conf) {
override private[spark] def createSparkEnv(
conf: SparkConf,
isLocal: Boolean,
Expand Down

0 comments on commit 3969f5f

Please sign in to comment.