diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index d6ce463e4618165e4fb42b37230c366570ec0fbd..c08dcdce0381a2914611334344291af94e995588 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -56,6 +56,14 @@ impl ConnectorPublic { } } +#[derive(Debug, PartialEq, Eq, Copy)] +enum Mode { + NonSync, // running non-sync code + Sync, // running sync code (in potentially multiple branches) + SyncError, // encountered an unrecoverable error in sync mode + Error, // encountered an error in non-sync mode (or finished handling the sync mode error). +} + #[derive(Debug)] pub(crate) enum ConnectorScheduling { Immediate, // Run again, immediately @@ -66,6 +74,8 @@ pub(crate) enum ConnectorScheduling { } pub(crate) struct ConnectorPDL { + mode: Mode, + eval_error: Option, tree: ExecTree, consensus: Consensus, last_finished_handled: Option, @@ -121,30 +131,41 @@ impl<'a> RunContext for ConnectorRunContext<'a>{ impl Connector for ConnectorPDL { fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { self.handle_new_messages(comp_ctx); - if self.tree.is_in_sync() { - // Run in sync mode - let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); - - // Handle any new finished branches - let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync)); - while let Some(branch_id) = iter_id { - iter_id = self.tree.get_queue_next(branch_id); - self.last_finished_handled = Some(branch_id); - + match self.mode { + Mode::Sync => { + // Run in sync mode + let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); + + // Handle any new finished branches + let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync)); + while let Some(branch_id) = iter_id { + iter_id = self.tree.get_queue_next(branch_id); + self.last_finished_handled = Some(branch_id); + + if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) { + // Actually found a solution + self.enter_non_sync_mode(solution_branch_id, comp_ctx); + return ConnectorScheduling::Immediate; + } - if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) { - // Actually found a solution - self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx); - return ConnectorScheduling::Immediate; + self.last_finished_handled = Some(branch_id); } - self.last_finished_handled = Some(branch_id); - } - - return scheduling; - } else { - let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx); - return scheduling; + return scheduling; + }, + Mode::NonSync => { + let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx); + return scheduling; + }, + Mode::SyncError => { + todo!("write"); + return ConnectorScheduling::Exit; + }, + Mode::Error => { + // This shouldn't really be called. Because when we reach exit + // mode the scheduler should not run the component anymore + unreachable!("called component run() during error-mode"); + }, } } } @@ -152,6 +173,8 @@ impl Connector for ConnectorPDL { impl ConnectorPDL { pub fn new(initial: Prompt) -> Self { Self{ + mode: Mode::NonSync, + eval_error: None, tree: ExecTree::new(initial), consensus: Consensus::new(), last_finished_handled: None, @@ -203,7 +226,7 @@ impl ConnectorPDL { pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) { if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) { - self.collapse_sync_to_solution_branch(solution_branch_id, ctx); + self.enter_non_sync_mode(solution_branch_id, ctx); } } @@ -327,10 +350,18 @@ impl ConnectorPDL { // Branch is attempting to send data let port_id = PortIdLocal::new(port_id.0.u32_suffix); let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); - comp_ctx.submit_message(Message::Data(DataMessage { + if let Err(_) = comp_ctx.submit_message(Message::Data(DataMessage { sync_header, data_header, content: DataContent::Message(content), - })); + })) { + // We don't own the port + let pd = &sched_ctx.runtime.protocol_description; + let eval_error = branch.code_state.new_error_at_expr( + &pd.modules, &pd.heap, + String::from("attempted to 'put' on port that is no longer owned") + ); + self.mode = Mode::SyncError(eval_error); + } branch.prepared = PreparedStatement::PerformedPut; self.tree.push_into_queue(QueueKind::Runnable, branch_id); @@ -416,7 +447,12 @@ impl ConnectorPDL { } } - pub fn collapse_sync_to_solution_branch(&mut self, solution_branch_id: BranchId, ctx: &mut ComponentCtx) { + /// Helper that moves the component's state back into non-sync mode, using + /// the provided solution branch ID as the branch that should be comitted to + /// memory. + fn enter_non_sync_mode(&mut self, solution_branch_id: BranchId, ctx: &mut ComponentCtx) { + debug_assert!(self.mode == Mode::Sync || self.mode == Mode::SyncError); + let mut fake_vec = Vec::new(); self.tree.end_sync(solution_branch_id); self.consensus.end_sync(solution_branch_id, &mut fake_vec); @@ -428,6 +464,23 @@ impl ConnectorPDL { ctx.notify_sync_end(&[]); self.last_finished_handled = None; + self.eval_error = None; // in case we came from the SyncError mode + self.mode = Mode::NonSync; + } + + /// Helper that moves the component's state into sync-error mode, sending + /// the appropriate errors where needed. The branch that caused the fatal + /// error and the evaluation error should be provided. + fn enter_sync_error_mode(&mut self, failing_branch_id: BranchId, ctx: &mut ComponentCtx, eval_error: EvalError) { + debug_assert!(self.mode == Mode::Sync); + debug_assert!(self.eval_error.is_none()); + self.mode = Mode::SyncError; + self.eval_error = Some(eval_error); + + let failing_branch = &mut self.tree[failing_branch_id]; + failing_branch.sync_state = SpeculativeState::Error; + + } /// Runs the prompt repeatedly until some kind of execution-blocking