diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 1bcdcbc7ab628629a8cb5a46bbc1a57d553e59f3..c5b857b03b90ba1183351383536cd73db2d1df77 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -40,7 +40,6 @@ impl Scheduler { } let comp_key = comp_key.unwrap(); - let comp_id = comp_key.downgrade(); let component = self.runtime.get_component(comp_key); // Run the component until it no longer indicates that it needs to @@ -55,7 +54,7 @@ impl Scheduler { CompScheduling::Immediate => unreachable!(), CompScheduling::Requeue => { self.runtime.enqueue_work(comp_key); }, CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); }, - CompScheduling::Exit => { self.mark_component_as_exiting(comp_key, component); } + CompScheduling::Exit => { self.mark_component_as_exiting(&scheduler_ctx, component); } } } } @@ -67,10 +66,33 @@ impl Scheduler { debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // we're executing it, so it cannot be sleeping component.public.sleeping.store(true, Ordering::Release); - todo!("check for messages"); + if component.inbox.can_pop() { + let should_reschedule = component.public.sleeping + .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed) + .is_ok(); + + if should_reschedule { + self.runtime.enqueue_work(key); + } + } } - fn mark_component_as_exiting(&self, key: CompKey, component: &mut RuntimeComp) { - todo!("do something") + fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) { + for port_index in 0..component.ctx.ports.len() { + let port_info = &component.ctx.ports[port_index]; + if let Some((peer_id, message)) = component.code.control.mark_port_closed(port_info.id, comp_ctx) { + let peer_info = component.ctx.get_peer(peer_id); + peer_info.handle.inbox.push(Message::Control(message)); + + wake_up_if_sleeping(sched_ctx, peer_id, &peer_info.handle); + } + } + + let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel); + let new_count = old_count - 1; + if new_count == 0 { + let comp_key = unsafe{ component.ctx.id.upgrade() }; + sched_ctx.runtime.destroy_component(comp_key); + } } } \ No newline at end of file