use crate::protocol::*; use crate::protocol::eval::{ PortId as EvalPortId, Prompt, ValueGroup, Value, EvalContinuation, EvalResult, EvalError }; use super::runtime::*; use super::scheduler::SchedulerCtx; use super::communication::*; pub enum CompScheduling { Immediate, Requeue, Sleep, Exit, } pub struct CompCtx { pub id: CompId, pub ports: Vec, pub peers: Vec, pub messages: Vec, // same size as "ports" } impl CompCtx { fn take_message(&mut self, port_id: PortId) -> Option { let old_value = &mut self.messages[port_id.0 as usize]; if old_value.values.is_empty() { return None; } // Replace value in array with an empty one let mut message = ValueGroup::new_stack(Vec::new()); std::mem::swap(old_value, &mut message); return Some(message); } fn find_peer(&self, port_id: PortId) -> &Peer { let port_info = &self.ports[port_id.0 as usize]; let peer_info = &self.peers[port_info.local_peer_index as usize]; return peer_info; } } pub enum ExecStmt { CreatedChannel((Value, Value)), PerformedPut, PerformedGet(ValueGroup), None, } impl ExecStmt { fn take(&mut self) -> ExecStmt { let mut value = ExecStmt::None; std::mem::swap(self, &mut value); return value; } fn is_none(&self) -> bool { match self { ExecStmt::None => return true, _ => return false, } } } pub struct ExecCtx { stmt: ExecStmt, } impl RunContext for ExecCtx { fn performed_put(&mut self, _port: EvalPortId) -> bool { match self.stmt.take() { ExecStmt::None => return false, ExecStmt::PerformedPut => return true, _ => unreachable!(), } } fn performed_get(&mut self, _port: EvalPortId) -> Option { match self.stmt.take() { ExecStmt::None => return None, ExecStmt::PerformedGet(value) => return Some(value), _ => unreachable!(), } } fn fires(&mut self, _port: EvalPortId) -> Option { todo!("remove fires") } fn performed_fork(&mut self) -> Option { todo!("remove fork") } fn created_channel(&mut self) -> Option<(Value, Value)> { match self.stmt.take() { ExecStmt::None => return None, ExecStmt::CreatedChannel(ports) => return Some(ports), _ => unreachable!(), } } } #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub(crate) enum Mode { NonSync, Sync, BlockedGet, BlockedPut, } pub(crate) struct CompPDL { pub mode: Mode, pub mode_port: PortId, // when blocked on a port pub mode_value: ValueGroup, // when blocked on a put pub prompt: Prompt, pub exec_ctx: ExecCtx, } impl CompPDL { pub(crate) fn new(initial_state: Prompt) -> Self { return Self{ mode: Mode::NonSync, mode_port: PortId::new_invalid(), mode_value: ValueGroup::default(), prompt: initial_state, exec_ctx: ExecCtx{ stmt: ExecStmt::None, } } } pub(crate) fn run(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { use EvalContinuation as EC; let run_result = self.execute_prompt(&sched_ctx)?; match run_result { EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"), // Results that can be returned in sync mode EC::SyncBlockEnd => { debug_assert_eq!(self.mode, Mode::Sync); self.handle_sync_end(sched_ctx, comp_ctx); }, EC::BlockGet(port_id) => { debug_assert_eq!(self.mode, Mode::Sync); let port_id = transform_port_id(port_id); if let Some(message) = comp_ctx.take_message(port_id) { // We can immediately receive and continue debug_assert!(self.exec_ctx.stmt.is_none()); self.exec_ctx.stmt = ExecStmt::PerformedGet(message); return Ok(CompScheduling::Immediate); } else { // We need to wait self.mode = Mode::BlockedGet; self.mode_port = port_id; return Ok(CompScheduling::Sleep); } }, EC::Put(port_id, value) => { debug_assert_eq!(self.mode, Mode::Sync); let port_id = transform_port_id(port_id); let peer = comp_ctx.find_peer(port_id); }, // Results that can be returned outside of sync mode EC::ComponentTerminated => { debug_assert_eq!(self.mode, Mode::NonSync); }, EC::SyncBlockStart => { debug_assert_eq!(self.mode, Mode::NonSync); self.handle_sync_start(sched_ctx, comp_ctx); }, EC::NewComponent(definition_id, monomorph_idx, arguments) => { debug_assert_eq!(self.mode, Mode::NonSync); }, EC::NewChannel => { debug_assert_eq!(self.mode, Mode::NonSync); } } return Ok(CompScheduling::Sleep); } fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult { let mut step_result = EvalContinuation::Stepping; while let EvalContinuation::Stepping = step_result { step_result = self.prompt.step( &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap, &sched_ctx.runtime.protocol.modules, &mut self.exec_ctx, )?; } return Ok(step_result) } fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { } fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { } } #[inline] fn transform_port_id(port_id: EvalPortId) -> PortId { return PortId(port_id.id); }