Files @ 58dfabd1be9f
Branch filter:

Location: CSY/reowolf/src/runtime2/global_store.rs

58dfabd1be9f 9.6 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
moving to laptop
use crate::collections::{MpmcQueue, RawVec};

use super::connector::{ConnectorPDL, ConnectorPublic};
use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel};
use super::inbox::PublicInbox;
use super::scheduler::Router;

use std::ptr;
use std::sync::{Barrier, RwLock, RwLockReadGuard};
use std::sync::atomic::AtomicBool;
use crate::runtime2::native::Connector;

/// A kind of token that, once obtained, allows mutable access to a connector.
/// We're trying to use move semantics as much as possible: the owner of this
/// key is the only one that may execute the connector's code.
pub(crate) struct ConnectorKey {
    pub index: u32, // of connector
}

impl ConnectorKey {
    /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable
    /// access, to a "regular ID" which can be used to obtain immutable access.
    #[inline]
    pub fn downcast(&self) -> ConnectorId {
        return ConnectorId(self.index);
    }

    /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it
    /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system
    #[inline]
    pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
        return ConnectorKey{ index: id.0 };
    }
}

/// A kind of token that allows shared access to a connector. Multiple threads
/// may hold this
#[derive(Copy, Clone)]
pub(crate) struct ConnectorId(u32);

impl ConnectorId {
    // TODO: Like the other `new_invalid`, maybe remove
    #[inline]
    pub fn new_invalid() -> ConnectorId {
        return ConnectorId(u32::MAX);
    }
}

// TODO: Change this, I hate this. But I also don't want to put `public` and
//  `router` of `ScheduledConnector` back into `Connector`.
pub enum ConnectorVariant {
    UserDefined(ConnectorPDL),
    Native(Box<dyn Connector>),
}

pub struct ScheduledConnector {
    pub connector: ConnectorVariant,
    pub public: ConnectorPublic,
    pub router: Router
}

/// The registry containing all connectors. The idea here is that when someone
/// owns a `ConnectorKey`, then one has unique access to that connector.
/// Otherwise one has shared access.
///
/// This datastructure is built to be wrapped in a RwLock.
struct ConnectorStore {
    inner: RwLock<ConnectorStoreInner>,
}

struct ConnectorStoreInner {
    connectors: RawVec<*mut ScheduledConnector>,
    free: Vec<usize>,
}

impl ConnectorStore {
    fn with_capacity(capacity: usize) -> Self {
        return Self{
            inner: RwLock::new(ConnectorStoreInner {
                connectors: RawVec::with_capacity(capacity),
                free: Vec::with_capacity(capacity),
            }),
        };
    }

    /// Retrieves the shared members of the connector.
    pub(crate) fn get_shared(&self, connector_id: ConnectorId) -> &'static ConnectorPublic {
        let lock = self.inner.read().unwrap();

        unsafe {
            let connector = lock.connectors.get(connector_id.0 as usize);
            debug_assert!(!connector.is_null());
            return &*connector.public;
        }
    }

    /// Retrieves a particular connector. Only the thread that pulled the
    /// associated key out of the execution queue should (be able to) call this.
    pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector {
        let lock = self.inner.read().unwrap();

        unsafe {
            let connector = lock.connectors.get_mut(key.index as usize);
            debug_assert!(!connector.is_null());
            return *connector as &mut _;
        }
    }

    /// Create a new connector, returning the key that can be used to retrieve
    /// and/or queue it.
    pub(crate) fn create(&self, connector: ConnectorVariant) -> ConnectorKey {
        let lock = self.inner.write().unwrap();
        let connector = ScheduledConnector{
            connector,
            public: ConnectorPublic::new(),
            router: Router::new(),
        };

        let index;
        if lock.free.is_empty() {
            let connector = Box::into_raw(Box::new(connector));

            unsafe {
                // Cheating a bit here. Anyway, move to heap, store in list
                index = lock.connectors.len();
                lock.connectors.push(connector);
            }
        } else {
            index = lock.free.pop().unwrap();

            unsafe {
                let target = lock.connectors.get_mut(index);
                debug_assert!(!target.is_null());
                ptr::write(*target, connector);
            }
        }

        return ConnectorKey{ index: index as u32 };
    }

    pub(crate) fn destroy(&self, key: ConnectorKey) {
        let lock = self.inner.write().unwrap();

        unsafe {
            let connector = lock.connectors.get_mut(key.index as usize);
            ptr::drop_in_place(*connector);
            // Note: but not deallocating!
        }

        lock.free.push(key.index as usize);
    }
}

