diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index c08dcdce0381a2914611334344291af94e995588..f4b4e6bce2b76eda2f3da9ed9c8c1b3ef09b2666 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -34,10 +34,12 @@ use crate::common::ComponentState; use crate::protocol::eval::{EvalContinuation, EvalError, Prompt, Value, ValueGroup}; use crate::protocol::{RunContext, RunResult}; use crate::runtime2::branch::PreparedStatement; +use crate::runtime2::consensus::RoundConclusion; +use crate::runtime2::inbox::SyncPortMessage; use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState}; use super::consensus::{Consensus, Consistency, find_ports_in_value_group}; -use super::inbox::{DataMessage, DataContent, Message, SyncMessage, PublicInbox}; +use super::inbox::{DataMessage, DataContent, Message, SyncCompMessage, PublicInbox}; use super::native::Connector; use super::port::{PortKind, PortIdLocal}; use super::scheduler::{ComponentCtx, SchedulerCtx}; @@ -56,7 +58,7 @@ impl ConnectorPublic { } } -#[derive(Debug, PartialEq, Eq, Copy)] +#[derive(Debug, PartialEq, Eq, Clone, Copy)] enum Mode { NonSync, // running non-sync code Sync, // running sync code (in potentially multiple branches) @@ -106,6 +108,7 @@ impl<'a> RunContext for ConnectorRunContext<'a>{ } fn fires(&mut self, port: PortId) -> Option { + todo!("Remove fires() now"); let port_id = PortIdLocal::new(port.0.u32_suffix); let annotation = self.consensus.get_annotation(self.branch_id, port_id); return annotation.expected_firing.map(|v| Value::Bool(v)); @@ -130,7 +133,10 @@ impl<'a> RunContext for ConnectorRunContext<'a>{ impl Connector for ConnectorPDL { fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { - self.handle_new_messages(comp_ctx); + if let Some(scheduling) = self.handle_new_messages(comp_ctx) { + return scheduling; + } + match self.mode { Mode::Sync => { // Run in sync mode @@ -142,10 +148,9 @@ impl Connector for ConnectorPDL { iter_id = self.tree.get_queue_next(branch_id); self.last_finished_handled = Some(branch_id); - if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) { + if let Some(round_conclusion) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) { // Actually found a solution - self.enter_non_sync_mode(solution_branch_id, comp_ctx); - return ConnectorScheduling::Immediate; + return self.enter_non_sync_mode(round_conclusion, comp_ctx); } self.last_finished_handled = Some(branch_id); @@ -183,14 +188,19 @@ impl ConnectorPDL { // --- Handling messages - pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtx) { + pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtx) -> Option { while let Some(message) = ctx.read_next_message() { match message { Message::Data(message) => self.handle_new_data_message(message, ctx), - Message::Sync(message) => self.handle_new_sync_message(message, ctx), + Message::SyncComp(message) => { + return self.handle_new_sync_comp_message(message, ctx) + }, + Message::SyncPort(message) => self.handle_new_sync_port_message(message, ctx), Message::Control(_) => unreachable!("control message in component"), } } + + return None; } pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) { @@ -211,23 +221,30 @@ impl ConnectorPDL { // This branch can receive, so fork and given it the message let receiving_branch_id = self.tree.fork_branch(branch_id); + println!("DEBUG: ### Branching due to new data message"); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); let receiving_branch = &mut self.tree[receiving_branch_id]; debug_assert!(receiving_branch.awaiting_port == message.data_header.target_port); receiving_branch.awaiting_port = PortIdLocal::new_invalid(); receiving_branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone()); - self.consensus.notify_of_received_message(receiving_branch_id, &message); + self.consensus.notify_of_received_message(receiving_branch_id, &message, ctx); // And prepare the branch for running self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); } } - pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) { - if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) { - self.enter_non_sync_mode(solution_branch_id, ctx); + pub fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) -> Option { + if let Some(round_conclusion) = self.consensus.handle_new_sync_comp_message(message, ctx) { + return Some(self.enter_non_sync_mode(round_conclusion, ctx)); } + + return None; + } + + pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) { + self.consensus.handle_new_sync_port_message(message, ctx); } // --- Running code @@ -275,10 +292,10 @@ impl ConnectorPDL { let firing_branch_id = self.tree.fork_branch(branch_id); let silent_branch_id = self.tree.fork_branch(branch_id); self.consensus.notify_of_new_branch(branch_id, firing_branch_id); - let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true); + let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true, comp_ctx); debug_assert_eq!(_result, Consistency::Valid); self.consensus.notify_of_new_branch(branch_id, silent_branch_id); - let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false); + let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false, comp_ctx); debug_assert_eq!(_result, Consistency::Valid); // Somewhat important: we push the firing one first, such that @@ -310,8 +327,9 @@ impl ConnectorPDL { branch.awaiting_port = PortIdLocal::new_invalid(); branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone()); + println!("DEBUG: ### Branching due to BlockGet with existing message"); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); - self.consensus.notify_of_received_message(receiving_branch_id, &message); + self.consensus.notify_of_received_message(receiving_branch_id, &message, comp_ctx); self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); any_message_received = true; @@ -360,7 +378,8 @@ impl ConnectorPDL { &pd.modules, &pd.heap, String::from("attempted to 'put' on port that is no longer owned") ); - self.mode = Mode::SyncError(eval_error); + self.eval_error = Some(eval_error); + self.mode = Mode::SyncError; } branch.prepared = PreparedStatement::PerformedPut; @@ -399,7 +418,7 @@ impl ConnectorPDL { match run_result { EvalContinuation::ComponentTerminated => { branch.sync_state = SpeculativeState::Finished; - + println!("DEBUG: ************ DOING THEM EXITS"); return ConnectorScheduling::Exit; }, EvalContinuation::SyncBlockStart => { @@ -409,6 +428,7 @@ impl ConnectorPDL { self.consensus.start_sync(comp_ctx); self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id); self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id); + self.mode = Mode::Sync; return ConnectorScheduling::Immediate; }, @@ -449,38 +469,43 @@ impl ConnectorPDL { /// Helper that moves the component's state back into non-sync mode, using /// the provided solution branch ID as the branch that should be comitted to - /// memory. - fn enter_non_sync_mode(&mut self, solution_branch_id: BranchId, ctx: &mut ComponentCtx) { + /// memory. If this function returns false, then the component is supposed + /// to exit. + fn enter_non_sync_mode(&mut self, conclusion: RoundConclusion, ctx: &mut ComponentCtx) -> ConnectorScheduling { debug_assert!(self.mode == Mode::Sync || self.mode == Mode::SyncError); - let mut fake_vec = Vec::new(); - self.tree.end_sync(solution_branch_id); - self.consensus.end_sync(solution_branch_id, &mut fake_vec); - - for port in fake_vec { - // TODO: Handle sent/received ports - debug_assert!(ctx.get_port_by_id(port).is_some()); - } - - ctx.notify_sync_end(&[]); - self.last_finished_handled = None; - self.eval_error = None; // in case we came from the SyncError mode - self.mode = Mode::NonSync; - } - - /// Helper that moves the component's state into sync-error mode, sending - /// the appropriate errors where needed. The branch that caused the fatal - /// error and the evaluation error should be provided. - fn enter_sync_error_mode(&mut self, failing_branch_id: BranchId, ctx: &mut ComponentCtx, eval_error: EvalError) { - debug_assert!(self.mode == Mode::Sync); - debug_assert!(self.eval_error.is_none()); - self.mode = Mode::SyncError; - self.eval_error = Some(eval_error); + // Depending on local state decide what to do + let final_branch_id = match conclusion { + RoundConclusion::Success(branch_id) => Some(branch_id), + RoundConclusion::Failure => if self.mode == Mode::SyncError { + // We experienced an error, so exit now + None + } else { + // We didn't experience an error, so retry + // TODO: Decide what to do with sync errors + Some(BranchId::new_invalid()) + } + }; - let failing_branch = &mut self.tree[failing_branch_id]; - failing_branch.sync_state = SpeculativeState::Error; + if let Some(solution_branch_id) = final_branch_id { + let mut fake_vec = Vec::new(); + self.tree.end_sync(solution_branch_id); + self.consensus.end_sync(solution_branch_id, &mut fake_vec); + debug_assert!(fake_vec.is_empty()); + ctx.notify_sync_end(&[]); + self.last_finished_handled = None; + self.eval_error = None; // in case we came from the SyncError mode + self.mode = Mode::NonSync; + return ConnectorScheduling::Immediate; + } else { + // No final branch, because we're supposed to exit! + panic!("TEMPTEMP: NOOOOOOOOO 1"); + self.last_finished_handled = None; + self.mode = Mode::Error; + return ConnectorScheduling::Exit; + } } /// Runs the prompt repeatedly until some kind of execution-blocking