Files @ d06da4e9296c
Branch filter:

Location: CSY/reowolf/src/runtime2/runtime.rs

d06da4e9296c 8.8 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
mh
WIP: Reimplementing messaging and consensus
use std::sync::{Arc, Mutex, Condvar};
use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
use std::collections::VecDeque;

use crate::protocol::*;

use super::communication::Message;
use super::component::{CompCtx, CompPDL};
use super::store::{ComponentStore, QueueDynMpsc, QueueDynProducer};
use super::scheduler::Scheduler;

// -----------------------------------------------------------------------------
// Component
// -----------------------------------------------------------------------------

/// Exclusive key to a component. The type system is meant to (somewhat)
/// guarantee that only one key exists per component; only the holder of the
/// key may retrieve the component's privately-accessible memory. Practically
/// this is the same generational index that `CompId` wraps.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) struct CompKey(pub u32);

impl CompKey {
    /// Produces the freely shareable `CompId` carrying the same index as this
    /// key.
    pub(crate) fn downgrade(&self) -> CompId {
        let CompKey(index) = *self;
        CompId(index)
    }
}

/// Generational ID of a component.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct CompId(pub u32);

impl CompId {
    /// Builds a placeholder ID wrapping `u32::MAX`.
    pub(crate) fn new_invalid() -> CompId {
        CompId(u32::MAX)
    }

    /// Turns this component ID into a component key with the same index.
    ///
    /// # Safety
    ///
    /// The caller needs to make sure that only one component key can exist at
    /// a time, so that a component can only be scheduled/executed by one
    /// thread.
    pub(crate) unsafe fn upgrade(&self) -> CompKey {
        let CompId(index) = *self;
        CompKey(index)
    }
}

/// Private fields of a component, may only be modified by a single thread at
/// a time. Instances are built by `RuntimeInner::create_pdl_component` and
/// handed out as `&mut RuntimeComp` through `RuntimeInner::get_component`,
/// which requires a `CompKey`.
pub(crate) struct RuntimeComp {
    /// Shared part of the component; a `CompHandle` dereferences to this.
    pub public: CompPublic,
    /// The `CompPDL` value that was passed to `create_pdl_component`.
    pub code: CompPDL,
    /// Component context. Starts out as `CompCtx::default()`, after which its
    /// `id` is set to the component's index in the store.
    pub ctx: CompCtx,
    /// Message queue of this component. The producer obtained from this queue
    /// is the one stored in `public.inbox`.
    pub inbox: QueueDynMpsc<Message>
}

/// Should contain everything that is accessible in a thread-safe manner. This
/// is the part of a component that other parties reach through a
/// `CompHandle`.
// TODO: Do something about the `num_handles` thing. This needs to be a bit more
//  "foolproof" to lighten the mental burden of using the `num_handles`
//  variable.
pub(crate) struct CompPublic {
    /// Initialised from the `initially_sleeping` argument of
    /// `create_pdl_component`.
    // NOTE(review): presumably flags whether the component is currently not
    // scheduled -- confirm against the scheduler.
    pub sleeping: AtomicBool,
    /// Number of outstanding users. Starts at 1 (the component itself counts
    /// as a handle) and is changed by `CompHandle::increment_users` and
    /// `CompHandle::decrement_users`.
    pub num_handles: AtomicU32, // manually modified (!)
    /// Producer side of the component's own inbox queue.
    pub inbox: QueueDynProducer<Message>,
}

/// Handle to public part of a component. Would be nice if we could
/// automagically manage the `num_handles` counter. But when it reaches zero we
/// need to manually remove the handle from the runtime. So we just have debug
/// code to make sure this actually happens.
///
/// Usage contract: every handle (including clones) must have
/// `decrement_users` called on it exactly once before it is dropped. In builds
/// with debug assertions this is checked through the `decremented` flag.
///
/// All checks that read `decremented` live in `#[cfg(debug_assertions)]`
/// blocks rather than in `debug_assert!`: the arguments of `debug_assert!` are
/// still type-checked when debug assertions are disabled, and the field does
/// not exist in such builds.
pub(crate) struct CompHandle {
    /// Raw pointer to the component's public part, taken from a reference in
    /// `new`.
    target: *const CompPublic,
    /// Debug-only flag: set once `decrement_users` has been called.
    #[cfg(debug_assertions)] decremented: bool,
}

