diff --git a/src/lib.rs b/src/lib.rs index d8c8198..00990ae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ -pub mod sysinfo; pub mod threadpool; +mod sysinfo; + use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; pub struct Capture { diff --git a/src/threadpool.rs b/src/threadpool.rs index 2ac10aa..a19aab0 100644 --- a/src/threadpool.rs +++ b/src/threadpool.rs @@ -11,8 +11,15 @@ lazy_static! { Arc::new(Mutex::new(ThreadPoolManager::new())); } +/// The Job type used to submit tasks for the ThreadPoolManager +/// +/// Most function captures can be cast to a Job directly. Or the +/// "as_static_job()" function can be used. pub type Job = Arc; +/// Converts a function capture into a Job with a static lifetime. +/// +/// It's also possible to use "Arc::new(|| {})" and cast it as a Job instead. pub fn as_static_job(capture: T) -> Job where T: Fn() + Send + Sync + 'static, @@ -20,6 +27,11 @@ where Arc::new(capture) } +/// The ThreadPoolManager handles dispatching threads and sending Jobs to threads. +/// +/// Only one thread can submit and execute Jobs to the ThreadPoolManager instance at a time. +/// Other threads attempting to lock the manager would wait on the ThreadPoolManager until +/// the last thread using it unlocks the instance. pub struct ThreadPoolManager { pub num_threads: usize, task_barrier: Arc, @@ -28,12 +40,21 @@ pub struct ThreadPoolManager { } impl ThreadPoolManager { + /// Creates a new ThreadPoolManager object. + /// + /// To get the current ThreadPoolManager, use get_instance_guard() instead. + /// + /// Should only be called by the INSTANCE. fn new() -> ThreadPoolManager { let master_hook = panic::take_hook(); // Crash the program if any of our threads panic panic::set_hook(Box::new(move |info| { // Only panic on our own threads, leave application programmer's threads alone - if current().name().unwrap_or_default().starts_with("RMP_PAR_THREAD_") { + if current() + .name() + .unwrap_or_default() + .starts_with("RMP_PAR_THREAD_#") + { master_hook(info); process::exit(1); } else { @@ -49,7 +70,7 @@ impl ThreadPoolManager { for tid in 0..num_threads { let task_barrier = task_barrier.clone(); let builder = Builder::new() // Thread builder configuration - .name(format!("RMP_PAR_THREAD_{}", tid)) // Name: RMP_PAR_THREAD_tid + .name(format!("RMP_PAR_THREAD_#{}", tid)) // Name: RMP_PAR_THREAD_tid .stack_size(8 << 20); // Stack size: 8MB (Linux default) let (sender, receiver) = channel::(); task_comms.push(sender); @@ -68,14 +89,22 @@ impl ThreadPoolManager { } } + /// Gets the current ThreadPoolManager instance. + /// + /// The instance needs to be locked before using, not unlocking the TPM after use + /// may result in deadlock. pub fn get_instance_guard() -> Arc> { return INSTANCE.clone(); } + /// Execute a set of tasks on the ThreadPoolManager. + /// + /// The task vector must be the same size as the number of threads, otherwise a panic will + /// be thrown. pub fn exec(&self, tasks: Vec) { // Used to wake up threads self.task_barrier.wait(); - assert_eq!(SystemObject::get_instance().max_num_threads, tasks.len()); + assert_eq!(self.num_threads, tasks.len()); for i in 0..tasks.len() { self.task_comms[i].send(tasks[i].clone()).unwrap(); } @@ -84,14 +113,14 @@ impl ThreadPoolManager { } } +/// Wrapper routine for threads in the ThreadPoolManager fn routine_wrapper(tid: usize, task_barrier: Arc, receiver: Receiver) { SystemObject::get_instance() .set_affinity(tid) .unwrap_or_else(|e| eprintln!("Failed to bind process #{} to hwthread: {:?}", tid, e)); loop { task_barrier.wait(); - let func = receiver.recv().unwrap(); - func(); + receiver.recv().unwrap()(); task_barrier.wait(); } }