Files @ 2c1fa43903ac
Branch filter:

Location: CSY/reowolf/src/runtime2/runtime.rs

2c1fa43903ac 10.9 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
Several unfinished attempts at introducing polling
use std::sync::{Arc, Mutex, Condvar};
use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
use std::collections::VecDeque;

use crate::protocol::*;

use super::communication::Message;
use super::component::{Component, wake_up_if_sleeping, CompPDL, CompCtx};
use super::store::{ComponentStore, ComponentReservation, QueueDynMpsc, QueueDynProducer};
use super::scheduler::*;

// -----------------------------------------------------------------------------
// Component
// -----------------------------------------------------------------------------

/// Key to a component. Type system somewhat ensures that there can only be one
/// of these. Only with a key one may retrieve privately-accessible memory for
/// a component. Practically just a generational index, like `CompId` is.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) struct CompKey(pub u32);

impl CompKey {
    pub(crate) fn downgrade(&self) -> CompId {
        return CompId(self.0);
    }
}

/// Generational ID of a component
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct CompId(pub u32);

impl CompId {
    pub(crate) fn new_invalid() -> CompId {
        return CompId(u32::MAX);
    }

    /// Upgrade component ID to component key. Unsafe because the caller needs
    /// to make sure that only one component key can exist at a time (to ensure
    /// a component can only be scheduled/executed by one thread).
    pub(crate) unsafe fn upgrade(&self) -> CompKey {
        return CompKey(self.0);
    }
}

/// Handle to a component that is being created.
pub(crate) struct CompReserved {
    reservation: ComponentReservation,
}

impl CompReserved {
    pub(crate) fn id(&self) -> CompId {
        return CompId(self.reservation.index)
    }
}

/// Representation of a runtime component. Contains the bookkeeping variables
/// for the schedulers, the publicly accessible fields, and the private fields
/// that should only be accessed by the thread running the component's routine.
pub(crate) struct RuntimeComp {
    pub public: CompPublic,
    pub component: Box<dyn Component>,
    pub ctx: CompCtx,
    pub inbox: QueueDynMpsc<Message>,
    pub exiting: bool,
}

/// Should contain everything that is accessible in a thread-safe manner
// TODO: Do something about the `num_handles` thing. This needs to be a bit more
//  "foolproof" to lighten the mental burden of using the `num_handles`
//  variable.
pub(crate) struct CompPublic {
    pub sleeping: AtomicBool,
    pub num_handles: AtomicU32, // manually modified (!)
    inbox: QueueDynProducer<Message>,
}

/// Handle to public part of a component. Would be nice if we could
/// automagically manage the `num_handles` counter. But when it reaches zero we
/// need to manually remove the handle from the runtime. So we just have debug
/// code to make sure this actually happens.
pub(crate) struct CompHandle {
    target: *const CompPublic,
    id: CompId, // TODO: @Remove after debugging
    #[cfg(debug_assertions)] decremented: bool,
}

impl CompHandle {
    fn new(id: CompId, public: &CompPublic) -> CompHandle {
        let handle = CompHandle{
            target: public,
            id,
            #[cfg(debug_assertions)] decremented: false,
        };
        handle.increment_users();
        return handle;
    }

    pub(crate) fn send_message(&self, sched_ctx: &SchedulerCtx, message: Message, try_wake_up: bool) {
        sched_ctx.log(&format!("Sending message to [c:{:03}, wakeup:{}]: {:?}", self.id.0, try_wake_up, message));
        self.inbox.push(message);
        if try_wake_up {
            wake_up_if_sleeping(sched_ctx, self.id, self);
        }
    }

    fn increment_users(&self) {
        let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel);
        debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed
    }

    /// Returns the `CompKey` to the component if it should be destroyed
    pub(crate) fn decrement_users(&mut self) -> Option<CompKey> {
        dbg_code!(assert!(!self.decremented, "illegal to 'decrement_users' twice"));
        let old_count = self.num_handles.fetch_sub(1, Ordering::AcqRel);
        let new_count = old_count - 1;
        dbg_code!(self.decremented = true);
        if new_count == 0 {
            return Some(unsafe{ self.id.upgrade() });
        }

        return None;
    }
}

impl Clone for CompHandle {
    fn clone(&self) -> Self {
        dbg_code!(assert!(!self.decremented, "illegal to clone after 'decrement_users'"));
        self.increment_users();
        return CompHandle{
            target: self.target,
            id: self.id,
            #[cfg(debug_assertions)] decremented: false,
        };
    }
}

impl std::ops::Deref for CompHandle {
    type Target = CompPublic;

    fn deref(&self) -> &Self::Target {
        dbg_code!(assert!(!self.decremented)); // cannot access if control is relinquished
        return unsafe{ &*self.target };
    }
}

impl Drop for CompHandle {
    fn drop(&mut self) {
        dbg_code!(assert!(self.decremented, "need call to 'decrement_users' before dropping"));
    }
}

// -----------------------------------------------------------------------------
// Runtime
// -----------------------------------------------------------------------------

pub struct Runtime {
    pub(crate) inner: Arc<RuntimeInner>,
    threads: Vec<std::thread::JoinHandle<()>>,
}

