Skip to content

Commit

Permalink
fix style
Browse files Browse the repository at this point in the history
  • Loading branch information
Davies Liu committed Apr 14, 2015
1 parent 40199eb commit 9387402
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 41 deletions.
2 changes: 1 addition & 1 deletion R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ exportMethods(
"reduceByKeyLocally",
"repartition",
"rightOuterJoin",
"sampleRDD",
"sampleByKey",
"sampleRDD",
"saveAsTextFile",
"saveAsObjectFile",
"sortBy",
Expand Down
25 changes: 12 additions & 13 deletions R/pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -1496,45 +1496,45 @@ setMethod("zipRDD",
stop("Can only zip RDDs which have the same number of partitions.")
}

if (getSerializedMode(x) != getSerializedMode(other) ||
if (getSerializedMode(x) != getSerializedMode(other) ||
getSerializedMode(x) == "byte") {
# Append the number of elements in each partition to that partition so that we can later
# check if corresponding partitions of both RDDs have the same number of elements.
#
# Note that this appending also serves the purpose of reserialization, because even if
# Note that this appending also serves the purpose of reserialization, because even if
# any RDD is serialized, we need to reserialize it to make sure its partitions are encoded
# as a single byte array. For example, partitions of an RDD generated from partitionBy()
# may be encoded as multiple byte arrays.
# may be encoded as multiple byte arrays.
appendLength <- function(part) {
part[[length(part) + 1]] <- length(part) + 1
part
}
x <- lapplyPartition(x, appendLength)
other <- lapplyPartition(other, appendLength)
}

zippedJRDD <- callJMethod(getJRDD(x), "zip", getJRDD(other))
# The zippedRDD's elements are of scala Tuple2 type. The serialized
# flag Here is used for the elements inside the tuples.
serializerMode <- getSerializedMode(x)
zippedRDD <- RDD(zippedJRDD, serializerMode)

partitionFunc <- function(split, part) {
len <- length(part)
if (len > 0) {
if (serializerMode == "byte") {
lengthOfValues <- part[[len]]
lengthOfKeys <- part[[len - lengthOfValues]]
stopifnot(len == lengthOfKeys + lengthOfValues)

# check if corresponding partitions of both RDDs have the same number of elements.
if (lengthOfKeys != lengthOfValues) {
stop("Can only zip RDDs with same number of elements in each pair of corresponding partitions.")
}

if (lengthOfKeys > 1) {
keys <- part[1 : (lengthOfKeys - 1)]
values <- part[(lengthOfKeys + 1) : (len - 1)]
values <- part[(lengthOfKeys + 1) : (len - 1)]
} else {
keys <- list()
values <- list()
Expand All @@ -1557,7 +1557,7 @@ setMethod("zipRDD",
part
}
}

PipelinedRDD(zippedRDD, partitionFunc)
})

Expand Down Expand Up @@ -1585,17 +1585,16 @@ setMethod("subtract",
mapFunction <- function(e) { list(e, NA) }
rdd1 <- map(x, mapFunction)
rdd2 <- map(other, mapFunction)

keys(subtractByKey(rdd1, rdd2, numPartitions))
})

#' Intersection of this RDD and another one.
#'
#' Return the intersection of this RDD and another one.
#' The output will not contain any duplicate elements,
#' The output will not contain any duplicate elements,
#' even if the input RDDs did. Performs a hash partition
#' across the cluster.
#' Note that this method performs a shuffle internally.
#' Note that this method performs a shuffle internally.
#'
#' @param x An RDD.
#' @param other An RDD.
Expand All @@ -1616,7 +1615,7 @@ setMethod("intersection",
function(x, other, numPartitions = SparkR::numPartitions(x)) {
rdd1 <- map(x, function(v) { list(v, NA) })
rdd2 <- map(other, function(v) { list(v, NA) })

filterFunction <- function(elem) {
iters <- elem[[2]]
all(as.vector(
Expand Down
24 changes: 10 additions & 14 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,6 @@ setGeneric("minimum", function(x) { standardGeneric("minimum") })
#' @export
setGeneric("sumRDD", function(x) { standardGeneric("sumRDD") })

#' @rdname foreach
#' @export
setGeneric("foreach", function(x, func) { standardGeneric("foreach") })

#' @rdname name
#' @export
setGeneric("name", function(x) { standardGeneric("name") })
Expand Down Expand Up @@ -269,6 +265,10 @@ setGeneric("sampleByKey",
standardGeneric("sampleByKey")
})

#' @rdname values
#' @export
setGeneric("values", function(x) { standardGeneric("values") })


############ Shuffle Functions ############

Expand Down Expand Up @@ -349,18 +349,18 @@ setGeneric("sortByKey",
standardGeneric("sortByKey")
})

#' @rdname subtractByKey
#' @rdname subtract
#' @export
setGeneric("subtractByKey",
setGeneric("subtract",
function(x, other, numPartitions = 1L) {
standardGeneric("subtractByKey")
standardGeneric("subtract")
})

#' @rdname subtract
#' @rdname subtractByKey
#' @export
setGeneric("subtract",
setGeneric("subtractByKey",
function(x, other, numPartitions = 1L) {
standardGeneric("subtract")
standardGeneric("subtractByKey")
})

################### Broadcast Variable Methods #################
Expand Down Expand Up @@ -462,10 +462,6 @@ setGeneric("showDF", function(x,...) { standardGeneric("showDF") })
#' @export
setGeneric("sortDF", function(x, col, ...) { standardGeneric("sortDF") })

#' @rdname subtract
#' @export
setGeneric("subtract", function(x, y) { standardGeneric("subtract") })

#' @rdname tojson
#' @export
setGeneric("toJSON", function(x) { standardGeneric("toJSON") })
Expand Down
2 changes: 1 addition & 1 deletion R/pkg/R/pairRDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ setMethod("subtractByKey",
iters <- elem[[2]]
(length(iters[[1]]) > 0) && (length(iters[[2]]) == 0)
}

flatMapValues(filterRDD(cogroup(x,
other,
numPartitions = numPartitions),
Expand Down
16 changes: 8 additions & 8 deletions R/pkg/inst/tests/test_rdd.R
Original file line number Diff line number Diff line change
Expand Up @@ -481,12 +481,12 @@ test_that("subtract() on RDDs", {
actual <- collect(subtract(rdd1, rdd2))
expect_equal(as.list(sort(as.vector(actual, mode="integer"))),
l)

rdd2 <- parallelize(sc, list(2, 4))
actual <- collect(subtract(rdd1, rdd2))
expect_equal(as.list(sort(as.vector(actual, mode="integer"))),
list(1, 1, 3))

l <- list("a", "a", "b", "b", "c", "d")
rdd1 <- parallelize(sc, l)
rdd2 <- parallelize(sc, list("b", "d"))
Expand All @@ -499,21 +499,21 @@ test_that("subtractByKey() on pairwise RDDs", {
l <- list(list("a", 1), list("b", 4),
list("b", 5), list("a", 2))
rdd1 <- parallelize(sc, l)

# subtractByKey by itself
actual <- collect(subtractByKey(rdd1, rdd1))
expect_equal(actual, list())

# subtractByKey by an empty RDD
rdd2 <- parallelize(sc, list())
actual <- collect(subtractByKey(rdd1, rdd2))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(l))

rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
actual <- collect(subtractByKey(rdd1, rdd2))
expect_equal(actual,
list(list("b", 4), list("b", 5)))
list(list("b", 4), list("b", 5)))

l <- list(list(1, 1), list(2, 4),
list(2, 5), list(1, 2))
Expand All @@ -528,12 +528,12 @@ test_that("intersection() on RDDs", {
# intersection with self
actual <- collect(intersection(rdd, rdd))
expect_equal(sort(as.integer(actual)), nums)

# intersection with an empty RDD
emptyRdd <- parallelize(sc, list())
actual <- collect(intersection(rdd, emptyRdd))
expect_equal(actual, list())

rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
actual <- collect(intersection(rdd1, rdd2))
Expand Down
8 changes: 4 additions & 4 deletions R/pkg/inst/tests/test_shuffle.R
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ test_that("combineByKey for doubles", {
})

test_that("combineByKey for characters", {
stringKeyRDD <- parallelize(sc,
list(list("max", 1L), list("min", 2L),
stringKeyRDD <- parallelize(sc,
list(list("max", 1L), list("min", 2L),
list("other", 3L), list("max", 4L)), 2L)
reduced <- combineByKey(stringKeyRDD,
reduced <- combineByKey(stringKeyRDD,
function(x) { x }, "+", "+", 2L)
actual <- collect(reduced)

expected <- list(list("max", 5L), list("min", 2L), list("other", 3L))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
})
Expand Down

0 comments on commit 9387402

Please sign in to comment.