Files @ d06da4e9296c
Branch filter:

Location: CSY/reowolf/src/runtime2/runtime.rs - annotation

d06da4e9296c 8.8 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
mh
WIP: Reimplementing messaging and consensus
0e1a76667937
0e1a76667937
0e1a76667937
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
9e771c9cf8d3
0e1a76667937
9e771c9cf8d3
53d3950d9b6b
0e1a76667937
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
0e1a76667937
0de39654770f
ccd08a8d8365
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
ccd08a8d8365
ccd08a8d8365
0e1a76667937
0e1a76667937
0de39654770f
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
9e771c9cf8d3
9e771c9cf8d3
0e1a76667937
0e1a76667937
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
0e1a76667937
0e1a76667937
0e1a76667937
0de39654770f
0de39654770f
0de39654770f
0e1a76667937
0e1a76667937
9e771c9cf8d3
9e771c9cf8d3
0e1a76667937
ccd08a8d8365
9e771c9cf8d3
9e771c9cf8d3
968e958c3286
968e958c3286
0e1a76667937
0e1a76667937
968e958c3286
0e1a76667937
0e1a76667937
9e771c9cf8d3
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
968e958c3286
968e958c3286
968e958c3286
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
53d3950d9b6b
53d3950d9b6b
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
53d3950d9b6b
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
ccd08a8d8365
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
ccd08a8d8365
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
4341b8d6790f
53d3950d9b6b
0e1a76667937
4341b8d6790f
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
4341b8d6790f
4341b8d6790f
53d3950d9b6b
0e1a76667937
4341b8d6790f
4341b8d6790f
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
4341b8d6790f
4341b8d6790f
0e1a76667937
0e1a76667937
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
9e771c9cf8d3
0e1a76667937
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
0e1a76667937
4341b8d6790f
0e1a76667937
4341b8d6790f
0e1a76667937
53d3950d9b6b
9e771c9cf8d3
9e771c9cf8d3
4341b8d6790f
9e771c9cf8d3
4341b8d6790f
4341b8d6790f
0e1a76667937
0e1a76667937
0e1a76667937
4341b8d6790f
4341b8d6790f
9e771c9cf8d3
0e1a76667937
968e958c3286
4341b8d6790f
4341b8d6790f
0e1a76667937
0de39654770f
53d3950d9b6b
0e1a76667937
4341b8d6790f
4341b8d6790f
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
0e1a76667937
use std::sync::{Arc, Mutex, Condvar};
use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
use std::collections::VecDeque;

use crate::protocol::*;

use super::communication::Message;
use super::component::{CompCtx, CompPDL};
use super::store::{ComponentStore, QueueDynMpsc, QueueDynProducer};
use super::scheduler::Scheduler;

// -----------------------------------------------------------------------------
// Component
// -----------------------------------------------------------------------------

/// Key to a component. Type system somewhat ensures that there can only be one
/// of these. Only with a key one may retrieve privately-accessible memory for
/// a component. Practically just a generational index, like `CompId` is.
///
/// Note that the type derives `Copy`/`Clone`, so the "only one key" property
/// is a convention rather than something the compiler enforces: it relies on
/// callers honouring the contract of the unsafe `CompId::upgrade`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) struct CompKey(pub u32);

impl CompKey {
    pub(crate) fn downgrade(&self) -> CompId {
        return CompId(self.0);
    }
}

/// Generational ID of a component. Wraps the same `u32` index as `CompKey`
/// (see `CompKey::downgrade`), but may be copied around freely. The value
/// `u32::MAX` is what `CompId::new_invalid` hands out as the "invalid" ID.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct CompId(pub u32);

impl CompId {
    pub(crate) fn new_invalid() -> CompId {
        return CompId(u32::MAX);
    }

    /// Upgrade component ID to component key. Unsafe because the caller needs
    /// to make sure that only one component key can exist at a time (to ensure
    /// a component can only be scheduled/executed by one thread).
    pub(crate) unsafe fn upgrade(&self) -> CompKey {
        return CompKey(self.0);
    }
}

