Refactored and documented threadpool.rs

This commit is contained in:
Jack Yu 2021-04-18 11:08:18 -04:00
parent d97b660b4b
commit 5e7d6da4ec
2 changed files with 36 additions and 6 deletions

View File

@ -1,6 +1,7 @@
pub mod sysinfo;
pub mod threadpool; pub mod threadpool;
mod sysinfo;
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
pub struct Capture<T> { pub struct Capture<T> {

View File

@ -11,8 +11,15 @@ lazy_static! {
Arc::new(Mutex::new(ThreadPoolManager::new())); Arc::new(Mutex::new(ThreadPoolManager::new()));
} }
/// The Job type used to submit tasks for the ThreadPoolManager
///
/// Most function captures can be cast to a Job directly, or converted
/// with the `as_static_job()` function.
pub type Job = Arc<dyn Fn() + Send + Sync>; pub type Job = Arc<dyn Fn() + Send + Sync>;
/// Converts a function capture into a Job with a static lifetime.
///
/// It's also possible to use "Arc::new(|| {})" and cast it as a Job instead.
pub fn as_static_job<T>(capture: T) -> Job pub fn as_static_job<T>(capture: T) -> Job
where where
T: Fn() + Send + Sync + 'static, T: Fn() + Send + Sync + 'static,
@ -20,6 +27,11 @@ where
Arc::new(capture) Arc::new(capture)
} }
/// The ThreadPoolManager handles dispatching threads and sending Jobs to threads.
///
/// Only one thread can submit and execute Jobs to the ThreadPoolManager instance at a time.
/// Other threads attempting to lock the manager will wait on the ThreadPoolManager until
/// the last thread using it unlocks the instance.
pub struct ThreadPoolManager { pub struct ThreadPoolManager {
pub num_threads: usize, pub num_threads: usize,
task_barrier: Arc<Barrier>, task_barrier: Arc<Barrier>,
@ -28,12 +40,21 @@ pub struct ThreadPoolManager {
} }
impl ThreadPoolManager { impl ThreadPoolManager {
/// Creates a new ThreadPoolManager object.
///
/// To get the current ThreadPoolManager, use get_instance_guard() instead.
///
/// Should only be called when initializing the INSTANCE.
fn new() -> ThreadPoolManager { fn new() -> ThreadPoolManager {
let master_hook = panic::take_hook(); let master_hook = panic::take_hook();
// Crash the program if any of our threads panic // Crash the program if any of our threads panic
panic::set_hook(Box::new(move |info| { panic::set_hook(Box::new(move |info| {
// Only panic on our own threads, leave application programmer's threads alone // Only panic on our own threads, leave application programmer's threads alone
if current().name().unwrap_or_default().starts_with("RMP_PAR_THREAD_") { if current()
.name()
.unwrap_or_default()
.starts_with("RMP_PAR_THREAD_#")
{
master_hook(info); master_hook(info);
process::exit(1); process::exit(1);
} else { } else {
@ -49,7 +70,7 @@ impl ThreadPoolManager {
for tid in 0..num_threads { for tid in 0..num_threads {
let task_barrier = task_barrier.clone(); let task_barrier = task_barrier.clone();
let builder = Builder::new() // Thread builder configuration let builder = Builder::new() // Thread builder configuration
.name(format!("RMP_PAR_THREAD_{}", tid)) // Name: RMP_PAR_THREAD_tid .name(format!("RMP_PAR_THREAD_#{}", tid)) // Name: RMP_PAR_THREAD_tid
.stack_size(8 << 20); // Stack size: 8MB (Linux default) .stack_size(8 << 20); // Stack size: 8MB (Linux default)
let (sender, receiver) = channel::<Job>(); let (sender, receiver) = channel::<Job>();
task_comms.push(sender); task_comms.push(sender);
@ -68,14 +89,22 @@ impl ThreadPoolManager {
} }
} }
/// Gets the current ThreadPoolManager instance.
///
/// The instance needs to be locked before use; failing to unlock the TPM
/// afterwards may result in deadlock.
pub fn get_instance_guard() -> Arc<Mutex<ThreadPoolManager>> { pub fn get_instance_guard() -> Arc<Mutex<ThreadPoolManager>> {
return INSTANCE.clone(); return INSTANCE.clone();
} }
/// Execute a set of tasks on the ThreadPoolManager.
///
/// The task vector must be the same size as the number of threads; otherwise
/// the call panics.
pub fn exec(&self, tasks: Vec<Job>) { pub fn exec(&self, tasks: Vec<Job>) {
// Used to wake up threads // Used to wake up threads
self.task_barrier.wait(); self.task_barrier.wait();
assert_eq!(SystemObject::get_instance().max_num_threads, tasks.len()); assert_eq!(self.num_threads, tasks.len());
for i in 0..tasks.len() { for i in 0..tasks.len() {
self.task_comms[i].send(tasks[i].clone()).unwrap(); self.task_comms[i].send(tasks[i].clone()).unwrap();
} }
@ -84,14 +113,14 @@ impl ThreadPoolManager {
} }
} }
/// Wrapper routine for threads in the ThreadPoolManager
fn routine_wrapper(tid: usize, task_barrier: Arc<Barrier>, receiver: Receiver<Job>) { fn routine_wrapper(tid: usize, task_barrier: Arc<Barrier>, receiver: Receiver<Job>) {
SystemObject::get_instance() SystemObject::get_instance()
.set_affinity(tid) .set_affinity(tid)
.unwrap_or_else(|e| eprintln!("Failed to bind process #{} to hwthread: {:?}", tid, e)); .unwrap_or_else(|e| eprintln!("Failed to bind process #{} to hwthread: {:?}", tid, e));
loop { loop {
task_barrier.wait(); task_barrier.wait();
let func = receiver.recv().unwrap(); receiver.recv().unwrap()();
func();
task_barrier.wait(); task_barrier.wait();
} }
} }