Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New hybrid dispatcher concept (enables mirai cancellation) #170

Merged
merged 75 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
7251bb4
add Posit as copyright holder
shikokuchuo Nov 24, 2024
69ee232
wip
shikokuchuo Nov 25, 2024
ce7b728
dual-protocol dispatcher concept
shikokuchuo Nov 26, 2024
3625416
daemon2() and dispatcher2() and integrate
shikokuchuo Nov 26, 2024
d6f0bfc
push efficiency
shikokuchuo Nov 26, 2024
199cf2d
refine logic efficiency
shikokuchuo Nov 27, 2024
1b64837
introduce daemon busy indicator
shikokuchuo Nov 27, 2024
bdc2579
update mirai_map docs example
shikokuchuo Nov 27, 2024
a90e29b
dual receives at daemon2()
shikokuchuo Nov 27, 2024
690cc27
rename variables req/res
shikokuchuo Nov 27, 2024
a056af8
msgid concept
shikokuchuo Nov 27, 2024
2a0f65f
cancellation concept
shikokuchuo Nov 27, 2024
f77c80f
make mirai cancellation more robust, including those queued for execu…
shikokuchuo Nov 28, 2024
0e66ea0
move 'next' to non-default option for R CMD check to pass
shikokuchuo Nov 28, 2024
f00f195
do more under interrupt handler; start adding tests
shikokuchuo Nov 28, 2024
c7a4915
make launchers work with next dispatcher
shikokuchuo Nov 28, 2024
b5db307
add tls tests
shikokuchuo Nov 28, 2024
cea4645
get real port for next dispatcher; make safe saisei()
shikokuchuo Nov 28, 2024
224f8b9
streamline dispatcher2 logic
shikokuchuo Nov 28, 2024
282b2b0
switch to socket_pipe() from collect_pipe()
shikokuchuo Nov 28, 2024
145d7ac
move to new pipe interface in nanonext
shikokuchuo Nov 29, 2024
7ba6471
use new Monitor class
shikokuchuo Nov 29, 2024
cd7ba44
handle daemon connection resets
shikokuchuo Nov 29, 2024
083e4aa
partially retire dispatcher ctrl socket
shikokuchuo Nov 29, 2024
7bba855
re-architecture complete
shikokuchuo Nov 29, 2024
44f1ca6
cleanups
shikokuchuo Nov 29, 2024
99615cc
safer test
shikokuchuo Nov 29, 2024
aab8477
report richer daemons status
shikokuchuo Nov 29, 2024
c91f6c7
use higher performance synchronous directed sends for dispatcher
shikokuchuo Nov 29, 2024
0909cf3
fix mirai_map() example
shikokuchuo Nov 29, 2024
ff9fc99
further cleanups
shikokuchuo Nov 29, 2024
be099ac
more granular cancellation reporting
shikokuchuo Nov 30, 2024
a409453
fix docs
shikokuchuo Nov 30, 2024
219b384
streamlines daemon() ui
shikokuchuo Dec 1, 2024
5dcf951
prevent partial matching at daemon()
shikokuchuo Dec 1, 2024
a4b6ce6
defunct experimental threaded dispatcher
shikokuchuo Dec 1, 2024
4031b64
remove further traces of threaded dispatcher
shikokuchuo Dec 1, 2024
6a9435f
make new dispatcher the default
shikokuchuo Dec 1, 2024
f02f094
update for latest nanonext
shikokuchuo Dec 1, 2024
ffcabf3
install on.exit expression first for eval_mirai
shikokuchuo Dec 1, 2024
9bb34a3
simplify daemon() loop
shikokuchuo Dec 1, 2024
a32acaa
fix tls
shikokuchuo Dec 1, 2024
5e41210
update docs
shikokuchuo Dec 1, 2024
204a113
more consistent status() and docs
shikokuchuo Dec 1, 2024
a0937c8
update stop_mirai() docs
shikokuchuo Dec 1, 2024
86662ac
cleanup internal functions
shikokuchuo Dec 2, 2024
c5bb9c2
always use unresolved() instead of .unresolved()
shikokuchuo Dec 2, 2024
ddb059b
add stress tests
shikokuchuo Dec 2, 2024
f1558c7
update daemons() docs
shikokuchuo Dec 2, 2024
9d880c2
recv_aio() must be after eval for serialization registration to work
shikokuchuo Dec 2, 2024
53037d8
update vignettes
shikokuchuo Dec 2, 2024
bde62d2
use non-signalling receive for cancellation monitoring
shikokuchuo Dec 2, 2024
86c2ada
simplify interrupt monitoring
shikokuchuo Dec 2, 2024
d9ed3a9
adopt safer sequencing
shikokuchuo Dec 2, 2024
4d7b844
use latest nanonext interface
shikokuchuo Dec 2, 2024
8eb7a2a
use CRAN nanonext
shikokuchuo Dec 3, 2024
5e55f4a
update NEWS and docs
shikokuchuo Dec 3, 2024
567e39d
simplify launchers and launch configurations
shikokuchuo Dec 3, 2024
f2fedb8
fix dameons launcher integration; update vignette
shikokuchuo Dec 3, 2024
2d78641
improve test coverage
shikokuchuo Dec 3, 2024
ad214f7
test more
shikokuchuo Dec 3, 2024
81e9238
fold legacy dispatcher into dispatcher
shikokuchuo Dec 3, 2024
f36ae0a
upgrades everywhere()
shikokuchuo Dec 3, 2024
dd30542
have stop_mirai() return either TRUE or FALSE
shikokuchuo Dec 4, 2024
d701d0d
additional safety against mistimed cancellation requests; simplify cl…
shikokuchuo Dec 4, 2024
baccc90
do not release cancelled slot too early
shikokuchuo Dec 4, 2024
5fe0b60
vectorize stop_mirai() (as documented)
shikokuchuo Dec 4, 2024
a71fada
avoids legacy code path for daemon()
shikokuchuo Dec 5, 2024
d3d14c3
rename and retain docs for legacy functions
shikokuchuo Dec 5, 2024
65550da
implements stop_daemon()
shikokuchuo Dec 5, 2024
36a0259
fix tests and test more
shikokuchuo Dec 5, 2024
d09c988
ensures legacy dispatcher calls legacy daemon
shikokuchuo Dec 5, 2024
517b70f
improves vectorized mirai cancellation
shikokuchuo Dec 29, 2024
8f1ee3f
revert to logical argument for dispatcher at daemons()
shikokuchuo Dec 29, 2024
3e842f1
remove unncessary code
shikokuchuo Dec 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: mirai
Type: Package
Title: Minimalist Async Evaluation Framework for R
Version: 1.3.1.9000
Version: 1.3.1.9022
Description: Designed for simplicity, a 'mirai' evaluates an R expression
asynchronously in a parallel process, locally or distributed over the
network, with the result automatically available upon completion. Modern
Expand All @@ -23,6 +23,8 @@ Authors@R:
role = "ctb",
email = "[email protected]"),
person(given = "Hibiki AI Limited",
role = "cph"),
person(given = "Posit Software, PBC",
role = "cph"))
License: GPL (>= 3)
BugReports: https://github.com/shikokuchuo/mirai/issues
Expand All @@ -31,7 +33,7 @@ Encoding: UTF-8
Depends:
R (>= 3.6)
Imports:
nanonext (>= 1.3.0)
nanonext (>= 1.4.0)
Enhances:
parallel,
promises
Expand Down
7 changes: 5 additions & 2 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -52,33 +52,36 @@ export(serial_config)
export(ssh_config)
export(status)
export(stop_cluster)
export(stop_daemon)
export(stop_mirai)
export(unresolved)
importFrom(nanonext,"opt<-")
importFrom(nanonext,.advance)
importFrom(nanonext,.context)
importFrom(nanonext,.dispatcher)
importFrom(nanonext,.interrupt)
importFrom(nanonext,.keep)
importFrom(nanonext,.mark)
importFrom(nanonext,.online)
importFrom(nanonext,.unresolved)
importFrom(nanonext,call_aio)
importFrom(nanonext,call_aio_)
importFrom(nanonext,collect_aio)
importFrom(nanonext,collect_aio_)
importFrom(nanonext,cv)
importFrom(nanonext,cv_signal)
importFrom(nanonext,cv_value)
importFrom(nanonext,dial)
importFrom(nanonext,is_error_value)
importFrom(nanonext,listen)
importFrom(nanonext,lock)
importFrom(nanonext,mclock)
importFrom(nanonext,monitor)
importFrom(nanonext,msleep)
importFrom(nanonext,nng_error)
importFrom(nanonext,opt)
importFrom(nanonext,parse_url)
importFrom(nanonext,pipe_notify)
importFrom(nanonext,random)
importFrom(nanonext,read_monitor)
importFrom(nanonext,reap)
importFrom(nanonext,recv)
importFrom(nanonext,recv_aio)
Expand Down
26 changes: 25 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,28 @@
# mirai 1.3.1.9000 (development)
# mirai 1.3.1.9022 (development)

