Changeset - 53d3950d9b6b
[Not reviewed]
MH <contact@maxhenger.nl> - 2022-01-17 16:17:11
WIP: Initial non-communicating component test
5 files changed with 98 insertions and 18 deletions:
0 comments (0 inline, 0 general)
src/protocol/parser/type_table.rs
@@ -433,7 +433,7 @@ use std::cell::UnsafeCell;

/// Lookup table for monomorphs. Wrapped in a special struct because we don't
/// want to allocate for each lookup (what we really want is a HashMap that
/// exposes it CompareFn and HashFn, but whatevs).
/// exposes its CompareFn and HashFn, but whatevs).
pub(crate) struct MonomorphTable {
    lookup: HashMap<MonomorphKey, i32>, // indexes into `monomorphs`
    pub(crate) monomorphs: Vec<TypeMonomorph>,
@@ -450,6 +450,11 @@ pub(crate) struct MonomorphTable {
    key: UnsafeCell<MonomorphKey>,
}

// TODO: Clean this up: somehow prevent the `key`, but also do not allocate for
//  each "get_monomorph_index"
unsafe impl Send for MonomorphTable{}
unsafe impl Sync for MonomorphTable{}

impl MonomorphTable {
    fn new() -> Self {
        return Self {
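
Aside on the hunk above: UnsafeCell<T> is never Sync, so embedding it (here apparently a reusable key buffer, per the TODO) removes the auto-derived Sync marker from the containing struct and forces the explicit opt-in seen in the diff. A minimal, standalone illustration of the same situation, using a hypothetical Scratch type rather than the real MonomorphTable:

use std::cell::UnsafeCell;

// A struct embedding UnsafeCell loses the auto-derived Sync marker, because
// UnsafeCell<T> is !Sync for every T.
struct Scratch {
    key: UnsafeCell<u64>, // stand-in for the reusable MonomorphKey buffer
}

// Sound only if some external discipline guarantees the buffer is never
// accessed from two threads at once; this mirrors the opt-in in the hunk above.
unsafe impl Send for Scratch {}
unsafe impl Sync for Scratch {}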
src/runtime2/mod.rs
@@ -2,4 +2,5 @@ mod store;
mod runtime;
mod component;
mod communication;
mod scheduler;
\ No newline at end of file
mod scheduler;
#[cfg(test)] mod tests;
\ No newline at end of file
src/runtime2/runtime.rs
@@ -7,6 +7,7 @@ use crate::protocol::*;
use super::communication::Message;
use super::component::{CompCtx, CompPDL};
use super::store::{ComponentStore, QueueDynMpsc, QueueDynProducer};
use super::scheduler::Scheduler;

// -----------------------------------------------------------------------------
// Component
@@ -122,30 +123,59 @@ impl Drop for CompHandle {
// Runtime
// -----------------------------------------------------------------------------

pub type RuntimeHandle = Arc<Runtime>;

/// Memory that is maintained by "the runtime". In practice it is maintained by
/// multiple schedulers, and this serves as the common interface to that memory.
pub struct Runtime {
    pub protocol: ProtocolDescription,
    components: ComponentStore<RuntimeComp>,
    work_queue: Mutex<VecDeque<CompKey>>,
    work_condvar: Condvar,
    active_elements: AtomicU32, // active components and APIs (i.e. component creators)
    pub(crate) inner: Arc<RuntimeInner>,
    threads: Vec<std::thread::JoinHandle<()>>,
}

impl Runtime {
    pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime {
        assert!(num_threads > 0, "need a thread to perform work");
        return Runtime{
        let runtime_inner = Arc::new(RuntimeInner {
            protocol: protocol_description,
            components: ComponentStore::new(128),
            work_queue: Mutex::new(VecDeque::with_capacity(128)),
            work_condvar: Condvar::new(),
            active_elements: AtomicU32::new(0),
            active_elements: AtomicU32::new(1),
        });
        let mut runtime = Runtime {
            inner: runtime_inner,
            threads: Vec::with_capacity(num_threads as usize),
        };

        for thread_index in 0..num_threads {
            let mut scheduler = Scheduler::new(runtime.inner.clone(), thread_index);
            let thread_handle = std::thread::spawn(move || {
                scheduler.run();
            });

            runtime.threads.push(thread_handle);
        }

        return runtime;
    }
}

impl Drop for Runtime {
    fn drop(&mut self) {
        self.inner.decrement_active_components();
        for handle in self.threads.drain(..) {
            handle.join().expect("join scheduler thread");
        }
    }
}

/// Memory that is maintained by "the runtime". In practice it is maintained by
/// multiple schedulers, and this serves as the common interface to that memory.
pub(crate) struct RuntimeInner {
    pub protocol: ProtocolDescription,
    components: ComponentStore<RuntimeComp>,
    work_queue: Mutex<VecDeque<CompKey>>,
    work_condvar: Condvar,
    active_elements: AtomicU32, // active components and APIs (i.e. component creators)
}

impl RuntimeInner {
    // Scheduling and retrieving work

    pub(crate) fn take_work(&self) -> Option<CompKey> {
@@ -154,6 +184,7 @@ impl Runtime {
            lock = self.work_condvar.wait(lock).unwrap();
        }

        // We have work, or the schedulers should exit.
        return lock.pop_front();
    }

@@ -185,6 +216,7 @@ impl Runtime {
        let index = self.components.create(comp);

        // TODO: just do a reserve_index followed by a consume_index or something
        self.increment_active_components();
        let component = self.components.get_mut(index);
        component.ctx.id = CompId(index);

@@ -203,8 +235,27 @@ impl Runtime {

    pub(crate) fn destroy_component(&self, key: CompKey) {
        debug_assert_eq!(self.get_component(key).public.num_handles.load(Ordering::Acquire), 0);
        self.decrement_active_components();
        self.components.destroy(key.0);
    }

    // Interacting with components
    // Tracking number of active interfaces and the active components

    #[inline]
    fn increment_active_components(&self) {
        let _old_val = self.active_elements.fetch_add(1, Ordering::AcqRel);
        debug_assert!(_old_val > 0); // can only create a component from a API/component, so can never be 0.
    }

    fn decrement_active_components(&self) {
        let old_val = self.active_elements.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(old_val > 0); // something wrong with incr/decr logic
        let new_val = old_val - 1;
        if new_val == 0 {
            // Just to be sure, in case the last thing that gets destroyed is an
            // API instead of a thread.
            let lock = self.work_queue.lock();
            self.work_condvar.notify_all();
        }
    }
}
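
Aside on the hunks above: the shape being introduced is a Runtime handle that owns the scheduler threads plus an Arc-shared RuntimeInner, where active_elements starts at 1 (the handle itself counts as an active element and releases that count again in Drop) and the condvar-guarded work queue doubles as the shutdown signal. Below is a condensed, self-contained sketch of that pattern, with placeholder names (Inner, Handle, u32 keys) rather than the crate's real types; the wait condition in take_work is inferred from the visible tail of the real function and may differ in detail.

use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::sync::atomic::{AtomicU32, Ordering};

// Stand-in for RuntimeInner: shared by the handle and every scheduler thread.
struct Inner {
    work_queue: Mutex<VecDeque<u32>>, // u32 as a stand-in for CompKey
    work_condvar: Condvar,
    active_elements: AtomicU32,       // components + API handles, starts at 1
}

impl Inner {
    // Blocks until work arrives; returns None once nothing is active anymore.
    fn take_work(&self) -> Option<u32> {
        let mut queue = self.work_queue.lock().unwrap();
        while queue.is_empty() && self.active_elements.load(Ordering::Acquire) != 0 {
            queue = self.work_condvar.wait(queue).unwrap();
        }
        queue.pop_front()
    }

    fn enqueue_work(&self, key: u32) {
        self.work_queue.lock().unwrap().push_back(key);
        self.work_condvar.notify_one();
    }

    fn decrement_active(&self) {
        if self.active_elements.fetch_sub(1, Ordering::AcqRel) == 1 {
            // Last active element gone: wake every scheduler so it can observe
            // the empty queue and exit. Taking the lock first avoids a missed
            // wakeup between a scheduler's emptiness check and its wait().
            let _guard = self.work_queue.lock().unwrap();
            self.work_condvar.notify_all();
        }
    }
}

// The scheduler loop implied by take_work's contract (Scheduler::run itself is
// not part of these hunks): keep pulling keys until the runtime signals shutdown.
fn scheduler_loop(inner: Arc<Inner>) {
    while let Some(comp_key) = inner.take_work() {
        // ... execute the component identified by comp_key until it yields ...
        let _ = comp_key;
    }
}

// Stand-in for Runtime: owns the threads and releases its own "+1" on drop.
struct Handle {
    inner: Arc<Inner>,
    threads: Vec<std::thread::JoinHandle<()>>,
}

impl Drop for Handle {
    fn drop(&mut self) {
        self.inner.decrement_active();
        for thread in self.threads.drain(..) {
            thread.join().expect("join scheduler thread");
        }
    }
}

Starting the counter at 1 instead of 0 is what lets the Drop impl act as the final decrement: once the last component and the handle itself are gone, the count reaches zero, the condvar is broadcast, and every scheduler's take_work returns None.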
src/runtime2/scheduler.rs
use std::sync::Arc;
use std::sync::atomic::Ordering;

use super::component::*;
@@ -6,16 +7,16 @@ use super::communication::*;

/// Data associated with a scheduler thread
pub(crate) struct Scheduler {
    runtime: RuntimeHandle,
    runtime: Arc<RuntimeInner>,
    scheduler_id: u32,
}

pub(crate) struct SchedulerCtx<'a> {
    pub runtime: &'a Runtime,
    pub runtime: &'a RuntimeInner,
}

impl<'a> SchedulerCtx<'a> {
    pub fn new(runtime: &'a Runtime) -> Self {
    pub fn new(runtime: &'a RuntimeInner) -> Self {
        return Self {
            runtime,
        }
@@ -25,7 +26,7 @@ impl<'a> SchedulerCtx<'a> {
impl Scheduler {
    // public interface to thread

    pub fn new(runtime: RuntimeHandle, scheduler_id: u32) -> Self {
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
        return Scheduler{ runtime, scheduler_id }
    }

src/runtime2/tests/mod.rs
new file 100644
use crate::protocol::*;
use crate::protocol::eval::*;
use crate::runtime2::runtime::*;
use crate::runtime2::component::CompPDL;

#[test]
fn test_component_creation() {
    let pd = ProtocolDescription::parse(b"
    primitive nothing_at_all() {
        s32 a = 5;
        print(\"hello\");
        auto b = 5 + a;
    }
    ").expect("compilation");
    let rt = Runtime::new(1, pd);

    let prompt = rt.inner.protocol.new_component(b"", b"nothing_at_all", ValueGroup::new_stack(Vec::new()))
        .expect("component creation");
    let comp = CompPDL::new(prompt, 0);
    let (key, _) = rt.inner.create_pdl_component(comp, true);
    rt.inner.enqueue_work(key);
}
\ No newline at end of file