/*
 * Component Store
 *
 * Concurrent datastructure for creating/destroying/retrieving components using
 * their ID. It is essentially a variation on a concurrent freelist. We store an
 * array of (potentially null) pointers to data. Indices into this array that
 * are unused (but may be left allocated) are in a freelist. So creating a new
 * bit of data involves taking an index from this freelist. Destruction involves
 * putting the index back.
 *
 * This datastructure takes care of the thread-safe implementation of the
 * freelist and of calling the data's destructor when needed. Note that it is
 * not completely safe (in Rust's sense of the word) because it is possible to
 * get more than one mutable reference to a piece of data. Likewise it is
 * possible to put bogus indices back into the freelist, which will destroy the
 * integrity of the datastructure.
 *
 * Some underlying assumptions that led to this design (note that I haven't
 * actually checked these conditions or performed any real profiling, yet):
 * - Resizing the freelist should be very rare. The datastructure should grow
 *   to some kind of maximum size and stay at that size.
 * - Creation should (preferably) be faster than deletion of data. The reason
 *   is that creation implies we're creating a component that has code to be
 *   executed. Better to be able to quickly execute new code than to be able
 *   to quickly tear down finished components.
 * - Retrieval is much more likely than creation/destruction.
 *
 * Some obvious flaws of this implementation:
 * - Because of the freelist implementation we will generally allocate all of
 *   the data pointers that are available (i.e. if we have a buffer of size
 *   64, but we generally use 33 elements, then we'll still have 64 elements
 *   allocated), which might be wasteful at larger array sizes (which are
 *   always powers of two).
 * - A lot of the concurrent operations are not strictly necessary: we could
 *   redirect some of the accesses to the global concurrent datastructure
 *   through an initial access to some kind of thread-local datastructure.
 */

use std::mem::transmute;
use std::alloc::{dealloc, Layout};
use std::ptr;
use std::sync::atomic::{AtomicUsize, Ordering};

use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};

/// Generic store of components. Essentially a resizable freelist (implemented
/// as a ringbuffer) combined with an array of the actual elements.
pub struct ComponentStore<T> {
    inner: UnfairSeLock<Inner<T>>,
    read_head: AtomicUsize,
    write_head: AtomicUsize,
    limit_head: AtomicUsize,
}

unsafe impl<T> Send for ComponentStore<T> {}
unsafe impl<T> Sync for ComponentStore<T> {}

/// Contents of the `ComponentStore` that require a shared/exclusive locking
/// mechanism for consistency.
struct Inner<T> {
    freelist: Vec<u32>,
    data: Vec<*mut T>,
    size: usize,
    compare_mask: usize,
    index_mask: usize,
}

type InnerShared<'a, T> = UnfairSeLockSharedGuard<'a, Inner<T>>;
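// A note on the ringbuffer index scheme used below (worked example). The three
// heads (`read_head`, `write_head`, `limit_head`) range over [0, 2*size) and
// are wrapped with `compare_mask = 2*size - 1`; the actual freelist slot is
// obtained with `index_mask = size - 1`. Keeping one extra bit in the heads
// lets us distinguish an empty freelist from a full one. For the minimum size
// of 4 (index_mask = 0b011, compare_mask = 0b111):
// - initially read_head = 0 and write_head = limit_head = 4: the heads differ
//   by `size`, so all 4 indices are free;
// - after 4 pops read_head == limit_head == 4: the freelist is empty and the
//   next pop triggers `reallocate`;
// - a push first claims a slot by bumping `write_head`, writes the returned
//   index into that slot, and then advances `limit_head` to publish it to
//   poppers.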
/// Reservation of a slot in the component store. Corresponds to the case
/// where an index has been taken from the freelist, but the element has not
/// yet been initialized.
pub struct ComponentReservation {
    pub(crate) index: u32,
    #[cfg(debug_assertions)] submitted: bool,
}

impl ComponentReservation {
    fn new(index: u32) -> Self {
        return Self{
            index,
            #[cfg(debug_assertions)] submitted: false,
        }
    }
}

impl Drop for ComponentReservation {
    fn drop(&mut self) {
        dbg_code!({ debug_assert!(self.submitted); });
    }
}

impl<T> ComponentStore<T> {
    pub fn new(initial_size: usize) -> Self {
        Self::assert_valid_size(initial_size);

        // Fill initial freelist and preallocate data array
        let mut initial_freelist = Vec::with_capacity(initial_size);
        for idx in 0..initial_size {
            initial_freelist.push(idx as u32)
        }
        let mut initial_data = Vec::new();
        initial_data.resize(initial_size, ptr::null_mut());

        // Return initial store
        return Self{
            inner: UnfairSeLock::new(Inner{
                freelist: initial_freelist,
                data: initial_data,
                size: initial_size,
                compare_mask: 2*initial_size - 1,
                index_mask: initial_size - 1,
            }),
            read_head: AtomicUsize::new(0),
            write_head: AtomicUsize::new(initial_size),
            limit_head: AtomicUsize::new(initial_size),
        };
    }

    /// Creates a new element initialized to the provided `value`. This returns
    /// the index at which the element can be retrieved.
    pub fn create(&self, value: T) -> u32 {
        let lock = self.inner.lock_shared();
        let (lock, index) = self.pop_freelist_index(lock);
        Self::initialize_at_index(lock, index, value);
        return index;
    }

    /// Reserves a slot without initializing it. The reservation must later be
    /// turned into an initialized element using `submit`.
    pub fn reserve(&self) -> ComponentReservation {
        let lock = self.inner.lock_shared();
        let (_lock, index) = self.pop_freelist_index(lock);
        return ComponentReservation::new(index);
    }

    /// Initializes a previously reserved slot with the provided `value` and
    /// returns the index at which the element can be retrieved.
    pub fn submit(&self, mut reservation: ComponentReservation, value: T) -> u32 {
        dbg_code!({ reservation.submitted = true; });
        let lock = self.inner.lock_shared();
        Self::initialize_at_index(lock, reservation.index, value);
        return reservation.index;
    }

    /// Destroys the element at the provided `index`. The caller must make sure
    /// that it does not use any previously received references to the data at
    /// this index, and that no more calls to `get` are performed using this
    /// index. This is allowed again once the index has been reacquired using
    /// `create`.
    pub fn destroy(&self, index: u32) {
        let lock = self.inner.lock_shared();
        self.destruct_at_index(&lock, index);
        self.push_freelist_index(&lock, index);
    }

    /// Retrieves an element by reference.
    pub fn get(&self, index: u32) -> &T {
        let lock = self.inner.lock_shared();
        let value = lock.data[index as usize];
        unsafe {
            debug_assert!(!value.is_null());
            return &*value;
        }
    }
    /// Retrieves an element by mutable reference. The caller should ensure
    /// that use of that mutability is thread-safe.
    pub fn get_mut(&self, index: u32) -> &mut T {
        let lock = self.inner.lock_shared();
        let value = lock.data[index as usize];
        unsafe {
            debug_assert!(!value.is_null());
            return &mut *value;
        }
    }

    #[inline]
    fn pop_freelist_index<'a>(&'a self, mut shared_lock: InnerShared<'a, T>) -> (InnerShared<'a, T>, u32) {
        'attempt_read: loop {
            // Load indices and check for reallocation condition
            let current_size = shared_lock.size;
            let mut read_index = self.read_head.load(Ordering::Relaxed);
            let limit_index = self.limit_head.load(Ordering::Acquire);
            if read_index == limit_index {
                shared_lock = self.reallocate(current_size, shared_lock);
                continue 'attempt_read;
            }

            loop {
                let preemptive_read = shared_lock.freelist[read_index & shared_lock.index_mask];
                if let Err(actual_read_index) = self.read_head.compare_exchange(
                    read_index, (read_index + 1) & shared_lock.compare_mask,
                    Ordering::AcqRel, Ordering::Acquire
                ) {
                    // We need to try again
                    read_index = actual_read_index;
                    continue 'attempt_read;
                }

                // If here then we performed the read
                return (shared_lock, preemptive_read);
            }
        }
    }

    #[inline]
    fn initialize_at_index(read_lock: InnerShared<T>, index: u32, value: T) {
        let mut target_ptr = read_lock.data[index as usize];

        unsafe {
            if target_ptr.is_null() {
                // First use of this slot: allocate backing storage and patch
                // the (shared) data array to point at it.
                let layout = Layout::for_value(&value);
                target_ptr = std::alloc::alloc(layout).cast();
                let rewrite: *mut *mut T = transmute(read_lock.data.as_ptr());
                *rewrite.add(index as usize) = target_ptr;
            }

            std::ptr::write(target_ptr, value);
        }
    }

    #[inline]
    fn push_freelist_index(&self, read_lock: &InnerShared<T>, index_to_put_back: u32) {
        // Acquire an index in the freelist to which we can write
        let mut cur_write_index = self.write_head.load(Ordering::Relaxed);
        let mut new_write_index = (cur_write_index + 1) & read_lock.compare_mask;
        while let Err(actual_write_index) = self.write_head.compare_exchange(
            cur_write_index, new_write_index,
            Ordering::AcqRel, Ordering::Acquire
        ) {
            cur_write_index = actual_write_index;
            new_write_index = (cur_write_index + 1) & read_lock.compare_mask;
        }

        // We own the data at the index: write to it and notify the readers
        // through `limit_head` that it can be read from. Note that we cheat
        // around the Rust mutability system here :)
        unsafe {
            let target: *mut u32 = transmute(read_lock.freelist.as_ptr());
            *(target.add(cur_write_index & read_lock.index_mask)) = index_to_put_back;
        }

        // Essentially spinlocking, relaxed failure ordering because the logic
        // is that a write first moves the `write_head`, then the `limit_head`.
        while let Err(_) = self.limit_head.compare_exchange(
            cur_write_index, new_write_index,
            Ordering::AcqRel, Ordering::Relaxed
        ) {};
    }

    #[inline]
    fn destruct_at_index(&self, read_lock: &InnerShared<T>, index: u32) {
        let target_ptr = read_lock.data[index as usize];
        unsafe{ ptr::drop_in_place(target_ptr); }
    }

    // NOTE: Bit of a mess, and could use a cleanup with better logic for the
    // resizing. Maybe even a different indexing scheme...
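    // The reallocation protocol, in short: the caller drops its shared lock
    // and this function takes the exclusive lock. If `size` still matches the
    // size the caller observed, this thread won the race and doubles the
    // capacity, extending `data` and `freelist` and pushing the indices of
    // the newly added slots onto the freelist (adjusting the heads so they
    // remain consistent with the doubled masks). If `size` changed, another
    // thread already reallocated and there is nothing to do. Either way the
    // exclusive lock is released and a fresh shared lock is returned.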
    fn reallocate(&self, old_size: usize, inner: InnerShared<T>) -> InnerShared<T> {
        drop(inner);

        {
            // After dropping the read lock, acquire the write lock
            let mut lock = self.inner.lock_exclusive();
            if old_size == lock.size {
                // We are the thread that is supposed to reallocate
                let new_size = old_size * 2;
                Self::assert_valid_size(new_size);

                // Note that the atomic indices are in the range [0, new_size)
                // already, so we need to be careful
                let new_index_mask = new_size - 1;
                let new_compare_mask = (2 * new_size) - 1;
                lock.data.resize(new_size, ptr::null_mut());
                lock.freelist.resize(new_size, 0);
                for idx in 0..old_size {
                    lock.freelist[old_size + idx] = lock.freelist[idx];
                }

                // We need to fill the freelist with the indices of all of the
                // new elements that we have just created.
                debug_assert_eq!(self.limit_head.load(Ordering::SeqCst), self.write_head.load(Ordering::SeqCst));
                let old_read_index = self.read_head.load(Ordering::SeqCst);
                let old_write_index = self.write_head.load(Ordering::SeqCst);

                if old_read_index > old_write_index {
                    // Read index wraps, so keep it as-is and fill
                    let new_read_index = old_read_index + old_size;
                    for index in 0..old_size {
                        let target_idx = (new_read_index + index) & new_index_mask;
                        lock.freelist[target_idx] = (old_size + index) as u32;
                    }

                    self.read_head.store(new_read_index, Ordering::SeqCst);
                    debug_assert!(new_read_index < 2*new_size);
                    debug_assert!((old_write_index.wrapping_sub(new_read_index) & new_compare_mask) <= new_size);
                } else {
                    // No wrapping, so increment write index
                    let new_write_index = old_write_index + old_size;
                    for index in 0..old_size {
                        let target_idx = (old_write_index + index) & new_index_mask;
                        lock.freelist[target_idx] = (old_size + index) as u32;
                    }

                    // Update write/limit heads
                    self.write_head.store(new_write_index, Ordering::SeqCst);
                    self.limit_head.store(new_write_index, Ordering::SeqCst);
                    debug_assert!(new_write_index < 2*new_size);
                    debug_assert!((new_write_index.wrapping_sub(old_read_index) & new_compare_mask) <= new_size);
                }

                // Update sizes and masks
                lock.size = new_size;
                lock.compare_mask = new_compare_mask;
                lock.index_mask = new_index_mask;
            } // else: someone else reallocated, so we don't have to
        }

        // We've dropped the write lock, acquire the read lock again
        return self.inner.lock_shared();
    }

    #[inline]
    fn assert_valid_size(size: usize) {
        // Conditions the size needs to adhere to. Some are a bit excessive,
        // but we don't hit this check very often
        assert!(
            size.is_power_of_two() &&
            size >= 4 &&
            size <= usize::MAX / 2 &&
            size <= u32::MAX as usize
        );
    }
}

impl<T> Drop for ComponentStore<T> {
    fn drop(&mut self) {
        let value_layout = Layout::from_size_align(
            std::mem::size_of::<T>(), std::mem::align_of::<T>()
        ).unwrap();

        // Note that if an index exists in the freelist then its destructor
        // has already been called, so handle those slots first.
        let mut lock = self.inner.lock_exclusive();
        let read_index = self.read_head.load(Ordering::Acquire);
        let write_index = self.write_head.load(Ordering::Acquire);
        debug_assert_eq!(write_index, self.limit_head.load(Ordering::Acquire));

        let mut index = read_index;
        while index != write_index {
            let dealloc_index = lock.freelist[index & lock.index_mask] as usize;
            let target_ptr = lock.data[dealloc_index];
            // The slot may never have been used, in which case its pointer is
            // still null and there is nothing to deallocate.
            if !target_ptr.is_null() {
                unsafe { dealloc(target_ptr.cast(), value_layout); }
                lock.data[dealloc_index] = ptr::null_mut();
            }

            index += 1;
            index &= lock.compare_mask;
        }

        // With all of those set to null, we'll just iterate through all
        // pointers and destruct+deallocate the ones not set to null yet
        for target_ptr in lock.data.iter().copied() {
            if !target_ptr.is_null() {
                unsafe {
                    ptr::drop_in_place(target_ptr);
                    dealloc(target_ptr.cast(), value_layout);
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use super::super::tests::*;
    use std::sync::Arc;

    use rand::prelude::*;
    use rand_pcg::Pcg32;

    fn seeds() -> Vec<[u8;16]> {
        return vec![
            [241, 47, 70, 87, 240, 246, 20, 173, 219, 143, 74, 23, 158, 58, 205, 172],
            [178, 112, 230, 205, 230, 178, 2, 90, 162, 218, 49, 196, 224, 222, 208, 43],
            [245, 42, 35, 167, 153, 205, 221, 144, 200, 253, 144, 117, 176, 231, 17, 70],
            [143, 39, 177, 216, 124, 96, 225, 39, 30, 82, 239, 193, 133, 58, 255, 193],
            [25, 105, 10, 52, 161, 212, 190, 112, 178, 193, 68, 249, 167, 153, 172, 144],
        ]
    }

    #[test]
    fn test_ctor_dtor_simple_unthreaded() {
        const NUM_ROUNDS: usize = 5;
        const NUM_ELEMENTS: usize = 1024;

        let store = ComponentStore::new(32);
        let counters = Counters::new();
        let mut indices = Vec::with_capacity(NUM_ELEMENTS);

        for _round_index in 0..NUM_ROUNDS {
            // Creation round
            for value in 0..NUM_ELEMENTS {
                let new_resource = Resource::new(&counters, value as u64);
                let new_index = store.create(new_resource);
                indices.push(new_index);
            }

            // Checking round
            for el_index in indices.iter().copied() {
                let element = store.get(el_index);
                assert_eq!(element.val, el_index as u64);
            }

            // Destruction round
            for el_index in indices.iter().copied() {
                store.destroy(el_index);
            }
            indices.clear();
        }

        let num_expected = (NUM_ROUNDS * NUM_ELEMENTS) as u64;
        assert_ctor_eq!(counters, num_expected);
        assert_dtor_eq!(counters, num_expected);
    }
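    // A minimal sketch exercising the reserve/submit path, which the tests
    // above do not cover directly. It assumes the same `Resource`/`Counters`
    // helpers and assertion macros used by the other tests in this module.
    #[test]
    fn test_reserve_submit_unthreaded() {
        let store = ComponentStore::new(4);
        let counters = Counters::new();

        // Reserve a slot first, then initialize it through `submit`
        let reservation = store.reserve();
        let el_index = store.submit(reservation, Resource::new(&counters, 1337));
        assert_eq!(store.get(el_index).val, 1337);

        // Destroying the element should run its destructor as usual
        store.destroy(el_index);
        assert_ctor_eq!(counters, 1);
        assert_dtor_eq!(counters, 1);
    }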
    #[test]
    fn test_ctor_dtor_simple_threaded() {
        const MAX_SIZE: usize = 1024;
        const NUM_THREADS: usize = 4;
        const NUM_PER_THREAD: usize = MAX_SIZE / NUM_THREADS;
        const NUM_ROUNDS: usize = 4;
        assert!(MAX_SIZE % NUM_THREADS == 0);

        let store = Arc::new(ComponentStore::new(16));
        let counters = Counters::new();
        let mut threads = Vec::with_capacity(NUM_THREADS);

        for thread_index in 0..NUM_THREADS {
            // Setup local clones to move into the thread
            let store = store.clone();
            let first_index = thread_index * NUM_PER_THREAD;
            let last_index = (thread_index + 1) * NUM_PER_THREAD;
            let counters = counters.clone();

            let handle = std::thread::spawn(move || {
                let mut indices = Vec::with_capacity(last_index - first_index);
                for _round_index in 0..NUM_ROUNDS {
                    // Creation round
                    for value in first_index..last_index {
                        let el_index = store.create(Resource::new(&counters, value as u64));
                        indices.push(el_index);
                    }

                    // Checking round
                    for (value_offset, el_index) in indices.iter().copied().enumerate() {
                        let element = store.get(el_index);
                        assert_eq!(element.val, (first_index + value_offset) as u64);
                    }

                    // Destruction round
                    for el_index in indices.iter().copied() {
                        store.destroy(el_index);
                    }
                    indices.clear();
                }
            });
            threads.push(handle);
        }

        for thread in threads {
            thread.join().expect("clean exit");
        }

        let num_expected = (NUM_ROUNDS * MAX_SIZE) as u64;
        assert_ctor_eq!(counters, num_expected);
        assert_dtor_eq!(counters, num_expected);
    }

    #[test]
    fn test_ctor_dtor_random_threaded() {
        const NUM_ROUNDS: usize = 4;
        const NUM_THREADS: usize = 4;
        const NUM_OPERATIONS: usize = 1024;
        const NUM_OPS_PER_THREAD: usize = NUM_OPERATIONS / NUM_THREADS;
        const NUM_OPS_PER_ROUND: usize = NUM_OPS_PER_THREAD / NUM_ROUNDS;
        const NUM_STORED_PER_THREAD: usize = 32;
        assert!(NUM_OPERATIONS % NUM_THREADS == 0);
        assert!(NUM_OPS_PER_THREAD / 2 > NUM_STORED_PER_THREAD);

        let seeds = seeds();
        for seed_index in 0..seeds.len() {
            // Setup store, counters and threads
            let store = Arc::new(ComponentStore::new(16));
            let counters = Counters::new();
            let mut threads = Vec::with_capacity(NUM_THREADS);

            for thread_index in 0..NUM_THREADS {
                // Setup local clones to move into the thread
                let store = store.clone();
                let counters = counters.clone();

                // Setup local rng
                let mut seed = seeds[seed_index];
                for seed_val_idx in 0..16 {
                    seed[seed_val_idx] ^= thread_index as u8; // blegh
                }
                let mut rng = Pcg32::from_seed(seed);

                let handle = std::thread::spawn(move || {
                    let mut stored = Vec::with_capacity(NUM_STORED_PER_THREAD);

                    for _round_index in 0..NUM_ROUNDS {
                        // Modify elements in the store randomly, for some
                        // silly definition of random
                        for _op_index in 0..NUM_OPS_PER_ROUND {
                            // Perform a single operation, depending on the
                            // current number of values owned by this thread
                            let new_value = rng.next_u64();
                            let should_create = rng.next_u32() % 2 == 0;
                            let is_empty = stored.is_empty();
                            let is_full = stored.len() == NUM_STORED_PER_THREAD;

                            if is_empty || (!is_full && should_create) {
                                // Must create
                                let el_index = store.create(Resource::new(&counters, new_value));
                                stored.push((el_index, new_value));
                            } else {
                                // Must destroy
                                let stored_index = new_value as usize % stored.len();
                                let (el_index, _el_value) = stored.remove(stored_index);
                                store.destroy(el_index);
                            }
                        }

                        // Checking if the values we own still make sense
                        for (el_index, value) in stored.iter().copied() {
                            let gotten = store.get(el_index);
                            assert_eq!(value, gotten.val, "failed at thread {} index {}", thread_index, el_index);
                        }
                    }

                    return stored.len(); // return number of remaining elements
                });
                threads.push(handle);
            }

            // Done with the current seed, wait for all threads
            let mut total_left_allocated = 0;
            for thread in threads {
                let num_still_stored = thread.join().unwrap();
                total_left_allocated += num_still_stored as u64;
            }

            // Before the store is dropped. Note: we cannot predict the number
            // of creations, since creation/destruction is random.
            let num_ctor = counters.ctor.load(Ordering::Acquire);
            assert_dtor_eq!(counters, num_ctor - total_left_allocated);

            // After the store is dropped
            drop(store);
            assert_dtor_eq!(counters, num_ctor);
        }
    }
}