/// Private fields of a component, may only be modified by a single thread at
/// a time. Instances live in `RuntimeInner::components` and are handed out
/// mutably by `RuntimeInner::get_component`, which requires a `CompKey`.
pub(crate) struct RuntimeComp {
    /// Thread-safe part of the component; this is the field `CompHandle`s
    /// point at (see `RuntimeInner::get_component_public`).
    pub public: CompPublic,
    /// The PDL component passed to `RuntimeInner::create_pdl_component`.
    pub code: CompPDL,
    /// Component context. Starts out as `CompCtx::default()`; on creation
    /// only its `id` field is filled in.
    pub ctx: CompCtx,
    /// Message queue of this component. `public.inbox` is the producer that
    /// was obtained from this queue when the component was created.
    pub inbox: QueueDynMpsc<Message>
}

/// Should contain everything that is accessible in a thread-safe manner
// TODO: Do something about the `num_handles` thing. This needs to be a bit more
//  "foolproof" to lighten the mental burden of using the `num_handles`
//  variable.
pub(crate) struct CompPublic {
    /// Sleeping flag; initialised from the `initially_sleeping` argument of
    /// `RuntimeInner::create_pdl_component`.
    pub sleeping: AtomicBool,
    /// Number of users of this component. Starts at 1 because the component
    /// itself counts as a handle; every `CompHandle` adds one on creation and
    /// removes one in `CompHandle::decrement_users`.
    pub num_handles: AtomicU32, // manually modified (!)
    /// Producer side of the component's message queue (`RuntimeComp::inbox`).
    pub inbox: QueueDynProducer<Message>,
}

/// Handle to public part of a component. Would be nice if we could
/// automagically manage the `num_handles` counter. But when it reaches zero we
/// need to manually remove the handle from the runtime. So we just have debug
/// code to make sure this actually happens.
pub(crate) struct CompHandle {
    /// Raw pointer to the public part of the referenced component. It is
    /// dereferenced by the `Deref` implementation of this type.
    target: *const CompPublic,
    /// Debug-only bookkeeping: set by `decrement_users` and checked when the
    /// handle is cloned or dropped. The field does not exist in builds
    /// without `debug_assertions`.
    #[cfg(debug_assertions)] decremented: bool,
}

impl CompHandle {
    /// Creates a handle pointing at `public` and registers it in the
    /// component's `num_handles` counter.
    fn new(public: &CompPublic) -> CompHandle {
        let handle = CompHandle{
            target: public,
            #[cfg(debug_assertions)] decremented: false,
        };
        handle.increment_users();
        return handle;
    }

    /// Adds one user to the target's `num_handles` counter.
    fn increment_users(&self) {
        let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel);
        debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed
    }

    /// Removes this handle from the target's `num_handles` counter. Must be
    /// called exactly once, before the handle is dropped.
    ///
    /// Returns true if the component should be destroyed, i.e. if this call
    /// removed the last remaining user.
    pub(crate) fn decrement_users(&mut self) -> bool {
        // The `decremented` field only exists when `debug_assertions` is
        // enabled. `debug_assert!` still type-checks its condition in release
        // builds, so the bookkeeping has to be compiled out with `cfg`.
        #[cfg(debug_assertions)]
        {
            assert!(!self.decremented, "illegal to 'decrement_users' twice");
            self.decremented = true;
        }
        let old_count = self.num_handles.fetch_sub(1, Ordering::AcqRel);
        return old_count == 1;
    }
}

impl Clone for CompHandle {
    /// Creates another handle to the same component, registering the new
    /// handle in the component's `num_handles` counter.
    fn clone(&self) -> Self {
        // `decremented` only exists with `debug_assertions` enabled, and
        // `debug_assert!` would still type-check the field access in release
        // builds. Hence the explicit `cfg` gate.
        #[cfg(debug_assertions)]
        assert!(!self.decremented, "illegal to clone after 'decrement_users'");
        self.increment_users();
        return CompHandle{
            target: self.target,
            #[cfg(debug_assertions)] decremented: false,
        };
    }
}

