diff --git a/src/bin/test_simple.rs b/src/bin/test_simple.rs index b7e8236..f631d55 100644 --- a/src/bin/test_simple.rs +++ b/src/bin/test_simple.rs @@ -1,6 +1,5 @@ use rand::Rng; use rustmp::par_for; -use rustmp::sysinfo::SystemObject; use std::time; #[derive(Debug)] @@ -23,13 +22,10 @@ fn main() { par_for! { for i in 1..10, capturing numbers { - // TODO: move this to parallel macro once tid design is finalized - SystemObject::get_instance().set_affinity(i as usize - 1) - .expect("Failed to bind thread to proc!"); - std::thread::sleep( - time::Duration::from_secs( - rand::thread_rng().gen_range(1..10))); + //std::thread::sleep( + // time::Duration::from_secs( + // rand::thread_rng().gen_range(1..10))); let mut lock = numbers.write(); lock.push(Student::new(i)); println!("Thread {} running!", i); diff --git a/src/bin/tpm_example.rs b/src/bin/tpm_example.rs new file mode 100644 index 0000000..4640740 --- /dev/null +++ b/src/bin/tpm_example.rs @@ -0,0 +1,28 @@ +use rustmp::threadpool::{ThreadPoolManager, Job, as_static_job}; +use std::sync::Arc; + +fn main() { + let tpm_mtx= ThreadPoolManager::get_instance_guard(); + let tpm = tpm_mtx.lock().unwrap(); + + println!("Submitting jobs!"); + let mut vector = Vec::new(); + for i in 0..tpm.num_threads { + let cl = as_static_job(move || {println!("Hello from {}!", i)}); + vector.push(cl); + } + tpm.exec(vector); + + println!("Submitting more jobs with panic on tid=3!"); + let mut vector2 = Vec::new(); + for i in 0..tpm.num_threads { + let x = 9; + let cl = Arc::new(move || { + if x * i == 27 { + //panic!("Panic test"); + } + }) as Job; + vector2.push(cl); + } + tpm.exec(vector2); +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 00990ae..b7c6f55 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,20 +41,37 @@ impl Capture { macro_rules! par_for { (for $name:ident in $iterator:expr, capturing $captured:ident $blk:block) => { use rustmp::Capture; + use rustmp::threadpool::{Job, ThreadPoolManager, as_static_job}; use std::sync::{Arc, RwLock}; use std::thread; - let itr = $iterator; + let mut tasks = Vec::new(); let $captured = Capture::new($captured); - let mut handles: Vec> = vec![]; - for $name in itr { - let $captured = $captured.clone(); - handles.push(thread::spawn(move || $blk)); + { + let tpm_mtx = ThreadPoolManager::get_instance_guard(); + let tpm = tpm_mtx.lock().unwrap(); + let iters = tpm.split_iterators($iterator, 1); + for iter in iters { + let $captured = $captured.clone(); + tasks.push(as_static_job(move || { + for &$name in &iter + $blk + })); + } + tpm.exec(tasks); } - for handle in handles { - handle.join().expect("Thread paniced!"); - } + //let itr = $iterator; + //let $captured = Capture::new($captured); + //let mut handles: Vec> = vec![]; + //for $name in itr { + // let $captured = $captured.clone(); + // handles.push(thread::spawn(move || $blk)); + //} + + //for handle in handles { + // handle.join().expect("Thread paniced!"); + //} let $captured = $captured.unwrap(); }; diff --git a/src/threadpool.rs b/src/threadpool.rs index a19aab0..f846f70 100644 --- a/src/threadpool.rs +++ b/src/threadpool.rs @@ -111,6 +111,34 @@ impl ThreadPoolManager { // Used to return main thread from exec self.task_barrier.wait(); } + + /// Splits an iterator into RMP_NUM_THREADS iterators, each with a step size of + /// block_size. + /// + /// Returned iterators are stored in a Vec>, but anything should work as + /// long as the default Rust for loop accepts it. + pub fn split_iterators(&self, iter: T, block_size: usize) -> Vec> + where + T: Iterator + { + let mut split = Vec::new(); + split.reserve_exact(self.num_threads); + for _ in 0..self.num_threads { + split.push(Vec::new()); + } + + let mut index: usize = 0; + let mut block: usize = 0; + for element in iter { + split[index].push(element); + block += 1; + if block % block_size == 0 { + block = 0; + index = (index + 1) % self.num_threads; + } + } + split + } } /// Wrapper routine for threads in the ThreadPoolManager