diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index d50a9ef1a923719d29f2ab9e348e11c9e66117a9..a18a96f7a77b3b69cd72e287478626649fb3b423 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -137,6 +137,96 @@ pub(crate) fn create_component( // Generic component messaging utilities (for sending and receiving) // ----------------------------------------------------------------------------- +/// Default handling of sending a data message. In case the port is blocked then +/// the `ExecState` will become blocked as well. Note that +/// `default_handle_control_message` will ensure that the port becomes +/// unblocked if so instructed by the receiving component. The returned +/// scheduling value must be used. +#[must_use] +pub(crate) fn default_send_data_message( + exec_state: &mut CompExecState, transmitting_port_id: PortId, value: ValueGroup, + sched_ctx: &SchedulerCtx, consensus: &mut Consensus, comp_ctx: &mut CompCtx +) -> CompScheduling { + debug_assert_eq!(exec_state.mode, CompMode::Sync); + + // TODO: Handle closed ports + let port_handle = comp_ctx.get_port_handle(transmitting_port_id); + let port_info = comp_ctx.get_port(port_handle); + debug_assert_eq!(port_info.kind, PortKind::Putter); + if port_info.state.is_blocked() { + // Port is blocked, so we cannot send + exec_state.set_as_blocked_put(transmitting_port_id, value); + + return CompScheduling::Sleep; + } else { + // Port is not blocked, so send to the peer + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + let peer_info = comp_ctx.get_peer(peer_handle); + let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value); + peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(annotated_message), true); + + return CompScheduling::Immediate; + } +} + +pub(crate) enum IncomingData { + PlacedInSlot, + SlotFull(DataMessage), +} + +/// Default handling of receiving a data message. In case there is no room for +/// the message it is returned from this function. Note that this function is +/// different from PDL code performing a `get` on a port; this is the case where +/// the message first arrives at the component. +// NOTE: This is supposed to be a somewhat temporary implementation. It would be +// nicest if the sending component can figure out it cannot send any more data. +#[must_use] +pub(crate) fn default_handle_incoming_data_message( + exec_state: &mut CompExecState, port_value_slot: &mut Option, + comp_ctx: &mut CompCtx, incoming_message: DataMessage, + sched_ctx: &SchedulerCtx, control: &mut ControlLayer +) -> IncomingData { + let target_port_id = incoming_message.data_header.target_port; + + if port_value_slot.is_none() { + // We can put the value in the slot + *port_value_slot = Some(incoming_message); + + // Check if we're blocked on receiving this message. + dbg_code!({ + // Our port cannot have been blocked itself, because we're able to + // directly insert the message into its slot. + let port_handle = comp_ctx.get_port_handle(target_port_id); + assert!(!comp_ctx.get_port(port_handle).state.is_blocked()); + }); + + if exec_state.is_blocked_on_get(target_port_id) { + // Return to normal operation + exec_state.mode = CompMode::Sync; + exec_state.mode_port = PortId::new_invalid(); + debug_assert!(exec_state.mode_value.values.is_empty()); + } + + return IncomingData::PlacedInSlot + } else { + // Slot is already full, so if the port was previously opened, it will + // now become closed + let port_handle = comp_ctx.get_port_handle(target_port_id); + let port_info = comp_ctx.get_port_mut(port_handle); + debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked()); // i.e. not closed, but will go off if more states are added in the future + + if port_info.state == PortState::Open { + comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers); + let (peer_handle, message) = + control.initiate_port_blocking(comp_ctx, port_handle); + let peer = comp_ctx.get_peer(peer_handle); + peer.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); + } + + return IncomingData::SlotFull(incoming_message) + } +} + /// Handles control messages in the default way. Note that this function may /// take a lot of actions in the name of the caller: pending messages may be /// sent, ports may become blocked/unblocked, etc. So the execution @@ -222,6 +312,7 @@ pub(crate) fn default_handle_control_message( /// Handles a component initiating the exiting procedure, and closing all of its /// ports. Should only be called once per component (which is ensured by /// checking and modifying the mode in the execution state). +#[must_use] pub(crate) fn default_handle_start_exit( exec_state: &mut CompExecState, control: &mut ControlLayer, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx @@ -254,6 +345,7 @@ pub(crate) fn default_handle_start_exit( /// Handles a component waiting until all peers are notified that it is quitting /// (i.e. after calling `default_handle_start_exit`). +#[must_use] pub(crate) fn default_handle_busy_exit( exec_state: &mut CompExecState, control: &ControlLayer, sched_ctx: &SchedulerCtx @@ -271,6 +363,7 @@ pub(crate) fn default_handle_busy_exit( /// Handles a potential synchronous round decision. If there was a decision then /// the `Some(success)` value indicates whether the round succeeded or not. +/// Might also end up changing the `ExecState`. pub(crate) fn default_handle_sync_decision( exec_state: &mut CompExecState, decision: SyncRoundDecision, consensus: &mut Consensus