Skip to content

Commit

Permalink
Reactor readme
Browse files Browse the repository at this point in the history
  • Loading branch information
ecton committed Aug 5, 2024
1 parent 54c64a8 commit 5603b6a
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 126 deletions.
1 change: 1 addition & 0 deletions muse-lang/src/runtime/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,7 @@ impl Primitive {
Self::Float(value) => Ok(SymbolRef::from(value.to_string())),
}
}

/// Hashes this value into `hasher`.
pub fn hash_into(&self, hasher: &mut ValueHasher) {
core::mem::discriminant(self).hash(hasher);
Expand Down
74 changes: 60 additions & 14 deletions muse-reactor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,63 @@
//! - Waiting on a task's completion
//! - Cancelling a task
//! - Budgeting tasks in pools
//!
//! # Basic Usage
//!
//! ```rust
//! use muse_reactor::Reactor;
//! use muse_lang::runtime::value::{Primitive, RootedValue};
//!
//! // Create a new reactor for tasks to run in.
//! let reactor = Reactor::new();
//!
//! // Spawn a task that computes 1 + 2
//! let task = reactor.spawn_source("1 + 2").unwrap();
//!
//! // Wait for the result and verify it's 3.
//! assert_eq!(
//! task.join().unwrap(),
//! RootedValue::Primitive(Primitive::Int(3))
//! );
//! ```
//!
//! [`TaskHandle`] is also a future that can be awaited to wait for the task to
//! complete.
//!
//! # Budget Pools
//!
//! Budget pools enable efficiently restricting groups of tasks to execution
//! budgets. Each time a task assigned to a [`BudgetPool`] exhausts its budget,
//! it requests additional budget from the pool. If no budget is available, the
//! task is put to sleep and will automatically be resumed when the budget has
//! been replenished.
//!
//! ```rust
//! use muse_reactor::{BudgetPool, BudgetPoolConfig, Reactor};
//! use muse_lang::runtime::value::{Primitive, RootedValue};
//! use std::time::Duration;
//!
//! // Create a new reactor for tasks to run in.
//! let reactor = Reactor::new();
//!
//! // Create a budget pool that we can spawn tasks within.
//! let pool = reactor.create_budget_pool(BudgetPoolConfig::default()).unwrap();
//!
//! // Spawn a task within the budget pool
//! let task = pool.spawn_source("var i = 0; while i < 100 { i = i + 1; }; i").unwrap();
//!
//! // Verify the task isn't able to complete.
//! assert!(task.try_join_for(Duration::from_secs(1)).is_none());
//!
//! // Allocate enough budget.
//! pool.increase_budget(1_000);
//!
//! // Wait for the task to complete
//! assert_eq!(
//! task.join().unwrap(),
//! RootedValue::Primitive(Primitive::Int(100))
//! );
//! ```
#![allow(missing_docs)]
use std::any::Any;
use std::cell::Cell;
Expand Down Expand Up @@ -608,10 +665,6 @@ impl ResultHandle {
fn try_recv(&self) -> Option<Result<RootedValue, TaskError>> {
self.0.locked.lock().result.clone()
}

fn recv_async(&self) -> ResultHandleFuture<'_> {
ResultHandleFuture(self)
}
}

impl Debug for ResultHandle {
Expand Down Expand Up @@ -669,14 +722,11 @@ impl ResultDeadline for Instant {
}
}

#[derive(Debug)]
struct ResultHandleFuture<'a>(&'a ResultHandle);