#### New Architecture

* Distributed computing now uses a single URL at which all daemons connect (with or without dispatcher).
- Allows a more efficient `tcp://` or `tls+tcp://` connection in most cases instead of websockets.
- Daemons may be added or removed at any time without limit.

#### New Features

* `daemons(dispatcher = TRUE)` provides a new and more efficient architecture for dispatcher. This argument reverts to a logical value, although 'process' is still accepted and retains the previous behaviour of the v1 dispatcher.
* Upgrades `stop_mirai()` to cancel remote mirai tasks when using the new dispatcher, returning a logical value indicating whether cancellation was successful.
* Adds `stop_daemon()` to reduce the number of connected daemons (without interrupting any executing tasks).
* `daemon()` gains the new argument 'dispatcher', which should be set to `TRUE` when connecting to dispatcher and `FALSE` when connecting directly to host.

#### Updates

* `status()` using the new dispatcher is updated to provide more concise information.
* `everywhere()` now returns a list of mirai, which may be waited for and inspected (thanks @dgkf #164).
* `launch_local()` and `launch_remote()` simplified to take the argument 'n' instead of 'url' for how many daemons to launch. `launch_local()` now returns the number of dameons launched rather than invisible NULL.
* `ssh_config()` simplified to take the argument 'port' instead of 'host'. For SSH tunnelling, this is the port that will be used, and the hostname is now required to be '127.0.0.1' (no longer accepting 'localhost').
* `daemon()` '...' argument has been moved up to prevent partial matching on any of the optional arguments.
* `saisei()` is defunct as no longer required, but still available for use with the old v1 dispatcher.
* Experimental threaded dispatcher `daemons(dispatcher = "thread")` has been retired (as this was based on the old dispatcher architecture and future development will focus on the current design). Specifying 'dispatcher = thread' is defunct, but will point to 'dispatcher = process' for the time being.
* Requires `nanonext` >= 1.4.0.

# mirai 1.3.1

Expand Down
185 changes: 179 additions & 6 deletions R/daemon.R
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,131 @@
#' @param url the character host or dispatcher URL to dial into, including the
#' port to connect to (and optionally for websockets, a path), e.g.
#' 'tcp://hostname:5555' or 'ws://10.75.32.70:5555/path'.
#' @param ... reserved but not currently used.
#' @param dispatcher [default FALSE] logical value, which should be set to TRUE
#' if using dispatcher and FALSE otherwise.
#' @param asyncdial [default FALSE] whether to perform dials asynchronously. The
#' default FALSE will error if a connection is not immediately possible (for
#' instance if \code{\link{daemons}} has yet to be called on the host, or the
#' specified port is not open etc.). Specifying TRUE continues retrying
#' (indefinitely) if not immediately successful, which is more resilient but
#' can mask potential connection issues.
#' @param autoexit [default TRUE] logical value, whether the daemon should exit
#' automatically when its socket connection ends. If a signal from the
#' \pkg{tools} package, such as \code{tools::SIGINT}, or an equivalent integer
#' value is supplied, this signal is additionally raised on exit (see
#' 'Persistence' section below).
#' @param cleanup [default TRUE] logical value, whether to perform cleanup of
#' the global environment and restore attached packages and options to an
#' initial state after each evaluation.
#' @param output [default FALSE] logical value, to output generated stdout /
#' stderr if TRUE, or else discard if FALSE. Specify as TRUE in the
#' \sQuote{...} argument to \code{\link{daemons}} or
#' \code{\link{launch_local}} to provide redirection of output to the host
#' process (applicable only for local daemons).
#' @param tls [default NULL] required for secure TLS connections over
#' 'tls+tcp://' or 'wss://'. \strong{Either} the character path to a file
#' containing X.509 certificate(s) in PEM format, comprising the certificate
#' authority certificate chain starting with the TLS certificate and ending
#' with the CA certificate, \strong{or} a length 2 character vector comprising
#' [i] the certificate authority certificate chain and [ii] the empty string
#' \code{''}.
#' @param rs [default NULL] the initial value of .Random.seed. This is set
#' automatically using L'Ecuyer-CMRG RNG streams generated by the host process
#' and should not be independently supplied.
#'
#' @return Invisible NULL.
#'
#' @section Persistence:
#'
#' The \sQuote{autoexit} argument governs persistence settings for the daemon.
#' The default TRUE ensures that it will exit cleanly once its socket connection
#' has ended.
#'
#' Instead of TRUE, supplying a signal from the \pkg{tools} package, such as
#' \code{tools::SIGINT}, or an equivalent integer value, sets this signal to be
#' raised when the socket connection ends. For instance, supplying SIGINT allows
#' a potentially more immediate exit by interrupting any ongoing evaluation
#' rather than letting it complete.
#'
#' Setting to FALSE allows the daemon to persist indefinitely even when there is
#' no longer a socket connection. This allows a host session to end and a new
#' session to connect at the URL where the daemon is dialled in. Daemons must be
#' terminated with \code{daemons(NULL)} in this case, which sends explicit exit
#' signals to all connected daemons.
#'
#' @export
#'
daemon <- function(url, dispatcher = FALSE, ..., asyncdial = FALSE, autoexit = TRUE,
cleanup = TRUE, output = FALSE, tls = NULL, rs = NULL) {

missing(dispatcher) && return(
v1_daemon(
url = url, asyncdial = asyncdial, autoexit = autoexit, cleanup = cleanup,
output = output, ..., tls = tls, rs = rs
)
)
cv <- cv()
sock <- socket(protocol = if (dispatcher) "poly" else "rep")
on.exit(reap(sock))
`[[<-`(., "sock", sock)
autoexit && pipe_notify(sock, cv = cv, remove = TRUE, flag = as.integer(autoexit))
if (length(tls)) tls <- tls_config(client = tls)
dial_and_sync_socket(sock, url, asyncdial = asyncdial, tls = tls)

if (is.numeric(rs)) `[[<-`(.GlobalEnv, ".Random.seed", as.integer(rs))
if (!output) {
devnull <- file(nullfile(), open = "w", blocking = FALSE)
sink(file = devnull)
sink(file = devnull, type = "message")
on.exit({
sink(type = "message")
sink()
close(devnull)
}, add = TRUE)
}
snapshot()

if (dispatcher)
repeat {
aio <- recv_aio(sock, mode = 1L, cv = cv)
wait(cv) || break
m <- collect_aio(aio)
is.object(m) && next
cancel <- recv_aio(sock, mode = 8L, cv = NA)
data <- eval_mirai(m)
stop_aio(cancel)
send(sock, data, mode = 1L, block = TRUE)
if (cleanup) do_cleanup()
} else
repeat {
ctx <- .context(sock)
aio <- recv_aio(ctx, mode = 1L, cv = cv)
wait(cv) || break
m <- collect_aio(aio)
data <- eval_mirai(m)
send(ctx, data, mode = 1L, block = TRUE)
if (cleanup) do_cleanup()
}

}

#' Daemon Instance (Legacy v1)
#'
#' Starts up an execution daemon to receive \code{\link{mirai}} requests. Awaits
#' data, evaluates an expression in an environment containing the supplied data,
#' and returns the value to the host caller. Daemon settings may be controlled
#' by \code{\link{daemons}} and this function should not need to be invoked
#' directly, unless deploying manually on remote resources.
#'
#' The network topology is such that daemons dial into the host or dispatcher,
#' which listens at the \sQuote{url} address. In this way, network resources may
#' be added or removed dynamically and the host or dispatcher automatically
#' distributes tasks to all available daemons.
#'
#' @param url the character host or dispatcher URL to dial into, including the
#' port to connect to (and optionally for websockets, a path), e.g.
#' 'tcp://hostname:5555' or 'ws://10.75.32.70:5555/path'.
#' @param asyncdial [default FALSE] whether to perform dials asynchronously. The
#' default FALSE will error if a connection is not immediately possible (for
#' instance if \code{\link{daemons}} has yet to be called on the host, or the
Expand Down Expand Up @@ -107,11 +232,11 @@
#' Caution: do not reset options but not loaded packages if packages set options
#' on load.
#'
#' @export
#' @noRd
#'
daemon <- function(url, asyncdial = FALSE, autoexit = TRUE, cleanup = TRUE,
output = FALSE, maxtasks = Inf, idletime = Inf, walltime = Inf,
timerstart = 0L, ..., tls = NULL, rs = NULL) {
v1_daemon <- function(url, asyncdial = FALSE, autoexit = TRUE, cleanup = TRUE,
output = FALSE, maxtasks = Inf, idletime = Inf, walltime = Inf,
timerstart = 0L, ..., tls = NULL, rs = NULL) {

cv <- cv()
sock <- socket(protocol = "rep")
Expand Down Expand Up @@ -193,17 +318,59 @@ daemon <- function(url, asyncdial = FALSE, autoexit = TRUE, cleanup = TRUE,

}

#' Stop Daemon
#'
#' Sends exit signals to reduce the number of connected daemons. Does not
#' interrupt any \sQuote{mirai} tasks in execution - exits will happen once
#' these complete (if applicable). This function is only effective when using
#' dispatcher.
#'
#' @param n [default 1L] integer number of daemons to stop.
#' @inheritParams mirai
#'
#' @return Integer number of exit signals sent (the smaller of \sQuote{n} and
#' the number of actually connected daemons).
#'
#' @examples
#' if (interactive()) {
#' # Only run examples in interactive R sessions
#'
#' daemons(4)
#' status()
#' stop_daemon(2)
#' mirai(TRUE)[]
#' status()
#' daemons(0)
#'
#' }
#'
#' @export
#'
stop_daemon <- function(n = 1L, .compute = "default") {
envir <- if (is.character(.compute)) ..[[.compute]]
length(envir[["msgid"]]) || return(0L)
d <- query_dispatcher(envir[["sock"]], c(0L, 0L))[1L]
stopped <- min(n, d)
for (i in seq_len(stopped))
query_dispatcher(envir[["sock"]], c(0L, -1L))
stopped
}

# internals --------------------------------------------------------------------

handle_mirai_error <- function(e) invokeRestart("mirai_error", e, sys.calls())

handle_mirai_interrupt <- function(e) invokeRestart("mirai_interrupt")

eval_mirai <- function(._mirai_.) {
list2env(._mirai_.[["._mirai_globals_."]], envir = .GlobalEnv)
withRestarts(
withCallingHandlers(
eval(._mirai_.[[".expr"]], envir = ._mirai_., enclos = .GlobalEnv),
{
on.exit(.interrupt(FALSE))
.interrupt()
list2env(._mirai_.[["._mirai_globals_."]], envir = .GlobalEnv)
eval(._mirai_.[[".expr"]], envir = ._mirai_., enclos = .GlobalEnv)
},
error = handle_mirai_error,
interrupt = handle_mirai_interrupt
),
Expand Down Expand Up @@ -232,5 +399,11 @@ perform_cleanup <- function(cleanup) {
if (cleanup[4L]) gc(verbose = FALSE)
}

do_cleanup <- function() {
rm(list = (vars <- names(.GlobalEnv))[!vars %in% .[["vars"]]], envir = .GlobalEnv)
lapply((new <- search())[!new %in% .[["se"]]], detach, character.only = TRUE)
options(.[["op"]])
}

register <- function(x) `opt<-`(.[["sock"]], "serial", x)
snapshot <- function() `[[<-`(`[[<-`(`[[<-`(., "op", .Options), "se", search()), "vars", names(.GlobalEnv))
Loading
Loading