
Commit

Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen committed Jan 14, 2016
2 parents 7265784 + bcc7373 commit 2fb8c89
Showing 169 changed files with 3,369 additions and 1,005 deletions.
2 changes: 1 addition & 1 deletion LICENSE
@@ -264,7 +264,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
(The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
-(The New BSD License) Py4J (net.sf.py4j:py4j:0.9 - http://py4j.sourceforge.net/)
+(The New BSD License) Py4J (net.sf.py4j:py4j:0.9.1 - http://py4j.sourceforge.net/)
(Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
(BSD licence) sbt and sbt-launch-lib.bash
(BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1173,7 +1173,7 @@ test_that("group by, agg functions", {

expect_equal(3, count(mean(gd)))
expect_equal(3, count(max(gd)))
-expect_equal(30, collect(max(gd))[1, 2])
+expect_equal(30, collect(max(gd))[2, 2])
expect_equal(1, collect(count(gd))[1, 2])

mockLines2 <- c("{\"name\":\"ID1\", \"value\": \"10\"}",
2 changes: 1 addition & 1 deletion bin/pyspark
@@ -67,7 +67,7 @@ export PYSPARK_PYTHON

# Add the PySpark classes to the Python path:
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.1-src.zip:$PYTHONPATH"

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
2 changes: 1 addition & 1 deletion bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
)

set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9.1-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -350,7 +350,7 @@
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
-<version>0.9</version>
+<version>0.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -138,7 +138,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
final File file = tempShuffleBlockIdPlusFile._2();
final BlockId blockId = tempShuffleBlockIdPlusFile._1();
partitionWriters[i] =
-blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open();
+blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
@@ -185,16 +185,19 @@ private long[] writePartitionedFile(File outputFile) throws IOException {
boolean threwException = true;
try {
for (int i = 0; i < numPartitions; i++) {
-final FileInputStream in = new FileInputStream(partitionWriters[i].fileSegment().file());
-boolean copyThrewException = true;
-try {
-  lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
-  copyThrewException = false;
-} finally {
-  Closeables.close(in, copyThrewException);
-}
-if (!partitionWriters[i].fileSegment().file().delete()) {
-  logger.error("Unable to delete file for partition {}", i);
+final File file = partitionWriters[i].fileSegment().file();
+if (file.exists()) {
+  final FileInputStream in = new FileInputStream(file);
+  boolean copyThrewException = true;
+  try {
+    lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
+    copyThrewException = false;
+  } finally {
+    Closeables.close(in, copyThrewException);
+  }
+  if (!file.delete()) {
+    logger.error("Unable to delete file for partition {}", i);
+  }
+}
}
threwException = false;
@@ -116,8 +116,10 @@ public UnsafeInMemorySorter(
* Free the memory used by pointer array.
*/
public void free() {
-consumer.freeArray(array);
-array = null;
+if (consumer != null) {
+  consumer.freeArray(array);
+  array = null;
+}
}

public void reset() {
5 changes: 0 additions & 5 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -68,11 +68,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
logInfo(s"Partition $key not found, computing it")
val computedValues = rdd.computeOrReadCheckpoint(partition, context)

-// If the task is running locally, do not persist the result
-if (context.isRunningLocally) {
-  return computedValues
-}
-
// Otherwise, cache the values and keep track of any updates in block statuses
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -97,8 +97,9 @@

/**
* Returns true if the task is running locally in the driver program.
- * @return
+ * @return false
*/
+  @deprecated("Local execution was removed, so this always returns false", "2.0.0")
def isRunningLocally(): Boolean

/**
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -33,7 +33,6 @@ private[spark] class TaskContextImpl(
override val taskMemoryManager: TaskMemoryManager,
@transient private val metricsSystem: MetricsSystem,
internalAccumulators: Seq[Accumulator[Long]],
-val runningLocally: Boolean = false,
val taskMetrics: TaskMetrics = TaskMetrics.empty)
extends TaskContext
with Logging {
@@ -85,7 +84,7 @@ private[spark] class TaskContextImpl(

override def isCompleted(): Boolean = completed

-override def isRunningLocally(): Boolean = runningLocally
+override def isRunningLocally(): Boolean = false

override def isInterrupted(): Boolean = interrupted

@@ -32,7 +32,7 @@ private[spark] object PythonUtils {
val pythonPath = new ArrayBuffer[String]
for (sparkHome <- sys.env.get("SPARK_HOME")) {
pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9-src.zip").mkString(File.separator)
pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9.1-src.zip").mkString(File.separator)
}
pythonPath ++= SparkContext.jarOfObject(this)
pythonPath.mkString(File.pathSeparator)
@@ -50,7 +50,7 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
sc: SparkContext,
var rdd1 : RDD[T],
var rdd2 : RDD[U])
-extends RDD[Pair[T, U]](sc, Nil)
+extends RDD[(T, U)](sc, Nil)
with Serializable {

val numPartitionsInRdd2 = rdd2.partitions.length
16 changes: 15 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -95,7 +95,7 @@ abstract class RDD[T: ClassTag](

/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
-this(oneParent.context , List(new OneToOneDependency(oneParent)))
+this(oneParent.context, List(new OneToOneDependency(oneParent)))

private[spark] def conf = sc.conf
// =======================================================================
@@ -970,6 +970,13 @@ abstract class RDD[T: ClassTag](
* apply the fold to each element sequentially in some defined ordering. For functions
* that are not commutative, the result may differ from that of a fold applied to a
* non-distributed collection.
+ *
+ * @param zeroValue the initial value for the accumulated result of each partition for the `op`
+ *                  operator, and also the initial value for the combine results from different
+ *                  partitions for the `op` operator - this will typically be the neutral
+ *                  element (e.g. `Nil` for list concatenation or `0` for summation)
+ * @param op an operator used to both accumulate results within a partition and combine results
+ *           from different partitions
*/
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
@@ -988,6 +995,13 @@
* and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
* allowed to modify and return their first argument instead of creating a new U to avoid memory
* allocation.
+ *
+ * @param zeroValue the initial value for the accumulated result of each partition for the
+ *                  `seqOp` operator, and also the initial value for the combine results from
+ *                  different partitions for the `combOp` operator - this will typically be the
+ *                  neutral element (e.g. `Nil` for list concatenation or `0` for summation)
+ * @param seqOp an operator used to accumulate results within a partition
+ * @param combOp an associative operator used to combine results from different partitions
*/
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
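To make the documented contract concrete, here is a minimal, illustrative sketch (not part of this commit) of how `zeroValue`, `op`, `seqOp` and `combOp` interact; it assumes an existing SparkContext named `sc`:

// Illustrative only, not part of the diff above; assumes an existing SparkContext `sc`.
val rdd = sc.parallelize(Seq(1, 2, 3, 4), numSlices = 2)

// fold: `zeroValue` (0, the neutral element for summation) seeds each partition's
// accumulation and the final combine step; the single operator `op` is used for both.
val sum = rdd.fold(0)(_ + _)                     // 10

// aggregate: `seqOp` folds elements into the per-partition accumulator, and
// `combOp` merges per-partition results; here the accumulator is a (sum, count) pair.
val (total, count) = rdd.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),          // seqOp: within a partition
  (a, b) => (a._1 + b._1, a._2 + b._2)           // combOp: across partitions
)
val mean = total.toDouble / count                // 2.5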
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -74,8 +74,7 @@ private[spark] abstract class Task[T](
attemptNumber,
taskMemoryManager,
metricsSystem,
-internalAccumulators,
-runningLocally = false)
+internalAccumulators)
TaskContext.setTaskContext(context)
context.taskMetrics.setHostname(Utils.localHostName())
context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators)
@@ -28,9 +28,13 @@ private[v1] class ExecutorListResource(ui: SparkUI) {
@GET
def executorList(): Seq[ExecutorSummary] = {
val listener = ui.executorsListener
-val storageStatusList = listener.storageStatusList
-(0 until storageStatusList.size).map { statusId =>
-  ExecutorsPage.getExecInfo(listener, statusId)
+listener.synchronized {
+  // The follow codes should be protected by `listener` to make sure no executors will be
+  // removed before we query their status. See SPARK-12784.
+  val storageStatusList = listener.storageStatusList
+  (0 until storageStatusList.size).map { statusId =>
+    ExecutorsPage.getExecInfo(listener, statusId)
+  }
}
}
}
@@ -115,7 +115,7 @@ class StageData private[spark](
val status: StageStatus,
val stageId: Int,
val attemptId: Int,
-val numActiveTasks: Int ,
+val numActiveTasks: Int,
val numCompleteTasks: Int,
val numFailedTasks: Int,

13 changes: 10 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -52,12 +52,19 @@ private[ui] class ExecutorsPage(
private val listener = parent.listener

def render(request: HttpServletRequest): Seq[Node] = {
-val storageStatusList = listener.storageStatusList
+val (storageStatusList, execInfo) = listener.synchronized {
+  // The follow codes should be protected by `listener` to make sure no executors will be
+  // removed before we query their status. See SPARK-12784.
+  val _storageStatusList = listener.storageStatusList
+  val _execInfo = {
+    for (statusId <- 0 until _storageStatusList.size)
+      yield ExecutorsPage.getExecInfo(listener, statusId)
+  }
+  (_storageStatusList, _execInfo)
+}
val maxMem = storageStatusList.map(_.maxMem).sum
val memUsed = storageStatusList.map(_.memUsed).sum
val diskUsed = storageStatusList.map(_.diskUsed).sum
-val execInfo = for (statusId <- 0 until storageStatusList.size) yield
-  ExecutorsPage.getExecInfo(listener, statusId)
val execInfoSorted = execInfo.sortBy(_.id)
val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty

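Both UI-facing changes above apply the same fix for SPARK-12784: take a consistent snapshot of the listener's mutable state while holding the listener's lock, then build the REST response or page from that snapshot, so a concurrently removed executor cannot invalidate an index mid-iteration. A self-contained sketch of the pattern (hypothetical types and names, not Spark's actual classes):

// Hypothetical stand-in for a UI listener whose state is mutated by event threads.
class ExecutorsListenerLike {
  private val executors = scala.collection.mutable.Buffer("exec-1", "exec-2", "exec-3")
  def storageStatusList: Seq[String] = executors.toList
  def remove(id: String): Unit = synchronized { executors -= id }
}

// Rendering thread: snapshot under the listener's lock, then iterate over the
// immutable copy so the size cannot change between the range and the lookups.
def render(listener: ExecutorsListenerLike): Seq[String] = {
  val snapshot = listener.synchronized { listener.storageStatusList }
  (0 until snapshot.size).map(i => s"row for ${snapshot(i)}")
}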
@@ -17,7 +17,7 @@

package org.apache.spark.util.logging

-import java.io.{File, FileOutputStream, InputStream}
+import java.io.{File, FileOutputStream, InputStream, IOException}

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.util.{IntParam, Utils}
@@ -58,20 +58,28 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
protected def appendStreamToFile() {
try {
logDebug("Started appending thread")
-openFile()
-val buf = new Array[Byte](bufferSize)
-var n = 0
-while (!markedForStop && n != -1) {
-  n = inputStream.read(buf)
-  if (n != -1) {
-    appendToFile(buf, n)
+Utils.tryWithSafeFinally {
+  openFile()
+  val buf = new Array[Byte](bufferSize)
+  var n = 0
+  while (!markedForStop && n != -1) {
+    try {
+      n = inputStream.read(buf)
+    } catch {
+      // An InputStream can throw IOException during read if the stream is closed
+      // asynchronously, so once appender has been flagged to stop these will be ignored
+      case _: IOException if markedForStop => // do nothing and proceed to stop appending
+    }
+    if (n > 0) {
+      appendToFile(buf, n)
+    }
+  }
+} {
+  closeFile()
+}
} catch {
case e: Exception =>
logError(s"Error writing stream to file $file", e)
-} finally {
-  closeFile()
-}
}

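For readers unfamiliar with the helper introduced above: `Utils.tryWithSafeFinally { body } { cleanup }` runs the cleanup block without letting a failure in cleanup mask an exception already thrown by the body. A rough, self-contained approximation of that idea (a sketch of the concept only, not Spark's actual implementation):

// Sketch only: run `cleanup` after `block`; if both throw, keep the body's
// exception as the primary one and attach the cleanup failure as suppressed.
def tryWithSafeFinallySketch[T](block: => T)(cleanup: => Unit): T = {
  var primary: Throwable = null
  try {
    block
  } catch {
    case t: Throwable =>
      primary = t
      throw t
  } finally {
    try {
      cleanup
    } catch {
      case t: Throwable =>
        if (primary != null) primary.addSuppressed(t) else throw t
    }
  }
}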
9 changes: 0 additions & 9 deletions core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -82,15 +82,6 @@ class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with Before
assert(value.toList === List(5, 6, 7))
}

test("get uncached local rdd") {
// Local computation should not persist the resulting value, so don't expect a put().
when(blockManager.get(RDDBlockId(0, 0))).thenReturn(None)

val context = new TaskContextImpl(0, 0, 0, 0, null, null, Seq.empty, runningLocally = true)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(1, 2, 3, 4))
}

test("verify task metrics updated correctly") {
cacheManager = sc.env.cacheManager
val context = TaskContext.empty()
@@ -178,7 +178,7 @@ class DoubleRDDSuite extends SparkFunSuite with SharedSparkContext {
test("WorksWithOutOfRangeWithInfiniteBuckets") {
// Verify that out of range works with two buckets
val rdd = sc.parallelize(Seq(10.01, -0.01, Double.NaN))
-val buckets = Array(-1.0/0.0 , 0.0, 1.0/0.0)
+val buckets = Array(-1.0/0.0, 0.0, 1.0/0.0)
val histogramResults = rdd.histogram(buckets)
val expectedHistogramResults = Array(1, 1)
assert(histogramResults === expectedHistogramResults)
@@ -76,7 +76,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi

test("check spark-class location correctly") {
val conf = new SparkConf
conf.set("spark.mesos.executor.home" , "/mesos-home")
conf.set("spark.mesos.executor.home", "/mesos-home")

val listenerBus = mock[LiveListenerBus]
listenerBus.post(
@@ -105,7 +105,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
new Answer[(TempShuffleBlockId, File)] {
override def answer(invocation: InvocationOnMock): (TempShuffleBlockId, File) = {
val blockId = new TempShuffleBlockId(UUID.randomUUID)
-val file = File.createTempFile(blockId.toString, null, tempDir)
+val file = new File(tempDir, blockId.name)
blockIdToFileMap.put(blockId, file)
temporaryFilesCreated.append(file)
(blockId, file)
@@ -166,6 +166,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
writer.stop( /* success = */ true)
assert(temporaryFilesCreated.nonEmpty)
assert(writer.getPartitionLengths.sum === outputFile.length())
+assert(writer.getPartitionLengths.filter(_ == 0L).size === 4) // should be 4 zero length files
assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted
val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get
assert(shuffleWriteMetrics.shuffleBytesWritten === outputFile.length())
@@ -174,6 +175,41 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
assert(taskMetrics.memoryBytesSpilled === 0)
}

test("only generate temp shuffle file for non-empty partition") {
// Using exception to test whether only non-empty partition creates temp shuffle file,
// because temp shuffle file will only be cleaned after calling stop(false) in the failure
// case, so we could use it to validate the temp shuffle files.
def records: Iterator[(Int, Int)] =
Iterator((1, 1), (5, 5)) ++
(0 until 100000).iterator.map { i =>
if (i == 99990) {
throw new SparkException("intentional failure")
} else {
(2, 2)
}
}

val writer = new BypassMergeSortShuffleWriter[Int, Int](
blockManager,
blockResolver,
shuffleHandle,
0, // MapId
taskContext,
conf
)

intercept[SparkException] {
writer.write(records)
}

assert(temporaryFilesCreated.nonEmpty)
// Only 3 temp shuffle files will be created
assert(temporaryFilesCreated.count(_.exists()) === 3)

writer.stop( /* success = */ false)
assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted
}

test("cleanup of intermediate files after errors") {
val writer = new BypassMergeSortShuffleWriter[Int, Int](
blockManager,
