Files @ 4341b8d6790f
Branch filter:

Location: CSY/reowolf/src/runtime2/runtime.rs

4341b8d6790f 11.6 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
Initial concrete work on the component storage
use std::mem::{size_of, align_of, transmute};
use std::alloc::{alloc, dealloc, Layout};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};

use crate::protocol::*;

// -----------------------------------------------------------------------------
// Component
// -----------------------------------------------------------------------------

/// Key to a component. Type system somewhat ensures that there can only be one
/// of these. Only with a key one may retrieve privately-accessible memory for
/// a component. Practically just a generational index, like `CompId` is.
///
/// The key is a thin wrapper around a `CompId`; the wrapped ID is private to
/// this module.
///
/// NOTE(review): the type derives `Copy` and `Clone`, so the type system alone
/// does not prevent a key from being duplicated. Uniqueness presumably relies
/// on how the runtime hands out keys — confirm.
#[derive(Copy, Clone)]
pub(crate) struct CompKey(CompId);

/// Generational ID of a component, made up of an `index` and a `generation`.
#[derive(Clone, Copy)]
pub(crate) struct CompId {
    pub index: u32,
    pub generation: u32,
}

// Two IDs are considered equal as soon as their `index` fields match; the
// `generation` field takes no part in the comparison.
// NOTE(review): this means IDs with the same index but different generations
// compare equal — confirm that this is intended.
impl PartialEq for CompId {
    fn eq(&self, other: &Self) -> bool {
        return self.index == other.index;
    }
}
impl Eq for CompId {}

/// In-runtime storage of a component
pub(crate) struct RtComp {
    // No fields have been defined yet.
}

// -----------------------------------------------------------------------------
// Runtime
// -----------------------------------------------------------------------------

/// Reference-counted (`Arc`) handle to a `Runtime`.
type RuntimeHandle = Arc<Runtime>;

/// Memory that is maintained by "the runtime". In practice it is maintained by
/// multiple schedulers, and this serves as the common interface to that memory.
pub struct Runtime {
    // Number of active components and APIs (i.e. component creators)
    active_elements: AtomicU32,
}

impl Runtime {
    /// Creates a runtime whose count of active elements starts at zero.
    ///
    /// `num_threads` is only validated here, and `protocol_description` is not
    /// used yet.
    ///
    /// # Panics
    ///
    /// Panics if `num_threads` is zero.
    pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime {
        assert!(num_threads != 0, "need a thread to perform work");

        let active_elements = AtomicU32::new(0);
        return Runtime { active_elements };
    }
}

// -----------------------------------------------------------------------------
// Runtime containers
// -----------------------------------------------------------------------------

/// Component storage. Note that it shouldn't be polymorphic, but making it so
/// allows us to test it more easily. The container is essentially a
/// thread-safe freelist. The list always contains *all* free entries in the
/// storage array.
///
/// The freelist itself is implemented using a thread-safe ringbuffer. But there
/// are some very important properties we exploit in this specific
/// implementation of a ringbuffer. Note that writing to the ringbuffer (i.e.
/// adding to the freelist) corresponds to destroying a component, and reading
/// from the ringbuffer corresponds to creating a component. The aforementioned
/// properties are: one can never write more to the ringbuffer than has been
/// read from it (i.e. one cannot destroy more components than were created),
/// and we may safely assume that when the `CompStore` is dropped no thread can
/// access it (because they've all been shut down). This simplifies deallocation
/// code.
///
/// Internally each individual instance of `T` will be (de)allocated. So we will
/// not store an array of `T`, but an array of `*T`. This keeps the storage of
/// `T` pointer-stable (as is required for the schedulers actually running the
/// components, because they'll fetch a component and then continue running it
/// while this component storage might get reallocated).
///
/// Note that there is still some unsafety here that is kept in check by the
/// owner of this `CompStore`: the `CompId` and `CompKey` system ensures that
/// only one mutable reference will ever be obtained, and potentially multiple
/// immutable references. But in practice the `&mut T` will be used to access
/// so-called "public" fields immutably, and "private" fields mutably, while the
/// `&T` will only be used to access the "public" fields immutably.
struct CompStore<T: Sized> {
    /// Start of the single backing allocation. Holds `count` indices and serves
    /// as the storage of the freelist ringbuffer.
    freelist: *mut u32,
    /// Array of `count` pointers to `T`, located in the same allocation as
    /// `freelist`. `new` initialises every entry to null.
    data: *mut *mut T,
    /// Number of slots in `freelist` and in `data`; always a power of two.
    count: usize,
    /// `count - 1`; maps a head value onto a slot of `freelist`.
    mask: usize,
    byte_size: usize, // used for dealloc
    /// Next ringbuffer position a writer may claim, kept in `[0, 2 * count)`.
    write_head: AtomicUsize,
    /// Position up to which readers may consume; advanced by writers once
    /// their write has completed.
    limit_head: AtomicUsize,
    /// Next ringbuffer position a reader will consume, kept in `[0, 2 * count)`.
    read_head: AtomicUsize,
}

