Files @ cf26538b25dc
Branch filter:

Location: CSY/reowolf/src/runtime2/global_store.rs

cf26538b25dc 7.8 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
architecture for send/recv ports in place
use crate::collections::{MpmcQueue, RawVec};

use super::connector::{Connector, ConnectorPublic};
use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel};

use std::ptr;
use std::sync::{RwLock, RwLockReadGuard};

/// A kind of token that, once obtained, allows access to a container.
struct ConnectorKey {
    index: u32, // of connector
}

/// The registry containing all connectors. The idea here is that when someone
/// owns a `ConnectorKey`, then one has unique access to that connector.
/// Otherwise one has shared access.
///
/// This datastructure is built to be wrapped in a RwLock.
struct ConnectorStore {
    inner: RwLock<ConnectorStoreInner>,
}

struct ConnectorStoreInner {
    connectors: RawVec<*mut Connector>,
    free: Vec<usize>,
}

impl ConnectorStore {
    fn with_capacity(capacity: usize) -> Self {
        return Self{
            inner: RwLock::new(ConnectorStoreInner {
                connectors: RawVec::with_capacity(capacity),
                free: Vec::with_capacity(capacity),
            }),
        };
    }

    /// Retrieves the shared members of the connector.
    pub(crate) fn get_shared(&self, connector_id: u32) -> &'static ConnectorPublic {
        let lock = self.inner.read().unwrap();

        unsafe {
            let connector = lock.connectors.get(connector_id as usize);
            debug_assert!(!connector.is_null());
            return &*connector.public;
        }
    }

    /// Retrieves a particular connector. Only the thread that pulled the
    /// associated key out of the execution queue should (be able to) call this.
    pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut Connector {
        let lock = self.inner.read().unwrap();

        unsafe {
            let connector = lock.connectors.get_mut(key.index as usize);
            debug_assert!(!connector.is_null());
            return *connector as &mut _;
        }
    }

    /// Create a new connector, returning the key that can be used to retrieve
    /// and/or queue it.
    pub(crate) fn create(&self, connector: Connector) -> ConnectorKey {
        let lock = self.inner.write().unwrap();

        let index;
        if lock.free.is_empty() {
            let connector = Box::into_raw(Box::new(connector));

            unsafe {
                // Cheating a bit here. Anyway, move to heap, store in list
                index = lock.connectors.len();
                lock.connectors.push(connector);
            }
        } else {
            index = lock.free.pop().unwrap();

            unsafe {
                let target = lock.connectors.get_mut(index);
                debug_assert!(!target.is_null());
                ptr::write(*target, connector);
            }
        }

        return ConnectorKey{ index: index as u32 };
    }

    pub(crate) fn destroy(&self, key: ConnectorKey) {
        let lock = self.inner.write().unwrap();

        unsafe {
            let connector = lock.connectors.get_mut(key.index as usize);
            ptr::drop_in_place(*connector);
            // Note: but not deallocating!
        }

        lock.free.push(key.index as usize);
    }
}

impl Drop for ConnectorStore {
    fn drop(&mut self) {
        let lock = self.inner.write().unwrap();

        for idx in 0..lock.connectors.len() {
            unsafe {
                let memory = *lock.connectors.get_mut(idx);
                let _ = Box::from_raw(memory); // takes care of deallocation
            }
        }
    }
}

/// The registry of all ports
pub struct PortStore {
    inner: RwLock<PortStoreInner>,
}

struct PortStoreInner {
    ports: RawVec<Port>,
    free: Vec<usize>,
}

impl PortStore {
    fn with_capacity(capacity: usize) -> Self {
        Self{
            inner: RwLock::new(PortStoreInner{
                ports: RawVec::with_capacity(capacity),
                free: Vec::with_capacity(capacity),
            }),
        }
    }

    pub(crate) fn get(&self, key: &ConnectorKey, port_id: PortIdLocal) -> PortRef {
        let lock = self.inner.read().unwrap();
        debug_assert!(port_id.is_valid());

        unsafe {
            let port = lock.ports.get_mut(port_id.index as usize);
            let port = &mut *port;
            debug_assert_eq!(port.owning_connector_id, key.index); // race condition (if they are not equal, which should never happen), better than nothing

            return PortRef{ lock, port };
        }
    }

    pub(crate) fn create_channel(&self, creating_connector: Option<u32>) -> Channel {
        let mut lock = self.inner.write().unwrap();

        // Reserves a new port. Doesn't point it to its counterpart
        fn reserve_port(lock: &mut std::sync::RwLockWriteGuard<'_, PortStoreInner>, kind: PortKind, creating_connector: Option<u32>) -> u32 {
            let index;
            let ownership = if creating_connector.is_some() { PortOwnership::Owned } else { PortOwnership::Unowned };
            let connector_id = creating_connector.unwrap_or(0);

            if lock.free.is_empty() {
                index = lock.ports.len() as u32;
                lock.ports.push(Port{
                    self_id: PortIdLocal::new(index),
                    peer_id: PortIdLocal::new_invalid(),
                    kind,
                    ownership,
                    owning_connector: connector_id,
                    peer_connector: connector_id
                });
            } else {
                index = lock.free.pop().unwrap() as u32;
                let port = unsafe{ &mut *lock.ports.get_mut(index as usize) };

                port.peer_id = PortIdLocal::new_invalid();
                port.kind = kind;
                port.ownership = ownership;
                port.owning_connector = connector_id;
                port.peer_connector = connector_id;
            }

            return index;
        }

        // Create the ports
        let putter_id = reserve_port(&mut lock, PortKind::Putter, creating_connector);
        let getter_id = reserve_port(&mut lock, PortKind::Getter, creating_connector);
        debug_assert_ne!(putter_id, getter_id);

        // Point them to one another
        unsafe {
            let putter_port = &mut *lock.ports.get_mut(putter_id as usize);
            let getter_port = &mut *lock.ports.get_mut(getter_id as usize);
            putter_port.peer_id = getter_port.self_id;
            getter_port.peer_id = putter_port.self_id;
        }

        return Channel{ putter_id, getter_id }
    }
}

pub struct PortRef<'p> {
    lock: RwLockReadGuard<'p, PortStoreInner>,
    port: &'static mut Port,
}

impl<'p> std::ops::Deref for PortRef<'p> {
    type Target = Port;

    fn deref(&self) -> &Self::Target {
        return self.port;
    }
}

impl<'p> std::ops::DerefMut for PortRef<'p> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        return self.port;
    }
}

impl Drop for PortStore {
    fn drop(&mut self) {
        let lock = self.inner.write().unwrap();

        // Very lazy code
        for idx in 0..lock.ports.len() {
            if lock.free.contains(&idx) {
                continue;
            }

            unsafe {
                let port = lock.ports.get_mut(idx);
                std::ptr::drop_in_place(port);
            }
        }
    }
}

/// Global store of connectors, ports and queues that are used by the sceduler
/// threads. The global store has the appearance of a thread-safe datatype, but
/// one needs to be careful using it.
///
/// TODO: @docs
/// TODO: @Optimize, very lazy implementation of concurrent datastructures.
pub struct GlobalStore {
    pub connector_queue: MpmcQueue<ConnectorKey>,
    pub connectors: ConnectorStore,
    pub ports: PortStore,
}

impl GlobalStore {
    pub fn new() -> Self {
        Self{
            connector_queue: MpmcQueue::with_capacity(256),
            connectors: ConnectorStore::with_capacity(256),
            ports: PortStore::with_capacity(256),
        }
    }
}