Skip to content

Commit

Permalink
[SPARK-23935][SQL] Resolving conflicts.
Browse files Browse the repository at this point in the history
  • Loading branch information
mn-mikke committed May 17, 2018
2 parents 1bd0d5e + 3e66350 commit baa61e5
Show file tree
Hide file tree
Showing 215 changed files with 3,707 additions and 1,229 deletions.
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
(The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
(The New BSD License) Py4J (net.sf.py4j:py4j:0.10.6 - http://py4j.sourceforge.net/)
(The New BSD License) Py4J (net.sf.py4j:py4j:0.10.7 - http://py4j.sourceforge.net/)
(Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
(BSD licence) sbt and sbt-launch-lib.bash
(BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
Expand Down
1 change: 1 addition & 0 deletions R/pkg/DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
License: Apache License (== 2.0)
URL: http://www.apache.org/ http://spark.apache.org/
BugReports: http://spark.apache.org/contributing.html
SystemRequirements: Java (== 8)
Depends:
R (>= 3.0),
methods
Expand Down
2 changes: 2 additions & 0 deletions R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ exportMethods("%<=>%",
"array_max",
"array_min",
"array_position",
"array_sort",
"asc",
"ascii",
"asin",
Expand Down Expand Up @@ -351,6 +352,7 @@ exportMethods("%<=>%",
"sinh",
"size",
"skewness",
"slice",
"sort_array",
"soundex",
"spark_partition_id",
Expand Down
40 changes: 38 additions & 2 deletions R/pkg/R/client.R
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

# Creates a SparkR client connection object
# if one doesn't already exist
connectBackend <- function(hostname, port, timeout) {
connectBackend <- function(hostname, port, timeout, authSecret) {
if (exists(".sparkRcon", envir = .sparkREnv)) {
if (isOpen(.sparkREnv[[".sparkRCon"]])) {
cat("SparkRBackend client connection already exists\n")
Expand All @@ -29,7 +29,7 @@ connectBackend <- function(hostname, port, timeout) {

con <- socketConnection(host = hostname, port = port, server = FALSE,
blocking = TRUE, open = "wb", timeout = timeout)

doServerAuth(con, authSecret)
assign(".sparkRCon", con, envir = .sparkREnv)
con
}
Expand Down Expand Up @@ -60,13 +60,49 @@ generateSparkSubmitArgs <- function(args, sparkHome, jars, sparkSubmitOpts, pack
combinedArgs
}

checkJavaVersion <- function() {
javaBin <- "java"
javaHome <- Sys.getenv("JAVA_HOME")
javaReqs <- utils::packageDescription(utils::packageName(), fields = c("SystemRequirements"))
sparkJavaVersion <- as.numeric(tail(strsplit(javaReqs, "[(=)]")[[1]], n = 1L))
if (javaHome != "") {
javaBin <- file.path(javaHome, "bin", javaBin)
}

# If java is missing from PATH, we get an error in Unix and a warning in Windows
javaVersionOut <- tryCatch(
launchScript(javaBin, "-version", wait = TRUE, stdout = TRUE, stderr = TRUE),
error = function(e) {
stop("Java version check failed. Please make sure Java is installed",
" and set JAVA_HOME to point to the installation directory.", e)
},
warning = function(w) {
stop("Java version check failed. Please make sure Java is installed",
" and set JAVA_HOME to point to the installation directory.", w)
})
javaVersionFilter <- Filter(
function(x) {
grepl(" version", x)
}, javaVersionOut)

javaVersionStr <- strsplit(javaVersionFilter[[1]], "[\"]")[[1L]][2]
# javaVersionStr is of the form 1.8.0_92.
# Extract 8 from it to compare to sparkJavaVersion
javaVersionNum <- as.integer(strsplit(javaVersionStr, "[.]")[[1L]][2])
if (javaVersionNum != sparkJavaVersion) {
stop(paste("Java version", sparkJavaVersion, "is required for this package; found version:",
javaVersionStr))
}
}

launchBackend <- function(args, sparkHome, jars, sparkSubmitOpts, packages) {
sparkSubmitBinName <- determineSparkSubmitBin()
if (sparkHome != "") {
sparkSubmitBin <- file.path(sparkHome, "bin", sparkSubmitBinName)
} else {
sparkSubmitBin <- sparkSubmitBinName
}

combinedArgs <- generateSparkSubmitArgs(args, sparkHome, jars, sparkSubmitOpts, packages)
cat("Launching java with spark-submit command", sparkSubmitBin, combinedArgs, "\n")
invisible(launchScript(sparkSubmitBin, combinedArgs))
Expand Down
10 changes: 7 additions & 3 deletions R/pkg/R/deserialize.R
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,18 @@ readTypedObject <- function(con, type) {
stop(paste("Unsupported type for deserialization", type)))
}

readString <- function(con) {
stringLen <- readInt(con)
raw <- readBin(con, raw(), stringLen, endian = "big")
readStringData <- function(con, len) {
raw <- readBin(con, raw(), len, endian = "big")
string <- rawToChar(raw)
Encoding(string) <- "UTF-8"
string
}

readString <- function(con) {
stringLen <- readInt(con)
readStringData(con, stringLen)
}

readInt <- function(con) {
readBin(con, integer(), n = 1, endian = "big")
}
Expand Down
90 changes: 69 additions & 21 deletions R/pkg/R/functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -207,17 +207,21 @@ NULL
#' tmp <- mutate(df, v1 = create_array(df$mpg, df$cyl, df$hp))
#' head(select(tmp, array_contains(tmp$v1, 21), size(tmp$v1)))
#' head(select(tmp, array_max(tmp$v1), array_min(tmp$v1)))
#' head(select(tmp, array_position(tmp$v1, 21)))
#' head(select(tmp, flatten(tmp$v1)))
#' head(select(tmp, array_position(tmp$v1, 21), array_sort(tmp$v1)))
#' head(select(tmp, flatten(tmp$v1), reverse(tmp$v1)))
#' tmp2 <- mutate(tmp, v2 = explode(tmp$v1))
#' head(tmp2)
#' head(select(tmp, posexplode(tmp$v1)))
#' head(select(tmp, slice(tmp$v1, 2L, 2L)))
#' head(select(tmp, sort_array(tmp$v1)))
#' head(select(tmp, sort_array(tmp$v1, asc = FALSE)))
#' tmp3 <- mutate(df, v3 = create_map(df$model, df$cyl))
#' head(select(tmp3, map_keys(tmp3$v3)))
#' head(select(tmp3, map_values(tmp3$v3)))
#' head(select(tmp3, element_at(tmp3$v3, "Valiant")))}
#' head(select(tmp3, element_at(tmp3$v3, "Valiant")))
#' tmp4 <- mutate(df, v4 = create_array(df$mpg, df$cyl), v5 = create_array(df$hp))
#' head(select(tmp4, concat(tmp4$v4, tmp4$v5)))
#' head(select(tmp, concat(df$mpg, df$cyl, df$hp)))}
NULL

#' Window functions for Column operations
Expand Down Expand Up @@ -805,6 +809,8 @@ setMethod("factorial",
#'
#' The function by default returns the first values it sees. It will return the first non-missing
#' value it sees when na.rm is set to true. If all values are missing, then NA is returned.
#' Note: the function is non-deterministic because its results depends on order of rows which
#' may be non-deterministic after a shuffle.
#'
#' @param na.rm a logical value indicating whether NA values should be stripped
#' before the computation proceeds.
Expand Down Expand Up @@ -948,6 +954,8 @@ setMethod("kurtosis",
#'
#' The function by default returns the last values it sees. It will return the last non-missing
#' value it sees when na.rm is set to true. If all values are missing, then NA is returned.
#' Note: the function is non-deterministic because its results depends on order of rows which
#' may be non-deterministic after a shuffle.
#'
#' @param x column to compute on.
#' @param na.rm a logical value indicating whether NA values should be stripped
Expand Down Expand Up @@ -1201,6 +1209,7 @@ setMethod("minute",
#' 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
#' This is equivalent to the MONOTONICALLY_INCREASING_ID function in SQL.
#' The method should be used with no argument.
#' Note: the function is non-deterministic because its result depends on partition IDs.
#'
#' @rdname column_nonaggregate_functions
#' @aliases monotonically_increasing_id monotonically_increasing_id,missing-method
Expand Down Expand Up @@ -1254,9 +1263,9 @@ setMethod("quarter",
})

#' @details
#' \code{reverse}: Reverses the string column and returns it as a new string column.
#' \code{reverse}: Returns a reversed string or an array with reverse order of elements.
#'
#' @rdname column_string_functions
#' @rdname column_collection_functions
#' @aliases reverse reverse,Column-method
#' @note reverse since 1.5.0
setMethod("reverse",
Expand Down Expand Up @@ -1907,6 +1916,7 @@ setMethod("atan2", signature(y = "Column"),

#' @details
#' \code{datediff}: Returns the number of days from \code{y} to \code{x}.
#' If \code{y} is later than \code{x} then the result is positive.
#'
#' @rdname column_datetime_diff_functions
#' @aliases datediff datediff,Column-method
Expand Down Expand Up @@ -1966,7 +1976,10 @@ setMethod("levenshtein", signature(y = "Column"),
})

#' @details
#' \code{months_between}: Returns number of months between dates \code{y} and \code{x}.
#' \code{months_between}: Returns number of months between dates \code{y} and \code{x}.
#' If \code{y} is later than \code{x}, then the result is positive. If \code{y} and \code{x}
#' are on the same day of month, or both are the last day of month, time of day will be ignored.
#' Otherwise, the difference is calculated based on 31 days per month, and rounded to 8 digits.
#'
#' @rdname column_datetime_diff_functions
#' @aliases months_between months_between,Column-method
Expand Down Expand Up @@ -2045,20 +2058,10 @@ setMethod("countDistinct",

#' @details
#' \code{concat}: Concatenates multiple input columns together into a single column.
#' If all inputs are binary, concat returns an output as binary. Otherwise, it returns as string.
#' The function works with strings, binary and compatible array columns.
#'
#' @rdname column_string_functions
#' @rdname column_collection_functions
#' @aliases concat concat,Column-method
#' @examples
#'
#' \dontrun{
#' # concatenate strings
#' tmp <- mutate(df, s1 = concat(df$Class, df$Sex),
#' s2 = concat(df$Class, df$Sex, df$Age),
#' s3 = concat(df$Class, df$Sex, df$Age, df$Class),
#' s4 = concat_ws("_", df$Class, df$Sex),
#' s5 = concat_ws("+", df$Class, df$Sex, df$Age, df$Survived))
#' head(tmp)}
#' @note concat since 1.5.0
setMethod("concat",
signature(x = "Column"),
Expand Down Expand Up @@ -2399,6 +2402,13 @@ setMethod("shiftRightUnsigned", signature(y = "Column", x = "numeric"),
#' @param sep separator to use.
#' @rdname column_string_functions
#' @aliases concat_ws concat_ws,character,Column-method
#' @examples
#'
#' \dontrun{
#' # concatenate strings
#' tmp <- mutate(df, s1 = concat_ws("_", df$Class, df$Sex),
#' s2 = concat_ws("+", df$Class, df$Sex, df$Age, df$Survived))
#' head(tmp)}
#' @note concat_ws since 1.5.0
setMethod("concat_ws", signature(sep = "character", x = "Column"),
function(sep, x, ...) {
Expand Down Expand Up @@ -2584,6 +2594,7 @@ setMethod("lpad", signature(x = "Column", len = "numeric", pad = "character"),
#' @details
#' \code{rand}: Generates a random column with independent and identically distributed (i.i.d.)
#' samples from U[0.0, 1.0].
#' Note: the function is non-deterministic in general case.
#'
#' @rdname column_nonaggregate_functions
#' @param seed a random seed. Can be missing.
Expand Down Expand Up @@ -2612,6 +2623,7 @@ setMethod("rand", signature(seed = "numeric"),
#' @details
#' \code{randn}: Generates a column with independent and identically distributed (i.i.d.) samples
#' from the standard normal distribution.
#' Note: the function is non-deterministic in general case.
#'
#' @rdname column_nonaggregate_functions
#' @aliases randn randn,missing-method
Expand Down Expand Up @@ -3037,7 +3049,22 @@ setMethod("array_position",
})

#' @details
#' \code{flatten}: Transforms an array of arrays into a single array.
#' \code{array_sort}: Sorts the input array in ascending order. The elements of the input array
#' must be orderable. NA elements will be placed at the end of the returned array.
#'
#' @rdname column_collection_functions
#' @aliases array_sort array_sort,Column-method
#' @note array_sort since 2.4.0
setMethod("array_sort",
signature(x = "Column"),
function(x) {
jc <- callJStatic("org.apache.spark.sql.functions", "array_sort", x@jc)
column(jc)
})

#' @details
#' \code{flatten}: Creates a single array from an array of arrays.
#' If a structure of nested arrays is deeper than two levels, only one level of nesting is removed.
#'
#' @rdname column_collection_functions
#' @aliases flatten flatten,Column-method
Expand Down Expand Up @@ -3118,8 +3145,25 @@ setMethod("size",
})

#' @details
#' \code{sort_array}: Sorts the input array in ascending or descending order according
#' to the natural ordering of the array elements.
#' \code{slice}: Returns an array containing all the elements in x from the index start
#' (or starting from the end if start is negative) with the specified length.
#'
#' @rdname column_collection_functions
#' @param start an index indicating the first element occuring in the result.
#' @param length a number of consecutive elements choosen to the result.
#' @aliases slice slice,Column-method
#' @note slice since 2.4.0
setMethod("slice",
signature(x = "Column"),
function(x, start, length) {
jc <- callJStatic("org.apache.spark.sql.functions", "slice", x@jc, start, length)
column(jc)
})

#' @details
#' \code{sort_array}: Sorts the input array in ascending or descending order according to
#' the natural ordering of the array elements. NA elements will be placed at the beginning of
#' the returned array in ascending order or at the end of the returned array in descending order.
#'
#' @rdname column_collection_functions
#' @param asc a logical flag indicating the sorting order.
Expand Down Expand Up @@ -3188,6 +3232,8 @@ setMethod("create_map",

#' @details
#' \code{collect_list}: Creates a list of objects with duplicates.
#' Note: the function is non-deterministic because the order of collected results depends
#' on order of rows which may be non-deterministic after a shuffle.
#'
#' @rdname column_aggregate_functions
#' @aliases collect_list collect_list,Column-method
Expand All @@ -3207,6 +3253,8 @@ setMethod("collect_list",

#' @details
#' \code{collect_set}: Creates a list of objects with duplicate elements eliminated.
#' Note: the function is non-deterministic because the order of collected results depends
#' on order of rows which may be non-deterministic after a shuffle.
#'
#' @rdname column_aggregate_functions
#' @aliases collect_set collect_set,Column-method
Expand Down
14 changes: 11 additions & 3 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ setGeneric("summarize", function(x, ...) { standardGeneric("summarize") })
#' @rdname summary
setGeneric("summary", function(object, ...) { standardGeneric("summary") })

setGeneric("toJSON", function(x) { standardGeneric("toJSON") })
setGeneric("toJSON", function(x, ...) { standardGeneric("toJSON") })

setGeneric("toRDD", function(x) { standardGeneric("toRDD") })

Expand Down Expand Up @@ -769,6 +769,10 @@ setGeneric("array_min", function(x) { standardGeneric("array_min") })
#' @name NULL
setGeneric("array_position", function(x, value) { standardGeneric("array_position") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("array_sort", function(x) { standardGeneric("array_sort") })

#' @rdname column_string_functions
#' @name NULL
setGeneric("ascii", function(x) { standardGeneric("ascii") })
Expand Down Expand Up @@ -813,7 +817,7 @@ setGeneric("collect_set", function(x) { standardGeneric("collect_set") })
#' @rdname column
setGeneric("column", function(x) { standardGeneric("column") })

#' @rdname column_string_functions
#' @rdname column_collection_functions
#' @name NULL
setGeneric("concat", function(x, ...) { standardGeneric("concat") })

Expand Down Expand Up @@ -1130,7 +1134,7 @@ setGeneric("regexp_replace",
#' @name NULL
setGeneric("repeat_string", function(x, n) { standardGeneric("repeat_string") })

#' @rdname column_string_functions
#' @rdname column_collection_functions
#' @name NULL
setGeneric("reverse", function(x) { standardGeneric("reverse") })

Expand Down Expand Up @@ -1190,6 +1194,10 @@ setGeneric("size", function(x) { standardGeneric("size") })
#' @name NULL
setGeneric("skewness", function(x) { standardGeneric("skewness") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("slice", function(x, start, length) { standardGeneric("slice") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("sort_array", function(x, asc = TRUE) { standardGeneric("sort_array") })
Expand Down
Loading

0 comments on commit baa61e5

Please sign in to comment.