Skip to content

Commit

Permalink
Add poc mmap data sharing
Browse files Browse the repository at this point in the history
Unix only.
  • Loading branch information
gaborcsardi committed Jun 28, 2019
1 parent 0ba8d30 commit c540791
Show file tree
Hide file tree
Showing 14 changed files with 89 additions and 26 deletions.
2 changes: 2 additions & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ Suggests:
testthat,
tibble,
withr
Remotes:
r-lib/processx@feature/mmap
Encoding: UTF-8
VignetteBuilder: knitr
Language: en-US
5 changes: 4 additions & 1 deletion R/check.R
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ convert_and_check_my_args <- function(options) {
if (no("echo")) echo <- FALSE
if (no("fail_on_status")) fail_on_status <- FALSE
if (no("tmp_files")) tmp_files <- character()
if (has("transfer")) transfer <- transfer[1]
if (no("connections")) connections <- list()
})

## Checks
Expand All @@ -65,7 +67,8 @@ convert_and_check_my_args <- function(options) {
is.character(env),
no("timeout") || (length(timeout) == 1 && !is.na(timeout)),
no("wd") || is_string(wd),
no("fail_on_status") || is_flag(fail_on_status)
no("fail_on_status") || is_flag(fail_on_status),
no("transfer") || is_string(transfer)
))

