Files @ 26d47db4f922
Branch filter:

Location: CSY/reowolf/src/runtime2/global_store.rs - annotation

26d47db4f922 8.5 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
mh
WIP on second rewrite of port management
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
1aef293674a6
1aef293674a6
58dfabd1be9f
cf26538b25dc
daf15df0f8ca
daf15df0f8ca
1aef293674a6
44f84629849b
44f84629849b
b4ac681e0e7f
58dfabd1be9f
b4ac681e0e7f
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
1aef293674a6
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
b4ac681e0e7f
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
daf15df0f8ca
daf15df0f8ca
58dfabd1be9f
44f84629849b
44f84629849b
44f84629849b
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
44f84629849b
b4ac681e0e7f
44f84629849b
b4ac681e0e7f
b4ac681e0e7f
44f84629849b
44f84629849b
44f84629849b
b4ac681e0e7f
44f84629849b
b4ac681e0e7f
b4ac681e0e7f
44f84629849b
44f84629849b
44f84629849b
44f84629849b
daf15df0f8ca
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
44f84629849b
1aef293674a6
1aef293674a6
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
b4ac681e0e7f
b4ac681e0e7f
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
1aef293674a6
1aef293674a6
1aef293674a6
1aef293674a6
1aef293674a6
cf26538b25dc
b4ac681e0e7f
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
cf26538b25dc
cf26538b25dc
1aef293674a6
1aef293674a6
1aef293674a6
cf26538b25dc
cf26538b25dc
daf15df0f8ca
cf26538b25dc
cf26538b25dc
1aef293674a6
cf26538b25dc
1aef293674a6
1aef293674a6
1aef293674a6
1aef293674a6
1aef293674a6
cf26538b25dc
26d47db4f922
26d47db4f922
26d47db4f922
26d47db4f922
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
cf26538b25dc
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
1aef293674a6
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
1aef293674a6
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
1aef293674a6
1aef293674a6
1aef293674a6
b4ac681e0e7f
c97c5d60bc61
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
1aef293674a6
1aef293674a6
cf26538b25dc
cf26538b25dc
cf26538b25dc
1aef293674a6
cf26538b25dc
1aef293674a6
1aef293674a6
1aef293674a6
1aef293674a6
cf26538b25dc
1aef293674a6
1aef293674a6
1aef293674a6
1aef293674a6
1aef293674a6
cf26538b25dc
cf26538b25dc
cf26538b25dc
1aef293674a6
cf26538b25dc
cf26538b25dc
1aef293674a6
1aef293674a6
1aef293674a6
1aef293674a6
1aef293674a6
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
cf26538b25dc
1aef293674a6
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
cf26538b25dc
1aef293674a6
1aef293674a6
use std::ptr;
use std::sync::{Arc, Barrier, RwLock, RwLockReadGuard};
use std::sync::atomic::{AtomicBool, AtomicU32};

use crate::collections::{MpmcQueue, RawVec};

use super::connector::{ConnectorPDL, ConnectorPublic};
use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel};
use super::inbox::PublicInbox;
use super::scheduler::Router;

use crate::ProtocolDescription;
use crate::runtime2::connector::{ConnectorScheduling, RunDeltaState};
use crate::runtime2::inbox::{DataMessage, MessageContents, SyncMessage};
use crate::runtime2::native::Connector;
use crate::runtime2::scheduler::ConnectorCtx;

/// A kind of token that, once obtained, allows mutable access to a connector.
/// We're trying to use move semantics as much as possible: the owner of this
/// key is the only one that may execute the connector's code.
pub(crate) struct ConnectorKey {
    pub index: u32, // of connector
}

impl ConnectorKey {
    /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable
    /// access, to a "regular ID" which can be used to obtain immutable access.
    #[inline]
    pub fn downcast(&self) -> ConnectorId {
        return ConnectorId(self.index);
    }

    /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it
    /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system
    #[inline]
    pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
        return ConnectorKey{ index: id.0 };
    }
}

/// A kind of token that allows shared access to a connector. Multiple threads
/// may hold this
#[derive(Copy, Clone)]
pub(crate) struct ConnectorId(pub u32);

impl ConnectorId {
    // TODO: Like the other `new_invalid`, maybe remove
    #[inline]
    pub fn new_invalid() -> ConnectorId {
        return ConnectorId(u32::MAX);
    }

    #[inline]
    pub(crate) fn is_valid(&self) -> bool {
        return self.0 != u32::MAX;
    }
}

// TODO: Change this, I hate this. But I also don't want to put `public` and
//  `router` of `ScheduledConnector` back into `Connector`. The reason I don't
//  want `Box<dyn Connector>` everywhere is because of the v-table overhead. But
//  to truly design this properly I need some benchmarks.
pub enum ConnectorVariant {
    UserDefined(ConnectorPDL),
    Native(Box<dyn Connector>),
}

impl Connector for ConnectorVariant {
    fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
        match self {
            ConnectorVariant::UserDefined(c) => c.handle_message(message, ctx, delta_state),
            ConnectorVariant::Native(c) => c.handle_message(message, ctx, delta_state),
        }
    }

    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
        match self {
            ConnectorVariant::UserDefined(c) => c.run(protocol_description, ctx, delta_state),
            ConnectorVariant::Native(c) => c.run(protocol_description, ctx, delta_state),
        }
    }
}