impl<T: Sized> CompStore<T> {
    /// Creates a store with `initial_count` slots, all of which are free.
    ///
    /// The freelist is filled with the indices `0..initial_count` (in that
    /// order) and every data pointer is set to null.
    ///
    /// # Panics
    ///
    /// Panics if `initial_count` is rejected by `alloc_buffer` (it must be a
    /// power of two, at least 8 and at most `u32::MAX / 4`).
    fn new(initial_count: usize) -> Self {
        // Allocate data
        debug_assert!(size_of::<T>() > 0); // No ZST during testing (and definitely not in production)
        let (freelist, data, byte_size) = Self::alloc_buffer(initial_count);

        // SAFETY: `alloc_buffer` returned a live allocation with room for
        // `initial_count` `u32`s at `freelist` and `initial_count` pointers at
        // `data`, both suitably aligned.
        unsafe {
            // Init the freelist to all of the indices in the array of data.
            // The cast is lossless: `alloc_buffer` bounds the count by
            // `u32::MAX / 4`.
            for idx in 0..initial_count {
                freelist.add(idx).write(idx as u32);
            }

            // And init the data such that they're all NULL pointers
            std::ptr::write_bytes(data, 0, initial_count);
        }

        return CompStore{
            freelist, data,
            count: initial_count,
            mask: initial_count - 1,
            byte_size,
            write_head: AtomicUsize::new(initial_count),
            limit_head: AtomicUsize::new(initial_count),
            read_head: AtomicUsize::new(0),
        };
    }

    /// Takes a free index out of the freelist (i.e. reads from the
    /// ringbuffer) and returns it.
    ///
    /// # Panics
    ///
    /// Panics when the freelist is empty: growing the backing buffer is not
    /// implemented yet.
    fn get_index_from_freelist(&self) -> u32 {
        let compare_mask = (self.count * 2) - 1;

        loop {
            let read_index = self.read_head.load(Ordering::Acquire); // read index first
            let limit_index = self.limit_head.load(Ordering::Acquire); // limit index second

            // By definition we always have `read_index <= limit_index` (if we would
            // have an infinite buffer, in reality we will wrap).
            if (read_index & compare_mask) == (limit_index & compare_mask) {
                // We need to create a bigger buffer. Note that no reader can
                // *ever* set the read index to beyond the limit index, and it
                // is currently equal. So we're certain that there is no other
                // reader currently updating the read_head.
                //
                // The plan for the resize: try to CAS the limit index from
                // `limit_index` to `limit_index + 2*count`. The stored indices
                // are always in the range [0, 2*count), so after adding
                // 2*count the masked condition above still holds. Other
                // potential readers will end up here and are allowed to wait
                // until the backing container is resized. The reader that
                // wins the CAS is the one that performs the resize.
                //
                // Furthermore, setting the limit index to this high value also
                // notifies the writer that any of its writes should be tried
                // again, as they're writing to a buffer that is going to get
                // trashed.
                todo!("finish reallocation code");
            }

            // It seems we have space to read.
            // SAFETY: `read_index & self.mask` is smaller than `count`, so the
            // access stays inside the freelist.
            let preemptive_read = unsafe { *self.freelist.add(read_index & self.mask) };
            let next_read_index = (read_index + 1) & compare_mask;
            let claimed = self.read_head.compare_exchange(
                read_index, next_read_index, Ordering::SeqCst, Ordering::Acquire
            );
            if claimed.is_ok() {
                // We now "own" the value at the read index
                return preemptive_read;
            }

            // Failed to do the CAS, try again. We need to start at the start
            // again because we might have had other readers that were
            // successful, so at the very least, the preemptive read we did is
            // no longer correct.
        }
    }

