Changeset - ead29a08c0cf
[Not reviewed]
0 2 0
mh - 3 years ago 2022-01-07 11:15:57
contact@maxhenger.nl
WIP: Adding ctor/dtor tests to MPSC queue
2 files changed with 27 insertions and 18 deletions:
0 comments (0 inline, 0 general)
src/runtime2/store/component.rs
Show inline comments
 
@@ -147,409 +147,392 @@ impl<T: Sized> ComponentStore<T> {
 

	
 
            loop {
 
                let preemptive_read = read_lock.freelist[read_index & read_lock.index_mask];
 
                if let Err(actual_read_index) = self.read_head.compare_exchange(
 
                    read_index, (read_index + 1) & read_lock.compare_mask,
 
                    Ordering::AcqRel, Ordering::Acquire
 
                ) {
 
                    // We need to try again
 
                    read_index = actual_read_index;
 
                    continue 'attempt_read;
 
                }
 

	
 
                // If here then we performed the read
 
                return (read_lock, preemptive_read);
 
            }
 
        }
 
    }
 

	
 
    #[inline]
 
    fn initialize_at_index(&self, read_lock: InnerRead<T>, index: u32, value: T) {
 
        let mut target_ptr = read_lock.data[index as usize];
 

	
 
        unsafe {
 
            if target_ptr.is_null() {
 
                let layout = Layout::for_value(&value);
 
                target_ptr = std::alloc::alloc(layout).cast();
 
                let rewrite: *mut *mut T = transmute(read_lock.data.as_ptr());
 
                *rewrite.add(index as usize) = target_ptr;
 
            }
 

	
 
            std::ptr::write(target_ptr, value);
 
        }
 
    }
 

	
 
    #[inline]
 
    fn push_freelist_index(&self, read_lock: &InnerRead<T>, index_to_put_back: u32) {
 
        // Acquire an index in the freelist to which we can write
 
        let mut cur_write_index = self.write_head.load(Ordering::Relaxed);
 
        let mut new_write_index = (cur_write_index + 1) & read_lock.compare_mask;
 
        while let Err(actual_write_index) = self.write_head.compare_exchange(
 
            cur_write_index, new_write_index,
 
            Ordering::AcqRel, Ordering::Acquire
 
        ) {
 
            cur_write_index = actual_write_index;
 
            new_write_index = (cur_write_index + 1) & read_lock.compare_mask;
 
        }
 

	
 
        // We own the data at the index, write to it and notify reader through
 
        // limit_head that it can be read from. Note that we cheat around the
 
        // rust mutability system here :)
 
        unsafe {
 
            let target: *mut u32 = transmute(read_lock.freelist.as_ptr());
 
            *(target.add(cur_write_index & read_lock.index_mask)) = index_to_put_back;
 
        }
 

	
 
        // Essentially spinlocking, relaxed failure ordering because the logic
 
        // is that a write first moves the `write_head`, then the `limit_head`.
 
        while let Err(_) = self.limit_head.compare_exchange(
 
            cur_write_index, new_write_index,
 
            Ordering::AcqRel, Ordering::Relaxed
 
        ) {};
 
    }
 

	
 
    /// Runs the destructor of the value stored in the slot at `index`.
    ///
    /// Only the pointee is dropped in place: the slot's pointer in `data` is
    /// left untouched and nothing is deallocated here.
    #[inline]
    fn destruct_at_index(&self, read_lock: &InnerRead<T>, index: u32) {
        let slot = index as usize;
        unsafe {
            ptr::drop_in_place(read_lock.data[slot]);
        }
    }
 

	
 
    /// Doubles the capacity of the store if it still has size `old_size`.
    ///
    /// The shared guard `inner` is dropped, the exclusive lock is taken, and
    /// if `lock.size` still equals `old_size` the `data` and `freelist`
    /// vectors are resized to twice that size. The indices of the new slots
    /// (`old_size..new_size`) are written into the freelist and the head
    /// counters are moved to account for them. If the size has already
    /// changed, nothing is modified. In both cases a freshly acquired shared
    /// guard is returned.
    ///
    /// Panics if the doubled size is rejected by `assert_valid_size`.
    // NOTE: Bit of a mess, and could have a cleanup with better logic for the
    // resizing. Maybe even a different indexing scheme...
    fn reallocate(&self, old_size: usize, inner: InnerRead<T>) -> InnerRead<T> {
        drop(inner);
        {
            // After dropping read lock, acquire write lock
            let mut lock = self.inner.lock_exclusive();

            // The size is compared again under the exclusive lock: if it no
            // longer matches, the resize has already been done.
            if old_size == lock.size {
                // We are the thread that is supposed to reallocate
                let new_size = old_size * 2;
                Self::assert_valid_size(new_size);

                // Note that the atomic indices are in the range [0, new_size)
                // already, so we need to be careful
                let new_index_mask = new_size - 1;
                let new_compare_mask = (2 * new_size) - 1;
                // New data slots start out as null pointers
                lock.data.resize(new_size, ptr::null_mut());
                lock.freelist.resize(new_size, 0);
                // Mirror the old freelist contents into the new upper half
                for idx in 0..old_size {
                    lock.freelist[old_size + idx] = lock.freelist[idx];
                }

                // We need to fill the freelist with the indices of all of the
                // new elements that we have just created.
                debug_assert_eq!(self.limit_head.load(Ordering::SeqCst), self.write_head.load(Ordering::SeqCst));
                let old_read_index = self.read_head.load(Ordering::SeqCst);
                let old_write_index = self.write_head.load(Ordering::SeqCst);

                if old_read_index > old_write_index {
                    // Read index wraps, so keep it as-is and fill
                    // NOTE(review): the comment above says "keep it as-is",
                    // but `read_head` is advanced by `old_size` below -
                    // confirm which of the two is intended.
                    let new_read_index = old_read_index + old_size;
                    for index in 0..old_size {
                        let target_idx = (new_read_index + index) & new_index_mask;
                        lock.freelist[target_idx] = (old_size + index) as u32;
                    }

                    self.read_head.store(new_read_index, Ordering::SeqCst);
                    debug_assert!(new_read_index < 2*new_size);
                    debug_assert!(old_write_index.wrapping_sub(new_read_index) & new_compare_mask <= new_size);
                } else {
                    // No wrapping, so increment write index
                    let new_write_index = old_write_index + old_size;
                    for index in 0..old_size {
                        let target_idx = (old_write_index + index) & new_index_mask;
                        lock.freelist[target_idx] = (old_size + index) as u32;
                    }

                    // Update write/limit heads
                    self.write_head.store(new_write_index, Ordering::SeqCst);
                    self.limit_head.store(new_write_index, Ordering::SeqCst);
                    debug_assert!(new_write_index < 2*new_size);
                    debug_assert!(new_write_index.wrapping_sub(old_read_index) & new_compare_mask <= new_size);
                }

                // Update sizes and masks
                lock.size = new_size;
                lock.compare_mask = new_compare_mask;
                lock.index_mask = new_index_mask;
            } // else: someone else allocated, so we don't have to
        }

        // We've dropped the write lock, acquire the read lock again
        return self.inner.lock_shared();
    }
 

	
 
    /// Panics unless `size` is acceptable as the size of the store.
    ///
    /// An acceptable size is a power of two, is at least 4, is at most
    /// `usize::MAX / 2` and is at most `u32::MAX`. The panic message includes
    /// the rejected size.
    #[inline]
    fn assert_valid_size(size: usize) {
        // Conditions the size needs to adhere to. Some are a bit excessive,
        // but we don't hit this check very often
        let is_valid = size.is_power_of_two()
            && size >= 4
            && size <= usize::MAX / 2
            && size <= u32::MAX as usize;
        assert!(is_valid, "invalid component store size: {}", size);
    }
 
}
 

	
 
