Skip to content

Commit

Permalink
RThreadSafeObject -> RThreadSafe<T>; Remove Sync / Send from …
Browse files Browse the repository at this point in the history
…`Binding` (#114)

* `RThreadSafeObject` -> `RThreadSafe<T>`

* Remove `Sync` and `Send` from `Binding`

* Clarify this is a more generic shelter

* Check for the main R thread in `new()` too
  • Loading branch information
DavisVaughan authored Oct 18, 2023
1 parent 6db6029 commit 07a94b8
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 75 deletions.
4 changes: 2 additions & 2 deletions crates/ark/src/data_viewer/r_data_viewer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use crate::data_viewer::message::DataViewerRowRequest;
use crate::data_viewer::message::DataViewerRowResponse;
use crate::interface::R_MAIN;
use crate::r_task;
use crate::thread::RThreadSafeObject;
use crate::thread::RThreadSafe;

pub struct RDataViewer {
title: String,
Expand Down Expand Up @@ -271,7 +271,7 @@ impl RDataViewer {

// To be able to `Send` the `data` to the thread to be owned by the data
// viewer, it needs to be made thread safe
let data = RThreadSafeObject::new(data);
let data = RThreadSafe::new(data);

spawn!(format!("ark-data-viewer-{}-{}", title, id), move || {
let title_dataset = title.clone();
Expand Down
53 changes: 31 additions & 22 deletions crates/ark/src/environment/r_environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::environment::message::EnvironmentMessageView;
use crate::environment::variable::EnvironmentVariable;
use crate::lsp::events::EVENTS;
use crate::r_task;
use crate::thread::RThreadSafeObject;
use crate::thread::RThreadSafe;

/**
* The R Environment handler provides the server side of Positron's Environment
Expand All @@ -50,8 +50,15 @@ use crate::thread::RThreadSafeObject;
pub struct REnvironment {
comm: CommSocket,
comm_manager_tx: Sender<CommEvent>,
pub env: RThreadSafeObject,
current_bindings: Vec<Binding>,
pub env: RThreadSafe<RObject>,
/// `Binding` does not currently protect anything, and therefore doesn't
/// implement `Drop`, which might use the R API. It assumes that R SYMSXPs
/// protect themselves, and that the binding value is protected by the
/// `env`. This seems to work fine, so technically we don't need
/// `RThreadSafe` to ensure that the `drop()` runs on the R main thread.
/// However, we do need to `Send` the underlying `SEXP` values between
/// threads, so we still use `RThreadSafe` for that.
current_bindings: RThreadSafe<Vec<Binding>>,
version: u64,
}

Expand All @@ -72,17 +79,20 @@ impl REnvironment {
}

// To be able to `Send` the `env` to the thread, it needs to be made
// thread safe
let env = RThreadSafeObject::new(env);
// thread safe. To create `current_bindings`, we need to be on the main
// R thread.
let env = RThreadSafe::new(env);
let current_bindings = RThreadSafe::new(vec![]);

// Start the execution thread and wait for requests from the front end
spawn!("ark-environment", move || {
// When `env` is dropped, a `r_async_task()` call unprotects it
// When `env` and `current_bindings` are dropped, a `r_async_task()`
// call unprotects them
let environment = Self {
comm,
comm_manager_tx,
env,
current_bindings: vec![],
current_bindings,
version: 0,
};
environment.execution_thread();
Expand Down Expand Up @@ -209,7 +219,8 @@ impl REnvironment {
}
}

fn update_bindings(&mut self, new_bindings: Vec<Binding>) -> u64 {
fn update_bindings(&mut self, new_bindings: RThreadSafe<Vec<Binding>>) -> u64 {
// Updating will `drop()` the old `current_bindings` on the main R thread
self.current_bindings = new_bindings;
self.version = self.version + 1;

Expand All @@ -227,7 +238,9 @@ impl REnvironment {
r_task(|| {
self.update_bindings(self.bindings());

for binding in &self.current_bindings {
let current_bindings = self.current_bindings.get();

for binding in current_bindings {
variables.push(EnvironmentVariable::new(binding));
}
});
Expand Down Expand Up @@ -395,16 +408,13 @@ impl REnvironment {
let mut assigned: Vec<EnvironmentVariable> = vec![];
let mut removed: Vec<String> = vec![];

let old_bindings = &self.current_bindings;
let mut new_bindings = vec![];

r_task(|| {
new_bindings = self.bindings();
let new_bindings = self.bindings();

let mut old_iter = old_bindings.iter();
let mut old_iter = self.current_bindings.get().iter();
let mut old_next = old_iter.next();

let mut new_iter = new_bindings.iter();
let mut new_iter = new_bindings.get().iter();
let mut new_next = new_iter.next();

loop {
Expand Down Expand Up @@ -460,16 +470,15 @@ impl REnvironment {
},
}
}
});

if assigned.len() > 0 || removed.len() > 0 || request_id.is_some() {
// only update the bindings (and the version)
// if anything changed
// Only update the bindings (and the version) if anything changed
if assigned.len() > 0 || removed.len() > 0 {
self.update_bindings(new_bindings);
}
});

// but the message might be sent anyway if this comes from a request
if assigned.len() > 0 || removed.len() > 0 || request_id.is_some() {
// Send the message if anything changed or if this came from a request
let message = EnvironmentMessage::Update(EnvironmentMessageUpdate {
assigned,
removed,
Expand All @@ -479,14 +488,14 @@ impl REnvironment {
}
}

fn bindings(&self) -> Vec<Binding> {
fn bindings(&self) -> RThreadSafe<Vec<Binding>> {
// SAFETY: Should be called in an `r_task()`
let env = self.env.get().clone();
let env = Environment::new(env);
let mut bindings: Vec<Binding> =
env.iter().filter(|binding| !binding.is_hidden()).collect();

bindings.sort_by(|a, b| a.name.cmp(&b.name));
let bindings = RThreadSafe::new(bindings);
bindings
}
}
99 changes: 58 additions & 41 deletions crates/ark/src/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,88 +5,105 @@
//
//

use harp::object::RObject;
use harp::test::R_TASK_BYPASS;

use crate::r_task::r_async_task;
use crate::shell::R_MAIN_THREAD_NAME;

/// Private "shelter" around an `RObject` that makes it `Send`able
/// Private "shelter" around a Rust object (typically wrapping a `SEXP`, like
/// an `RObject`) that makes it `Send`able
///
/// Shelters can only be created by `RThreadSafeObject`, and the lifetime
/// management of the `RThreadSafeObject` ensures that the shelter (and the
/// underlying `RObject`) is only dropped on the main R thread (since this uses
/// Shelters can only be created by `RThreadSafe`, and the lifetime
/// management of the `RThreadSafe` ensures that the shelter (and the
/// underlying R object) is only dropped on the main R thread (since this uses
/// the R API to unprotect).
///
/// As the `RThreadSafeObject` is dropped, the `RObjectShelter` is _moved_ to
/// As the `RThreadSafe` object is dropped, the `RShelter` is _moved_ to
/// the main R thread and dropped there.
struct RObjectShelter {
object: RObject,
///
/// `T` must have a static lifetime, which seems to enforce that `T` "lives
/// however long we need it to", i.e. it prevents `T` from being a reference
/// to some other object that could get dropped out from under us. I think this
/// effectively means that `RShelter` gets to own the type that it shelters.
/// Without this, we can't move the `RShelter` to the main R thread, since `T`
/// "might not live long enough" according to the compiler.
struct RShelter<T: 'static> {
object: T,
}

unsafe impl Sync for RObjectShelter {}
unsafe impl Send for RObjectShelter {}
unsafe impl<T> Sync for RShelter<T> {}
unsafe impl<T> Send for RShelter<T> {}

/// Thread safe wrapper around an `RObject`
/// Thread safe wrapper around a Rust object (typically wrapping a `SEXP`)
///
/// Create one with `new()`, pass it between threads, and access the underlying
/// R object with `get()` once you reach another context that will run on the
/// main R thread. If `get()` is called off the main R thread, it will log an
/// error in release mode and panic in development mode.
/// object with `get()` once you reach another context that will run on the
/// main R thread.
///
/// Both `new()` and `get()` must be called on the main R thread. This ensures
/// that R thread-safe objects can only be created on and unwrapped from the
/// R thread. If either of these are called off the main R thread, they will
/// log an error in release mode and panic in development mode.
///
/// When this object is dropped, it `take()`s the `RObjectShelter` out of the
/// When this object is dropped, it `take()`s the `RShelter` out of the
/// `shelter` and `move`s it to the main R thread through an async task to be
/// able to `drop()` it on the main R thread.
///
/// Purposefully does not implement `Clone`, as we want the thread safe objects
/// to be moved across threads without running any R code.
pub struct RThreadSafeObject {
shelter: Option<RObjectShelter>,
pub struct RThreadSafe<T: 'static> {
shelter: Option<RShelter<T>>,
}

impl RThreadSafeObject {
pub fn new(object: RObject) -> Self {
let shelter = RObjectShelter { object };
impl<T> RThreadSafe<T> {
pub fn new(object: T) -> Self {
check_on_main_r_thread("new");
let shelter = RShelter { object };
let shelter = Some(shelter);
Self { shelter }
}

/// SAFETY: `get()` can only be called on the main R thread.
/// We also make an exception for tests where `test::start_r()` is used.
pub fn get(&self) -> &RObject {
let thread = std::thread::current();
let name = thread.name().unwrap_or("<unnamed>");

if name != R_MAIN_THREAD_NAME && unsafe { !R_TASK_BYPASS } {
#[cfg(debug_assertions)]
panic!("Can't access thread safe `RObject` on thread '{name}'.");
#[cfg(not(debug_assertions))]
log::error!("Can't access thread safe `RObject` on thread '{name}'.");
}

let shelter: &RObjectShelter = self.shelter.as_ref().unwrap();
let object: &RObject = &shelter.object;

pub fn get(&self) -> &T {
check_on_main_r_thread("get");
let shelter: &RShelter<T> = self.shelter.as_ref().unwrap();
let object: &T = &shelter.object;
object
}
}

impl Drop for RThreadSafeObject {
impl<T> Drop for RThreadSafe<T> {
fn drop(&mut self) {
// Take ownership of the `shelter` and `move` it into the async task
// to be dropped there
let shelter = self.shelter.take();

let Some(shelter) = shelter else {
log::error!("Can't find a `shelter` in this `RThreadSafeObject`.");
log::error!("Can't find a `shelter` in this `RThreadSafe`.");
return;
};

r_async_task(move || {
// Run the `drop()` method of the `RObjectShelter`, which in turn
// runs the `drop()` method of the `RObject`, which uses the R API
// so it must be called on the main R thread.
// Run the `drop()` method of the `RShelter`, which in turn
// runs the `drop()` method of the wrapped Rust object, which likely
// uses the R API (i.e. if it is an `RObject`) so it must be called
// on the main R thread.
drop(shelter);
})
}
}

fn check_on_main_r_thread(f: &str) {
let thread = std::thread::current();
let name = thread.name().unwrap_or("<unnamed>");

// An exception is made for testing, where we set `R_TASK_BYPASS` inside of
// `test::start_r()`
if name != R_MAIN_THREAD_NAME && unsafe { !R_TASK_BYPASS } {
let message =
format!("Must call `RThreadSafe::{f}()` on the main R thread, not thread '{name}'.");
#[cfg(debug_assertions)]
panic!("{message}");
#[cfg(not(debug_assertions))]
log::error!("{message}");
}
}
4 changes: 2 additions & 2 deletions crates/ark/tests/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use ark::environment::message::EnvironmentMessageUpdate;
use ark::environment::r_environment::REnvironment;
use ark::lsp::events::EVENTS;
use ark::r_task;
use ark::thread::RThreadSafeObject;
use ark::thread::RThreadSafe;
use crossbeam::channel::bounded;
use harp::exec::RFunction;
use harp::exec::RFunctionExt;
Expand Down Expand Up @@ -51,7 +51,7 @@ fn test_environment_list() {
.param("parent", R_EmptyEnv)
.call()
.unwrap();
RThreadSafeObject::new(env)
RThreadSafe::new(env)
});

// Create a sender/receiver pair for the comm channel.
Expand Down
6 changes: 0 additions & 6 deletions crates/harp/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,6 @@ pub struct Binding {
pub value: BindingValue,
}

// For sending to main thread as R task
unsafe impl Sync for Binding {}

// FIXME: This should only be Sync
unsafe impl Send for Binding {}

impl Binding {
pub fn new(env: SEXP, frame: SEXP) -> Self {
unsafe {
Expand Down
4 changes: 2 additions & 2 deletions crates/harp/src/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ impl Drop for RObject {

// SAFETY: Neither `Sync` nor `Send` are safe to implement for `RObject`. Even
// with `Sync`, you can call methods from `&RObject` while on different threads,
// which could call the R API. Instead, use `RThreadSafeObject` to send across
// threads.
// which could call the R API. Instead, use `RThreadSafe<RObject>` to send
// across threads.
// unsafe impl Sync for RObject {}
// unsafe impl Send for RObject {}

Expand Down

0 comments on commit 07a94b8

Please sign in to comment.