diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 283de1a84b879d95adbb7446ff8bbee7bca11b4d..05119e155abc9827abad7eaa7cbe7f6222af6a50 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -298,7 +298,7 @@ impl CompPDL { } pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) { - sched_ctx.log(&format!("handling message: {:?}", message)); + sched_ctx.log(&format!("handling message: {:#?}", message)); if let Some(new_target) = self.control.should_reroute(&mut message) { let mut target = sched_ctx.runtime.get_component_public(new_target); target.send_message(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks @@ -558,7 +558,12 @@ impl CompPDL { AckAction::ScheduleComponent(to_schedule) => { // FIX @NoDirectHandle let mut handle = sched_ctx.runtime.get_component_public(to_schedule); - wake_up_if_sleeping(sched_ctx, to_schedule, &handle); + + // Note that the component is intentionally not marked as + // sleeping, so we enqueue it directly instead of waking it up + debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire)); + let key = unsafe{ to_schedule.upgrade() }; + sched_ctx.runtime.enqueue_work(key); let _should_remove = handle.decrement_users(); debug_assert!(!_should_remove); break; @@ -780,18 +785,40 @@ impl CompPDL { let created_ctx = &component.ctx; // Now modify the creator's ports: remove every transferred port and - // potentially remove the peer component + // potentially remove the peer component. Here is also where we will + // transfer messages in the main and backup inboxes. 
for pair in port_id_pairs.iter() { + // Remove peer if appropriate let creator_port_index = creator_ctx.get_port_index(pair.creator).unwrap(); let creator_port_info = creator_ctx.ports.remove(creator_port_index); if creator_port_info.peer_comp_id != creator_ctx.id { creator_ctx.remove_peer(sched_ctx, creator_port_info.peer_comp_id); } - let created_port_info = created_ctx.get_port(pair.created); + // Transfer any messages + let created_port_index = created_ctx.get_port_index(pair.created).unwrap(); + let created_port_info = &created_ctx.ports[created_port_index]; + debug_assert!(component.code.inbox_main[created_port_index].is_none()); + if let Some(mut message) = self.inbox_main.remove(creator_port_index) { + message.data_header.target_port = pair.created; + component.code.inbox_main[created_port_index] = Some(message); + } + + let mut message_index = 0; + while message_index < self.inbox_backup.len() { + let message = &self.inbox_backup[message_index]; + if message.data_header.target_port == pair.creator { + // transfer message + let mut message = self.inbox_backup.remove(message_index); + message.data_header.target_port = pair.created; + component.code.inbox_backup.push(message); + } else { + message_index += 1; + } + } + + // Handle potential channel between creator and created component if created_port_info.peer_comp_id == creator_ctx.id { - // This is the cause where the creator obtains a reference - // to the created component let peer_port_info = creator_ctx.get_port_mut(created_port_info.peer_id); peer_port_info.peer_comp_id = created_ctx.id; creator_ctx.add_peer(sched_ctx, created_ctx.id, None); diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs index d30f5874a04afb8df60b09bb39b3ad8f0c4f08e7..a0544a3d7f80b0db21a56e09d5c096e28ba89732 100644 --- a/src/runtime2/component/consensus.rs +++ b/src/runtime2/component/consensus.rs @@ -666,8 +666,13 @@ impl Consensus { fn send_to_leader(&mut self, sched_ctx: &SchedulerCtx, 
comp_ctx: &CompCtx, message: Message) { debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader - let leader_info = sched_ctx.runtime.get_component_public(self.highest_id); + let mut leader_info = sched_ctx.runtime.get_component_public(self.highest_id); leader_info.send_message(sched_ctx, message, true); + let should_remove = leader_info.decrement_users(); + if should_remove { + let key = unsafe{ self.highest_id.upgrade() }; + sched_ctx.runtime.destroy_component(key); + } } // ------------------------------------------------------------------------- diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index 00483c0c9a5217fe43870aa270e3e618ffad4fe0..0225f944d0c23c4aa5eaf7e55a26939ebd204bc9 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -39,7 +39,7 @@ fn test_component_communication() { } primitive receiver(in i) { print(\"receiver\"); - sync get(i); + sync auto a = get(i); } composite constructor() { channel o -> i;