From 0781cf1b7abf9f2e8ef09827953acb46473388b9 2022-01-18 12:49:49 From: MH Date: 2022-01-18 12:49:49 Subject: [PATCH] WIP: Adding debug logs, add sync test --- diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs index b73cb11df5390e31ee5582c529b6f21a8494dba3..50bd8692519941c659dcfbaff9bb0e3287631a12 100644 --- a/src/runtime2/communication.rs +++ b/src/runtime2/communication.rs @@ -46,16 +46,19 @@ pub struct Channel { pub getter_id: PortId, } +#[derive(Debug)] pub struct DataMessage { pub data_header: MessageDataHeader, pub sync_header: MessageSyncHeader, pub content: ValueGroup, } +#[derive(Debug)] pub struct MessageSyncHeader { pub sync_round: u32, } +#[derive(Debug)] pub struct MessageDataHeader { pub expected_mapping: Vec<(PortId, u32)>, pub new_mapping: u32, @@ -63,6 +66,7 @@ pub struct MessageDataHeader { pub target_port: PortId, } +#[derive(Debug)] pub struct ControlMessage { pub(crate) id: ControlId, pub sender_comp_id: CompId, @@ -70,7 +74,7 @@ pub struct ControlMessage { pub content: ControlMessageContent, } -#[derive(Copy, Clone)] +#[derive(Copy, Clone, Debug)] pub enum ControlMessageContent { Ack, BlockPort(PortId), @@ -80,6 +84,7 @@ pub enum ControlMessageContent { PortPeerChangedUnblock(PortId, CompId), } +#[derive(Debug)] pub enum Message { Data(DataMessage), Control(ControlMessage), diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 59f1112be07fc456b70f9df8d97d95748207f575..20955eba657a3074c44b00e754cca9207bac0063 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -232,6 +232,7 @@ impl CompPDL { } pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) { + sched_ctx.log(&format!("handling message: {:?}", message)); if let Some(new_target) = self.control.should_reroute(&message) { let target = sched_ctx.runtime.get_component_public(new_target); target.inbox.push(message); @@ -252,6 +253,7 @@ impl CompPDL { pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { use EvalContinuation as EC; + sched_ctx.log("Running component"); let run_result = self.execute_prompt(&sched_ctx)?; match run_result { @@ -284,11 +286,10 @@ impl CompPDL { let port_id = port_id_from_eval(port_id); let port_info = comp_ctx.get_port(port_id); if port_info.state == PortState::Blocked { - - } else { - + todo!("handle blocked port"); } self.send_message_and_wake_up(sched_ctx, comp_ctx, port_id, value); + self.exec_ctx.stmt = ExecStmt::PerformedPut; return Ok(CompScheduling::Immediate); }, // Results that can be returned outside of sync mode diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 9286998dd0f41e3ff8e36d78657323061b4a1bd4..cdc329f9892abb8c3a290a7528604c5773899328 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -13,14 +13,22 @@ pub(crate) struct Scheduler { pub(crate) struct SchedulerCtx<'a> { pub runtime: &'a RuntimeInner, + pub id: u32, + pub comp: u32, } impl<'a> SchedulerCtx<'a> { - pub fn new(runtime: &'a RuntimeInner) -> Self { + pub fn new(runtime: &'a RuntimeInner, id: u32) -> Self { return Self { runtime, + id, + comp: 0, } } + + pub(crate) fn log(&self, text: &str) { + println!("[s:{:02}, c:{:03}] {}", self.id, self.comp, text); + } } impl Scheduler { @@ -31,7 +39,7 @@ impl Scheduler { } pub fn run(&mut self) { - let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime); + let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, self.scheduler_id); 'run_loop: loop { // Wait until we have something to do (or need to quit) @@ -42,11 +50,15 @@ impl Scheduler { let comp_key = comp_key.unwrap(); let component = self.runtime.get_component(comp_key); + scheduler_ctx.comp = comp_key.0; // Run the component until it no longer indicates that it needs to // be re-executed immediately. let mut new_scheduling = CompScheduling::Immediate; while let CompScheduling::Immediate = new_scheduling { + while let Some(message) = component.inbox.pop() { + component.code.handle_message(&mut scheduler_ctx, &mut component.ctx, message); + } new_scheduling = component.code.run(&mut scheduler_ctx, &mut component.ctx).expect("TODO: Handle error"); } diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index e1e36e0ebfa199dc82afa8adafc2ba6afe54037c..49a2f41a5183022d5f967d62d9efa27006c6e2e1 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -8,15 +8,44 @@ fn test_component_creation() { let pd = ProtocolDescription::parse(b" primitive nothing_at_all() { s32 a = 5; - print(\"hello\"); auto b = 5 + a; } ").expect("compilation"); let rt = Runtime::new(1, pd); - let prompt = rt.inner.protocol.new_component(b"", b"nothing_at_all", ValueGroup::new_stack(Vec::new())) - .expect("component creation"); - let comp = CompPDL::new(prompt, 0); - let (key, _) = rt.inner.create_pdl_component(comp, true); + for i in 0..20 { + let prompt = rt.inner.protocol.new_component(b"", b"nothing_at_all", ValueGroup::new_stack(Vec::new())) + .expect("component creation"); + let comp = CompPDL::new(prompt, 0); + let (key, _) = rt.inner.create_pdl_component(comp, true); + rt.inner.enqueue_work(key); + } +} + +#[test] +fn test_component_communication() { + let pd = ProtocolDescription::parse(b" + primitive sender(out o) { + print(\"sender\"); + sync put(o, 1); + } + primitive receiver(in i) { + print(\"receiver\"); + sync get(i); + } + composite constructor() { + channel o -> i; + print(\"creating sender\"); + new sender(o); + print(\"creating receiver\"); + new receiver(i); + print(\"done\"); + } + ").expect("compilation"); + let rt = Runtime::new(1, pd); + + let prompt = rt.inner.protocol.new_component(b"", b"constructor", ValueGroup::new_stack(Vec::new())) + .expect("creation"); + let (key, _) = rt.inner.create_pdl_component(CompPDL::new(prompt, 0), true); rt.inner.enqueue_work(key); } \ No newline at end of file