Skip to content

Commit

Permalink
[SPARK-6852][SPARKR] Accept numeric as numPartitions in SparkR.
Browse files Browse the repository at this point in the history
  • Loading branch information
Sun Rui committed Apr 21, 2015
1 parent 8136810 commit 29d67c1
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 13 deletions.
2 changes: 1 addition & 1 deletion R/pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,7 @@ setMethod("keyBy",
setMethod("repartition",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions) {
coalesce(x, numToInt(numPartitions), TRUE)
coalesce(x, numPartitions, TRUE)
})

#' Return a new RDD that is reduced into numPartitions partitions.
Expand Down
24 changes: 12 additions & 12 deletions R/pkg/R/pairRDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ setMethod("flatMapValues",
#' @rdname partitionBy
#' @aliases partitionBy,RDD,integer-method
setMethod("partitionBy",
signature(x = "RDD", numPartitions = "integer"),
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions, partitionFunc = hashCode) {

#if (missing(partitionFunc)) {
Expand All @@ -211,7 +211,7 @@ setMethod("partitionBy",
# the content (key-val pairs).
pairwiseRRDD <- newJObject("org.apache.spark.api.r.PairwiseRRDD",
callJMethod(jrdd, "rdd"),
as.integer(numPartitions),
numToInt(numPartitions),
serializedHashFuncBytes,
getSerializedMode(x),
packageNamesArr,
Expand All @@ -221,7 +221,7 @@ setMethod("partitionBy",

# Create a corresponding partitioner.
rPartitioner <- newJObject("org.apache.spark.HashPartitioner",
as.integer(numPartitions))
numToInt(numPartitions))

# Call partitionBy on the obtained PairwiseRDD.
javaPairRDD <- callJMethod(pairwiseRRDD, "asJavaPairRDD")
Expand Down Expand Up @@ -256,7 +256,7 @@ setMethod("partitionBy",
#' @rdname groupByKey
#' @aliases groupByKey,RDD,integer-method
setMethod("groupByKey",
signature(x = "RDD", numPartitions = "integer"),
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions) {
shuffled <- partitionBy(x, numPartitions)
groupVals <- function(part) {
Expand Down Expand Up @@ -315,7 +315,7 @@ setMethod("groupByKey",
#' @rdname reduceByKey
#' @aliases reduceByKey,RDD,integer-method
setMethod("reduceByKey",
signature(x = "RDD", combineFunc = "ANY", numPartitions = "integer"),
signature(x = "RDD", combineFunc = "ANY", numPartitions = "numeric"),
function(x, combineFunc, numPartitions) {
reduceVals <- function(part) {
vals <- new.env()
Expand Down Expand Up @@ -422,7 +422,7 @@ setMethod("reduceByKeyLocally",
#' @aliases combineByKey,RDD,ANY,ANY,ANY,integer-method
setMethod("combineByKey",
signature(x = "RDD", createCombiner = "ANY", mergeValue = "ANY",
mergeCombiners = "ANY", numPartitions = "integer"),
mergeCombiners = "ANY", numPartitions = "numeric"),
function(x, createCombiner, mergeValue, mergeCombiners, numPartitions) {
combineLocally <- function(part) {
combiners <- new.env()
Expand Down Expand Up @@ -483,7 +483,7 @@ setMethod("combineByKey",
#' @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method
setMethod("aggregateByKey",
signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY",
combOp = "ANY", numPartitions = "integer"),
combOp = "ANY", numPartitions = "numeric"),
function(x, zeroValue, seqOp, combOp, numPartitions) {
createCombiner <- function(v) {
do.call(seqOp, list(zeroValue, v))
Expand Down Expand Up @@ -514,7 +514,7 @@ setMethod("aggregateByKey",
#' @aliases foldByKey,RDD,ANY,ANY,integer-method
setMethod("foldByKey",
signature(x = "RDD", zeroValue = "ANY",
func = "ANY", numPartitions = "integer"),
func = "ANY", numPartitions = "numeric"),
function(x, zeroValue, func, numPartitions) {
aggregateByKey(x, zeroValue, func, func, numPartitions)
})
Expand Down Expand Up @@ -553,7 +553,7 @@ setMethod("join",
joinTaggedList(v, list(FALSE, FALSE))
}

joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numToInt(numPartitions)),
joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions),
doJoin)
})

Expand Down Expand Up @@ -582,7 +582,7 @@ setMethod("join",
#' @rdname join-methods
#' @aliases leftOuterJoin,RDD,RDD-method
setMethod("leftOuterJoin",
signature(x = "RDD", y = "RDD", numPartitions = "integer"),
signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
function(x, y, numPartitions) {
xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
Expand Down Expand Up @@ -619,7 +619,7 @@ setMethod("leftOuterJoin",
#' @rdname join-methods
#' @aliases rightOuterJoin,RDD,RDD-method
setMethod("rightOuterJoin",
signature(x = "RDD", y = "RDD", numPartitions = "integer"),
signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
function(x, y, numPartitions) {
xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
Expand Down Expand Up @@ -659,7 +659,7 @@ setMethod("rightOuterJoin",
#' @rdname join-methods
#' @aliases fullOuterJoin,RDD,RDD-method
setMethod("fullOuterJoin",
signature(x = "RDD", y = "RDD", numPartitions = "integer"),
signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
function(x, y, numPartitions) {
xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
Expand Down

0 comments on commit 29d67c1

Please sign in to comment.