impl std::ops::Deref for CompHandle {
    type Target = CompPublic;

    /// Gives access to the public part of the component this handle refers
    /// to.
    fn deref(&self) -> &CompPublic {
        let public: *const CompPublic = self.target;
        // SAFETY: NOTE(review): this presumably relies on the runtime keeping
        // the component's memory alive for as long as `num_handles` is
        // nonzero, i.e. until every handle called `decrement_users` - confirm
        // against the component store.
        unsafe { &*public }
    }
}

impl Drop for CompHandle {
    /// Debug-only check that `decrement_users` was called before the handle
    /// went out of scope. Does nothing when `debug_assertions` is disabled.
    fn drop(&mut self) {
        // `decremented` only exists with `debug_assertions` enabled, and
        // `debug_assert!` would still type-check the field access in release
        // builds. Hence the explicit `cfg` gate.
        #[cfg(debug_assertions)]
        assert!(self.decremented, "need call to 'decrement_users' before dropping");
    }
}

// -----------------------------------------------------------------------------
// Runtime
// -----------------------------------------------------------------------------

/// Owner of the runtime: holds the state shared with the schedulers and the
/// scheduler threads started by `Runtime::new`. Dropping a `Runtime` releases
/// its count in `RuntimeInner::active_elements` and then joins every thread.
pub struct Runtime {
    /// State shared with every scheduler (each one receives a clone of this
    /// `Arc`).
    pub(crate) inner: Arc<RuntimeInner>,
    /// Join handles of the scheduler threads spawned in `Runtime::new`.
    threads: Vec<std::thread::JoinHandle<()>>,
}

impl Runtime {
    /// Sets up the shared runtime state for `protocol_description` and spawns
    /// `num_threads` threads, each running its own `Scheduler`.
    ///
    /// # Panics
    ///
    /// Panics when `num_threads` is zero.
    pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime {
        assert!(num_threads > 0, "need a thread to perform work");

        let mut runtime = Runtime {
            inner: Arc::new(RuntimeInner {
                protocol: protocol_description,
                components: ComponentStore::new(128),
                work_queue: Mutex::new(VecDeque::with_capacity(128)),
                work_condvar: Condvar::new(),
                // Starts at one: this `Runtime` counts as an active element
                // until it is dropped.
                active_elements: AtomicU32::new(1),
            }),
            threads: Vec::with_capacity(num_threads as usize),
        };

        for thread_index in 0..num_threads {
            let shared = runtime.inner.clone();
            let mut scheduler = Scheduler::new(shared, thread_index);
            runtime.threads.push(std::thread::spawn(move || {
                scheduler.run();
            }));
        }

        runtime
    }
}

impl Drop for Runtime {
    /// Gives up the active-element count held by this `Runtime` and then
    /// waits for every scheduler thread to finish.
    ///
    /// Panics with "join scheduler thread" if joining a thread reports an
    /// error.
    fn drop(&mut self) {
        self.inner.decrement_active_components();
        self.threads
            .drain(..)
            .for_each(|thread| thread.join().expect("join scheduler thread"));
    }
}

/// Memory that is maintained by "the runtime". In practice it is maintained by
/// multiple schedulers, and this serves as the common interface to that memory.
pub(crate) struct RuntimeInner {
    /// Protocol description the runtime was created with.
    pub protocol: ProtocolDescription,
    /// Storage for all components, indexed by the `u32` wrapped in
    /// `CompKey`/`CompId`.
    components: ComponentStore<RuntimeComp>,
    /// Keys of components that are waiting to be picked up by `take_work`.
    work_queue: Mutex<VecDeque<CompKey>>,
    /// Signalled when work is enqueued, and when `active_elements` drops to
    /// zero so that waiting schedulers can stop waiting.
    work_condvar: Condvar,
    /// Starts at 1 for the `Runtime` itself; incremented per created
    /// component and decremented per destroyed component / dropped `Runtime`.
    active_elements: AtomicU32, // active components and APIs (i.e. component creators)
}

