diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs index c026ef88f9d3836dd87fc8795230c68bbc27d343..da7bae60bb8b257f65b3b9e05780aeda5fc1b267 100644 --- a/src/runtime2/component/control_layer.rs +++ b/src/runtime2/component/control_layer.rs @@ -37,9 +37,14 @@ struct ContentPeerChange { schedule_entry_id: ControlId, } +struct ControlClosedPort { + closed_port: PortId, + exit_entry_id: Option, +} + pub(crate) enum AckAction { None, - SendMessageAndAck(CompId, ControlMessage, ControlId), + SendMessage(CompId, ControlMessage), ScheduleComponent(CompId), } @@ -73,18 +78,23 @@ impl ControlLayer { return None; } - pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> AckAction { + /// Handles an acknowledgement. The returned action must be performed by the + /// caller. The optionally returned `ControlId` must be used and passed to + /// `handle_ack` again. + pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> (AckAction, Option) { let entry_index = self.get_entry_index_by_id(entry_id).unwrap(); let entry = &mut self.entries[entry_index]; debug_assert!(entry.ack_countdown > 0); entry.ack_countdown -= 1; if entry.ack_countdown != 0 { - return AckAction::None; + return (AckAction::None, None); } + let entry = self.entries.remove(entry_index); + // All `Ack`s received, take action based on the kind of entry - match &entry.content { + match entry.content { ControlContent::PeerChange(content) => { // If change of peer is ack'd. Then we are certain we have // rerouted all of the messages, and the sender's port can now @@ -100,29 +110,36 @@ impl ControlLayer { ) }; let to_ack = content.schedule_entry_id; - self.entries.remove(entry_index); - return AckAction::SendMessageAndAck(target_comp_id, message_to_send, to_ack); + return ( + AckAction::SendMessage(target_comp_id, message_to_send), + Some(to_ack) + ); }, ControlContent::ScheduleComponent(to_schedule) => { // If all change-of-peers are `Ack`d, then we're ready to // schedule the component! - return AckAction::ScheduleComponent(*to_schedule); + return (AckAction::ScheduleComponent(to_schedule), None); }, ControlContent::BlockedPort(_) => unreachable!(), - ControlContent::ClosedPort(port_id) => { + ControlContent::ClosedPort(closed_port) => { // If a closed port is Ack'd, then we remove the reference to // that component. - let port_handle = comp_ctx.get_port_handle(*port_id); + let port_handle = comp_ctx.get_port_handle(closed_port); let port_info = comp_ctx.get_port(port_handle); + let port_peer_comp_id = port_info.peer_comp_id; debug_assert_eq!(port_info.state, PortState::Closed); - comp_ctx.remove_peer(sched_ctx, port_handle, port_info.peer_comp_id); + comp_ctx.remove_peer(sched_ctx, port_handle, port_peer_comp_id); - return AckAction::None; + return (AckAction::None, None); } } } + pub(crate) fn has_acks_remaining(&self) -> bool { + return !self.entries.is_empty(); + } + // ------------------------------------------------------------------------- // Port transfer (due to component creation) // ------------------------------------------------------------------------- @@ -169,12 +186,8 @@ impl ControlLayer { }); // increment counter on schedule entry - for entry in &mut self.entries { - if entry.id == schedule_entry_id { - entry.ack_countdown += 1; - break; - } - } + let entry_index = self.get_entry_index_by_id(schedule_entry_id).unwrap(); + self.entries[entry_index].ack_countdown += 1; return Message::Control(ControlMessage{ id: entry_id, @@ -188,36 +201,33 @@ impl ControlLayer { // Blocking, unblocking, and closing ports // ------------------------------------------------------------------------- - pub(crate) fn initiate_port_closing(&mut self, port_handle: PortHandle, comp_ctx: &mut CompCtx) -> Option<(CompId, ControlMessage)> { - let port = comp_ctx.get_port_mut(port_handle); - let port_id = port.self_id; - let peer_port_id = port.peer_port_id; - let peer_comp_id = port.peer_comp_id; - debug_assert!(port.state == PortState::Open || port.state == PortState::Blocked); - port.state = PortState::Closed; - - if peer_comp_id == comp_ctx.id { - // We own the other end of the channel as well. - return None; - } + /// Initiates the control message procedures for closing a port. Caller must + /// make sure that the port state has already been set to `Closed`. + pub(crate) fn initiate_port_closing(&mut self, port_handle: LocalPortHandle, comp_ctx: &CompCtx) -> (LocalPeerHandle, ControlMessage) { + let port = comp_ctx.get_port(port_handle); + let peer_port_id = port.peer_port_id; + debug_assert!(port.state == PortState::Closed); + // Construct the port-closing entry let entry_id = self.take_id(); self.entries.push(ControlEntry{ id: entry_id, ack_countdown: 1, - content: ControlContent::ClosedPort(port_id), + content: ControlContent::ClosedPort(port.self_id), }); - return Some(( - peer_comp_id, + // Return the messages notifying peer of the closed port + let peer_handle = comp_ctx.get_peer_handle(port.peer_comp_id); + return ( + peer_handle, ControlMessage{ id: entry_id, sender_comp_id: comp_ctx.id, target_port_id: Some(peer_port_id), content: ControlMessageContent::ClosePort(peer_port_id), } - )); + ); } /// Adds a control entry to track that a port is blocked. Expects the caller