Merge pull request apache#93 from hlin09/hlin09
[SPARKR-15] Adds function Filter() to extract elements that satisfy a predicate.
shivaram committed Oct 30, 2014
2 parents 579db58 + 488ac47 commit be82dcc
Showing 6 changed files with 149 additions and 0 deletions.
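
In short, the commit adds a capital-F Filter() method for RDDs, plus a lowercase filter() alias, so SparkR users can keep only the elements that satisfy a predicate. A minimal usage sketch, pieced together from the example and tests in this diff (it assumes a running SparkR context):

sc <- sparkR.init()
rdd <- parallelize(sc, 1:10)
# Keep only the even elements; Filter() and the filter() alias behave identically.
evens <- Filter(function(x) { x %% 2 == 0 }, rdd)
collect(evens)  # list(2, 4, 6, 8, 10)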
2 changes: 2 additions & 0 deletions pkg/NAMESPACE
@@ -10,6 +10,8 @@ exportMethods(
"combineByKey",
"count",
"distinct",
"Filter",
"filter",
"flatMap",
"groupByKey",
"length",
38 changes: 38 additions & 0 deletions pkg/R/RDD.R
@@ -520,6 +520,44 @@ setMethod("mapPartitionsWithIndex",
lapplyPartitionsWithIndex(X, FUN)
})

#' This function returns a new RDD containing only the elements that satisfy
#' a predicate (i.e. the elements for which a given predicate function returns TRUE).
#' The same as `filter()' in Spark.
#'
#' @param f A unary predicate function.
#' @param x The RDD to be filtered.
#' @rdname Filter
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' unlist(collect(Filter(function (x) { x < 3 }, rdd))) # c(1, 2)
#'}
#setGeneric("Filter", function(f, x) { standardGeneric("Filter") })

#' @rdname Filter
#' @aliases Filter,function,RDD-method filter,function,RDD-method
setMethod("Filter",
signature(f = "function", x = "RDD"),
function(f, x) {
filter.func <- function(part) {
Filter(f, part)
}
lapplyPartition(x, filter.func)
})

#' @rdname Filter
#' @export
setGeneric("filter", function(f, x) { standardGeneric("filter") })

#' @rdname Filter
#' @aliases filter,function,RDD-method
setMethod("filter",
signature(f = "function", x = "RDD"),
function(f, x) {
Filter(f, x)
})

#' Reduce across elements of an RDD.
#'
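
The new Filter method above delegates to lapplyPartition, applying base R's Filter() inside each partition. A standalone sketch of that per-partition idea using plain R lists (not the SparkR API), just for illustration:

# parts stands in for an RDD's partitions: a list of lists of elements.
parts <- list(list(1, 2, 3), list(4, 5, 6))
pred <- function(x) { x %% 2 == 0 }
# Like filter.func above, apply base R's Filter() to each partition independently.
filtered <- lapply(parts, function(part) { Filter(pred, part) })
unlist(filtered)  # c(2, 4, 6) -- flattening the filtered partitions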
15 changes: 15 additions & 0 deletions pkg/inst/tests/test_rdd.R
@@ -33,6 +33,21 @@ test_that("mapPartitions on RDD", {
expect_equal(actual, list(15, 40))
})

test_that("Filter on RDD", {
filtered.rdd <- Filter(function(x) { x %% 2 == 0 }, rdd)
actual <- collect(filtered.rdd)
expect_equal(actual, list(2, 4, 6, 8, 10))

filtered.rdd <- Filter(function(x) { x[[2]] < 0 }, intRdd)
actual <- collect(filtered.rdd)
expect_equal(actual, list(list(1L, -1)))

# Filter out all elements.
filtered.rdd <- Filter(function(x) { x > 10 }, rdd)
actual <- collect(filtered.rdd)
expect_equal(actual, list())
})

test_that("lookup on RDD", {
vals <- lookup(intRdd, 1L)
expect_equal(vals, list(-1, 200))
34 changes: 34 additions & 0 deletions pkg/man/Filter.Rd
@@ -0,0 +1,34 @@
% Generated by roxygen2 (4.0.2): do not edit by hand
\docType{methods}
\name{Filter,function,RDD-method}
\alias{Filter,function,RDD-method}
\alias{filter}
\alias{filter,function,RDD-method}
\title{This function returns a new RDD containing only the elements that satisfy
a predicate (i.e. the elements for which a given predicate function returns TRUE).
The same as `filter()' in Spark.}
\usage{
\S4method{Filter}{`function`,RDD}(f, x)
filter(f, x)
\S4method{filter}{`function`,RDD}(f, x)
}
\arguments{
\item{f}{A unary predicate function.}
\item{x}{The RDD to be filtered.}
}
\description{
This function returns a new RDD containing only the elements that satisfy
a predicate (i.e. the elements for which a given predicate function returns TRUE).
The same as `filter()' in Spark.
}
\examples{
\dontrun{
sc <- sparkR.init()
rdd <- parallelize(sc, 1:10)
unlist(collect(Filter(function (x) { x < 3 }, rdd))) # c(1, 2)
}
}

28 changes: 28 additions & 0 deletions pkg/man/distinct.Rd
@@ -0,0 +1,28 @@
% Generated by roxygen2 (4.0.2): do not edit by hand
\docType{methods}
\name{distinct}
\alias{distinct}
\alias{distinct,RDD,missingOrInteger-method}
\title{Removes duplicate elements from an RDD.}
\usage{
distinct(rdd, numPartitions)

\S4method{distinct}{RDD,missingOrInteger}(rdd, numPartitions)
}
\arguments{
\item{rdd}{The RDD to remove duplicates from.}

\item{numPartitions}{Number of partitions to create.}
}
\description{
This function returns a new RDD containing the distinct elements in the
given RDD. The same as `distinct()' in Spark.
}
\examples{
\dontrun{
sc <- sparkR.init()
rdd <- parallelize(sc, c(1,2,2,3,3,3))
sort(unlist(collect(distinct(rdd)))) # c(1, 2, 3)
}
}
32 changes: 32 additions & 0 deletions pkg/man/mapValues.Rd
@@ -0,0 +1,32 @@
% Generated by roxygen2 (4.0.2): do not edit by hand
\docType{methods}
\name{mapValues}
\alias{mapValues}
\alias{mapValues,RDD,function-method}
\title{Applies a function to all values of the elements, without modifying the keys.}
\usage{
mapValues(X, FUN)

\S4method{mapValues}{RDD,`function`}(X, FUN)
}
\arguments{
\item{X}{The RDD to apply the transformation to.}

\item{FUN}{The transformation to apply to the value of each element.}
}
\value{
a new RDD created by the transformation.
}
\description{
The same as `mapValues()' in Spark.
}
\examples{
\dontrun{
sc <- sparkR.init()
rdd <- parallelize(sc, 1:10)
makePairs <- lapply(rdd, function(x) { list(x, x) })
collect(mapValues(makePairs, function(x) { x * 2 }))
# Output: list(list(1,2), list(2,4), list(3,6), ...)
}
}
