diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 3f3450e88a7a301c828c75b40e73fa7568a59583..4e735be3142b0246610035fb13e7d4a7befa2b01 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -799,11 +799,22 @@ impl CompPDL { created_handle: LocalPortHandle, created_id: PortId, } - let mut port_id_pairs = Vec::new(); + let mut opened_port_id_pairs = Vec::new(); + let mut closed_port_id_pairs = Vec::new(); + + // TODO: @Nocommit + let other_proc = &sched_ctx.runtime.protocol.heap[definition_id]; + let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition]; let reservation = sched_ctx.runtime.start_create_pdl_component(); let mut created_ctx = CompCtx::new(&reservation); + println!( + "DEBUG: Comp '{}' is creating comp '{}' at ID {:?}", + self_proc.identifier.value.as_str(), other_proc.identifier.value.as_str(), + reservation.id() + ); + // Take all the ports ID that are in the `args` (and currently belong to // the creator component) and translate them into new IDs that are // associated with the component we're about to create @@ -820,12 +831,18 @@ impl CompPDL { let created_port = created_ctx.get_port(created_port_handle); let created_port_id = created_port.self_id; - port_id_pairs.push(PortPair{ + let port_id_pair = PortPair { creator_handle: creator_port_handle, creator_id: creator_port_id, created_handle: created_port_handle, created_id: created_port_id, - }); + }; + + if creator_port.state == PortState::Closed { + closed_port_id_pairs.push(port_id_pair) + } else { + opened_port_id_pairs.push(port_id_pair); + } // Modify value in arguments (bit dirty, but double vec in ValueGroup causes lifetime issues) let arg_value = if let Some(heap_pos) = port_reference.heap_pos { @@ -845,20 +862,20 @@ impl CompPDL { // the new component. let mut created_component_has_remote_peers = false; - for pair in port_id_pairs.iter() { + for pair in opened_port_id_pairs.iter() { let creator_port_info = creator_ctx.get_port(pair.creator_handle); let created_port_info = created_ctx.get_port_mut(pair.created_handle); if created_port_info.peer_comp_id == creator_ctx.id { // Port peer is owned by the creator as well - let created_peer_port_index = port_id_pairs + let created_peer_port_index = opened_port_id_pairs .iter() .position(|v| v.creator_id == creator_port_info.peer_port_id); match created_peer_port_index { Some(created_peer_port_index) => { // Peer port moved to the new component as well. So // adjust IDs appropriately. - let peer_pair = &port_id_pairs[created_peer_port_index]; + let peer_pair = &opened_port_id_pairs[created_peer_port_index]; created_port_info.peer_port_id = peer_pair.created_id; created_port_info.peer_comp_id = reservation.id(); todo!("either add 'self peer', or remove that idea from Ctx altogether") @@ -883,18 +900,15 @@ impl CompPDL { // actual component. Note that we initialize it as "not sleeping" as // its initial scheduling might be performed based on `Ack`s in response // to message exchanges between remote peers. - let prompt = Prompt::new( - &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap, - definition_id, type_id, arguments, - ); - let component = CompPDL::new(prompt, port_id_pairs.len()); + let total_num_ports = opened_port_id_pairs.len() + closed_port_id_pairs.len(); + let component = create_component(&sched_ctx.runtime.protocol, definition_id, type_id, arguments, total_num_ports); let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component( reservation, component, created_ctx, false, ); // Now modify the creator's ports: remove every transferred port and // potentially remove the peer component. - for pair in port_id_pairs.iter() { + for pair in opened_port_id_pairs.iter() { // Remove peer if appropriate let creator_port_info = creator_ctx.get_port(pair.creator_handle); let creator_port_index = creator_ctx.get_port_index(pair.creator_handle); @@ -933,13 +947,25 @@ impl CompPDL { } } + // Do the same for the closed ports + for pair in closed_port_id_pairs.iter() { + let port_index = creator_ctx.get_port_index(pair.creator_handle); + creator_ctx.remove_port(pair.creator_handle); + let _removed_message = self.inbox_main.remove(port_index); + + // In debug mode: since we've closed the port we shouldn't have any + // messages for that port. + debug_assert!(_removed_message.is_none()); + debug_assert!(!self.inbox_backup.iter().any(|v| v.data_header.target_port == pair.creator_id)); + } + // By now all ports and messages have been transferred. If there are any // peers that need to be notified about this new component, then we // initiate the protocol that will notify everyone here. if created_component_has_remote_peers { let created_ctx = &component.ctx; let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id); - for pair in port_id_pairs.iter() { + for pair in opened_port_id_pairs.iter() { let port_info = created_ctx.get_port(pair.created_handle); if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id { let message = self.control.add_reroute_entry( @@ -959,16 +985,6 @@ impl CompPDL { } } -#[inline] -fn port_id_from_eval(port_id: EvalPortId) -> PortId { - return PortId(port_id.id); -} - -#[inline] -fn port_id_to_eval(port_id: PortId) -> EvalPortId { - return EvalPortId{ id: port_id.0 }; -} - /// Recursively goes through the value group, attempting to find ports. /// Duplicates will only be added once. pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) {