Skip to content

Commit

Permalink
add readonly and unsafe array local_chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
rdfriese committed Feb 28, 2024
1 parent ed82808 commit 983f340
Show file tree
Hide file tree
Showing 7 changed files with 333 additions and 6 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -363,9 +363,9 @@ path="examples/array_examples/global_lock_array.rs"
#name="2d_array"
#path="examples/array_examples/2d_array.rs"

#[[example]]
#name="histo"
#path="examples/array_examples/histo.rs"
[[example]]
name="histo"
path="examples/array_examples/histo.rs"

##------------ RDMA Examples -----------------##
[[example]]
Expand Down
29 changes: 29 additions & 0 deletions examples/array_examples/histo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// use itertools::Itertools;
use lamellar::active_messaging::prelude::*;
use lamellar::array::prelude::*;
use rand::{distributions::Distribution, rngs::StdRng, SeedableRng};
use std::time::Instant;

const ARRAY_SIZE: usize = 1000000000;
const NUM_UPDATES_PER_PE: usize = 100000;

fn main() {
let world = lamellar::LamellarWorldBuilder::new().build();
let array = AtomicArray::<usize>::new(&world, ARRAY_SIZE, lamellar::Distribution::Block);
let mut rng: StdRng = SeedableRng::seed_from_u64(world.my_pe() as u64);
let range = rand::distributions::Uniform::new(0, ARRAY_SIZE);

let start = Instant::now();
let histo = array.batch_add(
&mut range.sample_iter(&mut rng).take(NUM_UPDATES_PER_PE)
as &mut dyn Iterator<Item = usize>,
1,
);
world.block_on(histo);
world.barrier();
println!(
"PE{} time: {:?} done",
world.my_pe(),
start.elapsed().as_secs_f64()
);
}
13 changes: 10 additions & 3 deletions examples/kernels/parallel_array_gemm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ fn main() {
let col = col.clone();
let c = c.clone();
let _ = a
.local_iter() //LocalIterator (each pe will iterate through only its local data -- in parallel)
.chunks(n) // chunk by the row size
// .local_iter() //LocalIterator (each pe will iterate through only its local data -- in parallel)
// .chunks(n) // chunk by the row size
.local_chunks(n)
.enumerate()
.for_each(move |(i, row)| {
let sum = unsafe { col.iter().zip(row).map(|(&i1, &i2)| i1 * i2).sum::<f32>() }; // dot product using rust iters... but MatrixMultiply is faster
Expand All @@ -85,10 +86,16 @@ fn main() {
world.wait_all();
world.barrier();
let elapsed = start.elapsed().as_secs_f64();
let sum = world.block_on(c.sum());

println!("Elapsed: {:?}", elapsed);
if my_pe == 0 {
println!("elapsed {:?} Gflops: {:?}", elapsed, num_gops / elapsed,);
println!(
"elapsed {:?} Gflops: {:?} {:?}",
elapsed,
num_gops / elapsed,
sum
);
}
// unsafe {
// world.block_on(c.dist_iter_mut().enumerate().for_each(|(i, x)| {
Expand Down
1 change: 1 addition & 0 deletions src/array/read_only.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod iteration;
mod rdma;
mod local_chunks;
use crate::array::private::LamellarArrayPrivate;
use crate::array::*;
use crate::darc::DarcMode;
Expand Down
99 changes: 99 additions & 0 deletions src/array/read_only/local_chunks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use crate::array::iterator::local_iterator::{IndexedLocalIterator, LocalIterator};
use crate::array::iterator::private::*;
use crate::array::read_only::*;
use crate::array::LamellarArray;
use crate::memregion::Dist;


#[derive(Clone)]
pub struct ReadOnlyLocalChunks<T: Dist> {
chunk_size: usize,
index: usize, //global index within the array local data
end_index: usize, //global index within the array local data
array: ReadOnlyArray<T>,
}

impl<T: Dist> IterClone for ReadOnlyLocalChunks<T> {
fn iter_clone(&self, _: Sealed) -> Self {
ReadOnlyLocalChunks {
chunk_size: self.chunk_size,
index: self.index,
end_index: self.end_index,
array: self.array.clone(),
}
}
}



impl<T: Dist + 'static> LocalIterator for ReadOnlyLocalChunks<T> {
type Item = &'static [T];
type Array = ReadOnlyArray<T>;
fn init(&self, start_i: usize, cnt: usize) -> Self {
//these are with respect to the single elements, not chunk indexing and cnt
let end_i = std::cmp::min(
(start_i + cnt) * self.chunk_size,
self.array.num_elems_local(),
);
let new_start_i = start_i * self.chunk_size;
// println!(
// "start_i {} new_start_i {} end_i {} cnt: {}",
// start_i, new_start_i, end_i, cnt
// );
ReadOnlyLocalChunks {
chunk_size: self.chunk_size,
index: new_start_i,
end_index: end_i,
array: self.array.clone(),
}
}
fn array(&self) -> Self::Array {
self.array.clone()
}
fn next(&mut self) -> Option<Self::Item> {
// println!("next index {} end_index: {}", self.index, self.end_index);
if self.index < self.end_index {
let start_i = self.index;
self.index += self.chunk_size;
let end_i = std::cmp::min(self.index, self.end_index);
// println!(
// "start_i {} end_i {} self.index {} self.end_index {}",
// start_i, end_i, self.index, self.end_index
// );
Some(unsafe{std::slice::from_raw_parts_mut(
self.array.array.local_as_mut_ptr().offset(start_i as isize),
end_i - start_i,
)})
} else {
None
}
}
fn elems(&self, in_elems: usize) -> usize {
in_elems / self.chunk_size + (in_elems % self.chunk_size != 0) as usize
}

fn advance_index(&mut self, count: usize) {
self.index = std::cmp::min(self.index + count * self.chunk_size, self.end_index);
}
}

impl<T: Dist + 'static> IndexedLocalIterator for ReadOnlyLocalChunks<T> {
fn iterator_index(&self, index: usize) -> Option<usize> {
if index * self.chunk_size < self.array.len() {
Some(index) //everyone at this point as calculated the actual index (cause we are local only) so just return it
} else {
None
}
}
}

impl<T: Dist> ReadOnlyArray<T> {
pub fn local_chunks(&self, chunk_size: usize) -> ReadOnlyLocalChunks<T> {
ReadOnlyLocalChunks {
chunk_size,
index: 0,
end_index: 0,
array: self.clone(),
}
}
}
1 change: 1 addition & 0 deletions src/array/unsafe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod iteration;

pub(crate) mod operations;
mod rdma;
mod local_chunks;

use crate::active_messaging::*;
// use crate::array::r#unsafe::operations::BUFOPS;
Expand Down
190 changes: 190 additions & 0 deletions src/array/unsafe/local_chunks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
use crate::array::iterator::local_iterator::{IndexedLocalIterator, LocalIterator};
use crate::array::iterator::private::*;
use crate::array::r#unsafe::*;
use crate::array::LamellarArray;
use crate::memregion::Dist;


#[derive(Clone)]
pub struct UnsafeLocalChunks<T: Dist> {
chunk_size: usize,
index: usize, //global index within the array local data
end_index: usize, //global index within the array local data
array: UnsafeArray<T>,
}

impl<T: Dist> IterClone for UnsafeLocalChunks<T> {
fn iter_clone(&self, _: Sealed) -> Self {
UnsafeLocalChunks {
chunk_size: self.chunk_size,
index: self.index,
end_index: self.end_index,
array: self.array.clone(),
}
}
}

#[derive(Clone)]
pub struct UnsafeLocalChunksMut<T: Dist> {
chunk_size: usize,
index: usize, //global index within the array local data
end_index: usize, //global index within the array local data
array: UnsafeArray<T>,
}

impl<T: Dist> IterClone for UnsafeLocalChunksMut<T> {
fn iter_clone(&self, _: Sealed) -> Self {
UnsafeLocalChunksMut {
chunk_size: self.chunk_size,
index: self.index,
end_index: self.end_index,
array: self.array.clone(),
}
}
}



impl<T: Dist + 'static> LocalIterator for UnsafeLocalChunks<T> {
type Item = &'static [T];
type Array = UnsafeArray<T>;
fn init(&self, start_i: usize, cnt: usize) -> Self {
//these are with respect to the single elements, not chunk indexing and cnt
let end_i = std::cmp::min(
(start_i + cnt) * self.chunk_size,
self.array.num_elems_local(),
);
let new_start_i = start_i * self.chunk_size;
// println!(
// "start_i {} new_start_i {} end_i {} cnt: {}",
// start_i, new_start_i, end_i, cnt
// );
UnsafeLocalChunks {
chunk_size: self.chunk_size,
index: new_start_i,
end_index: end_i,
array: self.array.clone(),
}
}
fn array(&self) -> Self::Array {
self.array.clone()
}
fn next(&mut self) -> Option<Self::Item> {
// println!("next index {} end_index: {}", self.index, self.end_index);
if self.index < self.end_index {
let start_i = self.index;
self.index += self.chunk_size;
let end_i = std::cmp::min(self.index, self.end_index);
// println!(
// "start_i {} end_i {} self.index {} self.end_index {}",
// start_i, end_i, self.index, self.end_index
// );
Some(unsafe{std::slice::from_raw_parts_mut(
self.array.local_as_mut_ptr().offset(start_i as isize),
end_i - start_i,
)})
} else {
None
}
}
fn elems(&self, in_elems: usize) -> usize {
in_elems / self.chunk_size + (in_elems % self.chunk_size != 0) as usize
}

fn advance_index(&mut self, count: usize) {
self.index = std::cmp::min(self.index + count * self.chunk_size, self.end_index);
}
}

impl<T: Dist + 'static> IndexedLocalIterator for UnsafeLocalChunks<T> {
fn iterator_index(&self, index: usize) -> Option<usize> {
if index * self.chunk_size < self.array.len() {
Some(index) //everyone at this point as calculated the actual index (cause we are local only) so just return it
} else {
None
}
}
}



