Changeset - ead29a08c0cf
[Not reviewed]
0 2 0
mh - 3 years ago 2022-01-07 11:15:57
contact@maxhenger.nl
WIP: Adding ctor/dtor tests to MPSC queue
2 files changed with 27 insertions and 18 deletions:
0 comments (0 inline, 0 general)
src/runtime2/store/component.rs
Show inline comments
 
@@ -243,217 +243,200 @@ impl<T: Sized> ComponentStore<T> {
 
                let old_write_index = self.write_head.load(Ordering::SeqCst);
 

	
 
                if old_read_index > old_write_index {
 
                    // Read index wraps, so keep it as-is and fill
 
                    let new_read_index = old_read_index + old_size;
 
                    for index in 0..old_size {
 
                        let target_idx = (new_read_index + index) & new_index_mask;
 
                        lock.freelist[target_idx] = (old_size + index) as u32;
 
                    }
 

	
 
                    self.read_head.store(new_read_index, Ordering::SeqCst);
 
                    debug_assert!(new_read_index < 2*new_size);
 
                    debug_assert!(old_write_index.wrapping_sub(new_read_index) & new_compare_mask <= new_size);
 
                } else {
 
                    // No wrapping, so increment write index
 
                    let new_write_index = old_write_index + old_size;
 
                    for index in 0..old_size {
 
                        let target_idx = (old_write_index + index) & new_index_mask;
 
                        lock.freelist[target_idx] = (old_size + index) as u32;
 
                    }
 

	
 
                    // Update write/limit heads
 
                    self.write_head.store(new_write_index, Ordering::SeqCst);
 
                    self.limit_head.store(new_write_index, Ordering::SeqCst);
 
                    debug_assert!(new_write_index < 2*new_size);
 
                    debug_assert!(new_write_index.wrapping_sub(old_read_index) & new_compare_mask <= new_size);
 
                }
 

	
 
                // Update sizes and masks
 
                lock.size = new_size;
 
                lock.compare_mask = new_compare_mask;
 
                lock.index_mask = new_index_mask;
 
            } // else: someone else allocated, so we don't have to
 
        }
 

	
 
        // We've dropped the write lock, acquire the read lock again
 
        return self.inner.lock_shared();
 
    }
 

	
 
    #[inline]
 
    fn assert_valid_size(size: usize) {
 
        // Condition the size needs to adhere to. Some are a bit excessive, but
 
        // we don't hit this check very often
 
        assert!(
 
            size.is_power_of_two() &&
 
                size >= 4 &&
 
                size <= usize::MAX / 2 &&
 
                size <= u32::MAX as usize
 
        );
 
    }
 
}
 

	
 
impl<T: Sized> Drop for ComponentStore<T> {
 
    fn drop(&mut self) {
 
        let value_layout = Layout::from_size_align(
 
            std::mem::size_of::<T>(), std::mem::align_of::<T>()
 
        ).unwrap();
 

	
 
        // Note that if the indices exist in the freelist then the destructor
 
        // has already been called. So handle them first
 
        let mut lock = self.inner.lock_exclusive();
 

	
 
        let read_index = self.read_head.load(Ordering::Acquire);
 
        let write_index = self.write_head.load(Ordering::Acquire);
 
        debug_assert_eq!(write_index, self.limit_head.load(Ordering::Acquire));
 

	
 
        let mut index = read_index;
 
        while index != write_index {
 
            let dealloc_index = lock.freelist[index & lock.index_mask] as usize;
 
            let target_ptr = lock.data[dealloc_index];
 

	
 
            unsafe {
 
                dealloc(target_ptr.cast(), value_layout);
 
                lock.data[dealloc_index] = ptr::null_mut();
 
            }
 

	
 
            index += 1;
 
            index &= lock.compare_mask;
 
        }
 

	
 
        // With all of those set to null, we'll just iterate through all
 
        // pointers and destruct+deallocate the ones not set to null yet
 
        for target_ptr in lock.data.iter().copied() {
 
            if !target_ptr.is_null() {
 
                unsafe {
 
                    ptr::drop_in_place(target_ptr);
 
                    dealloc(target_ptr.cast(), value_layout);
 
                }
 
            }
 
        }
 
    }
 
}
 

	
 
#[cfg(test)]
 
mod tests {
 
    use super::*;
 
    use crate::runtime2::store::tests::Resource;
 

	
 
    use rand::prelude::*;
 
    use rand_pcg::Pcg32;
 

	
 
    use std::sync::Arc;
 
    use std::sync::atomic::{AtomicU64, Ordering};
 

	
 
    pub struct Resource {
 
        dtor: Arc<AtomicU64>,
 
        val: u64,
 
    }
 

	
 
    impl Resource {
 
        fn new(ctor: Arc<AtomicU64>, dtor: Arc<AtomicU64>, val: u64) -> Self {
 
            ctor.fetch_add(1, Ordering::SeqCst);
 
            return Self{ dtor, val };
 
        }
 
    }
 

	
 
    impl Drop for Resource {
 
        fn drop(&mut self) {
 
            self.dtor.fetch_add(1, Ordering::SeqCst);
 
        }
 
    }
 

	
 