pub struct ScheduledConnector {
    pub connector: ConnectorVariant, // access by connector
    pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector
    pub public: ConnectorPublic, // accessible by all schedulers and connectors
    pub router: Router,
}

/// The registry containing all connectors. The idea here is that when someone
/// owns a `ConnectorKey`, then one has unique access to that connector.
/// Otherwise one has shared access.
///
/// This datastructure is built to be wrapped in a RwLock.
pub(crate) struct ConnectorStore {
    pub(crate) port_counter: Arc<AtomicU32>,
    inner: RwLock<ConnectorStoreInner>,
}

struct ConnectorStoreInner {
    connectors: RawVec<*mut ScheduledConnector>,
    free: Vec<usize>,
}

impl ConnectorStore {
    fn with_capacity(capacity: usize) -> Self {
        return Self{
            port_counter: Arc::new(AtomicU32::new(0)),
            inner: RwLock::new(ConnectorStoreInner {
                connectors: RawVec::with_capacity(capacity),
                free: Vec::with_capacity(capacity),
            }),
        };
    }

    /// Retrieves the shared members of the connector.
    pub(crate) fn get_shared(&self, connector_id: ConnectorId) -> &'static ConnectorPublic {
        let lock = self.inner.read().unwrap();

        unsafe {
            let connector = lock.connectors.get(connector_id.0 as usize);
            debug_assert!(!connector.is_null());
            return &*connector.public;
        }
    }

    /// Retrieves a particular connector. Only the thread that pulled the
    /// associated key out of the execution queue should (be able to) call this.
    pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector {
        let lock = self.inner.read().unwrap();

        unsafe {
            let connector = lock.connectors.get_mut(key.index as usize);
            debug_assert!(!connector.is_null());
            return *connector as &mut _;
        }
    }

    /// Create a new connector, returning the key that can be used to retrieve
    /// and/or queue it. The caller must make sure that the constructed
    /// connector's code is initialized with the same ports as the ports in the
    /// `initial_ports` array.
    pub(crate) fn create(&self, created_by: &mut ScheduledConnector, connector: ConnectorVariant, initial_ports: Vec<Port>) -> ConnectorKey {
        // Creation of the connector in the global store, requires a lock
        {
            let lock = self.inner.write().unwrap();
            let connector = ScheduledConnector {
                connector,
                context: ConnectorCtx::new(self.port_counter.clone()),
                public: ConnectorPublic::new(),
                router: Router::new(),
            };

            let index;
            if lock.free.is_empty() {
                let connector = Box::into_raw(Box::new(connector));

                unsafe {
                    // Cheating a bit here. Anyway, move to heap, store in list
                    index = lock.connectors.len();
                    lock.connectors.push(connector);
                }
            } else {
                index = lock.free.pop().unwrap();

                unsafe {
                    let target = lock.connectors.get_mut(index);
                    debug_assert!(!target.is_null());
                    ptr::write(*target, connector);
                }
            }
        }

        // Setting of new connector's ID
        let key = ConnectorKey{ index: index as u32 };
        let new_connector = self.get_mut(&key);
        new_connector.context.id = key.downcast();

        // Transferring ownership of ports (and crashing if there is a
        // programmer's mistake in port management)
        match &new_connector.connector {
            ConnectorVariant::UserDefined(connector) => {
                for port_id in &connector.ports.owned_ports {
                    let mut port = created_by.context.remove_port(*port_id);
                    new_connector.context.add_port(port);
                }
            },
            ConnectorVariant::Native(_) => {}, // no initial ports (yet!)
        }

        return key;
    }

    pub(crate) fn destroy(&self, key: ConnectorKey) {
        let lock = self.inner.write().unwrap();

        unsafe {
            let connector = lock.connectors.get_mut(key.index as usize);
            ptr::drop_in_place(*connector);
            // Note: but not deallocating!
        }

        lock.free.push(key.index as usize);
    }
}

impl Drop for ConnectorStore {
    fn drop(&mut self) {
        let lock = self.inner.write().unwrap();

        for idx in 0..lock.connectors.len() {
            unsafe {
                let memory = *lock.connectors.get_mut(idx);
                let _ = Box::from_raw(memory); // takes care of deallocation
            }
        }
    }
}

/// Global store of connectors, ports and queues that are used by the sceduler
/// threads. The global store has the appearance of a thread-safe datatype, but
/// one needs to be careful using it.
///
/// TODO: @docs
/// TODO: @Optimize, very lazy implementation of concurrent datastructures.
///     This includes the `should_exit` and `did_exit` pair!
pub struct GlobalStore {
    pub connector_queue: MpmcQueue<ConnectorKey>,
    pub connectors: ConnectorStore,
    pub should_exit: AtomicBool,    // signal threads to exit
}

impl GlobalStore {
    pub fn new() -> Self {
        Self{
            connector_queue: MpmcQueue::with_capacity(256),
            connectors: ConnectorStore::with_capacity(256),
            should_exit: AtomicBool::new(false),
        }
    }
}