diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 2ced679fcbfca7f6f706bb51df86ac43baf9e775..8d43073d4c6d83c6dbcf97aeba74eea5a2494106 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -10,6 +10,7 @@ use crate::runtime2::runtime::*; use crate::runtime2::scheduler::SchedulerCtx; use crate::runtime2::communication::*; +use super::*; use super::control_layer::*; use super::consensus::Consensus; @@ -95,7 +96,7 @@ impl CompCtx { return None; } - fn get_peer(&self, peer_id: CompId) -> &Peer { + pub(crate) fn get_peer(&self, peer_id: CompId) -> &Peer { let index = self.get_peer_index(peer_id).unwrap(); return &self.peers[index]; } @@ -216,7 +217,7 @@ impl CompPDL { prompt: initial_state, control: ControlLayer::default(), consensus: Consensus::new(), - sync_counter: u32, + sync_counter: 0, exec_ctx: ExecCtx{ stmt: ExecStmt::None, }, @@ -410,7 +411,7 @@ impl CompPDL { ControlMessageContent::Ack => { let mut to_ack = message.id; loop { - let action = self.control.handle_ack(to_ack, comp_ctx); + let action = self.control.handle_ack(to_ack, sched_ctx, comp_ctx); match action { AckAction::SendMessageAndAck(target_comp, message, new_to_ack) => { // FIX @NoDirectHandle @@ -441,6 +442,27 @@ impl CompPDL { port_info.state = PortState::Blocked; } }, + ControlMessageContent::ClosePort(port_id) => { + // Request to close the port. We immediately comply and remove + // the component handle as well + let port_index = comp_ctx.get_port_index(port_id).unwrap(); + let port_info = &mut comp_ctx.ports[port_index]; + port_info.state = PortState::Closed; + + let peer_index = comp_ctx.get_peer_index(port_info.peer_comp_id).unwrap(); + let peer_info = &mut comp_ctx.peers[peer_index]; + peer_info.num_associated_ports -= 1; + if peer_info.num_associated_ports == 0 { + // TODO: @Refactor clean up all these uses of "num_associated_ports" + let should_remove = peer_info.handle.decrement_users(); + if should_remove { + let comp_key = unsafe{ peer_info.id.upgrade() }; + sched_ctx.runtime.destroy_component(comp_key); + } + + comp_ctx.peers.remove(peer_index); + } + } ControlMessageContent::UnblockPort(port_id) => { // We were previously blocked (or already closed) let port_info = comp_ctx.get_port(port_id); @@ -665,20 +687,4 @@ pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Ve for value in &value_group.values { find_port_in_value(value_group, value, ports); } -} - -/// If the component is sleeping, then that flag will be atomically set to -/// false. If we're the ones that made that happen then we add it to the work -/// queue. -fn wake_up_if_sleeping(sched_ctx: &SchedulerCtx, comp_id: CompId, handle: &CompHandle) { - use std::sync::atomic::Ordering; - - let should_wake_up = handle.sleeping - .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire) - .is_ok(); - - if should_wake_up { - let comp_key = unsafe{ comp_id.upgrade() }; - sched_ctx.runtime.enqueue_work(comp_key); - } } \ No newline at end of file