diff --git a/src/runtime2/communication.rs b/src/runtime2/communication.rs index 2d0aee324b8730312c3a22e5c81f677fd31e7e5c..c615f06d5d5d08a99f8dbae16ff36c34639be553 100644 --- a/src/runtime2/communication.rs +++ b/src/runtime2/communication.rs @@ -26,10 +26,20 @@ pub enum PortKind { #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum PortState { Open, - Blocked, + BlockedDueToPeerChange, + BlockedDueToFullBuffers, Closed, } +impl PortState { + pub fn is_blocked(&self) -> bool { + match self { + PortState::BlockedDueToPeerChange | PortState::BlockedDueToFullBuffers => true, + PortState::Open | PortState::Closed => false, + } + } +} + pub struct Channel { pub putter_id: PortId, pub getter_id: PortId, diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 469b719504ab18c02e06dd3171aab65248f11cd5..c0effb7fd129d39a2d8cb625cf7c1c26ad0a5d05 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -233,12 +233,17 @@ impl CompPDL { let port_id = port_id_from_eval(port_id); let port_handle = comp_ctx.get_port_handle(port_id); let port_info = comp_ctx.get_port(port_handle); - if port_info.state == PortState::Blocked { - todo!("handle blocked port"); + if port_info.state.is_blocked() { + self.mode = Mode::BlockedPut; + self.mode_port = port_id; + self.mode_value = value; + self.exec_ctx.stmt = ExecStmt::PerformedPut; // prepare for when we become unblocked + return Ok(CompScheduling::Sleep); + } else { + self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, value); + self.exec_ctx.stmt = ExecStmt::PerformedPut; + return Ok(CompScheduling::Immediate); } - self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, value); - self.exec_ctx.stmt = ExecStmt::PerformedPut; - return Ok(CompScheduling::Immediate); }, // Results that can be returned outside of sync mode EC::ComponentTerminated => { @@ -379,7 +384,7 @@ impl CompPDL { // After direct insertion, check if this component's execution is // blocked on receiving a message on that port - debug_assert_ne!(comp_ctx.get_port(port_handle).state, PortState::Blocked); // because we could insert directly + debug_assert!(!comp_ctx.get_port(port_handle).state.is_blocked()); // because we could insert directly if self.mode == Mode::BlockedGet && self.mode_port == target_port_id { // We were indeed blocked self.mode = Mode::Sync; @@ -391,10 +396,10 @@ impl CompPDL { // The direct inbox is full, so the port will become (or was already) blocked let port_info = comp_ctx.get_port_mut(port_handle); - debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked); + debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked()); if port_info.state == PortState::Open { - comp_ctx.set_port_state(port_handle, PortState::Blocked); + comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers); let (peer_handle, message) = self.control.initiate_port_blocking(comp_ctx, port_handle); @@ -421,7 +426,7 @@ impl CompPDL { if message.data_header.target_port == port_info.self_id { // One more message for this port let message = self.inbox_backup.remove(message_index); - debug_assert_eq!(comp_ctx.get_port(port_handle).state, PortState::Blocked); // since we had >1 message on the port + debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we had >1 message on the port self.inbox_main[port_index] = Some(message); return; @@ -430,7 +435,7 @@ impl CompPDL { // Did not have any more messages. So if we were blocked, then we need // to send the "unblock" message. - if port_info.state == PortState::Blocked { + if port_info.state == PortState::BlockedDueToFullBuffers { comp_ctx.set_port_state(port_handle, PortState::Open); let (peer_handle, message) = self.control.cancel_port_blocking(comp_ctx, port_handle); let peer_info = comp_ctx.get_peer(peer_handle); @@ -461,8 +466,9 @@ impl CompPDL { let port_handle = comp_ctx.get_port_handle(port_id); let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); - if port_info.state != PortState::Closed { - comp_ctx.set_port_state(port_handle, PortState::Blocked); + if port_info.state == PortState::Open { + // only when open: we don't do this when closed, and we we don't do this if we're blocked due to peer changes + comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers); } }, ControlMessageContent::ClosePort(port_id) => { @@ -488,8 +494,7 @@ impl CompPDL { let port_handle = comp_ctx.get_port_handle(port_id); let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); - debug_assert!(port_info.state == PortState::Blocked || port_info.state == PortState::Closed); - if port_info.state == PortState::Blocked { + if port_info.state == PortState::BlockedDueToFullBuffers { self.handle_unblock_port_instruction(sched_ctx, comp_ctx, port_handle); } }, @@ -500,7 +505,7 @@ impl CompPDL { // Ack. Then we wait for the `unblock` call. debug_assert_eq!(message.target_port_id, Some(port_id)); let port_handle = comp_ctx.get_port_handle(port_id); - comp_ctx.set_port_state(port_handle, PortState::Blocked); + comp_ctx.set_port_state(port_handle, PortState::BlockedDueToPeerChange); let port_info = comp_ctx.get_port(port_handle); let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); @@ -510,7 +515,7 @@ impl CompPDL { ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => { let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap()); let port_info = comp_ctx.get_port(port_handle); - debug_assert!(port_info.state == PortState::Blocked); + debug_assert!(port_info.state == PortState::BlockedDueToPeerChange); let old_peer_id = port_info.peer_comp_id; comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false); @@ -575,7 +580,7 @@ impl CompPDL { fn handle_unblock_port_instruction(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_handle: LocalPortHandle) { let port_info = comp_ctx.get_port_mut(port_handle); let port_id = port_info.self_id; - debug_assert_eq!(port_info.state, PortState::Blocked); + debug_assert!(port_info.state.is_blocked()); port_info.state = PortState::Open; if self.mode == Mode::BlockedPut && port_id == self.mode_port { diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs index c21c9e81c9cc1bc355fc8af2b0a3687a92d5cc0f..15f4cac2faa53c69514a2da1fc15064197ad6c27 100644 --- a/src/runtime2/component/control_layer.rs +++ b/src/runtime2/component/control_layer.rs @@ -24,7 +24,6 @@ struct ControlEntry { enum ControlContent { PeerChange(ContentPeerChange), ScheduleComponent(CompId), - BlockedPort(PortId), ClosedPort(PortId), } @@ -121,7 +120,6 @@ impl ControlLayer { // schedule the component! return (AckAction::ScheduleComponent(to_schedule), None); }, - ControlContent::BlockedPort(_) => unreachable!(), ControlContent::ClosedPort(closed_port) => { // If a closed port is Ack'd, then we remove the reference to // that component. @@ -243,29 +241,22 @@ impl ControlLayer { ); } - /// Adds a control entry to track that a port is blocked. Expects the caller - /// to have set the port's state to blocking already. The returned tuple - /// contains a message and the peer to send it to. + /// Generates the control message used to indicate to a peer that a port + /// should be blocked (expects the caller to have set the port's state to + /// blocked). pub(crate) fn initiate_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) { let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're telling the putter to block - debug_assert_eq!(port_info.state, PortState::Blocked); // contract with caller + debug_assert_eq!(port_info.state, PortState::BlockedDueToFullBuffers); // contract with caller let peer_port_id = port_info.peer_port_id; let peer_comp_id = port_info.peer_comp_id; let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); - let entry_id = self.take_id(); - self.entries.push(ControlEntry{ - id: entry_id, - ack_countdown: 0, - content: ControlContent::BlockedPort(port_info.self_id), - }); - return ( peer_handle, ControlMessage{ - id: entry_id, + id: ControlId::new_invalid(), sender_comp_id: comp_ctx.id, target_port_id: Some(port_info.peer_port_id), content: ControlMessageContent::BlockPort(peer_port_id), @@ -273,32 +264,19 @@ impl ControlLayer { ); } - /// Removes the control entry that tracks that a port is blocked. Expects - /// the caller to have already marked the port as unblocked. Again the - /// returned tuple contains a message and the target it is intended for + /// Generates a messages used to indicate to a peer that a port should be + /// unblocked again. pub(crate) fn cancel_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) { let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're initiating the unblocking debug_assert_eq!(port_info.state, PortState::Open); // contract with caller, the locally stored entry ensures we were blocked before - let position = self.entries.iter() - .position(|v| { - if let ControlContent::BlockedPort(blocked_port_id) = &v.content { - if *blocked_port_id == port_info.self_id { - return true; - } - } - return false; - }) - .unwrap(); - - let entry = self.entries.remove(position); let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); return ( peer_handle, ControlMessage{ - id: entry.id, + id: ControlId::new_invalid(), sender_comp_id: comp_ctx.id, target_port_id: Some(port_info.peer_port_id), content: ControlMessageContent::UnblockPort(port_info.peer_port_id) diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index 0225f944d0c23c4aa5eaf7e55a26939ebd204bc9..d8e4a4011bb4df5916df9ed0105701eae40a8246 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -31,26 +31,50 @@ fn test_component_creation() { } #[test] -fn test_component_communication() { +fn test_component_communication_a() { let pd = ProtocolDescription::parse(b" primitive sender(out o) { - print(\"sender\"); sync put(o, 1); } primitive receiver(in i) { - print(\"receiver\"); sync auto a = get(i); } composite constructor() { channel o -> i; - print(\"creating sender\"); new sender(o); - print(\"creating receiver\"); new receiver(i); - print(\"done\"); } ").expect("compilation"); let rt = Runtime::new(1, pd); + create_component(&rt, "", "constructor", no_args()); +} + +#[test] +fn test_component_communication_b() { + let pd = ProtocolDescription::parse(b" + primitive sender(out o, u32 rounds) { + u32 index = 0; + sync while (index < rounds) { + put(o, index); + index += 1; + } + } + + primitive receiver(in i, u32 rounds) { + u32 index = 0; + sync while (index < rounds) { + auto val = get(i); + while (val != index) {} // infinite loop if incorrect value is received + index += 1; + } + } + + composite constructor() { + channel o -> i; + new sender(o, 5); + new receiver(i, 5); + }").expect("compilation"); + let rt = Runtime::new(1, pd); create_component(&rt, "", "constructor", no_args()); } \ No newline at end of file