Files @ ecc47971d535
Branch filter:

Location: CSY/reowolf/src/runtime2/global_store.rs

ecc47971d535 6.4 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
WIP on handling sync solution messages
use std::ptr;
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicBool, AtomicU32};

use crate::collections::{MpmcQueue, RawVec};
use crate::ProtocolDescription;

use super::scheduler::{Router, ConnectorCtx};
use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
use super::inbox::Message;
use super::native::{Connector, ConnectorApplication};


/// The registry containing all connectors. The idea here is that when someone
/// owns a `ConnectorKey`, then one has unique access to that connector.
/// Otherwise one has shared access.
///
/// This datastructure is built to be wrapped in a RwLock.
pub(crate) struct ConnectorStore {
    pub(crate) port_counter: Arc<AtomicU32>,
    inner: RwLock<ConnectorStoreInner>,
}

struct ConnectorStoreInner {
    connectors: RawVec<*mut ScheduledConnector>,
    free: Vec<usize>,
}

impl ConnectorStore {
    fn with_capacity(capacity: usize) -> Self {
        return Self{
            port_counter: Arc::new(AtomicU32::new(0)),
            inner: RwLock::new(ConnectorStoreInner {
                connectors: RawVec::with_capacity(capacity),
                free: Vec::with_capacity(capacity),
            }),
        };
    }

    /// Retrieves the shared members of the connector.
    pub(crate) fn get_shared(&self, connector_id: ConnectorId) -> &'static ConnectorPublic {
        let lock = self.inner.read().unwrap();

        unsafe {
            let connector = lock.connectors.get(connector_id.0 as usize);
            debug_assert!(!connector.is_null());
            return &(**connector).public;
        }
    }

    /// Retrieves a particular connector. Only the thread that pulled the
    /// associated key out of the execution queue should (be able to) call this.
    pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector {
        let lock = self.inner.read().unwrap();

        unsafe {
            let connector = lock.connectors.get_mut(key.index as usize);
            debug_assert!(!connector.is_null());
            return &mut (**connector);
        }
    }

    pub(crate) fn create_interface(&self, connector: ConnectorApplication) -> ConnectorKey {
        // Connector interface does not own any initial ports, and cannot be
        // created by another connector
        let key = self.create_connector_raw(ConnectorVariant::Native(Box::new(connector)), true);
        return key;
    }

    /// Create a new connector, returning the key that can be used to retrieve
    /// and/or queue it. The caller must make sure that the constructed
    /// connector's code is initialized with the same ports as the ports in the
    /// `initial_ports` array. Furthermore the connector is initialized as not
    /// sleeping, so MUST be put on the connector queue by the caller.
    pub(crate) fn create_pdl(&self, created_by: &mut ScheduledConnector, connector: ConnectorPDL) -> ConnectorKey {
        let key = self.create_connector_raw(ConnectorVariant::UserDefined(connector), false);
        let new_connector = self.get_mut(&key);

        // Transferring ownership of ports (and crashing if there is a
        // programmer's mistake in port management)
        match &new_connector.connector {
            ConnectorVariant::UserDefined(connector) => {
                for port_id in &connector.ports.owned_ports {
                    let mut port = created_by.context.remove_port(*port_id);
                    new_connector.context.add_port(port);
                }
            },
            ConnectorVariant::Native(_) => unreachable!(),
        }

        return key;
    }

    pub(crate) fn destroy(&self, key: ConnectorKey) {
        let mut lock = self.inner.write().unwrap();

        unsafe {
            let connector = lock.connectors.get_mut(key.index as usize);
            ptr::drop_in_place(*connector);
            // Note: but not deallocating!
        }

        lock.free.push(key.index as usize);
    }

    /// Creates a connector but does not set its initial ports
    fn create_connector_raw(&self, connector: ConnectorVariant, initialize_as_sleeping: bool) -> ConnectorKey {
        // Creation of the connector in the global store, requires a lock
        let index;
        {
            let mut lock = self.inner.write().unwrap();
            let connector = ScheduledConnector {
                connector,
                context: ConnectorCtx::new(self.port_counter.clone()),
                public: ConnectorPublic::new(initialize_as_sleeping),
                router: Router::new(),
            };

            if lock.free.is_empty() {
                let connector = Box::into_raw(Box::new(connector));

                index = lock.connectors.len();
                lock.connectors.push(connector);
            } else {
                index = lock.free.pop().unwrap();

                unsafe {
                    let target = lock.connectors.get_mut(index);
                    debug_assert!(!target.is_null());
                    ptr::write(*target, connector);
                }
            }
        }

        // Generate key and retrieve the connector to set its ID
        let key = ConnectorKey{ index: index as u32 };
        let new_connector = self.get_mut(&key);
        new_connector.context.id = key.downcast();

        // Return the connector key
        return key;
    }
}

impl Drop for ConnectorStore {
    fn drop(&mut self) {
        let lock = self.inner.write().unwrap();

        for idx in 0..lock.connectors.len() {
            unsafe {
                let memory = *lock.connectors.get_mut(idx);
                let _ = Box::from_raw(memory); // takes care of deallocation
            }
        }
    }
}

/// Global store of connectors, ports and queues that are used by the sceduler
/// threads. The global store has the appearance of a thread-safe datatype, but
/// one needs to be careful using it.
///
/// TODO: @docs
/// TODO: @Optimize, very lazy implementation of concurrent datastructures.
///     This includes the `should_exit` and `did_exit` pair!
pub(crate) struct GlobalStore {
    pub connector_queue: MpmcQueue<ConnectorKey>,
    pub connectors: ConnectorStore,
    pub should_exit: AtomicBool,    // signal threads to exit
}

impl GlobalStore {
    pub(crate) fn new() -> Self {
        Self{
            connector_queue: MpmcQueue::with_capacity(256),
            connectors: ConnectorStore::with_capacity(256),
            should_exit: AtomicBool::new(false),
        }
    }
}