Merge pull request #116 from palantir/rk/merge-upstream
robert3005 authored Mar 6, 2017
2 parents 10a7b4e + 6104c91 commit 704fc2c
Showing 1,141 changed files with 34,614 additions and 14,630 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -42,6 +42,7 @@ dependency-reduced-pom.xml
derby.log
dev/create-release/*final
dev/create-release/*txt
dev/pr-deps/
dist/
docs/_site
docs/api
3 changes: 1 addition & 2 deletions .travis.yml
@@ -28,7 +28,6 @@ dist: trusty
# 2. Choose language and target JDKs for parallel builds.
language: java
jdk:
- oraclejdk7
- oraclejdk8

# 3. Setup cache directory for SBT and Maven.
@@ -44,7 +43,7 @@ notifications:
# 5. Run maven install before running lint-java.
install:
- export MAVEN_SKIP_RC=1
- build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install
- build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install

# 6. Run lint-java.
script:
4 changes: 2 additions & 2 deletions R/WINDOWS.md
@@ -6,7 +6,7 @@ To build SparkR on Windows, the following steps are required
include Rtools and R in `PATH`.

2. Install
[JDK7](http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html) and set
[JDK8](http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html) and set
`JAVA_HOME` in the system environment variables.

3. Download and install [Maven](http://maven.apache.org/download.html). Also include the `bin`
@@ -38,6 +38,6 @@ To run the SparkR unit tests on Windows, the following steps are required —ass

```
R -e "install.packages('testthat', repos='http://cran.us.r-project.org')"
.\bin\spark-submit2.cmd --conf spark.hadoop.fs.default.name="file:///" R\pkg\tests\run-all.R
.\bin\spark-submit2.cmd --conf spark.hadoop.fs.defaultFS="file:///" R\pkg\tests\run-all.R
```

26 changes: 25 additions & 1 deletion R/pkg/NAMESPACE
@@ -1,3 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Imports from base R
# Do not include stats:: "rpois", "runif" - causes error at runtime
importFrom("methods", "setGeneric", "setMethod", "setOldClass")
@@ -47,7 +64,9 @@ exportMethods("glm",
"spark.kstest",
"spark.logit",
"spark.randomForest",
"spark.gbt")
"spark.gbt",
"spark.bisectingKmeans",
"spark.svmLinear")

# Job group lifecycle management methods
export("setJobGroup",
@@ -63,6 +82,7 @@ exportMethods("arrange",
"as.data.frame",
"attach",
"cache",
"coalesce",
"collect",
"colnames",
"colnames<-",
@@ -94,6 +114,7 @@ exportMethods("arrange",
"freqItems",
"gapply",
"gapplyCollect",
"getNumPartitions",
"group_by",
"groupBy",
"head",
@@ -208,6 +229,7 @@ exportMethods("%in%",
"floor",
"format_number",
"format_string",
"from_json",
"from_unixtime",
"from_utc_timestamp",
"getField",
@@ -306,6 +328,8 @@ exportMethods("%in%",
"toDegrees",
"toRadians",
"to_date",
"to_json",
"to_timestamp",
"to_utc_timestamp",
"translate",
"trim",
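Among the functions newly exported above are the JSON column helpers `from_json` and `to_json`. A minimal usage sketch, not taken from the diff itself — the `payload` column, the sample record, and the schema are made up for illustration, and a running SparkR session is assumed:

```
library(SparkR)
sparkR.session()

# A one-row local data frame with a JSON string column (hypothetical data).
df <- createDataFrame(data.frame(payload = '{"name":"Alice","age":30}',
                                 stringsAsFactors = FALSE))

# Parse the JSON string into a struct column using a declared schema.
schema <- structType(structField("name", "string"), structField("age", "integer"))
parsed <- select(df, alias(from_json(df$payload, schema), "person"))

# Serialize the struct column back into a JSON string.
roundtrip <- select(parsed, to_json(parsed$person))
head(roundtrip)
```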
103 changes: 92 additions & 11 deletions R/pkg/R/DataFrame.R
@@ -280,7 +280,7 @@ setMethod("dtypes",

#' Column Names of SparkDataFrame
#'
#' Return all column names as a list.
#' Return a vector of column names.
#'
#' @param x a SparkDataFrame.
#'
@@ -323,10 +323,8 @@ setMethod("names",
setMethod("names<-",
signature(x = "SparkDataFrame"),
function(x, value) {
if (!is.null(value)) {
sdf <- callJMethod(x@sdf, "toDF", as.list(value))
dataFrame(sdf)
}
colnames(x) <- value
x
})

#' @rdname columns
Expand All @@ -340,7 +338,7 @@ setMethod("colnames",
})

#' @param value a character vector. Must have the same length as the number
#' of columns in the SparkDataFrame.
#' of columns to be renamed.
#' @rdname columns
#' @aliases colnames<-,SparkDataFrame-method
#' @name colnames<-
@@ -417,7 +415,7 @@ setMethod("coltypes",
type <- PRIMITIVE_TYPES[[specialtype]]
}
}
type
type[[1]]
})

# Find which types don't have mapping to R
@@ -680,14 +678,53 @@ setMethod("storageLevel",
storageLevelToString(callJMethod(x@sdf, "storageLevel"))
})

#' Coalesce
#'
#' Returns a new SparkDataFrame that has exactly \code{numPartitions} partitions.
#' This operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100
#' partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of
#' the current partitions. If a larger number of partitions is requested, it will stay at the
#' current number of partitions.
#'
#' However, if you're doing a drastic coalesce on a SparkDataFrame, e.g. to numPartitions = 1,
#' this may result in your computation taking place on fewer nodes than
#' you like (e.g. one node in the case of numPartitions = 1). To avoid this,
#' call \code{repartition}. This will add a shuffle step, but means the
#' current upstream partitions will be executed in parallel (per whatever
#' the current partitioning is).
#'
#' @param numPartitions the number of partitions to use.
#'
#' @family SparkDataFrame functions
#' @rdname coalesce
#' @name coalesce
#' @aliases coalesce,SparkDataFrame-method
#' @seealso \link{repartition}
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' newDF <- coalesce(df, 1L)
#'}
#' @note coalesce(SparkDataFrame) since 2.1.1
setMethod("coalesce",
signature(x = "SparkDataFrame"),
function(x, numPartitions) {
stopifnot(is.numeric(numPartitions))
sdf <- callJMethod(x@sdf, "coalesce", numToInt(numPartitions))
dataFrame(sdf)
})
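As an illustration of the coalesce/repartition trade-off described in the documentation above, a minimal sketch (assumes a running SparkR session; the JSON path is the placeholder used in the roxygen example):

```
library(SparkR)
sparkR.session()

df <- read.json("path/to/file.json")   # placeholder path from the example above

# coalesce() only merges existing partitions, so no shuffle is performed;
# asking for more partitions than currently exist leaves the count unchanged.
narrowDF <- coalesce(df, 1L)

# repartition() adds a shuffle step, which keeps the upstream computation
# running in parallel before the data is redistributed.
shuffledDF <- repartition(df, numPartitions = 10L)
```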

#' Repartition
#'
#' The following options for repartition are possible:
#' \itemize{
#' \item{1.} {Return a new SparkDataFrame partitioned by
#' \item{1.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
#' \item{2.} {Return a new SparkDataFrame hash partitioned by
#' the given columns into \code{numPartitions}.}
#' \item{2.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
#' \item{3.} {Return a new SparkDataFrame partitioned by the given column(s),
#' \item{3.} {Return a new SparkDataFrame hash partitioned by the given column(s),
#' using \code{spark.sql.shuffle.partitions} as number of partitions.}
#'}
#' @param x a SparkDataFrame.
@@ -699,6 +736,7 @@ setMethod("storageLevel",
#' @rdname repartition
#' @name repartition
#' @aliases repartition,SparkDataFrame-method
#' @seealso \link{coalesce}
#' @export
#' @examples
#'\dontrun{
@@ -1138,6 +1176,7 @@ setMethod("collect",
if (!is.null(PRIMITIVE_TYPES[[colType]]) && colType != "binary") {
vec <- do.call(c, col)
stopifnot(class(vec) != "list")
class(vec) <- PRIMITIVE_TYPES[[colType]]
df[[colIndex]] <- vec
} else {
df[[colIndex]] <- col
@@ -1765,6 +1804,10 @@ setClassUnion("numericOrcharacter", c("numeric", "character"))
#' @note [[ since 1.4.0
setMethod("[[", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
function(x, i) {
if (length(i) > 1) {
warning("Subset index has length > 1. Only the first index is used.")
i <- i[1]
}
if (is.numeric(i)) {
cols <- columns(x)
i <- cols[[i]]
@@ -1778,6 +1821,10 @@ setMethod("[[", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
#' @note [[<- since 2.1.1
setMethod("[[<-", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
function(x, i, value) {
if (length(i) > 1) {
warning("Subset index has length > 1. Only the first index is used.")
i <- i[1]
}
if (is.numeric(i)) {
cols <- columns(x)
i <- cols[[i]]
@@ -1831,6 +1878,8 @@ setMethod("[", signature(x = "SparkDataFrame"),
#' Return subsets of SparkDataFrame according to given conditions
#' @param x a SparkDataFrame.
#' @param i,subset (Optional) a logical expression to filter on rows.
#' For extract operator [[ and replacement operator [[<-, the indexing parameter for
#' a single Column.
#' @param j,select expression for the single Column or a list of columns to select from the SparkDataFrame.
#' @param drop if TRUE, a Column will be returned if the resulting dataset has only one column.
#' Otherwise, a SparkDataFrame will always be returned.
Expand All @@ -1841,6 +1890,7 @@ setMethod("[", signature(x = "SparkDataFrame"),
#' @export
#' @family SparkDataFrame functions
#' @aliases subset,SparkDataFrame-method
#' @seealso \link{withColumn}
#' @rdname subset
#' @name subset
#' @family subsetting functions
@@ -1858,6 +1908,10 @@ setMethod("[", signature(x = "SparkDataFrame"),
#' subset(df, df$age %in% c(19, 30), 1:2)
#' subset(df, df$age %in% c(19), select = c(1,2))
#' subset(df, select = c(1,2))
#' # Columns can be selected and set
#' df[["age"]] <- 23
#' df[[1]] <- df$age
#' df[[2]] <- NULL # drop column
#' }
#' @note subset since 1.5.0
setMethod("subset", signature(x = "SparkDataFrame"),
@@ -1982,7 +2036,7 @@ setMethod("selectExpr",
#' @aliases withColumn,SparkDataFrame,character-method
#' @rdname withColumn
#' @name withColumn
#' @seealso \link{rename} \link{mutate}
#' @seealso \link{rename} \link{mutate} \link{subset}
#' @export
#' @examples
#'\dontrun{
Expand All @@ -1993,6 +2047,10 @@ setMethod("selectExpr",
#' # Replace an existing column
#' newDF2 <- withColumn(newDF, "newCol", newDF$col1)
#' newDF3 <- withColumn(newDF, "newCol", 42)
#' # Use extract operator to set an existing or new column
#' df[["age"]] <- 23
#' df[[2]] <- df$col1
#' df[[2]] <- NULL # drop column
#' }
#' @note withColumn since 1.4.0
setMethod("withColumn",
@@ -3428,3 +3486,26 @@ setMethod("randomSplit",
}
sapply(sdfs, dataFrame)
})

#' getNumPartitions
#'
#' Return the number of partitions
#'
#' @param x A SparkDataFrame
#' @family SparkDataFrame functions
#' @aliases getNumPartitions,SparkDataFrame-method
#' @rdname getNumPartitions
#' @name getNumPartitions
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' df <- createDataFrame(cars, numPartitions = 2)
#' getNumPartitions(df)
#' }
#' @note getNumPartitions since 2.1.1
setMethod("getNumPartitions",
signature(x = "SparkDataFrame"),
function(x) {
callJMethod(callJMethod(x@sdf, "rdd"), "getNumPartitions")
})
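A short sketch tying the new getNumPartitions to coalesce, built on the example from the documentation above (assumes a running SparkR session; cars is the built-in R dataset):

```
library(SparkR)
sparkR.session()

df <- createDataFrame(cars, numPartitions = 2)
getNumPartitions(df)                  # 2

df1 <- coalesce(df, 1L)
getNumPartitions(df1)                 # 1 — coalesced without a shuffle

# Requesting more partitions than exist via coalesce() leaves the count as-is;
# use repartition() (which shuffles) to actually increase it.
getNumPartitions(coalesce(df, 10L))   # still 2
```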