diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 914ad380d3a355ed5516cf05c7063c80499fcdcc..cddd244fa3f52157eb6ff24e617dd061daf02408 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -181,13 +181,11 @@ impl CompExecState { } } -/// Generic representation of a component's inbox. -// NOTE: It would be nice to remove the `backup` at some point and make the -// blocking occur the moment a peer tries to send a message. -pub(crate) struct CompInbox { - main: Vec>, - backup: Vec, -} +// TODO: Replace when implementing port sending. Should probably be incorporated +// into CompCtx (and rename CompCtx into CompComms) +pub(crate) type InboxMain = Vec>; +pub(crate) type InboxMainRef = [Option]; +pub(crate) type InboxBackup = Vec; /// Creates a new component based on its definition. Meaning that if it is a /// user-defined component then we set up the PDL code state. Otherwise we @@ -281,10 +279,13 @@ pub(crate) enum IncomingData { // nicest if the sending component can figure out it cannot send any more data. #[must_use] pub(crate) fn default_handle_incoming_data_message( - exec_state: &mut CompExecState, port_value_slot: &mut Option, + exec_state: &mut CompExecState, inbox_main: &mut InboxMain, comp_ctx: &mut CompCtx, incoming_message: DataMessage, sched_ctx: &SchedulerCtx, control: &mut ControlLayer ) -> IncomingData { + let port_handle = comp_ctx.get_port_handle(incoming_message.data_header.target_port); + let port_index = comp_ctx.get_port_index(port_handle); + let port_value_slot = &mut inbox_main[port_index]; let target_port_id = incoming_message.data_header.target_port; if port_value_slot.is_none() { @@ -295,7 +296,6 @@ pub(crate) fn default_handle_incoming_data_message( dbg_code!({ // Our port cannot have been blocked itself, because we're able to // directly insert the message into its slot. - let port_handle = comp_ctx.get_port_handle(target_port_id); assert!(!comp_ctx.get_port(port_handle).state.is_blocked()); }); @@ -310,7 +310,6 @@ pub(crate) fn default_handle_incoming_data_message( } else { // Slot is already full, so if the port was previously opened, it will // now become closed - let port_handle = comp_ctx.get_port_handle(target_port_id); let port_info = comp_ctx.get_port_mut(port_handle); debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked()); // i.e. not closed, but will go off if more states are added in the future @@ -326,19 +325,80 @@ pub(crate) fn default_handle_incoming_data_message( } } +pub(crate) enum GetResult { + Received(DataMessage), + NoMessage, + Error((PortInstruction, String)), +} + +/// Default attempt at trying to receive from a port (i.e. through a `get`, or +/// the equivalent operation for a builtin component). `target_port` is the port +/// we're trying to receive from, and the `target_port_instruction` is the +/// instruction we're attempting on this port. +pub(crate) fn default_attempt_get( + exec_state: &mut CompExecState, target_port: PortId, target_port_instruction: PortInstruction, + inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup, sched_ctx: &SchedulerCtx, + comp_ctx: &mut CompCtx, control: &mut ControlLayer, consensus: &mut Consensus +) -> GetResult { + let port_handle = comp_ctx.get_port_handle(target_port); + let port_index = comp_ctx.get_port_index(port_handle); + + if let Some(message) = &inbox_main[port_index] { + if consensus.try_receive_data_message(sched_ctx, comp_ctx, message) { + // We're allowed to receive this message + let message = inbox_main[port_index].take().unwrap(); + debug_assert_eq!(target_port, message.data_header.target_port); + + // Note: we can still run into an unrecoverable error when actually + // receiving this message + match default_handle_received_data_message( + target_port, target_port_instruction, inbox_main, inbox_backup, + comp_ctx, sched_ctx, control, + ) { + Ok(()) => return GetResult::Received(message), + Err(location_and_message) => return GetResult::Error(location_and_message) + } + } else { + // We're not allowed to receive this message. This means that the + // receiver is attempting to receive something out of order with + // respect to the sender. + return GetResult::Error((target_port_instruction, String::from( + "Cannot get from this port, as this causes a deadlock. This happens if you `get` in a different order as another component `put`s" + ))); + } + } else { + // We don't have a message waiting for us. + let port_info = comp_ctx.get_port(port_handle); + let port_is_closed = port_info.state == PortState::Closed; + if port_is_closed { + let peer_id = port_info.peer_comp_id; + return GetResult::Error(( + target_port_instruction, + format!("Cannot get from this port, as the peer component (id:{}) shut down", peer_id.0) + )); + } + + // No error ocurred, so enter the BlockedGet state + exec_state.set_as_blocked_get(target_port); + return GetResult::NoMessage; + } +} + /// Default handling that has been received through a `get`. Will check if any /// more messages are waiting, and if the corresponding port was blocked because /// of full buffers (hence, will use the control layer to make sure the peer /// will become unblocked). pub(crate) fn default_handle_received_data_message( targeted_port: PortId, port_instruction: PortInstruction, - slot: &mut Option, inbox_backup: &mut Vec, + inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer ) -> Result<(), (PortInstruction, String)> { + let port_handle = comp_ctx.get_port_handle(targeted_port); + let port_index = comp_ctx.get_port_index(port_handle); + let slot = &mut inbox_main[port_index]; debug_assert!(slot.is_none()); // because we've just received from it // Modify last-known location where port instruction was retrieved - let port_handle = comp_ctx.get_port_handle(targeted_port); let port_info = comp_ctx.get_port_mut(port_handle); port_info.last_instruction = port_instruction; @@ -498,9 +558,42 @@ pub(crate) fn default_handle_control_message( /// Handles a component entering the synchronous block. Will ensure that the /// `Consensus` and the `ComponentCtx` are initialized properly. -pub(crate) fn default_handle_start_sync( - exec_state: &mut CompExecState, -) {} +pub(crate) fn default_handle_sync_start( + exec_state: &mut CompExecState, inbox_main: &mut InboxMainRef, + sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus +) { + sched_ctx.log("Component starting sync mode"); + + // If any messages are present for this sync round, set the appropriate flag + // and notify the consensus handler of the present messages + consensus.notify_sync_start(comp_ctx); + for (port_index, message) in inbox_main.iter().enumerate() { + if let Some(message) = message { + consensus.handle_incoming_data_message(comp_ctx, message); + let port_info = comp_ctx.get_port_by_index_mut(port_index); + port_info.received_message_for_sync = true; + } + } + + // Modify execution state + debug_assert_eq!(exec_state.mode, CompMode::NonSync); + exec_state.mode = CompMode::Sync; +} + +/// Handles a component that has reached the end of the sync block. This does +/// not necessarily mean that the component will go into the `NonSync` mode, as +/// it might have to wait for the leader to finish the round for everyone (see +/// `default_handle_sync_decision`) +pub(crate) fn default_handle_sync_end( + exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, + consensus: &mut Consensus +) { + sched_ctx.log("Component ending sync mode (but possibly waiting for a solution)"); + debug_assert_eq!(exec_state.mode, CompMode::Sync); + let decision = consensus.notify_sync_end_success(sched_ctx, comp_ctx); + exec_state.mode = CompMode::SyncEnd; + default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus); +} /// Handles a component initiating the exiting procedure, and closing all of its /// ports. Should only be called once per component (which is ensured by