diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs index 67b481c073e8c16a09533a7711d9864bab934326..1d6a5a4de92b16371cefffbe3da16b8fae923855 100644 --- a/src/runtime2/component/control_layer.rs +++ b/src/runtime2/component/control_layer.rs @@ -21,8 +21,9 @@ struct ControlEntry { enum ControlContent { PeerChange(ContentPeerChange), - ScheduleComponent(ContentScheduleComponent), - BlockedPort(ContentBlockedPort), + ScheduleComponent(CompId), + BlockedPort(PortId), + ClosedPort(PortId), } struct ContentPeerChange { @@ -33,14 +34,6 @@ struct ContentPeerChange { schedule_entry_id: ControlId, } -struct ContentScheduleComponent { - to_schedule: CompId, -} - -struct ContentBlockedPort { - blocked_port: PortId, -} - pub(crate) enum AckAction { None, SendMessageAndAck(CompId, ControlMessage, ControlId), @@ -76,7 +69,7 @@ impl ControlLayer { return None; } - pub(crate) fn handle_ack(&mut self, entry_id: ControlId, comp_ctx: &CompCtx) -> AckAction { + pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> AckAction { let entry_index = self.get_entry_index(entry_id).unwrap(); let entry = &mut self.entries[entry_index]; debug_assert!(entry.ack_countdown > 0); @@ -105,7 +98,7 @@ impl ControlLayer { let to_ack = content.schedule_entry_id; self.entries.remove(entry_index); - self.handle_ack(to_ack, comp_ctx); + self.handle_ack(to_ack, sched_ctx, comp_ctx); return AckAction::SendMessageAndAck(target_comp_id, message_to_send, to_ack); }, @@ -115,6 +108,28 @@ impl ControlLayer { return AckAction::ScheduleComponent(content.to_schedule); }, ControlContent::BlockedPort(_) => unreachable!(), + ControlContent::ClosedPort(port_id) => { + // If a closed port is Ack'd, then we remove the reference to + // that component. + let port_index = comp_ctx.get_port_index(*port_id).unwrap(); + debug_assert_eq!(comp_ctx.ports[port_index].state, PortState::Blocked); + let peer_id = comp_ctx.ports[port_index].peer_comp_id; + let peer_index = comp_ctx.get_peer_index(peer_id).unwrap(); + let peer_info = &mut comp_ctx.peers[peer_index]; + peer_info.num_associated_ports -= 1; + + if peer_info.num_associated_ports == 0 { + let should_remove = peer_info.handle.decrement_users(); + if should_remove { + let comp_key = unsafe{ peer_info.id.upgrade() }; + sched_ctx.runtime.destroy_component(comp_key); + } + + comp_ctx.peers.remove(peer_index); + } + + return AckAction::None; + } } } @@ -181,9 +196,38 @@ impl ControlLayer { } // ------------------------------------------------------------------------- - // Blocking and unblocking ports + // Blocking, unblocking, and closing ports // ------------------------------------------------------------------------- + pub(crate) fn mark_port_closed<'a>(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> Option<(CompId, ControlMessage)> { + let port = comp_ctx.get_port_mut(port_id); + debug_assert!(port.state == PortState::Open || port.state == PortState::Blocked); + + port.state = PortState::Closed; + + if port.peer_comp_id == comp_ctx.id { + // We own the other end of the channel as well + return None; + } + + let entry_id = self.take_id(); + self.entries.push(ControlEntry{ + id: entry_id, + ack_countdown: 1, + content: ControlContent::ClosedPort(port_id), + }); + + return Some(( + port.peer_comp_id, + ControlMessage{ + id: entry_id, + sender_comp_id: comp_ctx.id, + target_port_id: Some(port.peer_id), + content: ControlMessageContent::ClosePort(port.peer_id), + } + )); + } + pub(crate) fn mark_port_blocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) { // TODO: Feels like this shouldn't be an entry. Hence this class should // be renamed. Lets see where the code ends up being