diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 5d1b853af70754d563968c1b355dadd4c0c48e92..914ad380d3a355ed5516cf05c7063c80499fcdcc 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -67,7 +67,9 @@ pub(crate) trait Component { fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling; } -/// Representation of the generic operating mode of a component. +/// Representation of the generic operating mode of a component. Although not +/// every state may be used by every kind of (builtin) component, this allows +/// writing standard handlers for particular events in a component's lifetime. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub(crate) enum CompMode { NonSync, // not in sync mode @@ -179,6 +181,14 @@ impl CompExecState { } } +/// Generic representation of a component's inbox. +// NOTE: It would be nice to remove the `backup` at some point and make the +// blocking occur the moment a peer tries to send a message. +pub(crate) struct CompInbox { + main: Vec>, + backup: Vec, +} + /// Creates a new component based on its definition. Meaning that if it is a /// user-defined component then we set up the PDL code state. Otherwise we /// construct a custom component. This does NOT take care of port and message @@ -409,28 +419,38 @@ pub(crate) fn default_handle_control_message( // sent to one another. if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) { // The two components (sender and this component) are closing - // the channel at the same time. + // the channel at the same time. So we don't care about the + // content of the `ClosePort` message. default_handle_ack(control, control_id, sched_ctx, comp_ctx); } else { // Respond to the message let last_instruction = port_info.last_instruction; - let port_was_used = last_instruction != PortInstruction::None; + let port_has_had_message = port_info.received_message_for_sync; default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx); comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed - comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed - - // Make sure that we've not reached an error condition. Note - // that if this condition is not met, then we don't error out - // now, but we may error out in the next sync block when we - // try to `put`/`get` on the port. This condition makes sure - // that if we have a successful sync round, followed by the peer - // closing the port, that we don't consider the sync round to - // have failed by mistake. - if content.closed_in_sync_round && exec_state.mode.is_in_sync_block() && port_was_used { - return Err(( - last_instruction, - format!("Peer component (id:{}) shut down, so previous communication cannot have succeeded", peer_comp_id.0) - )); + + // Handle any possible error conditions (which boil down to: the + // port has been used, but the peer has died). If not in sync + // mode then we close the port immediately. + + // Note that `port_was_used` does not mean that any messages + // were actually received. It might also mean that e.g. the + // component attempted a `get`, but there were no messages, so + // now it is in the `BlockedGet` state. + let port_was_used = last_instruction != PortInstruction::None; + + if exec_state.mode.is_in_sync_block() { + let closed_during_sync_round = content.closed_in_sync_round && port_was_used; + // TODO: Finish this + + if closed_during_sync_round { + return Err(( + last_instruction, + format!("Peer component (id:{}) shut down, so previous communication cannot have succeeded", peer_comp_id.0) + )); + } + } else { + comp_ctx.set_port_state(port_handle, PortState::Closed); } } }, @@ -476,6 +496,12 @@ pub(crate) fn default_handle_control_message( return Ok(()); } +/// Handles a component entering the synchronous block. Will ensure that the +/// `Consensus` and the `ComponentCtx` are initialized properly. +pub(crate) fn default_handle_start_sync( + exec_state: &mut CompExecState, +) {} + /// Handles a component initiating the exiting procedure, and closing all of its /// ports. Should only be called once per component (which is ensured by /// checking and modifying the mode in the execution state). @@ -493,7 +519,7 @@ pub(crate) fn default_handle_start_exit( // round that we've failed. if exit_inside_sync { let decision = consensus.notify_sync_end_failure(sched_ctx, comp_ctx); - default_handle_sync_decision(sched_ctx, exec_state, decision, consensus); + default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus); } // Iterating over ports by index to work around borrowing rules