diff --git a/muse-lang/src/runtime/value.rs b/muse-lang/src/runtime/value.rs index 5ab520a..877f54a 100644 --- a/muse-lang/src/runtime/value.rs +++ b/muse-lang/src/runtime/value.rs @@ -733,6 +733,7 @@ impl Primitive { Self::Float(value) => Ok(SymbolRef::from(value.to_string())), } } + /// Hashes this value into `hasher`. pub fn hash_into(&self, hasher: &mut ValueHasher) { core::mem::discriminant(self).hash(hasher); diff --git a/muse-reactor/src/lib.rs b/muse-reactor/src/lib.rs index 19a4a1f..9f8a500 100644 --- a/muse-reactor/src/lib.rs +++ b/muse-reactor/src/lib.rs @@ -7,6 +7,63 @@ //! - Waiting on a task's completion //! - Cancelling a task //! - Budgeting tasks in pools +//! +//! # Basic Usage +//! +//! ```rust +//! use muse_reactor::Reactor; +//! use muse_lang::runtime::value::{Primitive, RootedValue}; +//! +//! // Create a new reactor for tasks to run in. +//! let reactor = Reactor::new(); +//! +//! // Spawn a task that computes 1 + 2 +//! let task = reactor.spawn_source("1 + 2").unwrap(); +//! +//! // Wait for the result and verify it's 3. +//! assert_eq!( +//! task.join().unwrap(), +//! RootedValue::Primitive(Primitive::Int(3)) +//! ); +//! ``` +//! +//! [`TaskHandle`] is also a future that can be awaited to wait for the task to +//! complete. +//! +//! # Budget Pools +//! +//! Budget pools enable efficiently restricting groups of tasks to execution +//! budgets. Each time a task assigned to a [`BudgetPool`] exhausts its budget, +//! it requests additional budget from the pool. If no budget is available, the +//! task is put to sleep and will automatically be resumed when the budget has +//! been replenished. +//! +//! ```rust +//! use muse_reactor::{BudgetPool, BudgetPoolConfig, Reactor}; +//! use muse_lang::runtime::value::{Primitive, RootedValue}; +//! use std::time::Duration; +//! +//! // Create a new reactor for tasks to run in. +//! let reactor = Reactor::new(); +//! +//! // Create a budget pool that we can spawn tasks within. +//! let pool = reactor.create_budget_pool(BudgetPoolConfig::default()).unwrap(); +//! +//! // Spawn a task within the budget pool +//! let task = pool.spawn_source("var i = 0; while i < 100 { i = i + 1; }; i").unwrap(); +//! +//! // Verify the task isn't able to complete. +//! assert!(task.try_join_for(Duration::from_secs(1)).is_none()); +//! +//! // Allocate enough budget. +//! pool.increase_budget(1_000); +//! +//! // Wait for the task to complete +//! assert_eq!( +//! task.join().unwrap(), +//! RootedValue::Primitive(Primitive::Int(100)) +//! ); +//! ``` #![allow(missing_docs)] use std::any::Any; use std::cell::Cell; @@ -608,10 +665,6 @@ impl ResultHandle { fn try_recv(&self) -> Option> { self.0.locked.lock().result.clone() } - - fn recv_async(&self) -> ResultHandleFuture<'_> { - ResultHandleFuture(self) - } } impl Debug for ResultHandle { @@ -669,14 +722,11 @@ impl ResultDeadline for Instant { } } -#[derive(Debug)] -struct ResultHandleFuture<'a>(&'a ResultHandle); - -impl Future for ResultHandleFuture<'_> { +impl Future for &'_ TaskHandle { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut data = self.0 .0.locked.lock(); + let mut data = self.result.0.locked.lock(); if let Some(result) = &data.result { Poll::Ready(result.clone()) } else { @@ -846,10 +896,6 @@ impl TaskHandle { self.try_join_until(Instant::now() + duration) } - pub async fn join_async(&self) -> Result { - self.result.recv_async().await - } - pub fn cancel(&self) { let mut locked = self.result.0.locked.lock(); locked.cancelled = true; @@ -866,7 +912,7 @@ impl CustomType for TaskHandle { |this, vm, _arity| { let waker = vm.waker().clone(); let mut context = Context::from_waker(&waker); - let mut future = this.result.recv_async(); + let mut future = &*this; match Pin::new(&mut future).poll(&mut context) { Poll::Ready(result) => result.map(|v| v.downgrade()).map_err(|e| match e { TaskError::Cancelled => { diff --git a/muse-reactor/src/tests.rs b/muse-reactor/src/tests.rs index b55e7e5..45bbf9a 100644 --- a/muse-reactor/src/tests.rs +++ b/muse-reactor/src/tests.rs @@ -1,112 +1,112 @@ -use std::{thread, time::Duration}; - -use muse_lang::{ - compiler::{self, syntax::Ranged}, - runtime::value::{Primitive, RootedValue}, - vm::Vm, -}; -use refuse::CollectionGuard; - -use crate::{BudgetPoolConfig, NoWork, Reactor, ReactorHandle, TaskError}; - -#[test] -fn works() { - let reactor = Reactor::new(); - let task = reactor.spawn_source("1 + 2").unwrap(); - assert_eq!( - task.join().unwrap(), - RootedValue::Primitive(Primitive::Int(3)) - ); -} - -#[test] -fn spawning() { - let reactor = Reactor::new(); - let task = reactor - .spawn_source( - r" - let func = fn(n, func) { - match n { - 0 => 0, - n => { - let task = task.spawn_call(func, [n - 1, func]); - task() + n - } - } - }; - - func(100, func) - ", - ) - .unwrap(); - assert_eq!( - task.join().unwrap(), - RootedValue::Primitive(Primitive::Int((0..=100).sum())) - ); -} - -#[test] -fn budgeting_basic() { - let reactor = Reactor::build() - .new_vm( - |guard: &mut CollectionGuard<'_>, _reactor: &ReactorHandle| { - let vm = Vm::new(guard); - vm.set_steps_per_charge(1); - Ok(vm) - }, - ) - .finish(); - let pool = reactor.create_budget_pool(BudgetPoolConfig::new()).unwrap(); - let task = pool.spawn_source("1 + 2").unwrap(); - - // Make sure the task doesn't complete - thread::sleep(Duration::from_secs(1)); - assert!(task.try_join().is_none()); - - // Give it some budget - pool.increase_budget(100).unwrap(); - - assert_eq!( - task.join().unwrap(), - RootedValue::Primitive(Primitive::Int(3)) - ); -} - -#[test] -fn spawn_err() { - let reactor = Reactor::new(); - let task = reactor.spawn_source("invalid source code").unwrap(); - match task.join() { - Err(TaskError::Compilation(errors)) => assert_eq!( - errors, - vec![Ranged( - compiler::Error::Syntax(crate::compiler::syntax::ParseError::ExpectedEof), - compiler::syntax::SourceRange { - source_id: compiler::syntax::SourceId::anonymous(), - start: 8, - length: 6 - } - )] - ), - other => unreachable!("unexpected result: {other:?}"), - }; -} - -#[test] -fn task_cancellation() { - let reactor = Reactor::new(); - // Spawn a task with an infinite loop - let task = reactor.spawn_source("loop {}").unwrap(); - // Wait a bit to make sure it's running. - assert!(task.try_join_for(Duration::from_secs(1)).is_none()); - - // Cancel the task. - println!("Cancelling"); - task.cancel(); - - // Ensure we get a cancellation error. - match task.join() { - Err(TaskError::Cancelled) => {} - other => unreachable!("unexpected result: {other:?}"), - } -} +use std::thread; +use std::time::Duration; + +use muse_lang::compiler::syntax::Ranged; +use muse_lang::compiler::{self}; +use muse_lang::runtime::value::{Primitive, RootedValue}; +use muse_lang::vm::Vm; +use refuse::CollectionGuard; + +use crate::{BudgetPoolConfig, NoWork, Reactor, ReactorHandle, TaskError}; + +#[test] +fn works() { + let reactor = Reactor::new(); + let task = reactor.spawn_source("1 + 2").unwrap(); + assert_eq!( + task.join().unwrap(), + RootedValue::Primitive(Primitive::Int(3)) + ); +} + +#[test] +fn spawning() { + let reactor = Reactor::new(); + let task = reactor + .spawn_source( + r" + let func = fn(n, func) { + match n { + 0 => 0, + n => { + let task = task.spawn_call(func, [n - 1, func]); + task() + n + } + } + }; + + func(100, func) + ", + ) + .unwrap(); + assert_eq!( + task.join().unwrap(), + RootedValue::Primitive(Primitive::Int((0..=100).sum())) + ); +} + +#[test] +fn budgeting_basic() { + let reactor = Reactor::build() + .new_vm( + |guard: &mut CollectionGuard<'_>, _reactor: &ReactorHandle| { + let vm = Vm::new(guard); + vm.set_steps_per_charge(1); + Ok(vm) + }, + ) + .finish(); + let pool = reactor.create_budget_pool(BudgetPoolConfig::new()).unwrap(); + let task = pool.spawn_source("1 + 2").unwrap(); + + // Make sure the task doesn't complete + thread::sleep(Duration::from_secs(1)); + assert!(task.try_join().is_none()); + + // Give it some budget + pool.increase_budget(100).unwrap(); + + assert_eq!( + task.join().unwrap(), + RootedValue::Primitive(Primitive::Int(3)) + ); +} + +#[test] +fn spawn_err() { + let reactor = Reactor::new(); + let task = reactor.spawn_source("invalid source code").unwrap(); + match task.join() { + Err(TaskError::Compilation(errors)) => assert_eq!( + errors, + vec![Ranged( + compiler::Error::Syntax(crate::compiler::syntax::ParseError::ExpectedEof), + compiler::syntax::SourceRange { + source_id: compiler::syntax::SourceId::anonymous(), + start: 8, + length: 6 + } + )] + ), + other => unreachable!("unexpected result: {other:?}"), + }; +} + +#[test] +fn task_cancellation() { + let reactor = Reactor::new(); + // Spawn a task with an infinite loop + let task = reactor.spawn_source("loop {}").unwrap(); + // Wait a bit to make sure it's running. + assert!(task.try_join_for(Duration::from_secs(1)).is_none()); + + // Cancel the task. + println!("Cancelling"); + task.cancel(); + + // Ensure we get a cancellation error. + match task.join() { + Err(TaskError::Cancelled) => {} + other => unreachable!("unexpected result: {other:?}"), + } +}