[SPARK-26761][SQL][R] Vectorized R gapply() implementation
## What changes were proposed in this pull request?

This PR adds a vectorized `gapply()` implementation in R, using Arrow optimization.

This can be tested as below:

```bash
$ ./bin/sparkR --conf spark.sql.execution.arrow.enabled=true
```

```r
df <- createDataFrame(mtcars)
collect(gapply(df,
               "gear",
               function(key, group) {
                 data.frame(gear = key[[1]], disp = mean(group$disp) > group$disp)
               },
               structType("gear double, disp boolean")))
```
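
The flag can also be flipped at runtime through the session's conf object, which is what the new tests below do. A minimal sketch, assuming the `sparkSession` handle and the `callJMethod` helper are available as they are inside SparkR's test environment:

```r
# Toggle Arrow optimization on the running session (same pattern as the tests below)
conf <- callJMethod(sparkSession, "conf")
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
```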

### Requirements
  - R 3.5.x
  - Arrow package 0.12+
    ```bash
    Rscript -e 'remotes::install_github("apache/arrow@apache-arrow-0.12.0", subdir = "r")'
    ```

**Note:** currently, the Arrow R package is not on CRAN. Please take a look at ARROW-3204.
**Note:** currently, the Arrow R package does not seem to support Windows. Please take a look at ARROW-3204.

### Benchmarks

**Shell**

```bash
sync && sudo purge
./bin/sparkR --conf spark.sql.execution.arrow.enabled=false
```

```bash
sync && sudo purge
./bin/sparkR --conf spark.sql.execution.arrow.enabled=true
```

**R code**

```r
rdf <- read.csv("500000.csv")
rdf <- rdf[, c("Month.of.Joining", "Weight.in.Kgs.")]  # Keep only the key and value columns used in the calculation.
df <- cache(createDataFrame(rdf))
count(df)

test <- function() {
  options(digits.secs = 6) # show fractional seconds
  start.time <- Sys.time()
  count(gapply(df,
               "Month_of_Joining",
               function(key, group) {
                 data.frame(Month_of_Joining = key[[1]], Weight_in_Kgs_ = mean(group$Weight_in_Kgs_) > group$Weight_in_Kgs_)
               },
               structType("Month_of_Joining integer, Weight_in_Kgs_ boolean")))
  end.time <- Sys.time()
  time.taken <- end.time - start.time
  print(time.taken)
}

test()
```

**Data (350 MB):**

```r
object.size(read.csv("500000.csv"))
350379504 bytes
```

"500000 Records"  http://eforexcel.com/wp/downloads-16-sample-csv-files-data-sets-for-testing/

**Results**

Without Arrow optimization:

```
Time difference of 35.67459 secs
```

With Arrow optimization:

```
Time difference of 4.301399 secs
```

The performance improvement was around **829%**.
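
For reference, the 829% figure appears to come from the ratio of the two measured times above:

```r
# Non-Arrow runtime divided by Arrow-enabled runtime
35.67459 / 4.301399  # ~8.29, i.e. roughly 829%
```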

**Note that** the actual improvement is certainly greater than 829%: I gave up benchmarking the non-Arrow path because it took far too long once the data size grew.

### Limitations

- For now, Arrow optimization with R does not support `raw` data or an explicitly specified float type in the schema; both produce corrupt values (see the sketch after this list).

- Due to ARROW-4512, batches cannot be sent and received one by one; the whole Arrow stream has to be sent at once. This needs improvement later.
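
To make the first limitation concrete, here is a minimal sketch of the float case, assuming Arrow optimization is enabled. The guard added to `gapplyInternal()` in `R/pkg/R/group.R` (shown in the diff below) is expected to fail fast rather than return corrupt values:

```r
df <- createDataFrame(mtcars)
# An explicit float in the schema is rejected when Arrow optimization is on.
gapply(df,
       "gear",
       function(key, group) {
         data.frame(gear = key[[1]], disp = group$disp)
       },
       structType("gear double, disp float"))
# Expected error: Arrow optimization with gapply does not support FloatType yet.
```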

## How was this patch tested?

Unit tests were added

**TODOs:**
- [x] Draft code
- [x] Make the tests pass
- [x] Make the CRAN check pass
- [x] Performance measurement
- [x] Supportability investigation (for instance types)

Closes #23746 from HyukjinKwon/SPARK-26759.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon committed Feb 13, 2019
1 parent 72a349a commit 8126d09
Showing 9 changed files with 578 additions and 70 deletions.
27 changes: 27 additions & 0 deletions R/pkg/R/deserialize.R
@@ -231,6 +231,33 @@ readMultipleObjectsWithKeys <- function(inputCon) {
list(keys = keys, data = data) # this is a list of keys and corresponding data
}

readDeserializeInArrow <- function(inputCon) {
# This is a hack to avoid CRAN check. Arrow is not uploaded into CRAN now. See ARROW-3204.
requireNamespace1 <- requireNamespace
if (requireNamespace1("arrow", quietly = TRUE)) {
RecordBatchStreamReader <- get(
"RecordBatchStreamReader", envir = asNamespace("arrow"), inherits = FALSE)
as_tibble <- get("as_tibble", envir = asNamespace("arrow"))

# Currently, there appears to be no way to read batch by batch over the socket connection
# on the R side. See ARROW-4512. Therefore, it reads the whole Arrow streaming-formatted
# binary at once for now.
dataLen <- readInt(inputCon)
arrowData <- readBin(inputCon, raw(), as.integer(dataLen), endian = "big")
batches <- RecordBatchStreamReader(arrowData)$batches()

# Read all grouped batches. Tibble -> data.frame conversion is cheap.
data <- lapply(batches, function(batch) as.data.frame(as_tibble(batch)))

# Read keys to map to each grouped batch.
keys <- readMultipleObjects(inputCon)

list(keys = keys, data = data)
} else {
stop("'arrow' package should be installed.")
}
}

readRowList <- function(obj) {
# readRowList is meant for use inside an lapply. As a result, it is
# necessary to open a standalone connection for the row and consume
23 changes: 23 additions & 0 deletions R/pkg/R/group.R
@@ -229,6 +229,29 @@ gapplyInternal <- function(x, func, schema) {
if (is.character(schema)) {
schema <- structType(schema)
}
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
if (arrowEnabled) {
requireNamespace1 <- requireNamespace
if (!requireNamespace1("arrow", quietly = TRUE)) {
stop("'arrow' package should be installed.")
}
# Currently, Arrow optimization does not support the raw type.
# Also, it does not support an explicit float type set by users.
if (inherits(schema, "structType")) {
if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
stop("Arrow optimization with gapply do not support FloatType yet.")
}
if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "BinaryType"))) {
stop("Arrow optimization with gapply do not support BinaryType yet.")
}
} else if (is.null(schema)) {
stop(paste0("Arrow optimization does not support gapplyCollect yet. Please use ",
"'collect' and 'gapply' APIs instead."))
} else {
stop("'schema' should be DDL-formatted string or structType.")
}
}

packageNamesArr <- serialize(.sparkREnv[[".packages"]],
connection = NULL)
broadcastArr <- lapply(ls(.broadcastNames),
33 changes: 30 additions & 3 deletions R/pkg/inst/worker/worker.R
@@ -49,7 +49,7 @@ compute <- function(mode, partition, serializer, deserializer, key,
names(inputData) <- colNames
} else {
# Check to see if inputData is a valid data.frame
stopifnot(deserializer == "byte")
stopifnot(deserializer == "byte" || deserializer == "arrow")
stopifnot(class(inputData) == "data.frame")
}

@@ -63,7 +63,7 @@ compute <- function(mode, partition, serializer, deserializer, key,
output <- split(output, seq(nrow(output)))
} else {
# Serialize the output to a byte array
stopifnot(serializer == "byte")
stopifnot(serializer == "byte" || serializer == "arrow")
}
} else {
output <- computeFunc(partition, inputData)
@@ -171,6 +171,10 @@ if (isEmpty != 0) {
data <- dataWithKeys$data
} else if (deserializer == "row") {
data <- SparkR:::readMultipleObjects(inputCon)
} else if (deserializer == "arrow" && mode == 2) {
dataWithKeys <- SparkR:::readDeserializeInArrow(inputCon)
keys <- dataWithKeys$keys
data <- dataWithKeys$data
}

# Timing reading input data for execution
@@ -181,17 +185,40 @@ if (isEmpty != 0) {
colNames, computeFunc, data)
} else {
# gapply mode
outputs <- list()
for (i in 1:length(data)) {
# Timing reading input data for execution
inputElap <- elapsedSecs()
output <- compute(mode, partition, serializer, deserializer, keys[[i]],
colNames, computeFunc, data[[i]])
computeElap <- elapsedSecs()
outputResult(serializer, output, outputCon)
if (deserializer == "arrow") {
outputs[[length(outputs) + 1L]] <- output
} else {
outputResult(serializer, output, outputCon)
}
outputElap <- elapsedSecs()
computeInputElapsDiff <- computeInputElapsDiff + (computeElap - inputElap)
outputComputeElapsDiff <- outputComputeElapsDiff + (outputElap - computeElap)
}

if (deserializer == "arrow") {
# This is a hack to avoid CRAN check. Arrow is not uploaded into CRAN now. See ARROW-3204.
requireNamespace1 <- requireNamespace
if (requireNamespace1("arrow", quietly = TRUE)) {
write_arrow <- get("write_arrow", envir = asNamespace("arrow"), inherits = FALSE)
# See https://stat.ethz.ch/pipermail/r-help/2010-September/252046.html
# rbind.fill might be an alternative to make it faster if plyr is installed.
combined <- do.call("rbind", outputs)

# Likewise, there appears to be no way to send each batch in streaming format over the
# socket connection. See ARROW-4512.
# So, it writes the whole Arrow streaming-formatted binary at once for now.
SparkR:::writeRaw(outputCon, write_arrow(combined, raw()))
} else {
stop("'arrow' package should be installed.")
}
}
}
} else {
output <- compute(mode, partition, serializer, deserializer, NULL,
110 changes: 110 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -3529,6 +3529,116 @@ test_that("gapply() and gapplyCollect() on a DataFrame", {
})
})

test_that("gapply() Arrow optimization", {
skip_if_not_installed("arrow")
df <- createDataFrame(mtcars)

conf <- callJMethod(sparkSession, "conf")
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]

callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
tryCatch({
ret <- gapply(df,
"gear",
function(key, grouped) {
if (length(key) > 0) {
stopifnot(is.numeric(key[[1]]))
}
stopifnot(class(grouped) == "data.frame")
grouped
},
schema(df))
expected <- collect(ret)
},
finally = {
# Resetting the conf back to default value
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
})

callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
tryCatch({
ret <- gapply(df,
"gear",
function(key, grouped) {
if (length(key) > 0) {
stopifnot(is.numeric(key[[1]]))
}
stopifnot(class(grouped) == "data.frame")
grouped
},
schema(df))
actual <- collect(ret)
expect_equal(actual, expected)
},
finally = {
# Resetting the conf back to default value
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
})
})

test_that("gapply() Arrow optimization - type specification", {
skip_if_not_installed("arrow")
# Note that regular gapply() does not seem to support dates and timestamps,
# whereas Arrow-optimized gapply() does.
rdf <- data.frame(list(list(a = 1,
b = "a",
c = TRUE,
d = 1.1,
e = 1L)))
df <- createDataFrame(rdf)

conf <- callJMethod(sparkSession, "conf")
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]

callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
tryCatch({
ret <- gapply(df,
"a",
function(key, grouped) { grouped }, schema(df))
expected <- collect(ret)
},
finally = {
# Resetting the conf back to default value
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
})


callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
tryCatch({
ret <- gapply(df,
"a",
function(key, grouped) { grouped }, schema(df))
actual <- collect(ret)
expect_equal(actual, expected)
},
finally = {
# Resetting the conf back to default value
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
})
})

test_that("gapply() Arrow optimization - type specification (date and timestamp)", {
skip_if_not_installed("arrow")
rdf <- data.frame(list(list(a = as.Date("1990-02-24"),
b = as.POSIXct("1990-02-24 12:34:56"))))
df <- createDataFrame(rdf)

conf <- callJMethod(sparkSession, "conf")
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]

callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
tryCatch({
ret <- gapply(df,
"a",
function(key, grouped) { grouped }, schema(df))
expect_equal(collect(ret), rdf)
},
finally = {
# Resetting the conf back to default value
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
})
})

test_that("Window functions on a DataFrame", {
df <- createDataFrame(list(list(1L, "1"), list(2L, "2"), list(1L, "1"), list(2L, "2")),
schema = c("key", "value"))
