Changeset - 1aef293674a6
[Not reviewed]
0 2 4
mh - 4 years ago 2021-10-01 09:55:56
contact@maxhenger.nl
experimenting with multithreaded scheduler sync primitives
6 files changed with 307 insertions and 1 deletions:
0 comments (0 inline, 0 general)
src/collections/mod.rs
Show inline comments
 
mod string_pool;
 
mod scoped_buffer;
 
mod sets;
 
mod mpmc_queue;
 
mod raw_vec;
 

	
 
// TODO: Finish this later, use alloc::alloc and alloc::Layout
 
// mod freelist;
 

	
 
pub(crate) use string_pool::{StringPool, StringRef};
 
pub(crate) use scoped_buffer::{ScopedBuffer, ScopedSection};
 
pub(crate) use sets::DequeSet;
 
\ No newline at end of file
 
pub(crate) use sets::DequeSet;
 
pub(crate) use mpmc_queue::MpmcQueue;
 
pub(crate) use raw_vec::RawVec;
 
\ No newline at end of file
src/collections/mpmc_queue.rs
Show inline comments
 
new file 100644
 
use std::sync::Mutex;
 
use std::collections::VecDeque;
 

	
 
/// Generic multiple-producer, multiple-consumer queue. Current implementation
 
/// has the required functionality, without all of the optimizations.
 
/// TODO: @Optimize
 
pub struct MpmcQueue<T: Sized> {
 
    queue: Mutex<VecDeque<T>>,
 
}
 

	
 
impl<T: Sized> MpmcQueue<T> {
 
    pub fn new() -> Self {
 
        Self::with_capacity(0)
 
    }
 

	
 
    pub fn with_capacity(capacity: usize) -> Self {
 
        Self{
 
            queue: Mutex::new(VecDeque::with_capacity(capacity)),
 
        }
 
    }
 

	
 
    pub fn push_back(&self, item: T) {
 
        let mut queue = self.queue.lock().unwrap();
 
        queue.push_back(item);
 
    }
 

	
 
    pub fn pop_front(&self) -> Option<T> {
 
        let mut queue = self.queue.lock().unwrap();
 
        return queue.pop_front();
 
    }
 
}
 
\ No newline at end of file
src/collections/raw_vec.rs
Show inline comments
 
new file 100644
 
use std::{mem, ptr, cmp};
 
use std::alloc::{Layout, alloc, dealloc};
 

	
 
#[derive(Debug)]
 
enum AllocError {
 
    CapacityOverflow,
 
}
 

	
 
/// Generic raw vector. It has a base pointer, a capacity and a length. Basic
 
/// operations are supported, but the user of the structure is responsible for
 
/// ensuring that no illegal mutable access occurs.
 
/// A lot of the logic is simply stolen from the std lib. The destructor will
 
/// free the backing memory, but will not run any destructors.
 
pub struct RawVec<T: Sized> {
 
    base: *mut T,
 
    cap: usize,
 
    len: usize,
 
}
 

	
 
impl<T: Sized> RawVec<T> {
 
    const T_ALIGNMENT: usize = mem::align_of::<T>();
 
    const T_SIZE: usize = mem::size_of::<T>();
 
    
 
    const GROWTH_RATE: usize = 2;
 

	
 
    pub fn new() -> Self {
 
        Self{
 
            base: ptr::null_mut(),
 
            cap: 0,
 
            len: 0,
 
        }
 
    }
 

	
 
    pub fn with_capacity(capacity: usize) -> Self {
 
        // Could be done a bit more efficiently
 
        let mut result = Self::new();
 
        result.ensure_space(capacity);
 
        return result;
 
    }
 

	
 
    pub unsafe fn get(&self, idx: usize) -> *const T {
 
        debug_assert!(idx < self.len);
 
        return self.base.add(idx);
 
    }
 

	
 
