diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs index c4bee587535d12a0410fcc24ff6545f853431c3c..db992c070e8509a42d218b0a14cfeef565d0f096 100644 --- a/src/runtime2/communication.rs +++ b/src/runtime2/communication.rs @@ -75,6 +75,7 @@ pub enum ControlMessageContent { Ack, BlockPort(PortId), UnblockPort(PortId), + ClosePort(PortId), PortPeerChangedBlock(PortId), PortPeerChangedUnblock(PortId, CompId), } diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 2ced679fcbfca7f6f706bb51df86ac43baf9e775..8d43073d4c6d83c6dbcf97aeba74eea5a2494106 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -10,6 +10,7 @@ use crate::runtime2::runtime::*; use crate::runtime2::scheduler::SchedulerCtx; use crate::runtime2::communication::*; +use super::*; use super::control_layer::*; use super::consensus::Consensus; @@ -95,7 +96,7 @@ impl CompCtx { return None; } - fn get_peer(&self, peer_id: CompId) -> &Peer { + pub(crate) fn get_peer(&self, peer_id: CompId) -> &Peer { let index = self.get_peer_index(peer_id).unwrap(); return &self.peers[index]; } @@ -216,7 +217,7 @@ impl CompPDL { prompt: initial_state, control: ControlLayer::default(), consensus: Consensus::new(), - sync_counter: u32, + sync_counter: 0, exec_ctx: ExecCtx{ stmt: ExecStmt::None, }, @@ -410,7 +411,7 @@ impl CompPDL { ControlMessageContent::Ack => { let mut to_ack = message.id; loop { - let action = self.control.handle_ack(to_ack, comp_ctx); + let action = self.control.handle_ack(to_ack, sched_ctx, comp_ctx); match action { AckAction::SendMessageAndAck(target_comp, message, new_to_ack) => { // FIX @NoDirectHandle @@ -441,6 +442,27 @@ impl CompPDL { port_info.state = PortState::Blocked; } }, + ControlMessageContent::ClosePort(port_id) => { + // Request to close the port. 
We immediately comply and remove + // the component handle as well + let port_index = comp_ctx.get_port_index(port_id).unwrap(); + let port_info = &mut comp_ctx.ports[port_index]; + port_info.state = PortState::Closed; + + let peer_index = comp_ctx.get_peer_index(port_info.peer_comp_id).unwrap(); + let peer_info = &mut comp_ctx.peers[peer_index]; + peer_info.num_associated_ports -= 1; + if peer_info.num_associated_ports == 0 { + // TODO: @Refactor clean up all these uses of "num_associated_ports" + let should_remove = peer_info.handle.decrement_users(); + if should_remove { + let comp_key = unsafe{ peer_info.id.upgrade() }; + sched_ctx.runtime.destroy_component(comp_key); + } + + comp_ctx.peers.remove(peer_index); + } + } ControlMessageContent::UnblockPort(port_id) => { // We were previously blocked (or already closed) let port_info = comp_ctx.get_port(port_id); @@ -665,20 +687,4 @@ pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Ve for value in &value_group.values { find_port_in_value(value_group, value, ports); } -} - -/// If the component is sleeping, then that flag will be atomically set to -/// false. If we're the ones that made that happen then we add it to the work -/// queue. 
-fn wake_up_if_sleeping(sched_ctx: &SchedulerCtx, comp_id: CompId, handle: &CompHandle) { - use std::sync::atomic::Ordering; - - let should_wake_up = handle.sleeping - .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire) - .is_ok(); - - if should_wake_up { - let comp_key = unsafe{ comp_id.upgrade() }; - sched_ctx.runtime.enqueue_work(comp_key); - } } \ No newline at end of file diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs index 67b481c073e8c16a09533a7711d9864bab934326..1d6a5a4de92b16371cefffbe3da16b8fae923855 100644 --- a/src/runtime2/component/control_layer.rs +++ b/src/runtime2/component/control_layer.rs @@ -21,8 +21,9 @@ struct ControlEntry { enum ControlContent { PeerChange(ContentPeerChange), - ScheduleComponent(ContentScheduleComponent), - BlockedPort(ContentBlockedPort), + ScheduleComponent(CompId), + BlockedPort(PortId), + ClosedPort(PortId), } struct ContentPeerChange { @@ -33,14 +34,6 @@ struct ContentPeerChange { schedule_entry_id: ControlId, } -struct ContentScheduleComponent { - to_schedule: CompId, -} - -struct ContentBlockedPort { - blocked_port: PortId, -} - pub(crate) enum AckAction { None, SendMessageAndAck(CompId, ControlMessage, ControlId), @@ -76,7 +69,7 @@ impl ControlLayer { return None; } - pub(crate) fn handle_ack(&mut self, entry_id: ControlId, comp_ctx: &CompCtx) -> AckAction { + pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> AckAction { let entry_index = self.get_entry_index(entry_id).unwrap(); let entry = &mut self.entries[entry_index]; debug_assert!(entry.ack_countdown > 0); @@ -105,7 +98,7 @@ impl ControlLayer { let to_ack = content.schedule_entry_id; self.entries.remove(entry_index); - self.handle_ack(to_ack, comp_ctx); + self.handle_ack(to_ack, sched_ctx, comp_ctx); return AckAction::SendMessageAndAck(target_comp_id, message_to_send, to_ack); }, @@ -115,6 +108,28 @@ impl ControlLayer { return 
AckAction::ScheduleComponent(content.to_schedule); }, ControlContent::BlockedPort(_) => unreachable!(), + ControlContent::ClosedPort(port_id) => { + // If a closed port is Ack'd, then we remove the reference to + // that component. `mark_port_closed` already set the port to Closed. + let port_index = comp_ctx.get_port_index(*port_id).unwrap(); + debug_assert_eq!(comp_ctx.ports[port_index].state, PortState::Closed); + let peer_id = comp_ctx.ports[port_index].peer_comp_id; + let peer_index = comp_ctx.get_peer_index(peer_id).unwrap(); + let peer_info = &mut comp_ctx.peers[peer_index]; + peer_info.num_associated_ports -= 1; + + if peer_info.num_associated_ports == 0 { + let should_remove = peer_info.handle.decrement_users(); + if should_remove { + let comp_key = unsafe{ peer_info.id.upgrade() }; + sched_ctx.runtime.destroy_component(comp_key); + } + + comp_ctx.peers.remove(peer_index); + } + + return AckAction::None; + } } } @@ -181,9 +196,38 @@ impl ControlLayer { } // ------------------------------------------------------------------------- - // Blocking and unblocking ports + // Blocking, unblocking, and closing ports // ------------------------------------------------------------------------- + pub(crate) fn mark_port_closed(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> Option<(CompId, ControlMessage)> { + let port = comp_ctx.get_port_mut(port_id); + debug_assert!(port.state == PortState::Open || port.state == PortState::Blocked); + + port.state = PortState::Closed; + + if port.peer_comp_id == comp_ctx.id { + // We own the other end of the channel as well + return None; + } + + let entry_id = self.take_id(); + self.entries.push(ControlEntry{ + id: entry_id, + ack_countdown: 1, + content: ControlContent::ClosedPort(port_id), + }); + + return Some(( + port.peer_comp_id, + ControlMessage{ + id: entry_id, + sender_comp_id: comp_ctx.id, + target_port_id: Some(port.peer_id), + content: ControlMessageContent::ClosePort(port.peer_id), + } + )); + } + pub(crate) fn mark_port_blocked(&mut self, port_id: PortId, 
comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) { // TODO: Feels like this shouldn't be an entry. Hence this class should // be renamed. Lets see where the code ends up being diff --git a/src/runtime2/component/mod.rs b/src/runtime2/component/mod.rs index 5c92219776421421c209667fb96bfad54503828a..a7f2b521f7d3c51c08b6a9feae8d61fd27e36520 100644 --- a/src/runtime2/component/mod.rs +++ b/src/runtime2/component/mod.rs @@ -3,4 +3,23 @@ mod control_layer; mod consensus; pub(crate) use component_pdl::{CompPDL, CompCtx, CompScheduling}; -pub(crate) use control_layer::{ControlId}; \ No newline at end of file +pub(crate) use control_layer::{ControlId}; + +use super::scheduler::*; +use super::runtime::*; + +/// If the component is sleeping, then that flag will be atomically set to +/// false. If we're the ones that made that happen then we add it to the work +/// queue. +pub(crate) fn wake_up_if_sleeping(sched_ctx: &SchedulerCtx, comp_id: CompId, handle: &CompHandle) { + use std::sync::atomic::Ordering; + + let should_wake_up = handle.sleeping + .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire) + .is_ok(); + + if should_wake_up { + let comp_key = unsafe{ comp_id.upgrade() }; + sched_ctx.runtime.enqueue_work(comp_key); + } +} \ No newline at end of file diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index 4255058472a74d8a9f8ce05a1720bc0169d71cf7..846a60389f8b022adcb7810efa45f05effcc3a83 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -16,7 +16,7 @@ use super::store::{ComponentStore, QueueDynMpsc, QueueDynProducer}; /// of these. Only with a key one may retrieve privately-accessible memory for /// a component. Practically just a generational index, like `CompId` is. 
#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub(crate) struct CompKey(u32); +pub(crate) struct CompKey(pub u32); impl CompKey { pub(crate) fn downgrade(&self) -> CompId { @@ -26,7 +26,7 @@ impl CompKey { /// Generational ID of a component #[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub struct CompId(u32); +pub struct CompId(pub u32); impl CompId { pub(crate) fn new_invalid() -> CompId { @@ -51,6 +51,9 @@ pub(crate) struct RuntimeComp { } /// Should contain everything that is accessible in a thread-safe manner +// TODO: Do something about the `num_handles` thing. This needs to be a bit more +// "foolproof" to lighten the mental burden of using the `num_handles` +// variable. pub(crate) struct CompPublic { pub sleeping: AtomicBool, pub num_handles: AtomicU32, // manually modified (!) @@ -199,6 +202,7 @@ impl Runtime { } pub(crate) fn destroy_component(&self, key: CompKey) { + debug_assert_eq!(self.get_component(key).public.num_handles.load(Ordering::Acquire), 0); self.components.destroy(key.0); } diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 1bcdcbc7ab628629a8cb5a46bbc1a57d553e59f3..c5b857b03b90ba1183351383536cd73db2d1df77 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -40,7 +40,6 @@ impl Scheduler { } let comp_key = comp_key.unwrap(); - let comp_id = comp_key.downgrade(); let component = self.runtime.get_component(comp_key); // Run the component until it no longer indicates that it needs to @@ -55,7 +54,7 @@ impl Scheduler { CompScheduling::Immediate => unreachable!(), CompScheduling::Requeue => { self.runtime.enqueue_work(comp_key); }, CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); }, - CompScheduling::Exit => { self.mark_component_as_exiting(comp_key, component); } + CompScheduling::Exit => { self.mark_component_as_exiting(&scheduler_ctx, component); } } } } @@ -67,10 +66,33 @@ impl Scheduler { debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // 
we're executing it, so it cannot be sleeping component.public.sleeping.store(true, Ordering::Release); - todo!("check for messages"); + if component.inbox.can_pop() { + let should_reschedule = component.public.sleeping + .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed) + .is_ok(); + + if should_reschedule { + self.runtime.enqueue_work(key); + } + } } - fn mark_component_as_exiting(&self, key: CompKey, component: &mut RuntimeComp) { - todo!("do something") + fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) { + for port_index in 0..component.ctx.ports.len() { + let port_info = &component.ctx.ports[port_index]; + if let Some((peer_id, message)) = component.code.control.mark_port_closed(port_info.id, &mut component.ctx) { + let peer_info = component.ctx.get_peer(peer_id); + peer_info.handle.inbox.push(Message::Control(message)); + + wake_up_if_sleeping(sched_ctx, peer_id, &peer_info.handle); + } + } + + let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel); + let new_count = old_count - 1; + if new_count == 0 { + let comp_key = unsafe{ component.ctx.id.upgrade() }; + sched_ctx.runtime.destroy_component(comp_key); + } } } \ No newline at end of file diff --git a/src/runtime2/store/queue_mpsc.rs b/src/runtime2/store/queue_mpsc.rs index 10e659838299bbee643fd5cadadcecbb2d8b1467..cbf75e8379809ec83a16aad961922690a74f3d29 100644 --- a/src/runtime2/store/queue_mpsc.rs +++ b/src/runtime2/store/queue_mpsc.rs @@ -78,6 +78,17 @@ impl QueueDynMpsc { return QueueDynProducer::new(self); } + /// Return `true` if a subsequent call to `pop` will return a value. Note + /// that if it returns `false`, there *might* also be a value returned by + /// `pop`. 
+ pub fn can_pop(&mut self) -> bool { + let data_lock = self.inner.data.lock_shared(); + let cur_read = self.inner.read_head.load(Ordering::Acquire); + let cur_limit = self.inner.limit_head.load(Ordering::Acquire); + let buf_size = data_lock.data.cap() as u32; + return (cur_read + buf_size) & data_lock.compare_mask != cur_limit; + } + /// Perform an attempted read from the queue. It might be that some producer /// is putting something in the queue while this function is executing, and /// we don't get the consume it.