diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs
index 767088eb2a1eed792272aab2326cf3e5881f29c4..0d8f13909ebaee5567dfdb99a6022e5d3bc58ba0 100644
--- a/src/runtime2/runtime.rs
+++ b/src/runtime2/runtime.rs
@@ -1,10 +1,12 @@
-use std::mem::{size_of, align_of, transmute};
-use std::alloc::{alloc, dealloc, Layout};
-use std::sync::Arc;
-use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex, Condvar};
+use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
+use std::collections::VecDeque;
 
 use crate::protocol::*;
 
+use super::component::{CompCtx, CompPDL};
+use super::store::ComponentStore;
+
 // -----------------------------------------------------------------------------
 // Component
 // -----------------------------------------------------------------------------
@@ -12,37 +14,77 @@ use crate::protocol::*;
 /// Key to a component. Type system somewhat ensures that there can only be one
 /// of these. Only with a key one may retrieve privately-accessible memory for
 /// a component. Practically just a generational index, like `CompId` is.
-#[derive(Copy, Clone)]
-pub(crate) struct CompKey(CompId);
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub(crate) struct CompKey(u32);
 
-/// Generational ID of a component
-#[derive(Copy, Clone)]
-pub(crate) struct CompId {
-    pub index: u32,
-    pub generation: u32,
+impl CompKey {
+    pub(crate) fn downgrade(&self) -> CompId {
+        return CompId(self.0);
+    }
 }
 
-impl PartialEq for CompId {
-    fn eq(&self, other: &Self) -> bool {
-        return self.index.eq(&other.index);
+/// Generational ID of a component
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub struct CompId(u32);
+
+impl CompId {
+    pub(crate) fn new_invalid() -> CompId {
+        return CompId(u32::MAX);
+    }
+
+    /// Upgrade component ID to component key. Unsafe because the caller needs
+    /// to make sure that only one component key can exist at a time (to ensure
+    /// a component can only be scheduled/executed by one thread).
+    pub(crate) unsafe fn upgrade(&self) -> CompKey {
+        return CompKey(self.0);
+    }
 }
-impl Eq for CompId {}
 
 /// In-runtime storage of a component
-pub(crate) struct RtComp {
+pub(crate) struct RuntimeComp {
+    pub public: CompPublic,
+    pub private: CompPrivate,
+}
+
+/// Should contain everything that is accessible in a thread-safe manner
+pub(crate) struct CompPublic {
+    pub sleeping: AtomicBool,
+    pub num_handles: AtomicU32, // modified upon creating/dropping `CompHandle` instances
+}
 
+/// Handle to public part of a component.
+pub(crate) struct CompHandle {
+    target: *const CompPublic,
+}
+
+impl std::ops::Deref for CompHandle {
+    type Target = CompPublic;
+
+    fn deref(&self) -> &Self::Target {
+        return unsafe{ &*self.target };
+    }
+}
+
+/// May contain non thread-safe fields. Accessed only by the scheduler which
+/// will temporarily "own" the component.
+pub(crate) struct CompPrivate {
+    pub code: CompPDL,
+    pub ctx: CompCtx,
 }
 
 // -----------------------------------------------------------------------------
 // Runtime
 // -----------------------------------------------------------------------------
 
-type RuntimeHandle = Arc<Runtime>;
+pub type RuntimeHandle = Arc<Runtime>;
 
 /// Memory that is maintained by "the runtime". In practice it is maintained by
 /// multiple schedulers, and this serves as the common interface to that memory.
 pub struct Runtime {
+    pub protocol: ProtocolDescription,
+    components: ComponentStore<RuntimeComp>,
+    work_queue: Mutex<VecDeque<CompKey>>,
+    work_condvar: Condvar,
     active_elements: AtomicU32, // active components and APIs (i.e. component creators)
 }
 
@@ -50,253 +92,71 @@ impl Runtime {
     pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime {
         assert!(num_threads > 0, "need a thread to perform work");
         return Runtime{
+            protocol: protocol_description,
+            components: ComponentStore::new(128),
+            work_queue: Mutex::new(VecDeque::with_capacity(128)),
+            work_condvar: Condvar::new(),
             active_elements: AtomicU32::new(0),
         };
     }
-}
-
-// -----------------------------------------------------------------------------
-// Runtime containers
-// -----------------------------------------------------------------------------
-
-/// Component storage. Note that it shouldn't be polymorphic, but making it so
-/// allows us to test it more easily. The container is essentially a
-/// thread-safe freelist. The list always contains *all* free entries in the
-/// storage array.
-///
-/// The freelist itself is implemented using a thread-safe ringbuffer. But there
-/// are some very important properties we exploit in this specific
-/// implementation of a ringbuffer. Note that writing to the ringbuffer (i.e.
-/// adding to the freelist) corresponds to destroying a component, and reading
-/// from the ringbuffer corresponds to creating a component. The aforementioned
-/// properties are: one can never write more to the ringbuffer than has been
-/// read from it (i.e. destroying more components than are created), we may
-/// safely assume that when the `CompStore` is dropped that no thread can access
-/// it (because they've all been shut down). This simplifies deallocation code.
-///
-/// Internally each individual instance of `T` will be (de)allocated. So we will
-/// not store an array of `T`, but an array of `*T`. This keeps the storage of
-/// `T` pointer-stable (as is required for the schedulers actually running the
-/// components, because they'll fetch a component and then continue running it
-/// while this component storage might get reallocated).
-///
-/// Note that there is still some unsafety here that is kept in check by the
-/// owner of this `CompStore`: the `CompId` and `CompKey` system ensures that
-/// only one mutable reference will ever be obtained, and potentially multiple
-/// immutable references. But in practice the `&mut T` will be used to access
-/// so-called "public" fields immutably, and "private" fields mutable. While the
-/// `&T` will only be used to access the "public" fields immutably.
-struct CompStore<T> {
-    freelist: *mut u32,
-    data: *mut *mut T,
-    count: usize,
-    mask: usize,
-    byte_size: usize, // used for dealloc
-    write_head: AtomicUsize,
-    limit_head: AtomicUsize,
-    read_head: AtomicUsize,
-}
-
-const fn compute_realloc_flag() -> usize {
-    match size_of::<usize>() {
-        4 => return 1 << 31, // 32-bit system
-        8 => return 1 << 63, // 64-bit system
-        _ => panic!("unexpected byte size for 'usize'")
-    }
-}
-
-impl<T> CompStore<T> {
-    const REALLOC_FLAG: usize = compute_realloc_flag();
-
-    fn new(initial_count: usize) -> Self {
-        // Allocate data
-        debug_assert!(size_of::<T>() > 0); // No ZST during testing (and definitely not in production)
-        let (freelist, data, byte_size) = Self::alloc_buffer(initial_count);
-        unsafe {
-            // Init the freelist to all of the indices in the array of data
-            let mut target = freelist;
-            for idx in 0..initial_count as u32 {
-                *target = idx;
-                target += 1;
-            }
+    // Scheduling and retrieving work
 
-            // And init the data such that they're all NULL pointers
-            std::ptr::write_bytes(data, 0, initial_count);
+    pub(crate) fn take_work(&self) -> Option<CompKey> {
+        let mut lock = self.work_queue.lock().unwrap();
+        while lock.is_empty() && self.active_elements.load(Ordering::Acquire) != 0 {
+            lock = self.work_condvar.wait(lock).unwrap();
         }
 
-        return CompStore{
-            freelist, data,
-            count: initial_count,
-            mask: initial_count - 1,
-            byte_size,
-            write_head: AtomicUsize::new(initial_count),
-            limit_head: AtomicUsize::new(initial_count),
-            read_head: AtomicUsize::new(0),
-        };
+        return lock.pop_front();
     }
 
-    fn get_index_from_freelist(&self) -> u32 {
-        let compare_mask = (self.count * 2) - 1;
-        let mut read_index = self.read_head.load(Ordering::Acquire); // read index first
-
-        'try_loop: loop {
-            let limit_index = self.limit_head.load(Ordering::Acquire); // limit index second
-
-            // By definition we always have `read_index <= limit_index` (if we would
-            // have an infinite buffer, in reality we will wrap).
-            if (read_index & compare_mask) == (limit_index & compare_mask) {
-                // We need to create a bigger buffer. Note that no reader can
-                // *ever* set the read index to beyond the limit index, and it
-                // is currently equal. So we're certain that there is no other
-                // reader currently updating the read_head.
-                //
-                // To test if we are supposed to resize the backing buffer we
-                // try set the REALLOC_FLAG on the limit index. Note that the
-                // stored indices are always in the range [0, 2*count). So if
-                // we add REALLOC_FLAG to the limit index, then the masked
-                // condition above still holds! Other potential readers will end
-                // up here and are allowed to wait until we resized the backing
-                // container.
-                //
-                // Furthermore, setting the limit index to this high value also
-                // notifies the writer that any of it writes should be tried
-                // again, as they're writing to a buffer that is going to get
-                // trashed.
-                todo!("finish reallocation code");
-                match self.limit_head.compare_exchange(limit_index, limit_index | Self::REALLOC_FLAG, Ordering::SeqCst, Ordering::Acquire) {
-                    Ok(_) => {
-                        // Limit index has changed, so we're now the ones that
-                        // are supposed to resize the
-                    }
-                }
-            } else {
-                // It seems we have space to read
-                let preemptive_read = unsafe { *self.freelist.add(read_index & self.mask) };
-                if let Err(new_read_index) = self.read_head.compare_exchange(read_index, (read_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire) {
-                    // Failed to do the CAS, try again. We need to start at the
-                    // start again because we might have had other readers that
-                    // were successful, so at the very least, the preemptive
-                    // read we did is no longer correct.
-                    read_index = new_read_index;
-                    continue 'try_loop;
-                }
-
-                // We now "own" the value at the read index
-                return preemptive_read;
-            }
-        }
+    pub(crate) fn enqueue_work(&self, key: CompKey) {
+        let mut lock = self.work_queue.lock().unwrap();
+        lock.push_back(key);
+        self.work_condvar.notify_one();
     }
 
-    fn put_back_index_into_freelist(&self, index: u32) {
-        let mut compare_mask = (self.count * 2) - 1;
-        let mut write_index = self.write_head.load(Ordering::Acquire);
-        while let Err(new_write_index) = self.write_head.compare_exchange(write_index, (write_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire) {
-            // Failed to do the CAS, try again
-            write_index = new_write_index;
-        }
-
-        'try_write_loop: loop {
-            // We are now the only ones that can write at `write_index`. Try to
-            // do so
-            unsafe { *self.freelist.add(write_index & self.mask) = index; }
-
-            // But we still need to move the limit head. Only succesful writers
-            // may move it so we expect it to move from the `write_index` to
-            // `write_index + 1`, but we might have to spin to achieve it.
-            // Furthermore, the `limit_head` is used by the index-retrieval
-            // function to indicate that a read is in progress.
-            'commit_to_write_loop: loop {
-                match self.limit_head.compare_exchange(write_index, (write_index + 1) & compare_mask, Ordering::SeqCst, Ordering::Acquire) {
-                    Ok(_) => break,
-                    Err(new_value) => {
-                        // Two options: the limit is not yet what we expect it
-                        // to be. If so, just try again with the old values.
-                        // But if it is very large (relatively) then this is the
-                        // signal from the reader that the entire storage is
-                        // being resized
-                        if new_value & Self::REALLOC_FLAG != 0 {
-                            // Someone is resizing, wait until that is no longer
-                            // true.
-                            while self.limit_head.load(Ordering::Acquire) & Self::REALLOC_FLAG != 0 {
-                                // still resizing
-                            }
-
-                            // Not resizing anymore, try everything again, our
-                            // old write has now become invalid. But our index
-                            // hasn't! So we need to finish our write and our
-                            // increment of the limit head
-                            continue 'try_write_loop;
-                        } else {
-                            // Just try again
-                            continue 'commit_to_write_loop;
-                        }
-                    }
+    // Creating/destroying components
+
+    pub(crate) fn create_pdl_component(&self, comp: CompPDL, initially_sleeping: bool) -> CompKey {
+        let comp = RuntimeComp{
+            public: CompPublic{
+                sleeping: AtomicBool::new(initially_sleeping),
+                num_handles: AtomicU32::new(1), // the component itself acts like a handle
+            },
+            private: CompPrivate{
+                code: comp,
+                ctx: CompCtx{
+                    id: CompId(0),
+                    ports: Vec::new(),
+                    peers: Vec::new(),
+                    messages: Vec::new(),
                 }
             }
+        };
 
-            // We updated the limit head, so we're done :)
-            return;
-        }
-    }
-
-    /// Retrieves a `&T` from the store. This should be retrieved using `create`
-    /// and not yet given back by calling `destroy`.
-    fn get(&self, index: u32) -> &T {
+        let index = self.components.create(comp);
 
-    }
-
-    /// Same as `get`, but now returning a mutable `&mut T`. Make sure that you
-    /// know what you're doing :)
-    fn get_mut(&self, index: u32) -> &mut T {
+        // TODO: just do a reserve_index followed by a consume_index or something
+        self.components.get_mut(index).private.ctx.id = CompId(index);
+        return CompKey(index);
     }
 
-    fn alloc_buffer(num: usize) -> (*mut u32, *mut *mut T, usize) {
-        // Probably overkill considering the amount of memory that is needed to
-        // exceed this number. But still: ensure `num` adheres to the
-        // requirements needed for correct functioning of the store.
-        assert!(
-            num >= 8 && num <= u32::MAX as usize / 4 && num.is_power_of_two(),
-            "invalid allocation count for CompStore buffer"
-        );
-
-        // Compute byte size of freelist (so we assume alignment of `u32`)
-        let mut byte_size = num * size_of::<u32>();
-
-        // Align to `*mut T`, then reserve space for all of the pointers
-        byte_size = Self::align_to(byte_size, align_of::<*mut T>());
-        let byte_offset_data = byte_size;
-        byte_size += num * size_of::<*mut T>();
-
-        unsafe {
-            // Allocate, then retrieve pointers to allocated regions
-            let layout = Self::layout_for(byte_size);
-            let memory = alloc(layout);
-            let base_free: *mut u32 = transmute(memory);
-            let base_data: *mut *mut T = transmute(memory.add(byte_offset_data));
-
-            return (base_free, base_data, byte_size);
-        }
+    pub(crate) fn get_component(&self, key: CompKey) -> &mut RuntimeComp {
+        let component = self.components.get_mut(key.0);
+        return component;
     }
 
-    fn dealloc_buffer(freelist: *mut u32, _data: *mut *mut T, byte_size: usize) {
-        // Note: we only did one allocation, freelist is at the front
-        let layout = Self::layout_for(byte_size);
-        unsafe {
-            let base: *mut u8 = transmute(freelist);
-            dealloc(base, layout);
-        }
+    pub(crate) fn get_component_public(&self, id: CompId) -> &CompPublic {
+        let component = self.components.get(id.0);
+        return &component.public;
     }
 
-    fn layout_for(byte_size: usize) -> Layout {
-        debug_assert!(byte_size % size_of::<u32>() == 0);
-        return unsafe{ Layout::from_size_align_unchecked(byte_size, align_of::<u32>()) };
+    pub(crate) fn destroy_component(&self, key: CompKey) {
+        self.components.destroy(key.0);
    }
 
-    fn align_to(offset: usize, alignment: usize) -> usize {
-        debug_assert!(alignment.is_power_of_two());
-        let mask = alignment - 1;
-        return (offset + mask) & !mask;
-    }
-}
\ No newline at end of file
+    // Interacting with components
+}
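
Below is a rough usage sketch (not part of the patch) of how a scheduler thread might drive the new Runtime API. It assumes a hypothetical `CompPDL::run(&mut CompCtx) -> bool` execution method, which this diff does not define, and it assumes the `sleeping` flag is used as the wake-up handshake: whoever delivers the next message is expected to clear the flag and call `enqueue_work` again.

    use std::sync::atomic::Ordering;

    // Illustrative scheduler loop; names and signatures other than the
    // Runtime methods shown in the diff are assumptions.
    fn scheduler_loop(runtime: RuntimeHandle) {
        // `take_work` blocks on the condition variable until a key is queued,
        // or returns `None` once no active elements remain.
        while let Some(key) = runtime.take_work() {
            let comp = runtime.get_component(key);

            // Hypothetical execution step: run until the component blocks or exits.
            let exited = comp.private.code.run(&mut comp.private.ctx);

            if exited {
                runtime.destroy_component(key);
            } else {
                // Assumed protocol: mark the component as sleeping again; the
                // next sender flips this flag and re-enqueues the key.
                comp.public.sleeping.store(true, Ordering::Release);
            }
        }
    }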