
Commit

Merge remote-tracking branch 'origin/master' into multi-way-join-planning-improvements
JoshRosen committed Jul 31, 2015
2 parents cd8269b + 3c66ff7 commit 5c45924
Showing 193 changed files with 6,471 additions and 1,668 deletions.
3 changes: 2 additions & 1 deletion R/pkg/NAMESPACE
@@ -12,7 +12,8 @@ export("print.jobj")

# MLlib integration
exportMethods("glm",
"predict")
"predict",
"summary")

# Job group lifecycle management methods
export("setJobGroup",
4 changes: 3 additions & 1 deletion R/pkg/R/backend.R
@@ -110,6 +110,8 @@ invokeJava <- function(isStatic, objId, methodName, ...) {

# TODO: check the status code to output error information
returnStatus <- readInt(conn)
stopifnot(returnStatus == 0)
if (returnStatus != 0) {
stop(readString(conn))
}
readObject(conn)
}
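
With this change, a non-zero return status from the JVM backend now surfaces the JVM's own error text via stop(readString(conn)) instead of failing with a bare stopifnot(). A minimal sketch of how a caller can observe this (not part of the diff; assumes an initialized SparkR session with a sqlContext, mirroring the test added to test_sparkSQL.R below):

# Sketch only; assumes sparkR.init() and sparkRSQL.init() have already been run.
msg <- tryCatch(
  sql(sqlContext, "select * from blah"),    # table "blah" does not exist
  error = function(e) conditionMessage(e)   # error text now originates in the JVM
)
grepl("Table Not Found: blah", msg)         # TRUE with this patch
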
14 changes: 7 additions & 7 deletions R/pkg/R/generics.R
@@ -254,8 +254,10 @@ setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues")

# @rdname intersection
# @export
setGeneric("intersection", function(x, other, numPartitions = 1) {
standardGeneric("intersection") })
setGeneric("intersection",
function(x, other, numPartitions = 1) {
standardGeneric("intersection")
})

# @rdname keys
# @export
@@ -489,9 +491,7 @@ setGeneric("sample",
#' @rdname sample
#' @export
setGeneric("sample_frac",
function(x, withReplacement, fraction, seed) {
standardGeneric("sample_frac")
})
function(x, withReplacement, fraction, seed) { standardGeneric("sample_frac") })

#' @rdname saveAsParquetFile
#' @export
@@ -553,8 +553,8 @@ setGeneric("withColumn", function(x, colName, col) { standardGeneric("withColumn

#' @rdname withColumnRenamed
#' @export
setGeneric("withColumnRenamed", function(x, existingCol, newCol) {
standardGeneric("withColumnRenamed") })
setGeneric("withColumnRenamed",
function(x, existingCol, newCol) { standardGeneric("withColumnRenamed") })


###################### Column Methods ##########################
26 changes: 26 additions & 0 deletions R/pkg/R/mllib.R
@@ -71,3 +71,29 @@ setMethod("predict", signature(object = "PipelineModel"),
function(object, newData) {
return(dataFrame(callJMethod(object@model, "transform", newData@sdf)))
})

#' Get the summary of a model
#'
#' Returns the summary of a model produced by glm(), similarly to R's summary().
#'
#' @param model A fitted MLlib model
#' @return a list with a 'coefficients' component, which is the matrix of coefficients. See
#' summary.glm for more information.
#' @rdname glm
#' @export
#' @examples
#'\dontrun{
#' model <- glm(y ~ x, trainingData)
#' summary(model)
#'}
setMethod("summary", signature(object = "PipelineModel"),
function(object) {
features <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"getModelFeatures", object@model)
weights <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"getModelWeights", object@model)
coefficients <- as.matrix(unlist(weights))
colnames(coefficients) <- c("Estimate")
rownames(coefficients) <- unlist(features)
return(list(coefficients = coefficients))
})
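
A short usage sketch of the new summary() method (not part of the diff; assumes a SparkR session where sqlContext is available and that column names use underscores, as in the test added to test_mllib.R below):

# Sketch only; assumes createDataFrame() and SparkR's glm() as shown elsewhere in this commit.
training <- createDataFrame(sqlContext, iris)
model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
stats <- summary(model)
# stats is a list; stats$coefficients is a one-column matrix ("Estimate")
# with one row per feature, e.g. "(Intercept)", "Sepal_Length", ...
stats$coefficients
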
4 changes: 2 additions & 2 deletions R/pkg/R/pairRDD.R
@@ -202,8 +202,8 @@ setMethod("partitionBy",

packageNamesArr <- serialize(.sparkREnv$.packages,
connection = NULL)
broadcastArr <- lapply(ls(.broadcastNames), function(name) {
get(name, .broadcastNames) })
broadcastArr <- lapply(ls(.broadcastNames),
function(name) { get(name, .broadcastNames) })
jrdd <- getJRDD(x)

# We create a PairwiseRRDD that extends RDD[(Int, Array[Byte])],
9 changes: 6 additions & 3 deletions R/pkg/R/sparkR.R
@@ -22,7 +22,8 @@
connExists <- function(env) {
tryCatch({
exists(".sparkRCon", envir = env) && isOpen(env[[".sparkRCon"]])
}, error = function(err) {
},
error = function(err) {
return(FALSE)
})
}
@@ -153,7 +154,8 @@ sparkR.init <- function(
.sparkREnv$backendPort <- backendPort
tryCatch({
connectBackend("localhost", backendPort)
}, error = function(err) {
},
error = function(err) {
stop("Failed to connect JVM\n")
})

@@ -264,7 +266,8 @@ sparkRHive.init <- function(jsc = NULL) {
ssc <- callJMethod(sc, "sc")
hiveCtx <- tryCatch({
newJObject("org.apache.spark.sql.hive.HiveContext", ssc)
}, error = function(err) {
},
error = function(err) {
stop("Spark SQL is not built with Hive support")
})

11 changes: 11 additions & 0 deletions R/pkg/inst/tests/test_mllib.R
@@ -48,3 +48,14 @@ test_that("dot minus and intercept vs native glm", {
rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris)
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
})

test_that("summary coefficients match with native glm", {
training <- createDataFrame(sqlContext, iris)
stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training))
coefs <- as.vector(stats$coefficients)
rCoefs <- as.vector(coef(glm(Sepal.Width ~ Sepal.Length + Species, data = iris)))
expect_true(all(abs(rCoefs - coefs) < 1e-6))
expect_true(all(
as.character(stats$features) ==
c("(Intercept)", "Sepal_Length", "Species__versicolor", "Species__virginica")))
})
11 changes: 9 additions & 2 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -112,7 +112,8 @@ test_that("create DataFrame from RDD", {
df <- jsonFile(sqlContext, jsonPathNa)
hiveCtx <- tryCatch({
newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc)
}, error = function(err) {
},
error = function(err) {
skip("Hive is not build with SparkSQL, skipped")
})
sql(hiveCtx, "CREATE TABLE people (name string, age double, height float)")
@@ -602,7 +603,8 @@ test_that("write.df() as parquet file", {
test_that("test HiveContext", {
hiveCtx <- tryCatch({
newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc)
}, error = function(err) {
},
error = function(err) {
skip("Hive is not build with SparkSQL, skipped")
})
df <- createExternalTable(hiveCtx, "json", jsonPath, "json")
@@ -1000,6 +1002,11 @@ test_that("crosstab() on a DataFrame", {
expect_identical(expected, ordered)
})

test_that("SQL error message is returned from JVM", {
retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e)
expect_equal(grepl("Table Not Found: blah", retError), TRUE)
})

unlink(parquetPath)
unlink(jsonPath)
unlink(jsonPathNa)
2 changes: 1 addition & 1 deletion R/run-tests.sh
@@ -23,7 +23,7 @@ FAILED=0
LOGFILE=$FWDIR/unit-tests.out
rm -f $LOGFILE

SPARK_TESTING=1 $FWDIR/../bin/sparkR --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
SPARK_TESTING=1 $FWDIR/../bin/sparkR --conf spark.buffer.pageSize=4m --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
FAILED=$((PIPESTATUS[0]||$FAILED))

if [[ $FAILED != 0 ]]; then
34 changes: 5 additions & 29 deletions core/pom.xml
@@ -286,7 +286,7 @@
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-client</artifactId>
<version>0.6.4</version>
<version>0.7.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
@@ -297,36 +297,12 @@
<artifactId>curator-recipes</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-jsp</artifactId>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-underfs-glusterfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
</exclusion>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
<exclusion>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
</exclusion>
<exclusion>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-underfs-s3</artifactId>
</exclusion>
</exclusions>
</dependency>
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
@@ -17,9 +17,7 @@

package org.apache.spark.util.collection.unsafe.sort;

import com.google.common.base.Charsets;
import com.google.common.primitives.Longs;
import com.google.common.primitives.UnsignedBytes;
import com.google.common.primitives.UnsignedLongs;

import org.apache.spark.annotation.Private;
import org.apache.spark.unsafe.types.UTF8String;
@@ -30,81 +28,67 @@ public class PrefixComparators {
private PrefixComparators() {}

public static final StringPrefixComparator STRING = new StringPrefixComparator();
public static final IntegralPrefixComparator INTEGRAL = new IntegralPrefixComparator();
public static final FloatPrefixComparator FLOAT = new FloatPrefixComparator();
public static final StringPrefixComparatorDesc STRING_DESC = new StringPrefixComparatorDesc();
public static final LongPrefixComparator LONG = new LongPrefixComparator();
public static final LongPrefixComparatorDesc LONG_DESC = new LongPrefixComparatorDesc();
public static final DoublePrefixComparator DOUBLE = new DoublePrefixComparator();
public static final DoublePrefixComparatorDesc DOUBLE_DESC = new DoublePrefixComparatorDesc();

public static final class StringPrefixComparator extends PrefixComparator {
@Override
public int compare(long aPrefix, long bPrefix) {
// TODO: can done more efficiently
byte[] a = Longs.toByteArray(aPrefix);
byte[] b = Longs.toByteArray(bPrefix);
for (int i = 0; i < 8; i++) {
int c = UnsignedBytes.compare(a[i], b[i]);
if (c != 0) return c;
}
return 0;
return UnsignedLongs.compare(aPrefix, bPrefix);
}

public long computePrefix(byte[] bytes) {
if (bytes == null) {
return 0L;
} else {
byte[] padded = new byte[8];
System.arraycopy(bytes, 0, padded, 0, Math.min(bytes.length, 8));
return Longs.fromByteArray(padded);
}
}

public long computePrefix(String value) {
return value == null ? 0L : computePrefix(value.getBytes(Charsets.UTF_8));
public static long computePrefix(UTF8String value) {
return value == null ? 0L : value.getPrefix();
}
}

public long computePrefix(UTF8String value) {
return value == null ? 0L : computePrefix(value.getBytes());
public static final class StringPrefixComparatorDesc extends PrefixComparator {
@Override
public int compare(long bPrefix, long aPrefix) {
return UnsignedLongs.compare(aPrefix, bPrefix);
}
}

/**
* Prefix comparator for all integral types (boolean, byte, short, int, long).
*/
public static final class IntegralPrefixComparator extends PrefixComparator {
public static final class LongPrefixComparator extends PrefixComparator {
@Override
public int compare(long a, long b) {
return (a < b) ? -1 : (a > b) ? 1 : 0;
}
}

public final long NULL_PREFIX = Long.MIN_VALUE;
public static final class LongPrefixComparatorDesc extends PrefixComparator {
@Override
public int compare(long b, long a) {
return (a < b) ? -1 : (a > b) ? 1 : 0;
}
}

public static final class FloatPrefixComparator extends PrefixComparator {
public static final class DoublePrefixComparator extends PrefixComparator {
@Override
public int compare(long aPrefix, long bPrefix) {
float a = Float.intBitsToFloat((int) aPrefix);
float b = Float.intBitsToFloat((int) bPrefix);
return Utils.nanSafeCompareFloats(a, b);
double a = Double.longBitsToDouble(aPrefix);
double b = Double.longBitsToDouble(bPrefix);
return Utils.nanSafeCompareDoubles(a, b);
}

public long computePrefix(float value) {
return Float.floatToIntBits(value) & 0xffffffffL;
public static long computePrefix(double value) {
return Double.doubleToLongBits(value);
}

public final long NULL_PREFIX = computePrefix(Float.NEGATIVE_INFINITY);
}

public static final class DoublePrefixComparator extends PrefixComparator {
public static final class DoublePrefixComparatorDesc extends PrefixComparator {
@Override
public int compare(long aPrefix, long bPrefix) {
public int compare(long bPrefix, long aPrefix) {
double a = Double.longBitsToDouble(aPrefix);
double b = Double.longBitsToDouble(bPrefix);
return Utils.nanSafeCompareDoubles(a, b);
}

public long computePrefix(double value) {
public static long computePrefix(double value) {
return Double.doubleToLongBits(value);
}

public final long NULL_PREFIX = computePrefix(Double.NEGATIVE_INFINITY);
}
}
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -20,6 +20,9 @@
import java.io.IOException;
import java.util.LinkedList;

import scala.runtime.AbstractFunction0;
import scala.runtime.BoxedUnit;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,6 +93,17 @@ public UnsafeExternalSorter(
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m");
initializeForWriting();

// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
the end of the task. This is necessary to avoid memory leaks when the downstream operator
// does not fully consume the sorter's output (e.g. sort followed by limit).
taskContext.addOnCompleteCallback(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
freeMemory();
return null;
}
});
}

// TODO: metrics tracking + integration with shuffle write metrics
3 changes: 0 additions & 3 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -341,7 +341,4 @@ private[spark] object Accumulators extends Logging {
}
}

def stringifyPartialValue(partialValue: Any): String = "%s".format(partialValue)

def stringifyValue(value: Any): String = "%s".format(value)
}
(Diff truncated: the remaining files of the 193 changed are not shown here.)
