diff --git a/src/runtime2/connector2.rs b/src/runtime2/connector2.rs new file mode 100644 index 0000000000000000000000000000000000000000..63bf93d48624863bd605bf203596f14d3dab5e74 --- /dev/null +++ b/src/runtime2/connector2.rs @@ -0,0 +1,223 @@ +use std::sync::atomic::AtomicBool; +use crate::common::ComponentState; +use crate::PortId; +use crate::protocol::eval::{Value, ValueGroup}; +use crate::protocol::{RunContext, RunResult}; +use crate::runtime2::branch::{Branch, BranchId, ExecTree, QueueKind, SpeculativeState}; +use crate::runtime2::connector::ConnectorScheduling; +use crate::runtime2::consensus::{Consensus, Consistency}; +use crate::runtime2::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy}; +use crate::runtime2::inbox::PublicInbox; +use crate::runtime2::native::Connector; +use crate::runtime2::port::PortIdLocal; +use crate::runtime2::scheduler::{ComponentCtxFancy, SchedulerCtx}; + +pub(crate) struct ConnectorPublic { + pub inbox: PublicInbox, + pub sleeping: AtomicBool, +} + +impl ConnectorPublic { + pub fn new(initialize_as_sleeping: bool) -> Self { + ConnectorPublic{ + inbox: PublicInbox::new(), + sleeping: AtomicBool::new(initialize_as_sleeping), + } + } +} + +pub(crate) struct ConnectorPDL { + tree: ExecTree, + consensus: Consensus, + branch_workspace: Vec, +} + +struct ConnectorRunContext {}; +impl RunContext for ConnectorRunContext{ + fn did_put(&mut self, port: PortId) -> bool { + todo!() + } + + fn get(&mut self, port: PortId) -> Option { + todo!() + } + + fn fires(&mut self, port: PortId) -> Option { + todo!() + } + + fn get_channel(&mut self) -> Option<(Value, Value)> { + todo!() + } +} + +impl Connector for ConnectorPDL { + fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { + todo!() + } +} + +impl ConnectorPDL { + pub fn new(initial: ComponentState, owned_ports: Vec) -> Self { + Self{ + tree: ExecTree::new(initial), + consensus: Consensus::new(), + } + } + + // --- Handling messages + + pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtxFancy) { + while let Some(message) = ctx.read_next_message() { + match message { + MessageFancy::Data(message) => handle_new_data_message(message, ctx), + MessageFancy::Sync(message) => handle_new_sync_message(message, ctx), + MessageFancy::Control(_) => unreachable!("control message in component"), + } + } + } + + pub fn handle_new_data_message(&mut self, message: DataMessageFancy, ctx: &mut ComponentCtxFancy) { + // Go through all branches that are awaiting new messages and see if + // there is one that can receive this message. + debug_assert!(self.branch_workspace.is_empty()); + self.consensus.handle_received_sync_header(&message.sync_header, ctx); + self.consensus.handle_received_data_header(&self.tree, &message.data_header, &mut self.branch_workspace); + + for branch_id in self.branch_workspace.drain(..) { + // This branch can receive, so fork and given it the message + let receiving_branch_id = self.tree.fork_branch(branch_id); + self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); + let receiving_branch = &mut self.tree[receiving_branch_id]; + + receiving_branch.insert_message(message.data_header.target_port, message.content.clone()); + self.consensus.notify_of_received_message(branch_id, &message.data_header); + + // And prepare the branch for running + self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); + } + } + + pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) { + self.consensus.handle_received_sync_header(&message.sync_header, ctx); + todo!("handle content of message?"); + } + + // --- Running code + + pub fn run_in_sync_mode(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { + // Check if we have any branch that needs running + let branch_id = self.tree.pop_from_queue(QueueKind::Runnable); + if branch_id.is_none() { + return ConnectorScheduling::NotNow; + } + + // Retrieve the branch and run it + let branch_id = branch_id.unwrap(); + let branch = &mut self.tree[branch_id]; + + let mut run_context = ConnectorRunContext{}; + let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); + + // Handle the returned result. Note that this match statement contains + // explicit returns in case the run result requires that the component's + // code is ran again immediately + match run_result { + RunResult::BranchInconsistent => { + // Branch became inconsistent + branch.sync_state = SpeculativeState::Inconsistent; + }, + RunResult::BranchMissingPortState(port_id) => { + // Branch called `fires()` on a port that has not been used yet. + let port_id = PortIdLocal::new(port_id.0.u32_suffix); + + // Create two forks, one that assumes the port will fire, and + // one that assumes the port remains silent + branch.sync_state = SpeculativeState::HaltedAtBranchPoint; + + let firing_branch_id = self.tree.fork_branch(branch_id); + let silent_branch_id = self.tree.fork_branch(branch_id); + self.consensus.notify_of_new_branch(branch_id, firing_branch_id); + let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true); + debug_assert_eq!(_result, Consistency::Valid); + self.consensus.notify_of_new_branch(branch_id, silent_branch_id); + let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false); + debug_assert_eq!(_result, Consistency::Valid); + + // Somewhat important: we push the firing one first, such that + // that branch is ran again immediately. + self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id); + self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id); + + return ConnectorScheduling::Immediate; + }, + RunResult::BranchMissingPortValue(port_id) => { + // Branch performed a `get()` on a port that does not have a + // received message on that port. + let port_id = PortIdLocal::new(port_id.0.u32_suffix); + let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); + if consistency == Consistency::Valid { + // `get()` is valid, so mark the branch as awaiting a message + branch.sync_state = SpeculativeState::HaltedAtBranchPoint; + branch.awaiting_port = port_id; + self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); + + // Note: we only know that a branch is waiting on a message when + // it reaches the `get` call. But we might have already received + // a message that targets this branch, so check now. + let mut any_branch_received = false; + for message in comp_ctx.get_read_data_messages(port_id) { + if self.consensus.branch_can_receive(branch_id, &message.data_header) { + // This branch can receive the message, so we do the + // fork-and-receive dance + let recv_branch_id = self.tree.fork_branch(branch_id); + let branch = &mut self.tree[recv_branch_id]; + branch.insert_message(port_id, message.content.clone()); + + self.consensus.notify_of_new_branch(branch_id, recv_branch_id); + self.consensus.notify_of_received_message(recv_branch_id, &message.data_header); + self.tree.push_into_queue(QueueKind::Runnable, recv_branch_id); + + any_branch_received = true; + } + } + + if any_branch_received { + return ConnectorScheduling::Immediate; + } + } else { + branch.sync_state = SpeculativeState::Inconsistent; + } + } + RunResult::BranchAtSyncEnd => { + let consistency = self.consensus.notify_of_finished_branch(branch_id); + if consistency == Consistency::Valid { + branch.sync_state = SpeculativeState::ReachedSyncEnd; + self.tree.push_into_queue(QueueKind::FinishedSync, branch_id); + } else if consistency == Consistency::Inconsistent { + branch.sync_state == SpeculativeState::Inconsistent; + } + }, + RunResult::BranchPut(port_id, contents) => { + // Branch is attempting to send data + let port_id = PortIdLocal::new(port_id.0.u32_suffix); + let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); + if consistency == Consistency::Valid { + // `put()` is valid. + self.consensus. + } else { + branch.sync_state = SpeculativeState::Inconsistent; + } + }, + _ => unreachable!("unexpected run result {:?} in sync mode", run_result), + } + + // If here then the run result did not require a particular action. We + // return whether we have more active branches to run or not. + if self.tree.queue_is_empty(QueueKind::Runnable) { + return ConnectorScheduling::NotNow; + } else { + return ConnectorScheduling::Later; + } + } +} \ No newline at end of file