Files @ 6e3f85de2a0a
Branch filter:

Location: CSY/reowolf/src/runtime2/global_store.rs - annotation

6e3f85de2a0a 6.4 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
MH
initial version of new consensus
b4ac681e0e7f
a43d61913724
b4ac681e0e7f
b4ac681e0e7f
1aef293674a6
44f84629849b
1aef293674a6
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
7d01f1245b7c
daf15df0f8ca
1aef293674a6
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
b4ac681e0e7f
b4ac681e0e7f
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
1aef293674a6
1aef293674a6
1aef293674a6
1aef293674a6
1aef293674a6
cf26538b25dc
b4ac681e0e7f
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
cf26538b25dc
a43d61913724
1aef293674a6
1aef293674a6
1aef293674a6
cf26538b25dc
cf26538b25dc
daf15df0f8ca
cf26538b25dc
cf26538b25dc
1aef293674a6
cf26538b25dc
1aef293674a6
a43d61913724
1aef293674a6
1aef293674a6
1aef293674a6
a43d61913724
a43d61913724
a43d61913724
7d01f1245b7c
a43d61913724
a43d61913724
a43d61913724
cf26538b25dc
26d47db4f922
26d47db4f922
7d01f1245b7c
7d01f1245b7c
a43d61913724
7d01f1245b7c
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
7d01f1245b7c
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
7d01f1245b7c
b4ac681e0e7f
a43d61913724
b4ac681e0e7f
7d01f1245b7c
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
7d01f1245b7c
b4ac681e0e7f
b4ac681e0e7f
cf26538b25dc
b4ac681e0e7f
b4ac681e0e7f
1aef293674a6
7d01f1245b7c
7d01f1245b7c
b4ac681e0e7f
b4ac681e0e7f
1aef293674a6
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
1aef293674a6
1aef293674a6
1aef293674a6
a43d61913724
c97c5d60bc61
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
a43d61913724
c97c5d60bc61
1aef293674a6
1aef293674a6
1aef293674a6
1aef293674a6
1aef293674a6
cf26538b25dc
cf26538b25dc
cf26538b25dc
1aef293674a6
cf26538b25dc
cf26538b25dc
1aef293674a6
1aef293674a6
1aef293674a6
1aef293674a6
1aef293674a6
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
7d01f1245b7c
cf26538b25dc
cf26538b25dc
daf15df0f8ca
cf26538b25dc
1aef293674a6
cf26538b25dc
7d01f1245b7c
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
cf26538b25dc
1aef293674a6
1aef293674a6
use std::ptr;
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicBool, AtomicU32};

use crate::collections::{MpmcQueue, RawVec};
use crate::ProtocolDescription;

use super::scheduler::{Router, ConnectorCtx};
use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
use super::inbox::Message;
use super::native::{Connector, ConnectorApplication};


/// The registry containing all connectors. The idea here is that when someone
/// owns a `ConnectorKey`, then one has unique access to that connector.
/// Otherwise one has shared access.
///
/// This datastructure is built to be wrapped in a RwLock.
pub(crate) struct ConnectorStore {
    pub(crate) port_counter: Arc<AtomicU32>,
    inner: RwLock<ConnectorStoreInner>,
}

struct ConnectorStoreInner {
    connectors: RawVec<*mut ScheduledConnector>,
    free: Vec<usize>,
}

impl ConnectorStore {
    fn with_capacity(capacity: usize) -> Self {
        return Self{
            port_counter: Arc::new(AtomicU32::new(0)),
            inner: RwLock::new(ConnectorStoreInner {
                connectors: RawVec::with_capacity(capacity),
                free: Vec::with_capacity(capacity),
            }),
        };
    }

    /// Retrieves the shared members of the connector.
    pub(crate) fn get_shared(&self, connector_id: ConnectorId) -> &'static ConnectorPublic {
        let lock = self.inner.read().unwrap();

