diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 8d43073d4c6d83c6dbcf97aeba74eea5a2494106..59f1112be07fc456b70f9df8d97d95748207f575 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -5,7 +5,6 @@ use crate::protocol::eval::{ EvalContinuation, EvalResult, EvalError }; -use crate::runtime2::store::QueueDynMpsc; use crate::runtime2::runtime::*; use crate::runtime2::scheduler::SchedulerCtx; use crate::runtime2::communication::*; @@ -209,7 +208,13 @@ pub(crate) struct CompPDL { } impl CompPDL { - pub(crate) fn new(initial_state: Prompt) -> Self { + pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self { + let mut inbox_main = Vec::new(); + inbox_main.reserve(num_ports); + for _ in 0..num_ports { + inbox_main.push(None); + } + return Self{ mode: Mode::NonSync, mode_port: PortId::new_invalid(), @@ -221,15 +226,11 @@ impl CompPDL { exec_ctx: ExecCtx{ stmt: ExecStmt::None, }, - inbox_main: Vec::new(), + inbox_main, inbox_backup: Vec::new(), } } - pub(crate) fn handle_setup(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { - self.inbox.resize(comp_ctx.ports.len(), None); - } - pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) { if let Some(new_target) = self.control.should_reroute(&message) { let target = sched_ctx.runtime.get_component_public(new_target); @@ -243,7 +244,7 @@ impl CompPDL { self.handle_incoming_data_message(sched_ctx, comp_ctx, message); }, Message::Control(message) => { - self.handle_incoming_control_message(sched_ctx, comp_Ctx, message); + self.handle_incoming_control_message(sched_ctx, comp_ctx, message); }, } } @@ -265,7 +266,7 @@ impl CompPDL { EC::BlockGet(port_id) => { debug_assert_eq!(self.mode, Mode::Sync); - let port_id = transform_port_id(port_id); + let port_id = port_id_from_eval(port_id); if let Some(message) = comp_ctx.take_message(port_id) { // We can immediately receive and continue debug_assert!(self.exec_ctx.stmt.is_none()); @@ -310,7 +311,7 @@ impl CompPDL { &protocol.types, &protocol.heap, definition_id, monomorph_idx, arguments ); - self.create_component_and_transfer_ports(sched_ctx, comp_ctx, prompt, &workspace_ports); + self.create_component_and_transfer_ports(sched_ctx, comp_ctx, prompt, &ports); return Ok(CompScheduling::Requeue); }, EC::NewChannel => { @@ -379,7 +380,7 @@ impl CompPDL { // After direct insertion, check if this component's execution is // blocked on receiving a message on that port debug_assert_ne!(comp_ctx.ports[port_index].state, PortState::Blocked); // because we could insert directly - if self.mode == Mode::BlockedGet && self.mode_port == message.data_header.target_port { + if self.mode == Mode::BlockedGet && self.mode_port == target_port_id { // We were indeed blocked self.mode = Mode::Sync; self.mode_port = PortId::new_invalid(); @@ -391,11 +392,12 @@ impl CompPDL { // The direct inbox is full, so the port will become (or was already) blocked let port_info = &mut comp_ctx.ports[port_index]; debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked); + let _peer_comp_id = port_info.peer_comp_id; if port_info.state == PortState::Open { let (target_comp_id, block_message) = self.control.mark_port_blocked(target_port_id, comp_ctx); - debug_assert_eq!(port_info.peer_comp_id, target_comp_id); + debug_assert_eq!(_peer_comp_id, target_comp_id); let peer = comp_ctx.get_peer(target_comp_id); peer.handle.inbox.push(Message::Control(block_message)); @@ -447,9 +449,10 @@ impl CompPDL { // the component handle as well let port_index = comp_ctx.get_port_index(port_id).unwrap(); let port_info = &mut comp_ctx.ports[port_index]; + let peer_comp_id = port_info.peer_comp_id; port_info.state = PortState::Closed; - let peer_index = comp_ctx.get_peer_index(port_info.peer_comp_id).unwrap(); + let peer_index = comp_ctx.get_peer_index(peer_comp_id).unwrap(); let peer_info = &mut comp_ctx.peers[peer_index]; peer_info.num_associated_ports -= 1; if peer_info.num_associated_ports == 0 { @@ -477,7 +480,7 @@ impl CompPDL { // temporarily block the port (while our original recipient is // potentially rerouting some of the in-flight messages) and // Ack. Then we wait for the `unblock` call. - debug_assert_eq!(message.target_port_id, port_id); + debug_assert_eq!(message.target_port_id, Some(port_id)); let port_info = comp_ctx.get_port_mut(port_id); debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked); if port_info.state == PortState::Open { @@ -485,7 +488,7 @@ impl CompPDL { } }, ControlMessageContent::PortPeerChangedUnblock(port_id, new_comp_id) => { - debug_assert_eq!(message.target_port_id, port_id); + debug_assert_eq!(message.target_port_id, Some(port_id)); let port_info = comp_ctx.get_port_mut(port_id); debug_assert!(port_info.state == PortState::Blocked); port_info.peer_comp_id = new_comp_id; @@ -512,7 +515,7 @@ impl CompPDL { } fn create_component_and_transfer_ports(&mut self, sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, prompt: Prompt, ports: &[PortId]) { - let component = CompPDL::new(prompt); + let component = CompPDL::new(prompt, ports.len()); let (comp_key, component) = sched_ctx.runtime.create_pdl_component(component, true); let created_ctx = &mut component.ctx; @@ -525,7 +528,8 @@ impl CompPDL { debug_assert_ne!(port_info.state, PortState::Blocked); if port_info.peer_comp_id == creator_ctx.id { // We own the peer port. So retrieve it and modify the peer directly - let port_info = creator_ctx.get_port_mut(port_info.peer_id); + let peer_port_id = port_info.peer_id; + let port_info = creator_ctx.get_port_mut(peer_port_id); port_info.peer_comp_id = created_ctx.id; } else { // We don't own the port, so send the appropriate messages and @@ -540,7 +544,7 @@ impl CompPDL { } // Transfer port and create temporary reroute entry - let (mut port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id); + let (port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id); if port_info.state == PortState::Blocked { todo!("Think about this when you're not tired!"); } @@ -568,8 +572,6 @@ impl CompPDL { /// returned. If returned then the caller is responsible for decrementing /// the atomic counters of the peer component's handle. fn remove_port_from_component(comp_ctx: &mut CompCtx, port_id: PortId) -> (Port, Option) { - use std::sync::atomic::Ordering; - let port_index = comp_ctx.get_port_index(port_id).unwrap(); let port_info = comp_ctx.ports.remove(port_index); @@ -650,8 +652,6 @@ fn port_id_to_eval(port_id: PortId) -> EvalPortId { /// Duplicates will only be added once. pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { // Helper to check a value for a port and recurse if needed. - use crate::protocol::eval::Value; - fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec) { match value { Value::Input(port_id) | Value::Output(port_id) => {