impl<T: Sized> Drop for ComponentStore<T> {
 
    fn drop(&mut self) {
 
        let value_layout = Layout::from_size_align(
 
            std::mem::size_of::<T>(), std::mem::align_of::<T>()
 
        ).unwrap();
 

	
 
        // Note that if the indices exist in the freelist then the destructor
 
        // has already been called. So handle them first
 
        let mut lock = self.inner.lock_exclusive();
 

	
 
        let read_index = self.read_head.load(Ordering::Acquire);
 
        let write_index = self.write_head.load(Ordering::Acquire);
 
        debug_assert_eq!(write_index, self.limit_head.load(Ordering::Acquire));
 

	
 
        let mut index = read_index;
 
        while index != write_index {
 
            let dealloc_index = lock.freelist[index & lock.index_mask] as usize;
 
            let target_ptr = lock.data[dealloc_index];
 

	
 
            unsafe {
 
                dealloc(target_ptr.cast(), value_layout);
 
                lock.data[dealloc_index] = ptr::null_mut();
 
            }
 

	
 
            index += 1;
 
            index &= lock.compare_mask;
 
        }
 

	
 
        // With all of those set to null, we'll just iterate through all
 
        // pointers and destruct+deallocate the ones not set to null yet
 
        for target_ptr in lock.data.iter().copied() {
 
            if !target_ptr.is_null() {
 
                unsafe {
 
                    ptr::drop_in_place(target_ptr);
 
                    dealloc(target_ptr.cast(), value_layout);
 
                }
 
            }
 
        }
 
    }
 
}
 

	
 
