diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs index b7ad2c40b907deac5b614da1e0cba99181993434..9eabc0cd5f0534f877c7251c46b1296ac11bfbf4 100644 --- a/src/runtime2/component/control_layer.rs +++ b/src/runtime2/component/control_layer.rs @@ -5,12 +5,12 @@ use crate::runtime2::component::*; use super::component_context::*; #[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub(crate) struct ControlId(u32); +pub(crate) struct ControlId(pub(crate) u32); impl ControlId { /// Like other invalid IDs, this one doesn't care any significance, but is /// just set at u32::MAX to hopefully bring out bugs sooner. - fn new_invalid() -> Self { + pub(crate) fn new_invalid() -> Self { return ControlId(u32::MAX); } } @@ -25,6 +25,7 @@ enum ControlContent { PeerChange(ContentPeerChange), ScheduleComponent(CompId), ClosedPort(PortId), + UnblockPutWithPorts } struct ContentPeerChange { @@ -45,6 +46,7 @@ pub(crate) enum AckAction { None, SendMessage(CompId, ControlMessage), ScheduleComponent(CompId), + UnblockPutWithPorts, } /// Handling/sending control messages. @@ -128,6 +130,9 @@ impl ControlLayer { comp_ctx.change_port_peer(sched_ctx, port_handle, None); return (AckAction::None, None); + }, + ControlContent::UnblockPutWithPorts => { + return (AckAction::UnblockPutWithPorts, None); } } } @@ -137,7 +142,7 @@ impl ControlLayer { } // ------------------------------------------------------------------------- - // Port transfer (due to component creation) + // Port transfer // ------------------------------------------------------------------------- /// Adds an entry that, when completely ack'd, will schedule a component. @@ -152,15 +157,20 @@ impl ControlLayer { return entry_id; } - /// Removes a schedule entry. Only used if the caller preemptively called - /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`, - /// hence the `ack_countdown` in the scheduling entry is at 0. - pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) { - let index = self.get_entry_index_by_id(schedule_entry_id).unwrap(); - debug_assert_eq!(self.entries[index].ack_countdown, 0); - self.entries.remove(index); + /// Adds an entry that returns the similarly named Ack action + pub(crate) fn add_unblock_put_with_ports_entry(&mut self) -> ControlId { + let entry_id = self.take_id(); + self.entries.push(ControlEntry{ + id: entry_id, + ack_countdown: 0, // incremented by calls to `add_reroute_entry` + content: ControlContent::UnblockPutWithPorts, + }); + + return entry_id; } + /// Adds a rerouting entry (used to ensure all messages will end up at a + /// newly created component). Used when creating a new component. pub(crate) fn add_reroute_entry( &mut self, creator_comp_id: CompId, source_port_id: PortId, source_comp_id: CompId, @@ -193,6 +203,24 @@ impl ControlLayer { }) } + /// Creates a PortPeerChanged message (and increments ack-counter on a + /// pre-created control entry) that is used as a preliminary step before + /// transferring a port over a channel. + pub(crate) fn create_port_transfer_message( + &mut self, associated_control_id: ControlId, + sender_comp_id: CompId, target_port_id: PortId + ) -> Message { + let entry_index = self.get_entry_index_by_id(associated_control_id).unwrap(); + self.entries[entry_index].ack_countdown += 1; + + return Message::Control(ControlMessage{ + id: associated_control_id, + sender_comp_id, + target_port_id: Some(target_port_id), + content: ControlMessageContent::PortPeerChangedBlock + }) + } + // ------------------------------------------------------------------------- // Blocking, unblocking, and closing ports // -------------------------------------------------------------------------