Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 697 #698

Closed
1 change: 1 addition & 0 deletions rayon-futures/src/compile_fail/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
// These modules contain `compile_fail` doc tests.
mod future_escape;
mod non_send_item;
62 changes: 62 additions & 0 deletions rayon-futures/src/compile_fail/non_send_item.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*! ```compile_fail,E0277
extern crate futures;
extern crate rayon_core;

use std::marker::PhantomData;
use futures::future::poll_fn;
use futures::Poll;
use futures::Async;
use futures::Future;
use rayon_futures::ScopeFutureExt;
use rayon_core::scope;
use rayon_core::ThreadPool;

let evil_future = poll_fn(|| Ok(Async::Ready(PhantomData::<*mut ()>)) );
let pool = ThreadPool::global();

scope(|s| {
let f = s.spawn_future(evil_future); //~ ERROR
let _: Result<_, ()> = f.rayon_wait();
} );
``` */

// Original test case:

/* #[test]
fn non_send_item() {
use std::marker::PhantomData;
use std::thread;
use futures::future::poll_fn;

struct TattleTale {
id: thread::ThreadId,
not_send: PhantomData<*mut ()>
}

impl Drop for TattleTale {
fn drop(&mut self) {
let drop_id = thread::current().id();
assert_eq!(self.id, drop_id);
}
}

let evil_future_factory = || { poll_fn(|| {
let evil_item = TattleTale {
id: thread::current().id(),
not_send: PhantomData,
};
return Ok(Async::Ready(evil_item));
} ) };

let pool = ThreadPool::global();

scope(|s| {
let futures: Vec<_> = (0..1000)
.map(|_: i32| s.spawn_future(evil_future_factory()))
.collect();

for f in futures {
let _: Result<_, ()> = f.rayon_wait();
}
} );
} */
63 changes: 51 additions & 12 deletions rayon-futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ const STATE_COMPLETE: usize = 4;
pub trait ScopeFutureExt<'scope> {
fn spawn_future<F>(&self, future: F) -> RayonFuture<F::Item, F::Error>
where
F: Future + Send + 'scope;
F: Future + Send + 'scope,
<F as Future>::Item: Send,
<F as Future>::Error: Send;
}

