[SPARK-19231][SPARKR] add error handling for download and untar for Spark release

## What changes were proposed in this pull request?

When SparkR is running as a package and needs to download the Spark release distribution, we need to handle errors from both the download and the untar steps and clean up afterwards; otherwise a corrupted or partial tar file is left in the local cache and subsequent install attempts get stuck.
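
In outline, the fix wraps both steps in `tryCatch`, treats warnings as errors, and removes the partial tar file before stopping. A minimal sketch of the pattern, not the exact patch — the wrapper `fetchAndExtract` and the names `url`, `tarPath`, and `destDir` are illustrative:

```r
# A minimal sketch of the error-handling pattern; fetchAndExtract, url,
# tarPath, and destDir are illustrative names, not from the patch.
fetchAndExtract <- function(url, tarPath, destDir) {
  ok <- tryCatch({
    download.file(url, tarPath)  # returns 0 on success, errors/warns on failure
    TRUE
  },
  error = function(e) { message(e); FALSE },
  warning = function(w) { message(w); FALSE })  # treat warnings as errors
  if (!ok) {
    unlink(tarPath)  # remove the partial download so a retry starts clean
    stop("Fetch failed from ", url)
  }
  # untar can stop() on a corrupt archive, or the external tar command
  # can return a nonzero exit code -- handle both
  ok <- tryCatch(untar(tarfile = tarPath, exdir = destDir) == 0,
                 error = function(e) { message(e); FALSE },
                 warning = function(w) { message(w); FALSE })
  if (!ok) {
    unlink(tarPath)  # the tar file is likely corrupted; remove it too
    stop("Extract archive failed.")
  }
  invisible(destDir)
}
```

The important detail is that `unlink` runs on every failure path; without it, a later `install.spark()` call would see the cached tar file, skip the download, and fail at the same untar step again.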

## How was this patch tested?

manually

Author: Felix Cheung <[email protected]>

Closes apache#16589 from felixcheung/rtarreturncode.
felixcheung authored and cmonkey committed Feb 15, 2017
1 parent f098e84 commit 5277429
Showing 1 changed file with 40 additions and 15 deletions.
55 changes: 40 additions & 15 deletions R/pkg/R/install.R
@@ -54,7 +54,7 @@
 #' }
 #' @param overwrite If \code{TRUE}, download and overwrite the existing tar file in localDir
 #'   and force re-install Spark (in case the local directory or file is corrupted)
-#' @return \code{install.spark} returns the local directory where Spark is found or installed
+#' @return the (invisible) local directory where Spark is found or installed
 #' @rdname install.spark
 #' @name install.spark
 #' @aliases install.spark
@@ -115,17 +115,35 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
   } else {
     if (releaseUrl != "") {
       message("Downloading from alternate URL:\n- ", releaseUrl)
-      downloadUrl(releaseUrl, packageLocalPath, paste0("Fetch failed from ", releaseUrl))
+      success <- downloadUrl(releaseUrl, packageLocalPath)
+      if (!success) {
+        unlink(packageLocalPath)
+        stop(paste0("Fetch failed from ", releaseUrl))
+      }
     } else {
       robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
     }
   }

   message(sprintf("Installing to %s", localDir))
-  untar(tarfile = packageLocalPath, exdir = localDir)
-  if (!tarExists || overwrite) {
+  # There are two ways untar can fail - untar could stop() on errors like incomplete block on file
+  # or, tar command can return failure code
+  success <- tryCatch(untar(tarfile = packageLocalPath, exdir = localDir) == 0,
+                      error = function(e) {
+                        message(e)
+                        message()
+                        FALSE
+                      },
+                      warning = function(w) {
+                        # Treat warning as error, add an empty line with message()
+                        message(w)
+                        message()
+                        FALSE
+                      })
+  if (!tarExists || overwrite || !success) {
     unlink(packageLocalPath)
   }
+  if (!success) stop("Extract archive failed.")
   message("DONE.")
   Sys.setenv(SPARK_HOME = packageLocalDir)
   message(paste("SPARK_HOME set to", packageLocalDir))
@@ -135,8 +153,7 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
 robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
   # step 1: use user-provided url
   if (!is.null(mirrorUrl)) {
-    msg <- sprintf("Use user-provided mirror site: %s.", mirrorUrl)
-    message(msg)
+    message("Use user-provided mirror site: ", mirrorUrl)
     success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                                  packageName, packageLocalPath)
     if (success) {
@@ -156,7 +173,7 @@ robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, pa
                                  packageName, packageLocalPath)
     if (success) return()
   } else {
-    message("Unable to find preferred mirror site.")
+    message("Unable to download from preferred mirror site: ", mirrorUrl)
   }

   # step 3: use backup option
@@ -165,8 +182,11 @@ robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, pa
   success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                                packageName, packageLocalPath)
   if (success) {
-    return(packageLocalPath)
+    return()
   } else {
+    # remove any partially downloaded file
+    unlink(packageLocalPath)
+    message("Unable to download from default mirror site: ", mirrorUrl)
     msg <- sprintf(paste("Unable to download Spark %s for Hadoop %s.",
                          "Please check network connection, Hadoop version,",
                          "or provide other mirror sites."),
@@ -201,14 +221,20 @@ directDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, pa
   msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
                  packageRemotePath)
   message(msg)
-  downloadUrl(packageRemotePath, packageLocalPath, paste0("Fetch failed from ", mirrorUrl))
+  downloadUrl(packageRemotePath, packageLocalPath)
 }

-downloadUrl <- function(remotePath, localPath, errorMessage) {
+downloadUrl <- function(remotePath, localPath) {
   isFail <- tryCatch(download.file(remotePath, localPath),
                      error = function(e) {
-                       message(errorMessage)
-                       print(e)
+                       message(e)
+                       message()
                        TRUE
+                     },
+                     warning = function(w) {
+                       # Treat warning as error, add an empty line with message()
+                       message(w)
+                       message()
+                       TRUE
                      })
   !isFail
@@ -234,10 +260,9 @@ sparkCachePath <- function() {
   if (.Platform$OS.type == "windows") {
     winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA)
     if (is.na(winAppPath)) {
-      msg <- paste("%LOCALAPPDATA% not found.",
+      stop(paste("%LOCALAPPDATA% not found.",
                  "Please define the environment variable",
-                 "or restart and enter an installation path in localDir.")
-      stop(msg)
+                 "or restart and enter an installation path in localDir."))
     } else {
       path <- file.path(winAppPath, "Apache", "Spark", "Cache")
     }
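
For reference, a hypothetical call site mirroring the `releaseUrl` branch above, showing the revised contract of `downloadUrl`: it now returns a logical instead of taking an `errorMessage` argument, leaving the message and cleanup to the caller. The URL and local path below are made up for illustration:

```r
# Hypothetical usage of the revised downloadUrl(); the URL and path are
# illustrative only, not from the patch.
releaseUrl <- "https://archive.example.org/spark-2.1.0-bin-hadoop2.7.tgz"
packageLocalPath <- file.path(tempdir(), "spark-2.1.0-bin-hadoop2.7.tgz")

success <- downloadUrl(releaseUrl, packageLocalPath)
if (!success) {
  unlink(packageLocalPath)  # don't leave a partial tar file behind
  stop(paste0("Fetch failed from ", releaseUrl))
}
```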
