diff --git a/src/runtime2/store/component.rs b/src/runtime2/store/component.rs index 58de8ce520ba17e2f8b6a1293226bf4a55e0d2e1..52f8ace8bd4d3b0161ee35d3ce417d5e111d811c 100644 --- a/src/runtime2/store/component.rs +++ b/src/runtime2/store/component.rs @@ -336,14 +336,11 @@ impl Drop for ComponentStore { #[cfg(test)] mod tests { use super::*; - use crate::runtime2::store::tests::Resource; + use super::super::tests::*; use rand::prelude::*; use rand_pcg::Pcg32; - use std::sync::Arc; - use std::sync::atomic::{AtomicU64, Ordering}; - fn seeds() -> Vec<[u8;16]> { return vec![ [241, 47, 70, 87, 240, 246, 20, 173, 219, 143, 74, 23, 158, 58, 205, 172], @@ -360,14 +357,13 @@ mod tests { const NUM_ELEMENTS: usize = 1024; let store = ComponentStore::new(32); - let ctor_counter = Arc::new(AtomicU64::new(0)); - let dtor_counter = Arc::new(AtomicU64::new(0)); + let counters = Counters::new(); let mut indices = Vec::with_capacity(NUM_ELEMENTS); for _round_index in 0..NUM_ROUNDS { // Creation round for value in 0..NUM_ELEMENTS { - let new_resource = Resource::new(ctor_counter.clone(), dtor_counter.clone(), value as u64); + let new_resource = Resource::new(&counters, value as u64); let new_index = store.create(new_resource); indices.push(new_index); } @@ -386,10 +382,9 @@ mod tests { indices.clear(); } - let num_ctor_calls = ctor_counter.load(Ordering::Acquire); - let num_dtor_calls = dtor_counter.load(Ordering::Acquire); - assert_eq!(num_ctor_calls, num_dtor_calls); - assert_eq!(num_ctor_calls, (NUM_ROUNDS * NUM_ELEMENTS) as u64); + let num_expected = (NUM_ROUNDS * NUM_ELEMENTS) as u64; + assert_ctor_eq!(counters, num_expected); + assert_dtor_eq!(counters, num_expected); } #[test] @@ -402,8 +397,7 @@ mod tests { assert!(MAX_SIZE % NUM_THREADS == 0); let store = Arc::new(ComponentStore::new(16)); - let ctor_counter = Arc::new(AtomicU64::new(0)); - let dtor_counter = Arc::new(AtomicU64::new(0)); + let counters = Counters::new(); let mut threads = Vec::with_capacity(NUM_THREADS); for thread_index in 0..NUM_THREADS { @@ -411,15 +405,14 @@ mod tests { let store = store.clone(); let first_index = thread_index * NUM_PER_THREAD; let last_index = (thread_index + 1) * NUM_PER_THREAD; - let ctor_counter = ctor_counter.clone(); - let dtor_counter = dtor_counter.clone(); + let counters = counters.clone(); let handle = std::thread::spawn(move || { let mut indices = Vec::with_capacity(last_index - first_index); for _round_index in 0..NUM_ROUNDS { // Creation round for value in first_index..last_index { - let el_index = store.create(Resource::new(ctor_counter.clone(), dtor_counter.clone(), value as u64)); + let el_index = store.create(Resource::new(&counters, value as u64)); indices.push(el_index); } @@ -444,10 +437,9 @@ mod tests { thread.join().expect("clean exit"); } - let num_ctor_calls = ctor_counter.load(Ordering::Acquire); - let num_dtor_calls = dtor_counter.load(Ordering::Acquire); - assert_eq!(num_ctor_calls, num_dtor_calls); - assert_eq!(num_ctor_calls, (NUM_ROUNDS * MAX_SIZE) as u64); + let num_expected = (NUM_ROUNDS * MAX_SIZE) as u64; + assert_ctor_eq!(counters, num_expected); + assert_dtor_eq!(counters, num_expected); } #[test] @@ -466,15 +458,13 @@ mod tests { for seed_index in 0..seeds.len() { // Setup store, counters and threads let store = Arc::new(ComponentStore::new(16)); - let ctor_counter = Arc::new(AtomicU64::new(0)); - let dtor_counter = Arc::new(AtomicU64::new(0)); + let counters = Counters::new(); let mut threads = Vec::with_capacity(NUM_THREADS); for thread_index in 0..NUM_THREADS { // Setup local clones to move into the thread let store = store.clone(); - let ctor_counter = ctor_counter.clone(); - let dtor_counter = dtor_counter.clone(); + let counters = counters.clone(); // Setup local rng let mut seed = seeds[seed_index]; @@ -499,9 +489,7 @@ mod tests { if is_empty || (!is_full && should_create) { // Must create - let el_index = store.create(Resource::new( - ctor_counter.clone(), dtor_counter.clone(), new_value - )); + let el_index = store.create(Resource::new(&counters, new_value)); stored.push((el_index, new_value)); } else { // Must destroy @@ -531,14 +519,14 @@ mod tests { } // Before store is dropped - let num_ctor_calls = ctor_counter.load(Ordering::Acquire); - let num_dtor_calls = dtor_counter.load(Ordering::Acquire); - assert_eq!(num_ctor_calls - total_left_allocated, num_dtor_calls); + // note: cannot determine number of creations, creation/destructor + // is random + let num_ctor = counters.ctor.load(Ordering::Acquire); + assert_dtor_eq!(counters, num_ctor - total_left_allocated); // After store is dropped drop(store); - let num_dtor_calls = dtor_counter.load(Ordering::Acquire); - assert_eq!(num_ctor_calls, num_dtor_calls); + assert_dtor_eq!(counters, num_ctor); } } } \ No newline at end of file diff --git a/src/runtime2/store/mod.rs b/src/runtime2/store/mod.rs index 2280851592f0825f9e3a253f27090f01eb44c38d..e5bc5f7fe36343e731f085ddf0304e04170c5b25 100644 --- a/src/runtime2/store/mod.rs +++ b/src/runtime2/store/mod.rs @@ -1,31 +1,9 @@ +#[macro_use] +#[cfg(test)] +mod tests; + pub mod component; pub mod unfair_se_lock; pub mod queue_mpsc; -pub(crate) use component::ComponentStore; - -#[cfg(test)] -mod tests { - use std::sync::Arc; - use std::sync::atomic::{AtomicU64, Ordering}; - - // Utility resource structure that counts the number of constructors and - // destructor calls. - pub struct Resource { - dtor: Arc, - val: u64, - } - - impl Resource { - fn new(ctor: Arc, dtor: Arc, val: u64) -> Self { - ctor.fetch_add(1, Ordering::SeqCst); - return Self{ dtor, val }; - } - } - - impl Drop for Resource { - fn drop(&mut self) { - self.dtor.fetch_add(1, Ordering::SeqCst); - } - } -} \ No newline at end of file +pub(crate) use component::ComponentStore; \ No newline at end of file diff --git a/src/runtime2/store/queue_mpsc.rs b/src/runtime2/store/queue_mpsc.rs index e2aeaee7da9c36bc54896a7324d38f5f130f95ca..e6a1c67340302f6c23d5d9e22b30d89499eeccb9 100644 --- a/src/runtime2/store/queue_mpsc.rs +++ b/src/runtime2/store/queue_mpsc.rs @@ -268,6 +268,7 @@ fn assert_correct_capacity(capacity: usize) { #[cfg(test)] mod tests { use super::*; + use super::super::tests::*; fn queue_size(queue: &QueueDynMpsc) -> usize { let lock = queue.inner.data.lock_exclusive(); @@ -277,24 +278,31 @@ mod tests { #[test] fn single_threaded_fixed_size_push_pop() { const INIT_SIZE: usize = 16; + const NUM_ROUNDS: usize = 3; let mut cons = QueueDynMpsc::new(INIT_SIZE); let prod = cons.producer(); - for _round in 0..3 { + let counters = Counters::new(); + + for _round in 0..NUM_ROUNDS { // Fill up with indices for idx in 0..INIT_SIZE { - prod.push(idx); + prod.push(Resource::new(&counters, idx as u64)); } // Take out indices and check for idx in 0..INIT_SIZE { let gotten = cons.pop().unwrap(); - assert_eq!(idx, gotten); + assert_eq!(idx as u64, gotten.val); } assert!(cons.pop().is_none()); // nothing left in queue assert_eq!(queue_size(&cons), INIT_SIZE); // queue still of same size } + + let num_expected = (INIT_SIZE * NUM_ROUNDS) as u64; + assert_ctor_eq!(counters, num_expected); + assert_dtor_eq!(counters, num_expected); } #[test] @@ -305,17 +313,19 @@ mod tests { let mut cons = QueueDynMpsc::new(INIT_SIZE); let prod = cons.producer(); + let counters = Counters::new(); + for resize_idx in 0..NUM_RESIZE { // Fill up with indices, one more than the size let cur_size = INIT_SIZE << resize_idx; let new_size = cur_size << 1; for idx in 0..new_size { - prod.push(idx); + prod.push(Resource::new(&counters, idx as u64)); } for idx in 0..new_size { let gotten = cons.pop().unwrap(); - assert_eq!(idx, gotten); + assert_eq!(idx as u64, gotten.val); } assert!(cons.pop().is_none()); @@ -323,11 +333,17 @@ mod tests { } assert_eq!(queue_size(&cons), INIT_SIZE << NUM_RESIZE); + + // Bit trickery supremo (fails if INIT_SIZE is not a power of two)! + let num_expected = ((INIT_SIZE << (NUM_RESIZE + 1)) - 1 - ((INIT_SIZE << 1) - 1)) as u64; + assert_ctor_eq!(counters, num_expected); + assert_dtor_eq!(counters, num_expected); } #[test] fn single_threaded_alternating_push_pop() { const INIT_SIZE: usize = 32; + const NUM_ROUNDS: usize = 4; const NUM_PROD: usize = 4; assert!(INIT_SIZE % NUM_PROD == 0); @@ -337,25 +353,56 @@ mod tests { prods.push(cons.producer()); } - for _round_idx in 0..4 { + let counters = Counters::new(); + + for _round_idx in 0..NUM_ROUNDS { // Fill up, alternating per producer let mut prod_idx = 0; for idx in 0..INIT_SIZE { let prod = &prods[prod_idx]; prod_idx += 1; prod_idx %= NUM_PROD; - prod.push(idx); + prod.push(Resource::new(&counters, idx as u64)); } // Retrieve and check again for idx in 0..INIT_SIZE { let gotten = cons.pop().unwrap(); - assert_eq!(idx, gotten); + assert_eq!(idx as u64, gotten.val); } assert!(cons.pop().is_none()); assert_eq!(queue_size(&cons), INIT_SIZE); } + + let num_expected = (NUM_ROUNDS * INIT_SIZE) as u64; + assert_ctor_eq!(counters, num_expected); + assert_dtor_eq!(counters, num_expected); + } + + #[test] + fn partially_filled_cleanup() { + // Init at 16, fill until 8, take out 4, 4 destructors not called before + // queue consumer side is dropped + let mut cons = QueueDynMpsc::new(16); + let mut prod = cons.producer(); + + let counters = Counters::new(); + + for _ in 0..8 { + prod.push(Resource::new(&counters, 0)); + } + + for _ in 0..4 { + cons.pop().expect("a value"); + } + + assert_ctor_eq!(counters, 8); + assert_dtor_eq!(counters, 4); + drop(prod); + drop(cons); + assert_ctor_eq!(counters, 8); + assert_dtor_eq!(counters, 8); } #[test] @@ -375,12 +422,14 @@ mod tests { // Span queue and producers for _stress_idx in 0..NUM_STRESS_TESTS { - let mut queue = QueueDynMpsc::new(4); + let mut queue = QueueDynMpsc::::new(4); let mut producers = Vec::with_capacity(NUM_PROD_THREADS); for _idx in 0..NUM_PROD_THREADS { producers.push(queue.producer()); } + let counters = Counters::new(); + // Start up consume thread and let it spin immediately. Note that it // must die last. let can_exit_lock = Arc::new(Mutex::new(false)); @@ -389,18 +438,18 @@ mod tests { let consume_handle = { let can_exit_lock = can_exit_lock.clone(); std::thread::spawn(move || { - let mut counters = [0u64; NUM_PROD_THREADS]; + let mut thread_val_counters = [0u64; NUM_PROD_THREADS]; let mut num_done = 0; while num_done != NUM_PROD_THREADS { // Spin until we get something let new_value = loop { if let Some(value) = queue.pop() { - break value; + break value.val; } }; let thread_idx = take_num_thread_idx(new_value); - let counter = &mut counters[thread_idx as usize]; + let counter = &mut thread_val_counters[thread_idx as usize]; assert_eq!(*counter, take_num(new_value)); // values per thread arrive in order *counter += 1; @@ -418,11 +467,12 @@ mod tests { let mut handles = Vec::with_capacity(NUM_PROD_THREADS); for prod_idx in 0..NUM_PROD_THREADS { let prod_handle = producers.pop().unwrap(); + let counters = counters.clone(); let handle = std::thread::spawn(move || { let base_value = (prod_idx as u64) << 32; for number in 0..NUM_PER_THREAD as u64 { - prod_handle.push(base_value + number); + prod_handle.push(Resource::new(&counters, base_value + number)); } }); @@ -437,6 +487,10 @@ mod tests { drop(held_exit_lock); consume_handle.join().expect("clean consumer exit"); + + let num_expected = (NUM_PER_THREAD * NUM_PROD_THREADS) as u64; + assert_ctor_eq!(counters, num_expected); + assert_dtor_eq!(counters, num_expected); } } } \ No newline at end of file diff --git a/src/runtime2/store/tests.rs b/src/runtime2/store/tests.rs new file mode 100644 index 0000000000000000000000000000000000000000..7b720a6de059a165a379cb29676b5250216828d3 --- /dev/null +++ b/src/runtime2/store/tests.rs @@ -0,0 +1,50 @@ +pub use std::sync::Arc; +pub use std::sync::atomic::{AtomicU64, Ordering}; + +// Little wrapper for the two atomic ctor/dtor counters +#[derive(Clone)] +pub struct Counters { + pub ctor: Arc, + pub dtor: Arc, +} + +impl Counters { + pub fn new() -> Self { + return Self{ + ctor: Arc::new(AtomicU64::new(0)), + dtor: Arc::new(AtomicU64::new(0)), + }; + } +} + +macro_rules! assert_ctor_eq { + ($counters:expr, $count:expr) => { + assert_eq!($counters.ctor.load(Ordering::Acquire), $count); + } + } + +macro_rules! assert_dtor_eq { + ($counters:expr, $count:expr) => { + assert_eq!($counters.dtor.load(Ordering::Acquire), $count); + } + } + +// Utility resource structure that counts the number of constructors and +// destructor calls. +pub struct Resource { + dtor: Arc, + pub val: u64, +} + +impl Resource { + pub fn new(counters: &Counters, val: u64) -> Self { + counters.ctor.fetch_add(1, Ordering::SeqCst); + return Self{ dtor: counters.dtor.clone(), val }; + } +} + +impl Drop for Resource { + fn drop(&mut self) { + self.dtor.fetch_add(1, Ordering::SeqCst); + } +} \ No newline at end of file