From 5415acc02756666d2ba5ba419aee46df87fba1c1 2022-01-28 18:40:04 From: mh Date: 2022-01-28 18:40:04 Subject: [PATCH] WIP: Refactored port/peer management, pending more bugfixes to component shutdown --- diff --git a/src/runtime2/component/component_context.rs b/src/runtime2/component/component_context.rs index 84c5594683bb234fe23c173307648b1b895590cc..4b6f03e7f95c03b7fb010afc0513bcd4f78aa3cb 100644 --- a/src/runtime2/component/component_context.rs +++ b/src/runtime2/component/component_context.rs @@ -95,10 +95,11 @@ impl CompCtx { /// then it will be used to add the peer. Otherwise it will be retrieved /// from the runtime using its ID. pub(crate) fn add_peer(&mut self, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, peer_comp_id: CompId, handle: Option<&CompHandle>) { + let self_id = self.id; let port = self.get_port_mut(port_handle); debug_assert_eq!(port.peer_comp_id, peer_comp_id); debug_assert!(!port.associated_with_peer); - if !self.requires_peer_reference(port) { + if !Self::requires_peer_reference(port, self_id) { return; } @@ -124,9 +125,10 @@ impl CompCtx { /// Removes a peer associated with a port. pub(crate) fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, peer_id: CompId) { + let self_id = self.id; let port = self.get_port_mut(port_handle); debug_assert_eq!(port.peer_comp_id, peer_id); - if !self.requires_peer_reference(port) { + if !Self::requires_peer_reference(port, self_id) { return; } @@ -135,6 +137,7 @@ impl CompCtx { let peer_index = self.get_peer_index_by_id(peer_id).unwrap(); let peer = &mut self.peers[peer_index]; peer.num_associated_ports -= 1; + println!(" ****** DEBUG: Removed peer {:?} from {:?}, now at {}", peer.id, self_id, peer.num_associated_ports); if peer.num_associated_ports == 0 { let mut peer = self.peers.remove(peer_index); if let Some(key) = peer.handle.decrement_users() { @@ -173,6 +176,10 @@ impl CompCtx { return &mut self.ports[index]; } + pub(crate) fn get_port_by_index_mut(&mut self, index: usize) -> &mut Port { + return &mut self.ports[index]; + } + pub(crate) fn get_peer(&self, peer_handle: LocalPeerHandle) -> &Peer { let index = self.must_get_peer_index(peer_handle); return &self.peers[index]; @@ -188,6 +195,11 @@ impl CompCtx { return self.ports.iter(); } + #[inline] + pub(crate) fn iter_ports_mut(&mut self) -> impl Iterator { + return self.ports.iter_mut(); + } + #[inline] pub(crate) fn iter_peers(&self) -> impl Iterator { return self.peers.iter(); @@ -203,8 +215,8 @@ impl CompCtx { // ------------------------------------------------------------------------- #[inline] - fn requires_peer_reference(&self, port: &Port) -> bool { - return port.state == PortState::Closed; + fn requires_peer_reference(port: &Port, self_id: CompId) -> bool { + return port.state != PortState::Closed && port.peer_comp_id != self_id; } fn must_get_port_index(&self, handle: LocalPortHandle) -> usize { diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index c2c4b13b725990310eb9c7d7454f5e5dbac4f7bb..ac1c7b6062fae6ccf56116ff3030f491d56068c5 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -6,11 +6,9 @@ use crate::protocol::eval::{ EvalContinuation, EvalResult, EvalError }; -use crate::runtime2::runtime::*; use crate::runtime2::scheduler::SchedulerCtx; use crate::runtime2::communication::*; -use super::*; use super::component_context::*; use super::control_layer::*; use super::consensus::Consensus; @@ -86,23 +84,12 @@ impl RunContext for ExecCtx { pub(crate) enum Mode { NonSync, // not in sync mode Sync, // in sync mode, can interact with other components - SyncFail, // something went wrong during sync mode (deadlocked, error, whatever) SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block BlockedGet, BlockedPut, - StartExit, // temp state - Exit, -} - -impl Mode { - fn can_run(&self) -> bool { - match self { - Mode::NonSync | Mode::Sync => - return true, - Mode::SyncFail | Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut | Mode::StartExit | Mode::Exit => - return false, - } - } + StartExit, // temporary state: if encountered then we start the shutdown process + BusyExit, // temporary state: waiting for Acks for all the closed ports + Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0 } pub(crate) struct CompPDL { @@ -176,15 +163,30 @@ impl CompPDL { pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { use EvalContinuation as EC; - if self.mode == Mode::StartExit { - self.mode = Mode::Exit; - return Ok(CompScheduling::Exit); - } + sched_ctx.log(&format!("Running component (mode: {:?})", self.mode)); - let can_run = self.mode.can_run(); - sched_ctx.log(&format!("Running component (mode: {:?}, can run: {})", self.mode, can_run)); - if !can_run { - return Ok(CompScheduling::Sleep); + // Depending on the mode don't do anything at all, take some special + // actions, or fall through and run the PDL code. + match self.mode { + Mode::NonSync | Mode::Sync => {}, + Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut => { + return Ok(CompScheduling::Sleep); + } + Mode::StartExit => { + self.handle_component_exit(sched_ctx, comp_ctx); + return Ok(CompScheduling::Immediate); + }, + Mode::BusyExit => { + if self.control.has_acks_remaining() { + return Ok(CompScheduling::Sleep); + } else { + self.mode = Mode::Exit; + return Ok(CompScheduling::Exit); + } + }, + Mode::Exit => { + return Ok(CompScheduling::Exit); + } } let run_result = self.execute_prompt(&sched_ctx)?; @@ -195,8 +197,8 @@ impl CompPDL { // Results that can be returned in sync mode EC::SyncBlockEnd => { debug_assert_eq!(self.mode, Mode::Sync); - let scheduling = self.handle_sync_end(sched_ctx, comp_ctx); - return Ok(scheduling.unwrap_or(CompScheduling::Immediate)); + self.handle_sync_end(sched_ctx, comp_ctx); + return Ok(CompScheduling::Immediate); }, EC::BlockGet(port_id) => { debug_assert_eq!(self.mode, Mode::Sync); @@ -216,7 +218,7 @@ impl CompPDL { self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content); return Ok(CompScheduling::Immediate); } else { - self.mode = Mode::SyncFail; + todo!("handle sync failure due to message deadlock"); return Ok(CompScheduling::Sleep); } } else { @@ -240,8 +242,8 @@ impl CompPDL { }, // Results that can be returned outside of sync mode EC::ComponentTerminated => { - self.handle_component_exit(sched_ctx, comp_ctx); - return Ok(CompScheduling::Exit); + self.mode = Mode::StartExit; // next call we'll take care of the exit + return Ok(CompScheduling::Immediate); }, EC::SyncBlockStart => { debug_assert_eq!(self.mode, Mode::NonSync); @@ -290,45 +292,65 @@ impl CompPDL { self.mode = Mode::Sync; } - fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> Option { + /// Handles end of sync. The conclusion to the sync round might arise + /// immediately (and be handled immediately), or might come later through + /// messaging. In any case the component should be scheduled again + /// immediately + fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { sched_ctx.log("Component ending sync mode (now waiting for solution)"); let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); self.mode = Mode::SyncEnd; - self.handle_sync_decision(sched_ctx, comp_ctx, decision) + self.handle_sync_decision(sched_ctx, comp_ctx, decision); } - fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, decision: SyncRoundDecision) -> Option { + /// Handles decision from the consensus round. This will cause a change in + /// the internal `Mode`, such that the next call to `run` can take the + /// appropriate next steps. + fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, decision: SyncRoundDecision) { debug_assert_eq!(self.mode, Mode::SyncEnd); sched_ctx.log(&format!("Handling sync decision: {:?}", decision)); let is_success = match decision { SyncRoundDecision::None => { // No decision yet - return None; + return; }, SyncRoundDecision::Solution => true, SyncRoundDecision::Failure => false, }; // If here then we've reached a decision - self.mode = Mode::NonSync; if is_success { + self.mode = Mode::NonSync; self.consensus.notify_sync_decision(decision); - return None; } else { - todo!("handle this better, show some kind of error"); - self.handle_component_exit(sched_ctx, comp_ctx); - self.mode = Mode::Exit; - return Some(CompScheduling::Exit); + self.mode = Mode::StartExit; } } fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { sched_ctx.log("Component exiting"); - debug_assert_eq!(self.mode, Mode::NonSync); // not a perfect assert, but just to remind myself: cannot exit while in sync + debug_assert_eq!(self.mode, Mode::StartExit); + self.mode = Mode::BusyExit; + + // Doing this by index, then retrieving the handle is a bit rediculous, + // but Rust is being Rust with its borrowing rules. + for port_index in 0..comp_ctx.num_ports() { + let port = comp_ctx.get_port_by_index_mut(port_index); + if port.state == PortState::Closed { + // Already closed, or in the process of being closed + continue; + } + + // Mark as closed + let port_id = port.self_id; + port.state = PortState::Closed; - // Note: for now we have that the scheduler handles exiting. I don't - // know if that is a good idea, we'll see - self.mode = Mode::Exit; + // Notify peer of closing + let port_handle = comp_ctx.get_port_handle(port_id); + let (peer, message) = self.control.initiate_port_closing(port_handle, comp_ctx); + let peer_info = comp_ctx.get_peer(peer); + peer_info.handle.send_message(sched_ctx, Message::Control(message), true); + } } // ------------------------------------------------------------------------- @@ -433,15 +455,14 @@ impl CompPDL { ControlMessageContent::Ack => { let mut to_ack = message.id; loop { - let action = self.control.handle_ack(to_ack, sched_ctx, comp_ctx); + let (action, new_to_ack) = self.control.handle_ack(to_ack, sched_ctx, comp_ctx); match action { - AckAction::SendMessageAndAck(target_comp, message, new_to_ack) => { + AckAction::SendMessage(target_comp, message) => { // FIX @NoDirectHandle let mut handle = sched_ctx.runtime.get_component_public(target_comp); handle.send_message(sched_ctx, Message::Control(message), true); let _should_remove = handle.decrement_users(); debug_assert!(_should_remove.is_none()); - to_ack = new_to_ack; }, AckAction::ScheduleComponent(to_schedule) => { // FIX @NoDirectHandle @@ -454,11 +475,13 @@ impl CompPDL { sched_ctx.runtime.enqueue_work(key); let _should_remove = handle.decrement_users(); debug_assert!(_should_remove.is_none()); - break; }, - AckAction::None => { - break; - } + AckAction::None => {} + } + + match new_to_ack { + Some(new_to_ack) => to_ack = new_to_ack, + None => break, } } }, @@ -479,9 +502,9 @@ impl CompPDL { let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id; let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); - comp_ctx.set_port_state(port_handle, PortState::Closed); send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle); comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id); + comp_ctx.set_port_state(port_handle, PortState::Closed); }, ControlMessageContent::UnblockPort(port_id) => { // We were previously blocked (or already closed) @@ -636,6 +659,7 @@ impl CompPDL { }, None => { // Peer port remains with creator component. + println!("DEBUG: Setting peer for port {:?} of component {:?} to {:?}", created_port_info.self_id, reservation.id(), creator_ctx.id); created_port_info.peer_comp_id = creator_ctx.id; created_ctx.add_peer(pair.created_handle, sched_ctx, creator_ctx.id, None); } @@ -670,7 +694,8 @@ impl CompPDL { // Remove peer if appropriate let creator_port_info = creator_ctx.get_port(pair.creator_handle); let creator_port_index = creator_ctx.get_port_index(pair.creator_handle); - creator_ctx.remove_peer(sched_ctx, pair.creator_handle, creator_port_info.peer_comp_id); + let creator_peer_comp_id = creator_port_info.peer_comp_id; + creator_ctx.remove_peer(sched_ctx, pair.creator_handle, creator_peer_comp_id); creator_ctx.remove_port(pair.creator_handle); // Transfer any messages @@ -700,7 +725,7 @@ impl CompPDL { let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id); let peer_port_info = creator_ctx.get_port_mut(peer_port_handle); peer_port_info.peer_comp_id = created_ctx.id; - creator_ctx.add_peer(pair.created_handle, sched_ctx, created_ctx.id, None); + creator_ctx.add_peer(peer_port_handle, sched_ctx, created_ctx.id, None); } } diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs index 2be94616b20a62067d567a01f3bf70b108bbc889..4683ff164f11a5d64815a14a18ef5d94395ad24e 100644 --- a/src/runtime2/component/consensus.rs +++ b/src/runtime2/component/consensus.rs @@ -3,7 +3,6 @@ use crate::runtime2::scheduler::*; use crate::runtime2::runtime::*; use crate::runtime2::communication::*; -use super::component_pdl::*; use super::component_context::*; pub struct PortAnnotation { @@ -556,7 +555,7 @@ impl Consensus { }); handle.send_message(sched_ctx, message, true); let _should_remove = handle.decrement_users(); - debug_assert!(!_should_remove); + debug_assert!(_should_remove.is_none()); } } @@ -565,8 +564,7 @@ impl Consensus { let mut leader_info = sched_ctx.runtime.get_component_public(self.highest_id); leader_info.send_message(sched_ctx, message, true); let should_remove = leader_info.decrement_users(); - if should_remove { - let key = unsafe{ self.highest_id.upgrade() }; + if let Some(key) = should_remove { sched_ctx.runtime.destroy_component(key); } } diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs index c026ef88f9d3836dd87fc8795230c68bbc27d343..da7bae60bb8b257f65b3b9e05780aeda5fc1b267 100644 --- a/src/runtime2/component/control_layer.rs +++ b/src/runtime2/component/control_layer.rs @@ -37,9 +37,14 @@ struct ContentPeerChange { schedule_entry_id: ControlId, } +struct ControlClosedPort { + closed_port: PortId, + exit_entry_id: Option, +} + pub(crate) enum AckAction { None, - SendMessageAndAck(CompId, ControlMessage, ControlId), + SendMessage(CompId, ControlMessage), ScheduleComponent(CompId), } @@ -73,18 +78,23 @@ impl ControlLayer { return None; } - pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> AckAction { + /// Handles an acknowledgement. The returned action must be performed by the + /// caller. The optionally returned `ControlId` must be used and passed to + /// `handle_ack` again. + pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> (AckAction, Option) { let entry_index = self.get_entry_index_by_id(entry_id).unwrap(); let entry = &mut self.entries[entry_index]; debug_assert!(entry.ack_countdown > 0); entry.ack_countdown -= 1; if entry.ack_countdown != 0 { - return AckAction::None; + return (AckAction::None, None); } + let entry = self.entries.remove(entry_index); + // All `Ack`s received, take action based on the kind of entry - match &entry.content { + match entry.content { ControlContent::PeerChange(content) => { // If change of peer is ack'd. Then we are certain we have // rerouted all of the messages, and the sender's port can now @@ -100,29 +110,36 @@ impl ControlLayer { ) }; let to_ack = content.schedule_entry_id; - self.entries.remove(entry_index); - return AckAction::SendMessageAndAck(target_comp_id, message_to_send, to_ack); + return ( + AckAction::SendMessage(target_comp_id, message_to_send), + Some(to_ack) + ); }, ControlContent::ScheduleComponent(to_schedule) => { // If all change-of-peers are `Ack`d, then we're ready to // schedule the component! - return AckAction::ScheduleComponent(*to_schedule); + return (AckAction::ScheduleComponent(to_schedule), None); }, ControlContent::BlockedPort(_) => unreachable!(), - ControlContent::ClosedPort(port_id) => { + ControlContent::ClosedPort(closed_port) => { // If a closed port is Ack'd, then we remove the reference to // that component. - let port_handle = comp_ctx.get_port_handle(*port_id); + let port_handle = comp_ctx.get_port_handle(closed_port); let port_info = comp_ctx.get_port(port_handle); + let port_peer_comp_id = port_info.peer_comp_id; debug_assert_eq!(port_info.state, PortState::Closed); - comp_ctx.remove_peer(sched_ctx, port_handle, port_info.peer_comp_id); + comp_ctx.remove_peer(sched_ctx, port_handle, port_peer_comp_id); - return AckAction::None; + return (AckAction::None, None); } } } + pub(crate) fn has_acks_remaining(&self) -> bool { + return !self.entries.is_empty(); + } + // ------------------------------------------------------------------------- // Port transfer (due to component creation) // ------------------------------------------------------------------------- @@ -169,12 +186,8 @@ impl ControlLayer { }); // increment counter on schedule entry - for entry in &mut self.entries { - if entry.id == schedule_entry_id { - entry.ack_countdown += 1; - break; - } - } + let entry_index = self.get_entry_index_by_id(schedule_entry_id).unwrap(); + self.entries[entry_index].ack_countdown += 1; return Message::Control(ControlMessage{ id: entry_id, @@ -188,36 +201,33 @@ impl ControlLayer { // Blocking, unblocking, and closing ports // ------------------------------------------------------------------------- - pub(crate) fn initiate_port_closing(&mut self, port_handle: PortHandle, comp_ctx: &mut CompCtx) -> Option<(CompId, ControlMessage)> { - let port = comp_ctx.get_port_mut(port_handle); - let port_id = port.self_id; - let peer_port_id = port.peer_port_id; - let peer_comp_id = port.peer_comp_id; - debug_assert!(port.state == PortState::Open || port.state == PortState::Blocked); - port.state = PortState::Closed; - - if peer_comp_id == comp_ctx.id { - // We own the other end of the channel as well. - return None; - } + /// Initiates the control message procedures for closing a port. Caller must + /// make sure that the port state has already been set to `Closed`. + pub(crate) fn initiate_port_closing(&mut self, port_handle: LocalPortHandle, comp_ctx: &CompCtx) -> (LocalPeerHandle, ControlMessage) { + let port = comp_ctx.get_port(port_handle); + let peer_port_id = port.peer_port_id; + debug_assert!(port.state == PortState::Closed); + // Construct the port-closing entry let entry_id = self.take_id(); self.entries.push(ControlEntry{ id: entry_id, ack_countdown: 1, - content: ControlContent::ClosedPort(port_id), + content: ControlContent::ClosedPort(port.self_id), }); - return Some(( - peer_comp_id, + // Return the messages notifying peer of the closed port + let peer_handle = comp_ctx.get_peer_handle(port.peer_comp_id); + return ( + peer_handle, ControlMessage{ id: entry_id, sender_comp_id: comp_ctx.id, target_port_id: Some(peer_port_id), content: ControlMessageContent::ClosePort(peer_port_id), } - )); + ); } /// Adds a control entry to track that a port is blocked. Expects the caller diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index 8265a8c8b88e2c94d7279d3a96f3ec595a8e1b65..ffee9f5038c8faf3758730e71870b54862dccccf 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -59,7 +59,8 @@ pub(crate) struct RuntimeComp { pub public: CompPublic, pub code: CompPDL, pub ctx: CompCtx, - pub inbox: QueueDynMpsc + pub inbox: QueueDynMpsc, + pub exiting: bool, } /// Should contain everything that is accessible in a thread-safe manner @@ -109,9 +110,10 @@ impl CompHandle { /// Returns the `CompKey` to the component if it should be destroyed pub(crate) fn decrement_users(&mut self) -> Option { debug_assert!(!self.decremented, "illegal to 'decrement_users' twice"); - dbg_code!(self.decremented = true); let old_count = self.num_handles.fetch_sub(1, Ordering::AcqRel); let new_count = old_count - 1; + dbg_code!(self.decremented = true); + println!(" ****** DEBUG [handle]: Decremented count to {} for {:?}", new_count, self.id); if new_count == 0 { return Some(unsafe{ self.id.upgrade() }); } @@ -248,6 +250,7 @@ impl RuntimeInner { code: component, ctx: context, inbox: inbox_queue, + exiting: false, }; let index = self.components.submit(reserved.reservation, component); @@ -271,6 +274,7 @@ impl RuntimeInner { code: comp, ctx, inbox: inbox_queue, + exiting: false, }; let index = self.components.create(comp); @@ -294,7 +298,11 @@ impl RuntimeInner { } pub(crate) fn destroy_component(&self, key: CompKey) { - debug_assert_eq!(self.get_component(key).public.num_handles.load(Ordering::Acquire), 0); + dbg_code!({ + let component = self.get_component(key); + debug_assert!(component.exiting); + debug_assert_eq!(component.public.num_handles.load(Ordering::Acquire), 0); + }); self.decrement_active_components(); self.components.destroy(key.0); } @@ -314,7 +322,7 @@ impl RuntimeInner { if new_val == 0 { // Just to be sure, in case the last thing that gets destroyed is an // API instead of a thread. - let lock = self.work_queue.lock(); + let _lock = self.work_queue.lock(); self.work_condvar.notify_all(); } } diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index ba86851b7b78e55725512edcefc15e23a9f91cd9..949372cab20039bb34bd20b88e5872af73ae32f8 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -3,7 +3,6 @@ use std::sync::atomic::Ordering; use super::component::*; use super::runtime::*; -use super::communication::*; /// Data associated with a scheduler thread pub(crate) struct Scheduler { @@ -74,6 +73,10 @@ impl Scheduler { // local utilities + /// Marks component as sleeping, if after marking itself as sleeping the + /// inbox contains messages then the component will be immediately + /// rescheduled. After calling this function the component should not be + /// executed anymore. fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) { debug_assert_eq!(key.downgrade(), component.ctx.id); // make sure component matches key debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // we're executing it, so it cannot be sleeping @@ -90,30 +93,26 @@ impl Scheduler { } } + /// Marks the component as exiting by removing the reference it holds to + /// itself. Afterward the component will enter "normal" sleeping mode (if it + /// has not yet been destroyed) fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) { - // Send messages that all ports will be closed - for port_index in 0..component.ctx.ports.len() { - let port_info = &component.ctx.ports[port_index]; - if let Some((peer_id, message)) = component.code.control.initiate_port_closing(port_info.self_id, &mut component.ctx) { - let peer_info = component.ctx.get_peer(peer_id); - peer_info.handle.send_message(sched_ctx, Message::Control(message), true); + // If we didn't yet decrement our reference count, do so now + let comp_key = unsafe{ component.ctx.id.upgrade() }; + + if !component.exiting { + component.exiting = true; + + let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel); + let new_count = old_count - 1; + println!(" ****** DEBUG [ sched]: Decremented count to {} for {:?}", new_count, component.ctx.id); + if new_count == 0 { + sched_ctx.runtime.destroy_component(comp_key); + return; } } - // Remove all references to the peers that we have - for mut peer in component.ctx.peers.drain(..) { - let should_remove = peer.handle.decrement_users(); - if should_remove { - let key = unsafe{ peer.id.upgrade() }; - sched_ctx.runtime.destroy_component(key); - } - } - - let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel); - let new_count = old_count - 1; - if new_count == 0 { - let comp_key = unsafe{ component.ctx.id.upgrade() }; - sched_ctx.runtime.destroy_component(comp_key); - } + // Enter "regular" sleeping mode + self.mark_component_as_sleeping(comp_key, component); } } \ No newline at end of file diff --git a/src/runtime2/store/component.rs b/src/runtime2/store/component.rs index cd35878591ace87d2922fd5483a73d85c508fdd8..0370e5956540d36a4784546678e35ab0dca7408f 100644 --- a/src/runtime2/store/component.rs +++ b/src/runtime2/store/component.rs @@ -129,7 +129,7 @@ impl ComponentStore { pub fn reserve(&self) -> ComponentReservation { let lock = self.inner.lock_shared(); - let (lock, index) = self.pop_freelist_index(lock); + let (_lock, index) = self.pop_freelist_index(lock); return ComponentReservation::new(index); }