        unsafe {
            let connector = lock.connectors.get(connector_id.0 as usize);
            debug_assert!(!connector.is_null());
            return &(**connector).public;
        }
    }

    /// Retrieves a particular connector. Only the thread that pulled the
    /// associated key out of the execution queue should (be able to) call this.
    pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector {
        let lock = self.inner.read().unwrap();

        unsafe {
            let connector = lock.connectors.get_mut(key.index as usize);
            debug_assert!(!connector.is_null());
            return &mut (**connector);
        }
    }

    pub(crate) fn create_interface(&self, connector: ConnectorApplication) -> ConnectorKey {
        // Connector interface does not own any initial ports, and cannot be
        // created by another connector
        let key = self.create_connector_raw(ConnectorVariant::Native(Box::new(connector)), true);
        return key;
    }

    /// Create a new connector, returning the key that can be used to retrieve
    /// and/or queue it. The caller must make sure that the constructed
    /// connector's code is initialized with the same ports as the ports in the
    /// `initial_ports` array. Furthermore the connector is initialized as not
    /// sleeping, so MUST be put on the connector queue by the caller.
    pub(crate) fn create_pdl(&self, created_by: &mut ScheduledConnector, connector: ConnectorPDL) -> ConnectorKey {
        let key = self.create_connector_raw(ConnectorVariant::UserDefined(connector), false);
        let new_connector = self.get_mut(&key);

        // Transferring ownership of ports (and crashing if there is a
        // programmer's mistake in port management)
        match &new_connector.connector {
            ConnectorVariant::UserDefined(connector) => {
                for port_id in &connector.ports.owned_ports {
                    let mut port = created_by.context.remove_port(*port_id);
                    new_connector.context.add_port(port);
                }
            },
            ConnectorVariant::Native(_) => unreachable!(),
        }

        return key;
    }

    pub(crate) fn destroy(&self, key: ConnectorKey) {
        let mut lock = self.inner.write().unwrap();

        unsafe {
            let connector = lock.connectors.get_mut(key.index as usize);
            ptr::drop_in_place(*connector);
            // Note: but not deallocating!
        }

        lock.free.push(key.index as usize);
    }

    /// Creates a connector but does not set its initial ports
    fn create_connector_raw(&self, connector: ConnectorVariant, initialize_as_sleeping: bool) -> ConnectorKey {
        // Creation of the connector in the global store, requires a lock
        let index;
        {
            let mut lock = self.inner.write().unwrap();
            let connector = ScheduledConnector {
                connector,
                context: ConnectorCtx::new(self.port_counter.clone()),
                public: ConnectorPublic::new(initialize_as_sleeping),
                router: Router::new(),
            };

            if lock.free.is_empty() {
                let connector = Box::into_raw(Box::new(connector));

                index = lock.connectors.len();
                lock.connectors.push(connector);
            } else {
                index = lock.free.pop().unwrap();

                unsafe {
                    let target = lock.connectors.get_mut(index);
                    debug_assert!(!target.is_null());
                    ptr::write(*target, connector);
                }
            }
        }

        // Generate key and retrieve the connector to set its ID
        let key = ConnectorKey{ index: index as u32 };
        let new_connector = self.get_mut(&key);
        new_connector.context.id = key.downcast();

        // Return the connector key
        return key;
    }
}

impl Drop for ConnectorStore {
    fn drop(&mut self) {
        let lock = self.inner.write().unwrap();

        for idx in 0..lock.connectors.len() {
            unsafe {
                let memory = *lock.connectors.get_mut(idx);
                let _ = Box::from_raw(memory); // takes care of deallocation
            }
        }
    }
}

/// Global store of connectors, ports and queues that are used by the sceduler
/// threads. The global store has the appearance of a thread-safe datatype, but
/// one needs to be careful using it.
///
/// TODO: @docs
/// TODO: @Optimize, very lazy implementation of concurrent datastructures.
///     This includes the `should_exit` and `did_exit` pair!
pub(crate) struct GlobalStore {
    pub connector_queue: MpmcQueue<ConnectorKey>,
    pub connectors: ConnectorStore,
    pub should_exit: AtomicBool,    // signal threads to exit
}

impl GlobalStore {
    pub(crate) fn new() -> Self {
        Self{
            connector_queue: MpmcQueue::with_capacity(256),
            connectors: ConnectorStore::with_capacity(256),
            should_exit: AtomicBool::new(false),
        }
    }
}