Skip to content

Commit

Permalink
[SPARK-20541][SPARKR][SS] support awaitTermination without timeout
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Add without param for timeout - will need this to submit a job that runs until stopped
Need this for 2.2

## How was this patch tested?

manually, unit test

Author: Felix Cheung <[email protected]>

Closes #17815 from felixcheung/rssawaitinfinite.
  • Loading branch information
felixcheung authored and Felix Cheung committed May 1, 2017
1 parent 80e9cf1 commit a355b66
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 5 deletions.
2 changes: 1 addition & 1 deletion R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -1518,7 +1518,7 @@ setGeneric("write.ml", function(object, path, ...) { standardGeneric("write.ml")

#' @rdname awaitTermination
#' @export
setGeneric("awaitTermination", function(x, timeout) { standardGeneric("awaitTermination") })
setGeneric("awaitTermination", function(x, timeout = NULL) { standardGeneric("awaitTermination") })

#' @rdname isActive
#' @export
Expand Down
14 changes: 10 additions & 4 deletions R/pkg/R/streaming.R
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,10 @@ setMethod("isActive",
#' immediately.
#'
#' @param x a StreamingQuery.
#' @param timeout time to wait in milliseconds
#' @return TRUE if query has terminated within the timeout period.
#' @param timeout time to wait in milliseconds, if omitted, wait indefinitely until \code{stopQuery}
#' is called or an error has occured.
#' @return TRUE if query has terminated within the timeout period; nothing if timeout is not
#' specified.
#' @rdname awaitTermination
#' @name awaitTermination
#' @aliases awaitTermination,StreamingQuery-method
Expand All @@ -182,8 +184,12 @@ setMethod("isActive",
#' @note experimental
setMethod("awaitTermination",
signature(x = "StreamingQuery"),
function(x, timeout) {
handledCallJMethod(x@ssq, "awaitTermination", as.integer(timeout))
function(x, timeout = NULL) {
if (is.null(timeout)) {
invisible(handledCallJMethod(x@ssq, "awaitTermination"))
} else {
handledCallJMethod(x@ssq, "awaitTermination", as.integer(timeout))
}
})

#' stopQuery
Expand Down
1 change: 1 addition & 0 deletions R/pkg/inst/tests/testthat/test_streaming.R
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ test_that("read.stream, write.stream, awaitTermination, stopQuery", {

stopQuery(q)
expect_true(awaitTermination(q, 1))
expect_error(awaitTermination(q), NA)
})

test_that("print from explain, lastProgress, status, isActive", {
Expand Down

0 comments on commit a355b66

Please sign in to comment.