From b08d4f179ee8ae096a6319087b273c6454baf964 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 1 May 2015 16:05:11 -0700 Subject: [PATCH 1/2] Fix flaky WALBackedBlockRDDSuite --- .../streaming/rdd/WriteAheadLogBackedBlockRDD.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index ebdf418f4ab6a..b081aa3ce431e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -16,7 +16,9 @@ */ package org.apache.spark.streaming.rdd +import java.io.File import java.nio.ByteBuffer +import java.util.UUID import scala.reflect.ClassTag import scala.util.control.NonFatal @@ -108,9 +110,13 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( // writing log data. However, the directory is not needed if data needs to be read, hence // a dummy path is provided to satisfy the method parameter requirements. // FileBasedWriteAheadLog will not create any file or directory at that path. - val dummyDirectory = FileUtils.getTempDirectoryPath() + // FileBasedWriteAheadLog will not create any file or directory at that path. Also, + // this dummy directory should not already exist otherwise the WAL will try to recover + // past events from the directory and throw errors. + val nonExistentDirectory = new File( + FileUtils.getTempDirectory(), UUID.randomUUID().toString).getAbsolutePath writeAheadLog = WriteAheadLogUtils.createLogForReceiver( - SparkEnv.get.conf, dummyDirectory, hadoopConf) + SparkEnv.get.conf, nonExistentDirectory, hadoopConf) dataRead = writeAheadLog.read(partition.walRecordHandle) } catch { case NonFatal(e) => From 141afd520c04772abc980ea313be4c7fb3439652 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 1 May 2015 17:51:14 -0700 Subject: [PATCH 2/2] Removed use of FileUtils --- .../spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index b081aa3ce431e..f4c8046e8a1a8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -23,8 +23,6 @@ import java.util.UUID import scala.reflect.ClassTag import scala.util.control.NonFatal -import org.apache.commons.io.FileUtils - import org.apache.spark._ import org.apache.spark.rdd.BlockRDD import org.apache.spark.storage.{BlockId, StorageLevel} @@ -114,7 +112,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( // this dummy directory should not already exist otherwise the WAL will try to recover // past events from the directory and throw errors. val nonExistentDirectory = new File( - FileUtils.getTempDirectory(), UUID.randomUUID().toString).getAbsolutePath + System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath writeAheadLog = WriteAheadLogUtils.createLogForReceiver( SparkEnv.get.conf, nonExistentDirectory, hadoopConf) dataRead = writeAheadLog.read(partition.walRecordHandle)