    fn seeds() -> Vec<[u8;16]> {
 
        return vec![
 
            [241, 47, 70, 87, 240, 246, 20, 173, 219, 143, 74, 23, 158, 58, 205, 172],
 
            [178, 112, 230, 205, 230, 178, 2, 90, 162, 218, 49, 196, 224, 222, 208, 43],
 
            [245, 42, 35, 167, 153, 205, 221, 144, 200, 253, 144, 117, 176, 231, 17, 70],
 
            [143, 39, 177, 216, 124, 96, 225, 39, 30, 82, 239, 193, 133, 58, 255, 193],
 
            [25, 105, 10, 52, 161, 212, 190, 112, 178, 193, 68, 249, 167, 153, 172, 144],
 
        ]
 
    }
 

	
 
    #[test]
 
    fn test_ctor_dtor_simple_unthreaded() {
 
        const NUM_ROUNDS: usize = 5;
 
        const NUM_ELEMENTS: usize = 1024;
 

	
 
        let store = ComponentStore::new(32);
 
        let ctor_counter = Arc::new(AtomicU64::new(0));
 
        let dtor_counter = Arc::new(AtomicU64::new(0));
 

	
 
        let mut indices = Vec::with_capacity(NUM_ELEMENTS);
 
        for _round_index in 0..NUM_ROUNDS {
 
            // Creation round
 
            for value in 0..NUM_ELEMENTS {
 
                let new_resource = Resource::new(ctor_counter.clone(), dtor_counter.clone(), value as u64);
 
                let new_index = store.create(new_resource);
 
                indices.push(new_index);
 
            }
 

	
 
            // Checking round
 
            for el_index in indices.iter().copied() {
 
                let element = store.get(el_index);
 
                assert_eq!(element.val, el_index as u64);
 
            }
 

	
 
            // Destruction round
 
            for el_index in indices.iter().copied() {
 
                store.destroy(el_index);
 
            }
 

	
 
            indices.clear();
 
        }
 

	
 
        let num_ctor_calls = ctor_counter.load(Ordering::Acquire);
 
        let num_dtor_calls = dtor_counter.load(Ordering::Acquire);
 
        assert_eq!(num_ctor_calls, num_dtor_calls);
 
        assert_eq!(num_ctor_calls, (NUM_ROUNDS * NUM_ELEMENTS) as u64);
 
    }
 

	
 
    #[test]
 
    fn test_ctor_dtor_simple_threaded() {
 
        const MAX_SIZE: usize = 1024;
 
        const NUM_THREADS: usize = 4;
 
        const NUM_PER_THREAD: usize = MAX_SIZE / NUM_THREADS;
 
        const NUM_ROUNDS: usize = 4;
 

	
 
        assert!(MAX_SIZE % NUM_THREADS == 0);
 

	
 
        let store = Arc::new(ComponentStore::new(16));
 
        let ctor_counter = Arc::new(AtomicU64::new(0));
 
        let dtor_counter = Arc::new(AtomicU64::new(0));
 

	
 
        let mut threads = Vec::with_capacity(NUM_THREADS);
 
        for thread_index in 0..NUM_THREADS {
 
            // Setup local clones to move into the thread
 
            let store = store.clone();
 
            let first_index = thread_index * NUM_PER_THREAD;
 
            let last_index = (thread_index + 1) * NUM_PER_THREAD;
 
            let ctor_counter = ctor_counter.clone();
 
            let dtor_counter = dtor_counter.clone();
 

	
 
            let handle = std::thread::spawn(move || {
 
                let mut indices = Vec::with_capacity(last_index - first_index);
 
                for _round_index in 0..NUM_ROUNDS {
 
                    // Creation round
 
                    for value in first_index..last_index {
 
                        let el_index = store.create(Resource::new(ctor_counter.clone(), dtor_counter.clone(), value as u64));
 
                        indices.push(el_index);
 
                    }
 

	
 
                    // Checking round
 
                    for (value_offset, el_index) in indices.iter().copied().enumerate() {
 
                        let element = store.get(el_index);
 
                        assert_eq!(element.val, (first_index + value_offset) as u64);
 
                    }
 

	
 
                    // Destruction round
 
                    for el_index in indices.iter().copied() {
 
                        store.destroy(el_index);
 
                    }
 

	
 
                    indices.clear();
 
                }
 
            });
 
            threads.push(handle);
 
        }
 

	
src/runtime2/store/mod.rs
Show inline comments
 
pub mod component;
 
pub mod unfair_se_lock;
 
pub mod queue_mpsc;
 

	
 
pub(crate) use component::ComponentStore;
 

	
 
#[cfg(test)]
 
mod tests {
 
    use std::sync::Arc;
 
    use std::sync::atomic::{AtomicU64, Ordering};
 

	
 
    // Utility resource structure that counts the number of constructors and
 
    // destructor calls.
 
    pub struct Resource {
 
        dtor: Arc<AtomicU64>,
 
        val: u64,
 
    }
 

	
 
    impl Resource {
 
        fn new(ctor: Arc<AtomicU64>, dtor: Arc<AtomicU64>, val: u64) -> Self {
 
            ctor.fetch_add(1, Ordering::SeqCst);
 
            return Self{ dtor, val };
 
        }
 
    }
 

	
 
    impl Drop for Resource {
 
        fn drop(&mut self) {
 
            self.dtor.fetch_add(1, Ordering::SeqCst);
 
        }
 
    }
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)