    pub unsafe fn get_mut(&self, idx: usize) -> *mut T {
 
        debug_assert!(idx < self.len);
 
        return self.base.add(idx);
 
    }
 

	
 
    pub fn push(&mut self, item: T) {
 
        self.ensure_space(1);
 
        unsafe {
 
            let target = self.base.add(self.len);
 
            std::ptr::write(target, item);
 
            self.len += 1;
 
        }
 
    }
 

	
 
    pub fn len(&self) -> usize {
 
        return self.len;
 
    }
 

	
 
    fn ensure_space(&mut self, additional: usize) -> Result<(), AllocError>{
 
        debug_assert!(Self::T_SIZE != 0);
 
        debug_assert!(self.cap >= self.len);
 
        if self.cap - self.len < additional {
 
            // Need to resize. Note that due to all checked conditions we have
 
            // that new_cap >= 1.
 
            debug_assert!(additional > 0);
 
            let new_cap = self.len.checked_add(additional).unwrap();
 
            let new_cap = cmp::max(new_cap, self.cap * Self::GROWTH_RATE);
 
            
 
            let layout = Layout::array::<T>(new_cap)
 
                .map_err(|_| AllocError::CapacityOverflow)?;
 
            debug_assert_eq!(new_cap * Self::T_SIZE, layout.size());
 

	
 
            unsafe {
 
                // Allocate new storage, transfer bits, deallocate old store
 
                let new_base = alloc(layout);
 

	
 
                if self.cap > 0 {
 
                    let old_base = self.base as *mut u8;
 
                    let (old_size, old_layout) = self.current_layout();
 

	
 
                    ptr::copy_nonoverlapping(new_base, old_base, old_size);
 
                    dealloc(old_base, old_layout);
 
                }
 

	
 
                self.base = new_base;
 
                self.cap = new_cap;
 
            }
 
        } // else: still enough space
 

	
 
        return Ok(());
 
    }
 

	
 
    #[inline]
 
    fn current_layout(&self) -> (usize, Layout) {
 
        debug_assert!(Self::T_SIZE > 0);
 
        let old_size = self.cap * Self::T_SIZE;
 
        unsafe {
 
            return (
 
                old_size,
 
                Layout::from_size_align_unchecked(old_size, Self::T_ALIGNMENT)
 
            );
 
        }
 
    }
 
}
 

	
 
impl<T: Sized> Drop for RawVec<T> {
 
