Skip to content

Commit

Permalink
Merge pull request apache#68 from shivaram/master
Browse files Browse the repository at this point in the history
Delete temporary files after they are used
  • Loading branch information
concretevitamin committed Jul 16, 2014
2 parents 5881da7 + 90e2083 commit 715275f
Showing 1 changed file with 23 additions and 5 deletions.
28 changes: 23 additions & 5 deletions pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/RRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private class PairwiseRRDD[T: ClassTag](

RRDD.startStderrThread(proc)

RRDD.startStdinThread(rLibDir, proc, hashFunc, dataSerialized,
val tempFile = RRDD.startStdinThread(rLibDir, proc, hashFunc, dataSerialized,
functionDependencies, packageNames, broadcastVars,
firstParent[T].iterator(split, context), numPartitions,
split.index)
Expand Down Expand Up @@ -77,7 +77,15 @@ private class PairwiseRRDD[T: ClassTag](
}
var _nextObj = read()

def hasNext = !(_nextObj._1 == 0 && _nextObj._2.length == 0)
/**
 * Reports whether another element is available.
 *
 * The iterator is considered exhausted when the look-ahead record `_nextObj`
 * has a first component equal to 0 and an empty second component. On
 * exhaustion the data stream is closed and the temporary file is deleted.
 *
 * @return true if `_nextObj` holds a real record, false once exhausted
 */
def hasNext(): Boolean = {
  val exhausted = _nextObj._1 == 0 && _nextObj._2.length == 0
  if (exhausted) {
    try {
      dataStream.close()
    } finally {
      // Delete the temporary file we created as we are done reading it.
      // Done in `finally` so a failing close() does not leak the file.
      tempFile.delete()
    }
  }
  !exhausted
}
}
}

Expand Down Expand Up @@ -107,7 +115,7 @@ class RRDD[T: ClassTag](
RRDD.startStderrThread(proc)

// Write -1 in numPartitions to indicate this is a normal RDD
RRDD.startStdinThread(rLibDir, proc, func, dataSerialized,
val tempFile = RRDD.startStdinThread(rLibDir, proc, func, dataSerialized,
functionDependencies, packageNames, broadcastVars,
firstParent[T].iterator(split, context),
numPartitions = -1, split.index)
Expand Down Expand Up @@ -147,7 +155,15 @@ class RRDD[T: ClassTag](
}
var _nextObj = read()

def hasNext = _nextObj.length != 0
/**
 * Reports whether another element is available.
 *
 * The iterator is considered exhausted when the look-ahead record `_nextObj`
 * is empty (length 0). On exhaustion the data stream is closed and the
 * temporary file is deleted.
 *
 * @return true if `_nextObj` is non-empty, false once exhausted
 */
def hasNext(): Boolean = {
  val exhausted = _nextObj.length == 0
  if (exhausted) {
    try {
      dataStream.close()
    } finally {
      // Delete the temporary file we created as we are done reading it.
      // Done in `finally` so a failing close() does not leak the file.
      tempFile.delete()
    }
  }
  !exhausted
}
}
}

Expand Down Expand Up @@ -225,7 +241,7 @@ object RRDD {
broadcastVars: Array[Broadcast[Object]],
iter: Iterator[T],
numPartitions: Int,
splitIndex: Int) {
splitIndex: Int) : File = {

val tempDir =
System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
Expand Down Expand Up @@ -288,5 +304,7 @@ object RRDD {
stream.close()
}
}.start()

tempFile
}
}

0 comments on commit 715275f

Please sign in to comment.