diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index a0cc811fcc5e0ee8cfbf52a3be015719545f2dbd..299c64f158dd53f70f55f17456bcb4a1d7570d8c 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -1,3 +1,5 @@ +use std::fmt::{Display as FmtDisplay, Result as FmtResult, Formatter}; + use crate::protocol::eval::{Prompt, EvalError, ValueGroup, PortId as EvalPortId}; use crate::protocol::*; use crate::runtime2::*; @@ -17,6 +19,29 @@ pub enum CompScheduling { Exit, } +/// Potential error emitted by a component +pub enum CompError { + /// Error originating from the code executor. Hence has an associated + /// source location. + Executor(EvalError), + /// Error originating from a component, but not necessarily associated with + /// a location in the source. + Component(String), // TODO: Maybe a different embedded value in the future? + /// Pure runtime error. Not necessarily originating from the component + /// itself. Should be treated as a very severe runtime-compromising error. + Runtime(RtError), +} + +impl FmtDisplay for CompError { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + match self { + CompError::Executor(v) => v.fmt(f), + CompError::Component(v) => v.fmt(f), + CompError::Runtime(v) => v.fmt(f), + } + } +} + /// Generic representation of a component (as viewed by a scheduler). pub(crate) trait Component { /// Called upon the creation of the component. Note that the scheduler @@ -39,10 +64,12 @@ pub(crate) trait Component { /// Called if the component's routine should be executed. The return value /// can be used to indicate when the routine should be run again. - fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result; + fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling; } -/// Representation of the generic operating mode of a component. +/// Representation of the generic operating mode of a component. Although not +/// every state may be used by every kind of (builtin) component, this allows +/// writing standard handlers for particular events in a component's lifetime. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub(crate) enum CompMode { NonSync, // not in sync mode @@ -51,8 +78,8 @@ pub(crate) enum CompMode { BlockedGet, // blocked because we need to receive a message on a particular port BlockedPut, // component is blocked because the port is blocked BlockedSelect, // waiting on message to complete the select statement - StartExit, // temporary state: if encountered then we start the shutdown process - BusyExit, // temporary state: waiting for Acks for all the closed ports + StartExit, // temporary state: if encountered then we start the shutdown process. + BusyExit, // temporary state: waiting for Acks for all the closed ports, potentially waiting for sync round to finish Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0 } @@ -65,6 +92,43 @@ impl CompMode { NonSync | StartExit | BusyExit | Exit => false, } } + + pub(crate) fn is_busy_exiting(&self) -> bool { + use CompMode::*; + + match self { + NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => false, + StartExit | BusyExit => true, + Exit => false, + } + } +} + +#[derive(Debug)] +pub(crate) enum ExitReason { + Termination, // regular termination of component + ErrorInSync, + ErrorNonSync, +} + +impl ExitReason { + pub(crate) fn is_in_sync(&self) -> bool { + use ExitReason::*; + + match self { + Termination | ErrorNonSync => false, + ErrorInSync => true, + } + } + + pub(crate) fn is_error(&self) -> bool { + use ExitReason::*; + + match self { + Termination => false, + ErrorInSync | ErrorNonSync => true, + } + } } /// Component execution state: the execution mode along with some descriptive @@ -74,6 +138,7 @@ pub(crate) struct CompExecState { pub mode: CompMode, pub mode_port: PortId, // valid if blocked on a port (put/get) pub mode_value: ValueGroup, // valid if blocked on a put + pub exit_reason: ExitReason, // valid if in StartExit/BusyExit/Exit mode } impl CompExecState { @@ -82,9 +147,15 @@ impl CompExecState { mode: CompMode::NonSync, mode_port: PortId::new_invalid(), mode_value: ValueGroup::default(), + exit_reason: ExitReason::Termination, } } + pub(crate) fn set_as_start_exit(&mut self, reason: ExitReason) { + self.mode = CompMode::StartExit; + self.exit_reason = reason; + } + pub(crate) fn set_as_blocked_get(&mut self, port: PortId) { self.mode = CompMode::BlockedGet; self.mode_port = port; @@ -110,6 +181,12 @@ impl CompExecState { } } +// TODO: Replace when implementing port sending. Should probably be incorporated +// into CompCtx (and rename CompCtx into CompComms) +pub(crate) type InboxMain = Vec>; +pub(crate) type InboxMainRef = [Option]; +pub(crate) type InboxBackup = Vec; + /// Creates a new component based on its definition. Meaning that if it is a /// user-defined component then we set up the PDL code state. Otherwise we /// construct a custom component. This does NOT take care of port and message @@ -153,28 +230,39 @@ pub(crate) fn create_component( /// scheduling value must be used. #[must_use] pub(crate) fn default_send_data_message( - exec_state: &mut CompExecState, transmitting_port_id: PortId, value: ValueGroup, + exec_state: &mut CompExecState, transmitting_port_id: PortId, + port_instruction: PortInstruction, value: ValueGroup, sched_ctx: &SchedulerCtx, consensus: &mut Consensus, comp_ctx: &mut CompCtx -) -> CompScheduling { +) -> Result { debug_assert_eq!(exec_state.mode, CompMode::Sync); - // TODO: Handle closed ports let port_handle = comp_ctx.get_port_handle(transmitting_port_id); + let port_info = comp_ctx.get_port_mut(port_handle); + port_info.last_instruction = port_instruction; + let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); - if port_info.state.is_blocked() { + + if port_info.state.is_closed() { + // Note: normally peer is eventually consistent, but if it has shut down + // then we can be sure it is consistent (I think?) + return Err(( + port_info.last_instruction, + format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0) + )) + } else if port_info.state.is_blocked() { // Port is blocked, so we cannot send exec_state.set_as_blocked_put(transmitting_port_id, value); - return CompScheduling::Sleep; + return Ok(CompScheduling::Sleep); } else { // Port is not blocked, so send to the peer let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); let peer_info = comp_ctx.get_peer(peer_handle); let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value); - peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(annotated_message), true); + peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true); - return CompScheduling::Immediate; + return Ok(CompScheduling::Immediate); } } @@ -191,10 +279,14 @@ pub(crate) enum IncomingData { // nicest if the sending component can figure out it cannot send any more data. #[must_use] pub(crate) fn default_handle_incoming_data_message( - exec_state: &mut CompExecState, port_value_slot: &mut Option, + exec_state: &mut CompExecState, inbox_main: &mut InboxMain, comp_ctx: &mut CompCtx, incoming_message: DataMessage, sched_ctx: &SchedulerCtx, control: &mut ControlLayer ) -> IncomingData { + let port_handle = comp_ctx.get_port_handle(incoming_message.data_header.target_port); + let port_index = comp_ctx.get_port_index(port_handle); + comp_ctx.get_port_mut(port_handle).received_message_for_sync = true; + let port_value_slot = &mut inbox_main[port_index]; let target_port_id = incoming_message.data_header.target_port; if port_value_slot.is_none() { @@ -205,7 +297,6 @@ pub(crate) fn default_handle_incoming_data_message( dbg_code!({ // Our port cannot have been blocked itself, because we're able to // directly insert the message into its slot. - let port_handle = comp_ctx.get_port_handle(target_port_id); assert!(!comp_ctx.get_port(port_handle).state.is_blocked()); }); @@ -220,55 +311,123 @@ pub(crate) fn default_handle_incoming_data_message( } else { // Slot is already full, so if the port was previously opened, it will // now become closed - let port_handle = comp_ctx.get_port_handle(target_port_id); let port_info = comp_ctx.get_port_mut(port_handle); - debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked()); // i.e. not closed, but will go off if more states are added in the future + if port_info.state.is_open() { + port_info.state.set(PortStateFlag::BlockedDueToFullBuffers); - if port_info.state == PortState::Open { - comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers); let (peer_handle, message) = control.initiate_port_blocking(comp_ctx, port_handle); let peer = comp_ctx.get_peer(peer_handle); - peer.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); + peer.handle.send_message_logged(sched_ctx, Message::Control(message), true); } return IncomingData::SlotFull(incoming_message) } } +pub(crate) enum GetResult { + Received(DataMessage), + NoMessage, + Error((PortInstruction, String)), +} + +/// Default attempt at trying to receive from a port (i.e. through a `get`, or +/// the equivalent operation for a builtin component). `target_port` is the port +/// we're trying to receive from, and the `target_port_instruction` is the +/// instruction we're attempting on this port. +pub(crate) fn default_attempt_get( + exec_state: &mut CompExecState, target_port: PortId, target_port_instruction: PortInstruction, + inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup, sched_ctx: &SchedulerCtx, + comp_ctx: &mut CompCtx, control: &mut ControlLayer, consensus: &mut Consensus +) -> GetResult { + let port_handle = comp_ctx.get_port_handle(target_port); + let port_index = comp_ctx.get_port_index(port_handle); + + let port_info = comp_ctx.get_port_mut(port_handle); + port_info.last_instruction = target_port_instruction; + if port_info.state.is_closed() { + let peer_id = port_info.peer_comp_id; + return GetResult::Error(( + target_port_instruction, + format!("Cannot get from this port, as the peer component (id:{}) closed the port", peer_id.0) + )); + } + + if let Some(message) = &inbox_main[port_index] { + if consensus.try_receive_data_message(sched_ctx, comp_ctx, message) { + // We're allowed to receive this message + let message = inbox_main[port_index].take().unwrap(); + debug_assert_eq!(target_port, message.data_header.target_port); + + // Note: we can still run into an unrecoverable error when actually + // receiving this message + match default_handle_received_data_message( + target_port, target_port_instruction, inbox_main, inbox_backup, + comp_ctx, sched_ctx, control, + ) { + Ok(()) => return GetResult::Received(message), + Err(location_and_message) => return GetResult::Error(location_and_message) + } + } else { + // We're not allowed to receive this message. This means that the + // receiver is attempting to receive something out of order with + // respect to the sender. + return GetResult::Error((target_port_instruction, String::from( + "Cannot get from this port, as this causes a deadlock. This happens if you `get` in a different order as another component `put`s" + ))); + } + } else { + // We don't have a message waiting for us and the port is not blocked. + // So enter the BlockedGet state + exec_state.set_as_blocked_get(target_port); + return GetResult::NoMessage; + } +} + /// Default handling that has been received through a `get`. Will check if any /// more messages are waiting, and if the corresponding port was blocked because /// of full buffers (hence, will use the control layer to make sure the peer /// will become unblocked). pub(crate) fn default_handle_received_data_message( - targeted_port: PortId, slot: &mut Option, inbox_backup: &mut Vec, + targeted_port: PortId, port_instruction: PortInstruction, + inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer -) { +) -> Result<(), (PortInstruction, String)> { + let port_handle = comp_ctx.get_port_handle(targeted_port); + let port_index = comp_ctx.get_port_index(port_handle); + let slot = &mut inbox_main[port_index]; debug_assert!(slot.is_none()); // because we've just received from it - // Check if there are any more messages in the backup buffer - let port_handle = comp_ctx.get_port_handle(targeted_port); + // Modify last-known location where port instruction was retrieved let port_info = comp_ctx.get_port(port_handle); + debug_assert_ne!(port_info.last_instruction, PortInstruction::None); // set by caller + debug_assert!(port_info.state.is_open()); // checked by caller + + // Check if there are any more messages in the backup buffer for message_index in 0..inbox_backup.len() { let message = &inbox_backup[message_index]; if message.data_header.target_port == targeted_port { // One more message, place it in the slot let message = inbox_backup.remove(message_index); - debug_assert!(port_info.state.is_blocked()); // since we're removing another message from the backup + debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we're removing another message from the backup *slot = Some(message); - return; + return Ok(()); } } // Did not have any more messages, so if we were blocked, then we need to // unblock the port now (and inform the peer of this unblocking) - if port_info.state == PortState::BlockedDueToFullBuffers { - comp_ctx.set_port_state(port_handle, PortState::Open); + if port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers) { + let port_info = comp_ctx.get_port_mut(port_handle); + port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers); + let (peer_handle, message) = control.cancel_port_blocking(comp_ctx, port_handle); let peer_info = comp_ctx.get_peer(peer_handle); - peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); + peer_info.handle.send_message_logged(sched_ctx, Message::Control(message), true); } + + return Ok(()); } /// Handles control messages in the default way. Note that this function may @@ -279,78 +438,160 @@ pub(crate) fn default_handle_received_data_message( pub(crate) fn default_handle_control_message( exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus, message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx -) { +) -> Result<(), (PortInstruction, String)> { match message.content { ControlMessageContent::Ack => { default_handle_ack(control, message.id, sched_ctx, comp_ctx); }, - ControlMessageContent::BlockPort(port_id) => { + ControlMessageContent::BlockPort => { // One of our messages was accepted, but the port should be // blocked. - let port_handle = comp_ctx.get_port_handle(port_id); - let port_info = comp_ctx.get_port(port_handle); + let port_to_block = message.target_port_id.unwrap(); + let port_handle = comp_ctx.get_port_handle(port_to_block); + let port_info = comp_ctx.get_port_mut(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); - if port_info.state == PortState::Open { - // only when open: we don't do this when closed, and we we don't do this if we're blocked due to peer changes - comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers); - } + port_info.state.set(PortStateFlag::BlockedDueToFullBuffers); }, - ControlMessageContent::ClosePort(port_id) => { + ControlMessageContent::ClosePort(content) => { // Request to close the port. We immediately comply and remove // the component handle as well - let port_handle = comp_ctx.get_port_handle(port_id); - let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id; + let port_to_close = message.target_port_id.unwrap(); + let port_handle = comp_ctx.get_port_handle(port_to_close); + + // We're closing the port, so we will always update the peer of the + // port (in case of error messages) + let port_info = comp_ctx.get_port_mut(port_handle); + port_info.peer_comp_id = message.sender_comp_id; + port_info.close_at_sync_end = true; // might be redundant (we might set it closed now) + + let peer_comp_id = port_info.peer_comp_id; let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); // One exception to sending an `Ack` is if we just closed the // port ourselves, meaning that the `ClosePort` messages got // sent to one another. if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) { + // The two components (sender and this component) are closing + // the channel at the same time. So we don't care about the + // content of the `ClosePort` message. default_handle_ack(control, control_id, sched_ctx, comp_ctx); } else { + // Respond to the message + let port_info = comp_ctx.get_port(port_handle); + let last_instruction = port_info.last_instruction; + let port_has_had_message = port_info.received_message_for_sync; default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx); - comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed - comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed + comp_ctx.change_port_peer(sched_ctx, port_handle, None); + + // Handle any possible error conditions (which boil down to: the + // port has been used, but the peer has died). If not in sync + // mode then we close the port immediately. + + // Note that `port_was_used` does not mean that any messages + // were actually received. It might also mean that e.g. the + // component attempted a `get`, but there were no messages, so + // now it is in the `BlockedGet` state. + let port_was_used = last_instruction != PortInstruction::None; + + if exec_state.mode.is_in_sync_block() { + let closed_during_sync_round = content.closed_in_sync_round && port_was_used; + let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message; + + if closed_during_sync_round || closed_before_sync_round { + return Err(( + last_instruction, + format!("Peer component (id:{}) shut down, so communication cannot (have) succeed(ed)", peer_comp_id.0) + )); + } + } else { + let port_info = comp_ctx.get_port_mut(port_handle); + port_info.state.set(PortStateFlag::Closed); + } } }, - ControlMessageContent::UnblockPort(port_id) => { + ControlMessageContent::UnblockPort => { // We were previously blocked (or already closed) - let port_handle = comp_ctx.get_port_handle(port_id); - let port_info = comp_ctx.get_port(port_handle); + let port_to_unblock = message.target_port_id.unwrap(); + let port_handle = comp_ctx.get_port_handle(port_to_unblock); + let port_info = comp_ctx.get_port_mut(port_handle); + debug_assert_eq!(port_info.kind, PortKind::Putter); - if port_info.state == PortState::BlockedDueToFullBuffers { - default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx); - } + debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers)); + + port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers); + default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx); }, - ControlMessageContent::PortPeerChangedBlock(port_id) => { + ControlMessageContent::PortPeerChangedBlock => { // The peer of our port has just changed. So we are asked to // temporarily block the port (while our original recipient is // potentially rerouting some of the in-flight messages) and // Ack. Then we wait for the `unblock` call. - debug_assert_eq!(message.target_port_id, Some(port_id)); - let port_handle = comp_ctx.get_port_handle(port_id); - comp_ctx.set_port_state(port_handle, PortState::BlockedDueToPeerChange); + let port_to_change = message.target_port_id.unwrap(); + let port_handle = comp_ctx.get_port_handle(port_to_change); - let port_info = comp_ctx.get_port(port_handle); - let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + let port_info = comp_ctx.get_port_mut(port_handle); + let peer_comp_id = port_info.peer_comp_id; + port_info.state.set(PortStateFlag::BlockedDueToPeerChange); + let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx); }, ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => { - let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap()); + let port_to_change = message.target_port_id.unwrap(); + let port_handle = comp_ctx.get_port_handle(port_to_change); let port_info = comp_ctx.get_port(port_handle); - debug_assert!(port_info.state == PortState::BlockedDueToPeerChange); + debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToPeerChange)); let old_peer_id = port_info.peer_comp_id; - comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false); - let port_info = comp_ctx.get_port_mut(port_handle); - port_info.peer_comp_id = new_comp_id; port_info.peer_port_id = new_port_id; - comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None); - default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx); + + port_info.state.clear(PortStateFlag::BlockedDueToPeerChange); + comp_ctx.change_port_peer(sched_ctx, port_handle, Some(new_comp_id)); + default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx); } } + + return Ok(()); +} + +/// Handles a component entering the synchronous block. Will ensure that the +/// `Consensus` and the `ComponentCtx` are initialized properly. +pub(crate) fn default_handle_sync_start( + exec_state: &mut CompExecState, inbox_main: &mut InboxMainRef, + sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus +) { + sched_ctx.info("Component starting sync mode"); + + // If any messages are present for this sync round, set the appropriate flag + // and notify the consensus handler of the present messages + consensus.notify_sync_start(comp_ctx); + for (port_index, message) in inbox_main.iter().enumerate() { + if let Some(message) = message { + consensus.handle_incoming_data_message(comp_ctx, message); + let port_info = comp_ctx.get_port_by_index_mut(port_index); + port_info.received_message_for_sync = true; + } + } + + // Modify execution state + debug_assert_eq!(exec_state.mode, CompMode::NonSync); + exec_state.mode = CompMode::Sync; +} + +/// Handles a component that has reached the end of the sync block. This does +/// not necessarily mean that the component will go into the `NonSync` mode, as +/// it might have to wait for the leader to finish the round for everyone (see +/// `default_handle_sync_decision`) +pub(crate) fn default_handle_sync_end( + exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, + consensus: &mut Consensus +) { + sched_ctx.info("Component ending sync mode (but possibly waiting for a solution)"); + debug_assert_eq!(exec_state.mode, CompMode::Sync); + let decision = consensus.notify_sync_end_success(sched_ctx, comp_ctx); + exec_state.mode = CompMode::SyncEnd; + default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus); } /// Handles a component initiating the exiting procedure, and closing all of its @@ -359,29 +600,37 @@ pub(crate) fn default_handle_control_message( #[must_use] pub(crate) fn default_handle_start_exit( exec_state: &mut CompExecState, control: &mut ControlLayer, - sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx + sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus ) -> CompScheduling { debug_assert_eq!(exec_state.mode, CompMode::StartExit); - sched_ctx.log("Component starting exit"); + sched_ctx.info(&format!("Component starting exit (reason: {:?})", exec_state.exit_reason)); exec_state.mode = CompMode::BusyExit; + let exit_inside_sync = exec_state.exit_reason.is_in_sync(); + + // If exiting while inside sync mode, report to the leader of the current + // round that we've failed. + if exit_inside_sync { + let decision = consensus.notify_sync_end_failure(sched_ctx, comp_ctx); + default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus); + } - // Iterating by index to work around borrowing rules + // Iterating over ports by index to work around borrowing rules for port_index in 0..comp_ctx.num_ports() { let port = comp_ctx.get_port_by_index_mut(port_index); - if port.state == PortState::Closed { + if port.state.is_closed() || port.close_at_sync_end { // Already closed, or in the process of being closed continue; } // Mark as closed let port_id = port.self_id; - port.state = PortState::Closed; + port.state.set(PortStateFlag::Closed); // Notify peer of closing let port_handle = comp_ctx.get_port_handle(port_id); - let (peer, message) = control.initiate_port_closing(port_handle, comp_ctx); + let (peer, message) = control.initiate_port_closing(port_handle, exit_inside_sync, comp_ctx); let peer_info = comp_ctx.get_peer(peer); - peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); + peer_info.handle.send_message_logged(sched_ctx, Message::Control(message), true); } return CompScheduling::Immediate; // to check if we can shut down immediately @@ -396,10 +645,10 @@ pub(crate) fn default_handle_busy_exit( ) -> CompScheduling { debug_assert_eq!(exec_state.mode, CompMode::BusyExit); if control.has_acks_remaining() { - sched_ctx.log("Component busy exiting, still has `Ack`s remaining"); + sched_ctx.info("Component busy exiting, still has `Ack`s remaining"); return CompScheduling::Sleep; } else { - sched_ctx.log("Component busy exiting, now shutting down"); + sched_ctx.info("Component busy exiting, now shutting down"); exec_state.mode = CompMode::Exit; return CompScheduling::Exit; } @@ -408,28 +657,74 @@ pub(crate) fn default_handle_busy_exit( /// Handles a potential synchronous round decision. If there was a decision then /// the `Some(success)` value indicates whether the round succeeded or not. /// Might also end up changing the `ExecState`. +/// +/// Might be called in two cases: +/// 1. The component is in regular execution mode, at the end of a sync round, +/// and is waiting for a solution to the round. +/// 2. The component has encountered an error during a sync round and is +/// exiting, hence is waiting for a "Failure" message from the leader. pub(crate) fn default_handle_sync_decision( - exec_state: &mut CompExecState, decision: SyncRoundDecision, - consensus: &mut Consensus + sched_ctx: &SchedulerCtx, exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, + decision: SyncRoundDecision, consensus: &mut Consensus ) -> Option { - debug_assert_eq!(exec_state.mode, CompMode::SyncEnd); let success = match decision { SyncRoundDecision::None => return None, SyncRoundDecision::Solution => true, SyncRoundDecision::Failure => false, }; - debug_assert_eq!(exec_state.mode, CompMode::SyncEnd); + debug_assert!( + exec_state.mode == CompMode::SyncEnd || ( + exec_state.mode.is_busy_exiting() && exec_state.exit_reason.is_error() + ) || ( + exec_state.mode.is_in_sync_block() && decision == SyncRoundDecision::Failure + ) + ); + + sched_ctx.info(&format!("Handling decision {:?} (in mode: {:?})", decision, exec_state.mode)); + consensus.notify_sync_decision(decision); if success { + // We cannot get a success message if the component has encountered an + // error. + for port_index in 0..comp_ctx.num_ports() { + let port_info = comp_ctx.get_port_by_index_mut(port_index); + if port_info.close_at_sync_end { + port_info.state.set(PortStateFlag::Closed); + } + } + debug_assert_eq!(exec_state.mode, CompMode::SyncEnd); exec_state.mode = CompMode::NonSync; - consensus.notify_sync_decision(decision); return Some(true); } else { - exec_state.mode = CompMode::StartExit; + // We may get failure both in all possible cases. But we should only + // modify the execution state if we're not already in exit mode + if !exec_state.mode.is_busy_exiting() { + sched_ctx.error("failed synchronous round, initiating exit"); + exec_state.set_as_start_exit(ExitReason::ErrorNonSync); + } return Some(false); } } +/// Performs the default action of printing the provided error, and then putting +/// the component in the state where it will shut down. Only to be used for +/// builtin components: their error message construction is simpler (and more +/// common) as they don't have any source code. +pub(crate) fn default_handle_error_for_builtin( + exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, + location_and_message: (PortInstruction, String) +) { + let (_location, message) = location_and_message; + sched_ctx.error(&message); + + let exit_reason = if exec_state.mode.is_in_sync_block() { + ExitReason::ErrorInSync + } else { + ExitReason::ErrorNonSync + }; + + exec_state.set_as_start_exit(exit_reason); +} #[inline] pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling { @@ -454,7 +749,7 @@ fn default_handle_ack( AckAction::SendMessage(target_comp, message) => { // FIX @NoDirectHandle let mut handle = sched_ctx.runtime.get_component_public(target_comp); - handle.send_message(&sched_ctx.runtime, Message::Control(message), true); + handle.send_message_logged(sched_ctx, Message::Control(message), true); let _should_remove = handle.decrement_users(); debug_assert!(_should_remove.is_none()); }, @@ -486,7 +781,7 @@ fn default_send_ack( sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx ) { let peer_info = comp_ctx.get_peer(peer_handle); - peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(ControlMessage{ + peer_info.handle.send_message_logged(sched_ctx, Message::Control(ControlMessage{ id: causer_of_ack_id, sender_comp_id: comp_ctx.id, target_port_id: None, @@ -496,14 +791,13 @@ fn default_send_ack( /// Handles the unblocking of a putter port. In case there is a pending message /// on that port then it will be sent. -fn default_handle_unblock_put( +fn default_handle_recently_unblocked_port( exec_state: &mut CompExecState, consensus: &mut Consensus, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, ) { let port_info = comp_ctx.get_port_mut(port_handle); let port_id = port_info.self_id; - debug_assert!(port_info.state.is_blocked()); - port_info.state = PortState::Open; + debug_assert!(!port_info.state.is_blocked()); // should have been done by the caller if exec_state.is_blocked_on_put(port_id) { // Annotate the message that we're going to send @@ -515,7 +809,7 @@ fn default_handle_unblock_put( // Retrieve peer to send the message let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); let peer_info = comp_ctx.get_peer(peer_handle); - peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(to_send), true); + peer_info.handle.send_message_logged(sched_ctx, Message::Data(to_send), true); exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state. exec_state.mode_port = PortId::new_invalid();