From a99ae23c30ec2c8828b1e061f63e082daf7a1fad 2021-11-18 11:36:29 From: mh Date: 2021-11-18 11:36:29 Subject: [PATCH] WIP on consensus error handling --- diff --git a/src/protocol/eval/executor.rs b/src/protocol/eval/executor.rs index f1d6a665dc146700bdb0f38d1bd752a35c97946f..670993cbc27ba8d66c0c91e19b18288c5e7742cf 100644 --- a/src/protocol/eval/executor.rs +++ b/src/protocol/eval/executor.rs @@ -1046,4 +1046,22 @@ impl Prompt { return_value } + + /// Constructs an error at the current expression that lives at the top of + /// the expression stack of the current frame. Panics if that stack holds + /// no expression to evaluate, or if there is no frame at all. + pub(crate) fn new_error_at_expr(&self, modules: &[Module], heap: &Heap, error_message: String) -> EvalError { + let last_frame = self.frames.last().unwrap(); + for instruction in last_frame.expr_stack.iter().rev() { + if let ExprInstruction::EvalExpr(expression_id) = instruction { + return EvalError::new_error_at_expr( + self, modules, heap, *expression_id, error_message + ); + } + } + + // If here then expression stack was empty (cannot have just rotate + // instructions) + panic!("attempted to construct evaluation error without any expressions to evaluate in frame"); + } } \ No newline at end of file diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index d6ce463e4618165e4fb42b37230c366570ec0fbd..c08dcdce0381a2914611334344291af94e995588 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -56,6 +56,14 @@ impl ConnectorPublic { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum Mode { + NonSync, // running non-sync code + Sync, // running sync code (in potentially multiple branches) + SyncError, // encountered an unrecoverable error in sync mode + Error, // encountered an error in non-sync mode (or finished handling the sync mode error).
+} + #[derive(Debug)] pub(crate) enum ConnectorScheduling { Immediate, // Run again, immediately @@ -66,6 +74,8 @@ pub(crate) enum ConnectorScheduling { } pub(crate) struct ConnectorPDL { + mode: Mode, + eval_error: Option<EvalError>, tree: ExecTree, consensus: Consensus, last_finished_handled: Option<BranchId>, @@ -121,30 +131,41 @@ impl<'a> RunContext for ConnectorRunContext<'a>{ impl Connector for ConnectorPDL { fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { self.handle_new_messages(comp_ctx); - if self.tree.is_in_sync() { - // Run in sync mode - let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); - - // Handle any new finished branches - let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync)); - while let Some(branch_id) = iter_id { - iter_id = self.tree.get_queue_next(branch_id); - self.last_finished_handled = Some(branch_id); - + match self.mode { + Mode::Sync => { + // Run in sync mode + let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); + + // Handle any new finished branches + let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync)); + while let Some(branch_id) = iter_id { + iter_id = self.tree.get_queue_next(branch_id); + self.last_finished_handled = Some(branch_id); + + if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) { + // Actually found a solution + self.enter_non_sync_mode(solution_branch_id, comp_ctx); + return ConnectorScheduling::Immediate; + } - if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) { - // Actually found a solution - self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx); - return ConnectorScheduling::Immediate; + self.last_finished_handled = Some(branch_id); } - self.last_finished_handled = Some(branch_id); - } - - return scheduling; - } else { - let scheduling = 
self.run_in_deterministic_mode(sched_ctx, comp_ctx); - return scheduling; + return scheduling; + }, + Mode::NonSync => { + let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx); + if self.tree.is_in_sync() { self.mode = Mode::Sync; } // deterministic code entered a sync block + return scheduling; + }, + Mode::SyncError => { + todo!("write"); + }, + Mode::Error => { + // This shouldn't really be called: once we reach this error (exit) + // mode the scheduler should not run the component anymore + unreachable!("called component run() during error-mode"); + }, } } } @@ -152,6 +173,8 @@ impl Connector for ConnectorPDL { impl ConnectorPDL { pub fn new(initial: Prompt) -> Self { Self{ + mode: Mode::NonSync, + eval_error: None, tree: ExecTree::new(initial), consensus: Consensus::new(), last_finished_handled: None, @@ -203,7 +226,7 @@ impl ConnectorPDL { pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) { if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) { - self.collapse_sync_to_solution_branch(solution_branch_id, ctx); + self.enter_non_sync_mode(solution_branch_id, ctx); } } @@ -327,10 +350,18 @@ impl ConnectorPDL { // Branch is attempting to send data let port_id = PortIdLocal::new(port_id.0.u32_suffix); let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); - comp_ctx.submit_message(Message::Data(DataMessage { + if comp_ctx.submit_message(Message::Data(DataMessage { sync_header, data_header, content: DataContent::Message(content), - })); + })).is_err() { + // We don't own the port (anymore) + let pd = &sched_ctx.runtime.protocol_description; + self.eval_error = Some(branch.code_state.new_error_at_expr( + &pd.modules, &pd.heap, + String::from("attempted to 'put' on port that is no longer owned") + )); + self.mode = Mode::SyncError; + } branch.prepared = PreparedStatement::PerformedPut; self.tree.push_into_queue(QueueKind::Runnable, branch_id); @@ -416,7 +447,12 @@ impl ConnectorPDL { } } - pub 
fn collapse_sync_to_solution_branch(&mut self, solution_branch_id: BranchId, ctx: &mut ComponentCtx) { + /// Helper that moves the component's state back into non-sync mode, using + /// the provided solution branch ID as the branch that should be committed to + /// memory. + fn enter_non_sync_mode(&mut self, solution_branch_id: BranchId, ctx: &mut ComponentCtx) { + debug_assert!(matches!(self.mode, Mode::Sync | Mode::SyncError)); + let mut fake_vec = Vec::new(); self.tree.end_sync(solution_branch_id); self.consensus.end_sync(solution_branch_id, &mut fake_vec); @@ -428,6 +464,23 @@ impl ConnectorPDL { ctx.notify_sync_end(&[]); self.last_finished_handled = None; + self.eval_error = None; // in case we came from the SyncError mode + self.mode = Mode::NonSync; + } + + /// Helper that moves the component's state into sync-error mode, storing + /// the evaluation error and marking the failing branch as erroneous. The + /// branch that caused the fatal error and the evaluation error must be provided. + fn enter_sync_error_mode(&mut self, failing_branch_id: BranchId, ctx: &mut ComponentCtx, eval_error: EvalError) { + debug_assert!(self.mode == Mode::Sync); + debug_assert!(self.eval_error.is_none()); + self.mode = Mode::SyncError; + self.eval_error = Some(eval_error); + + let failing_branch = &mut self.tree[failing_branch_id]; + failing_branch.sync_state = SpeculativeState::Error; + // TODO: send the appropriate errors to peers where needed (not implemented yet) + } /// Runs the prompt repeatedly until some kind of execution-blocking diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index daea21188bf15d535f184b8faa1c08a3af865ae8..951045671b8adc3c3eb5901f27538ad6d3c881a7 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -144,6 +144,25 @@ impl Consensus { self.branch_markers.push(new_branch_id); } + /// Notifies the consensus algorithm that a particular branch has + /// encountered an unrecoverable error. 
If the return value is `false`, then + /// the caller can enter a "normal" exit mode instead of the special "sync" + /// exit mode.
+ pub fn notify_of_fatal_branch(&mut self, failed_branch_id: BranchId) -> bool { + debug_assert!(self.is_in_sync()); + + // Check for trivial case, where branch has not yet communicated within + // the consensus algorithm + let branch = &self.branch_annotations[failed_branch_id.index as usize]; + if branch.port_mapping.iter().all(|v| v.registered_id.is_none()) { + return false; + } + + // Branch has communicated, so the caller must use the special sync exit mode. + // TODO: unfinished; only the return value is implemented for this case. + return true; + } + /// Notifies the consensus algorithm that a branch has reached the end of /// the sync block. A final check for consistency will be performed that the /// caller has to handle. Note that diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index 91db3a3711b6eecb1bfb71020328bbc54fad2f25..26983cc5a008de576fe4860a3c1e1b0fbf189777 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -127,6 +127,19 @@ pub(crate) enum Message { Control(ControlMessage), } +impl Message { + /// If the message is sent through a particular channel, then this function + /// returns the port through which the message was sent. + pub(crate) fn source_port(&self) -> Option<PortIdLocal> { + // Currently only data messages have a source port + if let Message::Data(message) = self { + return Some(message.data_header.sending_port); + } else { + return None; + } + } +} + /// The public inbox of a connector. The thread running the connector that owns /// this inbox may retrieved from it. Non-owning threads may only put new /// messages inside of it.
diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 22299972b63d32556f9f855b82fd5f9e73066e32..e822bdff686170be618cdaab571777e698bd80b6 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,6 +1,7 @@ use std::collections::VecDeque; use std::sync::Arc; use std::sync::atomic::Ordering; +use crate::protocol::eval::EvalError; use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey}; use super::port::{Port, PortState, PortIdLocal}; @@ -393,6 +394,7 @@ pub(crate) struct ComponentCtx { changed_in_sync: bool, outbox: VecDeque<Message>, state_changes: VecDeque<ComponentStateChange>, + // Workspaces that may be used by components to (generally) prevent // allocations. Be a good scout and leave it empty after you've used it. // TODO: Move to scheduler ctx, this is the wrong place @@ -431,6 +433,14 @@ impl ComponentCtx { self.state_changes.push_back(ComponentStateChange::CreatedPort(port)) } + /// Notify the runtime of an error. Note that this will not perform any + /// special action beyond printing the error. The component is responsible + /// for waiting until it is appropriate to shut down (i.e. being outside + /// of a sync region) and returning the `Exit` scheduling code. + pub(crate) fn push_error(&mut self, error: EvalError) { + let _ = error; // TODO: actually print the error; it is currently discarded without being reported + } + #[inline] pub(crate) fn get_ports(&self) -> &[Port] { return self.ports.as_slice(); @@ -462,9 +472,17 @@ impl ComponentCtx { /// Submit a message for the scheduler to send to the appropriate receiver. /// May only be called inside of a sync block. 
- pub(crate) fn submit_message(&mut self, contents: Message) { + pub(crate) fn submit_message(&mut self, contents: Message) -> Result<(), ()> { debug_assert!(self.is_in_sync); + if let Some(port_id) = contents.source_port() { + if self.get_port_by_id(port_id).is_none() { + // We don't own the port + return Err(()); + } + } + self.outbox.push_back(contents); + return Ok(()); } /// Notify that component just finished a sync block. Like