From 1aef293674a6c82352fd7fa9e61fb6cf22699d2c 2021-10-01 09:55:56
From: mh
Date: 2021-10-01 09:55:56
Subject: [PATCH] experimenting with multithreaded scheduler sync primitives

---

diff --git a/src/collections/mod.rs b/src/collections/mod.rs
index 9afc8ee863a525c4d0abefd3ed1d592aa02be875..bdb02b45a4a07748023fa8cfcbd0183264d441ae 100644
--- a/src/collections/mod.rs
+++ b/src/collections/mod.rs
@@ -1,10 +1,14 @@
 mod string_pool;
 mod scoped_buffer;
 mod sets;
+mod mpmc_queue;
+mod raw_vec;
 
 // TODO: Finish this later, use alloc::alloc and alloc::Layout
 // mod freelist;
 
 pub(crate) use string_pool::{StringPool, StringRef};
 pub(crate) use scoped_buffer::{ScopedBuffer, ScopedSection};
-pub(crate) use sets::DequeSet;
\ No newline at end of file
+pub(crate) use sets::DequeSet;
+pub(crate) use mpmc_queue::MpmcQueue;
+pub(crate) use raw_vec::RawVec;
\ No newline at end of file
diff --git a/src/collections/mpmc_queue.rs b/src/collections/mpmc_queue.rs
new file mode 100644
index 0000000000000000000000000000000000000000..41e21da3830e9feef648a4f20eb70e05716045fc
--- /dev/null
+++ b/src/collections/mpmc_queue.rs
@@ -0,0 +1,31 @@
+use std::sync::Mutex;
+use std::collections::VecDeque;
+
+/// Generic multiple-producer, multiple-consumer queue. The current
+/// implementation has the required functionality, but not the optimizations.
+/// TODO: @Optimize
+pub struct MpmcQueue<T> {
+    queue: Mutex<VecDeque<T>>,
+}
+
+impl<T> MpmcQueue<T> {
+    pub fn new() -> Self {
+        Self::with_capacity(0)
+    }
+
+    pub fn with_capacity(capacity: usize) -> Self {
+        Self{
+            queue: Mutex::new(VecDeque::with_capacity(capacity)),
+        }
+    }
+
+    pub fn push_back(&self, item: T) {
+        let mut queue = self.queue.lock().unwrap();
+        queue.push_back(item);
+    }
+
+    pub fn pop_front(&self) -> Option<T> {
+        let mut queue = self.queue.lock().unwrap();
+        return queue.pop_front();
+    }
+}
\ No newline at end of file
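
For illustration, a minimal sketch of how the queue is meant to be used:
shared between threads behind an `Arc`, with any number of producers and
consumers. The worker setup below is made up for the example; only
`MpmcQueue` itself comes from this patch.

    use std::sync::Arc;
    use std::thread;

    fn main() {
        // shared by all producer/consumer threads
        let queue = Arc::new(MpmcQueue::with_capacity(16));

        // two producers push work items concurrently
        let producers: Vec<_> = (0..2).map(|id| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || {
                for item in 0..4 {
                    queue.push_back((id, item));
                }
            })
        }).collect();
        for producer in producers { producer.join().unwrap(); }

        // a consumer drains the queue until it is empty
        while let Some((id, item)) = queue.pop_front() {
            println!("popped item {} from producer {}", item, id);
        }
    }
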
diff --git a/src/collections/raw_vec.rs b/src/collections/raw_vec.rs
new file mode 100644
index 0000000000000000000000000000000000000000..89ca6c2ac19bfc99e5ec1dad7609213e0dd2df70
--- /dev/null
+++ b/src/collections/raw_vec.rs
@@ -0,0 +1,129 @@
+use std::{mem, ptr, cmp};
+use std::alloc::{Layout, alloc, dealloc, handle_alloc_error};
+
+#[derive(Debug)]
+enum AllocError {
+    CapacityOverflow,
+}
+
+/// Generic raw vector. It has a base pointer, a capacity and a length. Basic
+/// operations are supported, but the user of the structure is responsible for
+/// ensuring that no illegal mutable access occurs.
+/// A lot of the logic is simply stolen from the std lib. The destructor will
+/// free the backing memory, but will not run the destructors of the elements.
+pub struct RawVec<T> {
+    base: *mut T,
+    cap: usize,
+    len: usize,
+}
+
+impl<T> RawVec<T> {
+    const T_ALIGNMENT: usize = mem::align_of::<T>();
+    const T_SIZE: usize = mem::size_of::<T>();
+
+    const GROWTH_RATE: usize = 2;
+
+    pub fn new() -> Self {
+        Self{
+            base: ptr::null_mut(),
+            cap: 0,
+            len: 0,
+        }
+    }
+
+    pub fn with_capacity(capacity: usize) -> Self {
+        // Could be done a bit more efficiently
+        let mut result = Self::new();
+        result.ensure_space(capacity).unwrap();
+        return result;
+    }
+
+    pub unsafe fn get(&self, idx: usize) -> *const T {
+        debug_assert!(idx < self.len);
+        return self.base.add(idx);
+    }
+
+    pub unsafe fn get_mut(&self, idx: usize) -> *mut T {
+        debug_assert!(idx < self.len);
+        return self.base.add(idx);
+    }
+
+    pub fn push(&mut self, item: T) {
+        self.ensure_space(1).unwrap();
+        unsafe {
+            let target = self.base.add(self.len);
+            ptr::write(target, item);
+            self.len += 1;
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        return self.len;
+    }
+
+    fn ensure_space(&mut self, additional: usize) -> Result<(), AllocError> {
+        debug_assert!(Self::T_SIZE != 0);
+        debug_assert!(self.cap >= self.len);
+        if self.cap - self.len < additional {
+            // Need to resize. Note that due to all checked conditions we have
+            // that new_cap >= 1.
+            debug_assert!(additional > 0);
+            let new_cap = self.len.checked_add(additional)
+                .ok_or(AllocError::CapacityOverflow)?;
+            let new_cap = cmp::max(new_cap, self.cap * Self::GROWTH_RATE);
+
+            let layout = Layout::array::<T>(new_cap)
+                .map_err(|_| AllocError::CapacityOverflow)?;
+            debug_assert_eq!(new_cap * Self::T_SIZE, layout.size());
+
+            unsafe {
+                // Allocate new storage, transfer bits, deallocate old storage
+                let new_base = alloc(layout);
+                if new_base.is_null() {
+                    handle_alloc_error(layout);
+                }
+
+                if self.cap > 0 {
+                    let old_base = self.base as *mut u8;
+                    let (old_size, old_layout) = self.current_layout();
+
+                    // copy_nonoverlapping copies src -> dst
+                    ptr::copy_nonoverlapping(old_base, new_base, old_size);
+                    dealloc(old_base, old_layout);
+                }
+
+                self.base = new_base as *mut T;
+                self.cap = new_cap;
+            }
+        } // else: still enough space
+
+        return Ok(());
+    }
+
+    #[inline]
+    fn current_layout(&self) -> (usize, Layout) {
+        debug_assert!(Self::T_SIZE > 0);
+        let old_size = self.cap * Self::T_SIZE;
+        unsafe {
+            return (
+                old_size,
+                Layout::from_size_align_unchecked(old_size, Self::T_ALIGNMENT)
+            );
+        }
+    }
+}
+
+impl<T> Drop for RawVec<T> {
+    fn drop(&mut self) {
+        if self.cap > 0 {
+            debug_assert!(!self.base.is_null());
+            let (_, layout) = self.current_layout();
+            unsafe {
+                dealloc(self.base as *mut u8, layout);
+                if cfg!(debug_assertions) {
+                    self.base = ptr::null_mut();
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
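
And a sketch of the contract `RawVec` places on its user, assuming a `Copy`
element type so that skipping element destructors is harmless; with a type
that owns resources the caller would have to `drop_in_place` each element
before the vector goes away:

    fn main() {
        let mut values = RawVec::<u64>::with_capacity(4);
        for i in 0..10u64 {
            values.push(i); // reallocates once the capacity of 4 is exceeded
        }

        unsafe {
            // the caller is responsible for staying in bounds: the index is
            // only checked by a debug_assert in debug builds
            for idx in 0..values.len() {
                assert_eq!(*values.get(idx), idx as u64);
            }
        }
    } // RawVec's Drop frees the backing memory, but runs no element destructors
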
diff --git a/src/runtime2/global_store.rs b/src/runtime2/global_store.rs
new file mode 100644
index 0000000000000000000000000000000000000000..ca7b4ea548b598fca2e83d3555780fed381fbefb
--- /dev/null
+++ b/src/runtime2/global_store.rs
@@ -0,0 +1,140 @@
+use crate::collections::{MpmcQueue, RawVec};
+
+use super::connector::Connector;
+
+use std::ptr;
+use std::sync::RwLock;
+
+/// A kind of token that, once obtained, allows access to a connector.
+pub struct ConnectorKey {
+    index: u32, // of connector
+}
+
+struct ConnectorStore {
+    connectors: RawVec<*mut Connector>,
+    free: Vec<usize>,
+}
+
+impl ConnectorStore {
+    fn with_capacity(capacity: usize) -> Self {
+        Self{
+            connectors: RawVec::with_capacity(capacity),
+            free: Vec::with_capacity(capacity),
+        }
+    }
+
+    fn get_mut(&self, key: &ConnectorKey) -> &'static mut Connector {
+        unsafe {
+            let connector = self.connectors.get_mut(key.index as usize);
+            debug_assert!(!(*connector).is_null());
+            return &mut *(*connector);
+        }
+    }
+
+    fn create(&mut self, connector: Connector) -> ConnectorKey {
+        let index;
+        if self.free.is_empty() {
+            // No free slot: move the connector to the heap, append the
+            // pointer to the list
+            let connector = Box::into_raw(Box::new(connector));
+
+            index = self.connectors.len();
+            self.connectors.push(connector);
+        } else {
+            // Reuse the allocation of a previously destroyed connector
+            index = self.free.pop().unwrap();
+
+            unsafe {
+                let target = self.connectors.get_mut(index);
+                debug_assert!(!(*target).is_null());
+                ptr::write(*target, connector);
+            }
+        }
+
+        return ConnectorKey{ index: index as u32 };
+    }
+
+    fn destroy(&mut self, key: ConnectorKey) {
+        unsafe {
+            let connector = self.connectors.get_mut(key.index as usize);
+            ptr::drop_in_place(*connector);
+            // Note: dropped, but not deallocated! The allocation is reused
+            // by `create`.
+        }
+
+        self.free.push(key.index as usize);
+    }
+}
+
+impl Drop for ConnectorStore {
+    fn drop(&mut self) {
+        for idx in 0..self.connectors.len() {
+            unsafe {
+                let memory = *self.connectors.get_mut(idx);
+                if self.free.contains(&idx) {
+                    // Already dropped in place by `destroy`, only deallocate:
+                    // MaybeUninit<Connector> has the same layout but runs no
+                    // destructor
+                    drop(Box::from_raw(memory as *mut std::mem::MaybeUninit<Connector>));
+                } else {
+                    drop(Box::from_raw(memory)); // takes care of deallocation
+                }
+            }
+        }
+    }
+}
+
+/// Global store of connectors, ports and queues that are used by the
+/// scheduler threads. The global store has the appearance of a thread-safe
+/// datatype, but one needs to be careful using it.
+///
+/// The intention of this data structure is to enforce the rules:
+/// TODO: @docs
+pub struct GlobalStore {
+    connector_queue: MpmcQueue<ConnectorKey>,
+    connectors: RwLock<ConnectorStore>,
+}
+
+impl GlobalStore {
+    pub fn new() -> Self {
+        Self{
+            connector_queue: MpmcQueue::with_capacity(256),
+            connectors: RwLock::new(ConnectorStore::with_capacity(256)),
+        }
+    }
+
+    // Taking connectors out of, and putting them back into, the global queue
+
+    pub fn pop_key(&self) -> Option<ConnectorKey> {
+        return self.connector_queue.pop_front();
+    }
+
+    pub fn push_key(&self, key: ConnectorKey) {
+        self.connector_queue.push_back(key);
+    }
+
+    // Creating, retrieving and destroying connectors
+
+    /// Retrieves a connector using the provided key. Note that the returned
+    /// reference is not truly static: the `GlobalStore` must outlive it.
+    pub fn get_connector(&self, key: &ConnectorKey) -> &'static mut Connector {
+        let connectors = self.connectors.read().unwrap();
+        return connectors.get_mut(key);
+    }
+
+    /// Adds a connector to the global system. Will also queue it to run.
+    pub fn add_connector(&self, connector: Connector) {
+        let key = {
+            let mut connectors = self.connectors.write().unwrap();
+            connectors.create(connector)
+        };
+
+        self.connector_queue.push_back(key);
+    }
+
+    /// Destroys a connector
+    pub fn destroy_connector(&self, key: ConnectorKey) {
+        let mut connectors = self.connectors.write().unwrap();
+        connectors.destroy(key);
+    }
+}
\ No newline at end of file
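
The intended access pattern, as far as it can be read from the code above: a
`ConnectorKey` acts as an exclusive handle, so whichever scheduler thread
pops a key may mutate the corresponding connector without further locking. A
sketch, where `Connector::new_dummy()` stands in for whatever constructor the
real `Connector` type has:

    fn run_one(global: &GlobalStore) {
        // a new connector enters the system and is queued to run
        global.add_connector(Connector::new_dummy());

        // a scheduler thread gains exclusive access by popping the key;
        // no other thread holds this key right now
        if let Some(key) = global.pop_key() {
            let connector = global.get_connector(&key);
            // ... run the connector until it blocks ...

            // hand the key back so any scheduler thread may continue it
            global.push_key(key);
        }
    }
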
diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs
index d892bc50a2c1a8aca763cfb2c14e1d399cfb38f3..4eb25b841315eaafaa4f67f5c519212bc1067f9a 100644
--- a/src/runtime2/mod.rs
+++ b/src/runtime2/mod.rs
@@ -1,5 +1,7 @@
 mod runtime;
 mod messages;
 mod connector;
+mod global_store;
+mod scheduler;
 
 #[cfg(test)] mod tests;
diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs
new file mode 100644
index 0000000000000000000000000000000000000000..9a8c1e0ac1360c8ae4c57f91db609de354e3b1cc
--- /dev/null
+++ b/src/runtime2/scheduler.rs
@@ -0,0 +1,13 @@
+use std::sync::Condvar;
+
+use super::global_store::GlobalStore;
+
+struct Scheduler<'g> {
+    global: &'g GlobalStore,
+}
+
+impl<'g> Scheduler<'g> {
+    pub fn new(store: &'g GlobalStore) -> Self {
+        return Self{ global: store };
+    }
+}
\ No newline at end of file
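
The scheduler itself is still a stub, but the pieces above suggest the loop
it is building towards. A speculative sketch, not part of the patch; the
unused `Condvar` import hints that the busy-wait below would eventually
become a proper sleep:

    impl<'g> Scheduler<'g> {
        pub fn run(&mut self) {
            loop {
                match self.global.pop_key() {
                    Some(key) => {
                        let connector = self.global.get_connector(&key);
                        // ... run `connector` until it yields ...
                        self.global.push_key(key);
                    },
                    None => {
                        // nothing runnable: yield for now; a Condvar would
                        // let the thread sleep instead of busy-waiting
                        std::thread::yield_now();
                    },
                }
            }
        }
    }
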