Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARKR][SPARK-8452] expose jobGroup API in SparkR #6889

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ export("sparkR.init")
export("sparkR.stop")
export("print.jobj")

# Job group lifecycle management methods
export("setJobGroup",
"clearJobGroup",
"cancelJobGroup")

exportClasses("DataFrame")

exportMethods("arrange",
Expand Down
29 changes: 29 additions & 0 deletions R/pkg/R/sparkR.R
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,32 @@ sparkRHive.init <- function(jsc = NULL) {
assign(".sparkRHivesc", hiveCtx, envir = .sparkREnv)
hiveCtx
}

#' Assigns a group ID to all the jobs started by this thread until the group ID is set to a
#' different value or cleared.
#'
#' @param sc existing spark context
#' @param groupid the ID to be assigned to job groups
#' @param description description for the the job group ID
#' @param interruptOnCancel flag to indicate if the job is interrupted on job cancellation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah @davies point is a good one. We can add an example usage here with something like

 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init()
 # setJobGroup(sc, "group", "some group", TRUE)
 #'}


setJobGroup <- function(sc, groupId, description, interruptOnCancel) {
callJMethod(sc, "setJobGroup", groupId, description, interruptOnCancel)
}

#' Clear current job group ID and its description
#'
#' @param sc existing spark context

clearJobGroup <- function(sc) {
callJMethod(sc, "clearJobGroup")
}

#' Cancel active jobs for the specified group
#'
#' @param sc existing spark context
#' @param groupId the ID of job group to be cancelled

cancelJobGroup <- function(sc, groupId) {
callJMethod(sc, "cancelJobGroup", groupId)
}