Files @ a43d61913724
Branch filter:

Location: CSY/reowolf/src/runtime2/global_store.rs - annotation

a43d61913724 9.0 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
MH
prepare for debugging
b4ac681e0e7f
a43d61913724
b4ac681e0e7f
b4ac681e0e7f
1aef293674a6
1aef293674a6
58dfabd1be9f
daf15df0f8ca
1aef293674a6
44f84629849b
44f84629849b
a43d61913724
a43d61913724
b4ac681e0e7f
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
1aef293674a6
daf15df0f8ca
daf15df0f8ca
a43d61913724
b4ac681e0e7f
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
daf15df0f8ca
daf15df0f8ca
58dfabd1be9f
44f84629849b
44f84629849b
44f84629849b
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
58dfabd1be9f
44f84629849b
b4ac681e0e7f
44f84629849b
b4ac681e0e7f
b4ac681e0e7f
44f84629849b
44f84629849b
44f84629849b
b4ac681e0e7f
44f84629849b
b4ac681e0e7f
b4ac681e0e7f
44f84629849b
44f84629849b
44f84629849b
44f84629849b
daf15df0f8ca
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
44f84629849b
1aef293674a6
1aef293674a6
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
b4ac681e0e7f
b4ac681e0e7f
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
1aef293674a6
1aef293674a6
1aef293674a6
1aef293674a6
1aef293674a6
cf26538b25dc
b4ac681e0e7f
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
cf26538b25dc
a43d61913724
1aef293674a6
1aef293674a6
1aef293674a6
cf26538b25dc
cf26538b25dc
daf15df0f8ca
cf26538b25dc
cf26538b25dc
1aef293674a6
cf26538b25dc
1aef293674a6
a43d61913724
1aef293674a6
1aef293674a6
1aef293674a6
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
cf26538b25dc
26d47db4f922
26d47db4f922
26d47db4f922
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
a43d61913724
b4ac681e0e7f
a43d61913724
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
cf26538b25dc
b4ac681e0e7f
b4ac681e0e7f
1aef293674a6
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
1aef293674a6
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
1aef293674a6
1aef293674a6
1aef293674a6
a43d61913724
c97c5d60bc61
b4ac681e0e7f
b4ac681e0e7f
b4ac681e0e7f
a43d61913724
c97c5d60bc61
1aef293674a6
1aef293674a6
1aef293674a6
1aef293674a6
1aef293674a6
cf26538b25dc
cf26538b25dc
cf26538b25dc
1aef293674a6
cf26538b25dc
cf26538b25dc
1aef293674a6
1aef293674a6
1aef293674a6
1aef293674a6
1aef293674a6
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
cf26538b25dc
1aef293674a6
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
cf26538b25dc
1aef293674a6
1aef293674a6
use std::ptr;
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicBool, AtomicU32};

use crate::collections::{MpmcQueue, RawVec};

use super::connector::{ConnectorPDL, ConnectorPublic};
use super::scheduler::Router;

use crate::ProtocolDescription;
use crate::runtime2::connector::{ConnectorScheduling, RunDeltaState};
use crate::runtime2::inbox::MessageContents;
use crate::runtime2::native::{Connector, ConnectorApplication};
use crate::runtime2::scheduler::ConnectorCtx;

/// A kind of token that, once obtained, allows mutable access to a connector.
/// We're trying to use move semantics as much as possible: the owner of this
/// key is the only one that may execute the connector's code.
pub(crate) struct ConnectorKey {
    pub index: u32, // of connector
}

impl ConnectorKey {
    /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable
    /// access, to a "regular ID" which can be used to obtain immutable access.
    #[inline]
    pub fn downcast(&self) -> ConnectorId {
        return ConnectorId(self.index);
    }

    /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it
    /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system
    #[inline]
    pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
        return ConnectorKey{ index: id.0 };
    }
}

/// A kind of token that allows shared access to a connector. Multiple threads
/// may hold this
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) struct ConnectorId(pub u32);

impl ConnectorId {
    // TODO: Like the other `new_invalid`, maybe remove
    #[inline]
    pub fn new_invalid() -> ConnectorId {
        return ConnectorId(u32::MAX);
    }

    #[inline]
    pub(crate) fn is_valid(&self) -> bool {
        return self.0 != u32::MAX;
    }
}

// TODO: Change this, I hate this. But I also don't want to put `public` and
//  `router` of `ScheduledConnector` back into `Connector`. The reason I don't
//  want `Box<dyn Connector>` everywhere is because of the v-table overhead. But
//  to truly design this properly I need some benchmarks.
pub enum ConnectorVariant {
    UserDefined(ConnectorPDL),
    Native(Box<dyn Connector>),
}

impl Connector for ConnectorVariant {
    fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
        match self {
            ConnectorVariant::UserDefined(c) => c.handle_message(message, ctx, delta_state),
            ConnectorVariant::Native(c) => c.handle_message(message, ctx, delta_state),
        }
    }

    fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
        match self {
            ConnectorVariant::UserDefined(c) => c.run(protocol_description, ctx, delta_state),
            ConnectorVariant::Native(c) => c.run(protocol_description, ctx, delta_state),
        }
    }
}

pub struct ScheduledConnector {
    pub connector: ConnectorVariant, // access by connector
    pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector
    pub public: ConnectorPublic, // accessible by all schedulers and connectors
    pub router: Router,
}