impl RuntimeInner {
    // Scheduling and retrieving work

    /// Waits until the work queue holds a component key or until there are
    /// no active elements left. Returns the key at the front of the queue,
    /// or `None` if the queue is empty once the wait is over (i.e. the
    /// caller should exit).
    pub(crate) fn take_work(&self) -> Option<CompKey> {
        let queue = self.work_queue.lock().unwrap();
        let mut queue = self.work_condvar
            .wait_while(queue, |pending| {
                pending.is_empty() && self.active_elements.load(Ordering::Acquire) != 0
            })
            .unwrap();

        // Either there is work, or the schedulers should exit.
        queue.pop_front()
    }

    /// Appends `key` to the work queue and wakes up one waiting thread. The
    /// notification is sent while the queue lock is still held.
    pub(crate) fn enqueue_work(&self, key: CompKey) {
        let mut queue = self.work_queue.lock().unwrap();
        queue.push_back(key);
        self.work_condvar.notify_one();
    }

    // Creating/destroying components

    /// Stores a new component built around `comp` and counts it as an active
    /// element. The public part is fully initialized, but of the private
    /// part only the component's ID is set: the context is otherwise left at
    /// its default (owned ports, peers, etc. are not filled in).
    ///
    /// Returns the key of the new component together with a mutable
    /// reference to it.
    pub(crate) fn create_pdl_component(&self, comp: CompPDL, initially_sleeping: bool) -> (CompKey, &mut RuntimeComp) {
        let inbox = QueueDynMpsc::new(16);
        let producer = inbox.producer();
        let public = CompPublic{
            sleeping: AtomicBool::new(initially_sleeping),
            // the component itself acts like a handle
            num_handles: AtomicU32::new(1),
            inbox: producer,
        };

        let index = self.components.create(RuntimeComp{
            public,
            code: comp,
            ctx: CompCtx::default(),
            inbox,
        });

        // TODO: just do a reserve_index followed by a consume_index or something
        self.increment_active_components();
        let stored = self.components.get_mut(index);
        stored.ctx.id = CompId(index);

        (CompKey(index), stored)
    }

    /// Looks up the component belonging to `key` for mutable access.
    pub(crate) fn get_component(&self, key: CompKey) -> &mut RuntimeComp {
        self.components.get_mut(key.0)
    }

    /// Creates a new handle to the public part of the component with the
    /// given `id`. Creating the handle increments that component's
    /// `num_handles`.
    pub(crate) fn get_component_public(&self, id: CompId) -> CompHandle {
        let public = &self.components.get(id.0).public;
        CompHandle::new(public)
    }

    /// Removes the component belonging to `key` from the store and stops
    /// counting it as an active element. In debug builds this checks that
    /// the component has no handles left.
    pub(crate) fn destroy_component(&self, key: CompKey) {
        debug_assert_eq!(self.get_component(key).public.num_handles.load(Ordering::Acquire), 0);
        self.decrement_active_components();
        self.components.destroy(key.0);
    }

    // Tracking number of active interfaces and the active components

    /// Registers one more active element.
    #[inline]
    fn increment_active_components(&self) {
        let _previous = self.active_elements.fetch_add(1, Ordering::AcqRel);
        // Components are only ever created from an API or another component,
        // so the counter cannot have been 0 here.
        debug_assert!(_previous > 0);
    }

    /// Unregisters one active element. When the counter reaches zero, all
    /// threads waiting in `take_work` are woken up.
    fn decrement_active_components(&self) {
        let previous = self.active_elements.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0); // something wrong with incr/decr logic
        if previous == 1 {
            // Just to be sure, in case the last thing that gets destroyed is
            // an API instead of a thread. The binding keeps the queue locked
            // while the notification is sent.
            let _guard = self.work_queue.lock();
            self.work_condvar.notify_all();
        }
    }
}