// File: CSY/reowolf/src/runtime2/runtime.rs @ 4341b8d6790f
// Commit: "Initial concrete work on the component storage"

use std::mem::{size_of, align_of};
use std::alloc::{alloc, dealloc, Layout};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};

use crate::protocol::*;

// -----------------------------------------------------------------------------
// Component
// -----------------------------------------------------------------------------

/// Key to a component. The type system (somewhat) ensures that only one of
/// these can exist per component. Only with a key may one retrieve the
/// privately-accessible memory of a component. Practically it is just a
/// generational index, like `CompId` is.
#[derive(Copy, Clone)]
pub(crate) struct CompKey(CompId);

/// Generational ID of a component
#[derive(Copy, Clone)]
pub(crate) struct CompId {
    pub index: u32,
    pub generation: u32,
}

impl PartialEq for CompId {
    fn eq(&self, other: &Self) -> bool {
        // Note that only the slot index is compared; the generation is ignored.
        return self.index.eq(&other.index);
    }
}
impl Eq for CompId {}
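
// Illustrative example (a test added here for exposition, not present in the
// original commit): a slot index can be reused after a component is destroyed,
// so an old `CompId` may compare equal to the ID of a newer component that
// occupies the same slot. Staleness has to be detected by checking the
// generation explicitly.
#[cfg(test)]
#[test]
fn comp_id_generation_example() {
    let old_id = CompId{ index: 3, generation: 0 };
    let reused_slot = CompId{ index: 3, generation: 1 };
    assert!(old_id == reused_slot); // equality only considers the slot index
    assert_ne!(old_id.generation, reused_slot.generation); // generations reveal the reuse
}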

/// In-runtime storage of a component
pub(crate) struct RtComp {

}

// -----------------------------------------------------------------------------
// Runtime
// -----------------------------------------------------------------------------

type RuntimeHandle = Arc<Runtime>;

/// Memory that is maintained by "the runtime". In practice it is maintained by
/// multiple schedulers, and this serves as the common interface to that memory.
pub struct Runtime {
    active_elements: AtomicU32, // active components and APIs (i.e. component creators)
}

impl Runtime {
    pub fn new(num_threads: u32, _protocol_description: ProtocolDescription) -> Runtime {
        assert!(num_threads > 0, "need a thread to perform work");
        return Runtime{
            active_elements: AtomicU32::new(0),
        };
    }
}
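
// A minimal sketch, assuming `active_elements` is meant as a simple liveness
// counter: components and API handles register themselves on creation and
// deregister on destruction, so the runtime knows when it may shut down. These
// helpers are not part of this commit and their names are hypothetical.
#[allow(dead_code)]
impl Runtime {
    fn increment_active(&self) {
        self.active_elements.fetch_add(1, Ordering::AcqRel);
    }

    /// Returns true if the caller removed the last active element, i.e. the
    /// runtime has no more work left and could begin shutting down.
    fn decrement_active(&self) -> bool {
        return self.active_elements.fetch_sub(1, Ordering::AcqRel) == 1;
    }
}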

// -----------------------------------------------------------------------------
// Runtime containers
// -----------------------------------------------------------------------------

/// Component storage. Note that it doesn't need to be polymorphic, but making
/// it generic allows us to test it more easily. The container is essentially a
/// thread-safe freelist. The list always contains *all* free entries in the
/// storage array.
///
/// The freelist itself is implemented using a thread-safe ringbuffer. There
/// are some very important properties we exploit in this specific
/// implementation of a ringbuffer. Note that writing to the ringbuffer (i.e.
/// adding to the freelist) corresponds to destroying a component, and reading
/// from the ringbuffer corresponds to creating a component. The aforementioned
/// properties are: one can never write more to the ringbuffer than has been
/// read from it (i.e. one cannot destroy more components than were created),
/// and we may safely assume that when the `CompStore` is dropped no thread can
/// access it anymore (because all schedulers have been shut down). This
/// simplifies the deallocation code.
///
/// Internally each individual instance of `T` will be allocated (and
/// deallocated) separately. So we will not store an array of `T`, but an array
/// of pointers to `T`. This keeps the storage of each `T` pointer-stable (as
/// is required for the schedulers actually running the components, because
/// they'll fetch a component and then continue running it while this component
/// storage might get reallocated).
///
/// Note that there is still some unsafety here that is kept in check by the
/// owner of this `CompStore`: the `CompId` and `CompKey` system ensures that
/// at most one mutable reference will ever be obtained, next to potentially
/// multiple immutable references. In practice the `&mut T` will be used to
/// access so-called "public" fields immutably and "private" fields mutably,
/// while the `&T` will only be used to access the "public" fields immutably.
/// (A small worked example of the head/mask index arithmetic follows the
/// struct definition below.)
struct CompStore<T: Sized> {
    freelist: *mut u32,
    data: *mut *mut T,
    count: usize,
    mask: usize,
    byte_size: usize, // used for dealloc
    write_head: AtomicUsize,
    limit_head: AtomicUsize,
    read_head: AtomicUsize,
}
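
// Worked example of the head/mask arithmetic (illustrative only): with
// `count = 8` the freelist has 8 physical slots and `mask = 7` selects one of
// them, while the read/write/limit heads live in the doubled index space
// [0, 16) with `compare_mask = 15`. Initially `read_head = 0` and
// `write_head = limit_head = 8`, i.e. all 8 slots are free. Once all of them
// have been handed out we get `read_head == limit_head` (masked), which is the
// "freelist is empty, grow the storage" condition. To claim the resize the
// reader CASes the limit head from e.g. 5 to 5 + 16 = 21; any writer that then
// observes a limit value >= 16 knows a resize is in progress.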

impl<T: Sized> CompStore<T> {
    fn new(initial_count: usize) -> Self {
        // Allocate data
        debug_assert!(size_of::<T>() > 0); // No ZST during testing (and definitely not in production)
        let (freelist, data, byte_size) = Self::alloc_buffer(initial_count);

        unsafe {
            // Init the freelist to all of the indices in the array of data
            let mut target = freelist;
            for idx in 0..initial_count as u32 {
                *target = idx;
                target += 1;
            }

            // And init the data such that they're all NULL pointers
            std::ptr::write_bytes(data, 0, initial_count);
        }

        return CompStore{
            freelist, data,
            count: initial_count,
            mask: initial_count - 1,
            byte_size,
            write_head: AtomicUsize::new(initial_count),
            limit_head: AtomicUsize::new(initial_count),
            read_head: AtomicUsize::new(0),
        };
    }

    fn get_index_from_freelist(&self) -> u32 {
        let compare_mask = (self.count * 2) - 1;

        'try_loop: loop {
            let read_index = self.read_head.load(Ordering::Acquire); // read index first
            let limit_index = self.limit_head.load(Ordering::Acquire); // limit index second

            // By definition we always have `read_index <= limit_index` (if we would
            // have an infinite buffer, in reality we will wrap).
            if (read_index & compare_mask) == (limit_index & compare_mask) {
                // We need to create a bigger buffer. Note that no reader can
                // *ever* set the read index to beyond the limit index, and it
                // is currently equal. So we're certain that there is no other
                // reader currently updating the read_head.
                //
                // To test if we are supposed to resize the backing buffer we
                // try to increment the limit index by 2*count. Note that the
                // stored indices are always in the range [0, 2*count). So if
                // we add 2*count to the limit index, then the masked condition
                // above still holds! Other potential readers will end up here
                // and are allowed to wait until we resized the backing
                // container.
                //
                // Furthermore, setting the limit index to this high value also
                // notifies the writer that any of its writes should be tried
                // again, as they're writing to a buffer that is going to get
                // trashed.
                match self.limit_head.compare_exchange(limit_index, limit_index + 2*self.count, Ordering::SeqCst, Ordering::Acquire) {
                    Ok(_) => {
                        // Limit index has changed, so we're now the ones that
                        // are supposed to resize the backing buffer.
                        todo!("finish reallocation code");
                    },
                    Err(_) => {
                        // Another reader claimed the resize (or a writer freed
                        // up an entry in the meantime); retry from the start.
                        continue 'try_loop;
                    },
                }
            } else {
                // It seems we have space to read
                let preemptive_read = unsafe { *self.freelist.add(read_index & self.mask) };
                if self.read_head.compare_exchange(read_index, (read_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire).is_err() {
                    // Failed to do the CAS, try again. We need to start at the
                    // start again because we might have had other readers that
                    // were successful, so at the very least, the preemptive
                    // read we did is no longer correct.
                    continue 'try_loop;
                }

                // We now "own" the value at the read index
                return preemptive_read;
            }
        }
    }
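
    // Concrete trace of the read path above (illustrative, count = 8): with
    // read_head = 3 and limit_head = 9 the masked values differ, so the value
    // at freelist[3 & 7] is read preemptively; the CAS of read_head from 3 to
    // 4 then publishes the claim. If the CAS fails, another reader got there
    // first and the preemptively read value is discarded.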

    fn put_back_index_into_freelist(&self, index: u32) {
        let compare_mask = (self.count * 2) - 1;
        'try_loop: loop {
            let write_index = self.write_head.load(Ordering::Acquire);
            if self.write_head.compare_exchange(write_index, (write_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire).is_err() {
                // Failed to do the CAS, try again
                continue 'try_loop;
            }

            // We are now the only ones that can write at `write_index`. Try to
            // do so
            unsafe { *self.freelist.add(write_index & self.mask) = index; }

            // But we still need to move the limit head. Only successful writers
            // may move it so we expect it to move from the `write_index` to
            // `write_index + 1`, but we might have to spin to achieve it.
            // Furthermore, the `limit_head` is used by the index-retrieval
            // function to indicate that a read is in progress.
            loop {
                match self.limit_head.compare_exchange(write_index, (write_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire) {
                    Ok(_) => break,
                    Err(new_value) => {
                        // Two options: the limit is not yet what we expect it
                        // to be, because a preceding writer still has to bump
                        // it; if so, just spin and try again. But if it is
                        // very large (relatively, i.e. at least `2 * count`)
                        // then this is the signal from the reader that the
                        // entire storage is being resized.
                        if new_value >= 2 * self.count {
                            todo!("finish reallocation code");
                        }
                    }
                }
            }

            // We updated the limit head, so we're done :)
            return;
        }
    }
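
    // Concrete trace of the write path above (illustrative, count = 8): a
    // writer that moves write_head from 9 to 10 owns slot freelist[9 & 7] =
    // freelist[1], stores the freed index there, and then has to move
    // limit_head from 9 to 10 as well, possibly spinning until the writer that
    // claimed slot 8 has bumped the limit head to 9 first.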

    /// Retrieves a `&T` from the store. The index should have been retrieved
    /// using `create` and not yet have been given back by calling `destroy`.
    fn get(&self, index: u32) -> &T {
        unsafe {
            let target = *self.data.add(index as usize);
            debug_assert!(!target.is_null()); // entry must have been created
            return &*target;
        }
    }

    /// Same as `get`, but now returning a mutable `&mut T`. Make sure that you
    /// know what you're doing :)
    fn get_mut(&self, index: u32) -> &mut T {
        unsafe {
            let target = *self.data.add(index as usize);
            debug_assert!(!target.is_null()); // entry must have been created
            return &mut *target;
        }
    }

    fn alloc_buffer(num: usize) -> (*mut u32, *mut *mut T, usize) {
        // Probably overkill considering the amount of memory that is needed to
        // exceed this number. But still: ensure `num` adheres to the
        // requirements needed for correct functioning of the store.
        assert!(
            num >= 8 && num <= u32::MAX as usize / 4 && num.is_power_of_two(),
            "invalid allocation count for CompStore buffer"
        );

        // Compute byte size of freelist (so we assume alignment of `u32`)
        let mut byte_size = num * size_of::<u32>();

        // Align to `*mut T`, then reserve space for all of the pointers
        byte_size = Self::align_to(byte_size, align_of::<*mut T>());
        let byte_offset_data = byte_size;
        byte_size += num * size_of::<*mut T>();

        unsafe {
            // Allocate, then retrieve pointers to the allocated regions
            let layout = Self::layout_for(byte_size);
            let memory = alloc(layout);
            assert!(!memory.is_null(), "failed to allocate CompStore buffer");
            let base_free = memory as *mut u32;
            let base_data = memory.add(byte_offset_data) as *mut *mut T;

            return (base_free, base_data, byte_size);
        }
    }

    fn dealloc_buffer(freelist: *mut u32, _data: *mut *mut T, byte_size: usize) {
        // Note: we only did one allocation, freelist is at the front
        let layout = Self::layout_for(byte_size);
        unsafe {
            let base = freelist as *mut u8;
            dealloc(base, layout);
        }
    }

    fn layout_for(byte_size: usize) -> Layout {
        // The buffer holds `u32` indices followed by `*mut T` pointers, so use
        // the stricter of the two alignments (in practice: pointer alignment).
        let alignment = align_of::<u32>().max(align_of::<*mut T>());
        debug_assert!(byte_size % size_of::<u32>() == 0);
        return unsafe{ Layout::from_size_align_unchecked(byte_size, alignment) };
    }

    fn align_to(offset: usize, alignment: usize) -> usize {
        debug_assert!(alignment.is_power_of_two());
        let mask = alignment - 1;
        return (offset + mask) & !mask;
    }
}
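
// A minimal sketch of the deallocation path (an assumption, this commit does
// not implement `Drop` yet): per the doc comment above, no thread can access
// the store anymore by the time it is dropped. Note that this only releases
// the combined freelist/pointer buffer; freeing the individual `T` allocations
// is left to the (not yet written) component creation/destruction code, since
// this commit does not define how those are allocated.
impl<T: Sized> Drop for CompStore<T> {
    fn drop(&mut self) {
        Self::dealloc_buffer(self.freelist, self.data, self.byte_size);
    }
}

// Hedged usage sketch: a single-threaded round trip through the freelist and
// the `get`/`get_mut` accessors, plus a check of the buffer layout math. The
// first test pokes the private `data` array directly to stand in for the
// component-creation code that does not exist yet in this commit.
#[cfg(test)]
mod comp_store_tests {
    use super::*;
    use std::mem::size_of;

    #[test]
    fn freelist_round_trip() {
        let store = CompStore::<u64>::new(8);

        // Take an index from the freelist, pretend to create a component in
        // its slot, and access it through the store.
        let index = store.get_index_from_freelist();
        assert!(index < 8);
        unsafe { *store.data.add(index as usize) = Box::into_raw(Box::new(42u64)); }

        assert_eq!(*store.get(index), 42);
        *store.get_mut(index) = 43;
        assert_eq!(*store.get(index), 43);

        // Pretend to destroy the component again and return its slot.
        unsafe {
            drop(Box::from_raw(*store.data.add(index as usize)));
            *store.data.add(index as usize) = std::ptr::null_mut();
        }
        store.put_back_index_into_freelist(index);
    }

    #[test]
    fn buffer_layout_math() {
        // The single allocation holds `num` u32 indices followed by `num`
        // pointers (after alignment). For num = 8 the freelist part is 32
        // bytes, which is already pointer-aligned on common targets.
        let (freelist, data, byte_size) = CompStore::<u64>::alloc_buffer(8);
        assert_eq!(byte_size, 8 * size_of::<u32>() + 8 * size_of::<*mut u64>());
        assert!(!freelist.is_null() && !data.is_null());
        CompStore::<u64>::dealloc_buffer(freelist, data, byte_size);
    }
}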