Files @ 1f78496722d1
Branch filter:

Location: CSY/reowolf/src/runtime2/runtime.rs - annotation

1f78496722d1 12.4 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
Max Henger
feat: runtime error handling
0e1a76667937
0e1a76667937
113e4349a706
0e1a76667937
ccd08a8d8365
ccd08a8d8365
113e4349a706
113e4349a706
ccd08a8d8365
9e771c9cf8d3
113e4349a706
bf4c0ee5ba65
6555f56a22a9
0e1a76667937
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
0e1a76667937
0de39654770f
ccd08a8d8365
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
ccd08a8d8365
ccd08a8d8365
113e4349a706
0e1a76667937
0de39654770f
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
113e4349a706
113e4349a706
113e4349a706
0e1a76667937
0e1a76667937
113e4349a706
9e771c9cf8d3
5415acc02756
5415acc02756
0e1a76667937
0e1a76667937
0e1a76667937
0de39654770f
0de39654770f
0de39654770f
0e1a76667937
0e1a76667937
9e771c9cf8d3
6555f56a22a9
0e1a76667937
ccd08a8d8365
9e771c9cf8d3
9e771c9cf8d3
968e958c3286
968e958c3286
0e1a76667937
0e1a76667937
113e4349a706
968e958c3286
0e1a76667937
0e1a76667937
9e771c9cf8d3
6555f56a22a9
968e958c3286
968e958c3286
6555f56a22a9
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
113e4349a706
6555f56a22a9
6555f56a22a9
113e4349a706
6555f56a22a9
6555f56a22a9
6555f56a22a9
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
113e4349a706
113e4349a706
113e4349a706
113e4349a706
968e958c3286
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
e7df1d2ae35f
e7df1d2ae35f
ab448a66dca3
9e771c9cf8d3
e7df1d2ae35f
5415acc02756
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
e7df1d2ae35f
9e771c9cf8d3
9e771c9cf8d3
9e771c9cf8d3
968e958c3286
968e958c3286
11fd959b348a
968e958c3286
968e958c3286
968e958c3286
6555f56a22a9
968e958c3286
968e958c3286
968e958c3286
968e958c3286
968e958c3286
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
11fd959b348a
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
968e958c3286
968e958c3286
11fd959b348a
968e958c3286
968e958c3286
968e958c3286
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
53d3950d9b6b
113e4349a706
113e4349a706
ccd08a8d8365
ccd08a8d8365
ccd08a8d8365
560ed3c4dc1d
1f78496722d1
113e4349a706
113e4349a706
113e4349a706
53d3950d9b6b
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
53d3950d9b6b
53d3950d9b6b
113e4349a706
1f78496722d1
113e4349a706
113e4349a706
113e4349a706
113e4349a706
53d3950d9b6b
53d3950d9b6b
113e4349a706
113e4349a706
1f78496722d1
113e4349a706
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
53d3950d9b6b
113e4349a706
53d3950d9b6b
53d3950d9b6b
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
53d3950d9b6b
560ed3c4dc1d
560ed3c4dc1d
560ed3c4dc1d
560ed3c4dc1d
560ed3c4dc1d
560ed3c4dc1d
560ed3c4dc1d
560ed3c4dc1d
560ed3c4dc1d
113e4349a706
113e4349a706
560ed3c4dc1d
560ed3c4dc1d
560ed3c4dc1d
560ed3c4dc1d
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
113e4349a706
53d3950d9b6b
53d3950d9b6b
113e4349a706
113e4349a706
ccd08a8d8365
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
113e4349a706
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
4341b8d6790f
53d3950d9b6b
0e1a76667937
4341b8d6790f
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
4341b8d6790f
4341b8d6790f
53d3950d9b6b
0e1a76667937
4341b8d6790f
4341b8d6790f
0e1a76667937
0e1a76667937
0e1a76667937
0e1a76667937
4341b8d6790f
4341b8d6790f
0e1a76667937
0e1a76667937
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
113e4349a706
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
113e4349a706
bf4c0ee5ba65
bf4c0ee5ba65
5415acc02756
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
bf4c0ee5ba65
0e1a76667937
0e1a76667937
0e1a76667937
4341b8d6790f
4341b8d6790f
9e771c9cf8d3
0e1a76667937
6555f56a22a9
4341b8d6790f
4341b8d6790f
113e4349a706
113e4349a706
0e1a76667937
5415acc02756
5415acc02756
5415acc02756
5415acc02756
5415acc02756
113e4349a706
53d3950d9b6b
0e1a76667937
4341b8d6790f
4341b8d6790f
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
5415acc02756
53d3950d9b6b
53d3950d9b6b
53d3950d9b6b
0e1a76667937
use std::sync::{Arc, Mutex, Condvar};
use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
use std::thread;
use std::collections::VecDeque;

