Skip to content

Commit

Permalink
[SPARK-6852] [SPARKR] Accept numeric as numPartitions in SparkR.
Browse files Browse the repository at this point in the history
Author: Sun Rui <[email protected]>

Closes apache#5613 from sun-rui/SPARK-6852 and squashes the following commits:

abaf02e [Sun Rui] Change the type of default numPartitions from integer to numeric in generics.R.
29d67c1 [Sun Rui] [SPARK-6852][SPARKR] Accept numeric as numPartitions in SparkR.
  • Loading branch information
Sun Rui authored and liuchuanqi committed May 13, 2015
1 parent 9fe48b3 commit 66871b4
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 19 deletions.
2 changes: 1 addition & 1 deletion R/pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,7 @@ setMethod("keyBy",
setMethod("repartition",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions) {
coalesce(x, numToInt(numPartitions), TRUE)
coalesce(x, numPartitions, TRUE)
})

#' Return a new RDD that is reduced into numPartitions partitions.
Expand Down
12 changes: 6 additions & 6 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ setGeneric("countByValue", function(x) { standardGeneric("countByValue") })

#' @rdname distinct
#' @export
setGeneric("distinct", function(x, numPartitions = 1L) { standardGeneric("distinct") })
setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") })

#' @rdname filterRDD
#' @export
Expand Down Expand Up @@ -182,7 +182,7 @@ setGeneric("setName", function(x, name) { standardGeneric("setName") })
#' @rdname sortBy
#' @export
setGeneric("sortBy",
function(x, func, ascending = TRUE, numPartitions = 1L) {
function(x, func, ascending = TRUE, numPartitions = 1) {
standardGeneric("sortBy")
})

Expand Down Expand Up @@ -244,7 +244,7 @@ setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues")

#' @rdname intersection
#' @export
setGeneric("intersection", function(x, other, numPartitions = 1L) {
setGeneric("intersection", function(x, other, numPartitions = 1) {
standardGeneric("intersection") })

#' @rdname keys
Expand Down Expand Up @@ -346,21 +346,21 @@ setGeneric("rightOuterJoin", function(x, y, numPartitions) { standardGeneric("ri
#' @rdname sortByKey
#' @export
setGeneric("sortByKey",
function(x, ascending = TRUE, numPartitions = 1L) {
function(x, ascending = TRUE, numPartitions = 1) {
standardGeneric("sortByKey")
})

#' @rdname subtract
#' @export
setGeneric("subtract",
function(x, other, numPartitions = 1L) {
function(x, other, numPartitions = 1) {
standardGeneric("subtract")
})

#' @rdname subtractByKey
#' @export
setGeneric("subtractByKey",
function(x, other, numPartitions = 1L) {
function(x, other, numPartitions = 1) {
standardGeneric("subtractByKey")
})

Expand Down
24 changes: 12 additions & 12 deletions R/pkg/R/pairRDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ setMethod("flatMapValues",
#' @rdname partitionBy
#' @aliases partitionBy,RDD,integer-method
setMethod("partitionBy",
signature(x = "RDD", numPartitions = "integer"),
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions, partitionFunc = hashCode) {

#if (missing(partitionFunc)) {
Expand All @@ -211,7 +211,7 @@ setMethod("partitionBy",
# the content (key-val pairs).
pairwiseRRDD <- newJObject("org.apache.spark.api.r.PairwiseRRDD",
callJMethod(jrdd, "rdd"),
as.integer(numPartitions),
numToInt(numPartitions),
serializedHashFuncBytes,
getSerializedMode(x),
packageNamesArr,
Expand All @@ -221,7 +221,7 @@ setMethod("partitionBy",

# Create a corresponding partitioner.
rPartitioner <- newJObject("org.apache.spark.HashPartitioner",
as.integer(numPartitions))
numToInt(numPartitions))

# Call partitionBy on the obtained PairwiseRDD.
javaPairRDD <- callJMethod(pairwiseRRDD, "asJavaPairRDD")
Expand Down Expand Up @@ -256,7 +256,7 @@ setMethod("partitionBy",
#' @rdname groupByKey
#' @aliases groupByKey,RDD,integer-method
setMethod("groupByKey",
signature(x = "RDD", numPartitions = "integer"),
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions) {
shuffled <- partitionBy(x, numPartitions)
groupVals <- function(part) {
Expand Down Expand Up @@ -315,7 +315,7 @@ setMethod("groupByKey",
#' @rdname reduceByKey
#' @aliases reduceByKey,RDD,integer-method
setMethod("reduceByKey",
signature(x = "RDD", combineFunc = "ANY", numPartitions = "integer"),
signature(x = "RDD", combineFunc = "ANY", numPartitions = "numeric"),
function(x, combineFunc, numPartitions) {
reduceVals <- function(part) {
vals <- new.env()
Expand Down Expand Up @@ -422,7 +422,7 @@ setMethod("reduceByKeyLocally",
#' @aliases combineByKey,RDD,ANY,ANY,ANY,integer-method
setMethod("combineByKey",
signature(x = "RDD", createCombiner = "ANY", mergeValue = "ANY",
mergeCombiners = "ANY", numPartitions = "integer"),
mergeCombiners = "ANY", numPartitions = "numeric"),
function(x, createCombiner, mergeValue, mergeCombiners, numPartitions) {
combineLocally <- function(part) {
combiners <- new.env()
Expand Down Expand Up @@ -483,7 +483,7 @@ setMethod("combineByKey",
#' @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method
setMethod("aggregateByKey",
signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY",
combOp = "ANY", numPartitions = "integer"),
combOp = "ANY", numPartitions = "numeric"),
function(x, zeroValue, seqOp, combOp, numPartitions) {
createCombiner <- function(v) {
do.call(seqOp, list(zeroValue, v))
Expand Down Expand Up @@ -514,7 +514,7 @@ setMethod("aggregateByKey",
#' @aliases foldByKey,RDD,ANY,ANY,integer-method
setMethod("foldByKey",
signature(x = "RDD", zeroValue = "ANY",
func = "ANY", numPartitions = "integer"),
func = "ANY", numPartitions = "numeric"),
function(x, zeroValue, func, numPartitions) {
aggregateByKey(x, zeroValue, func, func, numPartitions)
})
Expand Down Expand Up @@ -553,7 +553,7 @@ setMethod("join",
joinTaggedList(v, list(FALSE, FALSE))
}

joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numToInt(numPartitions)),
joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions),
doJoin)
})

Expand Down Expand Up @@ -582,7 +582,7 @@ setMethod("join",
#' @rdname join-methods
#' @aliases leftOuterJoin,RDD,RDD-method
setMethod("leftOuterJoin",
signature(x = "RDD", y = "RDD", numPartitions = "integer"),
signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
function(x, y, numPartitions) {
xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
Expand Down Expand Up @@ -619,7 +619,7 @@ setMethod("leftOuterJoin",
#' @rdname join-methods
#' @aliases rightOuterJoin,RDD,RDD-method
setMethod("rightOuterJoin",
signature(x = "RDD", y = "RDD", numPartitions = "integer"),
signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
function(x, y, numPartitions) {
xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
Expand Down Expand Up @@ -659,7 +659,7 @@ setMethod("rightOuterJoin",
#' @rdname join-methods
#' @aliases fullOuterJoin,RDD,RDD-method
setMethod("fullOuterJoin",
signature(x = "RDD", y = "RDD", numPartitions = "integer"),
signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
function(x, y, numPartitions) {
xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
Expand Down

0 comments on commit 66871b4

Please sign in to comment.