diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index ac1c7b6062fae6ccf56116ff3030f491d56068c5..469b719504ab18c02e06dd3171aab65248f11cd5 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -453,37 +453,7 @@ impl CompPDL { // Handle the content of the control message, and optionally Ack it match message.content { ControlMessageContent::Ack => { - let mut to_ack = message.id; - loop { - let (action, new_to_ack) = self.control.handle_ack(to_ack, sched_ctx, comp_ctx); - match action { - AckAction::SendMessage(target_comp, message) => { - // FIX @NoDirectHandle - let mut handle = sched_ctx.runtime.get_component_public(target_comp); - handle.send_message(sched_ctx, Message::Control(message), true); - let _should_remove = handle.decrement_users(); - debug_assert!(_should_remove.is_none()); - }, - AckAction::ScheduleComponent(to_schedule) => { - // FIX @NoDirectHandle - let mut handle = sched_ctx.runtime.get_component_public(to_schedule); - - // Note that the component is intentionally not - // sleeping, so we just wake it up - debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire)); - let key = unsafe{ to_schedule.upgrade() }; - sched_ctx.runtime.enqueue_work(key); - let _should_remove = handle.decrement_users(); - debug_assert!(_should_remove.is_none()); - }, - AckAction::None => {} - } - - match new_to_ack { - Some(new_to_ack) => to_ack = new_to_ack, - None => break, - } - } + self.handle_ack(sched_ctx, comp_ctx, message.id); }, ControlMessageContent::BlockPort(port_id) => { // On of our messages was accepted, but the port should be @@ -502,9 +472,16 @@ impl CompPDL { let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id; let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); - send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle); - comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id); - comp_ctx.set_port_state(port_handle, PortState::Closed); + // One exception to sending an `Ack` is if we just closed the + // port ourselves, meaning that the `ClosePort` messages got + // sent to one another. + if let Some(control_id) = self.control.has_close_port_entry(port_handle, comp_ctx) { + self.handle_ack(sched_ctx, comp_ctx, control_id); + } else { + send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle); + comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed + comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed + } }, ControlMessageContent::UnblockPort(port_id) => { // We were previously blocked (or already closed) @@ -536,7 +513,7 @@ impl CompPDL { debug_assert!(port_info.state == PortState::Blocked); let old_peer_id = port_info.peer_comp_id; - comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id); + comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false); let port_info = comp_ctx.get_port_mut(port_handle); port_info.peer_comp_id = new_comp_id; @@ -551,9 +528,41 @@ impl CompPDL { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); debug_assert!(self.mode == Mode::Sync || self.mode == Mode::SyncEnd); self.handle_sync_decision(sched_ctx, comp_ctx, decision); - if self.mode == Mode::Exit { - // TODO: Bit hacky, move this around - self.mode = Mode::StartExit; + } + + /// Little helper that notifies the control layer of an `Ack`, and takes the + /// appropriate subsequent action + fn handle_ack(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, control_id: ControlId) { + let mut to_ack = control_id; + loop { + let (action, new_to_ack) = self.control.handle_ack(to_ack, sched_ctx, comp_ctx); + match action { + AckAction::SendMessage(target_comp, message) => { + // FIX @NoDirectHandle + let mut handle = sched_ctx.runtime.get_component_public(target_comp); + handle.send_message(sched_ctx, Message::Control(message), true); + let _should_remove = handle.decrement_users(); + debug_assert!(_should_remove.is_none()); + }, + AckAction::ScheduleComponent(to_schedule) => { + // FIX @NoDirectHandle + let mut handle = sched_ctx.runtime.get_component_public(to_schedule); + + // Note that the component is intentionally not + // sleeping, so we just wake it up + debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire)); + let key = unsafe{ to_schedule.upgrade() }; + sched_ctx.runtime.enqueue_work(key); + let _should_remove = handle.decrement_users(); + debug_assert!(_should_remove.is_none()); + }, + AckAction::None => {} + } + + match new_to_ack { + Some(new_to_ack) => to_ack = new_to_ack, + None => break, + } } } @@ -695,7 +704,7 @@ impl CompPDL { let creator_port_info = creator_ctx.get_port(pair.creator_handle); let creator_port_index = creator_ctx.get_port_index(pair.creator_handle); let creator_peer_comp_id = creator_port_info.peer_comp_id; - creator_ctx.remove_peer(sched_ctx, pair.creator_handle, creator_peer_comp_id); + creator_ctx.remove_peer(sched_ctx, pair.creator_handle, creator_peer_comp_id, false); creator_ctx.remove_port(pair.creator_handle); // Transfer any messages