From 2d7facfb2c823a8fcd0ed042ef77d7a46396b1c8 Mon Sep 17 00:00:00 2001 From: a Date: Fri, 16 Apr 2021 21:25:49 -0400 Subject: [PATCH] Initial Version --- Cargo.toml | 2 +- src/bin/test_ad_hoc.rs | 167 ----------------------------------------- src/bin/test_simple.rs | 40 ++++++++-- src/lib.rs | 58 +++++++++++++- 4 files changed, 87 insertions(+), 180 deletions(-) delete mode 100644 src/bin/test_ad_hoc.rs diff --git a/Cargo.toml b/Cargo.toml index d8301c1..3d34921 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,8 +5,8 @@ authors = ["Yiyao Yu "] edition = "2018" [lib] -proc-macro = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +rand = "*" \ No newline at end of file diff --git a/src/bin/test_ad_hoc.rs b/src/bin/test_ad_hoc.rs deleted file mode 100644 index ee2fd34..0000000 --- a/src/bin/test_ad_hoc.rs +++ /dev/null @@ -1,167 +0,0 @@ -// libraries used: -// std::sync::Arc; -// std::sync::RwLock; -// std::sync::atomic::AtomicIsize; -// std::sync::atomic::AtomicI32; -// std::sync::atomic::Ordering; -// std::thread; - -/* - * A basic sequential function that we want to modify. - * This is written in the most human readable way possible. Not compatable - * with `#[rmp_parallel_for]`. - */ -fn seq_main() { - let mut counter = 0; - - for i in 0..4 { - println!("Index {}: Hello from loop {}!", counter, i); - counter += 1; - } -} - -/* - * What the user should write for an RustMP parallel program - * The parallel section needs to be in its own function, due to current - * Rust limitations with custom attributes on expressions. - * See for details - * - * Function should be valid Rust regardless of whether macro is applied. - * Whether Arc gets applied automatically or manually is up for debate. - */ -fn aug_main() { - let mut counter = 0; - - //#[rmp_parallel_for(shared(counter) schedule(static, 1))] - fn _loop(counter: &mut i32) { - for i in 0..4 { - println!("Index {}: Hello from loop {}!", counter, i); - *counter += 1; - } - } - _loop(&mut counter); -} - -/* - * What `#[rmp_parallel_for]` should convert the function into - * Current implementation is still ad-hoc, but hopefully the - * macro would expand the function as designed. - * - * Number of threads __rmp_internal_max_threads = 4 - */ -fn rmp_main() { - let mut counter = 0; - - fn _loop(counter: &mut i32) { - // Startup - Populate environment variables using env::var - let __rmp_internal_max_threads = 4; - - // Startup - Populate macro parameters - let __rmp_internal_block_size = 1; - - // Startup - Initialize required arrays - let mut __rmp_internal_threads_arr = vec![]; - let mut __rmp_internal_iter_arr = vec![]; - for _ in 0..__rmp_internal_max_threads { - __rmp_internal_iter_arr.push(vec![]); - } - let mut __rmp_internal_curr_block_size = 0; - let mut __rmp_internal_curr_block_thread = 0; - - // Startup - Promote shared mutables into Arc references - // Idea - Possible optimization based on type? RwLock is expensive. - let __rmp_var_counter = std::sync::Arc::new(std::sync::atomic::AtomicI32::new(*counter)); - - // Execution - Precompute the iterations for each loop - // The 0..4 here should be parsed from the original tokens - for __rmp_internal_i in 0..4 { - __rmp_internal_iter_arr[__rmp_internal_curr_block_thread].push(__rmp_internal_i); - __rmp_internal_curr_block_size += 1; - if __rmp_internal_curr_block_size >= __rmp_internal_block_size { - __rmp_internal_curr_block_thread = - (__rmp_internal_curr_block_thread + 1) % __rmp_internal_max_threads; - } - } - - // Startup - Extract the thread's own iterator - let __rmp_internal_iter_self = __rmp_internal_iter_arr.remove(0); - - // Execution - Spawn threads with loop contents - for __rmp_internal_iter in __rmp_internal_iter_arr { - // Clone used Arcs here - let __rmp_var_counter = std::sync::Arc::clone(&__rmp_var_counter); - - // Spawn threads - __rmp_internal_threads_arr.push(std::thread::spawn(move || { - for i in __rmp_internal_iter { - // Having separate load and fetch_add should be a data race, - // However, I believe OpenMP also treats it as a data race, - // so its fine to have this issue - // Need to implement #[rmp_critical] to update it correctly - println!( - "Index {}: Hello from loop {}!", - __rmp_var_counter.load(std::sync::atomic::Ordering::SeqCst), - i - ); - __rmp_var_counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - } - })); - } - - // Execution - Extract the same thread logic for self - for i in __rmp_internal_iter_self { - println!( - "Index {}: Hello from loop {}!", - __rmp_var_counter.load(std::sync::atomic::Ordering::SeqCst), - i - ); - __rmp_var_counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - } - - // Cleanup - Wait for threads - for __rmp_internal_thread in __rmp_internal_threads_arr { - let _ = __rmp_internal_thread.join(); - } - - // Cleanup - Restore variables from Arc references - *counter = __rmp_var_counter.load(std::sync::atomic::Ordering::SeqCst); - } - _loop(&mut counter); -} - -/* - * A basic parallel function written by hand in the most human readable way - * possible. - */ -fn par_main() { - let counter = std::sync::Arc::new(std::sync::atomic::AtomicIsize::new(0)); - let mut children = vec![]; - - for i in 1..4 { - let counter = std::sync::Arc::clone(&counter); - children.push(std::thread::spawn(move || { - let index = counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - println!("Index {}: Hello from loop {}!", index, i); - })); - } - let index = counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - println!("Index {}: Hello from loop {}!", index, 0); - - for child in children { - let _ = child.join(); - } -} - -fn main() { - println!("Running Sequential Version:"); - seq_main(); - - println!("\nRunning Augmented Sequential Version:"); - aug_main(); - - println!("\nRunning Ad-hoc Parallel Version:"); - par_main(); - - println!("\nRunning Augmented Parallel Version:"); - rmp_main(); -} diff --git a/src/bin/test_simple.rs b/src/bin/test_simple.rs index 89e166d..b440f11 100644 --- a/src/bin/test_simple.rs +++ b/src/bin/test_simple.rs @@ -1,12 +1,36 @@ -use rustmp::rmp_parallel_for; +use rand::Rng; +use rustmp::par_for; +use std::time; + +#[derive(Debug)] +struct Student { + name: String, + age: u8, + gpa: f32, +} + +impl Student { + pub fn new(age: u8) -> Student { + Student { name: "Default".to_string(), + age: age, + gpa: age as f32 } + } +} fn main() { - #[rmp_parallel_for] - fn inner() { - for i in 1..10 { - println!("Hello from {}!", i); - } - } + let numbers: Vec = vec![]; - inner(); + par_for! { + for i in 1..10, capturing numbers { + std::thread::sleep( + time::Duration::from_secs( + rand::thread_rng().gen_range(1..10))); + let mut lock = numbers.write(); + lock.push(Student::new(i)); + println!("Thread {} running!", i); + } }; + + for num in numbers { + println!("{:?}", num); + } } diff --git a/src/lib.rs b/src/lib.rs index b3bf305..43e2a5e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,57 @@ -use proc_macro::TokenStream; +use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; -#[proc_macro_attribute] -pub fn rmp_parallel_for(args: TokenStream, func: TokenStream) -> TokenStream { - func +pub struct Capture { + value: Arc>, } +impl Capture { + pub fn new(inner: T) -> Capture { + return Capture { + value: Arc::new(RwLock::new(inner)), + }; + } + + pub fn clone(&self) -> Capture { + Capture { + value: Arc::clone(&self.value), + } + } + + pub fn read(&self) -> RwLockReadGuard { + return self.value.as_ref().read().unwrap(); + } + + pub fn write(&self) -> RwLockWriteGuard { + return self.value.as_ref().write().unwrap(); + } + + pub fn unwrap(self) -> T { + Arc::try_unwrap(self.value) + .ok() + .and_then(|o| o.into_inner().ok()) + .expect("Error: reference copied out of loop") + } +} + +#[macro_export] +macro_rules! par_for { + (for $name:ident in $iterator:expr, capturing $captured:ident $blk:block) => { + use rustmp::Capture; + use std::sync::{Arc, RwLock}; + use std::thread; + + let itr = $iterator; + let $captured = Capture::new($captured); + let mut handles: Vec> = vec![]; + for $name in itr { + let $captured = $captured.clone(); + handles.push(thread::spawn(move || $blk)); + } + + for handle in handles { + handle.join().expect("Thread paniced!"); + } + + let $captured = $captured.unwrap(); + }; +}