diff --git a/src/runtime2/component.rs b/src/runtime2/component.rs index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..654c6ce2d4b6af1b9d4de3130ee304b4e0620985 100644 --- a/src/runtime2/component.rs +++ b/src/runtime2/component.rs @@ -0,0 +1,216 @@ +use crate::protocol::*; +use crate::protocol::eval::{ + PortId as EvalPortId, Prompt, + ValueGroup, Value, + EvalContinuation, EvalResult, EvalError +}; + +use super::runtime::*; +use super::scheduler::SchedulerCtx; +use super::communication::*; + +pub enum CompScheduling { + Immediate, + Requeue, + Sleep, + Exit, +} + +pub struct CompCtx { + pub id: CompId, + pub ports: Vec, + pub peers: Vec, + pub messages: Vec, // same size as "ports" +} + +impl CompCtx { + fn take_message(&mut self, port_id: PortId) -> Option { + let old_value = &mut self.messages[port_id.0 as usize]; + if old_value.values.is_empty() { + return None; + } + + // Replace value in array with an empty one + let mut message = ValueGroup::new_stack(Vec::new()); + std::mem::swap(old_value, &mut message); + return Some(message); + } + + fn find_peer(&self, port_id: PortId) -> &Peer { + let port_info = &self.ports[port_id.0 as usize]; + let peer_info = &self.peers[port_info.local_peer_index as usize]; + return peer_info; + } +} + +pub enum ExecStmt { + CreatedChannel((Value, Value)), + PerformedPut, + PerformedGet(ValueGroup), + None, +} + +impl ExecStmt { + fn take(&mut self) -> ExecStmt { + let mut value = ExecStmt::None; + std::mem::swap(self, &mut value); + return value; + } + + fn is_none(&self) -> bool { + match self { + ExecStmt::None => return true, + _ => return false, + } + } +} + +pub struct ExecCtx { + stmt: ExecStmt, +} + +impl RunContext for ExecCtx { + fn performed_put(&mut self, _port: EvalPortId) -> bool { + match self.stmt.take() { + ExecStmt::None => return false, + ExecStmt::PerformedPut => return true, + _ => unreachable!(), + } + } + + fn performed_get(&mut self, _port: EvalPortId) -> Option { + match self.stmt.take() { + ExecStmt::None => return None, + ExecStmt::PerformedGet(value) => return Some(value), + _ => unreachable!(), + } + } + + fn fires(&mut self, _port: EvalPortId) -> Option { + todo!("remove fires") + } + + fn performed_fork(&mut self) -> Option { + todo!("remove fork") + } + + fn created_channel(&mut self) -> Option<(Value, Value)> { + match self.stmt.take() { + ExecStmt::None => return None, + ExecStmt::CreatedChannel(ports) => return Some(ports), + _ => unreachable!(), + } + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub(crate) enum Mode { + NonSync, + Sync, + BlockedGet, + BlockedPut, +} + +pub(crate) struct CompPDL { + pub mode: Mode, + pub mode_port: PortId, // when blocked on a port + pub mode_value: ValueGroup, // when blocked on a put + pub prompt: Prompt, + pub exec_ctx: ExecCtx, +} + +impl CompPDL { + pub(crate) fn new(initial_state: Prompt) -> Self { + return Self{ + mode: Mode::NonSync, + mode_port: PortId::new_invalid(), + mode_value: ValueGroup::default(), + prompt: initial_state, + exec_ctx: ExecCtx{ + stmt: ExecStmt::None, + } + } + } + + pub(crate) fn run(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { + use EvalContinuation as EC; + + let run_result = self.execute_prompt(&sched_ctx)?; + + match run_result { + EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned + EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"), + // Results that can be returned in sync mode + EC::SyncBlockEnd => { + debug_assert_eq!(self.mode, Mode::Sync); + self.handle_sync_end(sched_ctx, comp_ctx); + }, + EC::BlockGet(port_id) => { + debug_assert_eq!(self.mode, Mode::Sync); + + let port_id = transform_port_id(port_id); + if let Some(message) = comp_ctx.take_message(port_id) { + // We can immediately receive and continue + debug_assert!(self.exec_ctx.stmt.is_none()); + self.exec_ctx.stmt = ExecStmt::PerformedGet(message); + return Ok(CompScheduling::Immediate); + } else { + // We need to wait + self.mode = Mode::BlockedGet; + self.mode_port = port_id; + return Ok(CompScheduling::Sleep); + } + }, + EC::Put(port_id, value) => { + debug_assert_eq!(self.mode, Mode::Sync); + + let port_id = transform_port_id(port_id); + let peer = comp_ctx.find_peer(port_id); + }, + // Results that can be returned outside of sync mode + EC::ComponentTerminated => { + debug_assert_eq!(self.mode, Mode::NonSync); + + }, + EC::SyncBlockStart => { + debug_assert_eq!(self.mode, Mode::NonSync); + self.handle_sync_start(sched_ctx, comp_ctx); + }, + EC::NewComponent(definition_id, monomorph_idx, arguments) => { + debug_assert_eq!(self.mode, Mode::NonSync); + + }, + EC::NewChannel => { + debug_assert_eq!(self.mode, Mode::NonSync); + + } + } + + return Ok(CompScheduling::Sleep); + } + + fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult { + let mut step_result = EvalContinuation::Stepping; + while let EvalContinuation::Stepping = step_result { + step_result = self.prompt.step( + &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap, + &sched_ctx.runtime.protocol.modules, &mut self.exec_ctx, + )?; + } + + return Ok(step_result) + } + + fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { + + } + + fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { + + } +} + +#[inline] +fn transform_port_id(port_id: EvalPortId) -> PortId { + return PortId(port_id.id); +} \ No newline at end of file