From f135a8ec6744e555a5d5d1ee1fdc71acf8d451d7 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 22 Jan 2015 09:49:08 -0800 Subject: [PATCH] Moving the output commit coordinator from class into method. The SparkHadoopWriter wasn't serializable because the commit coordinator was a class member. It doesn't need to be, so just get it on demand when committing the task. --- core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index f238a17b464db..a4d13c972c01a 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -45,8 +45,6 @@ class SparkHadoopWriter(@transient jobConf: JobConf) private val now = new Date() private val conf = new SerializableWritable(jobConf) - private val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator - private var jobID = 0 private var splitID = 0 private var attemptID = 0 @@ -109,6 +107,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf) val taCtxt = getTaskContext() val cmtr = getOutputCommitter() if (cmtr.needsTaskCommit(taCtxt)) { + val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator val conf = SparkEnv.get.conf val timeout = AkkaUtils.askTimeout(conf) val maxAttempts = AkkaUtils.numRetries(conf)