use crate::protocol::*;
use crate::runtime2::poll::{PollingThread, PollingThreadHandle};
use crate::runtime2::RtError;

use super::communication::Message;
use super::component::{Component, wake_up_if_sleeping, CompPDL, CompCtx};
use super::store::{ComponentStore, ComponentReservation, QueueDynMpsc, QueueDynProducer};
use super::scheduler::*;

/// Verbosity of the runtime's logging.
///
/// The variants are declared from most to least verbose, so the derived
/// ordering yields `Debug < Info < None`.
#[derive(Debug, PartialOrd, PartialEq, Eq, Copy, Clone)]
pub enum LogLevel {
    /// All logging (includes messages).
    Debug,
    /// Rough logging.
    Info,
    /// No logging.
    None,
}

// -----------------------------------------------------------------------------
// Component
// -----------------------------------------------------------------------------

/// Owning key of a component. The type system is used to (somewhat) guarantee
/// that at most one key per component exists; holding the key is what grants
/// access to the component's privately-accessible memory. Underneath it is the
/// same generational index that `CompId` wraps.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) struct CompKey(pub u32);

impl CompKey {
    /// Produces the non-owning `CompId` carrying the same index as this key.
    pub(crate) fn downgrade(&self) -> CompId {
        CompId(self.0)
    }
}

/// Generational ID of a component.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct CompId(pub u32);

impl CompId {
    /// Returns the placeholder ID, which wraps `u32::MAX`.
    pub(crate) fn new_invalid() -> CompId {
        CompId(u32::MAX)
    }

    /// Turns this ID into the matching `CompKey`.
    ///
    /// # Safety
    /// The caller has to guarantee that no other key for the same component
    /// exists at the same time, such that a component is only ever
    /// scheduled/executed by a single thread.
    pub(crate) unsafe fn upgrade(&self) -> CompKey {
        CompKey(self.0)
    }
}

/// Handle to a component whose slot has been reserved, but whose creation has
/// not been finished yet.
pub(crate) struct CompReserved {
    reservation: ComponentReservation,
}

impl CompReserved {
    /// ID the component will have, taken from the reservation's index.
    pub(crate) fn id(&self) -> CompId {
        CompId(self.reservation.index)
    }
}

/// Representation of a runtime component. Contains the bookkeeping variables
/// for the schedulers, the publicly accessible fields, and the private fields
/// that should only be accessed by the thread running the component's routine.
pub(crate) struct RuntimeComp {
    /// Part of the component that `CompHandle`s point to and dereference.
    pub public: CompPublic,
    /// The component implementation itself, as a boxed `Component` trait
    /// object.
    pub component: Box<dyn Component>,
    /// Context belonging to the component; its `id` is set to the reserved ID
    /// when the component's creation is finished.
    pub ctx: CompCtx,
    /// Message queue of the component. The producer stored in `public` is
    /// obtained from this queue.
    pub inbox: QueueDynMpsc<Message>,
    /// Starts out `false`; `destroy_component` asserts (in its debug checks)
    /// that it is `true` by the time the component is destroyed.
    pub exiting: bool,
}

/// Should contain everything that is accessible in a thread-safe manner
// TODO: Do something about the `num_handles` thing. This needs to be a bit more
//  "foolproof" to lighten the mental burden of using the `num_handles`
//  variable.
pub(crate) struct CompPublic {
    /// Sleeping flag of the component; initialised from the
    /// `initially_sleeping` argument when the component is created.
    pub sleeping: AtomicBool,
    /// Number of outstanding handles. Starts at 1 because the component itself
    /// acts like a handle. Incremented/decremented by `CompHandle`.
    pub num_handles: AtomicU32, // manually modified (!)
    /// Producer side of the component's inbox, used by `CompHandle` to push
    /// messages.
    inbox: QueueDynProducer<Message>,
}

/// Handle to public part of a component. Would be nice if we could
/// automagically manage the `num_handles` counter. But when it reaches zero we
/// need to manually remove the handle from the runtime. So we just have debug
/// code to make sure this actually happens.
pub(crate) struct CompHandle {
    /// Raw pointer to the component's public part; dereferenced by the `Deref`
    /// implementation.
    target: *const CompPublic,
    /// ID of the component this handle refers to.
    id: CompId,
    /// Debug-only flag: set once `decrement_users` has been called, checked on
    /// clone, dereference and drop.
    #[cfg(debug_assertions)] decremented: bool,
}

