diff --git a/Cargo.toml b/Cargo.toml index 3d34921..1178633 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,9 +4,9 @@ version = "0.1.0" authors = ["Yiyao Yu "] edition = "2018" -[lib] - # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -rand = "*" \ No newline at end of file +rand = "*" +hwloc2 = "*" +lazy_static = "*" diff --git a/src/bin/test_simple.rs b/src/bin/test_simple.rs index b440f11..655d8c8 100644 --- a/src/bin/test_simple.rs +++ b/src/bin/test_simple.rs @@ -1,5 +1,6 @@ use rand::Rng; use rustmp::par_for; +use rustmp::SystemObject; use std::time; #[derive(Debug)] @@ -11,9 +12,9 @@ struct Student { impl Student { pub fn new(age: u8) -> Student { - Student { name: "Default".to_string(), - age: age, - gpa: age as f32 } + Student { name: "Default".to_string(), + age: age, + gpa: age as f32 } } } @@ -22,15 +23,19 @@ fn main() { par_for! { for i in 1..10, capturing numbers { - std::thread::sleep( - time::Duration::from_secs( - rand::thread_rng().gen_range(1..10))); - let mut lock = numbers.write(); - lock.push(Student::new(i)); - println!("Thread {} running!", i); + // TODO: move this to parallel macro once tid design is finalized + SystemObject::get_instance().set_affinity(i as usize - 1) + .expect("Failed to bind thread to proc!"); + + std::thread::sleep( + time::Duration::from_secs( + rand::thread_rng().gen_range(1..10))); + let mut lock = numbers.write(); + lock.push(Student::new(i)); + println!("Thread {} running!", i); } }; for num in numbers { - println!("{:?}", num); + println!("{:?}", num); } } diff --git a/src/lib.rs b/src/lib.rs index 43e2a5e..e991011 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,9 @@ +mod sysinfo; + use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; +pub use sysinfo::SystemObject; + pub struct Capture { value: Arc>, } diff --git a/src/sysinfo.rs b/src/sysinfo.rs new file mode 100644 index 0000000..fcd8bdd --- /dev/null +++ b/src/sysinfo.rs @@ -0,0 +1,123 @@ +use hwloc2::{CpuBindError, CpuBindFlags, ObjectType, Topology, TopologyObject}; +use lazy_static::lazy_static; +use std::cmp::max; +use std::sync::Arc; + +lazy_static! { + static ref INSTANCE: Arc = Arc::new(SystemObject::new()); +} + +/// Represents a system object. +/// +/// The system object contains both hardware topology information as well as environment variable +/// information used by RustMP. Interactions with the OS such as setting process affinity should be +/// done through the system object. +/// +/// Only a single instance of SystemObject may exist at a time. Use get_instance() to get a +/// thread-safe reference to the current SystemObject instance. +pub struct SystemObject { + /// ThreadID to hwthread (PU) mapping for process binding + cpu_bind_map: Vec, + /// Total number of hwthreads (PUs) on the machine + pub available_hwthreads: usize, + /// Maximum number of threads to spawn for the RustMP thread pool + pub max_num_threads: usize, +} + +impl SystemObject { + /// Instantiates a new SystemObject. + /// + /// Extra environment variables and hardware information used should be added here. + /// + /// SystemObject::new() should only be called by INSTANCE. + fn new() -> SystemObject { + // Assume that the machine uses a symmetric topology + let topo = Topology::new().unwrap(); + let package_set = topo.objects_with_type(&ObjectType::Package).unwrap(); + let core_set = children_with_type(package_set[0], &ObjectType::Core).unwrap(); + + // PUs per Core + let pupco = children_with_type(core_set[0], &ObjectType::PU) + .unwrap() + .len(); + // Packages per Machine + let papma = package_set.len(); + // Cores per Package + let coppa = core_set.len(); + // PUs per Package + let puppa = coppa * pupco; + // PUs per Machine + let available_hwthreads= papma * puppa; + + // The cpu_bind_map can be instantiated using environment variables if desired. + // Possible interesting improvement would be building a cpu_bind_map using a syntax like + // OpenMP's OMP_PLACES. But that is outside of this project's scope due to difficulty of + // writing a parser. + let cpu_bind_map= (0..available_hwthreads).map(|x| { + let package_id = x / puppa; + let package_offset = x % puppa; + let core_id = package_offset % coppa; + let core_offset = package_offset / coppa; + return puppa * package_id + core_id * pupco + core_offset; + }).collect::>(); + + let max_num_threads = max( + option_env!("RMP_NUM_THREADS") + .unwrap_or("") + .parse::() + .unwrap_or(available_hwthreads), + 1, + ); + SystemObject { + cpu_bind_map, + available_hwthreads, + max_num_threads, + } + } + + /// Gets a thread-safe reference to the current SystemObject instance. + pub fn get_instance() -> Arc { + INSTANCE.clone() + } + + /// Binds the current thread to the corresponding hwthread specified in cpu_bind_map. + /// + /// Default binding rules are by core, then by hwthread on the same core, then by socket. + /// + /// Returns an error if the process failed to bind + pub fn set_affinity(&self, tid: usize) -> Result<(), CpuBindError> { + let mut topo = Topology::new().unwrap(); + let pu_vec = topo.objects_with_type(&ObjectType::PU).unwrap(); + let cpuset = pu_vec[self.cpu_bind_map[tid % self.available_hwthreads]] + .cpuset() + .unwrap(); + topo.set_cpubind(cpuset, CpuBindFlags::CPUBIND_THREAD) + } +} + +/// Recursively finds child topology objects of a defined type. +/// +/// Returns None if no children of the type is found. +fn children_with_type<'a>( + topo_obj: &'a TopologyObject, + object_type: &ObjectType, +) -> Option> { + let mut objects: Vec<&TopologyObject> = Vec::new(); + + for child in topo_obj.children() { + if child.object_type() == *object_type { + objects.push(&child); + } else { + match children_with_type(child, object_type) { + Some(mut child_vec) => objects.append(&mut child_vec), + None => (), + } + } + } + + if objects.len() > 0 { + Some(objects) + } else { + None + } +}