impl CompHandle {
    /// Creates a handle pointing at `public` and registers it as an
    /// additional user by incrementing `num_handles`.
    fn new(public: &CompPublic) -> CompHandle {
        let handle = CompHandle{
            target: public,
            #[cfg(debug_assertions)] decremented: false,
        };
        handle.increment_users();
        return handle;
    }

    /// Adds one to the target's `num_handles` counter.
    fn increment_users(&self) {
        let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel);
        debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed
    }

    /// Removes this handle from the target's `num_handles` counter. Must be
    /// called exactly once per handle, before the handle is dropped.
    ///
    /// Returns true if the component should be destroyed, i.e. if this was
    /// the last outstanding user.
    pub(crate) fn decrement_users(&mut self) -> bool {
        #[cfg(debug_assertions)]
        {
            assert!(!self.decremented, "illegal to 'decrement_users' twice");
        }
        dbg_code!(self.decremented = true);
        let old_count = self.num_handles.fetch_sub(1, Ordering::AcqRel);
        return old_count == 1;
    }
}

impl Clone for CompHandle {
    /// Creates another handle to the same target, counted as a separate user.
    /// The clone needs its own call to `decrement_users`.
    fn clone(&self) -> Self {
        #[cfg(debug_assertions)]
        {
            assert!(!self.decremented, "illegal to clone after 'decrement_users'");
        }
        self.increment_users();
        return CompHandle{
            target: self.target,
            #[cfg(debug_assertions)] decremented: false,
        };
    }
}

impl std::ops::Deref for CompHandle {
    type Target = CompPublic;

    fn deref(&self) -> &Self::Target {
        // SAFETY: `target` was derived from a valid `&CompPublic` in `new` (or
        // copied from such a handle in `clone`). Nothing in this type keeps
        // the pointee alive; validity relies on the component not being
        // destroyed while `num_handles` is non-zero.
        // NOTE(review): that invariant is upheld outside this type -- confirm
        // against the code that calls `destroy_component`.
        return unsafe{ &*self.target };
    }
}

impl Drop for CompHandle {
    /// Only performs a debug-build check that `decrement_users` was called;
    /// dropping never touches the counter itself.
    fn drop(&mut self) {
        #[cfg(debug_assertions)]
        {
            assert!(self.decremented, "need call to 'decrement_users' before dropping");
        }
    }
}

// -----------------------------------------------------------------------------
// Runtime
// -----------------------------------------------------------------------------

/// Owner of the shared runtime state and of the scheduler threads that were
/// spawned for it. Dropping a `Runtime` decrements the active-element counter
/// once and then joins every scheduler thread.
pub struct Runtime {
    /// State shared with every scheduler thread.
    pub(crate) inner: Arc<RuntimeInner>,
    /// Join handles of the scheduler threads spawned in `Runtime::new`.
    threads: Vec<std::thread::JoinHandle<()>>,
}

impl Runtime {
    /// Sets up the shared runtime state for `protocol_description` and spawns
    /// `num_threads` threads, each running its own `Scheduler`.
    ///
    /// # Panics
    ///
    /// Panics when `num_threads` is zero.
    pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime {
        assert!(num_threads > 0, "need a thread to perform work");

        let shared_state = RuntimeInner {
            protocol: protocol_description,
            components: ComponentStore::new(128),
            work_queue: Mutex::new(VecDeque::with_capacity(128)),
            work_condvar: Condvar::new(),
            // Starts at one: the `Runtime` value itself counts as an active
            // element until it is dropped.
            active_elements: AtomicU32::new(1),
        };

        // The `Runtime` is assembled before any thread is spawned, so the
        // threads are tracked by it from the moment they exist.
        let mut runtime = Runtime {
            inner: Arc::new(shared_state),
            threads: Vec::with_capacity(num_threads as usize),
        };

        for thread_index in 0..num_threads {
            let mut scheduler = Scheduler::new(Arc::clone(&runtime.inner), thread_index);
            let join_handle = std::thread::spawn(move || {
                scheduler.run();
            });
            runtime.threads.push(join_handle);
        }

        runtime
    }
}

