Changeset - 0781cf1b7abf
[Not reviewed]
0 4 0
MH - 3 years ago 2022-01-18 12:49:49
contact@maxhenger.nl
WIP: Adding debug logs, add sync test
4 files changed with 54 insertions and 7 deletions:
0 comments (0 inline, 0 general)
src/runtime2/communication.rs
Show inline comments
 
@@ -25,75 +25,80 @@ pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
pub enum PortState {
 
    Open,
 
    Blocked,
 
    Closed,
 
}
 

	
 
pub struct Port {
 
    pub self_id: PortId,
 
    pub peer_id: PortId,
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    pub peer_comp_id: CompId,
 
}
 

	
 
pub struct Channel {
 
    pub putter_id: PortId,
 
    pub getter_id: PortId,
 
}
 

	
 
#[derive(Debug)]
 
pub struct DataMessage {
 
    pub data_header: MessageDataHeader,
 
    pub sync_header: MessageSyncHeader,
 
    pub content: ValueGroup,
 
}
 

	
 
#[derive(Debug)]
 
pub struct MessageSyncHeader {
 
    pub sync_round: u32,
 
}
 

	
 
#[derive(Debug)]
 
pub struct MessageDataHeader {
 
    pub expected_mapping: Vec<(PortId, u32)>,
 
    pub new_mapping: u32,
 
    pub source_port: PortId,
 
    pub target_port: PortId,
 
}
 

	
 
#[derive(Debug)]
 
pub struct ControlMessage {
 
    pub(crate) id: ControlId,
 
    pub sender_comp_id: CompId,
 
    pub target_port_id: Option<PortId>,
 
    pub content: ControlMessageContent,
 
}
 

	
 
#[derive(Copy, Clone)]
 
#[derive(Copy, Clone, Debug)]
 
pub enum ControlMessageContent {
 
    Ack,
 
    BlockPort(PortId),
 
    UnblockPort(PortId),
 
    ClosePort(PortId),
 
    PortPeerChangedBlock(PortId),
 
    PortPeerChangedUnblock(PortId, CompId),
 
}
 

	
 
#[derive(Debug)]
 
pub enum Message {
 
    Data(DataMessage),
 
    Control(ControlMessage),
 
}
 

	
 
impl Message {
 
    pub(crate) fn target_port(&self) -> Option<PortId> {
 
        match self {
 
            Message::Data(v) =>
 
                return Some(v.data_header.target_port),
 
            Message::Control(v) =>
 
                return v.target_port_id,
 
        }
 
    }
 
}
 

	
 

	
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -211,105 +211,106 @@ impl CompPDL {
 
    pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self {
 
        let mut inbox_main = Vec::new();
 
        inbox_main.reserve(num_ports);
 
        for _ in 0..num_ports {
 
            inbox_main.push(None);
 
        }
 

	
 
        return Self{
 
            mode: Mode::NonSync,
 
            mode_port: PortId::new_invalid(),
 
            mode_value: ValueGroup::default(),
 
            prompt: initial_state,
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
            sync_counter: 0,
 
            exec_ctx: ExecCtx{
 
                stmt: ExecStmt::None,
 
            },
 
            inbox_main,
 
            inbox_backup: Vec::new(),
 
        }
 
    }
 

	
 
    pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
 
        sched_ctx.log(&format!("handling message: {:?}", message));
 
        if let Some(new_target) = self.control.should_reroute(&message) {
 
            let target = sched_ctx.runtime.get_component_public(new_target);
 
            target.inbox.push(message);
 

	
 
            return;
 
        }
 

	
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Control(message) => {
 
                self.handle_incoming_control_message(sched_ctx, comp_ctx, message);
 
            },
 
        }
 
    }
 

	
 
    pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
        use EvalContinuation as EC;
 

	
 
        sched_ctx.log("Running component");
 
        let run_result = self.execute_prompt(&sched_ctx)?;
 

	
 
        match run_result {
 
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
 
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
 
            // Results that can be returned in sync mode
 
            EC::SyncBlockEnd => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                self.handle_sync_end(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::BlockGet(port_id) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 

	
 
                let port_id = port_id_from_eval(port_id);
 
                if let Some(message) = comp_ctx.take_message(port_id) {
 
                    // We can immediately receive and continue
 
                    debug_assert!(self.exec_ctx.stmt.is_none());
 
                    self.exec_ctx.stmt = ExecStmt::PerformedGet(message);
 
                    return Ok(CompScheduling::Immediate);
 
                } else {
 
                    // We need to wait
 
                    self.mode = Mode::BlockedGet;
 
                    self.mode_port = port_id;
 
                    return Ok(CompScheduling::Sleep);
 
                }
 
            },
 
            EC::Put(port_id, value) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                let port_id = port_id_from_eval(port_id);
 
                let port_info = comp_ctx.get_port(port_id);
 
                if port_info.state == PortState::Blocked {
 

	
 
                } else {
 

	
 
                    todo!("handle blocked port");
 
                }
 
                self.send_message_and_wake_up(sched_ctx, comp_ctx, port_id, value);
 
                self.exec_ctx.stmt = ExecStmt::PerformedPut;
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            // Results that can be returned outside of sync mode
 
            EC::ComponentTerminated => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                return Ok(CompScheduling::Exit);
 
            },
 
            EC::SyncBlockStart => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                self.handle_sync_start(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::NewComponent(definition_id, monomorph_idx, arguments) => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 

	
 
                let mut ports = Vec::new(); // TODO: Optimize
 
                let protocol = &sched_ctx.runtime.protocol;
 
                find_ports_in_value_group(&arguments, &mut ports);
 
                let prompt = Prompt::new(
 
                    &protocol.types, &protocol.heap,
 
                    definition_id, monomorph_idx, arguments
 
                );
 
                self.create_component_and_transfer_ports(sched_ctx, comp_ctx, prompt, &ports);
 
                return Ok(CompScheduling::Requeue);
src/runtime2/scheduler.rs
Show inline comments
 
use std::sync::Arc;
 
use std::sync::atomic::Ordering;
 

	
 
use super::component::*;
 
use super::runtime::*;
 
use super::communication::*;
 

	
 
/// Data associated with a scheduler thread
 
pub(crate) struct Scheduler {
 
    runtime: Arc<RuntimeInner>,
 
    scheduler_id: u32,
 
}
 

	
 
pub(crate) struct SchedulerCtx<'a> {
 
    pub runtime: &'a RuntimeInner,
 
    pub id: u32,
 
    pub comp: u32,
 
}
 

	
 
