diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 828421de9704ad6aa50d13a770b5ba0fe6984cbd..a0cc811fcc5e0ee8cfbf52a3be015719545f2dbd 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -19,9 +19,16 @@ pub enum CompScheduling { /// Generic representation of a component (as viewed by a scheduler). pub(crate) trait Component { - /// Called upon the creation of the component. + /// Called upon the creation of the component. Note that the scheduler + /// context is officially running another component (the component that is + /// creating the new component). fn on_creation(&mut self, comp_id: CompId, sched_ctx: &SchedulerCtx); + /// Called when a component crashes or wishes to exit. So is not called + /// right before destruction, other components may still hold a handle to + /// the component and send it messages! + fn on_shutdown(&mut self, sched_ctx: &SchedulerCtx); + /// Called if the component is created by another component and the messages /// are being transferred between the two. fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage); @@ -229,6 +236,41 @@ pub(crate) fn default_handle_incoming_data_message( } } +/// Default handling that has been received through a `get`. Will check if any +/// more messages are waiting, and if the corresponding port was blocked because +/// of full buffers (hence, will use the control layer to make sure the peer +/// will become unblocked). +pub(crate) fn default_handle_received_data_message( + targeted_port: PortId, slot: &mut Option, inbox_backup: &mut Vec, + comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer +) { + debug_assert!(slot.is_none()); // because we've just received from it + + // Check if there are any more messages in the backup buffer + let port_handle = comp_ctx.get_port_handle(targeted_port); + let port_info = comp_ctx.get_port(port_handle); + for message_index in 0..inbox_backup.len() { + let message = &inbox_backup[message_index]; + if message.data_header.target_port == targeted_port { + // One more message, place it in the slot + let message = inbox_backup.remove(message_index); + debug_assert!(port_info.state.is_blocked()); // since we're removing another message from the backup + *slot = Some(message); + + return; + } + } + + // Did not have any more messages, so if we were blocked, then we need to + // unblock the port now (and inform the peer of this unblocking) + if port_info.state == PortState::BlockedDueToFullBuffers { + comp_ctx.set_port_state(port_handle, PortState::Open); + let (peer_handle, message) = control.cancel_port_blocking(comp_ctx, port_handle); + let peer_info = comp_ctx.get_peer(peer_handle); + peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); + } +} + /// Handles control messages in the default way. Note that this function may /// take a lot of actions in the name of the caller: pending messages may be /// sent, ports may become blocked/unblocked, etc. So the execution