impl<T: Dist + 'static> LocalIterator for UnsafeLocalChunksMut<T> {
type Item = &'static mut [T];
type Array = UnsafeArray<T>;
fn init(&self, start_i: usize, cnt: usize) -> Self {
//these are with respect to the single elements, not chunk indexing and cnt
let end_i = std::cmp::min(
(start_i + cnt) * self.chunk_size,
self.array.num_elems_local(),
);
let new_start_i = start_i * self.chunk_size;
// println!(
// "start_i {} new_start_i {} end_i {} cnt: {}",
// start_i, new_start_i, end_i, cnt
// );
UnsafeLocalChunksMut {
chunk_size: self.chunk_size,
index: new_start_i,
end_index: end_i,
array: self.array.clone(),
}
}
fn array(&self) -> Self::Array {
self.array.clone()
}
fn next(&mut self) -> Option<Self::Item> {
// println!("next index {} end_index: {}", self.index, self.end_index);
if self.index < self.end_index {
let start_i = self.index;
self.index += self.chunk_size;
let end_i = std::cmp::min(self.index, self.end_index);
// println!(
// "start_i {} end_i {} self.index {} self.end_index {}",
// start_i, end_i, self.index, self.end_index
// );
Some(unsafe{std::slice::from_raw_parts_mut(
self.array.local_as_mut_ptr().offset(start_i as isize),
end_i - start_i,
)})
} else {
None
}
}
fn elems(&self, in_elems: usize) -> usize {
in_elems / self.chunk_size + (in_elems % self.chunk_size != 0) as usize
}

fn advance_index(&mut self, count: usize) {
self.index = std::cmp::min(self.index + count * self.chunk_size, self.end_index);
}
}

impl<T: Dist + 'static> IndexedLocalIterator for UnsafeLocalChunksMut<T> {
fn iterator_index(&self, index: usize) -> Option<usize> {
if index * self.chunk_size < self.array.len() {
Some(index) //everyone at this point as calculated the actual index (cause we are local only) so just return it
} else {
None
}
}
}

impl<T: Dist> UnsafeArray<T> {
pub fn local_chunks(&self, chunk_size: usize) -> UnsafeLocalChunks<T> {
UnsafeLocalChunks {
chunk_size,
index: 0,
end_index: 0,
array: self.clone()
}
}

pub fn local_chunks_mut(&self, chunk_size: usize) -> UnsafeLocalChunksMut<T> {
UnsafeLocalChunksMut {
chunk_size,
index: 0,
end_index: 0,
array: self.clone()
}
}
}

0 comments on commit 983f340

Please sign in to comment.