impl CompHandle {
    /// Wraps the public part of a component in a handle and registers the new
    /// handle in the component's `num_handles` counter.
    fn new(id: CompId, public: &CompPublic) -> CompHandle {
        let created = CompHandle{
            target: public as *const CompPublic,
            id,
            #[cfg(debug_assertions)] decremented: false,
        };
        created.increment_users();
        created
    }

    /// Pushes `message` onto the component's inbox. When `try_wake_up` is set
    /// the component is additionally handed to `wake_up_if_sleeping`.
    pub(crate) fn send_message(&self, runtime: &RuntimeInner, message: Message, try_wake_up: bool) {
        self.inbox.push(message);
        if !try_wake_up {
            return;
        }

        wake_up_if_sleeping(runtime, self.id, self);
    }

    /// Same as `send_message`, but writes the message to the scheduler's debug
    /// log before sending it.
    #[inline]
    pub(crate) fn send_message_logged(&self, sched_ctx: &SchedulerCtx, message: Message, try_wake_up: bool) {
        let log_line = format!("Sending message to comp:{} ... {:?}", self.id.0, message);
        sched_ctx.debug(&log_line);
        self.send_message(&sched_ctx.runtime, message, try_wake_up);
    }

    /// ID of the component this handle points to.
    pub(crate) fn id(&self) -> CompId {
        self.id
    }

    /// Adds one to the component's handle counter.
    fn increment_users(&self) {
        let previous = self.num_handles.fetch_add(1, Ordering::AcqRel);
        // A handle can never be retrieved for a component that is (being)
        // destroyed, hence the counter cannot have been zero.
        debug_assert!(previous > 0);
    }

    /// Removes this handle from the component's handle counter. If this was
    /// the last handle, then the `CompKey` of the component is returned to
    /// signal that the component should be destroyed.
    pub(crate) fn decrement_users(&mut self) -> Option<CompKey> {
        dbg_code!(assert!(!self.decremented, "illegal to 'decrement_users' twice"));
        let previous = self.num_handles.fetch_sub(1, Ordering::AcqRel);
        let remaining = previous - 1;
        dbg_code!(self.decremented = true);

        if remaining == 0 {
            // SAFETY: (assumption, to be confirmed against `upgrade`'s
            // contract) the counter dropped to zero, so no other handle, and
            // thereby no other key, should exist for this component.
            Some(unsafe{ self.id.upgrade() })
        } else {
            None
        }
    }
}

impl Clone for CompHandle {
    /// Creates a second handle to the same component. The clone counts as a
    /// separate user, so the handle counter is incremented and the clone
    /// needs its own call to `decrement_users`.
    fn clone(&self) -> Self {
        dbg_code!(assert!(!self.decremented, "illegal to clone after 'decrement_users'"));
        self.increment_users();

        let Self{ target, id, .. } = *self;
        CompHandle{
            target,
            id,
            #[cfg(debug_assertions)] decremented: false,
        }
    }
}

impl std::ops::Deref for CompHandle {
    type Target = CompPublic;

    /// Gives access to the public part of the component the handle points to.
    fn deref(&self) -> &Self::Target {
        // Once control is relinquished the handle may no longer be used.
        dbg_code!(assert!(!self.decremented));
        let public: *const CompPublic = self.target;
        // SAFETY: (assumption, to be confirmed) the component's public part
        // is kept alive for as long as this handle is counted in
        // `num_handles`.
        unsafe{ &*public }
    }
}

impl Drop for CompHandle {
    /// Does not modify `num_handles` itself. It only checks, through
    /// `dbg_code!`, that `decrement_users` was called on this handle before it
    /// went out of scope.
    // NOTE(review): `dbg_code!` is presumably only active in debug builds, as
    // the `decremented` field only exists under `cfg(debug_assertions)`.
    fn drop(&mut self) {
        dbg_code!(assert!(self.decremented, "need call to 'decrement_users' before dropping"));
    }
}

// -----------------------------------------------------------------------------
// Runtime
// -----------------------------------------------------------------------------

/// Owner of the runtime: holds the shared runtime memory together with the
/// threads operating on it. Dropping it joins the scheduler threads and shuts
/// down the polling thread.
pub struct Runtime {
    /// Memory shared with the scheduler threads and the polling thread.
    pub(crate) inner: Arc<RuntimeInner>,
    /// Join handles of the spawned scheduler threads.
    scheduler_threads: Vec<thread::JoinHandle<()>>,
    /// Handle used to shut down the polling thread.
    polling_handle: PollingThreadHandle,
}

