diff --git a/src/runtime2/component/component_context.rs b/src/runtime2/component/component_context.rs index 4b6f03e7f95c03b7fb010afc0513bcd4f78aa3cb..2c3f6870dbcc6e49553b8daeedc118627150ab69 100644 --- a/src/runtime2/component/component_context.rs +++ b/src/runtime2/component/component_context.rs @@ -99,7 +99,7 @@ impl CompCtx { let port = self.get_port_mut(port_handle); debug_assert_eq!(port.peer_comp_id, peer_comp_id); debug_assert!(!port.associated_with_peer); - if !Self::requires_peer_reference(port, self_id) { + if !Self::requires_peer_reference(port, self_id, false) { return; } @@ -124,11 +124,11 @@ impl CompCtx { } /// Removes a peer associated with a port. - pub(crate) fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, peer_id: CompId) { + pub(crate) fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, peer_id: CompId, also_remove_if_closed: bool) { let self_id = self.id; let port = self.get_port_mut(port_handle); debug_assert_eq!(port.peer_comp_id, peer_id); - if !Self::requires_peer_reference(port, self_id) { + if !Self::requires_peer_reference(port, self_id, also_remove_if_closed) { return; } @@ -215,8 +215,8 @@ impl CompCtx { // ------------------------------------------------------------------------- #[inline] - fn requires_peer_reference(port: &Port, self_id: CompId) -> bool { - return port.state != PortState::Closed && port.peer_comp_id != self_id; + fn requires_peer_reference(port: &Port, self_id: CompId, required_if_closed: bool) -> bool { + return (port.state != PortState::Closed || required_if_closed) && port.peer_comp_id != self_id; } fn must_get_port_index(&self, handle: LocalPortHandle) -> usize { diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index ac1c7b6062fae6ccf56116ff3030f491d56068c5..469b719504ab18c02e06dd3171aab65248f11cd5 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -453,37 +453,7 @@ impl CompPDL { // Handle the content of the control message, and optionally Ack it match message.content { ControlMessageContent::Ack => { - let mut to_ack = message.id; - loop { - let (action, new_to_ack) = self.control.handle_ack(to_ack, sched_ctx, comp_ctx); - match action { - AckAction::SendMessage(target_comp, message) => { - // FIX @NoDirectHandle - let mut handle = sched_ctx.runtime.get_component_public(target_comp); - handle.send_message(sched_ctx, Message::Control(message), true); - let _should_remove = handle.decrement_users(); - debug_assert!(_should_remove.is_none()); - }, - AckAction::ScheduleComponent(to_schedule) => { - // FIX @NoDirectHandle - let mut handle = sched_ctx.runtime.get_component_public(to_schedule); - - // Note that the component is intentionally not - // sleeping, so we just wake it up - debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire)); - let key = unsafe{ to_schedule.upgrade() }; - sched_ctx.runtime.enqueue_work(key); - let _should_remove = handle.decrement_users(); - debug_assert!(_should_remove.is_none()); - }, - AckAction::None => {} - } - - match new_to_ack { - Some(new_to_ack) => to_ack = new_to_ack, - None => break, - } - } + self.handle_ack(sched_ctx, comp_ctx, message.id); }, ControlMessageContent::BlockPort(port_id) => { // On of our messages was accepted, but the port should be @@ -502,9 +472,16 @@ impl CompPDL { let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id; let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); - send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle); - comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id); - comp_ctx.set_port_state(port_handle, PortState::Closed); + // One exception to sending an `Ack` is if we just closed the + // port ourselves, meaning that the `ClosePort` messages got + // sent to one another. + if let Some(control_id) = self.control.has_close_port_entry(port_handle, comp_ctx) { + self.handle_ack(sched_ctx, comp_ctx, control_id); + } else { + send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle); + comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed + comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed + } }, ControlMessageContent::UnblockPort(port_id) => { // We were previously blocked (or already closed) @@ -536,7 +513,7 @@ impl CompPDL { debug_assert!(port_info.state == PortState::Blocked); let old_peer_id = port_info.peer_comp_id; - comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id); + comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false); let port_info = comp_ctx.get_port_mut(port_handle); port_info.peer_comp_id = new_comp_id; @@ -551,9 +528,41 @@ impl CompPDL { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); debug_assert!(self.mode == Mode::Sync || self.mode == Mode::SyncEnd); self.handle_sync_decision(sched_ctx, comp_ctx, decision); - if self.mode == Mode::Exit { - // TODO: Bit hacky, move this around - self.mode = Mode::StartExit; + } + + /// Little helper that notifies the control layer of an `Ack`, and takes the + /// appropriate subsequent action + fn handle_ack(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, control_id: ControlId) { + let mut to_ack = control_id; + loop { + let (action, new_to_ack) = self.control.handle_ack(to_ack, sched_ctx, comp_ctx); + match action { + AckAction::SendMessage(target_comp, message) => { + // FIX @NoDirectHandle + let mut handle = sched_ctx.runtime.get_component_public(target_comp); + handle.send_message(sched_ctx, Message::Control(message), true); + let _should_remove = handle.decrement_users(); + debug_assert!(_should_remove.is_none()); + }, + AckAction::ScheduleComponent(to_schedule) => { + // FIX @NoDirectHandle + let mut handle = sched_ctx.runtime.get_component_public(to_schedule); + + // Note that the component is intentionally not + // sleeping, so we just wake it up + debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire)); + let key = unsafe{ to_schedule.upgrade() }; + sched_ctx.runtime.enqueue_work(key); + let _should_remove = handle.decrement_users(); + debug_assert!(_should_remove.is_none()); + }, + AckAction::None => {} + } + + match new_to_ack { + Some(new_to_ack) => to_ack = new_to_ack, + None => break, + } } } @@ -695,7 +704,7 @@ impl CompPDL { let creator_port_info = creator_ctx.get_port(pair.creator_handle); let creator_port_index = creator_ctx.get_port_index(pair.creator_handle); let creator_peer_comp_id = creator_port_info.peer_comp_id; - creator_ctx.remove_peer(sched_ctx, pair.creator_handle, creator_peer_comp_id); + creator_ctx.remove_peer(sched_ctx, pair.creator_handle, creator_peer_comp_id, false); creator_ctx.remove_port(pair.creator_handle); // Transfer any messages diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs index da7bae60bb8b257f65b3b9e05780aeda5fc1b267..c21c9e81c9cc1bc355fc8af2b0a3687a92d5cc0f 100644 --- a/src/runtime2/component/control_layer.rs +++ b/src/runtime2/component/control_layer.rs @@ -129,7 +129,7 @@ impl ControlLayer { let port_info = comp_ctx.get_port(port_handle); let port_peer_comp_id = port_info.peer_comp_id; debug_assert_eq!(port_info.state, PortState::Closed); - comp_ctx.remove_peer(sched_ctx, port_handle, port_peer_comp_id); + comp_ctx.remove_peer(sched_ctx, port_handle, port_peer_comp_id, true); // remove if closed return (AckAction::None, None); } @@ -201,6 +201,19 @@ impl ControlLayer { // Blocking, unblocking, and closing ports // ------------------------------------------------------------------------- + pub(crate) fn has_close_port_entry(&self, port_handle: LocalPortHandle, comp_ctx: &CompCtx) -> Option { + let port = comp_ctx.get_port(port_handle); + let port_id = port.self_id; + for entry in self.entries.iter() { + if let ControlContent::ClosedPort(entry_port_id) = &entry.content { + if *entry_port_id == port_id { + return Some(entry.id); + } + } + } + + return None; + } /// Initiates the control message procedures for closing a port. Caller must /// make sure that the port state has already been set to `Closed`.