/// The registry containing all connectors. The idea here is that when someone
/// owns a `ConnectorKey`, then one has unique access to that connector.
/// Otherwise one has shared access.
///
/// This datastructure is built to be wrapped in a RwLock.
pub(crate) struct ConnectorStore {
    pub(crate) port_counter: Arc<AtomicU32>,
    inner: RwLock<ConnectorStoreInner>,
}

struct ConnectorStoreInner {
    connectors: RawVec<*mut ScheduledConnector>,
    free: Vec<usize>,
}

impl ConnectorStore {
    fn with_capacity(capacity: usize) -> Self {
        return Self{
            port_counter: Arc::new(AtomicU32::new(0)),
            inner: RwLock::new(ConnectorStoreInner {
                connectors: RawVec::with_capacity(capacity),
                free: Vec::with_capacity(capacity),
            }),
        };
    }

    /// Retrieves the shared members of the connector.
    pub(crate) fn get_shared(&self, connector_id: ConnectorId) -> &'static ConnectorPublic {
        let lock = self.inner.read().unwrap();

        unsafe {
            let connector = lock.connectors.get(connector_id.0 as usize);
            debug_assert!(!connector.is_null());
            return &(**connector).public;
        }
    }

    /// Retrieves a particular connector. Only the thread that pulled the
    /// associated key out of the execution queue should (be able to) call this.
    pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector {
        let lock = self.inner.read().unwrap();

        unsafe {
            let connector = lock.connectors.get_mut(key.index as usize);
            debug_assert!(!connector.is_null());
            return &mut (**connector);
        }
    }

    pub(crate) fn create_interface(&self, connector: ConnectorApplication) -> ConnectorKey {
        // Connector interface does not own any initial ports, and cannot be
        // created by another connector
        let key = self.create_connector_raw(ConnectorVariant::Native(Box::new(connector)));
        return key;
    }

    /// Create a new connector, returning the key that can be used to retrieve
    /// and/or queue it. The caller must make sure that the constructed
    /// connector's code is initialized with the same ports as the ports in the
    /// `initial_ports` array.
    pub(crate) fn create_pdl(&self, created_by: &mut ScheduledConnector, connector: ConnectorPDL) -> ConnectorKey {
        let key = self.create_connector_raw(ConnectorVariant::UserDefined(connector));
        let new_connector = self.get_mut(&key);

        // Transferring ownership of ports (and crashing if there is a
        // programmer's mistake in port management)
        match &new_connector.connector {
            ConnectorVariant::UserDefined(connector) => {
                for port_id in &connector.ports.owned_ports {
                    let mut port = created_by.context.remove_port(*port_id);
                    new_connector.context.add_port(port);
                }
            },
            ConnectorVariant::Native(_) => unreachable!(),
        }

        return key;
    }

    pub(crate) fn destroy(&self, key: ConnectorKey) {
        let lock = self.inner.write().unwrap();

        unsafe {
            let connector = lock.connectors.get_mut(key.index as usize);
            ptr::drop_in_place(*connector);
            // Note: but not deallocating!
        }

        lock.free.push(key.index as usize);
    }

    /// Creates a connector but does not set its initial ports
    fn create_connector_raw(&self, connector: ConnectorVariant) -> ConnectorKey {
        // Creation of the connector in the global store, requires a lock
        let index;
        {
            let lock = self.inner.write().unwrap();
            let connector = ScheduledConnector {
                connector,
                context: ConnectorCtx::new(self.port_counter.clone()),
                public: ConnectorPublic::new(),
                router: Router::new(),
            };

            if lock.free.is_empty() {
                let connector = Box::into_raw(Box::new(connector));

                unsafe {
                    // Cheating a bit here. Anyway, move to heap, store in list
                    index = lock.connectors.len();
                    lock.connectors.push(connector);
                }
            } else {
                index = lock.free.pop().unwrap();

                unsafe {
                    let target = lock.connectors.get_mut(index);
                    debug_assert!(!target.is_null());
                    ptr::write(*target, connector);
                }
            }
        }

        // Generate key and retrieve the connector to set its ID
        let key = ConnectorKey{ index: index as u32 };
        let new_connector = self.get_mut(&key);
        new_connector.context.id = key.downcast();

        // Return the connector key
        return key;
    }
}

impl Drop for ConnectorStore {
    fn drop(&mut self) {
        let lock = self.inner.write().unwrap();

        for idx in 0..lock.connectors.len() {
            unsafe {
                let memory = *lock.connectors.get_mut(idx);
                let _ = Box::from_raw(memory); // takes care of deallocation
            }
        }
    }
}

/// Global store of connectors, ports and queues that are used by the sceduler
/// threads. The global store has the appearance of a thread-safe datatype, but
/// one needs to be careful using it.
///
/// TODO: @docs
/// TODO: @Optimize, very lazy implementation of concurrent datastructures.
///     This includes the `should_exit` and `did_exit` pair!
pub struct GlobalStore {
    pub connector_queue: MpmcQueue<ConnectorKey>,
    pub connectors: ConnectorStore,
    pub should_exit: AtomicBool,    // signal threads to exit
}

impl GlobalStore {
    pub fn new() -> Self {
        Self{
            connector_queue: MpmcQueue::with_capacity(256),
            connectors: ConnectorStore::with_capacity(256),
            should_exit: AtomicBool::new(false),
        }
    }
}