#[cfg(test)]
 
mod tests {
 
    use super::*;
 
    use crate::runtime2::store::tests::Resource;
 

	
 
    use rand::prelude::*;
 
    use rand_pcg::Pcg32;
 

	
 
    use std::sync::Arc;
 
    use std::sync::atomic::{AtomicU64, Ordering};
 

	
 
    /// Test helper that records constructions and destructions in two shared
    /// counters.
    // NOTE(review): this module also imports
    // `crate::runtime2::store::tests::Resource`; presumably only one of the
    // two definitions is meant to remain - confirm.
    pub struct Resource {
        dtor: Arc<AtomicU64>,
        val: u64,
    }

    impl Resource {
        /// Increments `ctor` by one and returns a resource carrying `val`.
        /// `dtor` is stored so it can be incremented when the resource drops.
        fn new(ctor: Arc<AtomicU64>, dtor: Arc<AtomicU64>, val: u64) -> Self {
            let _previous = ctor.fetch_add(1, Ordering::SeqCst);
            Resource { dtor, val }
        }
    }

    impl Drop for Resource {
        /// Increments the destructor counter by one.
        fn drop(&mut self) {
            let _previous = self.dtor.fetch_add(1, Ordering::SeqCst);
        }
    }
 

	
 
    /// Returns the five fixed 16-byte seeds from which the randomized test
    /// builds its random number generators.
    fn seeds() -> Vec<[u8;16]> {
        const SEEDS: [[u8; 16]; 5] = [
            [241, 47, 70, 87, 240, 246, 20, 173, 219, 143, 74, 23, 158, 58, 205, 172],
            [178, 112, 230, 205, 230, 178, 2, 90, 162, 218, 49, 196, 224, 222, 208, 43],
            [245, 42, 35, 167, 153, 205, 221, 144, 200, 253, 144, 117, 176, 231, 17, 70],
            [143, 39, 177, 216, 124, 96, 225, 39, 30, 82, 239, 193, 133, 58, 255, 193],
            [25, 105, 10, 52, 161, 212, 190, 112, 178, 193, 68, 249, 167, 153, 172, 144],
        ];

        SEEDS.to_vec()
    }
 

	
 
    /// Single-threaded check that every constructed resource is destructed.
    ///
    /// Each round creates `NUM_ELEMENTS` resources, reads every one of them
    /// back, and destroys them again. Afterwards the constructor and
    /// destructor counters must both equal `NUM_ROUNDS * NUM_ELEMENTS`.
    #[test]
    fn test_ctor_dtor_simple_unthreaded() {
        const NUM_ROUNDS: usize = 5;
        const NUM_ELEMENTS: usize = 1024;

        let store = ComponentStore::new(32);
        let ctor_counter = Arc::new(AtomicU64::new(0));
        let dtor_counter = Arc::new(AtomicU64::new(0));

        let mut indices = Vec::with_capacity(NUM_ELEMENTS);
        for _round_index in 0..NUM_ROUNDS {
            // Creation round
            for value in 0..NUM_ELEMENTS {
                let new_resource = Resource::new(ctor_counter.clone(), dtor_counter.clone(), value as u64);
                let new_index = store.create(new_resource);
                indices.push(new_index);
            }

            // Checking round. `indices` is cleared every round and filled in
            // creation order, so the n-th entry was created with value `n`.
            // Compare against that value rather than against the index the
            // store handed out, so the test does not depend on the order in
            // which the store reuses its slots.
            for (value, el_index) in indices.iter().copied().enumerate() {
                let element = store.get(el_index);
                assert_eq!(element.val, value as u64);
            }

            // Destruction round
            for el_index in indices.iter().copied() {
                store.destroy(el_index);
            }

            indices.clear();
        }

        let num_ctor_calls = ctor_counter.load(Ordering::Acquire);
        let num_dtor_calls = dtor_counter.load(Ordering::Acquire);
        assert_eq!(num_ctor_calls, num_dtor_calls);
        assert_eq!(num_ctor_calls, (NUM_ROUNDS * NUM_ELEMENTS) as u64);
    }
 

	
 
