use std::mem::{size_of, align_of, transmute};
use std::alloc::{alloc, dealloc, Layout};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};

use crate::protocol::*;

// -----------------------------------------------------------------------------
// Component
// -----------------------------------------------------------------------------

/// Key to a component. The type system somewhat ensures that there can only be
/// one of these per component. Only with a key may one retrieve the
/// privately-accessible memory of a component. Practically just a generational
/// index, like `CompId` is.
#[derive(Copy, Clone)]
pub(crate) struct CompKey(CompId);

/// Generational ID of a component
#[derive(Copy, Clone)]
pub(crate) struct CompId {
    pub index: u32,
    pub generation: u32,
}

impl PartialEq for CompId {
    fn eq(&self, other: &Self) -> bool {
        return self.index.eq(&other.index);
    }
}

impl Eq for CompId {}
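// Note on the generational scheme (illustrative only, nothing below relies on
// it): the `PartialEq` implementation above deliberately compares just the
// `index`, so e.g. `CompId{ index: 3, generation: 0 }` and
// `CompId{ index: 3, generation: 1 }` compare equal even though they refer to
// different "lifetimes" of the same storage slot. Detecting such stale IDs is
// presumably left to the owner of the component storage, which can compare the
// stored generation against the one in the ID.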
/// In-runtime storage of a component
pub(crate) struct RtComp {
}

// -----------------------------------------------------------------------------
// Runtime
// -----------------------------------------------------------------------------

type RuntimeHandle = Arc<Runtime>;

/// Memory that is maintained by "the runtime". In practice it is maintained by
/// multiple schedulers, and this serves as the common interface to that memory.
pub struct Runtime {
    active_elements: AtomicU32, // active components and APIs (i.e. component creators)
}

impl Runtime {
    pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime {
        assert!(num_threads > 0, "need a thread to perform work");
        return Runtime{
            active_elements: AtomicU32::new(0),
        };
    }
}

// -----------------------------------------------------------------------------
// Runtime containers
// -----------------------------------------------------------------------------

/// Component storage. Note that it shouldn't be polymorphic, but making it so
/// allows us to test it more easily. The container is essentially a
/// thread-safe freelist. The list always contains *all* free entries in the
/// storage array.
///
/// The freelist itself is implemented using a thread-safe ringbuffer. But there
/// are some very important properties we exploit in this specific
/// implementation of a ringbuffer. Note that writing to the ringbuffer (i.e.
/// adding to the freelist) corresponds to destroying a component, and reading
/// from the ringbuffer corresponds to creating a component. The aforementioned
/// properties are: one can never write more to the ringbuffer than has been
/// read from it (i.e. one can never destroy more components than have been
/// created), and we may safely assume that when the `CompStore` is dropped no
/// thread can access it anymore (because they've all been shut down). This
/// simplifies the deallocation code.
///
/// Internally each individual instance of `T` will be (de)allocated. So we will
/// not store an array of `T`, but an array of `*T`. This keeps the storage of
/// `T` pointer-stable (as is required for the schedulers actually running the
/// components, because they'll fetch a component and then continue running it
/// while this component storage might get reallocated).
///
/// Note that there is still some unsafety here that is kept in check by the
/// owner of this `CompStore`: the `CompId` and `CompKey` system ensures that
/// only one mutable reference will ever be obtained, and potentially multiple
/// immutable references. But in practice the `&mut T` will be used to access
/// so-called "public" fields immutably and "private" fields mutably, while the
/// `&T` will only be used to access the "public" fields immutably.
struct CompStore<T> {
    freelist: *mut u32,
    data: *mut *mut T,
    count: usize,
    mask: usize,
    byte_size: usize, // used for dealloc
    write_head: AtomicUsize,
    limit_head: AtomicUsize,
    read_head: AtomicUsize,
}
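// Illustration of the head arithmetic used by the freelist (the `8` is an
// arbitrary example capacity, not something the code below assumes): slots are
// addressed with `head & mask` where `mask = count - 1`, while the heads
// themselves advance over the doubled range `[0, 2 * count)` and are compared
// under `2 * count - 1`. A head that is exactly `count` positions ahead of
// another therefore maps to the same slot but still compares as a different
// position, e.g. for `count = 8`: `3 & 0b0111 == 11 & 0b0111`, yet
// `3 & 0b1111 != 11 & 0b1111`. This is what lets the store tell an empty
// freelist (no free entries, so it must grow) apart from a completely full one
// (every entry free).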
const fn compute_realloc_flag() -> usize {
    match size_of::<usize>() {
        4 => return 1 << 31, // 32-bit system
        8 => return 1 << 63, // 64-bit system
        _ => panic!("unexpected byte size for 'usize'")
    }
}

impl<T> CompStore<T> {
    const REALLOC_FLAG: usize = compute_realloc_flag();

    fn new(initial_count: usize) -> Self {
        // Allocate data
        debug_assert!(size_of::<T>() > 0); // no ZST during testing (and definitely not in production)
        let (freelist, data, byte_size) = Self::alloc_buffer(initial_count);
        unsafe {
            // Init the freelist to all of the indices in the array of data
            let mut target = freelist;
            for idx in 0..initial_count as u32 {
                *target = idx;
                target = target.add(1);
            }

            // And init the data such that they're all NULL pointers
            std::ptr::write_bytes(data, 0, initial_count);
        }

        return CompStore{
            freelist, data,
            count: initial_count,
            mask: initial_count - 1,
            byte_size,
            write_head: AtomicUsize::new(initial_count),
            limit_head: AtomicUsize::new(initial_count),
            read_head: AtomicUsize::new(0),
        };
    }

    fn get_index_from_freelist(&self) -> u32 {
        let compare_mask = (self.count * 2) - 1;
        let mut read_index = self.read_head.load(Ordering::Acquire); // read index first

        'try_loop: loop {
            let limit_index = self.limit_head.load(Ordering::Acquire); // limit index second

            // By definition we always have `read_index <= limit_index` (if we
            // would have an infinite buffer; in reality we will wrap).
            if (read_index & compare_mask) == (limit_index & compare_mask) {
                // We need to create a bigger buffer. Note that no reader can
                // *ever* set the read index to beyond the limit index, and it
                // is currently equal. So we're certain that there is no other
                // reader currently updating the read_head.
                //
                // To test if we are supposed to resize the backing buffer we
                // try to set the REALLOC_FLAG on the limit index. Note that the
                // stored indices are always in the range [0, 2*count). So if
                // we add REALLOC_FLAG to the limit index, then the masked
                // condition above still holds! Other potential readers will end
                // up here and are allowed to wait until we resized the backing
                // container.
                //
                // Furthermore, setting the limit index to this high value also
                // notifies the writer that any of its writes should be tried
                // again, as they're writing to a buffer that is going to get
                // trashed.
                todo!("finish reallocation code");

                match self.limit_head.compare_exchange(
                    limit_index, limit_index | Self::REALLOC_FLAG,
                    Ordering::SeqCst, Ordering::Acquire
                ) {
                    Ok(_) => {
                        // Limit index has changed, so we're now the ones that
                        // are supposed to resize the backing buffer.
                    },
                    Err(_) => {
                        // Another thread set the flag first; the retry/wait
                        // logic belongs to the unfinished reallocation code
                        // above.
                    },
                }
            } else {
                // It seems we have space to read
                let preemptive_read = unsafe { *self.freelist.add(read_index & self.mask) };
                if let Err(new_read_index) = self.read_head.compare_exchange(
                    read_index, (read_index + 1) & compare_mask,
                    Ordering::SeqCst, Ordering::Acquire
                ) {
                    // Failed to do the CAS, try again. We need to start at the
                    // start again because we might have had other readers that
                    // were successful, so at the very least, the preemptive
                    // read we did is no longer correct.
                    read_index = new_read_index;
                    continue 'try_loop;
                }

                // We now "own" the value at the read index
                return preemptive_read;
            }
        }
    }

    fn put_back_index_into_freelist(&self, index: u32) {
        let compare_mask = (self.count * 2) - 1;
        let mut write_index = self.write_head.load(Ordering::Acquire);
        while let Err(new_write_index) = self.write_head.compare_exchange(
            write_index, (write_index + 1) & compare_mask,
            Ordering::SeqCst, Ordering::Acquire
        ) {
            // Failed to do the CAS, try again
            write_index = new_write_index;
        }

        'try_write_loop: loop {
            // We are now the only ones that can write at `write_index`. Try to
            // do so.
            unsafe {
                *self.freelist.add(write_index & self.mask) = index;
            }

            // But we still need to move the limit head. Only successful writers
            // may move it, so we expect it to move from `write_index` to
            // `write_index + 1`, but we might have to spin to achieve it.
            // Furthermore, the `limit_head` is used by the index-retrieval
            // function to indicate that a resize is in progress.
            'commit_to_write_loop: loop {
                match self.limit_head.compare_exchange(
                    write_index, (write_index + 1) & compare_mask,
                    Ordering::SeqCst, Ordering::Acquire
                ) {
                    Ok(_) => break,
                    Err(new_value) => {
                        // Two options: the limit is not yet what we expect it
                        // to be. If so, just try again with the old values.
                        // But if it is very large (relatively) then this is the
                        // signal from the reader that the entire storage is
                        // being resized.
                        if new_value & Self::REALLOC_FLAG != 0 {
                            // Someone is resizing, wait until that is no longer
                            // true.
                            while self.limit_head.load(Ordering::Acquire) & Self::REALLOC_FLAG != 0 {
                                // still resizing
                            }

                            // Not resizing anymore, try everything again: our
                            // old write has now become invalid, but our index
                            // hasn't! So we need to redo our write and our
                            // increment of the limit head.
                            continue 'try_write_loop;
                        } else {
                            // Just try again
                            continue 'commit_to_write_loop;
                        }
                    }
                }
            }

            // We updated the limit head, so we're done :)
            return;
        }
    }

    /// Retrieves a `&T` from the store. The index should have been obtained
    /// through `create` and not yet given back by calling `destroy`.
    fn get(&self, index: u32) -> &T {
        unsafe {
            let target = *self.data.add(index as usize);
            debug_assert!(!target.is_null()); // entry must be in use
            return &*target;
        }
    }

    /// Same as `get`, but now returning a mutable `&mut T`. Make sure that you
    /// know what you're doing :)
    fn get_mut(&self, index: u32) -> &mut T {
        unsafe {
            let target = *self.data.add(index as usize);
            debug_assert!(!target.is_null()); // entry must be in use
            return &mut *target;
        }
    }
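    // A sketch of the single allocation produced by `alloc_buffer` below (the
    // exact amount of padding depends on `T`; this is only an illustration):
    //
    //   [ u32 freelist; num ][ padding ][ *mut T data; num ]
    //   ^ offset 0                      ^ byte_offset_data   (byte_size = total)
    //
    // The freelist region sits at the front, so the pointer returned by `alloc`
    // doubles as the `freelist` pointer and is the one handed back to `dealloc`
    // in `dealloc_buffer`.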
    fn alloc_buffer(num: usize) -> (*mut u32, *mut *mut T, usize) {
        // Probably overkill considering the amount of memory that is needed to
        // exceed this number. But still: ensure `num` adheres to the
        // requirements needed for correct functioning of the store.
        assert!(
            num >= 8 && num <= u32::MAX as usize / 4 && num.is_power_of_two(),
            "invalid allocation count for CompStore buffer"
        );

        // Compute the byte size of the freelist. The allocation below is
        // aligned for `*mut T`, which also satisfies the alignment of the
        // `u32` freelist at the front.
        let mut byte_size = num * size_of::<u32>();

        // Align to `*mut T`, then reserve space for all of the pointers
        byte_size = Self::align_to(byte_size, align_of::<*mut T>());
        let byte_offset_data = byte_size;
        byte_size += num * size_of::<*mut T>();

        unsafe {
            // Allocate, then retrieve pointers to the allocated regions
            let layout = Self::layout_for(byte_size);
            let memory = alloc(layout);
            let base_free: *mut u32 = transmute(memory);
            let base_data: *mut *mut T = transmute(memory.add(byte_offset_data));

            return (base_free, base_data, byte_size);
        }
    }

    fn dealloc_buffer(freelist: *mut u32, _data: *mut *mut T, byte_size: usize) {
        // Note: we only did one allocation, the freelist is at the front
        let layout = Self::layout_for(byte_size);
        unsafe {
            let base: *mut u8 = transmute(freelist);
            dealloc(base, layout);
        }
    }

    fn layout_for(byte_size: usize) -> Layout {
        debug_assert!(byte_size % size_of::<*mut T>() == 0);
        return unsafe {
            Layout::from_size_align_unchecked(byte_size, align_of::<*mut T>())
        };
    }

    fn align_to(offset: usize, alignment: usize) -> usize {
        debug_assert!(alignment.is_power_of_two());
        let mask = alignment - 1;
        return (offset + mask) & !mask;
    }
}
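// A minimal, single-threaded smoke test sketching how the freelist is intended
// to be used. It only exercises the index round-trip: the multi-threaded
// behaviour, the (unfinished) reallocation path and `get`/`get_mut` are not
// touched, and the `u64` payload type plus the capacity of 8 are arbitrary
// choices for the test.
#[cfg(test)]
mod comp_store_freelist_tests {
    use super::*;

    #[test]
    fn indices_can_be_taken_and_returned() {
        let store = CompStore::<u64>::new(8);

        // Taking all initial indices yields each slot exactly once.
        let mut taken: Vec<u32> = (0..8).map(|_| store.get_index_from_freelist()).collect();
        taken.sort();
        assert_eq!(taken, (0u32..8).collect::<Vec<u32>>());

        // Returning the indices makes them available to subsequent readers.
        for index in taken {
            store.put_back_index_into_freelist(index);
        }
        let reused = store.get_index_from_freelist();
        assert!(reused < 8);
        store.put_back_index_into_freelist(reused);

        // `CompStore` has no `Drop` implementation (yet), so release the
        // backing buffer by hand; all data slots are still NULL.
        CompStore::<u64>::dealloc_buffer(store.freelist, store.data, store.byte_size);
    }
}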