diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs index db992c070e8509a42d218b0a14cfeef565d0f096..b73cb11df5390e31ee5582c529b6f21a8494dba3 100644 --- a/src/runtime2/communication.rs +++ b/src/runtime2/communication.rs @@ -2,7 +2,7 @@ use crate::protocol::eval::*; use super::runtime::*; use super::component::*; -#[derive(Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct PortId(pub u32); @@ -64,7 +64,7 @@ pub struct MessageDataHeader { } pub struct ControlMessage { - pub id: ControlId, + pub(crate) id: ControlId, pub sender_comp_id: CompId, pub target_port_id: Option, pub content: ControlMessageContent, diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 8d43073d4c6d83c6dbcf97aeba74eea5a2494106..59f1112be07fc456b70f9df8d97d95748207f575 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -5,7 +5,6 @@ use crate::protocol::eval::{ EvalContinuation, EvalResult, EvalError }; -use crate::runtime2::store::QueueDynMpsc; use crate::runtime2::runtime::*; use crate::runtime2::scheduler::SchedulerCtx; use crate::runtime2::communication::*; @@ -209,7 +208,13 @@ pub(crate) struct CompPDL { } impl CompPDL { - pub(crate) fn new(initial_state: Prompt) -> Self { + pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self { + let mut inbox_main = Vec::new(); + inbox_main.reserve(num_ports); + for _ in 0..num_ports { + inbox_main.push(None); + } + return Self{ mode: Mode::NonSync, mode_port: PortId::new_invalid(), @@ -221,15 +226,11 @@ impl CompPDL { exec_ctx: ExecCtx{ stmt: ExecStmt::None, }, - inbox_main: Vec::new(), + inbox_main, inbox_backup: Vec::new(), } } - pub(crate) fn handle_setup(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { - self.inbox.resize(comp_ctx.ports.len(), None); - } - pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) { if let Some(new_target) = self.control.should_reroute(&message) { let target = sched_ctx.runtime.get_component_public(new_target); @@ -243,7 +244,7 @@ impl CompPDL { self.handle_incoming_data_message(sched_ctx, comp_ctx, message); }, Message::Control(message) => { - self.handle_incoming_control_message(sched_ctx, comp_Ctx, message); + self.handle_incoming_control_message(sched_ctx, comp_ctx, message); }, } } @@ -265,7 +266,7 @@ impl CompPDL { EC::BlockGet(port_id) => { debug_assert_eq!(self.mode, Mode::Sync); - let port_id = transform_port_id(port_id); + let port_id = port_id_from_eval(port_id); if let Some(message) = comp_ctx.take_message(port_id) { // We can immediately receive and continue debug_assert!(self.exec_ctx.stmt.is_none()); @@ -310,7 +311,7 @@ impl CompPDL { &protocol.types, &protocol.heap, definition_id, monomorph_idx, arguments ); - self.create_component_and_transfer_ports(sched_ctx, comp_ctx, prompt, &workspace_ports); + self.create_component_and_transfer_ports(sched_ctx, comp_ctx, prompt, &ports); return Ok(CompScheduling::Requeue); }, EC::NewChannel => { @@ -379,7 +380,7 @@ impl CompPDL { // After direct insertion, check if this component's execution is // blocked on receiving a message on that port debug_assert_ne!(comp_ctx.ports[port_index].state, PortState::Blocked); // because we could insert directly - if self.mode == Mode::BlockedGet && self.mode_port == message.data_header.target_port { + if self.mode == Mode::BlockedGet && self.mode_port == target_port_id { // We were indeed blocked self.mode = Mode::Sync; self.mode_port = PortId::new_invalid(); @@ -391,11 +392,12 @@ impl CompPDL { // The direct inbox is full, so the port will become (or was already) blocked let port_info = &mut comp_ctx.ports[port_index]; debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked); + let _peer_comp_id = port_info.peer_comp_id; if port_info.state == PortState::Open { let (target_comp_id, block_message) = self.control.mark_port_blocked(target_port_id, comp_ctx); - debug_assert_eq!(port_info.peer_comp_id, target_comp_id); + debug_assert_eq!(_peer_comp_id, target_comp_id); let peer = comp_ctx.get_peer(target_comp_id); peer.handle.inbox.push(Message::Control(block_message)); @@ -447,9 +449,10 @@ impl CompPDL { // the component handle as well let port_index = comp_ctx.get_port_index(port_id).unwrap(); let port_info = &mut comp_ctx.ports[port_index]; + let peer_comp_id = port_info.peer_comp_id; port_info.state = PortState::Closed; - let peer_index = comp_ctx.get_peer_index(port_info.peer_comp_id).unwrap(); + let peer_index = comp_ctx.get_peer_index(peer_comp_id).unwrap(); let peer_info = &mut comp_ctx.peers[peer_index]; peer_info.num_associated_ports -= 1; if peer_info.num_associated_ports == 0 { @@ -477,7 +480,7 @@ impl CompPDL { // temporarily block the port (while our original recipient is // potentially rerouting some of the in-flight messages) and // Ack. Then we wait for the `unblock` call. - debug_assert_eq!(message.target_port_id, port_id); + debug_assert_eq!(message.target_port_id, Some(port_id)); let port_info = comp_ctx.get_port_mut(port_id); debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked); if port_info.state == PortState::Open { @@ -485,7 +488,7 @@ impl CompPDL { } }, ControlMessageContent::PortPeerChangedUnblock(port_id, new_comp_id) => { - debug_assert_eq!(message.target_port_id, port_id); + debug_assert_eq!(message.target_port_id, Some(port_id)); let port_info = comp_ctx.get_port_mut(port_id); debug_assert!(port_info.state == PortState::Blocked); port_info.peer_comp_id = new_comp_id; @@ -512,7 +515,7 @@ impl CompPDL { } fn create_component_and_transfer_ports(&mut self, sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx, prompt: Prompt, ports: &[PortId]) { - let component = CompPDL::new(prompt); + let component = CompPDL::new(prompt, ports.len()); let (comp_key, component) = sched_ctx.runtime.create_pdl_component(component, true); let created_ctx = &mut component.ctx; @@ -525,7 +528,8 @@ impl CompPDL { debug_assert_ne!(port_info.state, PortState::Blocked); if port_info.peer_comp_id == creator_ctx.id { // We own the peer port. So retrieve it and modify the peer directly - let port_info = creator_ctx.get_port_mut(port_info.peer_id); + let peer_port_id = port_info.peer_id; + let port_info = creator_ctx.get_port_mut(peer_port_id); port_info.peer_comp_id = created_ctx.id; } else { // We don't own the port, so send the appropriate messages and @@ -540,7 +544,7 @@ impl CompPDL { } // Transfer port and create temporary reroute entry - let (mut port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id); + let (port_info, peer_info) = Self::remove_port_from_component(creator_ctx, port_id); if port_info.state == PortState::Blocked { todo!("Think about this when you're not tired!"); } @@ -568,8 +572,6 @@ impl CompPDL { /// returned. If returned then the caller is responsible for decrementing /// the atomic counters of the peer component's handle. fn remove_port_from_component(comp_ctx: &mut CompCtx, port_id: PortId) -> (Port, Option) { - use std::sync::atomic::Ordering; - let port_index = comp_ctx.get_port_index(port_id).unwrap(); let port_info = comp_ctx.ports.remove(port_index); @@ -650,8 +652,6 @@ fn port_id_to_eval(port_id: PortId) -> EvalPortId { /// Duplicates will only be added once. pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { // Helper to check a value for a port and recurse if needed. - use crate::protocol::eval::Value; - fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec) { match value { Value::Input(port_id) | Value::Output(port_id) => { diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs index 68ea40f957186ec832cff45e89cb9bbd8a25be29..d965aef2993d1dbe8fca8f539491ff7f8f72ceba 100644 --- a/src/runtime2/component/consensus.rs +++ b/src/runtime2/component/consensus.rs @@ -1,4 +1,4 @@ -use crate::protocol::eval::*; +use crate::protocol::eval::ValueGroup; use crate::runtime2::communication::*; use super::component_pdl::*; @@ -52,7 +52,7 @@ impl Consensus { pub(crate) fn transfer_ports(&mut self, comp_ctx: &CompCtx) { let mut needs_setting_ports = false; if comp_ctx.ports.len() != self.ports.len() { - ports_same = true; + needs_setting_ports = true; } else { for idx in 0..comp_ctx.ports.len() { let comp_port_id = comp_ctx.ports[idx].self_id; diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs index 1d6a5a4de92b16371cefffbe3da16b8fae923855..2f1c4c9233fb8d9106189fe85002996382ee85e5 100644 --- a/src/runtime2/component/control_layer.rs +++ b/src/runtime2/component/control_layer.rs @@ -102,10 +102,10 @@ impl ControlLayer { return AckAction::SendMessageAndAck(target_comp_id, message_to_send, to_ack); }, - ControlContent::ScheduleComponent(content) => { + ControlContent::ScheduleComponent(to_schedule) => { // If all change-of-peers are `Ack`d, then we're ready to // schedule the component! - return AckAction::ScheduleComponent(content.to_schedule); + return AckAction::ScheduleComponent(*to_schedule); }, ControlContent::BlockedPort(_) => unreachable!(), ControlContent::ClosedPort(port_id) => { @@ -143,9 +143,7 @@ impl ControlLayer { self.entries.push(ControlEntry{ id: entry_id, ack_countdown: 0, // incremented by calls to `add_reroute_entry` - content: ControlContent::ScheduleComponent(ContentScheduleComponent{ - to_schedule: to_schedule_id - }), + content: ControlContent::ScheduleComponent(to_schedule_id), }); return entry_id; @@ -201,11 +199,13 @@ impl ControlLayer { pub(crate) fn mark_port_closed<'a>(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> Option<(CompId, ControlMessage)> { let port = comp_ctx.get_port_mut(port_id); + let peer_port_id = port.peer_id; + let peer_comp_id = port.peer_comp_id; debug_assert!(port.state == PortState::Open || port.state == PortState::Blocked); port.state = PortState::Closed; - if port.peer_comp_id == comp_ctx.id { + if peer_comp_id == comp_ctx.id { // We own the other end of the channel as well return None; } @@ -218,12 +218,12 @@ impl ControlLayer { }); return Some(( - port.peer_comp_id, + peer_comp_id, ControlMessage{ id: entry_id, sender_comp_id: comp_ctx.id, - target_port_id: Some(port.peer_id), - content: ControlMessageContent::ClosePort(port.peer_id), + target_port_id: Some(peer_port_id), + content: ControlMessageContent::ClosePort(peer_port_id), } )); } @@ -233,24 +233,24 @@ impl ControlLayer { // be renamed. Lets see where the code ends up being let entry_id = self.take_id(); let port_info = comp_ctx.get_port_mut(port_id); + let peer_port_id = port_info.peer_id; + let peer_comp_id = port_info.peer_comp_id; debug_assert_eq!(port_info.state, PortState::Open); // prevent unforeseen issues port_info.state = PortState::Blocked; self.entries.push(ControlEntry{ id: entry_id, ack_countdown: 0, - content: ControlContent::BlockedPort(ContentBlockedPort{ - blocked_port: port_id, - }), + content: ControlContent::BlockedPort(port_id), }); return ( - port_info.peer_comp_id, + peer_comp_id, ControlMessage{ id: entry_id, sender_comp_id: comp_ctx.id, - target_port_id: Some(port_info.peer_id), - content: ControlMessageContent::BlockPort(port_info.peer_id), + target_port_id: Some(peer_port_id), + content: ControlMessageContent::BlockPort(peer_port_id), } ); } @@ -258,10 +258,10 @@ impl ControlLayer { pub(crate) fn mark_port_unblocked(&mut self, port_id: PortId, comp_ctx: &mut CompCtx) -> (CompId, ControlMessage) { // Find the entry that contains the blocking entry for the port let mut entry_index = usize::MAX; - let mut entry_id = ControlId::MAX; + let mut entry_id = ControlId::new_invalid(); for (index, entry) in self.entries.iter().enumerate() { - if let ControlContent::BlockedPort(block_entry) = &entry.content { - if block_entry.blocked_port == port_id { + if let ControlContent::BlockedPort(blocked_port) = &entry.content { + if *blocked_port == port_id { entry_index = index; entry_id = entry.id; break; @@ -270,16 +270,18 @@ impl ControlLayer { } let port_info = comp_ctx.get_port_mut(port_id); + let peer_port_id = port_info.peer_id; + let peer_comp_id = port_info.peer_comp_id; debug_assert_eq!(port_info.state, PortState::Blocked); port_info.state = PortState::Open; return ( - port_info.peer_comp_id, + peer_comp_id, ControlMessage{ id: entry_id, sender_comp_id: comp_ctx.id, - target_port_id: Some(port_info.peer_id), - content: ControlMessageContent::UnblockPort(port_info.peer_id), + target_port_id: Some(peer_port_id), + content: ControlMessageContent::UnblockPort(peer_port_id), } ) } diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index c5b857b03b90ba1183351383536cd73db2d1df77..2ed3f6eac8d91490b86f8752a7e9a99964a06d6e 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -46,7 +46,7 @@ impl Scheduler { // be re-executed immediately. let mut new_scheduling = CompScheduling::Immediate; while let CompScheduling::Immediate = new_scheduling { - new_scheduling = component.code.run(&mut scheduler_ctx, &mut component.private.ctx).expect("TODO: Handle error"); + new_scheduling = component.code.run(&mut scheduler_ctx, &mut component.ctx).expect("TODO: Handle error"); } // Handle the new scheduling @@ -62,7 +62,7 @@ impl Scheduler { // local utilities fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) { - debug_assert_eq!(key.downgrade(), component.private.ctx.id); // make sure component matches key + debug_assert_eq!(key.downgrade(), component.ctx.id); // make sure component matches key debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // we're executing it, so it cannot be sleeping component.public.sleeping.store(true, Ordering::Release); @@ -80,7 +80,7 @@ impl Scheduler { fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) { for port_index in 0..component.ctx.ports.len() { let port_info = &component.ctx.ports[port_index]; - if let Some((peer_id, message)) = component.code.control.mark_port_closed(port_info.id, comp_ctx) { + if let Some((peer_id, message)) = component.code.control.mark_port_closed(port_info.self_id, &mut component.ctx) { let peer_info = component.ctx.get_peer(peer_id); peer_info.handle.inbox.push(Message::Control(message)); diff --git a/src/runtime2/store/component.rs b/src/runtime2/store/component.rs index 8076e2cd65bf6b646ba448ee27f53f227f7e2ce9..d5dc283cdb2a376c8286b7eef849feb9f4c85c66 100644 --- a/src/runtime2/store/component.rs +++ b/src/runtime2/store/component.rs @@ -37,7 +37,7 @@ */ use std::mem::transmute; -use std::alloc::{alloc, dealloc, Layout}; +use std::alloc::{dealloc, Layout}; use std::ptr; use std::sync::atomic::{AtomicUsize, Ordering}; diff --git a/src/runtime2/store/unfair_se_lock.rs b/src/runtime2/store/unfair_se_lock.rs index a89a3f3dc5b475afdcd8c52e64a2a28b72e72410..badd2c84d103664742a41dc5a93b572028aaefc9 100644 --- a/src/runtime2/store/unfair_se_lock.rs +++ b/src/runtime2/store/unfair_se_lock.rs @@ -78,7 +78,7 @@ impl UnfairSeLock { // to wait until the reader count is at 0. shared = new_shared; if shared != EXCLUSIVE_BIT { - shared = self.wait_until_not_shared(shared); + self.wait_until_not_shared(shared); } return UnfairSeLockExclusiveGuard::new(self);