    /// Multi-threaded check that every constructed resource is destructed.
    ///
    /// `NUM_THREADS` threads share one store. Each thread owns a disjoint
    /// range of values and, for `NUM_ROUNDS` rounds, creates a resource per
    /// value, reads all of them back and destroys them. After all threads
    /// are joined, both counters must equal `NUM_ROUNDS * MAX_SIZE`.
    #[test]
    fn test_ctor_dtor_simple_threaded() {
        const MAX_SIZE: usize = 1024;
        const NUM_THREADS: usize = 4;
        const NUM_PER_THREAD: usize = MAX_SIZE / NUM_THREADS;
        const NUM_ROUNDS: usize = 4;

        assert!(MAX_SIZE % NUM_THREADS == 0);

        let store = Arc::new(ComponentStore::new(16));
        let ctor_counter = Arc::new(AtomicU64::new(0));
        let dtor_counter = Arc::new(AtomicU64::new(0));

        let workers: Vec<_> = (0..NUM_THREADS).map(|worker_id| {
            // Per-thread handles and the value range this thread owns
            let store = store.clone();
            let ctor_counter = ctor_counter.clone();
            let dtor_counter = dtor_counter.clone();
            let values = (worker_id * NUM_PER_THREAD)..((worker_id + 1) * NUM_PER_THREAD);

            std::thread::spawn(move || {
                let mut owned = Vec::with_capacity(values.len());
                for _ in 0..NUM_ROUNDS {
                    // Create one resource per value in our range
                    for value in values.clone() {
                        let resource = Resource::new(ctor_counter.clone(), dtor_counter.clone(), value as u64);
                        owned.push(store.create(resource));
                    }

                    // Every stored element must still hold its value
                    for (offset, &slot) in owned.iter().enumerate() {
                        let element = store.get(slot);
                        assert_eq!(element.val, (values.start + offset) as u64);
                    }

                    // Hand everything back to the store
                    for &slot in owned.iter() {
                        store.destroy(slot);
                    }

                    owned.clear();
                }
            })
        }).collect();

        for worker in workers {
            worker.join().expect("clean exit");
        }

        let constructed = ctor_counter.load(Ordering::Acquire);
        let destructed = dtor_counter.load(Ordering::Acquire);
        assert_eq!(constructed, destructed);
        assert_eq!(constructed, (NUM_ROUNDS * MAX_SIZE) as u64);
    }
 

	
 
    #[test]
 
