Skip to content

Commit

Permalink
formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
rdfriese committed Jul 25, 2024
1 parent 64e78a1 commit b4f68d6
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 105 deletions.
3 changes: 0 additions & 3 deletions examples/hello_world/hello_world_am.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,3 @@ fn main() {
//wait for the request to complete
world.block_on(request);
} //when world drops there is an implicit world.barrier() that occurs



3 changes: 2 additions & 1 deletion examples/team_examples/custom_team_arch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ impl LamellarArch for BlockStridedArch {
let block = parent_pe / self.block_size;
let start_block = self.start_pe / self.block_size;
let remainder = parent_pe % self.block_size;
if block >= start_block && (block - start_block) % self.stride == 0
if block >= start_block
&& (block - start_block) % self.stride == 0
&& self.start_pe <= *parent_pe
&& *parent_pe <= self.end_pe
{
Expand Down
99 changes: 50 additions & 49 deletions src/array/iterator/distributed_iterator.rs

Large diffs are not rendered by default.

65 changes: 32 additions & 33 deletions src/array/iterator/local_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,18 @@ use zip::*;
pub(crate) use consumer::*;

use crate::array::iterator::{private::*, Schedule};
use crate::array::{
operations::ArrayOps, AsyncTeamFrom, Distribution, InnerArray, LamellarArray,
};
use crate::array::{operations::ArrayOps, AsyncTeamFrom, Distribution, InnerArray, LamellarArray};
use crate::memregion::Dist;
use crate::LamellarTeamRT;

use crate::active_messaging::SyncSend;

use enum_dispatch::enum_dispatch;
use futures_util::Future;
use paste::paste;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use paste::paste;

macro_rules! consumer_impl {
($name:ident<$($generics:ident),*>($($arg:ident : $arg_ty:ty),*); [$($return_type: tt)*]; [$($bounds:tt)+] ; [$(-> $($blocking_ret:tt)*)? ]) => {
Expand Down Expand Up @@ -98,45 +96,44 @@ macro_rules! consumer_impl {
#[doc(hidden)]
#[enum_dispatch]
pub trait LocalIteratorLauncher: InnerArray {

consumer_impl!(
for_each<I, F>(iter: &I, op: F);
[LocalIterForEachHandle];
[I: LocalIterator + 'static, F: Fn(I::Item) + SyncSend + Clone + 'static];
[]
);
consumer_impl!(
for_each_async<I, F, Fut>(iter: &I, op: F);
for_each_async<I, F, Fut>(iter: &I, op: F);
[LocalIterForEachHandle];
[I: LocalIterator + 'static, F: Fn(I::Item) -> Fut + SyncSend + Clone + 'static, Fut: Future<Output = ()> + Send + 'static];
[]);

consumer_impl!(
reduce<I, F>(iter: &I, op: F);
reduce<I, F>(iter: &I, op: F);
[LocalIterReduceHandle<I::Item, F>];
[I: LocalIterator + 'static, I::Item: SyncSend + Copy, F: Fn(I::Item, I::Item) -> I::Item + SyncSend + Clone + 'static];
[-> Option<I::Item>]);

consumer_impl!(
collect<I, A>(iter: &I, d: Distribution);
collect<I, A>(iter: &I, d: Distribution);
[LocalIterCollectHandle<I::Item, A>];
[I: LocalIterator + 'static, I::Item: Dist + ArrayOps, A: AsyncTeamFrom<(Vec<I::Item>, Distribution)> + SyncSend + Clone + 'static];
[-> A]);

consumer_impl!(
collect_async<I, A, B>(iter: &I, d: Distribution);
collect_async<I, A, B>(iter: &I, d: Distribution);
[LocalIterCollectHandle<B, A>];
[I: LocalIterator + 'static, I::Item: Future<Output = B> + Send + 'static,B: Dist + ArrayOps,A: AsyncTeamFrom<(Vec<B>, Distribution)> + SyncSend + Clone + 'static,];
[-> A]);

consumer_impl!(
count<I>(iter: &I);
count<I>(iter: &I);
[LocalIterCountHandle];
[I: LocalIterator + 'static ];
[-> usize]);

consumer_impl!(
sum<I>(iter: &I);
sum<I>(iter: &I);
[LocalIterSumHandle<I::Item>];
[I: LocalIterator + 'static, I::Item: SyncSend + std::iter::Sum + for<'a> std::iter::Sum<&'a I::Item> , ];
[-> I::Item]);
Expand Down Expand Up @@ -358,7 +355,7 @@ pub trait LocalIterator: SyncSend + IterClone + 'static {
/// .for_each(move |elem| println!("{:?} {elem}",std::thread::current().id()))
/// );
///```
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
fn for_each<F>(&self, op: F) -> LocalIterForEachHandle
where
F: Fn(Self::Item) + SyncSend + Clone + 'static,
Expand All @@ -379,11 +376,11 @@ pub trait LocalIterator: SyncSend + IterClone + 'static {
// /// let world = LamellarWorldBuilder::new().build();
// /// let array: ReadOnlyArray<usize> = ReadOnlyArray::new(&world,100,Distribution::Block);
// ///
// ///
// ///
// /// array
// /// .local_iter()
// /// .blocking_for_each(move |elem| println!("{:?} {elem}",std::thread::current().id()));
// ///
// ///
// ///```
// fn blocking_for_each<F>(&self, op: F)
// where
Expand All @@ -408,7 +405,7 @@ pub trait LocalIterator: SyncSend + IterClone + 'static {
/// array.local_iter().for_each_with_schedule(Schedule::WorkStealing, |elem| println!("{:?} {elem}",std::thread::current().id()));
/// array.wait_all();
///```
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
fn for_each_with_schedule<F>(&self, sched: Schedule, op: F) -> LocalIterForEachHandle
where
F: Fn(Self::Item) + SyncSend + Clone + 'static,
Expand All @@ -429,7 +426,7 @@ pub trait LocalIterator: SyncSend + IterClone + 'static {
// ///
// /// array.local_iter().blocking_for_each_with_schedule(Schedule::WorkStealing, |elem| println!("{:?} {elem}",std::thread::current().id()));
// ///```
// fn blocking_for_each_with_schedule<F>(&self, sched: Schedule, op: F)
// fn blocking_for_each_with_schedule<F>(&self, sched: Schedule, op: F)
// where
// F: Fn(Self::Item) + SyncSend + Clone + 'static,
// {
Expand Down Expand Up @@ -467,7 +464,7 @@ pub trait LocalIterator: SyncSend + IterClone + 'static {
/// fut.await;
/// }
///```
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
fn for_each_async<F, Fut>(&self, op: F) -> LocalIterForEachHandle
where
F: Fn(Self::Item) -> Fut + SyncSend + Clone + 'static,
Expand Down Expand Up @@ -504,15 +501,14 @@ pub trait LocalIterator: SyncSend + IterClone + 'static {
// /// fut.await;
// /// }
// ///```
// fn blocking_for_each_async<F, Fut>(&self, op: F)
// fn blocking_for_each_async<F, Fut>(&self, op: F)
// where
// F: Fn(Self::Item) -> Fut + SyncSend + Clone + 'static,
// Fut: Future<Output = ()> + Send + 'static,
// {
// self.array().blocking_for_each_async(self, op)
// }


/// Calls a closure on each element of a Local Iterator in parallel on the calling PE (the PE must have some local data of the array) using the specififed [Schedule][crate::array::iterator::Schedule] policy.
///
/// The supplied closure must return a future.
Expand All @@ -536,7 +532,7 @@ pub trait LocalIterator: SyncSend + IterClone + 'static {
/// });
/// array.wait_all();
///```
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
fn for_each_async_with_schedule<F, Fut>(&self, sched: Schedule, op: F) -> LocalIterForEachHandle
where
F: Fn(Self::Item) -> Fut + SyncSend + Clone + 'static,
Expand Down Expand Up @@ -588,7 +584,7 @@ pub trait LocalIterator: SyncSend + IterClone + 'static {
/// let req = array.local_iter().reduce(|acc,elem| acc+elem);
/// let sum = array.block_on(req); //wait on the collect request to get the new array
///```
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
fn reduce<F>(&self, op: F) -> LocalIterReduceHandle<Self::Item, F>
where
// &'static Self: LocalIterator + 'static,
Expand Down Expand Up @@ -635,7 +631,7 @@ pub trait LocalIterator: SyncSend + IterClone + 'static {
/// let req = array.local_iter().reduce_with_schedule(Schedule::Chunk(10),|acc,elem| acc+elem);
/// let sum = array.block_on(req); //wait on the collect request to get the new array
///```
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
fn reduce_with_schedule<F>(
&self,
sched: Schedule,
Expand Down Expand Up @@ -691,7 +687,7 @@ pub trait LocalIterator: SyncSend + IterClone + 'static {
/// let req = array.local_iter().map(elem.load()).filter(|elem| elem % 2 == 0).collect::<ReadOnlyArray<usize>>(Distribution::Cyclic);
/// let new_array = array.block_on(req);
///```
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
fn collect<A>(&self, d: Distribution) -> LocalIterCollectHandle<Self::Item, A>
where
// &'static Self: LocalIterator + 'static,
Expand Down Expand Up @@ -740,7 +736,7 @@ pub trait LocalIterator: SyncSend + IterClone + 'static {
/// let req = array.local_iter().map(elem.load()).filter(|elem| elem % 2 == 0).collect_with_schedule::<ReadOnlyArray<usize>>(Scheduler::WorkStealing,Distribution::Cyclic);
/// let new_array = array.block_on(req);
///```
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
fn collect_with_schedule<A>(
&self,
sched: Schedule,
Expand Down Expand Up @@ -819,7 +815,7 @@ pub trait LocalIterator: SyncSend + IterClone + 'static {
/// .collect_async::<ReadOnlyArray<usize>,_>(Distribution::Cyclic);
/// let _new_array = array.block_on(req);
///```
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
fn collect_async<A, T>(&self, d: Distribution) -> LocalIterCollectHandle<T, A>
where
// &'static Self: DistributedIterator + 'static,
Expand Down Expand Up @@ -874,7 +870,7 @@ pub trait LocalIterator: SyncSend + IterClone + 'static {
// self.array().blocking_collect_async(self, d)
// }

/// Collects the awaited elements of the local iterator into a new LamellarArray, using the provided [Schedule][crate::array::iterator::Schedule] policy
/// Collects the awaited elements of the local iterator into a new LamellarArray, using the provided [Schedule][crate::array::iterator::Schedule] policy
///
/// Calling this function invokes an implicit barrier across all PEs in the Array.
///
Expand Down Expand Up @@ -911,8 +907,12 @@ pub trait LocalIterator: SyncSend + IterClone + 'static {
/// .collect_async_with_schedule::<ReadOnlyArray<usize>,_>(Scheduler::Dynamic, Distribution::Cyclic);
/// let _new_array = array.block_on(req);
///```
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
fn collect_async_with_schedule<A, T>(&self, sched: Schedule, d: Distribution) -> LocalIterCollectHandle<T, A>
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
fn collect_async_with_schedule<A, T>(
&self,
sched: Schedule,
d: Distribution,
) -> LocalIterCollectHandle<T, A>
where
// &'static Self: DistributedIterator + 'static,
T: Dist + ArrayOps,
Expand All @@ -922,7 +922,7 @@ pub trait LocalIterator: SyncSend + IterClone + 'static {
self.array().collect_async_with_schedule(sched, self, d)
}

// /// Collects the awaited elements of the local iterator into a new LamellarArray,using the provided [Schedule][crate::array::iterator::Schedule] policy
// /// Collects the awaited elements of the local iterator into a new LamellarArray,using the provided [Schedule][crate::array::iterator::Schedule] policy
// ///
// /// Calling this function invokes an implicit barrier across all PEs in the Array.
// ///
Expand Down Expand Up @@ -981,7 +981,7 @@ pub trait LocalIterator: SyncSend + IterClone + 'static {
/// let req = array.local_iter().count();
/// let cnt = array.block_on(req);
///```
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
fn count(&self) -> LocalIterCountHandle {
self.array().count(self)
}
Expand Down Expand Up @@ -1018,7 +1018,7 @@ pub trait LocalIterator: SyncSend + IterClone + 'static {
/// let req = array.local_iter().count_with_schedule(Schedule::Dynamic);
/// let cnt = array.block_on(req);
///```
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
fn count_with_schedule(&self, sched: Schedule) -> LocalIterCountHandle {
self.array().count_with_schedule(sched, self)
}
Expand All @@ -1040,7 +1040,6 @@ pub trait LocalIterator: SyncSend + IterClone + 'static {
// self.array().blocking_count_with_schedule(sched, self)
// }


/// Sums the elements of the local iterator.
///
/// Takes each element, adds them together, and returns the result.
Expand All @@ -1060,7 +1059,7 @@ pub trait LocalIterator: SyncSend + IterClone + 'static {
/// let req = array.local_iter().sum();
/// let sum = array.block_on(req); //wait on the collect request to get the new array
///```
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
#[must_use = "this iteration adapter is lazy and does nothing unless awaited. Either await the returned future, or call 'spawn()' or 'block()' on it."]
fn sum(&self) -> LocalIterSumHandle<Self::Item>
where
Self::Item: SyncSend + std::iter::Sum + for<'a> std::iter::Sum<&'a Self::Item>,
Expand Down
20 changes: 10 additions & 10 deletions src/array/unsafe/iteration/distributed.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::active_messaging::SyncSend;
use crate::array::iterator::distributed_iterator::*;
use crate::array::iterator::private::Sealed;
use crate::array::iterator::Schedule;
use crate::array::r#unsafe::{UnsafeArray, UnsafeArrayInner};
use crate::array::{ArrayOps, AsyncTeamFrom, Distribution, InnerArray};
use crate::array::iterator::Schedule;
use crate::lamellar_team::LamellarTeamRT;
use crate::memregion::Dist;

Expand Down Expand Up @@ -100,7 +100,7 @@ macro_rules! consumer_impl {
// if std::thread::current().id() != *crate::MAIN_THREAD {
// let name = stringify!{$name};
// let msg = format!("
// [LAMELLAR WARNING] You are calling `blocking_{name}[_with_schedule]` from within an async context which may lead to deadlock, it is recommended that you use `{name}[_with_schedule]().await;` instead!
// [LAMELLAR WARNING] You are calling `blocking_{name}[_with_schedule]` from within an async context which may lead to deadlock, it is recommended that you use `{name}[_with_schedule]().await;` instead!
// Set LAMELLAR_BLOCKING_CALL_WARNING=0 to disable this warning, Set RUST_LIB_BACKTRACE=1 to see where the call is occcuring: {}", std::backtrace::Backtrace::capture()
// );
// if let Some(val) = config().blocking_call_warning {
Expand Down Expand Up @@ -145,9 +145,9 @@ impl DistIteratorLauncher for UnsafeArrayInner {
Some(self.subarray_index_from_local(index * chunk_size)? / chunk_size)
}
}

consumer_impl!(
for_each<I, F>(iter: &I, op: F);
for_each<I, F>(iter: &I, op: F);
[DistIterForEachHandle];
[I: DistributedIterator + 'static, F: Fn(I::Item) + SyncSend + Clone + 'static];
[
Expand All @@ -159,7 +159,7 @@ impl DistIteratorLauncher for UnsafeArrayInner {
[()]);

consumer_impl!(
for_each_async<I, F, Fut>(iter: &I, op: F);
for_each_async<I, F, Fut>(iter: &I, op: F);
[DistIterForEachHandle];
[I: DistributedIterator + 'static, F: Fn(I::Item) -> Fut + SyncSend + Clone + 'static, Fut: Future<Output = ()> + Send + 'static];
[
Expand All @@ -172,7 +172,7 @@ impl DistIteratorLauncher for UnsafeArrayInner {
);

consumer_impl!(
reduce<I, F>( iter: &I, op: F);
reduce<I, F>( iter: &I, op: F);
[DistIterReduceHandle<I::Item, F>];
[I: DistributedIterator + 'static, I::Item: Dist + ArrayOps, F: Fn(I::Item, I::Item) -> I::Item + SyncSend + Clone + 'static];
[
Expand All @@ -184,7 +184,7 @@ impl DistIteratorLauncher for UnsafeArrayInner {
[Option<I::Item>]);

consumer_impl!(
collect<I, A>( iter: &I, d: Distribution);
collect<I, A>( iter: &I, d: Distribution);
[DistIterCollectHandle<I::Item, A>];
[I: DistributedIterator + 'static, I::Item: Dist + ArrayOps, A: AsyncTeamFrom<(Vec<I::Item>, Distribution)> + SyncSend + Clone + 'static,];
[
Expand All @@ -196,7 +196,7 @@ impl DistIteratorLauncher for UnsafeArrayInner {
];
[A]);
consumer_impl!(
collect_async<I, A, B>( iter: &I, d: Distribution);
collect_async<I, A, B>( iter: &I, d: Distribution);
[DistIterCollectHandle<B, A>];
[I: DistributedIterator + 'static, I::Item: Future<Output = B> + Send + 'static,B: Dist + ArrayOps,A: AsyncTeamFrom<(Vec<B>, Distribution)> + SyncSend + Clone + 'static,];
[
Expand All @@ -209,7 +209,7 @@ impl DistIteratorLauncher for UnsafeArrayInner {
[A]);

consumer_impl!(
count<I>( iter: &I);
count<I>( iter: &I);
[DistIterCountHandle];
[I: DistributedIterator + 'static ];
[
Expand All @@ -220,7 +220,7 @@ impl DistIteratorLauncher for UnsafeArrayInner {
[usize]);

consumer_impl!(
sum<I>(iter: &I);
sum<I>(iter: &I);
[DistIterSumHandle<I::Item>];
[I: DistributedIterator + 'static, I::Item: Dist + ArrayOps + std::iter::Sum, ];
[
Expand Down
Loading

0 comments on commit b4f68d6

Please sign in to comment.