diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 57d89d9ba93a959fd7f7c14ebe9ff3d5f0fa7f0c..299c64f158dd53f70f55f17456bcb4a1d7570d8c 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -481,7 +481,7 @@ pub(crate) fn default_handle_control_message( let last_instruction = port_info.last_instruction; let port_has_had_message = port_info.received_message_for_sync; default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx); - comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed + comp_ctx.change_port_peer(sched_ctx, port_handle, None); // Handle any possible error conditions (which boil down to: the // port has been used, but the peer has died). If not in sync @@ -500,7 +500,7 @@ pub(crate) fn default_handle_control_message( if closed_during_sync_round || closed_before_sync_round { return Err(( last_instruction, - format!("Peer component (id:{}) shut down, so previous communication cannot have succeeded", peer_comp_id.0) + format!("Peer component (id:{}) shut down, so communication cannot (have) succeed(ed)", peer_comp_id.0) )); } } else { @@ -543,14 +543,11 @@ pub(crate) fn default_handle_control_message( debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToPeerChange)); let old_peer_id = port_info.peer_comp_id; - comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false); - let port_info = comp_ctx.get_port_mut(port_handle); - port_info.peer_comp_id = new_comp_id; port_info.peer_port_id = new_port_id; port_info.state.clear(PortStateFlag::BlockedDueToPeerChange); - comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None); + comp_ctx.change_port_peer(sched_ctx, port_handle, Some(new_comp_id)); default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx); } } diff --git a/src/runtime2/component/component_context.rs b/src/runtime2/component/component_context.rs index 
bb1d8c6254cdff67386d18035d0d3d23c0ac757f..b2c3435d5606f217b687597005aec6e3b4e3a262 100644 --- a/src/runtime2/component/component_context.rs +++ b/src/runtime2/component/component_context.rs @@ -197,58 +197,51 @@ impl CompCtx { return port; } - /// Adds a new peer. This must be called for every port, no matter the - /// component the channel is connected to. If a `CompHandle` is supplied, - /// then it will be used to add the peer. Otherwise it will be retrieved - /// from the runtime using its ID. - pub(crate) fn add_peer(&mut self, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, peer_comp_id: CompId, handle: Option<&CompHandle>) { - let self_id = self.id; - let port = self.get_port_mut(port_handle); - debug_assert_eq!(port.peer_comp_id, peer_comp_id); - dbg_code!(assert!(!port.associated_with_peer)); - if !Self::requires_peer_reference(port, self_id, false) { - return; - } - - dbg_code!(port.associated_with_peer = true); - match self.get_peer_index_by_id(peer_comp_id) { - Some(peer_index) => { - let peer = &mut self.peers[peer_index]; - peer.num_associated_ports += 1; - }, - None => { - let handle = match handle { - Some(handle) => handle.clone(), - None => sched_ctx.runtime.get_component_public(peer_comp_id) - }; - self.peers.push(Peer{ - id: peer_comp_id, - num_associated_ports: 1, - handle, - }); + /// Changes the peer of a port: drops the old peer association (if any) and, if a new peer ID is given, records it on the port. + pub(crate) fn change_port_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, new_peer_comp_id: Option<CompId>) { + // If port is currently associated with a peer, then remove that peer + let port_index = self.get_port_index(port_handle); + let port = &mut self.ports[port_index]; + let port_is_closed = port.state.is_closed(); + if port.associated_with_peer { + // Remove old peer association + port.associated_with_peer = false; + let peer_comp_id = port.peer_comp_id; + let peer_index = self.get_peer_index_by_id(peer_comp_id).unwrap(); + let peer = &mut self.peers[peer_index]; + + peer.num_associated_ports -= 1; + if 
peer.num_associated_ports == 0 { + let mut peer = self.peers.remove(peer_index); + if let Some(key) = peer.handle.decrement_users() { + sched_ctx.runtime.destroy_component(key); + } } } - } - - /// Removes a peer associated with a port. - pub(crate) fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, peer_id: CompId, also_remove_if_closed: bool) { - let self_id = self.id; - let port = self.get_port_mut(port_handle); - debug_assert_eq!(port.peer_comp_id, peer_id); - if !Self::requires_peer_reference(port, self_id, also_remove_if_closed) { - return; - } - dbg_code!(assert!(port.associated_with_peer)); - dbg_code!(port.associated_with_peer = false); - let peer_index = self.get_peer_index_by_id(peer_id).unwrap(); - let peer = &mut self.peers[peer_index]; - peer.num_associated_ports -= 1; - if peer.num_associated_ports == 0 { - let mut peer = self.peers.remove(peer_index); - if let Some(key) = peer.handle.decrement_users() { - debug_assert_ne!(key.downgrade(), self.id); // should be upheld by the code that shuts down a component - sched_ctx.runtime.destroy_component(key); + // If there is a new peer, then set it as the peer associated with the + // port + if let Some(peer_id) = new_peer_comp_id { + let port = &mut self.ports[port_index]; + port.peer_comp_id = peer_id; + + if peer_id != self.id && !port_is_closed { + port.associated_with_peer = true; + + match self.get_peer_index_by_id(peer_id) { + Some(index) => { + let peer = &mut self.peers[index]; + peer.num_associated_ports += 1; + }, + None => { + let handle = sched_ctx.runtime.get_component_public(peer_id); + self.peers.push(Peer { + id: peer_id, + num_associated_ports: 1, + handle + }) + } + } } } } diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index afa053bf3d65d24b3d13b14d3c8fcbc5921973cf..ed94ded96a5f9d0277ae3ffe542378e49cf3b5f3 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ 
-580,9 +580,8 @@ impl CompPDL { let reservation = sched_ctx.runtime.start_create_pdl_component(); let mut created_ctx = CompCtx::new(&reservation); - let other_proc = &sched_ctx.runtime.protocol.heap[definition_id]; - let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition]; - + // let other_proc = &sched_ctx.runtime.protocol.heap[definition_id]; + // let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition]; // dbg_code!({ // sched_ctx.log(&format!( // "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})", @@ -643,23 +642,24 @@ impl CompPDL { let created_port_info = created_ctx.get_port_mut(pair.created_handle); if created_port_info.peer_comp_id == creator_ctx.id { - // Port peer is owned by the creator as well + // Peer of the transferred port is the component that is + // creating the new component. let created_peer_port_index = opened_port_id_pairs .iter() .position(|v| v.creator_id == creator_port_info.peer_port_id); match created_peer_port_index { Some(created_peer_port_index) => { - // Peer port moved to the new component as well. So - // adjust IDs appropriately. + // Addendum to the above comment: but that port is also + // moving to the new component let peer_pair = &opened_port_id_pairs[created_peer_port_index]; created_port_info.peer_port_id = peer_pair.created_id; created_port_info.peer_comp_id = reservation.id(); - todo!("either add 'self peer', or remove that idea from Ctx altogether") + todo!("either add 'self peer', or remove that idea from Ctx altogether"); }, None => { // Peer port remains with creator component. 
created_port_info.peer_comp_id = creator_ctx.id; - created_ctx.add_peer(pair.created_handle, sched_ctx, creator_ctx.id, None); + created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(creator_ctx.id)); } } } else { @@ -667,7 +667,7 @@ impl CompPDL { // appropriate messages later let peer_handle = creator_ctx.get_peer_handle(created_port_info.peer_comp_id); let peer_info = creator_ctx.get_peer(peer_handle); - created_ctx.add_peer(pair.created_handle, sched_ctx, peer_info.id, Some(&peer_info.handle)); + created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(peer_info.id)); created_component_has_remote_peers = true; } } @@ -687,10 +687,8 @@ impl CompPDL { // potentially remove the peer component. for pair in opened_port_id_pairs.iter() { // Remove peer if appropriate - let creator_port_info = creator_ctx.get_port(pair.creator_handle); let creator_port_index = creator_ctx.get_port_index(pair.creator_handle); - let creator_peer_comp_id = creator_port_info.peer_comp_id; - creator_ctx.remove_peer(sched_ctx, pair.creator_handle, creator_peer_comp_id, false); + creator_ctx.change_port_peer(sched_ctx, pair.creator_handle, None); creator_ctx.remove_port(pair.creator_handle); // Transfer any messages @@ -712,15 +710,17 @@ impl CompPDL { } } - // Handle potential channel between creator and created component let created_port_info = component.ctx.get_port(pair.created_handle); - if created_port_info.peer_comp_id == creator_ctx.id { + // This handles the creation of a channel between the creator + // component and the newly created component. So if the creator + // had a `a -> b` channel, and `b` is moved to the new + // component, then `a` needs to set its peer component. 
let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id); let peer_port_info = creator_ctx.get_port_mut(peer_port_handle); peer_port_info.peer_comp_id = component.ctx.id; peer_port_info.peer_port_id = created_port_info.self_id; - creator_ctx.add_peer(peer_port_handle, sched_ctx, component.ctx.id, None); + creator_ctx.change_port_peer(sched_ctx, peer_port_handle, Some(component.ctx.id)); } } diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs index 94cd052e5014343afd81807c3f2efe4bc4af2717..b7ad2c40b907deac5b614da1e0cba99181993434 100644 --- a/src/runtime2/component/control_layer.rs +++ b/src/runtime2/component/control_layer.rs @@ -124,10 +124,8 @@ impl ControlLayer { // If a closed port is Ack'd, then we remove the reference to // that component. let port_handle = comp_ctx.get_port_handle(closed_port); - let port_info = comp_ctx.get_port(port_handle); - let port_peer_comp_id = port_info.peer_comp_id; - debug_assert!(port_info.state.is_closed()); - comp_ctx.remove_peer(sched_ctx, port_handle, port_peer_comp_id, true); // remove if closed + debug_assert!(comp_ctx.get_port(port_handle).state.is_closed()); + comp_ctx.change_port_peer(sched_ctx, port_handle, None); return (AckAction::None, None); } diff --git a/src/runtime2/tests/error_handling.rs b/src/runtime2/tests/error_handling.rs index 9ab1a7eba5fccf1f6498fdde2ea8fbee3abb1060..ed2632d8cda214f382c891410a3843ada855c689 100644 --- a/src/runtime2/tests/error_handling.rs +++ b/src/runtime2/tests/error_handling.rs @@ -21,9 +21,9 @@ fn test_connected_uncommunicating_component_error() { } composite constructor() { // Test one way - channel a -> b; - new sitting_idly_waiting(b); - new crashing_and_burning(a); + // channel a -> b; + // new sitting_idly_waiting(b); + // new crashing_and_burning(a); // And the other way around channel c -> d;