diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index f3a14e53bda533ed9cf9c8fcb57c487321bf6225..1ce574c330f24cb8b74fc25d7d5fdb35d5e6c00e 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -1,3 +1,5 @@ +use std::fmt::{Display as FmtDisplay, Result as FmtResult, Formatter}; + use crate::protocol::eval::{Prompt, EvalError, ValueGroup, PortId as EvalPortId}; use crate::protocol::*; use crate::runtime2::*; @@ -17,6 +19,29 @@ pub enum CompScheduling { Exit, } +/// Potential error emitted by a component +pub enum CompError { + /// Error originating from the code executor. Hence has an associated + /// source location. + Executor(EvalError), + /// Error originating from a component, but not necessarily associated with + /// a location in the source. + Component(String), // TODO: Maybe a different embedded value in the future? + /// Pure runtime error. Not necessarily originating from the component + /// itself. Should be treated as a very severe runtime-compromising error. + Runtime(RtError), +} + +impl FmtDisplay for CompError { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + match self { + CompError::Executor(v) => v.fmt(f), + CompError::Component(v) => v.fmt(f), + CompError::Runtime(v) => v.fmt(f), + } + } +} + /// Generic representation of a component (as viewed by a scheduler). pub(crate) trait Component { /// Called upon the creation of the component. Note that the scheduler @@ -39,7 +64,7 @@ pub(crate) trait Component { /// Called if the component's routine should be executed. The return value /// can be used to indicate when the routine should be run again. - fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result; + fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling; } /// Representation of the generic operating mode of a component. @@ -51,8 +76,8 @@ pub(crate) enum CompMode { BlockedGet, // blocked because we need to receive a message on a particular port BlockedPut, // component is blocked because the port is blocked BlockedSelect, // waiting on message to complete the select statement - StartExit, // temporary state: if encountered then we start the shutdown process - BusyExit, // temporary state: waiting for Acks for all the closed ports + StartExit, // temporary state: if encountered then we start the shutdown process. + BusyExit, // temporary state: waiting for Acks for all the closed ports, potentially waiting for sync round to finish Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0 } @@ -65,6 +90,43 @@ impl CompMode { NonSync | StartExit | BusyExit | Exit => false, } } + + pub(crate) fn is_busy_exiting(&self) -> bool { + use CompMode::*; + + match self { + NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => false, + StartExit | BusyExit => true, + Exit => false, + } + } +} + +#[derive(Debug)] +pub(crate) enum ExitReason { + Termination, // regular termination of component + ErrorInSync, + ErrorNonSync, +} + +impl ExitReason { + pub(crate) fn is_in_sync(&self) -> bool { + use ExitReason::*; + + match self { + Termination | ErrorNonSync => false, + ErrorInSync => true, + } + } + + pub(crate) fn is_error(&self) -> bool { + use ExitReason::*; + + match self { + Termination => false, + ErrorInSync | ErrorNonSync => true, + } + } } /// Component execution state: the execution mode along with some descriptive @@ -74,6 +136,7 @@ pub(crate) struct CompExecState { pub mode: CompMode, pub mode_port: PortId, // valid if blocked on a port (put/get) pub mode_value: ValueGroup, // valid if blocked on a put + pub exit_reason: ExitReason, // valid if in StartExit/BusyExit/Exit mode } impl CompExecState { @@ -82,9 +145,15 @@ impl CompExecState { mode: CompMode::NonSync, mode_port: PortId::new_invalid(), mode_value: ValueGroup::default(), + exit_reason: ExitReason::Termination, } } + pub(crate) fn set_as_start_exit(&mut self, reason: ExitReason) { + self.mode = CompMode::StartExit; + self.exit_reason = reason; + } + pub(crate) fn set_as_blocked_get(&mut self, port: PortId) { self.mode = CompMode::BlockedGet; self.mode_port = port; @@ -153,19 +222,26 @@ pub(crate) fn create_component( /// scheduling value must be used. #[must_use] pub(crate) fn default_send_data_message( - exec_state: &mut CompExecState, transmitting_port_id: PortId, value: ValueGroup, + exec_state: &mut CompExecState, transmitting_port_id: PortId, + port_instruction: PortInstruction, value: ValueGroup, sched_ctx: &SchedulerCtx, consensus: &mut Consensus, comp_ctx: &mut CompCtx -) -> Result { // @nocommit: Something better than Err(String) +) -> Result { debug_assert_eq!(exec_state.mode, CompMode::Sync); let port_handle = comp_ctx.get_port_handle(transmitting_port_id); + let port_info = comp_ctx.get_port_mut(port_handle); + port_info.last_instruction = port_instruction; + let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); if port_info.state == PortState::Closed { // Note: normally peer is eventually consistent, but if it has shut down // then we can be sure it is consistent (I think?) - return Err(format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0)) + return Err(( + port_info.last_instruction, + format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0) + )) } else if port_info.state.is_blocked() { // Port is blocked, so we cannot send exec_state.set_as_blocked_put(transmitting_port_id, value); @@ -245,13 +321,25 @@ pub(crate) fn default_handle_incoming_data_message( /// of full buffers (hence, will use the control layer to make sure the peer /// will become unblocked). pub(crate) fn default_handle_received_data_message( - targeted_port: PortId, slot: &mut Option, inbox_backup: &mut Vec, + targeted_port: PortId, port_instruction: PortInstruction, + slot: &mut Option, inbox_backup: &mut Vec, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer -) { +) -> Result<(), (PortInstruction, String)> { debug_assert!(slot.is_none()); // because we've just received from it - // Check if there are any more messages in the backup buffer + // Modify last-known location where port instruction was retrieved let port_handle = comp_ctx.get_port_handle(targeted_port); + let port_info = comp_ctx.get_port_mut(port_handle); + port_info.last_instruction = port_instruction; + + if port_info.state == PortState::Closed { + return Err(( + port_info.last_instruction, + format!("Cannot 'get' because the channel is closed")) + ); + } + + // Check if there are any more messages in the backup buffer let port_info = comp_ctx.get_port(port_handle); for message_index in 0..inbox_backup.len() { let message = &inbox_backup[message_index]; @@ -261,7 +349,7 @@ pub(crate) fn default_handle_received_data_message( debug_assert!(port_info.state.is_blocked()); // since we're removing another message from the backup *slot = Some(message); - return; + return Ok(()); } } @@ -273,6 +361,8 @@ pub(crate) fn default_handle_received_data_message( let peer_info = comp_ctx.get_peer(peer_handle); peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); } + + return Ok(()); } /// Handles control messages in the default way. Note that this function may @@ -283,7 +373,7 @@ pub(crate) fn default_handle_received_data_message( pub(crate) fn default_handle_control_message( exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus, message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx -) -> Result<(), String> { // @nocommit, use something else than Err(String) +) -> Result<(), (PortInstruction, String)> { match message.content { ControlMessageContent::Ack => { default_handle_ack(control, message.id, sched_ctx, comp_ctx); @@ -302,15 +392,17 @@ pub(crate) fn default_handle_control_message( ControlMessageContent::ClosePort(content) => { // Request to close the port. We immediately comply and remove // the component handle as well - let port_handle = comp_ctx.get_port_handle(content.port_id); - let port_info = comp_ctx.get_port_mut(port_handle); - let peer_comp_id = port_info.peer_comp_id; - let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); + let port_handle = comp_ctx.get_port_handle(content.port_to_close); // We're closing the port, so we will always update the peer of the // port (in case of error messages) + let port_info = comp_ctx.get_port_mut(port_handle); port_info.peer_comp_id = message.sender_comp_id; + let port_info = comp_ctx.get_port(port_handle); + let peer_comp_id = port_info.peer_comp_id; + let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); + // One exception to sending an `Ack` is if we just closed the // port ourselves, meaning that the `ClosePort` messages got // sent to one another. @@ -334,19 +426,10 @@ pub(crate) fn default_handle_control_message( // closing the port, that we don't consider the sync round to // have failed by mistake. if content.closed_in_sync_round && exec_state.mode.is_in_sync_block() && port_was_used { - let error_message = match last_instruction { - PortInstruction::None => unreachable!(), // port was used - PortInstruction::NoSource => format!( - "Peer component (id:{}) shut down, so operation on port cannot have succeeded", - message.sender_comp_id.0 - ), - PortInstruction::SourceLocation(source_location) => format!( - "Peer component (id:{}) shut down, so this operation cannot have succeeded", - message.sender_comp_id.0 - ), - }; - - return Err(error_message); + return Err(( + last_instruction, + format!("Peer component (id:{}) shut down, so previous communication cannot have succeeded", peer_comp_id.0) + )); } } }, @@ -398,13 +481,21 @@ pub(crate) fn default_handle_control_message( #[must_use] pub(crate) fn default_handle_start_exit( exec_state: &mut CompExecState, control: &mut ControlLayer, - sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx + sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus ) -> CompScheduling { debug_assert_eq!(exec_state.mode, CompMode::StartExit); sched_ctx.log("Component starting exit"); exec_state.mode = CompMode::BusyExit; + let exit_inside_sync = exec_state.exit_reason.is_in_sync(); + + // If exiting while inside sync mode, report to the leader of the current + // round that we've failed. + if exit_inside_sync { + let decision = consensus.notify_sync_end_failure(sched_ctx, comp_ctx); + default_handle_sync_decision(sched_ctx, exec_state, decision, consensus); + } - // Iterating by index to work around borrowing rules + // Iterating over ports by index to work around borrowing rules for port_index in 0..comp_ctx.num_ports() { let port = comp_ctx.get_port_by_index_mut(port_index); if port.state == PortState::Closed { @@ -418,7 +509,7 @@ pub(crate) fn default_handle_start_exit( // Notify peer of closing let port_handle = comp_ctx.get_port_handle(port_id); - let (peer, message) = control.initiate_port_closing(port_handle, comp_ctx); + let (peer, message) = control.initiate_port_closing(port_handle, exit_inside_sync, comp_ctx); let peer_info = comp_ctx.get_peer(peer); peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); } @@ -447,28 +538,68 @@ pub(crate) fn default_handle_busy_exit( /// Handles a potential synchronous round decision. If there was a decision then /// the `Some(success)` value indicates whether the round succeeded or not. /// Might also end up changing the `ExecState`. +/// +/// Might be called in two cases: +/// 1. The component is in regular execution mode, at the end of a sync round, +/// and is waiting for a solution to the round. +/// 2. The component has encountered an error during a sync round and is +/// exiting, hence is waiting for a "Failure" message from the leader. pub(crate) fn default_handle_sync_decision( - exec_state: &mut CompExecState, decision: SyncRoundDecision, - consensus: &mut Consensus + sched_ctx: &SchedulerCtx, exec_state: &mut CompExecState, + decision: SyncRoundDecision, consensus: &mut Consensus ) -> Option { - debug_assert_eq!(exec_state.mode, CompMode::SyncEnd); let success = match decision { SyncRoundDecision::None => return None, SyncRoundDecision::Solution => true, SyncRoundDecision::Failure => false, }; - debug_assert_eq!(exec_state.mode, CompMode::SyncEnd); + debug_assert!( + exec_state.mode == CompMode::SyncEnd || ( + exec_state.mode.is_busy_exiting() && exec_state.exit_reason.is_error() + ) || ( + exec_state.mode.is_in_sync_block() && decision == SyncRoundDecision::Failure + ) + ); + + sched_ctx.log(&format!("Handling decision {:?} (in mode: {:?})", decision, exec_state.mode)); + consensus.notify_sync_decision(decision); if success { + // We cannot get a success message if the component has encountered an + // error. + debug_assert_eq!(exec_state.mode, CompMode::SyncEnd); exec_state.mode = CompMode::NonSync; - consensus.notify_sync_decision(decision); return Some(true); } else { - exec_state.mode = CompMode::StartExit; + // We may get failure both in all possible cases. But we should only + // modify the execution state if we're not already in exit mode + if !exec_state.mode.is_busy_exiting() { + sched_ctx.error("failed synchronous round, initiating exit"); + exec_state.set_as_start_exit(ExitReason::ErrorNonSync); + } return Some(false); } } +/// Performs the default action of printing the provided error, and then putting +/// the component in the state where it will shut down. Only to be used for +/// builtin components: their error message construction is simpler (and more +/// common) as they don't have any source code. +pub(crate) fn default_handle_error_for_builtin( + exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, + location_and_message: (PortInstruction, String) +) { + let (_location, message) = location_and_message; + sched_ctx.error(&message); + + let exit_reason = if exec_state.mode.is_in_sync_block() { + ExitReason::ErrorInSync + } else { + ExitReason::ErrorNonSync + }; + + exec_state.set_as_start_exit(exit_reason); +} #[inline] pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling {