use std::sync::atomic::AtomicBool; use crate::common::ComponentState; use crate::PortId; use crate::protocol::eval::{Value, ValueGroup}; use crate::protocol::{RunContext, RunResult}; use crate::runtime2::branch::{Branch, BranchId, ExecTree, QueueKind, SpeculativeState}; use crate::runtime2::connector::ConnectorScheduling; use crate::runtime2::consensus::{Consensus, Consistency}; use crate::runtime2::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy}; use crate::runtime2::inbox::PublicInbox; use crate::runtime2::native::Connector; use crate::runtime2::port::PortIdLocal; use crate::runtime2::scheduler::{ComponentCtxFancy, SchedulerCtx}; pub(crate) struct ConnectorPublic { pub inbox: PublicInbox, pub sleeping: AtomicBool, } impl ConnectorPublic { pub fn new(initialize_as_sleeping: bool) -> Self { ConnectorPublic{ inbox: PublicInbox::new(), sleeping: AtomicBool::new(initialize_as_sleeping), } } } pub(crate) struct ConnectorPDL { tree: ExecTree, consensus: Consensus, branch_workspace: Vec, } struct ConnectorRunContext {}; impl RunContext for ConnectorRunContext{ fn did_put(&mut self, port: PortId) -> bool { todo!() } fn get(&mut self, port: PortId) -> Option { todo!() } fn fires(&mut self, port: PortId) -> Option { todo!() } fn get_channel(&mut self) -> Option<(Value, Value)> { todo!() } } impl Connector for ConnectorPDL { fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { todo!() } } impl ConnectorPDL { pub fn new(initial: ComponentState, owned_ports: Vec) -> Self { Self{ tree: ExecTree::new(initial), consensus: Consensus::new(), } } // --- Handling messages pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtxFancy) { while let Some(message) = ctx.read_next_message() { match message { MessageFancy::Data(message) => handle_new_data_message(message, ctx), MessageFancy::Sync(message) => handle_new_sync_message(message, ctx), MessageFancy::Control(_) => unreachable!("control message in component"), } } } pub fn handle_new_data_message(&mut self, message: DataMessageFancy, ctx: &mut ComponentCtxFancy) { // Go through all branches that are awaiting new messages and see if // there is one that can receive this message. debug_assert!(self.branch_workspace.is_empty()); self.consensus.handle_received_sync_header(&message.sync_header, ctx); self.consensus.handle_received_data_header(&self.tree, &message.data_header, &mut self.branch_workspace); for branch_id in self.branch_workspace.drain(..) { // This branch can receive, so fork and given it the message let receiving_branch_id = self.tree.fork_branch(branch_id); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); let receiving_branch = &mut self.tree[receiving_branch_id]; receiving_branch.insert_message(message.data_header.target_port, message.content.clone()); self.consensus.notify_of_received_message(branch_id, &message.data_header); // And prepare the branch for running self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); } } pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) { self.consensus.handle_received_sync_header(&message.sync_header, ctx); todo!("handle content of message?"); } // --- Running code pub fn run_in_sync_mode(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { // Check if we have any branch that needs running let branch_id = self.tree.pop_from_queue(QueueKind::Runnable); if branch_id.is_none() { return ConnectorScheduling::NotNow; } // Retrieve the branch and run it let branch_id = branch_id.unwrap(); let branch = &mut self.tree[branch_id]; let mut run_context = ConnectorRunContext{}; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); // Handle the returned result. Note that this match statement contains // explicit returns in case the run result requires that the component's // code is ran again immediately match run_result { RunResult::BranchInconsistent => { // Branch became inconsistent branch.sync_state = SpeculativeState::Inconsistent; }, RunResult::BranchMissingPortState(port_id) => { // Branch called `fires()` on a port that has not been used yet. let port_id = PortIdLocal::new(port_id.0.u32_suffix); // Create two forks, one that assumes the port will fire, and // one that assumes the port remains silent branch.sync_state = SpeculativeState::HaltedAtBranchPoint; let firing_branch_id = self.tree.fork_branch(branch_id); let silent_branch_id = self.tree.fork_branch(branch_id); self.consensus.notify_of_new_branch(branch_id, firing_branch_id); let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true); debug_assert_eq!(_result, Consistency::Valid); self.consensus.notify_of_new_branch(branch_id, silent_branch_id); let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false); debug_assert_eq!(_result, Consistency::Valid); // Somewhat important: we push the firing one first, such that // that branch is ran again immediately. self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id); self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id); return ConnectorScheduling::Immediate; }, RunResult::BranchMissingPortValue(port_id) => { // Branch performed a `get()` on a port that does not have a // received message on that port. let port_id = PortIdLocal::new(port_id.0.u32_suffix); let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); if consistency == Consistency::Valid { // `get()` is valid, so mark the branch as awaiting a message branch.sync_state = SpeculativeState::HaltedAtBranchPoint; branch.awaiting_port = port_id; self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); // Note: we only know that a branch is waiting on a message when // it reaches the `get` call. But we might have already received // a message that targets this branch, so check now. let mut any_branch_received = false; for message in comp_ctx.get_read_data_messages(port_id) { if self.consensus.branch_can_receive(branch_id, &message.data_header) { // This branch can receive the message, so we do the // fork-and-receive dance let recv_branch_id = self.tree.fork_branch(branch_id); let branch = &mut self.tree[recv_branch_id]; branch.insert_message(port_id, message.content.clone()); self.consensus.notify_of_new_branch(branch_id, recv_branch_id); self.consensus.notify_of_received_message(recv_branch_id, &message.data_header); self.tree.push_into_queue(QueueKind::Runnable, recv_branch_id); any_branch_received = true; } } if any_branch_received { return ConnectorScheduling::Immediate; } } else { branch.sync_state = SpeculativeState::Inconsistent; } } RunResult::BranchAtSyncEnd => { let consistency = self.consensus.notify_of_finished_branch(branch_id); if consistency == Consistency::Valid { branch.sync_state = SpeculativeState::ReachedSyncEnd; self.tree.push_into_queue(QueueKind::FinishedSync, branch_id); } else if consistency == Consistency::Inconsistent { branch.sync_state == SpeculativeState::Inconsistent; } }, RunResult::BranchPut(port_id, contents) => { // Branch is attempting to send data let port_id = PortIdLocal::new(port_id.0.u32_suffix); let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); if consistency == Consistency::Valid { // `put()` is valid. self.consensus. } else { branch.sync_state = SpeculativeState::Inconsistent; } }, _ => unreachable!("unexpected run result {:?} in sync mode", run_result), } // If here then the run result did not require a particular action. We // return whether we have more active branches to run or not. if self.tree.queue_is_empty(QueueKind::Runnable) { return ConnectorScheduling::NotNow; } else { return ConnectorScheduling::Later; } } }