mirror of https://github.com/xythrez/RustMP.git
Initial Version
This commit is contained in:
parent
570e884156
commit
2d7facfb2c
|
@ -5,8 +5,8 @@ authors = ["Yiyao Yu <yuydevel@protonmail.com>"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
[lib]
|
[lib]
|
||||||
proc-macro = true
|
|
||||||
|
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
rand = "*"
|
|
@ -1,167 +0,0 @@
|
||||||
// libraries used:
|
|
||||||
// std::sync::Arc;
|
|
||||||
// std::sync::RwLock;
|
|
||||||
// std::sync::atomic::AtomicIsize;
|
|
||||||
// std::sync::atomic::AtomicI32;
|
|
||||||
// std::sync::atomic::Ordering;
|
|
||||||
// std::thread;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* A basic sequential function that we want to modify.
|
|
||||||
* This is written in the most human readable way possible. Not compatable
|
|
||||||
* with `#[rmp_parallel_for]`.
|
|
||||||
*/
|
|
||||||
fn seq_main() {
|
|
||||||
let mut counter = 0;
|
|
||||||
|
|
||||||
for i in 0..4 {
|
|
||||||
println!("Index {}: Hello from loop {}!", counter, i);
|
|
||||||
counter += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* What the user should write for an RustMP parallel program
|
|
||||||
* The parallel section needs to be in its own function, due to current
|
|
||||||
* Rust limitations with custom attributes on expressions.
|
|
||||||
* See <https://github.com/rust-lang/rust/issues/54727> for details
|
|
||||||
*
|
|
||||||
* Function should be valid Rust regardless of whether macro is applied.
|
|
||||||
* Whether Arc gets applied automatically or manually is up for debate.
|
|
||||||
*/
|
|
||||||
fn aug_main() {
|
|
||||||
let mut counter = 0;
|
|
||||||
|
|
||||||
//#[rmp_parallel_for(shared(counter) schedule(static, 1))]
|
|
||||||
fn _loop(counter: &mut i32) {
|
|
||||||
for i in 0..4 {
|
|
||||||
println!("Index {}: Hello from loop {}!", counter, i);
|
|
||||||
*counter += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_loop(&mut counter);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* What `#[rmp_parallel_for]` should convert the function into
|
|
||||||
* Current implementation is still ad-hoc, but hopefully the
|
|
||||||
* macro would expand the function as designed.
|
|
||||||
*
|
|
||||||
* Number of threads __rmp_internal_max_threads = 4
|
|
||||||
*/
|
|
||||||
fn rmp_main() {
|
|
||||||
let mut counter = 0;
|
|
||||||
|
|
||||||
fn _loop(counter: &mut i32) {
|
|
||||||
// Startup - Populate environment variables using env::var
|
|
||||||
let __rmp_internal_max_threads = 4;
|
|
||||||
|
|
||||||
// Startup - Populate macro parameters
|
|
||||||
let __rmp_internal_block_size = 1;
|
|
||||||
|
|
||||||
// Startup - Initialize required arrays
|
|
||||||
let mut __rmp_internal_threads_arr = vec![];
|
|
||||||
let mut __rmp_internal_iter_arr = vec![];
|
|
||||||
for _ in 0..__rmp_internal_max_threads {
|
|
||||||
__rmp_internal_iter_arr.push(vec![]);
|
|
||||||
}
|
|
||||||
let mut __rmp_internal_curr_block_size = 0;
|
|
||||||
let mut __rmp_internal_curr_block_thread = 0;
|
|
||||||
|
|
||||||
// Startup - Promote shared mutables into Arc references
|
|
||||||
// Idea - Possible optimization based on type? RwLock is expensive.
|
|
||||||
let __rmp_var_counter = std::sync::Arc::new(std::sync::atomic::AtomicI32::new(*counter));
|
|
||||||
|
|
||||||
// Execution - Precompute the iterations for each loop
|
|
||||||
// The 0..4 here should be parsed from the original tokens
|
|
||||||
for __rmp_internal_i in 0..4 {
|
|
||||||
__rmp_internal_iter_arr[__rmp_internal_curr_block_thread].push(__rmp_internal_i);
|
|
||||||
__rmp_internal_curr_block_size += 1;
|
|
||||||
if __rmp_internal_curr_block_size >= __rmp_internal_block_size {
|
|
||||||
__rmp_internal_curr_block_thread =
|
|
||||||
(__rmp_internal_curr_block_thread + 1) % __rmp_internal_max_threads;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Startup - Extract the thread's own iterator
|
|
||||||
let __rmp_internal_iter_self = __rmp_internal_iter_arr.remove(0);
|
|
||||||
|
|
||||||
// Execution - Spawn threads with loop contents
|
|
||||||
for __rmp_internal_iter in __rmp_internal_iter_arr {
|
|
||||||
// Clone used Arcs here
|
|
||||||
let __rmp_var_counter = std::sync::Arc::clone(&__rmp_var_counter);
|
|
||||||
|
|
||||||
// Spawn threads
|
|
||||||
__rmp_internal_threads_arr.push(std::thread::spawn(move || {
|
|
||||||
for i in __rmp_internal_iter {
|
|
||||||
// Having separate load and fetch_add should be a data race,
|
|
||||||
// However, I believe OpenMP also treats it as a data race,
|
|
||||||
// so its fine to have this issue
|
|
||||||
// Need to implement #[rmp_critical] to update it correctly
|
|
||||||
println!(
|
|
||||||
"Index {}: Hello from loop {}!",
|
|
||||||
__rmp_var_counter.load(std::sync::atomic::Ordering::SeqCst),
|
|
||||||
i
|
|
||||||
);
|
|
||||||
__rmp_var_counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Execution - Extract the same thread logic for self
|
|
||||||
for i in __rmp_internal_iter_self {
|
|
||||||
println!(
|
|
||||||
"Index {}: Hello from loop {}!",
|
|
||||||
__rmp_var_counter.load(std::sync::atomic::Ordering::SeqCst),
|
|
||||||
i
|
|
||||||
);
|
|
||||||
__rmp_var_counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Cleanup - Wait for threads
|
|
||||||
for __rmp_internal_thread in __rmp_internal_threads_arr {
|
|
||||||
let _ = __rmp_internal_thread.join();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Cleanup - Restore variables from Arc references
|
|
||||||
*counter = __rmp_var_counter.load(std::sync::atomic::Ordering::SeqCst);
|
|
||||||
}
|
|
||||||
_loop(&mut counter);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* A basic parallel function written by hand in the most human readable way
|
|
||||||
* possible.
|
|
||||||
*/
|
|
||||||
fn par_main() {
|
|
||||||
let counter = std::sync::Arc::new(std::sync::atomic::AtomicIsize::new(0));
|
|
||||||
let mut children = vec![];
|
|
||||||
|
|
||||||
for i in 1..4 {
|
|
||||||
let counter = std::sync::Arc::clone(&counter);
|
|
||||||
children.push(std::thread::spawn(move || {
|
|
||||||
let index = counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
|
||||||
println!("Index {}: Hello from loop {}!", index, i);
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
let index = counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
|
||||||
println!("Index {}: Hello from loop {}!", index, 0);
|
|
||||||
|
|
||||||
for child in children {
|
|
||||||
let _ = child.join();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn main() {
|
|
||||||
println!("Running Sequential Version:");
|
|
||||||
seq_main();
|
|
||||||
|
|
||||||
println!("\nRunning Augmented Sequential Version:");
|
|
||||||
aug_main();
|
|
||||||
|
|
||||||
println!("\nRunning Ad-hoc Parallel Version:");
|
|
||||||
par_main();
|
|
||||||
|
|
||||||
println!("\nRunning Augmented Parallel Version:");
|
|
||||||
rmp_main();
|
|
||||||
}
|
|
|
@ -1,12 +1,36 @@
|
||||||
use rustmp::rmp_parallel_for;
|
use rand::Rng;
|
||||||
|
use rustmp::par_for;
|
||||||
|
use std::time;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct Student {
|
||||||
|
name: String,
|
||||||
|
age: u8,
|
||||||
|
gpa: f32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Student {
|
||||||
|
pub fn new(age: u8) -> Student {
|
||||||
|
Student { name: "Default".to_string(),
|
||||||
|
age: age,
|
||||||
|
gpa: age as f32 }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
#[rmp_parallel_for]
|
let numbers: Vec<Student> = vec![];
|
||||||
fn inner() {
|
|
||||||
for i in 1..10 {
|
|
||||||
println!("Hello from {}!", i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
inner();
|
par_for! {
|
||||||
|
for i in 1..10, capturing numbers {
|
||||||
|
std::thread::sleep(
|
||||||
|
time::Duration::from_secs(
|
||||||
|
rand::thread_rng().gen_range(1..10)));
|
||||||
|
let mut lock = numbers.write();
|
||||||
|
lock.push(Student::new(i));
|
||||||
|
println!("Thread {} running!", i);
|
||||||
|
} };
|
||||||
|
|
||||||
|
for num in numbers {
|
||||||
|
println!("{:?}", num);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
58
src/lib.rs
58
src/lib.rs
|
@ -1,7 +1,57 @@
|
||||||
use proc_macro::TokenStream;
|
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||||
|
|
||||||
#[proc_macro_attribute]
|
pub struct Capture<T> {
|
||||||
pub fn rmp_parallel_for(args: TokenStream, func: TokenStream) -> TokenStream {
|
value: Arc<RwLock<T>>,
|
||||||
func
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<T> Capture<T> {
|
||||||
|
pub fn new(inner: T) -> Capture<T> {
|
||||||
|
return Capture {
|
||||||
|
value: Arc::new(RwLock::new(inner)),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn clone(&self) -> Capture<T> {
|
||||||
|
Capture {
|
||||||
|
value: Arc::clone(&self.value),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn read(&self) -> RwLockReadGuard<T> {
|
||||||
|
return self.value.as_ref().read().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn write(&self) -> RwLockWriteGuard<T> {
|
||||||
|
return self.value.as_ref().write().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn unwrap(self) -> T {
|
||||||
|
Arc::try_unwrap(self.value)
|
||||||
|
.ok()
|
||||||
|
.and_then(|o| o.into_inner().ok())
|
||||||
|
.expect("Error: reference copied out of loop")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! par_for {
|
||||||
|
(for $name:ident in $iterator:expr, capturing $captured:ident $blk:block) => {
|
||||||
|
use rustmp::Capture;
|
||||||
|
use std::sync::{Arc, RwLock};
|
||||||
|
use std::thread;
|
||||||
|
|
||||||
|
let itr = $iterator;
|
||||||
|
let $captured = Capture::new($captured);
|
||||||
|
let mut handles: Vec<thread::JoinHandle<()>> = vec![];
|
||||||
|
for $name in itr {
|
||||||
|
let $captured = $captured.clone();
|
||||||
|
handles.push(thread::spawn(move || $blk));
|
||||||
|
}
|
||||||
|
|
||||||
|
for handle in handles {
|
||||||
|
handle.join().expect("Thread paniced!");
|
||||||
|
}
|
||||||
|
|
||||||
|
let $captured = $captured.unwrap();
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue