Location: CSY/reowolf/src/runtime2/runtime.rs @ 38c129959044
Max Henger: feat: transmitting ports

use std::sync::{Arc, Mutex, Condvar};
use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
use std::thread;
use std::collections::VecDeque;

use crate::protocol::*;
use crate::runtime2::poll::{PollingThread, PollingThreadHandle};
use crate::runtime2::RtError;

use super::communication::Message;
use super::component::{Component, wake_up_if_sleeping, CompPDL, CompCtx};
use super::store::{ComponentStore, ComponentReservation, QueueDynMpsc, QueueDynProducer};
use super::scheduler::*;

#[derive(PartialOrd, PartialEq, Copy, Clone)]
pub enum LogLevel {
    Debug, // all logging (includes messages)
    Info, // rough logging
    None, // no logging
}
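
// Note: the derived `PartialOrd` follows declaration order, so
// `LogLevel::Debug < LogLevel::Info < LogLevel::None`. A minimal sketch (not
// part of the original source; `should_log` is a hypothetical helper) of how a
// level filter can rely on this ordering:
//
//     fn should_log(configured: LogLevel, message: LogLevel) -> bool {
//         // emit the message when the configured level is at most the
//         // message's level; `None` is the highest variant, so it suppresses
//         // everything
//         return configured <= message;
//     }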

// -----------------------------------------------------------------------------
// Component
// -----------------------------------------------------------------------------

/// Key to a component. The type system somewhat ensures that only one of these
/// can exist per component. Only with a key may one retrieve the
/// privately-accessible memory of a component. Practically it is just a
/// generational index, like `CompId` is.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) struct CompKey(pub u32);

impl CompKey {
    pub(crate) fn downgrade(&self) -> CompId {
        return CompId(self.0);
    }
}

/// Generational ID of a component.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct CompId(pub u32);

impl CompId {
    pub(crate) fn new_invalid() -> CompId {
        return CompId(u32::MAX);
    }

    /// Upgrade a component ID to a component key. Unsafe because the caller
    /// must ensure that only one component key exists at a time (so that a
    /// component can only be scheduled/executed by one thread).
    pub(crate) unsafe fn upgrade(&self) -> CompKey {
        return CompKey(self.0);
    }
}
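
// Illustrative sketch (not part of the original source) of the intended
// relationship between `CompId` and `CompKey`: a key can always be downgraded
// to an ID, but going the other way is `unsafe` because the caller must
// guarantee that at most one key for the component exists at a time.
//
//     fn key_id_roundtrip(key: CompKey) {
//         let id: CompId = key.downgrade(); // always safe, many IDs may exist
//         // Safety: only valid if no other `CompKey` for this component is
//         // alive, i.e. no other thread is currently executing the component.
//         let key_again = unsafe { id.upgrade() };
//         assert_eq!(key_again.downgrade(), id);
//     }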

/// Handle to a component that is being created.
pub(crate) struct CompReserved {
    reservation: ComponentReservation,
}

impl CompReserved {
    pub(crate) fn id(&self) -> CompId {
        return CompId(self.reservation.index);
    }
}

/// Representation of a runtime component. Contains the bookkeeping variables
/// for the schedulers, the publicly accessible fields, and the private fields
/// that should only be accessed by the thread running the component's routine.
pub(crate) struct RuntimeComp {
    pub public: CompPublic,
    pub component: Box<dyn Component>,
    pub ctx: CompCtx,
    pub inbox: QueueDynMpsc<Message>,
    pub exiting: bool,
}

/// Contains everything about a component that must be accessible in a
/// thread-safe manner. May NOT contain fields that are not thread-safe.
// TODO: Do something about the `num_handles` thing. This needs to be a bit more
//  "foolproof" to lighten the mental burden of using the `num_handles`
//  variable.
pub(crate) struct CompPublic {
    pub sleeping: AtomicBool,
    pub num_handles: AtomicU32, // manually modified (!)
    inbox: QueueDynProducer<Message>,
}

/// Handle to the public part of a component. It would be nice if we could
/// automagically manage the `num_handles` counter, but when it reaches zero we
/// need to manually remove the component from the runtime. So we just have
/// debug code to make sure this actually happens.
pub(crate) struct CompHandle {
    target: *const CompPublic,
    id: CompId,
    #[cfg(debug_assertions)] decremented: bool,
}

impl CompHandle {
    /// Creates a new component handle and does not increment the reference
    /// counter.
    fn new_unincremented(id: CompId, public: &CompPublic) -> CompHandle {
        return CompHandle{
            target: public,
            id,
            #[cfg(debug_assertions)] decremented: false,
        };
    }

    /// Creates a new component handle and increments the reference counter.
    fn new(id: CompId, public: &CompPublic) -> CompHandle {
        let mut handle = Self::new_unincremented(id, public);
        handle.increment_users();
        return handle;
    }

    pub(crate) fn send_message(&self, runtime: &RuntimeInner, message: Message, try_wake_up: bool) {
        self.inbox.push(message);
        if try_wake_up {
            wake_up_if_sleeping(runtime, self.id, self);
        }
    }

    #[inline]
    pub(crate) fn send_message_logged(&self, sched_ctx: &SchedulerCtx, message: Message, try_wake_up: bool) {
        sched_ctx.debug(&format!("Sending message to comp:{} ... {:?}", self.id.0, message));
        self.send_message(&sched_ctx.runtime, message, try_wake_up);
    }

    pub(crate) fn id(&self) -> CompId {
        return self.id;
    }

    fn increment_users(&self) {
        let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel);
        debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed
    }

    /// Returns the `CompKey` to the component if it should be destroyed
    pub(crate) fn decrement_users(&mut self) -> Option<CompKey> {
        dbg_code!(assert!(!self.decremented, "illegal to 'decrement_users' twice"));
        let old_count = self.num_handles.fetch_sub(1, Ordering::AcqRel);
        let new_count = old_count - 1;
        dbg_code!(self.decremented = true);
        if new_count == 0 {
            return Some(unsafe{ self.id.upgrade() });
        }

        return None;
    }
}

impl Clone for CompHandle {
    fn clone(&self) -> Self {
        dbg_code!(assert!(!self.decremented, "illegal to clone after 'decrement_users'"));
        self.increment_users();
        return CompHandle{
            target: self.target,
            id: self.id,
            #[cfg(debug_assertions)] decremented: false,
        };
    }
}

impl std::ops::Deref for CompHandle {
    type Target = CompPublic;

    fn deref(&self) -> &Self::Target {
        dbg_code!(assert!(!self.decremented)); // cannot access if control is relinquished
        return unsafe{ &*self.target };
    }
}

impl Drop for CompHandle {
    fn drop(&mut self) {
        dbg_code!(assert!(self.decremented, "need call to 'decrement_users' before dropping"));
    }
}
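
// Illustrative sketch (not part of the original source) of the manual
// reference-counting protocol around `CompHandle`: every handle must be
// released with `decrement_users` before it is dropped, and whoever releases
// the last handle is responsible for destroying the component. The count can
// only reach zero after the component has already given up its own
// self-handle, which happens while it is exiting.
//
//     fn send_and_release(runtime: &RuntimeInner, target: CompId, message: Message) {
//         let mut handle = runtime.get_component_public(target); // increments `num_handles`
//         handle.send_message(runtime, message, true);           // wake the target if it sleeps
//         if let Some(key) = handle.decrement_users() {
//             // this was the last handle, so the component's memory must be
//             // removed from the runtime
//             runtime.destroy_component(key);
//         }
//     } // dropping `handle` is fine now: `decrement_users` has been called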

// -----------------------------------------------------------------------------
// Runtime
// -----------------------------------------------------------------------------

pub struct Runtime {
    pub(crate) inner: Arc<RuntimeInner>,
    scheduler_threads: Vec<thread::JoinHandle<()>>,
    polling_handle: PollingThreadHandle,
}

impl Runtime {
    // TODO: debug_logging should be removed at some point
    pub fn new(num_threads: u32, log_level: LogLevel, protocol_description: ProtocolDescription) -> Result<Runtime, RtError> {
        if num_threads == 0 {
            return Err(rt_error!("need at least one thread to create the runtime"));
        }
        let runtime_inner = Arc::new(RuntimeInner {
            protocol: protocol_description,
            components: ComponentStore::new(128),
            work_queue: Mutex::new(VecDeque::with_capacity(128)),
            work_condvar: Condvar::new(),
            active_elements: AtomicU32::new(1),
        });
        let (polling_handle, polling_clients) = rt_error_try!(
            PollingThread::new(runtime_inner.clone(), log_level),
            "failed to build polling thread"
        );

        let mut scheduler_threads = Vec::with_capacity(num_threads as usize);

        for thread_index in 0..num_threads {
            let mut scheduler = Scheduler::new(
                runtime_inner.clone(), polling_clients.client(),
                thread_index, log_level
            );

            let thread_handle = thread::Builder::new()
                .name(format!("scheduler:{}", thread_index))
                .spawn(move || {
                    scheduler.run();
                })
                .map_err(|reason|
                    rt_error!(
                        "failed to spawn scheduler thread {}, because: {}",
                        thread_index, reason
                    )
                )?;

            scheduler_threads.push(thread_handle);
        }

        return Ok(Runtime{
            inner: runtime_inner,
            scheduler_threads,
            polling_handle,
        });
    }

    pub fn create_component(&self, module_name: &[u8], routine_name: &[u8]) -> Result<(), ComponentCreationError> {
        use crate::protocol::eval::ValueGroup;
        let prompt = self.inner.protocol.new_component(
            module_name, routine_name,
            ValueGroup::new_stack(Vec::new())
        )?;
        let reserved = self.inner.start_create_component();
        let ctx = CompCtx::new(&reserved);
        let component = Box::new(CompPDL::new(prompt, 0));
        let (key, _) = self.inner.finish_create_component(reserved, component, ctx, false);
        self.inner.enqueue_work(key);

        return Ok(());
    }
}
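
// Illustrative sketch (not part of the original source) of how the public API
// above is intended to be used. The module and routine names are hypothetical;
// they must exist in the supplied `ProtocolDescription`.
//
//     fn start_runtime(protocol: ProtocolDescription) -> Result<Runtime, RtError> {
//         // one scheduler thread, logging disabled
//         let runtime = Runtime::new(1, LogLevel::None, protocol)?;
//         runtime.create_component(b"some_module", b"some_component")
//             .expect("component creation");
//         // dropping the runtime later joins the scheduler threads and shuts
//         // down the polling thread
//         return Ok(runtime);
//     }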

impl Drop for Runtime {
    fn drop(&mut self) {
        self.inner.decrement_active_components();
        for handle in self.scheduler_threads.drain(..) {
            handle.join().expect("join scheduler thread");
        }

        self.polling_handle.shutdown().expect("shutdown polling thread");
    }
}

/// Memory that is maintained by "the runtime". In practice it is maintained by
/// multiple schedulers, and this serves as the common interface to that memory.
pub(crate) struct RuntimeInner {
    pub protocol: ProtocolDescription,
    components: ComponentStore<RuntimeComp>,
    work_queue: Mutex<VecDeque<CompKey>>, // TODO: should be MPMC queue
    work_condvar: Condvar,
    active_elements: AtomicU32, // active components and APIs (i.e. component creators)
}

impl RuntimeInner {
    // Scheduling and retrieving work

    pub(crate) fn take_work(&self) -> Option<CompKey> {
        let mut lock = self.work_queue.lock().unwrap();
        while lock.is_empty() && self.active_elements.load(Ordering::Acquire) != 0 {
            lock = self.work_condvar.wait(lock).unwrap();
        }

        // We have work, or the schedulers should exit.
        return lock.pop_front();
    }

    pub(crate) fn enqueue_work(&self, key: CompKey) {
        let mut lock = self.work_queue.lock().unwrap();
        lock.push_back(key);
        self.work_condvar.notify_one();
    }
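
    // Illustrative sketch (not part of the original source) of the consumer
    // side of this queue, roughly what a `Scheduler` is expected to do: keep
    // taking keys until `take_work` returns `None`, which only happens once
    // the queue is empty and `active_elements` has dropped to zero.
    //
    //     fn scheduler_loop(runtime: &RuntimeInner) {
    //         while let Some(key) = runtime.take_work() {
    //             let component = runtime.get_component(key);
    //             // ... run `component`'s routine, then either mark it as
    //             // sleeping or call `runtime.enqueue_work(key)` to reschedule it
    //         }
    //         // no work left and no active elements: the scheduler thread exits
    //     }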

    // Creating/destroying components

    pub(crate) fn start_create_component(&self) -> CompReserved {
        self.increment_active_components();
        let reservation = self.components.reserve();
        return CompReserved{ reservation };
    }

    pub(crate) fn finish_create_component(
        &self, reserved: CompReserved,
        component: Box<dyn Component>, mut context: CompCtx, initially_sleeping: bool,
    ) -> (CompKey, &mut RuntimeComp) {
        // Construct runtime component
        let inbox_queue = QueueDynMpsc::new(16);
        let inbox_producer = inbox_queue.producer();

        let component_id = reserved.id();
        context.id = component_id;

        let component = RuntimeComp {
            public: CompPublic{
                sleeping: AtomicBool::new(initially_sleeping),
                num_handles: AtomicU32::new(1), // the component itself acts like a handle
                inbox: inbox_producer,
            },
            component,
            ctx: context,
            inbox: inbox_queue,
            exiting: false,
        };

        // Submit created component into storage.
        let index = self.components.submit(reserved.reservation, component);
        debug_assert_eq!(index, component_id.0);
        let component = self.components.get_mut(index);

        // A bit messy, but here we create the component's reference to itself
        // (accounted for by initializing `num_handles` to `1` above) and add it
        // to the component context.
        let self_handle = CompHandle::new_unincremented(component_id, &component.public);
        component.ctx.add_self_reference(self_handle);

        return (CompKey(index), component);
    }
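
    // Illustrative sketch (not part of the original source) of the two-phase
    // creation protocol above, mirroring `Runtime::create_component`: reserve
    // an index first (so the component's ID is known before it is built), then
    // finish the creation and hand the key to the work queue.
    //
    //     fn create_and_schedule(runtime: &RuntimeInner, component: Box<dyn Component>) {
    //         let reserved = runtime.start_create_component();
    //         let ctx = CompCtx::new(&reserved);
    //         let (key, _comp) = runtime.finish_create_component(reserved, component, ctx, false);
    //         runtime.enqueue_work(key); // `false`: the component is not initially sleeping
    //     }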

    pub(crate) fn get_component(&self, key: CompKey) -> &mut RuntimeComp {
        let component = self.components.get_mut(key.0);
        return component;
    }

    pub(crate) fn get_component_public(&self, id: CompId) -> CompHandle {
        let component = self.components.get(id.0);
        return CompHandle::new(id, &component.public);
    }

    /// Will remove a component and its memory from the runtime. May only be
    /// called if the necessary conditions for destruction have been met.
    pub(crate) fn destroy_component(&self, key: CompKey) {
        dbg_code!({
            let component = self.get_component(key);
            debug_assert!(component.exiting);
            debug_assert_eq!(component.public.num_handles.load(Ordering::Acquire), 0);
        });

        self.decrement_active_components();
        self.components.destroy(key.0);
    }

    // Tracking the number of active elements (components and APIs)

    #[inline]
    fn increment_active_components(&self) {
        let _old_val = self.active_elements.fetch_add(1, Ordering::AcqRel);
        debug_assert!(_old_val > 0); // can only create a component from an API/component, so this can never be 0
    }

    fn decrement_active_components(&self) {
        let old_val = self.active_elements.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(old_val > 0); // something wrong with incr/decr logic
        let new_val = old_val - 1;
        if new_val == 0 {
            // Just to be sure: wake up all waiting schedulers so they observe
            // that there is nothing left to do, in case the last active element
            // that gets destroyed is an API instead of a component on a
            // scheduler thread.
            let _lock = self.work_queue.lock();
            self.work_condvar.notify_all();
        }
    }
}