    fn test_ctor_dtor_random_threaded() {
 
        const NUM_ROUNDS: usize = 4;
 
        const NUM_THREADS: usize = 4;
 
        const NUM_OPERATIONS: usize = 1024;
 
        const NUM_OPS_PER_THREAD: usize = NUM_OPERATIONS / NUM_THREADS;
 
        const NUM_OPS_PER_ROUND: usize = NUM_OPS_PER_THREAD / NUM_ROUNDS;
 
        const NUM_STORED_PER_THREAD: usize = 32;
 

	
 
        assert!(NUM_OPERATIONS % NUM_THREADS == 0);
 
        assert!(NUM_OPS_PER_THREAD / 2 > NUM_STORED_PER_THREAD);
 

	
 
        let seeds = seeds();
 
        for seed_index in 0..seeds.len() {
 
            // Setup store, counters and threads
 
            let store = Arc::new(ComponentStore::new(16));
 
            let ctor_counter = Arc::new(AtomicU64::new(0));
 
            let dtor_counter = Arc::new(AtomicU64::new(0));
 

	
 
            let mut threads = Vec::with_capacity(NUM_THREADS);
 
            for thread_index in 0..NUM_THREADS {
 
                // Setup local clones to move into the thread
 
                let store = store.clone();
 
                let ctor_counter = ctor_counter.clone();
 
                let dtor_counter = dtor_counter.clone();
 

	
 
                // Setup local rng
 
                let mut seed = seeds[seed_index];
 
                for seed_val_idx in 0..16 {
 
                    seed[seed_val_idx] ^= thread_index as u8; // blegh
 
                }
 
                let mut rng = Pcg32::from_seed(seed);
 

	
 
                let handle = std::thread::spawn(move || {
 
                    let mut stored = Vec::with_capacity(NUM_STORED_PER_THREAD);
 

	
 
                    for _round_index in 0..NUM_ROUNDS {
 
                        // Modify store elements in the store randomly, for some
 
                        // silly definition of random
 
                        for _op_index in 0..NUM_OPS_PER_ROUND {
 
                            // Perform a single operation, depending on current
 
                            // size of the number of values owned by this thread
 
                            let new_value = rng.next_u64();
 
                            let should_create = rng.next_u32() % 2 == 0;
 
                            let is_empty = stored.is_empty();
 
                            let is_full = stored.len() == NUM_STORED_PER_THREAD;
 

	
 
                            if is_empty || (!is_full && should_create) {
 
                                // Must create
 
                                let el_index = store.create(Resource::new(
 
                                    ctor_counter.clone(), dtor_counter.clone(), new_value
 
                                ));
 
                                stored.push((el_index, new_value));
 
                            } else {
 
                                // Must destroy
 
                                let stored_index = new_value as usize % stored.len();
 
                                let (el_index, el_value) = stored.remove(stored_index);
 
                                store.destroy(el_index);
 
                            }
 
                        }
 

	
 
                        // Checking if the values we own still make sense
 
                        for (el_index, value) in stored.iter().copied() {
 
                            let gotten = store.get(el_index);
 
                            assert_eq!(value, gotten.val, "failed at thread {} value {}", thread_index, el_index);
 
                        }
 
                    }
 

	
 
                    return stored.len(); // return number of remaining elements
 
                });
 
                threads.push(handle);
 
            }
 

	
 
            // Done with the current round
 
            let mut total_left_allocated = 0;
 
            for thread in threads {
 
                let num_still_stored = thread.join().unwrap();
 
                total_left_allocated += num_still_stored as u64;
 
            }
 

	
 
            // Before store is dropped
 
            let num_ctor_calls = ctor_counter.load(Ordering::Acquire);
 
            let num_dtor_calls = dtor_counter.load(Ordering::Acquire);
 
            assert_eq!(num_ctor_calls - total_left_allocated, num_dtor_calls);
 

	
 
            // After store is dropped
src/runtime2/store/mod.rs
Show inline comments
 
pub mod component;
 
pub mod unfair_se_lock;
 
pub mod queue_mpsc;
 

	
 
pub(crate) use component::ComponentStore;
 

	
 
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Utility resource structure that counts the number of constructor and
    /// destructor calls.
    ///
    /// The constructor and the `val` field are `pub(crate)` because the
    /// tests that use this type live in other modules, where private items
    /// of this module are not accessible.
    pub struct Resource {
        dtor: Arc<AtomicU64>,
        pub(crate) val: u64,
    }

    impl Resource {
        /// Increments `ctor` by one and returns a resource carrying `val`.
        /// `dtor` is stored and incremented when the resource is dropped.
        pub(crate) fn new(ctor: Arc<AtomicU64>, dtor: Arc<AtomicU64>, val: u64) -> Self {
            ctor.fetch_add(1, Ordering::SeqCst);
            return Self{ dtor, val };
        }
    }

    impl Drop for Resource {
        /// Increments the destructor counter by one.
        fn drop(&mut self) {
            self.dtor.fetch_add(1, Ordering::SeqCst);
        }
    }
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)