impl<'a> SchedulerCtx<'a> {
 
    pub fn new(runtime: &'a RuntimeInner) -> Self {
 
    pub fn new(runtime: &'a RuntimeInner, id: u32) -> Self {
 
        return Self {
 
            runtime,
 
            id,
 
            comp: 0,
 
        }
 
    }
 

	
 
    pub(crate) fn log(&self, text: &str) {
 
        println!("[s:{:02}, c:{:03}] {}", self.id, self.comp, text);
 
    }
 
}
 

	
 
impl Scheduler {
 
    // public interface to thread
 

	
 
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32) -> Self {
 
        return Scheduler{ runtime, scheduler_id }
 
    }
 

	
 
    pub fn run(&mut self) {
 
        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime);
 
        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, self.scheduler_id);
 

	
 
        'run_loop: loop {
 
            // Wait until we have something to do (or need to quit)
 
            let comp_key = self.runtime.take_work();
 
            if comp_key.is_none() {
 
                break 'run_loop;
 
            }
 

	
 
            let comp_key = comp_key.unwrap();
 
            let component = self.runtime.get_component(comp_key);
 
            scheduler_ctx.comp = comp_key.0;
 

	
 
            // Run the component until it no longer indicates that it needs to
 
            // be re-executed immediately.
 
            let mut new_scheduling = CompScheduling::Immediate;
 
            while let CompScheduling::Immediate = new_scheduling {
 
                while let Some(message) = component.inbox.pop() {
 
                    component.code.handle_message(&mut scheduler_ctx, &mut component.ctx, message);
 
                }
 
                new_scheduling = component.code.run(&mut scheduler_ctx, &mut component.ctx).expect("TODO: Handle error");
 
            }
 

	
 
            // Handle the new scheduling
 
            match new_scheduling {
 
                CompScheduling::Immediate => unreachable!(),
 
                CompScheduling::Requeue => { self.runtime.enqueue_work(comp_key); },
 
                CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); },
 
                CompScheduling::Exit => { self.mark_component_as_exiting(&scheduler_ctx, component); }
 
            }
 
        }
 
    }
 

	
 
    // local utilities
 

	
 
    fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) {
 
        debug_assert_eq!(key.downgrade(), component.ctx.id); // make sure component matches key
 
        debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // we're executing it, so it cannot be sleeping
 

	
 
        component.public.sleeping.store(true, Ordering::Release);
 
        if component.inbox.can_pop() {
 
            let should_reschedule = component.public.sleeping
 
                .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
 
                .is_ok();
src/runtime2/tests/mod.rs
Show inline comments
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::component::CompPDL;
 

	
 
#[test]
 
fn test_component_creation() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive nothing_at_all() {
 
        s32 a = 5;
 
        print(\"hello\");
 
        auto b = 5 + a;
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, pd);
 

	
 
    for i in 0..20 {
 
        let prompt = rt.inner.protocol.new_component(b"", b"nothing_at_all", ValueGroup::new_stack(Vec::new()))
 
            .expect("component creation");
 
        let comp = CompPDL::new(prompt, 0);
 
        let (key, _) = rt.inner.create_pdl_component(comp, true);
 
        rt.inner.enqueue_work(key);
 
    }
 
}
 

	
 
#[test]
 
fn test_component_communication() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive sender(out<u32> o) {
 
        print(\"sender\");
 
        sync put(o, 1);
 
    }
 
    primitive receiver(in<u32> i) {
 
        print(\"receiver\");
 
        sync get(i);
 
    }
 
    composite constructor() {
 
        channel o -> i;
 
        print(\"creating sender\");
 
        new sender(o);
 
        print(\"creating receiver\");
 
        new receiver(i);
 
        print(\"done\");
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, pd);
 

	
 
    let prompt = rt.inner.protocol.new_component(b"", b"constructor", ValueGroup::new_stack(Vec::new()))
 
        .expect("creation");
 
    let (key, _) = rt.inner.create_pdl_component(CompPDL::new(prompt, 0), true);
 
    rt.inner.enqueue_work(key);
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)