impl<'scope, T> ScopeFutureExt<'scope> for T
Expand All @@ -44,6 +46,8 @@ where
fn spawn_future<F>(&self, future: F) -> RayonFuture<F::Item, F::Error>
where
F: Future + Send + 'scope,
<F as Future>::Item: Send,
<F as Future>::Error: Send,
{
let inner = ScopeFuture::spawn(future, self.to_scope_handle());

Expand Down Expand Up @@ -136,6 +140,8 @@ impl<T, E> fmt::Debug for RayonFuture<T, E> {
struct ScopeFuture<'scope, F, S>
where
F: Future + Send + 'scope,
<F as Future>::Item: Send,
<F as Future>::Error: Send,
S: ScopeHandle<'scope>,
{
state: AtomicUsize,
Expand All @@ -149,6 +155,8 @@ type CUError<F> = <CU<F> as Future>::Error;
struct ScopeFutureContents<'scope, F, S>
where
F: Future + Send + 'scope,
<F as Future>::Item: Send,
<F as Future>::Error: Send,
S: ScopeHandle<'scope>,
{
spawn: Option<Spawn<CU<F>>>,
Expand All @@ -160,7 +168,7 @@ where
// the counter in the scope; since the scope doesn't terminate until
// counter reaches zero, and we hold a ref in this counter, we are
// assured that this pointer remains valid
scope: Option<S>,
scope: Option<ScopeHandleSend<'scope, S>>,

waiting_task: Option<Task>,
result: Poll<CUItem<F>, CUError<F>>,
Expand All @@ -171,6 +179,8 @@ where
impl<'scope, F, S> fmt::Debug for ScopeFutureContents<'scope, F, S>
where
F: Future + Send + 'scope,
<F as Future>::Item: Send,
<F as Future>::Error: Send,
S: ScopeHandle<'scope>,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
Expand All @@ -183,11 +193,15 @@ where
struct ArcScopeFuture<'scope, F, S>(Arc<ScopeFuture<'scope, F, S>>)
where
F: Future + Send + 'scope,
<F as Future>::Item: Send,
<F as Future>::Error: Send,
S: ScopeHandle<'scope>;

impl<'scope, F, S> Clone for ArcScopeFuture<'scope, F, S>
where
F: Future + Send + 'scope,
<F as Future>::Item: Send,
<F as Future>::Error: Send,
S: ScopeHandle<'scope>,
{
fn clone(&self) -> Self {
Expand All @@ -198,6 +212,8 @@ where
impl<'scope, F, S> Notify for ArcScopeFuture<'scope, F, S>
where
F: Future + Send + 'scope,
<F as Future>::Item: Send,
<F as Future>::Error: Send,
S: ScopeHandle<'scope>,
{
fn notify(&self, id: usize) {
Expand Down Expand Up @@ -225,6 +241,8 @@ unsafe impl<'scope, F, S> Sync for ScopeFutureWrapped<'scope, F, S> {}
impl<'scope, F, S> Notify for ScopeFutureWrapped<'scope, F, S>
where
F: Future + Send + 'scope,
<F as Future>::Item: Send,
<F as Future>::Error: Send,
S: ScopeHandle<'scope>,
{
fn notify(&self, id: usize) {
Expand Down Expand Up @@ -264,6 +282,8 @@ where
unsafe impl<'scope, F, S> UnsafeNotify for ScopeFutureWrapped<'scope, F, S>
where
F: Future + Send + 'scope,
<F as Future>::Item: Send,
<F as Future>::Error: Send,
S: ScopeHandle<'scope>,
{
unsafe fn clone_raw(&self) -> NotifyHandle {
Expand All @@ -285,6 +305,8 @@ where
impl<'scope, F, S> From<ArcScopeFuture<'scope, F, S>> for NotifyHandle
where
F: Future + Send + 'scope,
<F as Future>::Item: Send,
<F as Future>::Error: Send,
S: ScopeHandle<'scope>,
{
fn from(rc: ArcScopeFuture<'scope, F, S>) -> NotifyHandle {
Expand All @@ -309,22 +331,31 @@ where
}

// Assert that the `*const` is safe to transmit between threads:
unsafe impl<'scope, F, S> Send for ScopeFuture<'scope, F, S>
where
F: Future + Send + 'scope,
S: ScopeHandle<'scope>,
{
}
unsafe impl<'scope, F, S> Sync for ScopeFuture<'scope, F, S>
struct ScopeHandleSend<'s, S: ScopeHandle<'s>>(S, PhantomData<&'s ()>);
unsafe impl<'scope, S> Send for ScopeHandleSend<'scope, S> where S: ScopeHandle<'scope> {}
impl<'scope, S> ScopeHandleSend<'scope, S>
where
F: Future + Send + 'scope,
S: ScopeHandle<'scope>,
{
unsafe fn assert_send(s: S) -> Self {
ScopeHandleSend(s, PhantomData)
}
unsafe fn spawn_task<T: RayonTask + 'scope>(&self, task: Arc<T>) {
self.0.spawn_task(task);
}
fn panicked(self, err: Box<Any + Send>) {
self.0.panicked(err);
}
fn ok(self) {
self.0.ok();
}
}

impl<'scope, F, S> ScopeFuture<'scope, F, S>
where
F: Future + Send + 'scope,
<F as Future>::Item: Send,
<F as Future>::Error: Send,
S: ScopeHandle<'scope>,
{
fn spawn(future: F, scope: S) -> Arc<Self> {
Expand All @@ -338,7 +369,7 @@ where
contents: Mutex::new(ScopeFutureContents {
spawn: None,
this: None,
scope: Some(scope),
scope: unsafe { Some(ScopeHandleSend::assert_send(scope)) },
waiting_task: None,
result: Ok(Async::NotReady),
canceled: false,
Expand Down Expand Up @@ -466,6 +497,8 @@ where
impl<'scope, F, S> Notify for ScopeFuture<'scope, F, S>
where
F: Future + Send + 'scope,
<F as Future>::Item: Send,
<F as Future>::Error: Send,
S: ScopeHandle<'scope>,
{
fn notify(&self, _: usize) {
Expand All @@ -476,6 +509,8 @@ where
impl<'scope, F, S> RayonTask for ScopeFuture<'scope, F, S>
where
F: Future + Send + 'scope,
<F as Future>::Item: Send,
<F as Future>::Error: Send,
S: ScopeHandle<'scope>,
{
fn execute(this: Arc<Self>) {
Expand Down Expand Up @@ -510,6 +545,8 @@ where
impl<'scope, F, S> ScopeFutureContents<'scope, F, S>
where
F: Future + Send + 'scope,
<F as Future>::Item: Send,
<F as Future>::Error: Send,
S: ScopeHandle<'scope>,
{
fn poll(&mut self) -> Poll<CUItem<F>, CUError<F>> {
Expand Down Expand Up @@ -578,7 +615,9 @@ trait ScopeFutureTrait<T, E>: Send + Sync {

impl<'scope, F, S> ScopeFutureTrait<CUItem<F>, CUError<F>> for ScopeFuture<'scope, F, S>
where
F: Future + Send,
F: Future + Send + 'scope,
<F as Future>::Item: Send,
<F as Future>::Error: Send,
S: ScopeHandle<'scope>,
{
fn probe(&self) -> bool {
Expand Down