Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Addition of parallel computing with future #18 #134

Merged
merged 7 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,17 @@ Imports:
R6 (>= 2.5.0),
assertthat (>= 0.2.0),
doFuture (>= 0.12.2),
dplyr,
foreach,
future (>= 1.23.0),
parallelly (>= 1.30.0),
parallel,
foreach,
dplyr,
geodist,
ggplot2,
graphics,
methods,
Matrix,
ncdf4,
parallel,
posterior,
sf (>= 1.0),
stars (>= 0.5),
Expand All @@ -60,7 +61,6 @@ Suggests:
cubelyr,
dbarts (>= 0.9-22),
deldir,
doParallel,
ellipsis,
glmnet (>= 4.1),
glmnetUtils,
Expand Down
5 changes: 5 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,11 @@ export(get_ngbvalue)
export(get_priors)
export(get_rastervalue)
export(ibis_dependencies)
export(ibis_enable_parallel)
export(ibis_future)
export(ibis_options)
export(ibis_set_strategy)
export(ibis_set_threads)
export(interpolate_gaps)
export(is.Id)
export(is.Raster)
Expand All @@ -119,6 +122,7 @@ export(mask.BiodiversityDatasetCollection)
export(mask.BiodiversityScenario)
export(mask.DistributionModel)
export(mask.PredictorDataset)
export(modal)
export(new_id)
export(new_waiver)
export(partial)
Expand All @@ -140,6 +144,7 @@ export(rm_limits)
export(rm_offset)
export(rm_predictors)
export(rm_priors)
export(run_parallel)
export(run_stan)
export(sanitize_names)
export(scenario)
Expand Down
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#### New features

#### Minor improvements and bug fixes
* Support for modal value calculations in `ensemble()` and export of method.
* Minor :bug: fix related to misaligned thresholds and negative exponential kernels.
* :fire: :bug: fix for scenario projections that use different grain sizes than for inference.

Expand Down
76 changes: 34 additions & 42 deletions R/engine_bart.R
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ NULL
#' sum-of-trees formulation (Default: \code{1000}).
#' @param nburn A [`numeric`] estimate of the burn in samples (Default: \code{250}).
#' @param chains A number of the number of chains to be used (Default: \code{4}).
#' @param type The mode used for creating posterior predictions. Either \code{"link"}
#' @param type The type used for creating posterior predictions. Either \code{"link"}
#' or \code{"response"} (Default: \code{"response"}).
#' @param ... Other options.
#'
Expand Down Expand Up @@ -100,7 +100,8 @@ engine_bart <- function(x,
n.burn = nburn,
n.trees = iter,
n.chains = chains,
n.threads = ifelse( dbarts::guessNumCores() < getOption('ibis.nthread'),dbarts::guessNumCores(),getOption('ibis.nthread'))
n.threads = ifelse( dbarts::guessNumCores() < getOption('ibis.nthread'),
dbarts::guessNumCores(), getOption('ibis.nthread'))
)
# Other parameters
# Set up the parameter list
Expand Down Expand Up @@ -473,58 +474,49 @@ engine_bart <- function(x,
full[[settings$get('bias_variable')[i]]] <- settings$get('bias_value')[i]
}
}
check_package("foreach")
params <- self$get_data("params")

full$rowid <- 1:nrow(full)
if(is.Waiver(model$offset)) of <- NULL else of <- scales::rescale(model$offset[full$cellid, "spatial_offset"], to = c(1e-6, 1))

