Changeset - 8622657b70f9
[Not reviewed]
0 3 1
MH - 3 years ago 2022-01-07 11:48:04
contact@maxhenger.nl
Add ctor/dtor tests to MPSC queue

Now also testing that resources are properly moved and/or bitwise
copied inside of the MPSC queue. To make this possible the Resource
testing struct was moved outside of the store::component::tests
module.
4 files changed with 142 insertions and 72 deletions:
0 comments (0 inline, 0 general)
src/runtime2/store/component.rs
Show inline comments
 
@@ -336,14 +336,11 @@ impl<T: Sized> Drop for ComponentStore<T> {
 
#[cfg(test)]
 
mod tests {
 
    use super::*;
 
    use crate::runtime2::store::tests::Resource;
 
    use super::super::tests::*;
 

	
 
    use rand::prelude::*;
 
    use rand_pcg::Pcg32;
 

	
 
    use std::sync::Arc;
 
    use std::sync::atomic::{AtomicU64, Ordering};
 

	
 
    fn seeds() -> Vec<[u8;16]> {
 
        return vec![
 
            [241, 47, 70, 87, 240, 246, 20, 173, 219, 143, 74, 23, 158, 58, 205, 172],
 
@@ -360,14 +357,13 @@ mod tests {
 
        const NUM_ELEMENTS: usize = 1024;
 

	
 
        let store = ComponentStore::new(32);
 
        let ctor_counter = Arc::new(AtomicU64::new(0));
 
        let dtor_counter = Arc::new(AtomicU64::new(0));
 
        let counters = Counters::new();
 

	
 
        let mut indices = Vec::with_capacity(NUM_ELEMENTS);
 
        for _round_index in 0..NUM_ROUNDS {
 
            // Creation round
 
            for value in 0..NUM_ELEMENTS {
 
                let new_resource = Resource::new(ctor_counter.clone(), dtor_counter.clone(), value as u64);
 
                let new_resource = Resource::new(&counters, value as u64);
 
                let new_index = store.create(new_resource);
 
                indices.push(new_index);
 
            }
 
@@ -386,10 +382,9 @@ mod tests {
 
            indices.clear();
 
        }
 

	
 
        let num_ctor_calls = ctor_counter.load(Ordering::Acquire);
 
        let num_dtor_calls = dtor_counter.load(Ordering::Acquire);
 
        assert_eq!(num_ctor_calls, num_dtor_calls);
 
        assert_eq!(num_ctor_calls, (NUM_ROUNDS * NUM_ELEMENTS) as u64);
 
        let num_expected = (NUM_ROUNDS * NUM_ELEMENTS) as u64;
 
        assert_ctor_eq!(counters, num_expected);
 
        assert_dtor_eq!(counters, num_expected);
 
    }
 

	
 
    #[test]
 
@@ -402,8 +397,7 @@ mod tests {
 
        assert!(MAX_SIZE % NUM_THREADS == 0);
 

	
 
        let store = Arc::new(ComponentStore::new(16));
 
        let ctor_counter = Arc::new(AtomicU64::new(0));
 
        let dtor_counter = Arc::new(AtomicU64::new(0));
 
        let counters = Counters::new();
 

	
 
        let mut threads = Vec::with_capacity(NUM_THREADS);
 
        for thread_index in 0..NUM_THREADS {
 
@@ -411,15 +405,14 @@ mod tests {
 
            let store = store.clone();
 
            let first_index = thread_index * NUM_PER_THREAD;
 
            let last_index = (thread_index + 1) * NUM_PER_THREAD;
 
            let ctor_counter = ctor_counter.clone();
 
            let dtor_counter = dtor_counter.clone();
 
            let counters = counters.clone();
 

	
 
            let handle = std::thread::spawn(move || {
 
                let mut indices = Vec::with_capacity(last_index - first_index);
 
                for _round_index in 0..NUM_ROUNDS {
 
                    // Creation round
 
                    for value in first_index..last_index {
 
                        let el_index = store.create(Resource::new(ctor_counter.clone(), dtor_counter.clone(), value as u64));
 
                        let el_index = store.create(Resource::new(&counters, value as u64));
 
                        indices.push(el_index);
 
                    }
 

	
 
@@ -444,10 +437,9 @@ mod tests {
 
            thread.join().expect("clean exit");
 
        }
 

	
 
        let num_ctor_calls = ctor_counter.load(Ordering::Acquire);
 
        let num_dtor_calls = dtor_counter.load(Ordering::Acquire);
 
        assert_eq!(num_ctor_calls, num_dtor_calls);
 
        assert_eq!(num_ctor_calls, (NUM_ROUNDS * MAX_SIZE) as u64);
 
        let num_expected = (NUM_ROUNDS * MAX_SIZE) as u64;
 
        assert_ctor_eq!(counters, num_expected);
 
        assert_dtor_eq!(counters, num_expected);
 
    }
 

	
 
    #[test]
 
@@ -466,15 +458,13 @@ mod tests {
 
        for seed_index in 0..seeds.len() {
 
            // Setup store, counters and threads
 
            let store = Arc::new(ComponentStore::new(16));
 
            let ctor_counter = Arc::new(AtomicU64::new(0));
 
            let dtor_counter = Arc::new(AtomicU64::new(0));
 
            let counters = Counters::new();
 

	
 
            let mut threads = Vec::with_capacity(NUM_THREADS);
 
            for thread_index in 0..NUM_THREADS {
 
                // Setup local clones to move into the thread
 
                let store = store.clone();
 
                let ctor_counter = ctor_counter.clone();
 
                let dtor_counter = dtor_counter.clone();
 
                let counters = counters.clone();
 

	
 
                // Setup local rng
 
                let mut seed = seeds[seed_index];
 
@@ -499,9 +489,7 @@ mod tests {
 

	
 
                            if is_empty || (!is_full && should_create) {
 
                                // Must create
 
                                let el_index = store.create(Resource::new(
 
                                    ctor_counter.clone(), dtor_counter.clone(), new_value
 
                                ));
 
                                let el_index = store.create(Resource::new(&counters, new_value));
 
                                stored.push((el_index, new_value));
 
                            } else {
 
                                // Must destroy
 
@@ -531,14 +519,14 @@ mod tests {
 
            }
 

	
 
            // Before store is dropped
 
            let num_ctor_calls = ctor_counter.load(Ordering::Acquire);
 
            let num_dtor_calls = dtor_counter.load(Ordering::Acquire);
 
            assert_eq!(num_ctor_calls - total_left_allocated, num_dtor_calls);
 
            // note: cannot determine number of creations, creation/destructor
 
            // is random
 
            let num_ctor = counters.ctor.load(Ordering::Acquire);
 
            assert_dtor_eq!(counters, num_ctor - total_left_allocated);
 

	
 
            // After store is dropped
 
            drop(store);
 
            let num_dtor_calls = dtor_counter.load(Ordering::Acquire);
 
            assert_eq!(num_ctor_calls, num_dtor_calls);
 
            assert_dtor_eq!(counters, num_ctor);
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/store/mod.rs
Show inline comments
 
#[macro_use]
 
#[cfg(test)]
 
mod tests;
 

	
 
pub mod component;
 
pub mod unfair_se_lock;
 
pub mod queue_mpsc;
 

	
 
pub(crate) use component::ComponentStore;
 

	
 
#[cfg(test)]
 
mod tests {
 
    use std::sync::Arc;
 
    use std::sync::atomic::{AtomicU64, Ordering};
 

	
 
    // Utility resource structure that counts the number of constructors and
 
    // destructor calls.
 
    pub struct Resource {
 
        dtor: Arc<AtomicU64>,
 
        val: u64,
 
    }
 

	
 
    impl Resource {
 
        fn new(ctor: Arc<AtomicU64>, dtor: Arc<AtomicU64>, val: u64) -> Self {
 
            ctor.fetch_add(1, Ordering::SeqCst);
 
            return Self{ dtor, val };
 
        }
 
    }
 

	
 
    impl Drop for Resource {
 
        fn drop(&mut self) {
 
            self.dtor.fetch_add(1, Ordering::SeqCst);
 
        }
 
    }
 
}
 
\ No newline at end of file
 
pub(crate) use component::ComponentStore;
 
\ No newline at end of file
src/runtime2/store/queue_mpsc.rs
Show inline comments
 
@@ -268,6 +268,7 @@ fn assert_correct_capacity(capacity: usize) {
 
#[cfg(test)]
 
mod tests {
 
    use super::*;
 
    use super::super::tests::*;
 

	
 
    fn queue_size<T>(queue: &QueueDynMpsc<T>) -> usize {
 
        let lock = queue.inner.data.lock_exclusive();
 
@@ -277,24 +278,31 @@ mod tests {
 
    #[test]
 
    fn single_threaded_fixed_size_push_pop() {
 
        const INIT_SIZE: usize = 16;
 
        const NUM_ROUNDS: usize = 3;
 
        let mut cons = QueueDynMpsc::new(INIT_SIZE);
 
        let prod = cons.producer();
 

	
 
        for _round in 0..3 {
 
        let counters = Counters::new();
 

	
 
        for _round in 0..NUM_ROUNDS {
 
            // Fill up with indices
 
            for idx in 0..INIT_SIZE {
 
                prod.push(idx);
 
                prod.push(Resource::new(&counters, idx as u64));
 
            }
 

	
 
            // Take out indices and check
 
            for idx in 0..INIT_SIZE {
 
                let gotten = cons.pop().unwrap();
 
                assert_eq!(idx, gotten);
 
                assert_eq!(idx as u64, gotten.val);
 
            }
 

	
 
            assert!(cons.pop().is_none()); // nothing left in queue
 
            assert_eq!(queue_size(&cons), INIT_SIZE); // queue still of same size
 
        }
 

	
 
        let num_expected = (INIT_SIZE * NUM_ROUNDS) as u64;
 
        assert_ctor_eq!(counters, num_expected);
 
        assert_dtor_eq!(counters, num_expected);
 
    }
 

	
 
    #[test]
 
@@ -305,17 +313,19 @@ mod tests {
 
        let mut cons = QueueDynMpsc::new(INIT_SIZE);
 
        let prod = cons.producer();
 

	
 
        let counters = Counters::new();
 

	
 
        for resize_idx in 0..NUM_RESIZE {
 
            // Fill up with indices, one more than the size
 
            let cur_size = INIT_SIZE << resize_idx;
 
            let new_size = cur_size << 1;
 
            for idx in 0..new_size {
 
                prod.push(idx);
 
                prod.push(Resource::new(&counters, idx as u64));
 
            }
 

	
 
            for idx in 0..new_size {
 
                let gotten = cons.pop().unwrap();
 
                assert_eq!(idx, gotten);
 
                assert_eq!(idx as u64, gotten.val);
 
            }
 

	
 
            assert!(cons.pop().is_none());
 
@@ -323,11 +333,17 @@ mod tests {
 
        }
 

	
 
        assert_eq!(queue_size(&cons), INIT_SIZE << NUM_RESIZE);
 

	
 
        // Bit trickery supremo (fails if INIT_SIZE is not a power of two)!
 
        let num_expected = ((INIT_SIZE << (NUM_RESIZE + 1)) - 1 - ((INIT_SIZE << 1) - 1)) as u64;
 
        assert_ctor_eq!(counters, num_expected);
 
        assert_dtor_eq!(counters, num_expected);
 
    }
 

	
 
    #[test]
 
    fn single_threaded_alternating_push_pop() {
 
        const INIT_SIZE: usize = 32;
 
        const NUM_ROUNDS: usize = 4;
 
        const NUM_PROD: usize = 4;
 
        assert!(INIT_SIZE % NUM_PROD == 0);
 

	
 
@@ -337,25 +353,56 @@ mod tests {
 
            prods.push(cons.producer());
 
        }
 

	
 
        for _round_idx in 0..4 {
 
        let counters = Counters::new();
 

	
 
        for _round_idx in 0..NUM_ROUNDS {
 
            // Fill up, alternating per producer
 
            let mut prod_idx = 0;
 
            for idx in 0..INIT_SIZE {
 
                let prod = &prods[prod_idx];
 
                prod_idx += 1;
 
                prod_idx %= NUM_PROD;
 
                prod.push(idx);
 
                prod.push(Resource::new(&counters, idx as u64));
 
            }
 

	
 
            // Retrieve and check again
 
            for idx in 0..INIT_SIZE {
 
                let gotten = cons.pop().unwrap();
 
                assert_eq!(idx, gotten);
 
                assert_eq!(idx as u64, gotten.val);
 
            }
 

	
 
            assert!(cons.pop().is_none());
 
            assert_eq!(queue_size(&cons), INIT_SIZE);
 
        }
 

	
 
        let num_expected = (NUM_ROUNDS * INIT_SIZE) as u64;
 
        assert_ctor_eq!(counters, num_expected);
 
        assert_dtor_eq!(counters, num_expected);
 
    }
 

	
 
    #[test]
 
    fn partially_filled_cleanup() {
 
        // Init at 16, fill until 8, take out 4, 4 destructors not called before
 
        // queue consumer side is dropped
 
        let mut cons = QueueDynMpsc::new(16);
 
        let mut prod = cons.producer();
 

	
 
        let counters = Counters::new();
 

	
 
        for _ in 0..8 {
 
            prod.push(Resource::new(&counters, 0));
 
        }
 

	
 
        for _ in 0..4 {
 
            cons.pop().expect("a value");
 
        }
 

	
 
        assert_ctor_eq!(counters, 8);
 
        assert_dtor_eq!(counters, 4);
 
        drop(prod);
 
        drop(cons);
 
        assert_ctor_eq!(counters, 8);
 
        assert_dtor_eq!(counters, 8);
 
    }
 

	
 
    #[test]
 
@@ -375,12 +422,14 @@ mod tests {
 

	
 
        // Span queue and producers
 
        for _stress_idx in 0..NUM_STRESS_TESTS {
 
            let mut queue = QueueDynMpsc::new(4);
 
            let mut queue = QueueDynMpsc::<Resource>::new(4);
 
            let mut producers = Vec::with_capacity(NUM_PROD_THREADS);
 
            for _idx in 0..NUM_PROD_THREADS {
 
                producers.push(queue.producer());
 
            }
 

	
 
            let counters = Counters::new();
 

	
 
            // Start up consume thread and let it spin immediately. Note that it
 
            // must die last.
 
            let can_exit_lock = Arc::new(Mutex::new(false));
 
@@ -389,18 +438,18 @@ mod tests {
 
            let consume_handle = {
 
                let can_exit_lock = can_exit_lock.clone();
 
                std::thread::spawn(move || {
 
                    let mut counters = [0u64; NUM_PROD_THREADS];
 
                    let mut thread_val_counters = [0u64; NUM_PROD_THREADS];
 
                    let mut num_done = 0;
 
                    while num_done != NUM_PROD_THREADS {
 
                        // Spin until we get something
 
                        let new_value = loop {
 
                            if let Some(value) = queue.pop() {
 
                                break value;
 
                                break value.val;
 
                            }
 
                        };
 

	
 
                        let thread_idx = take_num_thread_idx(new_value);
 
                        let counter = &mut counters[thread_idx as usize];
 
                        let counter = &mut thread_val_counters[thread_idx as usize];
 
                        assert_eq!(*counter, take_num(new_value)); // values per thread arrive in order
 

	
 
                        *counter += 1;
 
@@ -418,11 +467,12 @@ mod tests {
 
            let mut handles = Vec::with_capacity(NUM_PROD_THREADS);
 
            for prod_idx in 0..NUM_PROD_THREADS {
 
                let prod_handle = producers.pop().unwrap();
 
                let counters = counters.clone();
 

	
 
                let handle = std::thread::spawn(move || {
 
                    let base_value = (prod_idx as u64) << 32;
 
                    for number in 0..NUM_PER_THREAD as u64 {
 
                        prod_handle.push(base_value + number);
 
                        prod_handle.push(Resource::new(&counters, base_value + number));
 
                    }
 
                });
 

	
 
@@ -437,6 +487,10 @@ mod tests {
 

	
 
            drop(held_exit_lock);
 
            consume_handle.join().expect("clean consumer exit");
 

	
 
            let num_expected = (NUM_PER_THREAD * NUM_PROD_THREADS) as u64;
 
            assert_ctor_eq!(counters, num_expected);
 
            assert_dtor_eq!(counters, num_expected);
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/store/tests.rs
Show inline comments
 
new file 100644
 
pub use std::sync::Arc;
 
pub use std::sync::atomic::{AtomicU64, Ordering};
 

	
 
// Little wrapper for the two atomic ctor/dtor counters
 
#[derive(Clone)]
 
pub struct Counters {
 
    pub ctor: Arc<AtomicU64>,
 
    pub dtor: Arc<AtomicU64>,
 
}
 

	
 
impl Counters {
 
    pub fn new() -> Self {
 
        return Self{
 
            ctor: Arc::new(AtomicU64::new(0)),
 
            dtor: Arc::new(AtomicU64::new(0)),
 
        };
 
    }
 
}
 

	
 
macro_rules! assert_ctor_eq {
 
        ($counters:expr, $count:expr) => {
 
            assert_eq!($counters.ctor.load(Ordering::Acquire), $count);
 
        }
 
    }
 

	
 
macro_rules! assert_dtor_eq {
 
        ($counters:expr, $count:expr) => {
 
            assert_eq!($counters.dtor.load(Ordering::Acquire), $count);
 
        }
 
    }
 

	
 
// Utility resource structure that counts the number of constructors and
 
// destructor calls.
 
pub struct Resource {
 
    dtor: Arc<AtomicU64>,
 
    pub val: u64,
 
}
 

	
 
impl Resource {
 
    pub fn new(counters: &Counters, val: u64) -> Self {
 
        counters.ctor.fetch_add(1, Ordering::SeqCst);
 
        return Self{ dtor: counters.dtor.clone(), val };
 
    }
 
}
 

	
 
impl Drop for Resource {
 
    fn drop(&mut self) {
 
        self.dtor.fetch_add(1, Ordering::SeqCst);
 
    }
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)