impl Runtime {
    // TODO: debug_logging should be removed at some point
    pub fn new(num_threads: u32, debug_logging: bool, protocol_description: ProtocolDescription) -> Runtime {
        assert!(num_threads > 0, "need a thread to perform work");
        let runtime_inner = Arc::new(RuntimeInner {
            protocol: protocol_description,
            components: ComponentStore::new(128),
            work_queue: Mutex::new(VecDeque::with_capacity(128)),
            work_condvar: Condvar::new(),
            active_elements: AtomicU32::new(1),
        });
        let mut runtime = Runtime {
            inner: runtime_inner,
            threads: Vec::with_capacity(num_threads as usize),
        };

        for thread_index in 0..num_threads {
            let mut scheduler = Scheduler::new(runtime.inner.clone(), thread_index, debug_logging);
            let thread_handle = std::thread::spawn(move || {
                scheduler.run();
            });

            runtime.threads.push(thread_handle);
        }

        return runtime;
    }

    pub fn create_component(&self, module_name: &[u8], routine_name: &[u8]) -> Result<(), ComponentCreationError> {
        use crate::protocol::eval::ValueGroup;
        let prompt = self.inner.protocol.new_component(
            module_name, routine_name,
            ValueGroup::new_stack(Vec::new())
        )?;
        let reserved = self.inner.start_create_pdl_component();
        let ctx = CompCtx::new(&reserved);
        let component = Box::new(CompPDL::new(prompt, 0));
        let (key, _) = self.inner.finish_create_pdl_component(reserved, component, ctx, false);
        self.inner.enqueue_work(key);

        return Ok(())
    }
}

impl Drop for Runtime {
    fn drop(&mut self) {
        self.inner.decrement_active_components();
        for handle in self.threads.drain(..) {
            handle.join().expect("join scheduler thread");
        }
    }
}

/// Memory that is maintained by "the runtime". In practice it is maintained by
/// multiple schedulers, and this serves as the common interface to that memory.
pub(crate) struct RuntimeInner {
    pub protocol: ProtocolDescription,
    components: ComponentStore<RuntimeComp>,
    work_queue: Mutex<VecDeque<CompKey>>,
    work_condvar: Condvar,
    active_elements: AtomicU32, // active components and APIs (i.e. component creators)
}

impl RuntimeInner {
    // Scheduling and retrieving work

    pub(crate) fn take_work(&self) -> Option<CompKey> {
        let mut lock = self.work_queue.lock().unwrap();
        while lock.is_empty() && self.active_elements.load(Ordering::Acquire) != 0 {
            lock = self.work_condvar.wait(lock).unwrap();
        }

        // We have work, or the schedulers should exit.
        return lock.pop_front();
    }

    pub(crate) fn enqueue_work(&self, key: CompKey) {
        let mut lock = self.work_queue.lock().unwrap();
        lock.push_back(key);
        self.work_condvar.notify_one();
    }

    // Creating/destroying components

    pub(crate) fn start_create_pdl_component(&self) -> CompReserved {
        self.increment_active_components();
        let reservation = self.components.reserve();
        return CompReserved{ reservation };
    }

    pub(crate) fn finish_create_pdl_component(
        &self, reserved: CompReserved,
        component: Box<dyn Component>, mut context: CompCtx, initially_sleeping: bool,
    ) -> (CompKey, &mut RuntimeComp) {
        let inbox_queue = QueueDynMpsc::new(16);
        let inbox_producer = inbox_queue.producer();

        let _id = reserved.id();
        context.id = reserved.id();
        let component = RuntimeComp {
            public: CompPublic{
                sleeping: AtomicBool::new(initially_sleeping),
                num_handles: AtomicU32::new(1), // the component itself acts like a handle
                inbox: inbox_producer,
            },
            component,
            ctx: context,
            inbox: inbox_queue,
            exiting: false,
        };

        let index = self.components.submit(reserved.reservation, component);
        debug_assert_eq!(index, _id.0);
        let component = self.components.get_mut(index);

        return (CompKey(index), component);
    }

    pub(crate) fn get_component(&self, key: CompKey) -> &mut RuntimeComp {
        let component = self.components.get_mut(key.0);
        return component;
    }

    pub(crate) fn get_component_public(&self, id: CompId) -> CompHandle {
        let component = self.components.get(id.0);
        return CompHandle::new(id, &component.public);
    }

    pub(crate) fn destroy_component(&self, key: CompKey) {
        dbg_code!({
            let component = self.get_component(key);
            debug_assert!(component.exiting);
            debug_assert_eq!(component.public.num_handles.load(Ordering::Acquire), 0);
        });
        self.decrement_active_components();
        self.components.destroy(key.0);
    }

    // Tracking number of active interfaces and the active components

    #[inline]
    fn increment_active_components(&self) {
        let _old_val = self.active_elements.fetch_add(1, Ordering::AcqRel);
        debug_assert!(_old_val > 0); // can only create a component from a API/component, so can never be 0.
    }

    fn decrement_active_components(&self) {
        let old_val = self.active_elements.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(old_val > 0); // something wrong with incr/decr logic
        let new_val = old_val - 1;
        if new_val == 0 {
            // Just to be sure, in case the last thing that gets destroyed is an
            // API instead of a thread.
            let _lock = self.work_queue.lock();
            self.work_condvar.notify_all();
        }
    }
}