Skip to content

Commit

Permalink
Merge pull request apache#145 from lythesia/master
Browse files Browse the repository at this point in the history
[SPARKR-175] Refactor join code
  • Loading branch information
concretevitamin committed Feb 2, 2015
2 parents 76f6b9e + 1c2dbec commit 21e9b74
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 128 deletions.
136 changes: 8 additions & 128 deletions pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ setMethod("checkpoint",
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10, 2L)
#' numParititions(rdd) # 2L
#' numPartitions(rdd) # 2L
#'}
setGeneric("numPartitions", function(rdd) { standardGeneric("numPartitions") })

Expand Down Expand Up @@ -1603,32 +1603,7 @@ setMethod("join",
rdd2Tagged <- lapply(rdd2, function(x) { list(x[[1]], list(2L, x[[2]])) })

doJoin <- function(v) {
t1 <- vector("list", length(v))
t2 <- vector("list", length(v))
index1 <- 1
index2 <- 1
for (x in v) {
if (x[[1]] == 1L) {
t1[[index1]] <- x[[2]]
index1 <- index1 + 1
} else {
t2[[index2]] <- x[[2]]
index2 <- index2 + 1
}
}
length(t1) <- index1 - 1
length(t2) <- index2 - 1

result <- list()
length(result) <- length(t1) * length(t2)
index <- 1
for (i in t1) {
for (j in t2) {
result[[index]] <- list(i, j)
index <- index + 1
}
}
result
joinTaggedList(v, list(FALSE, FALSE))
}

joined <- flatMapValues(groupByKey(unionRDD(rdd1Tagged, rdd2Tagged), numPartitions), doJoin)
Expand Down Expand Up @@ -1668,37 +1643,7 @@ setMethod("leftOuterJoin",
rdd2Tagged <- lapply(rdd2, function(x) { list(x[[1]], list(2L, x[[2]])) })

doJoin <- function(v) {
t1 <- vector("list", length(v))
t2 <- vector("list", length(v))
index1 <- 1
index2 <- 1
for (x in v) {
if (x[[1]] == 1L) {
t1[[index1]] <- x[[2]]
index1 <- index1 + 1
} else {
t2[[index2]] <- x[[2]]
index2 <- index2 + 1
}
}
length(t1) <- index1 - 1
len2 <- index2 - 1
if (len2 == 0) {
t2 <- list(NULL)
} else {
length(t2) <- len2
}

result <- list()
length(result) <- length(t1) * length(t2)
index <- 1
for (i in t1) {
for (j in t2) {
result[[index]] <- list(i, j)
index <- index + 1
}
}
result
joinTaggedList(v, list(FALSE, TRUE))
}

joined <- flatMapValues(groupByKey(unionRDD(rdd1Tagged, rdd2Tagged), numPartitions), doJoin)
Expand Down Expand Up @@ -1738,37 +1683,7 @@ setMethod("rightOuterJoin",
rdd2Tagged <- lapply(rdd2, function(x) { list(x[[1]], list(2L, x[[2]])) })

doJoin <- function(v) {
t1 <- vector("list", length(v))
t2 <- vector("list", length(v))
index1 <- 1
index2 <- 1
for (x in v) {
if (x[[1]] == 1L) {
t1[[index1]] <- x[[2]]
index1 <- index1 + 1
} else {
t2[[index2]] <- x[[2]]
index2 <- index2 + 1
}
}
len1 <- index1 - 1
if (len1 == 0) {
t1 <- list(NULL)
} else {
length(t1) <- len1
}
length(t2) <- index2 - 1

result <- list()
length(result) <- length(t1) * length(t2)
index <- 1
for (i in t1) {
for (j in t2) {
result[[index]] <- list(i, j)
index <- index + 1
}
}
result
joinTaggedList(v, list(TRUE, FALSE))
}

joined <- flatMapValues(groupByKey(unionRDD(rdd1Tagged, rdd2Tagged), numPartitions), doJoin)
Expand Down Expand Up @@ -1798,59 +1713,24 @@ setMethod("rightOuterJoin",
#' rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
#' fullOuterJoin(rdd1, rdd2, 2L) # list(list(1, list(2, 1)),
#' # list(1, list(3, 1)),
#' # list(3, list(3, NULL)),
#' # list(2, list(NULL, 4)))
#' # list(3, list(3, NULL)),
#'}
setGeneric("fullOuterJoin", function(rdd1, rdd2, numPartitions) { standardGeneric("fullOuterJoin") })

#' @rdname fullOuterJoin
#' @aliases fullOuterJoin,RDD,RDD-method

setMethod("fullOuterJoin",
signature(rdd1 = "RDD", rdd2 = "RDD", numPartitions = "integer"),
function(rdd1, rdd2, numPartitions) {
rdd1Tagged <- lapply(rdd1, function(x) { list(x[[1]], list(1L, x[[2]])) })
rdd2Tagged <- lapply(rdd2, function(x) { list(x[[1]], list(2L, x[[2]])) })

doJoin <- function(v) {
t1 <- vector("list", length(v))
t2 <- vector("list", length(v))
index1 <- 1
index2 <- 1
for (x in v) {
if (x[[1]] == 1L) {
t1[[index1]] <- x[[2]]
index1 <- index1 + 1
} else {
t2[[index2]] <- x[[2]]
index2 <- index2 + 1
}
}
len1 <- index1 - 1
len2 <- index2 - 1

if (len1 == 0) {
t1 <- list(NULL)
} else {
length(t1) <- len1
}

if (len2 == 0) {
t2 <- list(NULL)
} else {
length(t2) <- len2
}

result <- list()
length(result) <- length(t1) * length(t2)
index <- 1
for(i in t1) {
for(j in t2) {
result[[index]] <- list(i, j)
index <- index + 1
}
}
result
joinTaggedList(v, list(TRUE, TRUE))
}

joined <- flatMapValues(groupByKey(unionRDD(rdd1Tagged, rdd2Tagged), numPartitions), doJoin)
})

Expand Down
58 changes: 58 additions & 0 deletions pkg/R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,61 @@ sortKeyValueList <- function(kv_list) {
keys <- sapply(kv_list, function(x) x[[1]])
kv_list[order(keys)]
}

# Utility function to generate compact R lists from grouped rdd
# Used in Join-family functions
# param:
# tagged_list R list generated via groupByKey with tags(1L, 2L, ...)
# cnull Boolean list where each element determines whether the corresponding list should
# be converted to list(NULL)
genCompactLists <- function(tagged_list, cnull) {
len <- length(tagged_list)
lists <- list(vector("list", len), vector("list", len))
index <- list(1, 1)

for (x in tagged_list) {
tag <- x[[1]]
idx <- index[[tag]]
lists[[tag]][[idx]] <- x[[2]]
index[[tag]] <- idx + 1
}

len <- lapply(index, function(x) x - 1)
for (i in (1:2)) {
if (cnull[[i]] && len[[i]] == 0) {
lists[[i]] <- list(NULL)
} else {
length(lists[[i]]) <- len[[i]]
}
}

lists
}

# Utility function to merge compact R lists
# Used in Join-family functions
# param:
# left/right Two compact lists ready for Cartesian product
mergeCompactLists <- function(left, right) {
result <- list()
length(result) <- length(left) * length(right)
index <- 1
for (i in left) {
for (j in right) {
result[[index]] <- list(i, j)
index <- index + 1
}
}
result
}

# Utility function to wrapper above two operations
# Used in Join-family functions
# param (same as genCompactLists):
# tagged_list R list generated via groupByKey with tags(1L, 2L, ...)
# cnull Boolean list where each element determines whether the corresponding list should
# be converted to list(NULL)
joinTaggedList <- function(tagged_list, cnull) {
lists <- genCompactLists(tagged_list, cnull)
mergeCompactLists(lists[[1]], lists[[2]])
}

0 comments on commit 21e9b74

Please sign in to comment.