    fn drop(&mut self) {
 
        if self.cap > 0 {
 
            debug_assert!(!self.base.is_null());
 
            let (_, layout) = self.current_layout();
 
            unsafe {
 
                dealloc(self.base, layout);
 
                if cfg!(debug_assertions) {
 
                    self.base = ptr::null_mut();
 
                }
 
            }
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/global_store.rs
Show inline comments
 
new file 100644
 
use crate::collections::{MpmcQueue, RawVec};
 

	
 
use super::connector::Connector;
 

	
 
use std::ptr;
 
use std::sync::RwLock;
 

	
 
/// A kind of token that, once obtained, allows access to a container.
 
struct ConnectorKey {
 
    index: u32, // of connector
 
}
 

	
 
struct ConnectorStore {
 
    connectors: RawVec<*mut Connector>,
 
    free: Vec<usize>,
 
}
 

	
 
impl ConnectorStore {
 
    fn with_capacity(capacity: usize) -> Self {
 
        Self{
 
            connectors: RawVec::with_capacity(capacity),
 
            free: Vec::with_capacity(capacity),
 
        }
 
    }
 

	
 
    fn get_mut(&self, key: &ConnectorKey) -> &'static mut Connector {
 
        unsafe {
 
            let connector = self.connectors.get_mut(key.index as usize);
 
            debug_assert!(!connector.is_null());
 
            return *connector as &mut _;
 
        }
 
    }
 

	
 
    fn create(&mut self, connector: Connector) -> ConnectorKey {
 
        let index;
 
        if self.free.is_empty() {
 
            let connector = Box::into_raw(Box::new(connector));
 

	
 
            unsafe {
 
                // Cheating a bit here. Anyway, move to heap, store in list
 
                index = self.connectors.len();
 
                self.connectors.push(connector);
 
            }
 
        } else {
 
            index = self.free.pop().unwrap();
 

	
 
            unsafe {
 
                let target = self.connectors.get_mut(index);
 
                debug_assert!(!target.is_null());
 
                ptr::write(*target, connector);
 
            }
 
        }
 

	
 
        return ConnectorKey{ index: index as u32 };
 
    }
 

	
 
    fn destroy(&mut self, key: ConnectorKey) {
 
        unsafe {
 
            let connector = self.connectors.get_mut(key.index as usize);
 
            ptr::drop_in_place(*connector);
 
            // Note: but not deallocating!
 
        }
 

	
 
        self.free.push(key.index as usize);
 
    }
 
}
 

	
 
impl Drop for ConnectorStore {
 
    fn drop(&mut self) {
 
        for idx in 0..self.connectors.len() {
 
            unsafe {
 
                let memory = *self.connectors.get_mut(idx);
 
                let boxed = Box::from_raw(memory); // takes care of deallocation
 
            }
 
        }
 
    }
 
}
 

	
 
/// Global store of connectors, ports and queues that are used by the sceduler
 
/// threads. The global store has the appearance of a thread-safe datatype, but
 
/// one needs to be careful using it.
 
///
 
/// The intention of this data structure is to enforce the rules:
 
/// TODO: @docs
 
pub struct GlobalStore {
 
    connector_queue: MpmcQueue<ConnectorKey>,
 
    connectors: RwLock<ConnectorStore>,
 
}
 

	
 
impl GlobalStore {
 
    pub fn new() -> Self {
 
        Self{
 
            connector_queue: MpmcQueue::with_capacity(256),
 
            connectors: RwLock::new(ConnectorStore::with_capacity(256)),
 
        }
 
    }
 

	
 
    // Taking connectors out of global queue
 

	
 
    pub fn pop_key(&self) -> Option<ConnectorKey> {
 
        return self.connector_queue.pop_front();
 
    }
 

	
 
    pub fn push_key(&self, key: ConnectorKey) {
 
        self.connector_queue.push_back(key);
 
    }
 

	
 
    // Creating, retrieving and destroying connectors
 

	
 
    /// Retrieves a connector using the provided key. Note that the returned
 
    /// reference is not truly static, the `GlobalStore` needs to stay alive.
 
    pub fn get_connector(&self, key: &ConnectorKey) -> &'static mut Connector {
 
        let connectors = self.connectors.read().unwrap();
 
        return connectors.get_mut(key);
 
    }
 

	
 
    /// Adds a connector to the global system. Will also queue it to run
 
    pub fn add_connector(&self, connector: Connector) {
 
        let key = {
 
            let mut connectors = self.connectors.write().unwrap();
 
            connectors.create(connector)
 
        };
 

	
 
        self.connector_queue.push_back(key);
 
    }
 

	
 
    /// Destroys a connector
 
    pub fn destroy_connector(&self, key: ConnectorKey) {
 
        let mut connectors = self.connectors.write().unwrap();
 
        connectors.destroy(key);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/mod.rs
Show inline comments
 
mod runtime;
 
mod messages;
 
mod connector;
 
mod global_store;
 
mod scheduler;
 

	
 
#[cfg(test)] mod tests;
src/runtime2/scheduler.rs
Show inline comments
 
new file 100644
 
use std::sync::Condvar;
 

	
 
use super::global_store::GlobalStore;
 

	
 
struct Scheduler<'g> {
 
    global: &'g GlobalStore,
 
}
 

	
 
impl<'g> Scheduler<'g> {
 
    pub fn new(store: &'g GlobalStore) {
 

	
 
    }
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)