impl Drop for ConnectorStore {
    fn drop(&mut self) {
        let lock = self.inner.write().unwrap();

        for idx in 0..lock.connectors.len() {
            unsafe {
                let memory = *lock.connectors.get_mut(idx);
                let _ = Box::from_raw(memory); // takes care of deallocation
            }
        }
    }
}

/// The registry of all ports
pub struct PortStore {
    inner: RwLock<PortStoreInner>,
}

struct PortStoreInner {
    ports: RawVec<Port>,
    free: Vec<usize>,
}

impl PortStore {
    fn with_capacity(capacity: usize) -> Self {
        Self{
            inner: RwLock::new(PortStoreInner{
                ports: RawVec::with_capacity(capacity),
                free: Vec::with_capacity(capacity),
            }),
        }
    }

    pub(crate) fn get(&self, key: &ConnectorKey, port_id: PortIdLocal) -> PortRef {
        let lock = self.inner.read().unwrap();
        debug_assert!(port_id.is_valid());

        unsafe {
            let port = lock.ports.get_mut(port_id.index as usize);
            let port = &mut *port;
            debug_assert_eq!(port.owning_connector_id, key.index); // race condition (if they are not equal, which should never happen), better than nothing

            return PortRef{ lock, port };
        }
    }

    pub(crate) fn create_channel(&self, creating_connector: ConnectorId) -> Channel {
        let mut lock = self.inner.write().unwrap();

        // Reserves a new port. Doesn't point it to its counterpart
        fn reserve_port(lock: &mut std::sync::RwLockWriteGuard<'_, PortStoreInner>, kind: PortKind, creating_connector: ConnectorId) -> u32 {
            let index;

            if lock.free.is_empty() {
                index = lock.ports.len() as u32;
                lock.ports.push(Port{
                    self_id: PortIdLocal::new(index),
                    peer_id: PortIdLocal::new_invalid(),
                    kind,
                    ownership: PortOwnership::Owned,
                    owning_connector: connector_id,
                    peer_connector: connector_id
                });
            } else {
                index = lock.free.pop().unwrap() as u32;
                let port = unsafe{ &mut *lock.ports.get_mut(index as usize) };

                port.peer_id = PortIdLocal::new_invalid();
                port.kind = kind;
                port.ownership = PortOwnership::Owned;
                port.owning_connector = connector_id;
                port.peer_connector = connector_id;
            }

            return index;
        }

        // Create the ports
        let putter_id = reserve_port(&mut lock, PortKind::Putter, creating_connector);
        let getter_id = reserve_port(&mut lock, PortKind::Getter, creating_connector);
        debug_assert_ne!(putter_id, getter_id);

        // Point them to one another
        unsafe {
            let putter_port = &mut *lock.ports.get_mut(putter_id as usize);
            let getter_port = &mut *lock.ports.get_mut(getter_id as usize);
            putter_port.peer_id = getter_port.self_id;
            getter_port.peer_id = putter_port.self_id;
        }

        return Channel{
            putter_id: PortIdLocal::new(putter_id),
            getter_id: PortIdLocal::new(getter_id),
        }
    }
}

pub struct PortRef<'p> {
    lock: RwLockReadGuard<'p, PortStoreInner>,
    port: &'static mut Port,
}

impl<'p> std::ops::Deref for PortRef<'p> {
    type Target = Port;

    fn deref(&self) -> &Self::Target {
        return self.port;
    }
}

impl<'p> std::ops::DerefMut for PortRef<'p> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        return self.port;
    }
}

impl Drop for PortStore {
    fn drop(&mut self) {
        let lock = self.inner.write().unwrap();

        // Very lazy code
        for idx in 0..lock.ports.len() {
            if lock.free.contains(&idx) {
                continue;
            }

            unsafe {
                let port = lock.ports.get_mut(idx);
                std::ptr::drop_in_place(port);
            }
        }
    }
}

/// Global store of connectors, ports and queues that are used by the sceduler
/// threads. The global store has the appearance of a thread-safe datatype, but
/// one needs to be careful using it.
///
/// TODO: @docs
/// TODO: @Optimize, very lazy implementation of concurrent datastructures.
///     This includes the `should_exit` and `did_exit` pair!
pub struct GlobalStore {
    pub connector_queue: MpmcQueue<ConnectorKey>,
    pub connectors: ConnectorStore,
    pub ports: PortStore,
    pub should_exit: AtomicBool,    // signal threads to exit
}

impl GlobalStore {
    pub fn new() -> Self {
        Self{
            connector_queue: MpmcQueue::with_capacity(256),
            connectors: ConnectorStore::with_capacity(256),
            ports: PortStore::with_capacity(256),
            should_exit: AtomicBool::new(false),
        }
    }
}