
Commit

Merge branch 'master' of https://github.com/apache/spark into https
scwf committed Oct 7, 2014
2 parents 2dadb2f + 69c3f44 commit 8b32853
Showing 41 changed files with 640 additions and 372 deletions.
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -779,20 +779,20 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
* with `+=`. Only the driver can access the accumulable's `value`.
* @tparam T accumulator type
* @tparam R type that can be added to the accumulator
* @tparam R accumulator result type
* @tparam T type that can be added to the accumulator
*/
def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]) =
def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]) =
new Accumulable(initialValue, param)

/**
* Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
* Spark UI. Tasks can add values to the accumulable using the `+=` operator. Only the driver can
* access the accumulable's `value`.
* @tparam T accumulator type
* @tparam R type that can be added to the accumulator
* @tparam R accumulator result type
* @tparam T type that can be added to the accumulator
*/
def accumulable[T, R](initialValue: T, name: String)(implicit param: AccumulableParam[T, R]) =
def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]) =
new Accumulable(initialValue, param, Some(name))

/**
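For context, the change above swaps the type parameters so that `R` is the accumulator (result) type and `T` is the type of the values tasks add. A minimal sketch of a call site under the new [R, T] ordering, assuming an existing SparkContext named `sc`; the AccumulableParam implementation and the names below are illustrative, not part of this commit:

import scala.collection.mutable
import org.apache.spark.AccumulableParam

// Hypothetical param: accumulate individual String elements (T) into a mutable Set (R).
implicit object StringSetParam extends AccumulableParam[mutable.Set[String], String] {
  def addAccumulator(acc: mutable.Set[String], elem: String): mutable.Set[String] = acc += elem
  def addInPlace(a: mutable.Set[String], b: mutable.Set[String]): mutable.Set[String] = a ++= b
  def zero(initial: mutable.Set[String]): mutable.Set[String] = mutable.Set.empty[String]
}

val badRecords = sc.accumulable(mutable.Set.empty[String], "badRecords")
sc.parallelize(Seq("a", "b", "c")).foreach(r => badRecords += r) // tasks add values with +=
println(badRecords.value) // only the driver reads the result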
25 changes: 15 additions & 10 deletions core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -163,18 +163,23 @@ private[broadcast] object HttpBroadcast extends Logging {

private def write(id: Long, value: Any) {
val file = getFile(id)
val out: OutputStream = {
if (compress) {
compressionCodec.compressedOutputStream(new FileOutputStream(file))
} else {
new BufferedOutputStream(new FileOutputStream(file), bufferSize)
val fileOutputStream = new FileOutputStream(file)
try {
val out: OutputStream = {
if (compress) {
compressionCodec.compressedOutputStream(fileOutputStream)
} else {
new BufferedOutputStream(fileOutputStream, bufferSize)
}
}
val ser = SparkEnv.get.serializer.newInstance()
val serOut = ser.serializeStream(out)
serOut.writeObject(value)
serOut.close()
files += file
} finally {
fileOutputStream.close()
}
val ser = SparkEnv.get.serializer.newInstance()
val serOut = ser.serializeStream(out)
serOut.writeObject(value)
serOut.close()
files += file
}

private def read[T: ClassTag](id: Long): T = {
@@ -83,15 +83,21 @@ private[spark] class FileSystemPersistenceEngine(
val serialized = serializer.toBinary(value)

val out = new FileOutputStream(file)
out.write(serialized)
out.close()
try {
out.write(serialized)
} finally {
out.close()
}
}

def deserializeFromFile[T](file: File)(implicit m: Manifest[T]): T = {
val fileData = new Array[Byte](file.length().asInstanceOf[Int])
val dis = new DataInputStream(new FileInputStream(file))
dis.readFully(fileData)
dis.close()
try {
dis.readFully(fileData)
} finally {
dis.close()
}

val clazz = m.runtimeClass.asInstanceOf[Class[T]]
val serializer = serialization.serializerFor(clazz)
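The close-in-finally idiom above recurs across several files in this commit (HttpBroadcast, FileSystemPersistenceEngine, DiskStore). A hedged sketch of a generic helper that captures the pattern; this helper is hypothetical and not introduced by the commit:

import java.io.Closeable

// Run the body and close the resource even if the body throws, letting the exception propagate.
def withCloseable[C <: Closeable, A](resource: C)(body: C => A): A = {
  try body(resource) finally resource.close()
}

// Usage mirroring the file write above:
// withCloseable(new FileOutputStream(file)) { out => out.write(serialized) }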
@@ -29,7 +29,6 @@ import scala.language.postfixOps

import akka.actor._
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.commons.io.FileUtils

import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
@@ -372,6 +372,13 @@ private[spark] class MesosSchedulerBackend(
recordSlaveLost(d, slaveId, ExecutorExited(status))
}

override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
driver.killTask(
TaskID.newBuilder()
.setValue(taskId.toString).build()
)
}

// TODO: query Mesos for number of cores
override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8)

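The new killTask override simply forwards the request to the Mesos driver; the Spark task id (a Long) is carried as the string value of the Mesos TaskID. A small illustrative sketch of that mapping, not taken from this commit:

import org.apache.mesos.Protos.TaskID

val sparkTaskId: Long = 42L
// Spark's Long id becomes the string value of the protobuf TaskID.
val mesosTaskId: TaskID = TaskID.newBuilder().setValue(sparkTaskId.toString).build()
// The original id can be recovered with mesosTaskId.getValue.toLong.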
16 changes: 15 additions & 1 deletion core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -73,7 +73,21 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
val startTime = System.currentTimeMillis
val file = diskManager.getFile(blockId)
val outputStream = new FileOutputStream(file)
blockManager.dataSerializeStream(blockId, outputStream, values)
try {
try {
blockManager.dataSerializeStream(blockId, outputStream, values)
} finally {
// Close outputStream here because it should be closed before file is deleted.
outputStream.close()
}
} catch {
case e: Throwable =>
if (file.exists()) {
file.delete()
}
throw e
}

val length = file.length

val timeTaken = System.currentTimeMillis - startTime
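The DiskStore change layers two concerns: the stream must be closed before the partially written file can be deleted, and any failure must still reach the caller. A sketch of that pattern in isolation, with hypothetical names:

import java.io.{File, FileOutputStream, OutputStream}

// Close the stream first, delete any partial output on failure, then rethrow.
def writeOrCleanUp(file: File)(write: OutputStream => Unit): Unit = {
  val out = new FileOutputStream(file)
  try {
    try write(out) finally out.close()
  } catch {
    case e: Throwable =>
      if (file.exists()) file.delete()
      throw e
  }
}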
20 changes: 10 additions & 10 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -35,8 +35,6 @@ import scala.util.control.{ControlThrowable, NonFatal}

import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.commons.io.FileUtils
import org.apache.commons.io.filefilter.TrueFileFilter
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
import org.apache.log4j.PropertyConfigurator
@@ -710,18 +708,20 @@ private[spark] object Utils extends Logging {
* Determines if a directory contains any files newer than cutoff seconds.
*
* @param dir must be the path to a directory, or IllegalArgumentException is thrown
* @param cutoff measured in seconds. Returns true if there are any files in dir newer than this.
* @param cutoff measured in seconds. Returns true if there are any files or directories in the
* given directory whose last modified time is later than this many seconds ago
*/
def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long): Boolean = {
val currentTimeMillis = System.currentTimeMillis
if (!dir.isDirectory) {
throw new IllegalArgumentException (dir + " is not a directory!")
} else {
val files = FileUtils.listFilesAndDirs(dir, TrueFileFilter.TRUE, TrueFileFilter.TRUE)
val cutoffTimeInMillis = (currentTimeMillis - (cutoff * 1000))
val newFiles = files.filter { _.lastModified > cutoffTimeInMillis }
newFiles.nonEmpty
throw new IllegalArgumentException("$dir is not a directory!")
}
val filesAndDirs = dir.listFiles()
val cutoffTimeInMillis = System.currentTimeMillis - (cutoff * 1000)

filesAndDirs.exists(_.lastModified() > cutoffTimeInMillis) ||
filesAndDirs.filter(_.isDirectory).exists(
subdir => doesDirectoryContainAnyNewFiles(subdir, cutoff)
)
}

/**
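Replacing FileUtils.listFilesAndDirs with a hand-rolled recursion drops the commons-io dependency and lets the check short-circuit as soon as one new file is found. A usage sketch; the path and cutoff are illustrative:

import java.io.File

// True if anything under ./work was modified within the last hour (3600 seconds).
val hasRecentOutput = Utils.doesDirectoryContainAnyNewFiles(new File("work"), 3600)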
3 changes: 1 addition & 2 deletions dev/merge_spark_pr.py
@@ -73,11 +73,10 @@ def fail(msg):


def run_cmd(cmd):
print cmd
if isinstance(cmd, list):
print " ".join(cmd)
return subprocess.check_output(cmd)
else:
print cmd
return subprocess.check_output(cmd.split(" "))


32 changes: 30 additions & 2 deletions dev/run-tests
@@ -24,6 +24,16 @@ cd "$FWDIR"
# Remove work directory
rm -rf ./work

source "$FWDIR/dev/run-tests-codes.sh"

CURRENT_BLOCK=$BLOCK_GENERAL

function handle_error () {
echo "[error] Got a return code of $? on line $1 of the run-tests script."
exit $CURRENT_BLOCK
}


# Build against the right version of Hadoop.
{
if [ -n "$AMPLAB_JENKINS_BUILD_PROFILE" ]; then
@@ -91,33 +101,43 @@ if [ -n "$AMPLAB_JENKINS" ]; then
fi
fi

# Fail fast
set -e
set -o pipefail
trap 'handle_error $LINENO' ERR

echo ""
echo "========================================================================="
echo "Running Apache RAT checks"
echo "========================================================================="

CURRENT_BLOCK=$BLOCK_RAT

./dev/check-license

echo ""
echo "========================================================================="
echo "Running Scala style checks"
echo "========================================================================="

CURRENT_BLOCK=$BLOCK_SCALA_STYLE

./dev/lint-scala

echo ""
echo "========================================================================="
echo "Running Python style checks"
echo "========================================================================="

CURRENT_BLOCK=$BLOCK_PYTHON_STYLE

./dev/lint-python

echo ""
echo "========================================================================="
echo "Building Spark"
echo "========================================================================="

CURRENT_BLOCK=$BLOCK_BUILD

{
# We always build with Hive because the PySpark Spark SQL tests need it.
BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive"
@@ -141,6 +161,8 @@ echo "========================================================================="
echo "Running Spark unit tests"
echo "========================================================================="

CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS

{
# If the Spark SQL tests are enabled, run the tests with the Hive profiles enabled.
# This must be a single argument, as it is.
@@ -175,10 +197,16 @@ echo ""
echo "========================================================================="
echo "Running PySpark tests"
echo "========================================================================="

CURRENT_BLOCK=$BLOCK_PYSPARK_UNIT_TESTS

./python/run-tests

echo ""
echo "========================================================================="
echo "Detecting binary incompatibilites with MiMa"
echo "========================================================================="

CURRENT_BLOCK=$BLOCK_MIMA

./dev/mima
27 changes: 27 additions & 0 deletions dev/run-tests-codes.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

readonly BLOCK_GENERAL=10
readonly BLOCK_RAT=11
readonly BLOCK_SCALA_STYLE=12
readonly BLOCK_PYTHON_STYLE=13
readonly BLOCK_BUILD=14
readonly BLOCK_SPARK_UNIT_TESTS=15
readonly BLOCK_PYSPARK_UNIT_TESTS=16
readonly BLOCK_MIMA=17