# Tile the problem
splits <- cut(1:nrow(full), nrow(full) / min(nrow(full) / 4, 5000) )
if(getOption("ibis.runparallel",default = FALSE)){
check_package("doFuture")
if(!("doFuture" %in% loadedNamespaces()) || ('doFuture' %notin% utils::sessionInfo()$otherPkgs) ) {
try({requireNamespace('doFuture');attachNamespace("doFuture")},silent = TRUE)
}

# Get offset if existing
if(is.Waiver(model$offset)) of <- NULL else of <- scales::rescale(model$offset[full$cellid, "spatial_offset"], to = c(1e-6, 1))
out <- predict_bart(obj = fit_bart,
newdata = full[, model$biodiversity[[1]]$predictors_names],
params = params,
w = w,
of = of,
# FIXME: Somehow parallel prediction do not work with dbarts (RNG)
# All estimates are 0.5 (random)
# By default turned off....
run_future = FALSE,
N = NULL)

# Make a prediction
ms <- foreach::foreach(s = unique(splits),
.inorder = TRUE,
.combine = rbind,
.errorhandling = "stop",
.multicombine = TRUE,
.export = c("splits", "fit_bart", "full", "model", "params", "of"),
.packages = c("dbarts", "matrixStats")) %do% {
i <- which(splits == s)

pred_bart <- predict(object = fit_bart,
newdata = full[i, model$biodiversity[[1]]$predictors_names],
type = params$type,
offset = of[i]
)
# Summarize quantiles and sd from posterior
ms <- as.data.frame(
cbind( apply(pred_bart, 2, function(x) mean(x, na.rm = TRUE)),
matrixStats::colSds(pred_bart),
matrixStats::colQuantiles(pred_bart, probs = c(.05,.5,.95)),
apply(pred_bart, 2, mode)
)
)
names(ms) <- c("mean","sd", "q05", "q50", "q95", "mode")
ms$cv <- ms$sd / ms$mean
rm(pred_bart)
return( ms )
} # End of processing
assertthat::assert_that(nrow(ms)>0,
nrow(ms) == nrow(full))
} else {
out <- predict_bart(obj = fit_bart,
newdata = full[, model$biodiversity[[1]]$predictors_names],
params = params,
w = w,
of = of,
run_future = FALSE,
N = NULL)
}
# End of processing
assertthat::assert_that(nrow(out)>0)

