Skip to content

Commit

Permalink
Add poc mmap data sharing
Browse files Browse the repository at this point in the history
Unix only.
  • Loading branch information
gaborcsardi committed Jul 15, 2019
1 parent e8b86fe commit c31786d
Show file tree
Hide file tree
Showing 13 changed files with 83 additions and 23 deletions.
2 changes: 2 additions & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ Suggests:
testthat,
tibble,
withr
Remotes:
r-lib/processx@feature/mmap
Encoding: UTF-8
VignetteBuilder: knitr
Language: en-US
5 changes: 4 additions & 1 deletion R/check.R
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ convert_and_check_my_args <- function(options) {
if (no("echo")) echo <- FALSE
if (no("fail_on_status")) fail_on_status <- FALSE
if (no("tmp_files")) tmp_files <- character()
if (has("transfer")) transfer <- transfer[1]
if (no("connections")) connections <- list()
})

## Checks
Expand All @@ -65,7 +67,8 @@ convert_and_check_my_args <- function(options) {
is.character(env),
no("timeout") || (length(timeout) == 1 && !is.na(timeout)),
no("wd") || is_string(wd),
no("fail_on_status") || is_flag(fail_on_status)
no("fail_on_status") || is_flag(fail_on_status),
no("transfer") || is_string(transfer)
))