options
Expand Down
4 changes: 3 additions & 1 deletion R/eval-bg.R
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ r_bg <- function(func, args = list(), libpath = .libPaths(),
error = getOption("callr.error", "error"),
cmdargs = c("--slave", "--no-save", "--no-restore"),
system_profile = FALSE, user_profile = FALSE,
env = rcmd_safe_env(), supervise = FALSE, ...) {
env = rcmd_safe_env(), supervise = FALSE,
transfer = getOption("callr.transfer",
c("default", "copy", "mmap")[1]), ...) {

options <- as.list(environment())
options$extra <- list(...)
Expand Down
11 changes: 10 additions & 1 deletion R/eval.R
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@
#' If the process does not finish before the timeout period expires,
#' then a `system_command_timeout_error` error is thrown. `Inf`
#' means no timeout.
#' @param transfer How to transfer the data to the subprocess:
#' * `"default"`: use the default method, this is currently `"copy"`, but
#' it might change in the future.
#' * `"copy"`: serialize and copy via a temporary file,
#' * `"mmap"`: use shared memory as much as possible. This is currently
#' only supported on Unix systems.
#' Defaults to the value of the `callr.transfer` option.
#' @param ... Extra arguments are passed to [processx::run()].
#' @return Value of the evaluated expression.
#'
Expand Down Expand Up @@ -129,7 +136,9 @@ r <- function(func, args = list(), libpath = .libPaths(),
show = FALSE, callback = NULL,
block_callback = NULL, spinner = show && interactive(),
system_profile = FALSE, user_profile = FALSE,
env = rcmd_safe_env(), timeout = Inf, ...) {
env = rcmd_safe_env(), timeout = Inf,
transfer = getOption("callr.transfer",
c("default", "copy", "mmap")[1]), ...) {

## This contains the context that we set up in steps
options <- convert_and_check_my_args(as.list(environment()))
Expand Down
3 changes: 2 additions & 1 deletion R/options.R
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ r_process_options_default <- function() {
env = character(),
supervise = FALSE,
load_hook = default_load_hook(),
extra = list()
extra = list(),
transfer = "copy"
)
}

Expand Down
13 changes: 13 additions & 0 deletions R/package.R
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,23 @@ dummy_r6 <- function() R6::R6Class

## We save this as an RDS, so it can be loaded quickly
env <- new.env(parent = emptyenv())

env$`__callr_data__` <- new.env(parent = emptyenv())
env$`__callr_data__`$err <- err

env$`__callr_data__`$run <- function(expr_file, res_file) {
task <- readRDS(expr_file)
data <- task[[2]]
if (!is.list(task[[2]])) {
data <- processx:::conn_unpack_mmap(data[[1]])
}
res <- do.call(task[[1]], data, envir = .GlobalEnv)
saveRDS(res, file = res_file)
}

saveRDS(
env,
file = file.path(system.file(package = "callr"), "env.rds"),
version = 2)

rm(env)
2 changes: 1 addition & 1 deletion R/r-session.R
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ rs_poll_process <- function(self, private, timeout) {
}

rs_traceback <- function(self, private) {
traceback(utils::head(self$run(function() traceback()), -12))
traceback(utils::head(self$run(function() traceback()), -11))
}

rs_attach <- function(self, private) {
Expand Down
1 change: 1 addition & 0 deletions R/run.R
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ run_r <- function(options) {
stdout_callback = real_block_callback,
stderr_callback = real_block_callback,
stderr_to_stdout = stderr_to_stdout,
connections = as.list(connections),
echo_cmd = echo, echo = show, spinner = spinner,
error_on_status = fail_on_status, timeout = timeout),
extra)
Expand Down
22 changes: 7 additions & 15 deletions R/script.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,19 @@ make_vanilla_script_expr <- function(expr_file, res, error,
## This will inserted into the main script
err <- if (error == "error") {
substitute({
# TODO: get rid of magic number 9
capture.output(assign(".Traceback", traceback(9), envir = baseenv()))
# TODO: get rid of magic number 10
capture.output(assign(".Traceback", traceback(10), envir = baseenv()))

# To find the frame of the evaluated function, we search for
# do.call in the stack, and then skip one more frame, the other
# do.call. This method only must change if the eval code changes,
# obviously. Also, it might fail if the pre-hook has do.call() at
# the top level.
# do.call in the stack. This method only must change if the eval
# code changes, obviously. Also, it might fail if the pre-hook has
# do.call() at the top level.
calls <- sys.calls()
dcframe <- which(vapply(
calls,
function(x) length(x) >= 1 && identical(x[[1]], quote(do.call)),
logical(1)))[1]
if (!is.na(dcframe)) e$`_ignore` <- list(c(1, dcframe + 1L))
if (!is.na(dcframe)) e$`_ignore` <- list(c(1, dcframe))
e$`_pid` <- Sys.getpid()
e$`_timestamp` <- Sys.time()
err <- as.environment("tools:callr")$`__callr_data__`$err
Expand Down Expand Up @@ -80,14 +79,7 @@ make_vanilla_script_expr <- function(expr_file, res, error,
withCallingHandlers(
{
`__pre_hook__`
saveRDS(
do.call(
do.call,
c(readRDS(`__expr_file__`), list(envir = .GlobalEnv)),
envir = .GlobalEnv
),
file = `__res__`
)
as.environment("tools:callr")$`__callr_data__`$run(`__expr_file__`, `__res__`)
flush(stdout())
flush(stderr())
`__post_hook__`
Expand Down
14 changes: 14 additions & 0 deletions R/setup.R
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@

setup_mmap <- function(options) {
map <- processx:::conn_create_mmap(options$args)
options$connections <- list(map)
fd <- if (options$poll_connection) 3L else 3L # TODO???
options$args <- c(fd, attr(map, "size"))
options
}

setup_script_files <- function(options) {
if (options$transfer == "mmap") options <- setup_mmap(options)
within(options, {
func_file <- save_function_to_temp(options)
result_file <- tempfile()
Expand Down Expand Up @@ -87,6 +96,11 @@ make_profiles <- function(system, user, repos, libpath, load_hook) {
append = TRUE)
}

cat(
default_load_hook(),
file = profile_user,
append = TRUE)

if (!is.null(load_hook)) {
cat(load_hook, sep = "", file = profile_user, append = TRUE)
}
Expand Down
18 changes: 16 additions & 2 deletions man/r.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 13 additions & 1 deletion man/r_bg.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions tests/testthat/test-error.R
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ test_that("error stack is passed", {
expect_true("call" %in% names(err))
expect_true(inherits(err, "error"))
expect_true(inherits(err, "callr_error"))
expect_equal(length(err$stack), 3)
expect_equal(length(err$stack), 2)
gc()
})

Expand Down Expand Up @@ -67,7 +67,7 @@ test_that("error behavior can be set using option", {
expect_true("call" %in% names(err))
expect_true(inherits(err, "error"))
expect_true(inherits(err, "callr_error"))
expect_equal(length(err$stack), 3)
expect_equal(length(err$stack), 2)
gc()
})

Expand Down
2 changes: 1 addition & 1 deletion tests/testthat/test-r-session.R
Original file line number Diff line number Diff line change
Expand Up @@ -257,5 +257,5 @@ test_that("traceback", {

expect_error(rs$run(do), "oops")
expect_output(tb <- rs$traceback(), "1: f() at ", fixed = TRUE)
if (getRversion() >= "3.3.0") expect_equal(c(tb[[4]]), "f()")
if (getRversion() >= "3.3.0") expect_equal(c(tb[[3]]), "f()")
})

0 comments on commit c540791

Please sign in to comment.