Location: CSY/reowolf/src/runtime2/global_store.rs
Changeset b4ac681e0e7f: WIP on message-based sync impl

use std::mem::ManuallyDrop;
use std::ptr;
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicBool, AtomicU32};

use crate::collections::{MpmcQueue, RawVec};

use super::connector::{ConnectorPDL, ConnectorPublic};
use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel};
use super::inbox::PublicInbox;
use super::scheduler::Router;

use crate::ProtocolDescription;
use crate::runtime2::connector::{ConnectorScheduling, RunDeltaState};
use crate::runtime2::inbox::{DataMessage, MessageContents, SyncMessage};
use crate::runtime2::native::Connector;
use crate::runtime2::scheduler::ConnectorCtx;

/// A kind of token that, once obtained, allows mutable access to a connector.
/// We're trying to use move semantics as much as possible: the owner of this
/// key is the only one that may execute the connector's code.
pub(crate) struct ConnectorKey {
    pub index: u32, // of connector
}

impl ConnectorKey {
    /// Downcasts the `ConnectorKey`, which grants mutable access, to a
    /// "regular" `ConnectorId`, which only grants immutable access.
    #[inline]
    pub fn downcast(&self) -> ConnectorId {
        return ConnectorId(self.index);
    }

    /// Turns a `ConnectorId` back into a `ConnectorKey`. Marked `unsafe`
    /// because it bypasses the type-enforced `ConnectorKey`/`ConnectorId`
    /// access system.
    #[inline]
    pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
        return ConnectorKey{ index: id.0 };
    }
}
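
// Illustrative sketch (not part of the original file): how the key/ID split is
// meant to be used. The holder of a `ConnectorKey` is the only party allowed
// to mutate the connector; everyone else downgrades to a `ConnectorId` and
// only sees the public part. `store` and `key` are assumed to be provided by
// the surrounding runtime.
#[allow(dead_code)]
fn key_versus_id_sketch(store: &ConnectorStore, key: ConnectorKey) {
    // Unique, mutable access: only possible through the key.
    let scheduled: &mut ScheduledConnector = store.get_mut(&key);
    let _ = &mut scheduled.router;

    // Shared access: the key is downcast to a freely copyable ID.
    let id: ConnectorId = key.downcast();
    let _public: &ConnectorPublic = store.get_shared(id);
}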

/// A kind of token that allows shared access to a connector. Multiple threads
/// may hold a copy of this ID at the same time.
#[derive(Copy, Clone)]
pub(crate) struct ConnectorId(pub u32);

impl ConnectorId {
    // TODO: Like the other `new_invalid`, maybe remove
    #[inline]
    pub fn new_invalid() -> ConnectorId {
        return ConnectorId(u32::MAX);
    }

    #[inline]
    pub(crate) fn is_valid(&self) -> bool {
        return self.0 != u32::MAX;
    }
}
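
// Small sketch of the sentinel pattern above: `new_invalid` likely serves as a
// placeholder ID until `ConnectorStore::create` (below) assigns the real one.
#[allow(dead_code)]
fn sentinel_id_sketch() {
    let id = ConnectorId::new_invalid();
    assert!(!id.is_valid());
    assert!(ConnectorId(0).is_valid());
}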

// TODO: Change this, I hate this. But I also don't want to put `public` and
//  `router` of `ScheduledConnector` back into `Connector`. The reason I don't
//  want `Box<dyn Connector>` everywhere is because of the v-table overhead. But
//  to truly design this properly I need some benchmarks.
pub enum ConnectorVariant {
    UserDefined(ConnectorPDL),
    Native(Box<dyn Connector>),
}

impl Connector for ConnectorVariant {
    fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
        match self {
            ConnectorVariant::UserDefined(c) => c.handle_message(message, ctx, delta_state),
            ConnectorVariant::Native(c) => c.handle_message(message, ctx, delta_state),
        }
    }

    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
        match self {
            ConnectorVariant::UserDefined(c) => c.run(protocol_description, ctx, delta_state),
            ConnectorVariant::Native(c) => c.run(protocol_description, ctx, delta_state),
        }
    }
}
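
// Sketch illustrating the TODO above: dispatching through the enum resolves
// the common `UserDefined` case statically; only the `Native` arm goes through
// a v-table. The parameters are assumed to be supplied by the scheduler.
#[allow(dead_code)]
fn dispatch_sketch(
    variant: &mut ConnectorVariant,
    protocol: &ProtocolDescription,
    ctx: &ConnectorCtx,
    delta: &mut RunDeltaState,
) -> ConnectorScheduling {
    // One `match` inside `ConnectorVariant::run`, then a direct call for the
    // PDL connector; no `Box<dyn Connector>` indirection in that arm.
    variant.run(protocol, ctx, delta)
}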

pub struct ScheduledConnector {
    pub connector: ConnectorVariant, // access by connector
    pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector
    pub public: ConnectorPublic, // accessible by all schedulers and connectors
    pub router: Router,
}

/// The registry containing all connectors. Whoever owns a connector's
/// `ConnectorKey` has unique (mutable) access to that connector; everyone else
/// only gets shared (immutable) access through the corresponding `ConnectorId`.
///
/// Connectors are stored as individually boxed allocations, so their addresses
/// stay stable while the underlying `RawVec` grows; the inner storage is
/// guarded by a `RwLock`.
pub(crate) struct ConnectorStore {
    pub(crate) port_counter: Arc<AtomicU32>,
    inner: RwLock<ConnectorStoreInner>,
}

struct ConnectorStoreInner {
    connectors: RawVec<*mut ScheduledConnector>,
    free: Vec<usize>,
}

impl ConnectorStore {
    fn with_capacity(capacity: usize) -> Self {
        return Self{
            port_counter: Arc::new(AtomicU32::new(0)),
            inner: RwLock::new(ConnectorStoreInner {
                connectors: RawVec::with_capacity(capacity),
                free: Vec::with_capacity(capacity),
            }),
        };
    }

    /// Retrieves the shared members of the connector.
    pub(crate) fn get_shared(&self, connector_id: ConnectorId) -> &'static ConnectorPublic {
        let lock = self.inner.read().unwrap();

        unsafe {
            let connector = lock.connectors.get(connector_id.0 as usize);
            debug_assert!(!connector.is_null());
            return &(**connector).public;
        }
    }

    /// Retrieves a particular connector. Only the thread that pulled the
    /// associated key out of the execution queue should (be able to) call this.
    pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector {
        let lock = self.inner.read().unwrap();

        unsafe {
            let connector = lock.connectors.get_mut(key.index as usize);
            debug_assert!(!connector.is_null());
            return &mut **connector;
        }
    }

    /// Create a new connector, returning the key that can be used to retrieve
    /// and/or queue it.
    pub(crate) fn create(&self, created_by: &mut ScheduledConnector, connector: ConnectorVariant) -> ConnectorKey {
        // Creating the connector in the global store requires the write lock.
        // `index` is needed again after the lock is released, so it is
        // declared before the locked scope.
        let index;
        {
            let mut lock = self.inner.write().unwrap();
            let connector = ScheduledConnector {
                connector,
                context: ConnectorCtx::new(self.port_counter.clone()),
                public: ConnectorPublic::new(),
                router: Router::new(),
            };

            if lock.free.is_empty() {
                let connector = Box::into_raw(Box::new(connector));

                unsafe {
                    // Cheating a bit here. Anyway, move to heap, store in list
                    index = lock.connectors.len();
                    lock.connectors.push(connector);
                }
            } else {
                index = lock.free.pop().unwrap();

                unsafe {
                    let target = lock.connectors.get_mut(index);
                    debug_assert!(!target.is_null());
                    ptr::write(*target, connector);
                }
            }
        }

        // Setting of new connector's ID
        let key = ConnectorKey{ index: index as u32 };
        let new_connector = self.get_mut(&key);
        new_connector.context.id = key.downcast();

        // Transferring ownership of ports (and crashing if there is a
        // programmer's mistake in port management)
        match &new_connector.connector {
            ConnectorVariant::UserDefined(connector) => {
                for port_id in &connector.ports.owned_ports {
                    let mut port = created_by.context.remove_port(*port_id);
                    port.owning_connector = new_connector.context.id;
                    new_connector.context.add_port(port);
                }
            },
            ConnectorVariant::Native(_) => {}, // no initial ports (yet!)
        }

        return key;
    }

    pub(crate) fn destroy(&self, key: ConnectorKey) {
        let mut lock = self.inner.write().unwrap();

        unsafe {
            let connector = lock.connectors.get_mut(key.index as usize);
            ptr::drop_in_place(*connector);
            // Note: but not deallocating!
        }

        lock.free.push(key.index as usize);
    }
}
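
// Sketch (illustrative only) of the intended lifecycle of a slot in the store.
// `created_by` and `variant` are assumed to be supplied by the runtime; for a
// `UserDefined` variant the new connector's ports must already be owned by
// `created_by`, otherwise `create` panics while transferring them.
#[allow(dead_code)]
fn connector_lifecycle_sketch(
    store: &ConnectorStore,
    created_by: &mut ScheduledConnector,
    variant: ConnectorVariant,
) {
    // `create` heap-allocates the connector (or reuses a freed slot) and hands
    // back the unique key.
    let key = store.create(created_by, variant);

    // The key grants mutable access until the connector is destroyed.
    let scheduled = store.get_mut(&key);
    let _ = &mut scheduled.connector;

    // `destroy` consumes the key, drops the connector in place and marks the
    // slot for reuse; the allocation itself lives until the store is dropped.
    store.destroy(key);
}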

impl Drop for ConnectorStore {
    fn drop(&mut self) {
        // `&mut self` already guarantees exclusive access, so the lock does
        // not need to be acquired here.
        let lock = self.inner.get_mut().unwrap();

        for idx in 0..lock.connectors.len() {
            unsafe {
                let memory = *lock.connectors.get_mut(idx);
                if lock.free.contains(&idx) {
                    // `destroy` already dropped this connector in place, so
                    // only release the allocation without running the
                    // destructor a second time.
                    let _ = Box::from_raw(memory as *mut ManuallyDrop<ScheduledConnector>);
                } else {
                    let _ = Box::from_raw(memory); // drops and deallocates
                }
            }
        }
    }
}

/// Global store of connectors, ports and queues that are used by the scheduler
/// threads. The global store has the appearance of a thread-safe datatype, but
/// one needs to be careful when using it.
///
/// TODO: @docs
/// TODO: @Optimize, very lazy implementation of concurrent datastructures.
///     This includes the `should_exit` and `did_exit` pair!
pub struct GlobalStore {
    pub connector_queue: MpmcQueue<ConnectorKey>,
    pub connectors: ConnectorStore,
    pub should_exit: AtomicBool,    // signal threads to exit
}

impl GlobalStore {
    pub fn new() -> Self {
        Self{
            connector_queue: MpmcQueue::with_capacity(256),
            connectors: ConnectorStore::with_capacity(256),
            should_exit: AtomicBool::new(false),
        }
    }
}
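
// Sketch of how a scheduler thread is expected to interact with the global
// store. The `MpmcQueue` API is not visible in this file, so the queue
// interaction is only described in comments; the rest uses the types defined
// above. `key` is assumed to have been popped from `connector_queue`.
#[allow(dead_code)]
fn scheduler_interaction_sketch(global: &GlobalStore, key: ConnectorKey) {
    use std::sync::atomic::Ordering;

    // Scheduler threads poll this flag to know when to shut down.
    if global.should_exit.load(Ordering::Acquire) {
        return;
    }

    // Having popped a `ConnectorKey` from `global.connector_queue`, the thread
    // has unique access to the corresponding connector and may run it.
    let scheduled = global.connectors.get_mut(&key);
    let _ = &mut scheduled.connector;

    // Afterwards the key is either pushed back onto `global.connector_queue`
    // (if the connector must run again) or handed to
    // `global.connectors.destroy(key)` when the connector terminates.
}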