
Commit

Tests reading and writing data using writers now use Minicluster.
Conflicts:
	streaming/pom.xml
	streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
	streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
harishreedharan committed Oct 22, 2014
1 parent 5c70d1f commit edcbee1
Showing 5 changed files with 105 additions and 72 deletions.
6 changes: 6 additions & 0 deletions pom.xml
@@ -406,6 +406,12 @@
         <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
         <version>${akka.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-minicluster</artifactId>
+        <version>${hadoop.version}</version>
+        <scope>test</scope>
+      </dependency>
       <dependency>
         <groupId>${akka.group}</groupId>
         <artifactId>akka-testkit_${scala.binary.version}</artifactId>
5 changes: 5 additions & 0 deletions streaming/pom.xml
@@ -68,6 +68,11 @@
       <artifactId>junit-interface</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
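The test-suite changes that actually exercise this dependency are not rendered on this page, but the hadoop-minicluster artifact added above is what lets the write ahead log tests run against a real in-process HDFS instead of the local filesystem. The following is only a rough sketch, with assumed names and structure rather than code from this commit, of how a test typically stands up Hadoop's MiniDFSCluster:

// Hypothetical sketch; not the WriteAheadLogSuite change from this commit.
// It only illustrates the usual MiniDFSCluster lifecycle in a test fixture.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.MiniDFSCluster

object MiniClusterSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Start a single-datanode, in-process HDFS cluster for the duration of the test.
    val cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build()
    cluster.waitClusterUp()
    try {
      // A test would read and write its WAL files under this URI.
      val hdfsUri = "hdfs://localhost:" + cluster.getNameNodePort
      println(s"Mini HDFS cluster running at $hdfsUri")
    } finally {
      cluster.shutdown()
    }
  }
}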
streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -25,10 +25,9 @@ private[streaming] object HdfsUtils {
     // HDFS is not thread-safe when getFileSystem is called, so synchronize on that

     val dfsPath = new Path(path)
-    val dfs =
-      this.synchronized {
-        dfsPath.getFileSystem(conf)
-      }
+    val dfs = this.synchronized {
+      dfsPath.getFileSystem(conf)
+    }
     // If the file exists and we have append support, append instead of creating a new file
     val stream: FSDataOutputStream = {
       if (dfs.isFile(dfsPath)) {
@@ -54,17 +53,16 @@ private[streaming] object HdfsUtils {
   }

   def checkState(state: Boolean, errorMsg: => String) {
-    if(!state) {
+    if (!state) {
       throw new IllegalStateException(errorMsg)
     }
   }

   def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = {
     val dfsPath = new Path(path)
-    val dfs =
-      this.synchronized {
-        dfsPath.getFileSystem(conf)
-      }
+    val dfs = this.synchronized {
+      dfsPath.getFileSystem(conf)
+    }
     val fileStatus = dfs.getFileStatus(dfsPath)
     val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen))
     blockLocs.map(_.flatMap(_.getHosts))
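For orientation, here is a minimal, hypothetical caller of the helper reformatted above; it is not part of this commit. The package declaration is only there because HdfsUtils is private[streaming], and the path argument is a placeholder (for example a file on the test minicluster):

// Hypothetical caller, not from this commit; the path argument is a placeholder.
package org.apache.spark.streaming.util

import org.apache.hadoop.conf.Configuration

object HdfsUtilsUsageSketch {
  def printBlockHosts(path: String): Unit = {
    val conf = new Configuration()
    // getBlockLocations flattens the datanode hosts of every block of the file.
    HdfsUtils.getBlockLocations(path, conf) match {
      case Some(hosts) => println(s"Blocks of $path are stored on: ${hosts.mkString(", ")}")
      case None => println(s"No block locations reported for $path")
    }
  }
}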
streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
@@ -183,10 +183,6 @@ private[streaming] class WriteAheadLogManager(
       pastLogs ++= logFileInfo
       logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
       logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
-    } else {
-      fileSystem.mkdirs(logDirectoryPath,
-        FsPermission.createImmutable(Integer.parseInt("770", 8).toShort))
-      logInfo(s"Created ${logDirectory} for write ahead log files")
     }
   }

(Diff for the fifth changed file did not load.)