options
Expand Down
4 changes: 3 additions & 1 deletion R/eval-bg.R
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ r_bg <- function(func, args = list(), libpath = .libPaths(),
error = getOption("callr.error", "error"),
cmdargs = c("--slave", "--no-save", "--no-restore"),
system_profile = FALSE, user_profile = FALSE,
env = rcmd_safe_env(), supervise = FALSE, ...) {
env = rcmd_safe_env(), supervise = FALSE,
transfer = getOption("callr.transfer",
c("default", "copy", "mmap")[1]), ...) {

options <- as.list(environment())
options$extra <- list(...)
Expand Down
11 changes: 10 additions & 1 deletion R/eval.R
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@
#' If the process does not finish before the timeout period expires,
#' then a `system_command_timeout_error` error is thrown. `Inf`
#' means no timeout.
#' @param transfer How to transfer the data to the subprocess:
#' * `"default"`: use the default method, this is currently `"copy"`, but
#' it might change in the future.
#' * `"copy"`: serialize and copy via a temporary file,
#' * `"mmap"`: use shared memory as much as possible. This is currently
#' only supported on Unix systems.
#' Defaults to the value of the `callr.transfer` option.
#' @param ... Extra arguments are passed to [processx::run()].
#' @return Value of the evaluated expression.
#'
Expand Down Expand Up @@ -129,7 +136,9 @@ r <- function(func, args = list(), libpath = .libPaths(),
show = FALSE, callback = NULL,
block_callback = NULL, spinner = show && interactive(),
system_profile = FALSE, user_profile = FALSE,
env = rcmd_safe_env(), timeout = Inf, ...) {
env = rcmd_safe_env(), timeout = Inf,
transfer = getOption("callr.transfer",
c("default", "copy", "mmap")[1]), ...) {

## This contains the context that we set up in steps
options <- convert_and_check_my_args(as.list(environment()))
Expand Down
3 changes: 2 additions & 1 deletion R/options.R
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ r_process_options_default <- function() {
env = character(),
supervise = FALSE,
load_hook = default_load_hook(),
extra = list()
extra = list(),
transfer = "copy"
)
}

Expand Down
1 change: 1 addition & 0 deletions R/run.R
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ run_r <- function(options) {
stdout_callback = real_block_callback,
stderr_callback = real_block_callback,
stderr_to_stdout = stderr_to_stdout,
connections = as.list(connections),
echo_cmd = echo, echo = show, spinner = spinner,
error_on_status = fail_on_status, timeout = timeout),
extra)
Expand Down
18 changes: 5 additions & 13 deletions R/script.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@ make_vanilla_script_expr <- function(expr_file, res, error,
rm("__callr_dump__", envir = .GlobalEnv)

# To find the frame of the evaluated function, we search for
# do.call in the stack, and then skip one more frame, the other
# do.call. This method only must change if the eval code changes,
# obviously. Also, it might fail if the pre-hook has do.call() at
# the top level.
# do.call in the stack. This method only must change if the eval
# code changes, obviously. Also, it might fail if the pre-hook has
# do.call() at the top level.
calls <- sys.calls()
dcframe <- which(vapply(
calls,
function(x) length(x) >= 1 && identical(x[[1]], quote(do.call)),
logical(1)))[1]
if (!is.na(dcframe)) e$`_ignore` <- list(c(1, dcframe + 1L))
if (!is.na(dcframe)) e$`_ignore` <- list(c(1, dcframe))
e$`_pid` <- Sys.getpid()
e$`_timestamp` <- Sys.time()
e <- err$add_trace_back(e)
Expand Down Expand Up @@ -82,14 +81,7 @@ make_vanilla_script_expr <- function(expr_file, res, error,
withCallingHandlers(
{
`__pre_hook__`
saveRDS(
do.call(
do.call,
c(readRDS(`__expr_file__`), list(envir = .GlobalEnv)),
envir = .GlobalEnv
),
file = `__res__`
)
as.environment("tools:callr")$`__callr_data__`$run(`__expr_file__`, `__res__`)
flush(stdout())
flush(stderr())
`__post_hook__`
Expand Down
14 changes: 14 additions & 0 deletions R/setup.R
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@

setup_mmap <- function(options) {
map <- processx:::conn_create_mmap(options$args)
options$connections <- list(map)
fd <- if (options$poll_connection) 3L else 3L # TODO???
options$args <- c(fd, attr(map, "size"))
options
}

setup_script_files <- function(options) {
if (options$transfer == "mmap") options <- setup_mmap(options)
within(options, {
func_file <- save_function_to_temp(options)
result_file <- tempfile()
Expand Down Expand Up @@ -87,6 +96,11 @@ make_profiles <- function(system, user, repos, libpath, load_hook) {
append = TRUE)
}

cat(
default_load_hook(),
file = profile_user,
append = TRUE)

if (!is.null(load_hook)) {
cat(load_hook, sep = "", file = profile_user, append = TRUE)
}
Expand Down
10 changes: 10 additions & 0 deletions inst/client.R
Original file line number Diff line number Diff line change
Expand Up @@ -676,3 +676,13 @@ load_client_lib <- function(sofile = NULL) {
on.exit(NULL)
env
}

run <- function(expr_file, res_file) {
task <- readRDS(expr_file)
data <- task[[2]]
if (!is.list(task[[2]])) {
data <- processx:::conn_unpack_mmap(data[[1]])
}
res <- do.call(task[[1]], data, envir = .GlobalEnv)
saveRDS(res, file = res_file)
}
18 changes: 16 additions & 2 deletions man/r.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 13 additions & 1 deletion man/r_bg.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions tests/testthat/test-error.R
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ test_that("error stack is passed", {
expect_true("call" %in% names(err))
expect_true(inherits(err, "error"))
expect_true(inherits(err, "callr_error"))
expect_equal(length(err$stack), 3)
expect_equal(length(err$stack), 2)
gc()
})

Expand Down Expand Up @@ -67,7 +67,7 @@ test_that("error behavior can be set using option", {
expect_true("call" %in% names(err))
expect_true(inherits(err, "error"))
expect_true(inherits(err, "callr_error"))
expect_equal(length(err$stack), 3)
expect_equal(length(err$stack), 2)
gc()
})

Expand Down
2 changes: 1 addition & 1 deletion tests/testthat/test-r-session.R
Original file line number Diff line number Diff line change
Expand Up @@ -265,5 +265,5 @@ test_that("traceback", {

expect_error(rs$run(do), "oops")
expect_output(tb <- rs$traceback(), "1: f() at ", fixed = TRUE)
if (getRversion() >= "3.3.0") expect_equal(c(tb[[4]]), "f()")
if (getRversion() >= "3.3.0") expect_equal(c(tb[[3]]), "f()")
})

0 comments on commit c31786d

Please sign in to comment.