From c31786d9a5f46d33131774e4001ead54472faa21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Sat, 8 Jun 2019 10:15:19 +0100 Subject: [PATCH] Add poc mmap data sharing Unix only. --- DESCRIPTION | 2 ++ R/check.R | 5 ++++- R/eval-bg.R | 4 +++- R/eval.R | 11 ++++++++++- R/options.R | 3 ++- R/run.R | 1 + R/script.R | 18 +++++------------- R/setup.R | 14 ++++++++++++++ inst/client.R | 10 ++++++++++ man/r.Rd | 18 ++++++++++++++++-- man/r_bg.Rd | 14 +++++++++++++- tests/testthat/test-error.R | 4 ++-- tests/testthat/test-r-session.R | 2 +- 13 files changed, 83 insertions(+), 23 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 5c3af00e..10676559 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -34,6 +34,8 @@ Suggests: testthat, tibble, withr +Remotes: + r-lib/processx@feature/mmap Encoding: UTF-8 VignetteBuilder: knitr Language: en-US diff --git a/R/check.R b/R/check.R index f26c5185..21bd9fab 100644 --- a/R/check.R +++ b/R/check.R @@ -43,6 +43,8 @@ convert_and_check_my_args <- function(options) { if (no("echo")) echo <- FALSE if (no("fail_on_status")) fail_on_status <- FALSE if (no("tmp_files")) tmp_files <- character() + if (has("transfer")) transfer <- transfer[1] + if (no("connections")) connections <- list() }) ## Checks @@ -65,7 +67,8 @@ convert_and_check_my_args <- function(options) { is.character(env), no("timeout") || (length(timeout) == 1 && !is.na(timeout)), no("wd") || is_string(wd), - no("fail_on_status") || is_flag(fail_on_status) + no("fail_on_status") || is_flag(fail_on_status), + no("transfer") || is_string(transfer) )) options diff --git a/R/eval-bg.R b/R/eval-bg.R index 09b40d82..03231b03 100644 --- a/R/eval-bg.R +++ b/R/eval-bg.R @@ -32,7 +32,9 @@ r_bg <- function(func, args = list(), libpath = .libPaths(), error = getOption("callr.error", "error"), cmdargs = c("--slave", "--no-save", "--no-restore"), system_profile = FALSE, user_profile = FALSE, - env = rcmd_safe_env(), supervise = FALSE, ...) { + env = rcmd_safe_env(), supervise = FALSE, + transfer = getOption("callr.transfer", + c("default", "copy", "mmap")[1]), ...) { options <- as.list(environment()) options$extra <- list(...) diff --git a/R/eval.R b/R/eval.R index a2b0efce..b94de74f 100644 --- a/R/eval.R +++ b/R/eval.R @@ -83,6 +83,13 @@ #' If the process does not finish before the timeout period expires, #' then a `system_command_timeout_error` error is thrown. `Inf` #' means no timeout. +#' @param transfer How to transfer the data to the subprocess: +#' * `"default"`: use the default method, this is currently `"copy"`, but +#' it might change in the future. +#' * `"copy"`: serialize and copy via a temporary file, +#' * `"mmap"`: use shared memory as much as possible. This is currently +#' only supported on Unix systems. +#' Defaults to the value of the `callr.transfer` option. #' @param ... Extra arguments are passed to [processx::run()]. #' @return Value of the evaluated expression. #' @@ -129,7 +136,9 @@ r <- function(func, args = list(), libpath = .libPaths(), show = FALSE, callback = NULL, block_callback = NULL, spinner = show && interactive(), system_profile = FALSE, user_profile = FALSE, - env = rcmd_safe_env(), timeout = Inf, ...) { + env = rcmd_safe_env(), timeout = Inf, + transfer = getOption("callr.transfer", + c("default", "copy", "mmap")[1]), ...) { ## This contains the context that we set up in steps options <- convert_and_check_my_args(as.list(environment())) diff --git a/R/options.R b/R/options.R index 2efc42d3..4c9eb1fd 100644 --- a/R/options.R +++ b/R/options.R @@ -76,7 +76,8 @@ r_process_options_default <- function() { env = character(), supervise = FALSE, load_hook = default_load_hook(), - extra = list() + extra = list(), + transfer = "copy" ) } diff --git a/R/run.R b/R/run.R index f3e9308e..d38d0b81 100644 --- a/R/run.R +++ b/R/run.R @@ -25,6 +25,7 @@ run_r <- function(options) { stdout_callback = real_block_callback, stderr_callback = real_block_callback, stderr_to_stdout = stderr_to_stdout, + connections = as.list(connections), echo_cmd = echo, echo = show, spinner = spinner, error_on_status = fail_on_status, timeout = timeout), extra) diff --git a/R/script.R b/R/script.R index 5904d376..deb38f41 100644 --- a/R/script.R +++ b/R/script.R @@ -17,16 +17,15 @@ make_vanilla_script_expr <- function(expr_file, res, error, rm("__callr_dump__", envir = .GlobalEnv) # To find the frame of the evaluated function, we search for - # do.call in the stack, and then skip one more frame, the other - # do.call. This method only must change if the eval code changes, - # obviously. Also, it might fail if the pre-hook has do.call() at - # the top level. + # do.call in the stack. This method only must change if the eval + # code changes, obviously. Also, it might fail if the pre-hook has + # do.call() at the top level. calls <- sys.calls() dcframe <- which(vapply( calls, function(x) length(x) >= 1 && identical(x[[1]], quote(do.call)), logical(1)))[1] - if (!is.na(dcframe)) e$`_ignore` <- list(c(1, dcframe + 1L)) + if (!is.na(dcframe)) e$`_ignore` <- list(c(1, dcframe)) e$`_pid` <- Sys.getpid() e$`_timestamp` <- Sys.time() e <- err$add_trace_back(e) @@ -82,14 +81,7 @@ make_vanilla_script_expr <- function(expr_file, res, error, withCallingHandlers( { `__pre_hook__` - saveRDS( - do.call( - do.call, - c(readRDS(`__expr_file__`), list(envir = .GlobalEnv)), - envir = .GlobalEnv - ), - file = `__res__` - ) + as.environment("tools:callr")$`__callr_data__`$run(`__expr_file__`, `__res__`) flush(stdout()) flush(stderr()) `__post_hook__` diff --git a/R/setup.R b/R/setup.R index 72b1c83a..a7f88c20 100644 --- a/R/setup.R +++ b/R/setup.R @@ -1,5 +1,14 @@ +setup_mmap <- function(options) { + map <- processx:::conn_create_mmap(options$args) + options$connections <- list(map) + fd <- if (options$poll_connection) 3L else 3L # TODO??? + options$args <- c(fd, attr(map, "size")) + options +} + setup_script_files <- function(options) { + if (options$transfer == "mmap") options <- setup_mmap(options) within(options, { func_file <- save_function_to_temp(options) result_file <- tempfile() @@ -87,6 +96,11 @@ make_profiles <- function(system, user, repos, libpath, load_hook) { append = TRUE) } + cat( + default_load_hook(), + file = profile_user, + append = TRUE) + if (!is.null(load_hook)) { cat(load_hook, sep = "", file = profile_user, append = TRUE) } diff --git a/inst/client.R b/inst/client.R index d89eb598..e444d642 100644 --- a/inst/client.R +++ b/inst/client.R @@ -676,3 +676,13 @@ load_client_lib <- function(sofile = NULL) { on.exit(NULL) env } + +run <- function(expr_file, res_file) { + task <- readRDS(expr_file) + data <- task[[2]] + if (!is.list(task[[2]])) { + data <- processx:::conn_unpack_mmap(data[[1]]) + } + res <- do.call(task[[1]], data, envir = .GlobalEnv) + saveRDS(res, file = res_file) +} diff --git a/man/r.Rd b/man/r.Rd index 785c45b6..4e83e478 100644 --- a/man/r.Rd +++ b/man/r.Rd @@ -11,7 +11,9 @@ r(func, args = list(), libpath = .libPaths(), cmdargs = c("--slave", "--no-save", "--no-restore"), show = FALSE, callback = NULL, block_callback = NULL, spinner = show && interactive(), system_profile = FALSE, user_profile = FALSE, - env = rcmd_safe_env(), timeout = Inf, ...) + env = rcmd_safe_env(), timeout = Inf, + transfer = getOption("callr.transfer", c("default", "copy", + "mmap")[1]), ...) r_safe(func, args = list(), libpath = .libPaths(), repos = default_repos(), stdout = NULL, stderr = NULL, @@ -19,7 +21,9 @@ r_safe(func, args = list(), libpath = .libPaths(), cmdargs = c("--slave", "--no-save", "--no-restore"), show = FALSE, callback = NULL, block_callback = NULL, spinner = show && interactive(), system_profile = FALSE, user_profile = FALSE, - env = rcmd_safe_env(), timeout = Inf, ...) + env = rcmd_safe_env(), timeout = Inf, + transfer = getOption("callr.transfer", c("default", "copy", + "mmap")[1]), ...) } \arguments{ \item{func}{Function object to call in the new R process. @@ -105,6 +109,16 @@ If the process does not finish before the timeout period expires, then a \code{system_command_timeout_error} error is thrown. \code{Inf} means no timeout.} +\item{transfer}{How to transfer the data to the subprocess: +\itemize{ +\item \code{"default"}: use the default method, this is currently \code{"copy"}, but +it might change in the future. +\item \code{"copy"}: serialize and copy via a temporary file, +\item \code{"mmap"}: use shared memory as much as possible. This is currently +only supported on Unix systems. +Defaults to the value of the \code{callr.transfer} option. +}} + \item{...}{Extra arguments are passed to \code{\link[processx:run]{processx::run()}}.} } \value{ diff --git a/man/r_bg.Rd b/man/r_bg.Rd index 6d006f8e..882bd294 100644 --- a/man/r_bg.Rd +++ b/man/r_bg.Rd @@ -9,7 +9,9 @@ r_bg(func, args = list(), libpath = .libPaths(), poll_connection = TRUE, error = getOption("callr.error", "error"), cmdargs = c("--slave", "--no-save", "--no-restore"), system_profile = FALSE, user_profile = FALSE, - env = rcmd_safe_env(), supervise = FALSE, ...) + env = rcmd_safe_env(), supervise = FALSE, + transfer = getOption("callr.transfer", c("default", "copy", + "mmap")[1]), ...) } \arguments{ \item{func}{Function object to call in the new R process. @@ -74,6 +76,16 @@ supplied function and some error handling code.} the supervisor will ensure that the process is killed when the R process exits.} +\item{transfer}{How to transfer the data to the subprocess: +\itemize{ +\item \code{"default"}: use the default method, this is currently \code{"copy"}, but +it might change in the future. +\item \code{"copy"}: serialize and copy via a temporary file, +\item \code{"mmap"}: use shared memory as much as possible. This is currently +only supported on Unix systems. +Defaults to the value of the \code{callr.transfer} option. +}} + \item{...}{Extra arguments are passed to the \link[processx:process]{processx::process} constructor.} } diff --git a/tests/testthat/test-error.R b/tests/testthat/test-error.R index 0600abe7..1b13db7a 100644 --- a/tests/testthat/test-error.R +++ b/tests/testthat/test-error.R @@ -36,7 +36,7 @@ test_that("error stack is passed", { expect_true("call" %in% names(err)) expect_true(inherits(err, "error")) expect_true(inherits(err, "callr_error")) - expect_equal(length(err$stack), 3) + expect_equal(length(err$stack), 2) gc() }) @@ -67,7 +67,7 @@ test_that("error behavior can be set using option", { expect_true("call" %in% names(err)) expect_true(inherits(err, "error")) expect_true(inherits(err, "callr_error")) - expect_equal(length(err$stack), 3) + expect_equal(length(err$stack), 2) gc() }) diff --git a/tests/testthat/test-r-session.R b/tests/testthat/test-r-session.R index 886bf843..a2586d47 100644 --- a/tests/testthat/test-r-session.R +++ b/tests/testthat/test-r-session.R @@ -265,5 +265,5 @@ test_that("traceback", { expect_error(rs$run(do), "oops") expect_output(tb <- rs$traceback(), "1: f() at ", fixed = TRUE) - if (getRversion() >= "3.3.0") expect_equal(c(tb[[4]]), "f()") + if (getRversion() >= "3.3.0") expect_equal(c(tb[[3]]), "f()") })