impl Runtime {
    /// Creates a runtime that executes components on `num_threads` scheduler
    /// threads, next to a single polling thread.
    ///
    /// # Errors
    /// Returns an `RtError` if `num_threads` is zero, if the polling thread
    /// cannot be built, or if a scheduler thread cannot be spawned. In the
    /// last case the threads that were already started are shut down again
    /// before the error is returned.
    // TODO: debug_logging should be removed at some point
    pub fn new(num_threads: u32, log_level: LogLevel, protocol_description: ProtocolDescription) -> Result<Runtime, RtError> {
        if num_threads == 0 {
            return Err(rt_error!("need at least one thread to create the runtime"));
        }
        let runtime_inner = Arc::new(RuntimeInner {
            protocol: protocol_description,
            components: ComponentStore::new(128),
            work_queue: Mutex::new(VecDeque::with_capacity(128)),
            work_condvar: Condvar::new(),
            // The runtime object itself counts as one active element
            active_elements: AtomicU32::new(1),
        });
        let (polling_handle, polling_clients) = rt_error_try!(
            PollingThread::new(runtime_inner.clone(), log_level),
            "failed to build polling thread"
        );

        // Construct the runtime before spawning the schedulers. If spawning
        // fails halfway, then the early return drops `runtime`, and its `Drop`
        // implementation releases the runtime's active element, joins the
        // schedulers spawned so far and shuts down the polling thread. Without
        // this those threads would be left waiting for work forever.
        let mut runtime = Runtime{
            inner: runtime_inner,
            scheduler_threads: Vec::with_capacity(num_threads as usize),
            polling_handle,
        };

        for thread_index in 0..num_threads {
            let mut scheduler = Scheduler::new(
                runtime.inner.clone(), polling_clients.client(),
                thread_index, log_level
            );

            let thread_handle = thread::Builder::new()
                .name(format!("scheduler:{}", thread_index))
                .spawn(move || {
                    scheduler.run();
                })
                .map_err(|reason|
                    rt_error!(
                        "failed to spawn scheduler thread {}, because: {}",
                        thread_index, reason
                    )
                )?;

            runtime.scheduler_threads.push(thread_handle);
        }

        return Ok(runtime);
    }

    /// Instantiates the routine `routine_name` from module `module_name` as a
    /// new component and queues it for execution. The routine is instantiated
    /// with a value group built from an empty vector.
    ///
    /// # Errors
    /// Returns the `ComponentCreationError` produced by the protocol
    /// description if the component cannot be instantiated.
    pub fn create_component(&self, module_name: &[u8], routine_name: &[u8]) -> Result<(), ComponentCreationError> {
        use crate::protocol::eval::ValueGroup;
        let prompt = self.inner.protocol.new_component(
            module_name, routine_name,
            ValueGroup::new_stack(Vec::new())
        )?;
        let reserved = self.inner.start_create_pdl_component();
        let ctx = CompCtx::new(&reserved);
        let component = Box::new(CompPDL::new(prompt, 0));
        // `false`: the component does not start out sleeping, as it is put in
        // the work queue straight away.
        let (key, _) = self.inner.finish_create_pdl_component(reserved, component, ctx, false);
        self.inner.enqueue_work(key);

        return Ok(())
    }
}

impl Drop for Runtime {
    /// Shuts the runtime down: gives up the active element held by the runtime
    /// object itself, waits for all scheduler threads to finish, and finally
    /// stops the polling thread.
    ///
    /// Panics if a scheduler thread cannot be joined or if shutting down the
    /// polling thread fails.
    fn drop(&mut self) {
        self.inner.decrement_active_components();

        self.scheduler_threads
            .drain(..)
            .for_each(|scheduler| {
                scheduler.join().expect("join scheduler thread");
            });

        self.polling_handle.shutdown().expect("shutdown polling thread");
    }
}

/// Memory that is maintained by "the runtime". In practice it is maintained by
/// multiple schedulers, and this serves as the common interface to that memory.
pub(crate) struct RuntimeInner {
    /// Protocol description used to instantiate new components.
    pub protocol: ProtocolDescription,
    /// Storage of all components, indexed by the value wrapped in
    /// `CompKey`/`CompId`.
    components: ComponentStore<RuntimeComp>,
    /// Keys of the components that are waiting to be picked up through
    /// `take_work`.
    work_queue: Mutex<VecDeque<CompKey>>, // TODO: should be MPMC queue
    /// Signalled when work is enqueued and when `active_elements` drops to
    /// zero.
    work_condvar: Condvar,
    /// Once this reaches zero `take_work` stops waiting for new work.
    active_elements: AtomicU32, // active components and APIs (i.e. component creators)
}

