diff --git a/src/runtime2/store/component.rs b/src/runtime2/store/component.rs
new file mode 100644
index 0000000000000000000000000000000000000000..12c56123b5ca25e899ce0ddd1ad8bb72747f72de
--- /dev/null
+++ b/src/runtime2/store/component.rs
@@ -0,0 +1,559 @@
+/*
+ * Component Store
+ *
+ * Concurrent datastructure for creating/destroying/retrieving components using
+ * their ID. It is essentially a variation on a concurrent freelist. We store an
+ * array of (potentially null) pointers to data. Indices into this array that
+ * are unused (but may be left allocated) are in a freelist. So creating a new
+ * bit of data involves taking an index from this freelist. Destruction involves
+ * putting the index back.
+ *
+ * This datastructure takes care of the threadsafe implementation of the
+ * freelist and calling the data's destructor when needed. Note that it is not
+ * completely safe (in Rust's sense of the word) because it is possible to
+ * get more than one mutable reference to a piece of data. Likewise it is
+ * possible to put back bogus indices into the freelist, which will destroy the
+ * integrity of the datastructure.
+ *
+ * Some underlying assumptions that led to this design (note that I haven't
+ * actually checked these conditions or performed any real profiling, yet):
+ * - Resizing the freelist should be very rare. The datastructure should grow
+ *   to some kind of maximum size and stay at that size.
+ * - Creation should (preferably) be faster than deletion of data. Reason being
+ *   that creation implies we're creating a component that has code to be
+ *   executed. Better to quickly be able to execute code than being able to
+ *   quickly tear down finished components.
+ * - Retrieval is much more likely than creation/destruction.
+ *
+ * Some obvious flaws with this implementation:
+ * - Because of the freelist implementation we will generally allocate all of
+ *   the data pointers that are available (i.e. if we have a buffer of size
+ *   64, but we generally use 33 elements, then we'll have 64 elements
+ *   allocated), which might be wasteful at larger array sizes (which are
+ *   always powers of two).
+ * - A lot of the concurrent operations are not strictly necessary: we could
+ *   avoid some of the accesses to the global concurrent datastructure by first
+ *   going through some kind of thread-local datastructure.
+ */
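+
+/*
+ * Illustrative usage (an editorial sketch for orientation only, not part of
+ * the runtime; the `String` payload is just a stand-in for real component
+ * data):
+ *
+ *     let store = ComponentStore::new(32);            // initial size, power of two
+ *     let index = store.create(String::from("data")); // pops an index off the freelist
+ *     assert_eq!(store.get(index), "data");           // retrieve by index
+ *     store.destroy(index);                           // drops the value, returns the index
+ */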
+
+use std::mem::transmute;
+use std::alloc::{alloc, dealloc, Layout};
+use std::ptr;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};
+
+pub struct ComponentStore<T> {
+    inner: UnfairSeLock<Inner<T>>,
+    read_head: AtomicUsize,
+    write_head: AtomicUsize,
+    limit_head: AtomicUsize,
+}
+
+unsafe impl<T> Send for ComponentStore<T>{}
+unsafe impl<T> Sync for ComponentStore<T>{}
+
+struct Inner<T> {
+    freelist: Vec<u32>,
+    data: Vec<*mut T>,
+    size: usize,
+    compare_mask: usize,
+    index_mask: usize,
+}
+
+type InnerRead<'a, T> = UnfairSeLockSharedGuard<'a, Inner<T>>;
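+
+// Editorial note on the masks above (a worked example restating what `new` and
+// the freelist helpers below already do): with `size == 4` the freelist
+// occupies slots 0..4, `index_mask == 0b011` selects a slot, and
+// `compare_mask == 0b111` keeps `read_head`/`write_head`/`limit_head` in the
+// range [0, 2*size). Letting the heads run over twice the capacity before
+// wrapping is presumably what distinguishes an empty ring (read == limit) from
+// a full one (write trailing read by exactly `size`).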
+
+impl<T> ComponentStore<T> {
+    pub fn new(initial_size: usize) -> Self {
+        Self::assert_valid_size(initial_size);
+
+        // Fill initial freelist and preallocate data array
+        let mut initial_freelist = Vec::with_capacity(initial_size);
+        for idx in 0..initial_size {
+            initial_freelist.push(idx as u32);
+        }
+
+        let mut initial_data = Vec::new();
+        initial_data.resize(initial_size, ptr::null_mut());
+
+        // Return initial store
+        return Self{
+            inner: UnfairSeLock::new(Inner{
+                freelist: initial_freelist,
+                data: initial_data,
+                size: initial_size,
+                compare_mask: 2*initial_size - 1,
+                index_mask: initial_size - 1,
+            }),
+            read_head: AtomicUsize::new(0),
+            write_head: AtomicUsize::new(initial_size),
+            limit_head: AtomicUsize::new(initial_size),
+        };
+    }
+
+    /// Creates a new element initialized to the provided `value`. This returns
+    /// the index at which the element can be retrieved.
+    pub fn create(&self, value: T) -> u32 {
+        let lock = self.inner.lock_shared();
+        let (lock, index) = self.pop_freelist_index(lock);
+        self.initialize_at_index(lock, index, value);
+        return index;
+    }
+
+    /// Destroys the element at the provided `index`. The caller must make sure
+    /// that it does not use any previously received references to the data at
+    /// this index, and that no more calls to `get` are performed using this
+    /// index. This is allowed again if the index has been reacquired using
+    /// `create`.
+    pub fn destroy(&self, index: u32) {
+        let lock = self.inner.lock_shared();
+        self.destruct_at_index(&lock, index);
+        self.push_freelist_index(&lock, index);
+    }
+
+    /// Retrieves an element by reference.
+    pub fn get(&self, index: u32) -> &T {
+        let lock = self.inner.lock_shared();
+        let value = lock.data[index as usize];
+        unsafe {
+            debug_assert!(!value.is_null());
+            return &*value;
+        }
+    }
+
+    /// Retrieves an element by mutable reference. The caller should ensure
+    /// that its use of that mutability is thread-safe.
+    pub fn get_mut(&self, index: u32) -> &mut T {
+        let lock = self.inner.lock_shared();
+        let value = lock.data[index as usize];
+        unsafe {
+            debug_assert!(!value.is_null());
+            return &mut *value;
+        }
+    }
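+
+    // Editorial summary of the private helpers below (the code itself is the
+    // authoritative description): the freelist is a multi-producer/
+    // multi-consumer ring buffer. `pop_freelist_index` reallocates when
+    // `read_head` has caught up with `limit_head` (no free indices left);
+    // otherwise it reads a slot speculatively and claims it with a
+    // compare-exchange on `read_head`. `push_freelist_index` reserves a slot
+    // with a compare-exchange on `write_head`, writes the returned index into
+    // it, and then advances `limit_head` in reservation order to publish it to
+    // readers.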
+
+    #[inline]
+    fn pop_freelist_index<'a>(&'a self, mut read_lock: InnerRead<'a, T>) -> (InnerRead<'a, T>, u32) {
+        'attempt_read: loop {
+            // Load indices and check for reallocation condition
+            let current_size = read_lock.size;
+            let mut read_index = self.read_head.load(Ordering::Relaxed);
+            let limit_index = self.limit_head.load(Ordering::Acquire);
+
+            if read_index == limit_index {
+                read_lock = self.reallocate(current_size, read_lock);
+                continue 'attempt_read;
+            }
+
+            loop {
+                let preemptive_read = read_lock.freelist[read_index & read_lock.index_mask];
+                if let Err(actual_read_index) = self.read_head.compare_exchange(
+                    read_index, (read_index + 1) & read_lock.compare_mask,
+                    Ordering::AcqRel, Ordering::Acquire
+                ) {
+                    // We need to try again
+                    read_index = actual_read_index;
+                    continue 'attempt_read;
+                }
+
+                // If here then we performed the read
+                return (read_lock, preemptive_read);
+            }
+        }
+    }
+
+    #[inline]
+    fn initialize_at_index(&self, read_lock: InnerRead<T>, index: u32, value: T) {
+        let mut target_ptr = read_lock.data[index as usize];
+
+        unsafe {
+            if target_ptr.is_null() {
+                let layout = Layout::for_value(&value);
+                target_ptr = alloc(layout).cast();
+                let rewrite: *mut *mut T = transmute(read_lock.data.as_ptr());
+                *rewrite.add(index as usize) = target_ptr;
+            }
+
+            std::ptr::write(target_ptr, value);
+        }
+    }
+
+    #[inline]
+    fn push_freelist_index(&self, read_lock: &InnerRead<T>, index_to_put_back: u32) {
+        // Acquire an index in the freelist to which we can write
+        let mut cur_write_index = self.write_head.load(Ordering::Relaxed);
+        let mut new_write_index = (cur_write_index + 1) & read_lock.compare_mask;
+        while let Err(actual_write_index) = self.write_head.compare_exchange(
+            cur_write_index, new_write_index,
+            Ordering::AcqRel, Ordering::Acquire
+        ) {
+            cur_write_index = actual_write_index;
+            new_write_index = (cur_write_index + 1) & read_lock.compare_mask;
+        }
+
+        // We own the data at the index, write to it and notify reader through
+        // limit_head that it can be read from. Note that we cheat around the
+        // rust mutability system here :)
+        unsafe {
+            let target: *mut u32 = transmute(read_lock.freelist.as_ptr());
+            *(target.add(cur_write_index & read_lock.index_mask)) = index_to_put_back;
+        }
+
+        // Essentially spinlocking, relaxed failure ordering because the logic
+        // is that a write first moves the `write_head`, then the `limit_head`.
+        while let Err(_) = self.limit_head.compare_exchange(
+            cur_write_index, new_write_index,
+            Ordering::AcqRel, Ordering::Relaxed
+        ) {};
+    }
+
+    #[inline]
+    fn destruct_at_index(&self, read_lock: &InnerRead<T>, index: u32) {
+        let target_ptr = read_lock.data[index as usize];
+        unsafe { ptr::drop_in_place(target_ptr); }
+    }
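+
+    // Editorial sketch of `reallocate` (an illustrative example, not an extra
+    // invariant): growing from size 4 to 8 with read == write == limit == 6,
+    // the masks become 0b0111/0b1111, the old freelist entries are mirrored
+    // into slots 4..8 so that any old head value still selects the same entry,
+    // and the four new element indices (4..8) are written starting at the
+    // write head, after which write/limit advance to 10 while the read head
+    // stays at 6.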
+
+    fn reallocate(&self, old_size: usize, inner: InnerRead<T>) -> InnerRead<T> {
+        drop(inner);
+        {
+            // After dropping read lock, acquire write lock
+            let mut lock = self.inner.lock_exclusive();
+
+            if old_size == lock.size {
+                // We are the thread that is supposed to reallocate
+                let new_size = old_size * 2;
+                Self::assert_valid_size(new_size);
+
+                // Note that the atomic indices are in the range [0, new_size)
+                // already, so we need to be careful
+                let new_index_mask = new_size - 1;
+                let new_compare_mask = (2 * new_size) - 1;
+                lock.data.resize(new_size, ptr::null_mut());
+                lock.freelist.resize(new_size, 0);
+                for idx in 0..old_size {
+                    lock.freelist[old_size + idx] = lock.freelist[idx];
+                }
+
+                // We need to fill the freelist with the indices of all of the
+                // new elements that we have just created.
+                debug_assert_eq!(self.limit_head.load(Ordering::SeqCst), self.write_head.load(Ordering::SeqCst));
+                let old_read_index = self.read_head.load(Ordering::SeqCst);
+                let old_write_index = self.write_head.load(Ordering::SeqCst);
+
+                if old_read_index > old_write_index {
+                    // Read index wraps, so keep it as-is and fill
+                    let new_read_index = old_read_index + old_size;
+                    for index in 0..old_size {
+                        let target_idx = (new_read_index + index) & new_index_mask;
+                        lock.freelist[target_idx] = (old_size + index) as u32;
+                    }
+
+                    self.read_head.store(new_read_index, Ordering::SeqCst);
+                    debug_assert!(new_read_index < 2*new_size);
+                    debug_assert!(old_write_index.wrapping_sub(new_read_index) & new_compare_mask <= new_size);
+                } else {
+                    // No wrapping, so increment write index
+                    let new_write_index = old_write_index + old_size;
+                    for index in 0..old_size {
+                        let target_idx = (old_write_index + index) & new_index_mask;
+                        lock.freelist[target_idx] = (old_size + index) as u32;
+                    }
+
+                    // Update write/limit heads
+                    self.write_head.store(new_write_index, Ordering::SeqCst);
+                    self.limit_head.store(new_write_index, Ordering::SeqCst);
+                    debug_assert!(new_write_index < 2*new_size);
+                    debug_assert!(new_write_index.wrapping_sub(old_read_index) & new_compare_mask <= new_size);
+                }
+
+                // Update sizes and masks
+                lock.size = new_size;
+                lock.compare_mask = new_compare_mask;
+                lock.index_mask = new_index_mask;
+            } // else: someone else reallocated, so we don't have to
+        }
+
+        // We've dropped the write lock, acquire the read lock again
+        return self.inner.lock_shared();
+    }
+
+    #[inline]
+    fn assert_valid_size(size: usize) {
+        // Conditions the size needs to adhere to. Some are a bit excessive,
+        // but we don't hit this check very often
+        assert!(
+            size.is_power_of_two() &&
+            size >= 4 &&
+            size <= usize::MAX / 2 &&
+            size <= u32::MAX as usize
+        );
+    }
+}
+
+impl<T> Drop for ComponentStore<T> {
+    fn drop(&mut self) {
+        let value_layout = Layout::from_size_align(
+            std::mem::size_of::<T>(), std::mem::align_of::<T>()
+        ).unwrap();
+
+        // Note that if an index exists in the freelist then the destructor of
+        // its element has already been called, so handle those first: their
+        // backing memory only needs to be deallocated. Slots that were never
+        // populated still hold a null pointer and must be skipped.
+        let mut lock = self.inner.lock_exclusive();
+
+        let read_index = self.read_head.load(Ordering::Acquire);
+        let write_index = self.write_head.load(Ordering::Acquire);
+        debug_assert_eq!(write_index, self.limit_head.load(Ordering::Acquire));
+
+        let mut index = read_index;
+        while index != write_index {
+            let dealloc_index = lock.freelist[index & lock.index_mask] as usize;
+            let target_ptr = lock.data[dealloc_index];
+
+            if !target_ptr.is_null() {
+                unsafe {
+                    dealloc(target_ptr.cast(), value_layout);
+                    lock.data[dealloc_index] = ptr::null_mut();
+                }
+            }
+
+            index += 1;
+            index &= lock.compare_mask;
+        }
+
+        // With all of those set to null, we'll just iterate through all
+        // pointers and destruct+deallocate the ones not set to null yet
+        for target_ptr in lock.data.iter().copied() {
+            if !target_ptr.is_null() {
+                unsafe {
+                    ptr::drop_in_place(target_ptr);
+                    dealloc(target_ptr.cast(), value_layout);
+                }
+            }
+        }
+    }
+}
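+
+// The tests below exercise the store from multiple threads using a fixed set
+// of RNG seeds. They rely on the `rand` and `rand_pcg` crates, which are
+// assumed here to be available as (dev-)dependencies of this crate.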
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use rand::prelude::*;
+    use rand_pcg::Pcg32;
+
+    use std::sync::Arc;
+    use std::sync::atomic::{AtomicU64, Ordering};
+
+    pub struct Resource {
+        dtor: Arc<AtomicU64>,
+        val: u64,
+    }
+
+    impl Resource {
+        fn new(ctor: Arc<AtomicU64>, dtor: Arc<AtomicU64>, val: u64) -> Self {
+            ctor.fetch_add(1, Ordering::SeqCst);
+            return Self{ dtor, val };
+        }
+    }
+
+    impl Drop for Resource {
+        fn drop(&mut self) {
+            self.dtor.fetch_add(1, Ordering::SeqCst);
+        }
+    }
+
+    fn seeds() -> Vec<[u8;16]> {
+        return vec![
+            [241, 47, 70, 87, 240, 246, 20, 173, 219, 143, 74, 23, 158, 58, 205, 172],
+            [178, 112, 230, 205, 230, 178, 2, 90, 162, 218, 49, 196, 224, 222, 208, 43],
+            [245, 42, 35, 167, 153, 205, 221, 144, 200, 253, 144, 117, 176, 231, 17, 70],
+            [143, 39, 177, 216, 124, 96, 225, 39, 30, 82, 239, 193, 133, 58, 255, 193],
+            [25, 105, 10, 52, 161, 212, 190, 112, 178, 193, 68, 249, 167, 153, 172, 144],
+        ];
+    }
+
+    #[test]
+    fn test_ctor_dtor_simple_unthreaded() {
+        const NUM_ROUNDS: usize = 5;
+        const NUM_ELEMENTS: usize = 1024;
+
+        let store = ComponentStore::new(32);
+        let ctor_counter = Arc::new(AtomicU64::new(0));
+        let dtor_counter = Arc::new(AtomicU64::new(0));
+
+        let mut indices = Vec::with_capacity(NUM_ELEMENTS);
+        for _round_index in 0..NUM_ROUNDS {
+            // Creation round
+            for value in 0..NUM_ELEMENTS {
+                let new_resource = Resource::new(ctor_counter.clone(), dtor_counter.clone(), value as u64);
+                let new_index = store.create(new_resource);
+                indices.push(new_index);
+            }
+
+            // Checking round
+            for el_index in indices.iter().copied() {
+                let element = store.get(el_index);
+                assert_eq!(element.val, el_index as u64);
+            }
+
+            // Destruction round
+            for el_index in indices.iter().copied() {
+                store.destroy(el_index);
+            }
+
+            indices.clear();
+        }
+
+        let num_ctor_calls = ctor_counter.load(Ordering::Acquire);
+        let num_dtor_calls = dtor_counter.load(Ordering::Acquire);
+        assert_eq!(num_ctor_calls, num_dtor_calls);
+        assert_eq!(num_ctor_calls, (NUM_ROUNDS * NUM_ELEMENTS) as u64);
+    }
+
+    #[test]
+    fn test_ctor_dtor_simple_threaded() {
+        const MAX_SIZE: usize = 1024;
+        const NUM_THREADS: usize = 4;
+        const NUM_PER_THREAD: usize = MAX_SIZE / NUM_THREADS;
+        const NUM_ROUNDS: usize = 4;
+
+        assert!(MAX_SIZE % NUM_THREADS == 0);
+
+        let store = Arc::new(ComponentStore::new(16));
+        let ctor_counter = Arc::new(AtomicU64::new(0));
+        let dtor_counter = Arc::new(AtomicU64::new(0));
+
+        let mut threads = Vec::with_capacity(NUM_THREADS);
+        for thread_index in 0..NUM_THREADS {
+            // Setup local clones to move into the thread
+            let store = store.clone();
+            let first_index = thread_index * NUM_PER_THREAD;
+            let last_index = (thread_index + 1) * NUM_PER_THREAD;
+            let ctor_counter = ctor_counter.clone();
+            let dtor_counter = dtor_counter.clone();
+
+            let handle = std::thread::spawn(move || {
+                let mut indices = Vec::with_capacity(last_index - first_index);
+                for _round_index in 0..NUM_ROUNDS {
+                    // Creation round
+                    for value in first_index..last_index {
+                        let el_index = store.create(Resource::new(ctor_counter.clone(), dtor_counter.clone(), value as u64));
+                        indices.push(el_index);
+                    }
+
+                    // Checking round
+                    for (value_offset, el_index) in indices.iter().copied().enumerate() {
+                        let element = store.get(el_index);
+                        assert_eq!(element.val, (first_index + value_offset) as u64);
+                    }
+
+                    // Destruction round
+                    for el_index in indices.iter().copied() {
+                        store.destroy(el_index);
+                    }
+
+                    indices.clear();
+                }
+            });
+            threads.push(handle);
+        }
+
+        for thread in threads {
+            thread.join().expect("clean exit");
+        }
+
+        let num_ctor_calls = ctor_counter.load(Ordering::Acquire);
+        let num_dtor_calls = dtor_counter.load(Ordering::Acquire);
+        assert_eq!(num_ctor_calls, num_dtor_calls);
+        assert_eq!(num_ctor_calls, (NUM_ROUNDS * MAX_SIZE) as u64);
+    }
+
+    #[test]
+    fn test_ctor_dtor_random_threaded() {
+        const NUM_ROUNDS: usize = 4;
+        const NUM_THREADS: usize = 4;
+        const NUM_OPERATIONS: usize = 1024;
+        const NUM_OPS_PER_THREAD: usize = NUM_OPERATIONS / NUM_THREADS;
+        const NUM_OPS_PER_ROUND: usize = NUM_OPS_PER_THREAD / NUM_ROUNDS;
+        const NUM_STORED_PER_THREAD: usize = 32;
+
+        assert!(NUM_OPERATIONS % NUM_THREADS == 0);
+        assert!(NUM_OPS_PER_THREAD / 2 > NUM_STORED_PER_THREAD);
+
+        let seeds = seeds();
+        for seed_index in 0..seeds.len() {
+            // Setup store, counters and threads
+            let store = Arc::new(ComponentStore::new(16));
+            let ctor_counter = Arc::new(AtomicU64::new(0));
+            let dtor_counter = Arc::new(AtomicU64::new(0));
+
+            let mut threads = Vec::with_capacity(NUM_THREADS);
+            for thread_index in 0..NUM_THREADS {
+                // Setup local clones to move into the thread
+                let store = store.clone();
+                let ctor_counter = ctor_counter.clone();
+                let dtor_counter = dtor_counter.clone();
+
+                // Setup local rng
+                let mut seed = seeds[seed_index];
+                for seed_val_idx in 0..16 {
+                    seed[seed_val_idx] ^= thread_index as u8; // blegh
+                }
+                let mut rng = Pcg32::from_seed(seed);
+
+                let handle = std::thread::spawn(move || {
+                    let mut stored = Vec::with_capacity(NUM_STORED_PER_THREAD);
+
+                    for _round_index in 0..NUM_ROUNDS {
+                        // Modify elements in the store randomly, for some
+                        // silly definition of random
+                        for _op_index in 0..NUM_OPS_PER_ROUND {
+                            // Perform a single operation, depending on the
+                            // current number of values owned by this thread
+                            let new_value = rng.next_u64();
+                            let should_create = rng.next_u32() % 2 == 0;
+                            let is_empty = stored.is_empty();
+                            let is_full = stored.len() == NUM_STORED_PER_THREAD;
+
+                            if is_empty || (!is_full && should_create) {
+                                // Must create
+                                let el_index = store.create(Resource::new(
+                                    ctor_counter.clone(), dtor_counter.clone(), new_value
+                                ));
+                                stored.push((el_index, new_value));
+                            } else {
+                                // Must destroy
+                                let stored_index = new_value as usize % stored.len();
+                                let (el_index, _el_value) = stored.remove(stored_index);
+                                store.destroy(el_index);
+                            }
+                        }
+
+                        // Check that the values we own still make sense
+                        for (el_index, value) in stored.iter().copied() {
+                            let gotten = store.get(el_index);
+                            assert_eq!(value, gotten.val, "failed at thread {} value {}", thread_index, el_index);
+                        }
+                    }
+
+                    return stored.len(); // return number of remaining elements
+                });
+                threads.push(handle);
+            }
+
+            // Done with the current round
+            let mut total_left_allocated = 0;
+            for thread in threads {
+                let num_still_stored = thread.join().unwrap();
+                total_left_allocated += num_still_stored as u64;
+            }
+
+            // Before store is dropped
+            let num_ctor_calls = ctor_counter.load(Ordering::Acquire);
+            let num_dtor_calls = dtor_counter.load(Ordering::Acquire);
+            assert_eq!(num_ctor_calls - total_left_allocated, num_dtor_calls);
+
+            // After store is dropped
+            drop(store);
+            let num_dtor_calls = dtor_counter.load(Ordering::Acquire);
+            assert_eq!(num_ctor_calls, num_dtor_calls);
+        }
+    }
+}
\ No newline at end of file