    /// Puts `index` back into the freelist (i.e. writes to the ringbuffer).
    ///
    /// Spins until all writers that claimed an earlier position have published
    /// their write.
    ///
    /// # Panics
    ///
    /// Panics when a reader signals (through `limit_head`) that the storage is
    /// being resized: that path is not implemented yet.
    fn put_back_index_into_freelist(&self, index: u32) {
        let compare_mask = (self.count * 2) - 1;

        // Claim a position to write to by advancing the write head.
        let write_index = loop {
            let candidate = self.write_head.load(Ordering::Acquire);
            let next_write_index = (candidate + 1) & compare_mask;
            let claimed = self.write_head.compare_exchange(
                candidate, next_write_index, Ordering::SeqCst, Ordering::Acquire
            );
            if claimed.is_ok() {
                break candidate;
            }
            // Failed to do the CAS, try again
        };

        // We are now the only ones that can write at `write_index`.
        // SAFETY: `write_index & self.mask` is smaller than `count`, so the
        // access stays inside the freelist.
        unsafe { *self.freelist.add(write_index & self.mask) = index; }

        // But we still need to move the limit head. Only successful writers
        // may move it so we expect it to move from the `write_index` to
        // `write_index + 1`, but we might have to spin to achieve it.
        // Furthermore, the `limit_head` is used by the index-retrieval
        // function to indicate that a resize is in progress.
        let next_limit_index = (write_index + 1) & compare_mask;
        loop {
            let moved = self.limit_head.compare_exchange(
                write_index, next_limit_index, Ordering::SeqCst, Ordering::Acquire
            );
            match moved {
                // We updated the limit head, so we're done :)
                Ok(_) => return,
                Err(current) if current > compare_mask => {
                    // Regular values are always in [0, 2*count). A larger
                    // value is the signal from the reader that the entire
                    // storage is being resized.
                    todo!("finish reallocation code");
                },
                Err(_) => {
                    // The limit is not yet what we expect it to be: an earlier
                    // writer still has to publish. Try again with the old
                    // values.
                    std::hint::spin_loop();
                },
            }
        }
    }

    /// Retrieves a `&T` from the store. This should be retrieved using `create`
    /// and not yet given back by calling `destroy`.
    ///
    /// The slot at `index` must hold a valid, non-null pointer; this is only
    /// checked in debug builds.
    fn get(&self, index: u32) -> &T {
        let slot = index as usize;
        debug_assert!(slot < self.count, "component index out of bounds");

        // SAFETY: `slot` lies inside the `data` array. The caller guarantees
        // that the slot holds a pointer to a live `T`.
        unsafe {
            let target = *self.data.add(slot);
            debug_assert!(!target.is_null(), "component slot is empty");
            return &*target;
        }
    }

    /// Same as `get`, but now returning a mutable `&mut T`. Make sure that you
    /// know what you're doing :)
    ///
    /// The caller is responsible for never holding two mutable references to
    /// the same slot at once.
    fn get_mut(&self, index: u32) -> &mut T {
        let slot = index as usize;
        debug_assert!(slot < self.count, "component index out of bounds");

        // SAFETY: `slot` lies inside the `data` array. The caller guarantees
        // that the slot holds a pointer to a live `T` and that the returned
        // reference is unique.
        unsafe {
            let target = *self.data.add(slot);
            debug_assert!(!target.is_null(), "component slot is empty");
            return &mut *target;
        }
    }

