SPARK-1316. Remove use of Commons IO #226

Closed · wants to merge 1 commit
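The patch swaps the two Commons IO calls Spark relied on for equivalents already on the classpath: `FileUtils.deleteDirectory` becomes `Utils.deleteRecursively`, and `FileUtils.writeStringToFile` becomes Guava's `Files.write` with an explicit charset. A minimal sketch of the substitution pattern used in the diffs below (the temp directory and file contents are illustrative only; it assumes Spark's `Utils` and Guava are on the classpath):

```scala
import java.io.File
import java.nio.charset.Charset

import com.google.common.io.Files   // Guava, already a Spark dependency
import org.apache.spark.util.Utils

object CommonsIoMigrationSketch {
  def main(args: Array[String]): Unit = {
    // Before: FileUtils.writeStringToFile(file, "1\n")
    val dir = Files.createTempDir()  // scratch directory, for illustration only
    Files.write("1\n", new File(dir, "part-0"), Charset.forName("UTF-8"))

    // Before: FileUtils.deleteDirectory(dir)
    Utils.deleteRecursively(dir)
  }
}
```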
5 changes: 0 additions & 5 deletions core/pom.xml
@@ -200,11 +200,6 @@
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -529,7 +529,10 @@ private[spark] object Utils extends Logging {
}
}
if (!file.delete()) {
throw new IOException("Failed to delete: " + file)
// Delete can also fail if the file simply did not exist
if (file.exists()) {
throw new IOException("Failed to delete: " + file.getAbsolutePath)
}
}
}

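For context, a minimal self-contained sketch of a recursive delete with the patched check in place; the recursion over children here is an assumption for illustration, not the exact body of Spark's `Utils.deleteRecursively`:

```scala
import java.io.{File, IOException}

// Sketch only: recurse into children, then delete, and only treat a failed
// delete as an error if the path still exists afterwards.
def deleteRecursively(file: File): Unit = {
  if (file.isDirectory) {
    // listFiles can return null on I/O error; treat that as "no children"
    Option(file.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
  }
  if (!file.delete()) {
    // Delete can also fail if the file simply did not exist
    if (file.exists()) {
      throw new IOException("Failed to delete: " + file.getAbsolutePath)
    }
  }
}
```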
3 changes: 1 addition & 2 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -24,7 +24,6 @@ import java.nio.{ByteBuffer, ByteOrder}

import com.google.common.base.Charsets
import com.google.common.io.Files
import org.apache.commons.io.FileUtils
import org.scalatest.FunSuite

class UtilsSuite extends FunSuite {
@@ -136,7 +135,7 @@ class UtilsSuite extends FunSuite {
// Read some nonexistent bytes on both ends
assert(Utils.offsetBytes(f1Path, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n")

FileUtils.deleteDirectory(tmpDir2)
Utils.deleteRecursively(tmpDir2)
}

test("deserialize long value") {
5 changes: 0 additions & 5 deletions pom.xml
@@ -435,11 +435,6 @@
<version>1.9.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
8 changes: 2 additions & 6 deletions project/SparkBuild.scala
@@ -259,8 +259,7 @@ object SparkBuild extends Build {
"org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
"com.novocode" % "junit-interface" % "0.10" % "test",
"org.easymock" % "easymock" % "3.1" % "test",
"org.mockito" % "mockito-all" % "1.8.5" % "test",
"commons-io" % "commons-io" % "2.4" % "test"
"org.mockito" % "mockito-all" % "1.8.5" % "test"
),

testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
@@ -439,10 +438,7 @@

def streamingSettings = sharedSettings ++ Seq(
name := "spark-streaming",
previousArtifact := sparkPreviousArtifact("spark-streaming"),
libraryDependencies ++= Seq(
"commons-io" % "commons-io" % "2.4"
)
previousArtifact := sparkPreviousArtifact("spark-streaming")
)

def yarnCommonSettings = sharedSettings ++ Seq(
4 changes: 0 additions & 4 deletions streaming/pom.xml
@@ -74,10 +74,6 @@
<artifactId>junit-interface</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -28,12 +28,12 @@ import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import scala.reflect.ClassTag

import java.io.{File, ObjectInputStream, IOException}
import java.nio.charset.Charset
import java.util.UUID

import com.google.common.io.Files

import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.{FileUtil, FileSystem, Path}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration


@@ -389,7 +389,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
val localFile = new File(localTestDir, (i + 1).toString)
val hadoopFile = new Path(testDir, (i + 1).toString)
val tempHadoopFile = new Path(testDir, ".tmp_" + (i + 1).toString)
FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
Files.write(input(i) + "\n", localFile, Charset.forName("UTF-8"))
var tries = 0
var done = false
while (!done && tries < maxTries) {
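The write-side replacement used throughout the streaming tests is Guava's `Files.write(CharSequence, File, Charset)`. One difference worth noting: the removed two-argument `FileUtils.writeStringToFile(File, String)` fell back to the platform default encoding and created missing parent directories, while the Guava call pins UTF-8 and expects the parent directory to already exist. A hypothetical helper showing the call shape:

```scala
import java.io.File
import java.nio.charset.Charset

import com.google.common.io.Files

// Hypothetical helper mirroring the replacement call used in these tests;
// overwrites the target file with UTF-8 encoded contents.
def writeTestFile(file: File, contents: String): Unit = {
  Files.write(contents, file, Charset.forName("UTF-8"))
}
```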
streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -18,18 +18,17 @@
package org.apache.spark.streaming

import java.io.File
import java.nio.charset.Charset

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import org.apache.commons.io.FileUtils
import com.google.common.io.Files
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.util.Utils
import org.apache.spark.SparkConf

/**
* This test suites tests the checkpointing functionality of DStreams -
@@ -46,13 +45,13 @@ class CheckpointSuite extends TestSuiteBase {

override def beforeFunction() {
super.beforeFunction()
FileUtils.deleteDirectory(new File(checkpointDir))
Utils.deleteRecursively(new File(checkpointDir))
}

override def afterFunction() {
super.afterFunction()
if (ssc != null) ssc.stop()
FileUtils.deleteDirectory(new File(checkpointDir))
Utils.deleteRecursively(new File(checkpointDir))
}

test("basic rdd checkpoints + dstream graph checkpoint recovery") {
@@ -256,7 +255,7 @@ class CheckpointSuite extends TestSuiteBase {
//var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
Thread.sleep(1000)
for (i <- Seq(1, 2, 3)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
// wait to make sure that the file is written such that it gets shown in the file listings
Thread.sleep(1000)
}
@@ -273,7 +272,7 @@

// Create files while the master is down
for (i <- Seq(4, 5, 6)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
Thread.sleep(1000)
}

@@ -289,7 +288,7 @@
// Restart stream computation
ssc.start()
for (i <- Seq(7, 8, 9)) {
FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
Thread.sleep(1000)
}
Thread.sleep(1000)
streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
@@ -19,14 +19,9 @@ package org.apache.spark.streaming

import org.apache.spark.Logging
import org.apache.spark.streaming.util.MasterFailureTest
import StreamingContext._
import org.apache.spark.util.Utils

import org.scalatest.{FunSuite, BeforeAndAfter}
import com.google.common.io.Files
import java.io.File
import org.apache.commons.io.FileUtils
import collection.mutable.ArrayBuffer


/**
* This testsuite tests master failures at random times while the stream is running using
@@ -43,12 +38,12 @@ class FailureSuite extends TestSuiteBase with Logging {

override def beforeFunction() {
super.beforeFunction()
FileUtils.deleteDirectory(new File(directory))
Utils.deleteRecursively(new File(directory))
}

override def afterFunction() {
super.afterFunction()
FileUtils.deleteDirectory(new File(directory))
Utils.deleteRecursively(new File(directory))
}

test("multiple failures with map") {
streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -23,21 +23,23 @@ import akka.actor.IOManager
import akka.actor.Props
import akka.util.ByteString

import org.apache.spark.streaming.dstream.{NetworkReceiver}
import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
import java.io.{File, BufferedWriter, OutputStreamWriter}
import java.net.{InetSocketAddress, SocketException, ServerSocket}
import java.nio.charset.Charset
import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import util.ManualClock
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}

import com.google.common.io.Files
import org.scalatest.BeforeAndAfter

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.NetworkReceiver
import org.apache.spark.streaming.receivers.Receiver
import org.apache.spark.Logging
import scala.util.Random
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
import collection.JavaConversions._
import com.google.common.io.Files
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.util.Utils

class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {

@@ -112,7 +114,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
Thread.sleep(1000)
for (i <- 0 until input.size) {
val file = new File(testDir, i.toString)
FileUtils.writeStringToFile(file, input(i).toString + "\n")
Files.write(input(i) + "\n", file, Charset.forName("UTF-8"))
logInfo("Created file " + file)
Thread.sleep(batchDuration.milliseconds)
Thread.sleep(1000)
@@ -136,7 +138,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// (whether the elements were received one in each interval is not verified)
assert(output.toList === expectedOutput.toList)

FileUtils.deleteDirectory(testDir)
Utils.deleteRecursively(testDir)

// Enable manual clock back again for other tests
conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")