diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index c2b6c834d7720443944a418ba07dd0353de7499e..ba86851b7b78e55725512edcefc15e23a9f91cd9 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -91,14 +91,24 @@ impl Scheduler { } fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) { + // Send messages that all ports will be closed for port_index in 0..component.ctx.ports.len() { let port_info = &component.ctx.ports[port_index]; - if let Some((peer_id, message)) = component.code.control.mark_port_closed(port_info.self_id, &mut component.ctx) { + if let Some((peer_id, message)) = component.code.control.initiate_port_closing(port_info.self_id, &mut component.ctx) { let peer_info = component.ctx.get_peer(peer_id); peer_info.handle.send_message(sched_ctx, Message::Control(message), true); } } + // Remove all references to the peers that we have + for mut peer in component.ctx.peers.drain(..) { + let should_remove = peer.handle.decrement_users(); + if should_remove { + let key = unsafe{ peer.id.upgrade() }; + sched_ctx.runtime.destroy_component(key); + } + } + let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel); let new_count = old_count - 1; if new_count == 0 {