    /// Allocates one uninitialised buffer holding `num` `u32`s (the freelist)
    /// followed by `num` `*mut T`s (the data pointers).
    ///
    /// Returns the freelist pointer (which is also the base of the
    /// allocation), the pointer to the data array and the total size in bytes.
    ///
    /// # Panics
    ///
    /// Panics if `num` is not a power of two in `[8, u32::MAX / 4]`. Aborts
    /// through `handle_alloc_error` if the allocation fails.
    fn alloc_buffer(num: usize) -> (*mut u32, *mut *mut T, usize) {
        // Probably overkill considering the amount of memory that is needed to
        // exceed this number. But still: ensure `num` adheres to the
        // requirements needed for correct functioning of the store.
        assert!(
            num >= 8 && num <= u32::MAX as usize / 4 && num.is_power_of_two(),
            "invalid allocation count for CompStore buffer"
        );

        // Byte size of the freelist, rounded up such that the pointer array
        // that follows is aligned to `*mut T`.
        let byte_offset_data = Self::align_to(num * size_of::<u32>(), align_of::<*mut T>());

        // Then reserve space for all of the pointers (not for the `T`s
        // themselves, those are allocated individually).
        let byte_size = num
            .checked_mul(size_of::<*mut T>())
            .and_then(|data_bytes| byte_offset_data.checked_add(data_bytes))
            .expect("CompStore buffer size overflows usize");

        // Allocate, then retrieve pointers to allocated regions
        let layout = Self::layout_for(byte_size);
        // SAFETY: `num >= 8`, so the layout has a non-zero size.
        let memory = unsafe { alloc(layout) };
        if memory.is_null() {
            std::alloc::handle_alloc_error(layout);
        }

        let base_free = memory.cast::<u32>();
        // SAFETY: `byte_offset_data < byte_size`, so the offset pointer stays
        // inside the allocation.
        let base_data = unsafe { memory.add(byte_offset_data) }.cast::<*mut T>();

        return (base_free, base_data, byte_size);
    }

    /// Frees a buffer obtained from `alloc_buffer`. `freelist` and `byte_size`
    /// must be the values returned by that call.
    fn dealloc_buffer(freelist: *mut u32, _data: *mut *mut T, byte_size: usize) {
        // Note: we only did one allocation, freelist is at the front
        let layout = Self::layout_for(byte_size);
        // SAFETY: per the contract above the pointer came from `alloc` with
        // this exact layout.
        unsafe { dealloc(freelist.cast::<u8>(), layout); }
    }

    /// Layout of the backing buffer of `byte_size` bytes. The alignment
    /// satisfies both the `u32` freelist and the `*mut T` data array, since
    /// the data array sits at an aligned *offset* from the base pointer.
    fn layout_for(byte_size: usize) -> Layout {
        debug_assert!(byte_size % size_of::<u32>() == 0);
        let alignment = align_of::<u32>().max(align_of::<*mut T>());
        return Layout::from_size_align(byte_size, alignment)
            .expect("CompStore buffer size must form a valid layout");
    }

    /// Rounds `offset` up to the next multiple of `alignment`, which must be a
    /// power of two.
    fn align_to(offset: usize, alignment: usize) -> usize {
        debug_assert!(alignment.is_power_of_two());
        let mask = alignment - 1;
        return (offset + mask) & !mask;
    }
}

impl<T: Sized> Drop for CompStore<T> {
    /// Releases the backing buffer (freelist and pointer array).
    ///
    /// NOTE(review): the `T`s that the data pointers refer to are not freed
    /// here, because this file does not yet establish how they are allocated
    /// or who owns them — revisit once component creation/destruction exists.
    fn drop(&mut self) {
        Self::dealloc_buffer(self.freelist, self.data, self.byte_size);
    }
}