// Files @ 0e1a76667937
// Location: CSY/reowolf/src/runtime2/runtime.rs
// Revision 0e1a76667937, 5.1 KiB
// MH: Started work on speculationless runtime

use std::sync::{Arc, Mutex, Condvar};
use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
use std::collections::VecDeque;

use crate::protocol::*;

use super::component::{CompCtx, CompPDL};
use super::store::ComponentStore;

// -----------------------------------------------------------------------------
// Component
// -----------------------------------------------------------------------------

/// Owning key of a component.
///
/// The type system is meant to (somewhat) guarantee that at most one key
/// exists per component. Holding the key is what grants access to the
/// component's privately-accessible memory. In practice it is a generational
/// index, just like `CompId`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) struct CompKey(u32);

impl CompKey {
    /// Returns the plain `CompId` that carries the same index as this key.
    pub(crate) fn downgrade(&self) -> CompId {
        let CompKey(index) = *self;
        CompId(index)
    }
}

/// Generational identifier of a component.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct CompId(u32);

impl CompId {
    /// Produces the placeholder ID, whose index is `u32::MAX`.
    pub(crate) fn new_invalid() -> CompId {
        CompId(u32::MAX)
    }

    /// Turns this ID into a `CompKey` carrying the same index.
    ///
    /// # Safety
    ///
    /// The caller must make sure that only one component key can exist at a
    /// time for the component, so that it can only be scheduled/executed by
    /// one thread. Nothing in this function checks that.
    pub(crate) unsafe fn upgrade(&self) -> CompKey {
        let CompId(index) = *self;
        CompKey(index)
    }
}

/// In-runtime storage of a component: the element type kept in the runtime's
/// component store. It pairs the part that may be shared between threads with
/// the part that is reserved for whoever currently holds the component's key.
pub(crate) struct RuntimeComp {
    /// State that is accessible in a thread-safe manner (atomics only).
    pub public: CompPublic,
    /// State that may contain non thread-safe fields; intended to be touched
    /// only by the scheduler that temporarily "owns" the component.
    pub private: CompPrivate,
}

/// Should contain everything that is accessible in a thread-safe manner.
/// Every field here is an atomic.
pub(crate) struct CompPublic {
    /// Sleep flag of the component; its initial value is supplied by the
    /// caller when the component is created.
    /// NOTE(review): presumably `true` means "not currently scheduled" --
    /// confirm against the scheduler.
    pub sleeping: AtomicBool,
    /// Count of outstanding handles. Starts at 1 on creation because the
    /// component itself acts like a handle.
    pub num_handles: AtomicU32, // modified upon creating/dropping `CompHandle` instances
}

/// Handle to the public part of a component.
///
/// The handle stores a raw pointer to a `CompPublic` and dereferences to it,
/// so the public fields can be read directly through the handle.
pub(crate) struct CompHandle {
    target: *const CompPublic,
}

impl std::ops::Deref for CompHandle {
    type Target = CompPublic;

    /// Borrows the `CompPublic` that this handle points at.
    fn deref(&self) -> &CompPublic {
        // SAFETY (assumed, not enforced in this block): `target` has to point
        // at a `CompPublic` that is still alive for as long as the handle
        // exists. NOTE(review): presumably `num_handles` is what keeps the
        // component alive -- confirm where handles are created and dropped.
        let public: &CompPublic = unsafe { &*self.target };
        public
    }
}

/// May contain non thread-safe fields. Accessed only by the scheduler which
/// will temporarily "own" the component.
pub(crate) struct CompPrivate {
    /// The PDL component value handed over when the component is created.
    /// NOTE(review): presumably its executable code/state -- confirm in
    /// `super::component`.
    pub code: CompPDL,
    /// Per-component context. On creation it holds the component's ID and
    /// empty `ports`, `peers` and `messages` collections.
    pub ctx: CompCtx,
}

// -----------------------------------------------------------------------------
// Runtime
// -----------------------------------------------------------------------------

/// Shared, atomically reference-counted handle to a [`Runtime`].
pub type RuntimeHandle = Arc<Runtime>;

/// Memory that is maintained by "the runtime". In practice it is maintained by
/// multiple schedulers, and this serves as the common interface to that memory.
pub struct Runtime {
    /// Protocol description the runtime was constructed with.
    pub protocol: ProtocolDescription,
    /// Storage for all components, addressed by the `u32` index wrapped in
    /// `CompKey`/`CompId`.
    components: ComponentStore<RuntimeComp>,
    /// FIFO of component keys waiting for work: keys are pushed at the back
    /// and taken from the front.
    work_queue: Mutex<VecDeque<CompKey>>,
    /// Condition variable paired with `work_queue`; signalled once for every
    /// key that is enqueued.
    work_condvar: Condvar,
    /// Once this reads zero, threads waiting on an empty work queue stop
    /// waiting.
    active_elements: AtomicU32, // active components and APIs (i.e. component creators)
}

impl Runtime {
    pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime {
        assert!(num_threads > 0, "need a thread to perform work");
        return Runtime{
            protocol: protocol_description,
            components: ComponentStore::new(128),
            work_queue: Mutex::new(VecDeque::with_capacity(128)),
            work_condvar: Condvar::new(),
            active_elements: AtomicU32::new(0),
        };
    }

    // Scheduling and retrieving work

    pub(crate) fn take_work(&self) -> Option<CompKey> {
        let mut lock = self.work_queue.lock().unwrap();
        while lock.is_empty() && self.active_elements.load(Ordering::Acquire) != 0 {
            lock = self.work_condvar.wait(lock).unwrap();
        }

        return lock.pop_front();
    }

    pub(crate) fn enqueue_work(&self, key: CompKey) {
        let mut lock = self.work_queue.lock().unwrap();
        lock.push_back(key);
        self.work_condvar.notify_one();
    }

    // Creating/destroying components

    pub(crate) fn create_pdl_component(&self, comp: CompPDL, initially_sleeping: bool) -> CompKey {
        let comp = RuntimeComp{
            public: CompPublic{
                sleeping: AtomicBool::new(initially_sleeping),
                num_handles: AtomicU32::new(1), // the component itself acts like a handle
            },
            private: CompPrivate{
                code: comp,
                ctx: CompCtx{
                    id: CompId(0),
                    ports: Vec::new(),
                    peers: Vec::new(),
                    messages: Vec::new(),
                }
            }
        };

        let index = self.components.create(comp);

        // TODO: just do a reserve_index followed by a consume_index or something
        self.components.get_mut(index).private.ctx.id = CompId(index);

        return CompKey(index);
    }

    pub(crate) fn get_component(&self, key: CompKey) -> &mut RuntimeComp {
        let component = self.components.get_mut(key.0);
        return component;
    }

    pub(crate) fn get_component_public(&self, id: CompId) -> &CompPublic {
        let component = self.components.get(id.0);
        return &component.public;
    }

    pub(crate) fn destroy_component(&self, key: CompKey) {
        self.components.destroy(key.0);
    }

    // Interacting with components
}