impl Drop for Runtime {
    /// Gives up the runtime's own entry in the active-element counter and
    /// then blocks until every scheduler thread has been joined. Panics if a
    /// scheduler thread cannot be joined.
    fn drop(&mut self) {
        self.inner.decrement_active_components();
        self.threads
            .drain(..)
            .for_each(|thread| thread.join().expect("join scheduler thread"));
    }
}

/// Memory that is maintained by "the runtime". In practice it is maintained by
/// multiple schedulers, and this serves as the common interface to that memory.
pub(crate) struct RuntimeInner {
    /// Protocol description that was handed to `Runtime::new`.
    pub protocol: ProtocolDescription,
    /// Storage of all components, addressed by the index wrapped in
    /// `CompKey`/`CompId`.
    components: ComponentStore<RuntimeComp>,
    /// FIFO of component keys: `enqueue_work` pushes to the back, `take_work`
    /// pops from the front.
    work_queue: Mutex<VecDeque<CompKey>>,
    /// Paired with `work_queue`. Notified when a key is enqueued and when
    /// `active_elements` drops to zero.
    work_condvar: Condvar,
    /// Once this reaches zero `take_work` stops waiting for new work.
    active_elements: AtomicU32, // active components and APIs (i.e. component creators)
}

impl RuntimeInner {
    // Scheduling and retrieving work

    pub(crate) fn take_work(&self) -> Option<CompKey> {
        let mut lock = self.work_queue.lock().unwrap();
        while lock.is_empty() && self.active_elements.load(Ordering::Acquire) != 0 {
            lock = self.work_condvar.wait(lock).unwrap();
        }

        // We have work, or the schedulers should exit.
        return lock.pop_front();
    }

    pub(crate) fn enqueue_work(&self, key: CompKey) {
        let mut lock = self.work_queue.lock().unwrap();
        lock.push_back(key);
        self.work_condvar.notify_one();
    }

    // Creating/destroying components

    /// Creates a new component. Note that the public part will be properly
    /// initialized, but the private fields (e.g. owned ports, peers, etc.)
    /// are not.
    pub(crate) fn create_pdl_component(&self, comp: CompPDL, initially_sleeping: bool) -> (CompKey, &mut RuntimeComp) {
        let inbox_queue = QueueDynMpsc::new(16);
        let inbox_producer = inbox_queue.producer();
        let comp = RuntimeComp{
            public: CompPublic{
                sleeping: AtomicBool::new(initially_sleeping),
                num_handles: AtomicU32::new(1), // the component itself acts like a handle
                inbox: inbox_producer,
            },
            code: comp,
            ctx: CompCtx::default(),
            inbox: inbox_queue,
        };

        let index = self.components.create(comp);

        // TODO: just do a reserve_index followed by a consume_index or something
        self.increment_active_components();
        let component = self.components.get_mut(index);
        component.ctx.id = CompId(index);

        return (CompKey(index), component);
    }

    pub(crate) fn get_component(&self, key: CompKey) -> &mut RuntimeComp {
        let component = self.components.get_mut(key.0);
        return component;
    }

    pub(crate) fn get_component_public(&self, id: CompId) -> CompHandle {
        let component = self.components.get(id.0);
        return CompHandle::new(&component.public);
    }

    pub(crate) fn destroy_component(&self, key: CompKey) {
        debug_assert_eq!(self.get_component(key).public.num_handles.load(Ordering::Acquire), 0);
        self.decrement_active_components();
        self.components.destroy(key.0);
    }

    // Tracking number of active interfaces and the active components

    #[inline]
    fn increment_active_components(&self) {
        let _old_val = self.active_elements.fetch_add(1, Ordering::AcqRel);
        debug_assert!(_old_val > 0); // can only create a component from a API/component, so can never be 0.
    }

    fn decrement_active_components(&self) {
        let old_val = self.active_elements.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(old_val > 0); // something wrong with incr/decr logic
        let new_val = old_val - 1;
        if new_val == 0 {
            // Just to be sure, in case the last thing that gets destroyed is an
            // API instead of a thread.
            let lock = self.work_queue.lock();
            self.work_condvar.notify_all();
        }
    }
}