diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index ba86851b7b78e55725512edcefc15e23a9f91cd9..949372cab20039bb34bd20b88e5872af73ae32f8 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -3,7 +3,6 @@ use std::sync::atomic::Ordering; use super::component::*; use super::runtime::*; -use super::communication::*; /// Data associated with a scheduler thread pub(crate) struct Scheduler { @@ -74,6 +73,10 @@ impl Scheduler { // local utilities + /// Marks component as sleeping, if after marking itself as sleeping the + /// inbox contains messages then the component will be immediately + /// rescheduled. After calling this function the component should not be + /// executed anymore. fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) { debug_assert_eq!(key.downgrade(), component.ctx.id); // make sure component matches key debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // we're executing it, so it cannot be sleeping @@ -90,30 +93,26 @@ impl Scheduler { } } + /// Marks the component as exiting by removing the reference it holds to + /// itself. Afterward the component will enter "normal" sleeping mode (if it + /// has not yet been destroyed) fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) { - // Send messages that all ports will be closed - for port_index in 0..component.ctx.ports.len() { - let port_info = &component.ctx.ports[port_index]; - if let Some((peer_id, message)) = component.code.control.initiate_port_closing(port_info.self_id, &mut component.ctx) { - let peer_info = component.ctx.get_peer(peer_id); - peer_info.handle.send_message(sched_ctx, Message::Control(message), true); + // If we didn't yet decrement our reference count, do so now + let comp_key = unsafe{ component.ctx.id.upgrade() }; + + if !component.exiting { + component.exiting = true; + + let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel); + let new_count = old_count - 1; + println!(" ****** DEBUG [ sched]: Decremented count to {} for {:?}", new_count, component.ctx.id); + if new_count == 0 { + sched_ctx.runtime.destroy_component(comp_key); + return; } } - // Remove all references to the peers that we have - for mut peer in component.ctx.peers.drain(..) { - let should_remove = peer.handle.decrement_users(); - if should_remove { - let key = unsafe{ peer.id.upgrade() }; - sched_ctx.runtime.destroy_component(key); - } - } - - let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel); - let new_count = old_count - 1; - if new_count == 0 { - let comp_key = unsafe{ component.ctx.id.upgrade() }; - sched_ctx.runtime.destroy_component(comp_key); - } + // Enter "regular" sleeping mode + self.mark_component_as_sleeping(comp_key, component); } } \ No newline at end of file