impl RuntimeInner {
    // Scheduling and retrieving work

    /// Blocks until there is a component to run, or until there are no more
    /// active elements. Returns the key at the front of the work queue, or
    /// `None` if the queue is empty (i.e. the caller should exit).
    pub(crate) fn take_work(&self) -> Option<CompKey> {
        let mut queue = self.work_queue.lock().unwrap();
        loop {
            let done_waiting =
                !queue.is_empty() ||
                self.active_elements.load(Ordering::Acquire) == 0;
            if done_waiting {
                break;
            }

            queue = self.work_condvar.wait(queue).unwrap();
        }

        // Either there is work, or the schedulers should exit.
        queue.pop_front()
    }

    /// Appends `key` to the work queue and wakes up one waiting thread.
    pub(crate) fn enqueue_work(&self, key: CompKey) {
        let mut queue = self.work_queue.lock().unwrap();
        queue.push_back(key);
        self.work_condvar.notify_one();
    }

    // Creating/destroying components

    /// First step of creating a component: counts the component as active and
    /// reserves a slot for it in the component store.
    pub(crate) fn start_create_pdl_component(&self) -> CompReserved {
        self.increment_active_components();
        CompReserved{ reservation: self.components.reserve() }
    }

    /// Second step of creating a component: assembles the `RuntimeComp` and
    /// stores it in the reserved slot. Returns the key of the new component
    /// together with a reference to the stored component.
    pub(crate) fn finish_create_pdl_component(
        &self, reserved: CompReserved,
        component: Box<dyn Component>, mut context: CompCtx, initially_sleeping: bool,
    ) -> (CompKey, &mut RuntimeComp) {
        let inbox_queue = QueueDynMpsc::new(16);
        let inbox_producer = inbox_queue.producer();

        let _reserved_id = reserved.id();
        context.id = _reserved_id;

        let public = CompPublic{
            sleeping: AtomicBool::new(initially_sleeping),
            // The component itself acts like a handle, hence we start at 1.
            num_handles: AtomicU32::new(1),
            inbox: inbox_producer,
        };
        let new_comp = RuntimeComp {
            public,
            component,
            ctx: context,
            inbox: inbox_queue,
            exiting: false,
        };

        let index = self.components.submit(reserved.reservation, new_comp);
        debug_assert_eq!(index, _reserved_id.0);

        (CompKey(index), self.components.get_mut(index))
    }

    /// Retrieves the full component belonging to `key`.
    pub(crate) fn get_component(&self, key: CompKey) -> &mut RuntimeComp {
        self.components.get_mut(key.0)
    }

    /// Creates a new handle to the public part of the component with the
    /// given `id`. This increments the component's handle counter.
    pub(crate) fn get_component_public(&self, id: CompId) -> CompHandle {
        let public = &self.components.get(id.0).public;
        CompHandle::new(id, public)
    }

    /// Removes a component and its memory from the runtime. Only allowed once
    /// the conditions for destruction are met: the debug checks verify that
    /// the component is exiting and that no handles to it remain.
    pub(crate) fn destroy_component(&self, key: CompKey) {
        dbg_code!({
            let doomed = self.get_component(key);
            debug_assert!(doomed.exiting);
            debug_assert_eq!(doomed.public.num_handles.load(Ordering::Acquire), 0);
        });

        self.decrement_active_components();
        self.components.destroy(key.0);
    }

    // Tracking number of active interfaces and the active components

    /// Counts one more active element.
    #[inline]
    fn increment_active_components(&self) {
        let _previous = self.active_elements.fetch_add(1, Ordering::AcqRel);
        // Components are only created from an API/component, so the count can
        // never have been 0 here.
        debug_assert!(_previous > 0);
    }

    /// Counts one less active element. If none remain then every thread
    /// waiting in `take_work` is woken up.
    fn decrement_active_components(&self) {
        let previous = self.active_elements.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0); // something wrong with incr/decr logic
        if previous - 1 == 0 {
            // Just to be sure, in case the last thing that gets destroyed is
            // an API instead of a thread. The queue lock is held while
            // notifying.
            let _guard = self.work_queue.lock();
            self.work_condvar.notify_all();
        }
    }
}