diff --git a/src/runtime2/component/component_context.rs b/src/runtime2/component/component_context.rs index bb1d8c6254cdff67386d18035d0d3d23c0ac757f..b2c3435d5606f217b687597005aec6e3b4e3a262 100644 --- a/src/runtime2/component/component_context.rs +++ b/src/runtime2/component/component_context.rs @@ -197,58 +197,51 @@ impl CompCtx { return port; } - /// Adds a new peer. This must be called for every port, no matter the - /// component the channel is connected to. If a `CompHandle` is supplied, - /// then it will be used to add the peer. Otherwise it will be retrieved - /// from the runtime using its ID. - pub(crate) fn add_peer(&mut self, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, peer_comp_id: CompId, handle: Option<&CompHandle>) { - let self_id = self.id; - let port = self.get_port_mut(port_handle); - debug_assert_eq!(port.peer_comp_id, peer_comp_id); - dbg_code!(assert!(!port.associated_with_peer)); - if !Self::requires_peer_reference(port, self_id, false) { - return; - } - - dbg_code!(port.associated_with_peer = true); - match self.get_peer_index_by_id(peer_comp_id) { - Some(peer_index) => { - let peer = &mut self.peers[peer_index]; - peer.num_associated_ports += 1; - }, - None => { - let handle = match handle { - Some(handle) => handle.clone(), - None => sched_ctx.runtime.get_component_public(peer_comp_id) - }; - self.peers.push(Peer{ - id: peer_comp_id, - num_associated_ports: 1, - handle, - }); + /// Changes a peer + pub(crate) fn change_port_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, new_peer_comp_id: Option) { + // If port is currently associated with a peer, then remove that peer + let port_index = self.get_port_index(port_handle); + let port = &mut self.ports[port_index]; + let port_is_closed = port.state.is_closed(); + if port.associated_with_peer { + // Remove old peer association + port.associated_with_peer = false; + let peer_comp_id = port.peer_comp_id; + let peer_index = self.get_peer_index_by_id(peer_comp_id).unwrap(); + let peer = &mut self.peers[peer_index]; + + peer.num_associated_ports -= 1; + if peer.num_associated_ports == 0 { + let mut peer = self.peers.remove(peer_index); + if let Some(key) = peer.handle.decrement_users() { + sched_ctx.runtime.destroy_component(key); + } } } - } - - /// Removes a peer associated with a port. - pub(crate) fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, peer_id: CompId, also_remove_if_closed: bool) { - let self_id = self.id; - let port = self.get_port_mut(port_handle); - debug_assert_eq!(port.peer_comp_id, peer_id); - if !Self::requires_peer_reference(port, self_id, also_remove_if_closed) { - return; - } - dbg_code!(assert!(port.associated_with_peer)); - dbg_code!(port.associated_with_peer = false); - let peer_index = self.get_peer_index_by_id(peer_id).unwrap(); - let peer = &mut self.peers[peer_index]; - peer.num_associated_ports -= 1; - if peer.num_associated_ports == 0 { - let mut peer = self.peers.remove(peer_index); - if let Some(key) = peer.handle.decrement_users() { - debug_assert_ne!(key.downgrade(), self.id); // should be upheld by the code that shuts down a component - sched_ctx.runtime.destroy_component(key); + // If there is a new peer, then set it as the peer associated with the + // port + if let Some(peer_id) = new_peer_comp_id { + let port = &mut self.ports[port_index]; + port.peer_comp_id = peer_id; + + if peer_id != self.id && !port_is_closed { + port.associated_with_peer = true; + + match self.get_peer_index_by_id(peer_id) { + Some(index) => { + let peer = &mut self.peers[index]; + peer.num_associated_ports += 1; + }, + None => { + let handle = sched_ctx.runtime.get_component_public(peer_id); + self.peers.push(Peer { + id: peer_id, + num_associated_ports: 1, + handle + }) + } + } } } }