impl Future for ResultHandleFuture<'_> {
impl Future for &'_ TaskHandle {
type Output = Result<RootedValue, TaskError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut data = self.0 .0.locked.lock();
let mut data = self.result.0.locked.lock();
if let Some(result) = &data.result {
Poll::Ready(result.clone())
} else {
Expand Down Expand Up @@ -846,10 +896,6 @@ impl TaskHandle {
self.try_join_until(Instant::now() + duration)
}

pub async fn join_async(&self) -> Result<RootedValue, TaskError> {
self.result.recv_async().await
}

pub fn cancel(&self) {
let mut locked = self.result.0.locked.lock();
locked.cancelled = true;
Expand All @@ -866,7 +912,7 @@ impl CustomType for TaskHandle {
|this, vm, _arity| {
let waker = vm.waker().clone();
let mut context = Context::from_waker(&waker);
let mut future = this.result.recv_async();
let mut future = &*this;
match Pin::new(&mut future).poll(&mut context) {
Poll::Ready(result) => result.map(|v| v.downgrade()).map_err(|e| match e {
TaskError::Cancelled => {
Expand Down
224 changes: 112 additions & 112 deletions muse-reactor/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,112 +1,112 @@
use std::{thread, time::Duration};

use muse_lang::{
compiler::{self, syntax::Ranged},
runtime::value::{Primitive, RootedValue},
vm::Vm,
};
use refuse::CollectionGuard;

use crate::{BudgetPoolConfig, NoWork, Reactor, ReactorHandle, TaskError};

#[test]
fn works() {
let reactor = Reactor::new();
let task = reactor.spawn_source("1 + 2").unwrap();
assert_eq!(
task.join().unwrap(),
RootedValue::Primitive(Primitive::Int(3))
);
}

#[test]
fn spawning() {
let reactor = Reactor::new();
let task = reactor
.spawn_source(
r"
let func = fn(n, func) {
match n {
0 => 0,
n => {
let task = task.spawn_call(func, [n - 1, func]);
task() + n
}
}
};
func(100, func)
",
)
.unwrap();
assert_eq!(
task.join().unwrap(),
RootedValue::Primitive(Primitive::Int((0..=100).sum()))
);
}

#[test]
fn budgeting_basic() {
let reactor = Reactor::build()
.new_vm(
|guard: &mut CollectionGuard<'_>, _reactor: &ReactorHandle<NoWork>| {
let vm = Vm::new(guard);
vm.set_steps_per_charge(1);
Ok(vm)
},
)
.finish();
let pool = reactor.create_budget_pool(BudgetPoolConfig::new()).unwrap();
let task = pool.spawn_source("1 + 2").unwrap();

// Make sure the task doesn't complete
thread::sleep(Duration::from_secs(1));
assert!(task.try_join().is_none());

// Give it some budget
pool.increase_budget(100).unwrap();

assert_eq!(
task.join().unwrap(),
RootedValue::Primitive(Primitive::Int(3))
);
}

#[test]
fn spawn_err() {
let reactor = Reactor::new();
let task = reactor.spawn_source("invalid source code").unwrap();
match task.join() {
Err(TaskError::Compilation(errors)) => assert_eq!(
errors,
vec![Ranged(
compiler::Error::Syntax(crate::compiler::syntax::ParseError::ExpectedEof),
compiler::syntax::SourceRange {
source_id: compiler::syntax::SourceId::anonymous(),
start: 8,
length: 6
}
)]
),
other => unreachable!("unexpected result: {other:?}"),
};
}

#[test]
fn task_cancellation() {
let reactor = Reactor::new();
// Spawn a task with an infinite loop
let task = reactor.spawn_source("loop {}").unwrap();
// Wait a bit to make sure it's running.
assert!(task.try_join_for(Duration::from_secs(1)).is_none());

// Cancel the task.
println!("Cancelling");
task.cancel();

// Ensure we get a cancellation error.
match task.join() {
Err(TaskError::Cancelled) => {}
other => unreachable!("unexpected result: {other:?}"),
}
}
use std::thread;
use std::time::Duration;

use muse_lang::compiler::syntax::Ranged;
use muse_lang::compiler::{self};
use muse_lang::runtime::value::{Primitive, RootedValue};
use muse_lang::vm::Vm;
use refuse::CollectionGuard;

use crate::{BudgetPoolConfig, NoWork, Reactor, ReactorHandle, TaskError};

#[test]
fn works() {
let reactor = Reactor::new();
let task = reactor.spawn_source("1 + 2").unwrap();
assert_eq!(
task.join().unwrap(),
RootedValue::Primitive(Primitive::Int(3))
);
}

#[test]
fn spawning() {
let reactor = Reactor::new();
let task = reactor
.spawn_source(
r"
let func = fn(n, func) {
match n {
0 => 0,
n => {
let task = task.spawn_call(func, [n - 1, func]);
task() + n
}
}
};
func(100, func)
",
)
.unwrap();
assert_eq!(
task.join().unwrap(),
RootedValue::Primitive(Primitive::Int((0..=100).sum()))
);
}

#[test]
fn budgeting_basic() {
let reactor = Reactor::build()
.new_vm(
|guard: &mut CollectionGuard<'_>, _reactor: &ReactorHandle<NoWork>| {
let vm = Vm::new(guard);
vm.set_steps_per_charge(1);
Ok(vm)
},
)
.finish();
let pool = reactor.create_budget_pool(BudgetPoolConfig::new()).unwrap();
let task = pool.spawn_source("1 + 2").unwrap();

// Make sure the task doesn't complete
thread::sleep(Duration::from_secs(1));
assert!(task.try_join().is_none());

// Give it some budget
pool.increase_budget(100).unwrap();

assert_eq!(
task.join().unwrap(),
RootedValue::Primitive(Primitive::Int(3))
);
}

#[test]
fn spawn_err() {
let reactor = Reactor::new();
let task = reactor.spawn_source("invalid source code").unwrap();
match task.join() {
Err(TaskError::Compilation(errors)) => assert_eq!(
errors,
vec![Ranged(
compiler::Error::Syntax(crate::compiler::syntax::ParseError::ExpectedEof),
compiler::syntax::SourceRange {
source_id: compiler::syntax::SourceId::anonymous(),
start: 8,
length: 6
}
)]
),
other => unreachable!("unexpected result: {other:?}"),
};
}

#[test]
fn task_cancellation() {
let reactor = Reactor::new();
// Spawn a task with an infinite loop
let task = reactor.spawn_source("loop {}").unwrap();
// Wait a bit to make sure it's running.
assert!(task.try_join_for(Duration::from_secs(1)).is_none());

// Cancel the task.
println!("Cancelling");
task.cancel();

// Ensure we get a cancellation error.
match task.join() {
Err(TaskError::Cancelled) => {}
other => unreachable!("unexpected result: {other:?}"),
}
}

0 comments on commit 5603b6a

Please sign in to comment.