
Commit

Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen committed Jan 20, 2016
2 parents feb1172 + 753b194 commit 90cf403
Showing 20 changed files with 351 additions and 68 deletions.
5 changes: 3 additions & 2 deletions R/pkg/NAMESPACE
@@ -39,6 +39,7 @@ exportMethods("arrange",
"describe",
"dim",
"distinct",
"dropDuplicates",
"dropna",
"dtypes",
"except",
@@ -271,15 +272,15 @@ export("as.DataFrame",
"createExternalTable",
"dropTempTable",
"jsonFile",
"read.json",
"loadDF",
"parquetFile",
"read.df",
"read.json",
"read.parquet",
"read.text",
"sql",
"str",
"table",
"tableToDF",
"tableNames",
"tables",
"uncacheTable")
30 changes: 30 additions & 0 deletions R/pkg/R/DataFrame.R
@@ -1645,6 +1645,36 @@ setMethod("where",
filter(x, condition)
})

#' dropDuplicates
#'
#' Returns a new DataFrame with duplicate rows removed, considering only
#' the subset of columns given in colNames (all columns by default).
#'
#' @param x A DataFrame.
#' @param colNames A character vector of column names; defaults to all columns of x.
#' @return A DataFrame with duplicate rows removed.
#' @family DataFrame functions
#' @rdname dropduplicates
#' @name dropDuplicates
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- read.json(sqlContext, path)
#' dropDuplicates(df)
#' dropDuplicates(df, c("col1", "col2"))
#' }
setMethod("dropDuplicates",
signature(x = "DataFrame"),
function(x, colNames = columns(x)) {
stopifnot(class(colNames) == "character")

sdf <- callJMethod(x@sdf, "dropDuplicates", as.list(colNames))
dataFrame(sdf)
})
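
For illustration, a minimal usage sketch of dropDuplicates against a toy DataFrame — a hedged example, assuming a live sqlContext created with sparkRSQL.init(sc) as in the roxygen example above:

# Drop duplicates over all columns, then over the "key" column only.
df <- createDataFrame(sqlContext, data.frame(key   = c(1, 1, 2, 2),
                                             value = c("a", "a", "b", "c")))
count(dropDuplicates(df))          # 3 rows: (1, "a"), (2, "b"), (2, "c")
count(dropDuplicates(df, "key"))   # 2 rows; which value survives per key is arbitrary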

#' Join
#'
#' Join two DataFrames based on the given join expression.
7 changes: 4 additions & 3 deletions R/pkg/R/SQLContext.R
@@ -352,6 +352,8 @@ sql <- function(sqlContext, sqlQuery) {
#' @param sqlContext SQLContext to use
#' @param tableName The SparkSQL Table to convert to a DataFrame.
#' @return DataFrame
#' @rdname tableToDF
#' @name tableToDF
#' @export
#' @examples
#'\dontrun{
@@ -360,15 +362,14 @@ sql <- function(sqlContext, sqlQuery) {
#' path <- "path/to/file.json"
#' df <- read.json(sqlContext, path)
#' registerTempTable(df, "table")
#' new_df <- table(sqlContext, "table")
#' new_df <- tableToDF(sqlContext, "table")
#' }

table <- function(sqlContext, tableName) {
tableToDF <- function(sqlContext, tableName) {
sdf <- callJMethod(sqlContext, "table", tableName)
dataFrame(sdf)
}


#' Tables
#'
#' Returns a DataFrame containing names of tables in the given database.
7 changes: 7 additions & 0 deletions R/pkg/R/generics.R
@@ -428,6 +428,13 @@ setGeneric("corr", function(x, ...) {standardGeneric("corr") })
#' @export
setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })

#' @rdname dropduplicates
#' @export
setGeneric("dropDuplicates",
function(x, colNames = columns(x)) {
standardGeneric("dropDuplicates")
})

#' @rdname nafunctions
#' @export
setGeneric("dropna",
24 changes: 24 additions & 0 deletions R/pkg/inst/tests/testthat/test_context.R
@@ -17,6 +17,30 @@

context("test functions in sparkR.R")

test_that("Check masked functions", {
# Check that we are not masking any new function from base, stats, testthat unexpectedly
masked <- conflicts(detail = TRUE)$`package:SparkR`
expect_true("describe" %in% masked) # only when with testthat..
func <- lapply(masked, function(x) { capture.output(showMethods(x))[[1]] })
funcSparkROrEmpty <- grepl("\\(package SparkR\\)$|^$", func)
maskedBySparkR <- masked[funcSparkROrEmpty]
namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
"colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
"summary", "transform")
expect_equal(length(maskedBySparkR), length(namesOfMasked))
expect_equal(sort(maskedBySparkR), sort(namesOfMasked))
# above are those reported as masked when `library(SparkR)`
# note that many of these methods are still callable without base:: or stats:: prefix
# there should be a test for each of these, except the following, which are currently "broken"
funcHasAny <- unlist(lapply(masked, function(x) {
any(grepl("=\"ANY\"", capture.output(showMethods(x)[-1])))
}))
maskedCompletely <- masked[!funcHasAny]
namesOfMaskedCompletely <- c("cov", "filter", "sample")
expect_equal(length(maskedCompletely), length(namesOfMaskedCompletely))
expect_equal(sort(maskedCompletely), sort(namesOfMaskedCompletely))
})
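
To make the comments above concrete, a hedged interactive sketch of what "completely masked" means once SparkR is attached (not something the test itself runs):

library(SparkR)
# cov, filter and sample have no ANY-signature fallback after SparkR is attached,
# so the base/stats versions need an explicit prefix:
stats::filter(1:10, rep(1 / 3, 3))   # moving-average filter on a plain vector
base::sample(letters, 3)             # plain random sampling
stats::cov(mtcars$mpg, mtcars$wt)    # covariance of two numeric vectors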

test_that("repeatedly starting and stopping SparkR", {
for (i in 1:4) {
sc <- sparkR.init()
51 changes: 41 additions & 10 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -335,7 +335,6 @@ writeLines(mockLinesMapType, mapTypeJsonPath)
test_that("Collect DataFrame with complex types", {
# ArrayType
df <- read.json(sqlContext, complexTypeJsonPath)

ldf <- collect(df)
expect_equal(nrow(ldf), 3)
expect_equal(ncol(ldf), 3)
@@ -490,19 +489,15 @@ test_that("insertInto() on a registered table", {
unlink(parquetPath2)
})

test_that("table() returns a new DataFrame", {
test_that("tableToDF() returns a new DataFrame", {
df <- read.json(sqlContext, jsonPath)
registerTempTable(df, "table1")
tabledf <- table(sqlContext, "table1")
tabledf <- tableToDF(sqlContext, "table1")
expect_is(tabledf, "DataFrame")
expect_equal(count(tabledf), 3)
tabledf2 <- tableToDF(sqlContext, "table1")
expect_equal(count(tabledf2), 3)
dropTempTable(sqlContext, "table1")

# nolint start
# Test base::table is working
#a <- letters[1:3]
#expect_equal(class(table(a, sample(a))), "table")
# nolint end
})

test_that("toRDD() returns an RRDD", {
@@ -734,7 +729,7 @@ test_that("head() and first() return the correct data", {
expect_equal(ncol(testFirst), 2)
})

test_that("distinct() and unique on DataFrames", {
test_that("distinct(), unique() and dropDuplicates() on DataFrames", {
lines <- c("{\"name\":\"Michael\"}",
"{\"name\":\"Andy\", \"age\":30}",
"{\"name\":\"Justin\", \"age\":19}",
@@ -750,6 +745,42 @@ test_that("distinct() and unique on DataFrames", {
uniques2 <- unique(df)
expect_is(uniques2, "DataFrame")
expect_equal(count(uniques2), 3)

# Test dropDuplicates()
df <- createDataFrame(
sqlContext,
list(
list(2, 1, 2), list(1, 1, 1),
list(1, 2, 1), list(2, 1, 2),
list(2, 2, 2), list(2, 2, 1),
list(2, 1, 1), list(1, 1, 2),
list(1, 2, 2), list(1, 2, 1)),
schema = c("key", "value1", "value2"))
result <- collect(dropDuplicates(df))
expected <- rbind.data.frame(
c(1, 1, 1), c(1, 1, 2), c(1, 2, 1),
c(1, 2, 2), c(2, 1, 1), c(2, 1, 2),
c(2, 2, 1), c(2, 2, 2))
names(expected) <- c("key", "value1", "value2")
expect_equivalent(
result[order(result$key, result$value1, result$value2),],
expected)

result <- collect(dropDuplicates(df, c("key", "value1")))
expected <- rbind.data.frame(
c(1, 1, 1), c(1, 2, 1), c(2, 1, 2), c(2, 2, 2))
names(expected) <- c("key", "value1", "value2")
expect_equivalent(
result[order(result$key, result$value1, result$value2),],
expected)

result <- collect(dropDuplicates(df, "key"))
expected <- rbind.data.frame(
c(1, 1, 1), c(2, 1, 2))
names(expected) <- c("key", "value1", "value2")
expect_equivalent(
result[order(result$key, result$value1, result$value2),],
expected)
})

test_that("sample on a DataFrame", {
11 changes: 4 additions & 7 deletions docs/sparkr.md
@@ -375,13 +375,6 @@ The following functions are masked by the SparkR package:
<td><code>sample</code> in <code>package:base</code></td>
<td><code>base::sample(x, size, replace = FALSE, prob = NULL)</code></td>
</tr>
<tr>
<td><code>table</code> in <code>package:base</code></td>
<td><code><pre>base::table(...,
exclude = if (useNA == "no") c(NA, NaN),
useNA = c("no", "ifany", "always"),
dnn = list.names(...), deparse.level = 1)</pre></code></td>
</tr>
</table>

Since part of SparkR is modeled on the `dplyr` package, certain functions in SparkR share the same names with those in `dplyr`. Depending on the load order of the two packages, some functions from the package loaded first are masked by those in the package loaded after. In such case, prefix such calls with the package name, for instance, `SparkR::cume_dist(x)` or `dplyr::cume_dist(x)`.
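
As a small illustrative sketch (assuming both packages are attached and `df` is a SparkR DataFrame with a column `x`):

library(dplyr)
library(SparkR)                 # attached last, so its versions mask dplyr's
SparkR::cume_dist(df$x)         # Spark SQL window function; returns a Column expression
dplyr::cume_dist(c(5, 1, 3))    # dplyr ranking helper on a local vector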
@@ -394,3 +387,7 @@ You can inspect the search path in R with [`search()`](https://stat.ethz.ch/R-ma
## Upgrading From SparkR 1.5.x to 1.6

- Before Spark 1.6, the default mode for writes was `append`. It was changed in Spark 1.6.0 to `error` to match the Scala API.
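
  A hedged sketch of pinning the mode explicitly so code does not depend on the release default (the path and source here are placeholders):

  # "append" was the default before 1.6.0; "error" is the default from 1.6.0 on.
  # Passing mode explicitly keeps behaviour the same across versions.
  write.df(df, path = "/tmp/people.parquet", source = "parquet", mode = "overwrite")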

## Upgrading From SparkR 1.6.x to 2.0

- The method `table` has been removed and replaced by `tableToDF`.
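
  A minimal migration sketch, assuming a temporary table has been registered:

  registerTempTable(df, "people")
  # new_df <- table(sqlContext, "people")     # SparkR 1.6.x and earlier
  new_df <- tableToDF(sqlContext, "people")   # SparkR 2.0 onwards
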
1 change: 0 additions & 1 deletion python/pyspark/ml/tests.py
@@ -394,7 +394,6 @@ def test_fit_maximize_metric(self):


if __name__ == "__main__":
from pyspark.ml.tests import *
if xmlrunner:
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
else:
24 changes: 10 additions & 14 deletions python/pyspark/mllib/tests.py
@@ -77,24 +77,21 @@
pass

ser = PickleSerializer()
sc = SparkContext('local[4]', "MLlib tests")


class MLlibTestCase(unittest.TestCase):
def setUp(self):
self.sc = SparkContext('local[4]', "MLlib tests")

def tearDown(self):
self.sc.stop()
self.sc = sc


class MLLibStreamingTestCase(unittest.TestCase):
def setUp(self):
self.sc = SparkContext('local[4]', "MLlib tests")
self.sc = sc
self.ssc = StreamingContext(self.sc, 1.0)

def tearDown(self):
self.ssc.stop(False)
self.sc.stop()

@staticmethod
def _eventually(condition, timeout=30.0, catch_assertions=False):
@@ -1169,7 +1166,7 @@ def test_predictOn_model(self):
clusterWeights=[1.0, 1.0, 1.0, 1.0])

predict_data = [[[1.5, 1.5]], [[-1.5, 1.5]], [[-1.5, -1.5]], [[1.5, -1.5]]]
predict_data = [self.sc.parallelize(batch, 1) for batch in predict_data]
predict_data = [sc.parallelize(batch, 1) for batch in predict_data]
predict_stream = self.ssc.queueStream(predict_data)
predict_val = stkm.predictOn(predict_stream)

@@ -1200,7 +1197,7 @@ def test_trainOn_predictOn(self):
# classification based in the initial model would have been 0
# proving that the model is updated.
batches = [[[-0.5], [0.6], [0.8]], [[0.2], [-0.1], [0.3]]]
batches = [self.sc.parallelize(batch) for batch in batches]
batches = [sc.parallelize(batch) for batch in batches]
input_stream = self.ssc.queueStream(batches)
predict_results = []

@@ -1233,7 +1230,7 @@ def test_dim(self):
self.assertEqual(len(point.features), 3)

linear_data = LinearDataGenerator.generateLinearRDD(
sc=self.sc, nexamples=6, nfeatures=2, eps=0.1,
sc=sc, nexamples=6, nfeatures=2, eps=0.1,
nParts=2, intercept=0.0).collect()
self.assertEqual(len(linear_data), 6)
for point in linear_data:
@@ -1409,7 +1406,7 @@ def test_parameter_accuracy(self):
for i in range(10):
batch = LinearDataGenerator.generateLinearInput(
0.0, [10.0, 10.0], xMean, xVariance, 100, 42 + i, 0.1)
batches.append(self.sc.parallelize(batch))
batches.append(sc.parallelize(batch))

input_stream = self.ssc.queueStream(batches)
slr.trainOn(input_stream)
@@ -1433,7 +1430,7 @@ def test_parameter_convergence(self):
for i in range(10):
batch = LinearDataGenerator.generateLinearInput(
0.0, [10.0], [0.0], [1.0 / 3.0], 100, 42 + i, 0.1)
batches.append(self.sc.parallelize(batch))
batches.append(sc.parallelize(batch))

model_weights = []
input_stream = self.ssc.queueStream(batches)
@@ -1466,7 +1463,7 @@ def test_prediction(self):
0.0, [10.0, 10.0], [0.0, 0.0], [1.0 / 3.0, 1.0 / 3.0],
100, 42 + i, 0.1)
batches.append(
self.sc.parallelize(batch).map(lambda lp: (lp.label, lp.features)))
sc.parallelize(batch).map(lambda lp: (lp.label, lp.features)))

input_stream = self.ssc.queueStream(batches)
output_stream = slr.predictOnValues(input_stream)
@@ -1497,7 +1494,7 @@ def test_train_prediction(self):
for i in range(10):
batch = LinearDataGenerator.generateLinearInput(
0.0, [10.0], [0.0], [1.0 / 3.0], 100, 42 + i, 0.1)
batches.append(self.sc.parallelize(batch))
batches.append(sc.parallelize(batch))

predict_batches = [
b.map(lambda lp: (lp.label, lp.features)) for b in batches]
@@ -1583,7 +1580,7 @@ def test_als_ratings_id_long_error(self):


if __name__ == "__main__":
from pyspark.mllib.tests import *
if not _have_scipy:
print("NOTE: Skipping SciPy tests as it does not seem to be installed")
if xmlrunner:
1 change: 0 additions & 1 deletion python/pyspark/sql/tests.py
@@ -1259,7 +1259,6 @@ def test_collect_functions(self):


if __name__ == "__main__":
from pyspark.sql.tests import *
if xmlrunner:
unittest.main(testRunner=xmlrunner.XMLTestRunner(output='target/test-reports'))
else:
1 change: 0 additions & 1 deletion python/pyspark/streaming/tests.py
@@ -1635,7 +1635,6 @@ def search_kinesis_asl_assembly_jar():
are_kinesis_tests_enabled = os.environ.get(kinesis_test_environ_var) == '1'

if __name__ == "__main__":
from pyspark.streaming.tests import *
kafka_assembly_jar = search_kafka_assembly_jar()
flume_assembly_jar = search_flume_assembly_jar()
mqtt_assembly_jar = search_mqtt_assembly_jar()
1 change: 0 additions & 1 deletion python/pyspark/tests.py
@@ -2008,7 +2008,6 @@ def test_statcounter_array(self):


if __name__ == "__main__":
from pyspark.tests import *
if not _have_scipy:
print("NOTE: Skipping SciPy tests as it does not seem to be installed")
if not _have_numpy:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -297,7 +297,7 @@ class Analyzer(
* Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
*/
object ResolveRelations extends Rule[LogicalPlan] {
def getTable(u: UnresolvedRelation): LogicalPlan = {
private def getTable(u: UnresolvedRelation): LogicalPlan = {
try {
catalog.lookupRelation(u.tableIdentifier, u.alias)
} catch {
@@ -1165,7 +1165,7 @@ class Analyzer(
* scoping information for attributes and can be removed once analysis is complete.
*/
object EliminateSubQueries extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case Subquery(_, child) => child
}
}