
Commit

Move the output commit coordinator from a class member into the method.
The SparkHadoopWriter wasn't serializable because the commit coordinator
was a class member. It doesn't need to be, so just get it on demand when
committing the task.
mccheah committed Jan 22, 2015
1 parent 1c2b219 commit f135a8e
Showing 1 changed file with 1 addition and 2 deletions.
core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala (1 addition, 2 deletions)
@@ -45,8 +45,6 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
   private val now = new Date()
   private val conf = new SerializableWritable(jobConf)

-  private val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
-
   private var jobID = 0
   private var splitID = 0
   private var attemptID = 0
@@ -109,6 +107,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
     val taCtxt = getTaskContext()
     val cmtr = getOutputCommitter()
     if (cmtr.needsTaskCommit(taCtxt)) {
+      val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
       val conf = SparkEnv.get.conf
       val timeout = AkkaUtils.askTimeout(conf)
       val maxAttempts = AkkaUtils.numRetries(conf)
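The change follows a standard pattern for keeping a serializable wrapper serializable: look up environment-scoped, non-serializable helpers (here SparkEnv.get.outputCommitCoordinator) at the point of use inside the method, instead of caching them in a field that gets captured whenever the instance is serialized. A minimal sketch of the idea, using hypothetical stand-ins (Env, Coordinator, Writer) rather than the real Spark classes:

    // Hypothetical stand-in for OutputCommitCoordinator: not Serializable.
    class Coordinator {
      def canCommit(stage: Int, attempt: Int): Boolean = true
    }

    // Hypothetical stand-in for SparkEnv: a process-local registry.
    object Env {
      val outputCommitCoordinator = new Coordinator
    }

    class Writer extends Serializable {
      // Caching the coordinator in a field would make this instance
      // unserializable, because the field is serialized along with it:
      //   private val coordinator = Env.outputCommitCoordinator

      def commitTask(stage: Int, attempt: Int): Unit = {
        // Fetch it on demand; the lookup runs wherever the task runs,
        // and nothing non-serializable travels with the Writer itself.
        val coordinator = Env.outputCommitCoordinator
        if (coordinator.canCommit(stage, attempt)) {
          // perform the actual commit here
        }
      }
    }

In the real change the same lookup simply moves from a private val of SparkHadoopWriter into the task-commit method shown in the second hunk, which is enough because SparkEnv.get resolves the per-JVM environment on whichever node runs the task.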