# Add them through a loop since the cellid changed
prediction <- terra::rast()
for(post in names(ms)){
for(post in names(out)){
prediction2 <- self$get_data('template')
prediction2[as.numeric(full$cellid)] <- ms[[post]]; names(prediction2) <- post
prediction2[as.numeric(full$cellid)] <- out[[post]]; names(prediction2) <- post
suppressWarnings( prediction <- c(prediction, prediction2) )
rm(prediction2)
}
# plot(prediction$mean, col = ibis_colours$sdm_colour)
try({rm(ms, full)},silent = TRUE)
try({rm(out, full)},silent = TRUE)
} else {
# No prediction done
prediction <- NULL
Expand Down Expand Up @@ -676,7 +668,7 @@ engine_bart <- function(x,
"q50" = matrixStats::rowQuantiles(pred_bart, probs = c(.5)),
"median" = matrixStats::rowQuantiles(pred_bart, probs = c(.5)),
"q95" = matrixStats::rowQuantiles(pred_bart, probs = c(.95)),
"mode" = apply(pred_bart, 1, mode),
"mode" = apply(pred_bart, 1, modal),
"cv" <- matrixStats::rowSds(pred_bart) / matrixStats::rowMeans2(pred_bart)
)

Expand Down
147 changes: 70 additions & 77 deletions R/engine_breg.R
Original file line number Diff line number Diff line change
Expand Up @@ -444,55 +444,30 @@ engine_breg <- function(x,
w_full_sub <- w_full[full_sub$rowid]
assertthat::assert_that((nrow(full_sub) == length(w_full_sub)) || is.null(w_full_sub) )

# Tile the problem
splits <- cut(1:nrow(full_sub), nrow(full_sub) / (min(100, nrow(full_sub) / 10)) )

# Now depending on parallization setting use foreach
if(getOption("ibis.runparallel")){
# Check that future is registered
if(!foreach::getDoParRegistered()) ibis_future(cores = getOption("ibis.nthread"),
strategy = getOption("ibis.futurestrategy"))

# Run the outgoing command
# out <- foreach::foreach(s = unique(splits),
# .combine = rbind,
# .export = c("splits", "fit_breg", "full_sub",
# "w_full_sub", "fam", "params"),
# .packages = c("matrixStats"),
# .multicombine = TRUE,
# .inorder = TRUE,
# verbose = settings$get("verbose") ) %do% {
out <- parallel::mclapply(unique(splits), function(s) {
i <- which(splits == s)
# -> external code in utils-boom
pred_breg <- predict_boom(
obj = fit_breg,
newdata = full_sub[i,],
w = w_full_sub[i],
fam = fam,
params = params
)
# Summarize the posterior
preds <- as.data.frame(
cbind(
matrixStats::rowMeans2(pred_breg, na.rm = TRUE),
matrixStats::rowSds(pred_breg, na.rm = TRUE),
matrixStats::rowQuantiles(pred_breg, probs = c(.05,.5,.95), na.rm = TRUE),
apply(pred_breg, 1, mode)
)
)
names(preds) <- c("mean", "sd", "q05", "q50", "q95", "mode")
preds$cv <- preds$sd / preds$mean
return(preds)
})
out <- do.call(rbind, out)
check_package("doFuture")
if(!("doFuture" %in% loadedNamespaces()) || ('doFuture' %notin% utils::sessionInfo()$otherPkgs) ) {
try({requireNamespace('doFuture');attachNamespace("doFuture")},silent = TRUE)
}
# Prediction function
out <- predict_boom(obj = fit_breg,
newdata = full_sub,
w = w_full_sub,
fam = fam,
params = params,
run_future = TRUE,
N = NULL)

} else {
# Tile the problem
splits <- chunk_data(full_sub,N = (min(100, nrow(full_sub) / 10)), index_only = TRUE)

out <- data.frame()
pb <- progress::progress_bar$new(total = length(levels(unique(splits))),
pb <- progress::progress_bar$new(total = length(splits),
format = "Creating model prediction (:spin) [:bar] :percent")
for(s in unique(splits)){
for(i in splits){
pb$tick()
i <- which(splits == s)
# -> external code in utils-boom
pred_breg <- predict_boom(
obj = fit_breg,
Expand All @@ -506,7 +481,7 @@ engine_breg <- function(x,
matrixStats::rowMeans2(pred_breg, na.rm = TRUE),
matrixStats::rowSds(pred_breg, na.rm = TRUE),
matrixStats::rowQuantiles(pred_breg, probs = c(.05,.5,.95), na.rm = TRUE),
apply(pred_breg, 1, mode)
apply(pred_breg, 1, modal)
) |> as.data.frame()
names(preds) <- c("mean", "sd", "q05", "q50", "q95", "mode")
preds$cv <- preds$sd / preds$mean
Expand All @@ -515,7 +490,7 @@ engine_breg <- function(x,
}
}
assertthat::assert_that(is.data.frame(out), nrow(out)>0,
msg = "Something went wrong withe prediction. Output empty!")
msg = "Something went wrong with the prediction. Output empty!")
# Fill output with summaries of the posterior
stk <- terra::rast()
for(v in colnames(out)){
Expand Down Expand Up @@ -624,7 +599,7 @@ engine_breg <- function(x,
matrixStats::rowMeans2(pred_breg, na.rm = TRUE),
matrixStats::rowSds(pred_breg, na.rm = TRUE),
matrixStats::rowQuantiles(pred_breg, probs = c(.05,.5,.95), na.rm = TRUE),
apply(pred_breg, 1, mode)
apply(pred_breg, 1, modal)
) |> as.data.frame()

names(pred_part) <- c("mean", "sd", "q05", "q50", "q95", "mode")
Expand Down Expand Up @@ -724,10 +699,12 @@ engine_breg <- function(x,
pred_part <- cbind(
matrixStats::rowMeans2(pred_breg, na.rm = TRUE),
matrixStats::rowSds(pred_breg, na.rm = TRUE),
matrixStats::rowQuantiles(pred_breg, probs = c(.05,.5,.95), na.rm = TRUE),
apply(pred_breg, 1, mode)
matrixStats::rowQuantiles(pred_breg, probs = c(.05,.5,.95), na.rm = TRUE)
) |> as.data.frame()
names(pred_part) <- c("mean", "sd", "q05", "q50", "q95", "mode")
names(pred_part) <- c("mean", "sd", "q05", "q50", "q95")
assertthat::assert_that(all(is.numeric(pred_part[,1])),
msg = "Posterior summarizing issue...?")
pred_part$mode <- apply(pred_breg, 1, modal)
pred_part$cv <- pred_part$sd / pred_part$mean

# Now create spatial prediction
Expand Down Expand Up @@ -812,36 +789,52 @@ engine_breg <- function(x,
# For Integrated model, take the last one
fam <- model$biodiversity[[length(model$biodiversity)]]$family

# Rather predict in steps than for the whole thing
out <- data.frame()
if(getOption("ibis.runparallel",default = FALSE)){
check_package("doFuture")
if(!("doFuture" %in% loadedNamespaces()) || ('doFuture' %notin% utils::sessionInfo()$otherPkgs) ) {
try({requireNamespace('doFuture');attachNamespace("doFuture")},silent = TRUE)
}
# Prediction function
out <- predict_boom(obj = mod,
newdata = df_sub,
w = unique(w)[2],
fam = fam,
params = settings$data,
run_future = TRUE,
N = NULL)
} else {
# Sequential prediction
# Rather predict in steps than for the whole thing
out <- data.frame()

# Tile the problem
splits <- cut(1:nrow(df_sub), nrow(df_sub) / (min(100, nrow(df_sub) / 10)) )
# Tile the problem
splits <- cut(1:nrow(df_sub), nrow(df_sub) / (min(100, nrow(df_sub) / 10)) )

pb <- progress::progress_bar$new(total = length(levels(unique(splits))),
format = "Projecting on new data (:spin) [:bar] :percent")
for(s in unique(splits)){
pb$tick()
i <- which(splits == s)
# -> external code in utils-boom
pred_breg <- predict_boom(
obj = mod,
newdata = df_sub[i,],
w = unique(w)[2],
fam = fam,
params = settings$data
)
# Summarize the posterior
preds <- cbind(
matrixStats::rowMeans2(pred_breg, na.rm = TRUE),
matrixStats::rowSds(pred_breg, na.rm = TRUE),
matrixStats::rowQuantiles(pred_breg, probs = c(.05,.5,.95), na.rm = TRUE),
apply(pred_breg, 1, mode)
) |> as.data.frame()
names(preds) <- c("mean", "sd", "q05", "q50", "q95", "mode")
preds$cv <- preds$sd / preds$mean
out <- rbind(out, preds)
rm(preds, pred_breg)
pb <- progress::progress_bar$new(total = length(levels(unique(splits))),
format = "Projecting on new data (:spin) [:bar] :percent")
for(s in unique(splits)){
pb$tick()
i <- which(splits == s)
# -> external code in utils-boom
pred_breg <- predict_boom(
obj = mod,
newdata = df_sub[i,],
w = unique(w)[2],
fam = fam,
params = settings$data
)
# Summarize the posterior
preds <- cbind(
matrixStats::rowMeans2(pred_breg, na.rm = TRUE),
matrixStats::rowSds(pred_breg, na.rm = TRUE),
matrixStats::rowQuantiles(pred_breg, probs = c(.05,.5,.95), na.rm = TRUE),
apply(pred_breg, 1, modal)
) |> as.data.frame()
names(preds) <- c("mean", "sd", "q05", "q50", "q95", "mode")
preds$cv <- preds$sd / preds$mean
out <- rbind(out, preds)
rm(preds, pred_breg)
}
}

# Now create spatial prediction
Expand Down
Loading
Loading