diff --git a/rayon-futures/src/compile_fail/mod.rs b/rayon-futures/src/compile_fail/mod.rs index 9e08a7bd2..8ee424601 100644 --- a/rayon-futures/src/compile_fail/mod.rs +++ b/rayon-futures/src/compile_fail/mod.rs @@ -1,2 +1,3 @@ // These modules contain `compile_fail` doc tests. mod future_escape; +mod non_send_item; diff --git a/rayon-futures/src/compile_fail/non_send_item.rs b/rayon-futures/src/compile_fail/non_send_item.rs new file mode 100644 index 000000000..474404940 --- /dev/null +++ b/rayon-futures/src/compile_fail/non_send_item.rs @@ -0,0 +1,62 @@ +/*! ```compile_fail,E0277 +extern crate futures; +extern crate rayon_core; + +use std::marker::PhantomData; +use futures::future::poll_fn; +use futures::Poll; +use futures::Async; +use futures::Future; +use rayon_futures::ScopeFutureExt; +use rayon_core::scope; +use rayon_core::ThreadPool; + +let evil_future = poll_fn(|| Ok(Async::Ready(PhantomData::<*mut ()>)) ); +let pool = ThreadPool::global(); + +scope(|s| { + let f = s.spawn_future(evil_future); //~ ERROR + let _: Result<_, ()> = f.rayon_wait(); +} ); +``` */ + +// Original test case: + +/* #[test] +fn non_send_item() { + use std::marker::PhantomData; + use std::thread; + use futures::future::poll_fn; + + struct TattleTale { + id: thread::ThreadId, + not_send: PhantomData<*mut ()> + } + + impl Drop for TattleTale { + fn drop(&mut self) { + let drop_id = thread::current().id(); + assert_eq!(self.id, drop_id); + } + } + + let evil_future_factory = || { poll_fn(|| { + let evil_item = TattleTale { + id: thread::current().id(), + not_send: PhantomData, + }; + return Ok(Async::Ready(evil_item)); + } ) }; + + let pool = ThreadPool::global(); + + scope(|s| { + let futures: Vec<_> = (0..1000) + .map(|_: i32| s.spawn_future(evil_future_factory())) + .collect(); + + for f in futures { + let _: Result<_, ()> = f.rayon_wait(); + } + } ); +} */ diff --git a/rayon-futures/src/lib.rs b/rayon-futures/src/lib.rs index 040134e5e..78a54b179 100644 --- a/rayon-futures/src/lib.rs +++ b/rayon-futures/src/lib.rs @@ -34,7 +34,9 @@ const STATE_COMPLETE: usize = 4; pub trait ScopeFutureExt<'scope> { fn spawn_future(&self, future: F) -> RayonFuture where - F: Future + Send + 'scope; + F: Future + Send + 'scope, + ::Item: Send, + ::Error: Send; } impl<'scope, T> ScopeFutureExt<'scope> for T @@ -44,6 +46,8 @@ where fn spawn_future(&self, future: F) -> RayonFuture where F: Future + Send + 'scope, + ::Item: Send, + ::Error: Send, { let inner = ScopeFuture::spawn(future, self.to_scope_handle()); @@ -136,6 +140,8 @@ impl fmt::Debug for RayonFuture { struct ScopeFuture<'scope, F, S> where F: Future + Send + 'scope, + ::Item: Send, + ::Error: Send, S: ScopeHandle<'scope>, { state: AtomicUsize, @@ -149,6 +155,8 @@ type CUError = as Future>::Error; struct ScopeFutureContents<'scope, F, S> where F: Future + Send + 'scope, + ::Item: Send, + ::Error: Send, S: ScopeHandle<'scope>, { spawn: Option>>, @@ -160,7 +168,7 @@ where // the counter in the scope; since the scope doesn't terminate until // counter reaches zero, and we hold a ref in this counter, we are // assured that this pointer remains valid - scope: Option, + scope: Option>, waiting_task: Option, result: Poll, CUError>, @@ -171,6 +179,8 @@ where impl<'scope, F, S> fmt::Debug for ScopeFutureContents<'scope, F, S> where F: Future + Send + 'scope, + ::Item: Send, + ::Error: Send, S: ScopeHandle<'scope>, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { @@ -183,11 +193,15 @@ where struct ArcScopeFuture<'scope, F, S>(Arc>) where F: Future + Send + 'scope, + ::Item: Send, + ::Error: Send, S: ScopeHandle<'scope>; impl<'scope, F, S> Clone for ArcScopeFuture<'scope, F, S> where F: Future + Send + 'scope, + ::Item: Send, + ::Error: Send, S: ScopeHandle<'scope>, { fn clone(&self) -> Self { @@ -198,6 +212,8 @@ where impl<'scope, F, S> Notify for ArcScopeFuture<'scope, F, S> where F: Future + Send + 'scope, + ::Item: Send, + ::Error: Send, S: ScopeHandle<'scope>, { fn notify(&self, id: usize) { @@ -225,6 +241,8 @@ unsafe impl<'scope, F, S> Sync for ScopeFutureWrapped<'scope, F, S> {} impl<'scope, F, S> Notify for ScopeFutureWrapped<'scope, F, S> where F: Future + Send + 'scope, + ::Item: Send, + ::Error: Send, S: ScopeHandle<'scope>, { fn notify(&self, id: usize) { @@ -264,6 +282,8 @@ where unsafe impl<'scope, F, S> UnsafeNotify for ScopeFutureWrapped<'scope, F, S> where F: Future + Send + 'scope, + ::Item: Send, + ::Error: Send, S: ScopeHandle<'scope>, { unsafe fn clone_raw(&self) -> NotifyHandle { @@ -285,6 +305,8 @@ where impl<'scope, F, S> From> for NotifyHandle where F: Future + Send + 'scope, + ::Item: Send, + ::Error: Send, S: ScopeHandle<'scope>, { fn from(rc: ArcScopeFuture<'scope, F, S>) -> NotifyHandle { @@ -309,22 +331,31 @@ where } // Assert that the `*const` is safe to transmit between threads: -unsafe impl<'scope, F, S> Send for ScopeFuture<'scope, F, S> -where - F: Future + Send + 'scope, - S: ScopeHandle<'scope>, -{ -} -unsafe impl<'scope, F, S> Sync for ScopeFuture<'scope, F, S> +struct ScopeHandleSend<'s, S: ScopeHandle<'s>>(S, PhantomData<&'s ()>); +unsafe impl<'scope, S> Send for ScopeHandleSend<'scope, S> where S: ScopeHandle<'scope> {} +impl<'scope, S> ScopeHandleSend<'scope, S> where - F: Future + Send + 'scope, S: ScopeHandle<'scope>, { + unsafe fn assert_send(s: S) -> Self { + ScopeHandleSend(s, PhantomData) + } + unsafe fn spawn_task(&self, task: Arc) { + self.0.spawn_task(task); + } + fn panicked(self, err: Box) { + self.0.panicked(err); + } + fn ok(self) { + self.0.ok(); + } } impl<'scope, F, S> ScopeFuture<'scope, F, S> where F: Future + Send + 'scope, + ::Item: Send, + ::Error: Send, S: ScopeHandle<'scope>, { fn spawn(future: F, scope: S) -> Arc { @@ -338,7 +369,7 @@ where contents: Mutex::new(ScopeFutureContents { spawn: None, this: None, - scope: Some(scope), + scope: unsafe { Some(ScopeHandleSend::assert_send(scope)) }, waiting_task: None, result: Ok(Async::NotReady), canceled: false, @@ -466,6 +497,8 @@ where impl<'scope, F, S> Notify for ScopeFuture<'scope, F, S> where F: Future + Send + 'scope, + ::Item: Send, + ::Error: Send, S: ScopeHandle<'scope>, { fn notify(&self, _: usize) { @@ -476,6 +509,8 @@ where impl<'scope, F, S> RayonTask for ScopeFuture<'scope, F, S> where F: Future + Send + 'scope, + ::Item: Send, + ::Error: Send, S: ScopeHandle<'scope>, { fn execute(this: Arc) { @@ -510,6 +545,8 @@ where impl<'scope, F, S> ScopeFutureContents<'scope, F, S> where F: Future + Send + 'scope, + ::Item: Send, + ::Error: Send, S: ScopeHandle<'scope>, { fn poll(&mut self) -> Poll, CUError> { @@ -578,7 +615,9 @@ trait ScopeFutureTrait: Send + Sync { impl<'scope, F, S> ScopeFutureTrait, CUError> for ScopeFuture<'scope, F, S> where - F: Future + Send, + F: Future + Send + 'scope, + ::Item: Send, + ::Error: Send, S: ScopeHandle<'scope>, { fn probe(&self) -> bool {