Changeset - 4341b8d6790f
[Not reviewed]
MH - 4 years ago 2021-12-22 21:48:48
contact@maxhenger.nl
Initial concrete work on the component storage
1 file changed with 215 insertions and 58 deletions:
0 comments (0 inline, 0 general)
src/runtime2/runtime.rs
 
use std::mem::{size_of, align_of, transmute};
use std::alloc::{alloc, dealloc, Layout};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};

use crate::protocol::*;

@@ -58,62 +60,217 @@ impl Runtime {

// -----------------------------------------------------------------------------

/// Component storage. Note that it shouldn't be polymorphic, but making it so
/// allows us to test it.
// Requirements:
// 1. Performance, in order of importance:
//      1. Access (should be just an index lookup)
//      2. Creation (because we want to start executing code as fast as possible)
//      3. Destruction (because create-and-run matters more than dying quickly)
// 2. Somewhat safe, with most of the performance cost paid in the incorrect
//  case.
// 3. Thread-safe. Everyone and their dog will be creating and indexing into
//  the components concurrently.
// 4. Assume low contention.
//
// Some trade-offs:
// We could perhaps make component IDs just a pointer to that component, with
// an atomic counter managed by the runtime containing the number of owners
// (always starting at 1). However, it feels too early to do something like
// that, especially because I would like to do direct messaging. Even though
// sending two u32s is the same as sending a pointer, it feels wrong for now.
//
// So instead we'll have some kind of concurrent store that we can index into.
// This means that it might have to resize. Resizing implies that everyone must
// wait until the resize is finished.
//
// Furthermore, it would be nice to reuse slots. That is to say: if we create a
// bunch of components and then destroy a couple of them, then the storage we
// reserved for them should be reusable.
//
// We'll go the somewhat simple route for now (see the sketch below):
// 1. Each component will be allocated individually (and we'll define exactly
//  what we mean by this sometime later, when we start with the bytecode). This
//  way the components are pointer-stable for their lifetime.
// 2. We need some array that contains these pointers. We index into this
//  array with our IDs.
// 3. When we destroy a component we call the destructor on the allocated memory
//  and add the index to some kind of freelist. Because only one thread can ever
//  create and/or destroy a particular component, we have an imaginary lock on
//  that component's index. The freelist acts like a concurrent stack where we
//  can push/pop. If we ensure that the freelist is the same size as the ID
//  array then we can never run out of space.
// 4. At some point the ID array might be full and have to be resized. If we
//  ensure that there is only one thread that can ever fill up the array (this
//  means we *always* have one slot free, such that we can do a CAS) then we can
//  do a pointer swap on the base pointer of all storage. This takes care of
//  resizing due to creation.
//
//  However, with a freelist accessed at the same time, we must make sure that
//  we copy the old freelist and the old ID array correctly. While we're
//  creating the new array we might still be destroying components. So one
//  component calls a destructor (not too bad) and then pushes the resulting
//  ID onto the freelist stack (which is bad). We can either somehow forbid
//  destroying during resizing (which feels ridiculous) or try to be smart. Note
//  that destruction might cause later creations as well!
//
//  Since components might have to read a base pointer anyway to arrive at a
//  freelist entry or component pointer, we could set it to null and let the
//  others spinlock (or take a mutex?). So then the resizer will notice the
//
struct CompStore {
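
// A minimal sketch of points 1-3 above (illustrative only, and generic over a
// component type `T`; the real store below replaces the Vecs with a manually
// managed allocation and atomic heads so it can be shared between threads):
struct NaiveCompStore<T> {
    data: Vec<*mut T>,  // index -> individually allocated component (pointer-stable)
    freelist: Vec<u32>, // stack of indices in `data` that are currently free
}

impl<T> NaiveCompStore<T> {
    fn create(&mut self, comp: T) -> u32 {
        let index = self.freelist.pop().expect("out of slots");
        self.data[index as usize] = Box::into_raw(Box::new(comp));
        return index;
    }

    fn destroy(&mut self, index: u32) {
        unsafe { drop(Box::from_raw(self.data[index as usize])); } // run the destructor
        self.data[index as usize] = std::ptr::null_mut();
        self.freelist.push(index); // the slot becomes reusable
    }
}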
 
/// Component storage. Note that it shouldn't be polymorphic, but making it so
/// allows us to test it more easily. The container is essentially a
/// thread-safe freelist. The list always contains *all* free entries in the
/// storage array.
///
/// The freelist itself is implemented using a thread-safe ringbuffer, but there
/// are some very important properties we exploit in this specific
/// implementation of a ringbuffer. Note that writing to the ringbuffer (i.e.
/// adding to the freelist) corresponds to destroying a component, and reading
/// from the ringbuffer corresponds to creating a component. The aforementioned
/// properties are: one can never write more to the ringbuffer than has been
/// read from it (i.e. one can never destroy more components than have been
/// created), and we may safely assume that when the `CompStore` is dropped no
/// thread can access it anymore (because they've all been shut down). This
/// simplifies the deallocation code.
///
/// Internally, each individual instance of `T` is (de)allocated on its own. So
/// we do not store an array of `T`, but an array of `*T`. This keeps the
/// storage of `T` pointer-stable (as is required for the schedulers actually
/// running the components, because they'll fetch a component and then continue
/// running it while this component storage might get reallocated).
///
/// Note that there is still some unsafety here that is kept in check by the
/// owner of this `CompStore`: the `CompId` and `CompKey` system ensures that
/// only one mutable reference will ever be obtained, alongside potentially
/// multiple immutable references. In practice the `&mut T` will be used to
/// access the so-called "public" fields immutably and the "private" fields
/// mutably, while the `&T` will only be used to access the "public" fields
/// immutably.
struct CompStore<T: Sized> {
    freelist: *mut u32,
    data: *mut *mut T,
    count: usize,
    mask: usize,
    byte_size: usize, // used for dealloc
    write_head: AtomicUsize,
    limit_head: AtomicUsize,
    read_head: AtomicUsize,
}
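
// The "public"/"private" convention described above, as a sketch with a made-up
// component type (illustrative only, not code in this module; `DummyComp` and
// its fields are hypothetical):
//
//     struct DummyComp {
//         pub inbox_len: u32, // "public": readable through `&T` from any thread
//         exec_counter: u64,  // "private": touched only through the single `&mut T`
//     }
//
//     // The scheduler currently running the component (it holds the unique
//     // `CompKey`, hence the only `&mut T`):
//     let comp = store.get_mut(index);
//     comp.exec_counter += 1;  // private field, mutated
//     let _ = comp.inbox_len;  // public field, only read
//
//     // Any other thread (it merely holds a `CompId`, hence a `&T`):
//     let peek = store.get(index);
//     let _ = peek.inbox_len;  // public fields only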
 

	
 
impl<T: Sized> CompStore<T> {
    fn new(initial_count: usize) -> Self {
        // Allocate data
        debug_assert!(size_of::<T>() > 0); // No ZST during testing (and definitely not in production)
        let (freelist, data, byte_size) = Self::alloc_buffer(initial_count);

        unsafe {
            // Init the freelist to all of the indices in the array of data
            let mut target = freelist;
            for idx in 0..initial_count as u32 {
                *target = idx;
                target = target.add(1);
            }

            // And init the data such that they're all NULL pointers
            std::ptr::write_bytes(data, 0, initial_count);
        }

        return CompStore{
            freelist, data,
            count: initial_count,
            mask: initial_count - 1,
            byte_size,
            write_head: AtomicUsize::new(initial_count),
            limit_head: AtomicUsize::new(initial_count),
            read_head: AtomicUsize::new(0),
        };
    }
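
    // For illustration: directly after `CompStore::<T>::new(8)` the state is
    //     freelist -> [0, 1, 2, 3, 4, 5, 6, 7]   (every slot is free)
    //     data     -> [null; 8]                  (no components allocated yet)
    //     count = 8, mask = 7
    //     read_head = 0, write_head = limit_head = 8
    // Creating a component pops an index from the freelist (read_head advances
    // towards limit_head); destroying one pushes the index back (write_head,
    // then limit_head, advance). So read_head <= limit_head <= write_head holds
    // at all times, modulo the wrap-around at 2 * count.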
 

	
 
    fn get_index_from_freelist(&self) -> u32 {
        let compare_mask = (self.count * 2) - 1;

        'try_loop: loop {
            let read_index = self.read_head.load(Ordering::Acquire); // read index first
            let limit_index = self.limit_head.load(Ordering::Acquire); // limit index second

            // By definition we always have `read_index <= limit_index` (if we
            // would have an infinite buffer; in reality we will wrap).
            if (read_index & compare_mask) == (limit_index & compare_mask) {
                // We need to create a bigger buffer. Note that no reader can
                // *ever* set the read index to beyond the limit index, and it
                // is currently equal. So we're certain that there is no other
                // reader currently updating the read_head.
                //
                // To test if we are supposed to resize the backing buffer we
                // try to increment the limit index by 2*count. Note that the
                // stored head indices are always in the range [0, 2*count). So
                // if we add 2*count to the limit index, then the masked
                // condition above still holds! Other potential readers will end
                // up here and are allowed to wait until we resized the backing
                // container.
                //
                // Furthermore, setting the limit index to this high value also
                // notifies the writer that any of its writes should be tried
                // again, as they're writing to a buffer that is going to get
                // trashed.
                todo!("finish reallocation code");
                match self.limit_head.compare_exchange(limit_index, limit_index + 2*self.count, Ordering::SeqCst, Ordering::Acquire) {
                    Ok(_) => {
                        // Limit index has changed, so we're now the ones that
                        // are supposed to resize the backing storage.
                    },
                    Err(_) => {
                        // Another reader won the race and is doing the resize;
                        // retry (and thereby wait) until it has finished.
                        continue 'try_loop;
                    },
                }
            } else {
                // It seems we have space to read
                let preemptive_read = unsafe { *self.freelist.add(read_index & self.mask) };
                if self.read_head.compare_exchange(read_index, (read_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire).is_err() {
                    // Failed to do the CAS, try again. We need to start at the
                    // start again because we might have had other readers that
                    // were successful, so at the very least, the preemptive
                    // read we did is no longer correct.
                    continue 'try_loop;
                }

                // We now "own" the value at the read index
                return preemptive_read;
            }
        }
    }
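
    // A small self-check of the resize-sentinel arithmetic described in the
    // comments above (illustrative only, not used by the runtime), assuming
    // count = 8 so that the head indices live in [0, 16).
    #[cfg(test)]
    #[allow(dead_code)]
    fn limit_head_sentinel_example() {
        let count = 8usize;
        let compare_mask = (count * 2) - 1;
        let (read, limit) = (5usize, 5usize); // masked equality: freelist looks empty
        assert_eq!(read & compare_mask, limit & compare_mask);

        let sentinel = limit + 2 * count; // the resizing reader bumps the limit
        // Other readers still observe "empty" through the mask and keep waiting...
        assert_eq!(read & compare_mask, sentinel & compare_mask);
        // ...while a writer comparing against the raw value can tell that a
        // resize is in progress, because the sentinel lies outside [0, 2*count).
        assert!(sentinel >= 2 * count);
    }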
 

	
 
    fn put_back_index_into_freelist(&self, index: u32) {
        let compare_mask = (self.count * 2) - 1;
        'try_loop: loop {
            let write_index = self.write_head.load(Ordering::Acquire);
            if self.write_head.compare_exchange(write_index, (write_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire).is_err() {
                // Failed to do the CAS, try again
                continue 'try_loop;
            }

            // We are now the only ones that can write at `write_index`. Try to
            // do so
            unsafe { *self.freelist.add(write_index & self.mask) = index; }

            // But we still need to move the limit head. Only successful writers
            // may move it, so we expect it to move from `write_index` to
            // `write_index + 1`, but we might have to spin to achieve it.
            // Furthermore, the `limit_head` is used by the index-retrieval
            // function to indicate that a resize by a reader is in progress.
            loop {
                todo!("finish reallocation code");
                match self.limit_head.compare_exchange(write_index, (write_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire) {
                    Ok(_) => break,
                    Err(_new_value) => {
                        // Two options: the limit is not yet what we expect it
                        // to be. If so, just try again with the old values.
                        // But if it is very large (relatively) then this is the
                        // signal from the reader that the entire storage is
                        // being resized.
                    },
                }
            }

            // We updated the limit head, so we're done :)
            return;
        }
    }
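
    // A sketch of how `create` and `destroy` (mentioned in the documentation of
    // `get` below) might drive the two freelist functions above. The bodies are
    // hypothetical and illustrative only; this changeset only contains the
    // freelist plumbing.
    //
    //     fn create(&self, component: T) -> u32 {
    //         let index = self.get_index_from_freelist();
    //         unsafe {
    //             // Allocate the component individually so it stays pointer-stable.
    //             *self.data.add(index as usize) = Box::into_raw(Box::new(component));
    //         }
    //         return index;
    //     }
    //
    //     fn destroy(&self, index: u32) {
    //         unsafe {
    //             let target = *self.data.add(index as usize);
    //             drop(Box::from_raw(target)); // run the destructor
    //             *self.data.add(index as usize) = std::ptr::null_mut();
    //         }
    //         self.put_back_index_into_freelist(index); // the slot is reusable again
    //     }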
 

	
 
    /// Retrieves a `&T` from the store. The index should have been obtained
    /// through `create` and not yet given back by calling `destroy`.
    fn get(&self, index: u32) -> &T {
        unsafe {
            let target = *self.data.add(index as usize);
            debug_assert!(!target.is_null()); // index must belong to a live component
            return &*target;
        }
    }

    /// Same as `get`, but now returning a mutable `&mut T`. Make sure that you
    /// know what you're doing :)
    fn get_mut(&self, index: u32) -> &mut T {
        unsafe {
            let target = *self.data.add(index as usize);
            debug_assert!(!target.is_null()); // index must belong to a live component
            return &mut *target;
        }
    }
 

	
 
    fn alloc_buffer(num: usize) -> (*mut u32, *mut *mut T, usize) {
        // Probably overkill considering the amount of memory that is needed to
        // exceed this number. But still: ensure `num` adheres to the
        // requirements needed for correct functioning of the store.
        assert!(
            num >= 8 && num <= u32::MAX as usize / 4 && num.is_power_of_two(),
            "invalid allocation count for CompStore buffer"
        );

        // Compute byte size of the freelist (so we assume alignment of `u32`)
        let mut byte_size = num * size_of::<u32>();

        // Align to `*mut T`, then reserve space for all of the pointers
        byte_size = Self::align_to(byte_size, align_of::<*mut T>());
        let byte_offset_data = byte_size;
        byte_size += num * size_of::<*mut T>();

        unsafe {
            // Allocate, then retrieve pointers to the allocated regions
            let layout = Self::layout_for(byte_size);
            let memory = alloc(layout);
            let base_free: *mut u32 = transmute(memory);
            let base_data: *mut *mut T = transmute(memory.add(byte_offset_data));

            return (base_free, base_data, byte_size);
        }
    }
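
    // For example, with num = 8 on a typical 64-bit target:
    //     freelist: 8 * size_of::<u32>()    = 32 bytes, at offset 0
    //     align to align_of::<*mut T>() = 8 -> offset stays at 32
    //     data:     8 * size_of::<*mut T>() = 64 bytes, at offset 32
    //     byte_size = 96, returned so that `dealloc_buffer` can rebuild the layout.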
 

	
 
    fn dealloc_buffer(freelist: *mut u32, _data: *mut *mut T, byte_size: usize) {
        // Note: we only did one allocation, the freelist is at the front
        let layout = Self::layout_for(byte_size);
        unsafe {
            let base: *mut u8 = transmute(freelist);
            dealloc(base, layout);
        }
    }

    fn layout_for(byte_size: usize) -> Layout {
        debug_assert!(byte_size % size_of::<u32>() == 0);
        // The buffer stores `u32` indices followed by `*mut T` pointers, so use
        // the larger of the two alignments for the allocation itself.
        let alignment = align_of::<u32>().max(align_of::<*mut T>());
        return unsafe{ Layout::from_size_align_unchecked(byte_size, alignment) };
    }

    fn align_to(offset: usize, alignment: usize) -> usize {
        debug_assert!(alignment.is_power_of_two());
        let mask = alignment - 1;
        return (offset + mask) & !mask;
    }
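
    // For example: Self::align_to(36, 8) == 40, while Self::align_to(32, 8) == 32
    // (already aligned).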
 
}
 
\ No newline at end of file