Merge branch 'master' into SPARK-4705
Conflicts:
	core/src/main/scala/org/apache/spark/SparkContext.scala
Marcelo Vanzin committed Apr 17, 2015
2 parents 86de638 + c84d916 commit 9092d39
Showing 295 changed files with 5,698 additions and 3,181 deletions.
6 changes: 3 additions & 3 deletions R/pkg/DESCRIPTION
@@ -17,19 +17,19 @@ License: Apache License (== 2.0)
Collate:
'generics.R'
'jobj.R'
+ 'SQLTypes.R'
'RDD.R'
'pairRDD.R'
- 'SQLTypes.R'
'column.R'
'group.R'
'DataFrame.R'
'SQLContext.R'
+ 'backend.R'
'broadcast.R'
+ 'client.R'
'context.R'
'deserialize.R'
'serialize.R'
'sparkR.R'
- 'backend.R'
- 'client.R'
'utils.R'
'zzz.R'
2 changes: 1 addition & 1 deletion R/pkg/R/DataFrame.R
@@ -17,7 +17,7 @@

# DataFrame.R - DataFrame class and methods implemented in S4 OO classes

- #' @include jobj.R SQLTypes.R RDD.R pairRDD.R column.R group.R
+ #' @include generics.R jobj.R SQLTypes.R RDD.R pairRDD.R column.R group.R
NULL

setOldClass("jobj")
18 changes: 5 additions & 13 deletions R/pkg/R/RDD.R
@@ -85,7 +85,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)

if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
# This transformation is the first in its stage:
- .Object@func <- func
+ .Object@func <- cleanClosure(func)
.Object@prev_jrdd <- getJRDD(prev)
.Object@env$prev_serializedMode <- prev@env$serializedMode
# NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
@@ -94,7 +94,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
pipelinedFunc <- function(split, iterator) {
func(split, prev@func(split, iterator))
}
- .Object@func <- pipelinedFunc
+ .Object@func <- cleanClosure(pipelinedFunc)
.Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
# Get the serialization mode of the parent RDD
.Object@env$prev_serializedMode <- prev@env$prev_serializedMode
@@ -144,17 +144,13 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
return(rdd@env$jrdd_val)
}

- computeFunc <- function(split, part) {
- rdd@func(split, part)
- }
-
packageNamesArr <- serialize(.sparkREnv[[".packages"]],
connection = NULL)

broadcastArr <- lapply(ls(.broadcastNames),
function(name) { get(name, .broadcastNames) })

- serializedFuncArr <- serialize(computeFunc, connection = NULL)
+ serializedFuncArr <- serialize(rdd@func, connection = NULL)

prev_jrdd <- rdd@prev_jrdd

@@ -279,7 +275,7 @@ setMethod("unpersist",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
- #' setCheckpointDir(sc, "checkpoints")
+ #' setCheckpointDir(sc, "checkpoint")
#' rdd <- parallelize(sc, 1:10, 2L)
#' checkpoint(rdd)
#'}
@@ -551,11 +547,7 @@ setMethod("mapPartitions",
setMethod("lapplyPartitionsWithIndex",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
- FUN <- cleanClosure(FUN)
- closureCapturingFunc <- function(split, part) {
- FUN(split, part)
- }
- PipelinedRDD(X, closureCapturingFunc)
+ PipelinedRDD(X, FUN)
})

#' @rdname lapplyPartitionsWithIndex
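The hunks above apply cleanClosure to an RDD's function when the PipelinedRDD is constructed, and serialize rdd@func directly instead of wrapping it in a throwaway computeFunc. For context, a minimal base-R sketch of the composition pattern that pipelinedFunc uses; the names below are illustrative only, not SparkR APIs:

# Two per-partition transformations; each takes a partition index and the
# partition contents as a list, mirroring the (split, part) signature above.
add_one   <- function(split, part) lapply(part, function(x) x + 1)
keep_even <- function(split, part) Filter(function(x) x %% 2 == 0, part)

# Pipelining composes them into one function, so a single pass over the
# partition applies both steps; this is the shape of pipelinedFunc above.
pipelined <- function(split, part) keep_even(split, add_one(split, part))

pipelined(0L, as.list(1:4))   # returns list(2, 4)

Cleaning the composed function at construction time means its free variables are captured with the values they hold at that moment, which is what allows the pairRDD.R change below.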
2 changes: 1 addition & 1 deletion R/pkg/R/column.R
@@ -17,7 +17,7 @@

# Column Class

- #' @include generics.R jobj.R
+ #' @include generics.R jobj.R SQLTypes.R
NULL

setOldClass("jobj")
2 changes: 1 addition & 1 deletion R/pkg/R/context.R
@@ -216,7 +216,7 @@ broadcast <- function(sc, object) {
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
- #' setCheckpointDir(sc, "~/checkpoints")
+ #' setCheckpointDir(sc, "~/checkpoint")
#' rdd <- parallelize(sc, 1:2, 2L)
#' checkpoint(rdd)
#'}
3 changes: 3 additions & 0 deletions R/pkg/R/group.R
@@ -17,6 +17,9 @@

# group.R - GroupedData class and methods implemented in S4 OO classes

+ #' @include generics.R jobj.R SQLTypes.R column.R
+ NULL
+
setOldClass("jobj")

#' @title S4 class that represents a GroupedData
3 changes: 3 additions & 0 deletions R/pkg/R/jobj.R
@@ -18,6 +18,9 @@
# References to objects that exist on the JVM backend
# are maintained using the jobj.

+ #' @include generics.R
+ NULL
+
# Maintain a reference count of Java object references
# This allows us to GC the java object when it is safe
.validJobjs <- new.env(parent = emptyenv())
6 changes: 2 additions & 4 deletions R/pkg/R/pairRDD.R
@@ -16,6 +16,8 @@
#

# Operations supported on RDDs contains pairs (i.e key, value)
+ #' @include generics.R jobj.R RDD.R
+ NULL

############ Actions and Transformations ############

@@ -694,10 +696,6 @@ setMethod("cogroup",
for (i in 1:rddsLen) {
rdds[[i]] <- lapply(rdds[[i]],
function(x) { list(x[[1]], list(i, x[[2]])) })
- # TODO(hao): As issue [SparkR-142] mentions, the right value of i
- # will not be captured into UDF if getJRDD is not invoked.
- # It should be resolved together with that issue.
- getJRDD(rdds[[i]]) # Capture the closure.
}
union.rdd <- Reduce(unionRDD, rdds)
group.func <- function(vlist) {
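The workaround deleted above existed because an R closure captures variable bindings rather than values: until the UDF was serialized (which the getJRDD call forced), every iteration's function saw the same binding of i and therefore its final value, as the removed TODO notes. With the closure cleaned when the PipelinedRDD is created, the current value of i is captured instead. A minimal base-R sketch of the difference, independent of SparkR:

# Lazy capture: all three closures share the loop's environment, so once the
# loop has finished they all see the final value of i.
fs <- list()
for (i in 1:3) {
  fs[[i]] <- function(x) list(i, x)
}
fs[[1]]("a")   # list(3, "a"), not list(1, "a")

# Eager capture: copy the current value of i into a fresh environment when
# each closure is created. Applying cleanClosure at construction time has a
# similar effect for the UDF above.
gs <- list()
for (i in 1:3) {
  gs[[i]] <- local({
    captured_i <- i
    function(x) list(captured_i, x)
  })
}
gs[[1]]("a")   # list(1, "a")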
5 changes: 3 additions & 2 deletions R/pkg/inst/tests/test_rdd.R
@@ -141,7 +141,8 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
unpersist(rdd2)
expect_false(rdd2@env$isCached)

- setCheckpointDir(sc, "checkpoints")
+ tempDir <- tempfile(pattern = "checkpoint")
+ setCheckpointDir(sc, tempDir)
checkpoint(rdd2)
expect_true(rdd2@env$isCheckpointed)

@@ -152,7 +153,7 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
# make sure the data is collectable
collect(rdd2)

- unlink("checkpoints")
+ unlink(tempDir)
})

test_that("reduce on RDD", {
1 change: 1 addition & 0 deletions bin/pyspark
@@ -89,6 +89,7 @@ export PYTHONSTARTUP="$SPARK_HOME/python/pyspark/shell.py"
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
+ export PYTHONHASHSEED=0
if [[ -n "$PYSPARK_DOC_TEST" ]]; then
exec "$PYSPARK_DRIVER_PYTHON" -m doctest $1
else
11 changes: 10 additions & 1 deletion bin/spark-class
@@ -82,13 +82,22 @@ if [ $(command -v "$JAR_CMD") ] ; then
fi
fi

+ LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR"
+
+ # Add the launcher build dir to the classpath if requested.
+ if [ -n "$SPARK_PREPEND_CLASSES" ]; then
+ LAUNCH_CLASSPATH="$SPARK_HOME/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
+ fi
+
+ export _SPARK_ASSEMBLY="$SPARK_ASSEMBLY_JAR"
+
# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
CMD=()
while IFS= read -d '' -r ARG; do
CMD+=("$ARG")
- done < <("$RUNNER" -cp "$SPARK_ASSEMBLY_JAR" org.apache.spark.launcher.Main "$@")
+ done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")

if [ "${CMD[0]}" = "usage" ]; then
"${CMD[@]}"
11 changes: 10 additions & 1 deletion bin/spark-class2.cmd
@@ -46,13 +46,22 @@ if "%SPARK_ASSEMBLY_JAR%"=="0" (
exit /b 1
)

+ set LAUNCH_CLASSPATH=%SPARK_ASSEMBLY_JAR%
+
+ rem Add the launcher build dir to the classpath if requested.
+ if not "x%SPARK_PREPEND_CLASSES%"=="x" (
+ set LAUNCH_CLASSPATH=%SPARK_HOME%\launcher\target\scala-%SPARK_SCALA_VERSION%\classes;%LAUNCH_CLASSPATH%
+ )
+
+ set _SPARK_ASSEMBLY=%SPARK_ASSEMBLY_JAR%
+
rem Figure out where java is.
set RUNNER=java
if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java

rem The launcher library prints the command to be executed in a single line suitable for being
rem executed by the batch interpreter. So read all the output of the launcher into a variable.
- for /f "tokens=*" %%i in ('cmd /C ""%RUNNER%" -cp %SPARK_ASSEMBLY_JAR% org.apache.spark.launcher.Main %*"') do (
+ for /f "tokens=*" %%i in ('cmd /C ""%RUNNER%" -cp %LAUNCH_CLASSPATH% org.apache.spark.launcher.Main %*"') do (
set SPARK_CMD=%%i
)
%SPARK_CMD%
3 changes: 3 additions & 0 deletions bin/spark-submit
@@ -19,6 +19,9 @@

SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"

+ # disable randomized hash for string in Python 3.3+
+ export PYTHONHASHSEED=0
+
# Only define a usage function if an upstream script hasn't done so.
if ! type -t usage >/dev/null 2>&1; then
usage() {
3 changes: 3 additions & 0 deletions bin/spark-submit2.cmd
@@ -20,6 +20,9 @@ rem
rem This is the entry point for running Spark submit. To avoid polluting the
rem environment, it just launches a new cmd to do the real work.

+ rem disable randomized hash for string in Python 3.3+
+ set PYTHONHASHSEED=0
+
set CLASS=org.apache.spark.deploy.SparkSubmit
call %~dp0spark-class2.cmd %CLASS% %*
set SPARK_ERROR_LEVEL=%ERRORLEVEL%
6 changes: 1 addition & 5 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -85,17 +85,13 @@ table.sortable td {
filter: progid:dximagetransform.microsoft.gradient(startColorstr='#FFA4EDFF', endColorstr='#FF94DDFF', GradientType=0);
}

- span.kill-link {
+ a.kill-link {
margin-right: 2px;
margin-left: 20px;
color: gray;
float: right;
}

- span.kill-link a {
- color: gray;
- }
-
span.expand-details {
font-size: 10pt;
cursor: pointer;
(Diffs for the remaining changed files are not shown here.)
