diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 469b719504ab18c02e06dd3171aab65248f11cd5..c0effb7fd129d39a2d8cb625cf7c1c26ad0a5d05 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -233,12 +233,17 @@ impl CompPDL { let port_id = port_id_from_eval(port_id); let port_handle = comp_ctx.get_port_handle(port_id); let port_info = comp_ctx.get_port(port_handle); - if port_info.state == PortState::Blocked { - todo!("handle blocked port"); + if port_info.state.is_blocked() { + self.mode = Mode::BlockedPut; + self.mode_port = port_id; + self.mode_value = value; + self.exec_ctx.stmt = ExecStmt::PerformedPut; // prepare for when we become unblocked + return Ok(CompScheduling::Sleep); + } else { + self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, value); + self.exec_ctx.stmt = ExecStmt::PerformedPut; + return Ok(CompScheduling::Immediate); } - self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, value); - self.exec_ctx.stmt = ExecStmt::PerformedPut; - return Ok(CompScheduling::Immediate); }, // Results that can be returned outside of sync mode EC::ComponentTerminated => { @@ -379,7 +384,7 @@ impl CompPDL { // After direct insertion, check if this component's execution is // blocked on receiving a message on that port - debug_assert_ne!(comp_ctx.get_port(port_handle).state, PortState::Blocked); // because we could insert directly + debug_assert!(!comp_ctx.get_port(port_handle).state.is_blocked()); // because we could insert directly if self.mode == Mode::BlockedGet && self.mode_port == target_port_id { // We were indeed blocked self.mode = Mode::Sync; @@ -391,10 +396,10 @@ impl CompPDL { // The direct inbox is full, so the port will become (or was already) blocked let port_info = comp_ctx.get_port_mut(port_handle); - debug_assert!(port_info.state == PortState::Open || port_info.state == PortState::Blocked); + debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked()); if port_info.state == PortState::Open { - comp_ctx.set_port_state(port_handle, PortState::Blocked); + comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers); let (peer_handle, message) = self.control.initiate_port_blocking(comp_ctx, port_handle); @@ -421,7 +426,7 @@ impl CompPDL { if message.data_header.target_port == port_info.self_id { // One more message for this port let message = self.inbox_backup.remove(message_index); - debug_assert_eq!(comp_ctx.get_port(port_handle).state, PortState::Blocked); // since we had >1 message on the port + debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we had >1 message on the port self.inbox_main[port_index] = Some(message); return; @@ -430,7 +435,7 @@ impl CompPDL { // Did not have any more messages. So if we were blocked, then we need // to send the "unblock" message. - if port_info.state == PortState::Blocked { + if port_info.state == PortState::BlockedDueToFullBuffers { comp_ctx.set_port_state(port_handle, PortState::Open); let (peer_handle, message) = self.control.cancel_port_blocking(comp_ctx, port_handle); let peer_info = comp_ctx.get_peer(peer_handle); @@ -461,8 +466,9 @@ impl CompPDL { let port_handle = comp_ctx.get_port_handle(port_id); let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); - if port_info.state != PortState::Closed { - comp_ctx.set_port_state(port_handle, PortState::Blocked); + if port_info.state == PortState::Open { + // only when open: we don't do this when closed, and we we don't do this if we're blocked due to peer changes + comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers); } }, ControlMessageContent::ClosePort(port_id) => { @@ -488,8 +494,7 @@ impl CompPDL { let port_handle = comp_ctx.get_port_handle(port_id); let port_info = comp_ctx.get_port(port_handle); debug_assert_eq!(port_info.kind, PortKind::Putter); - debug_assert!(port_info.state == PortState::Blocked || port_info.state == PortState::Closed); - if port_info.state == PortState::Blocked { + if port_info.state == PortState::BlockedDueToFullBuffers { self.handle_unblock_port_instruction(sched_ctx, comp_ctx, port_handle); } }, @@ -500,7 +505,7 @@ impl CompPDL { // Ack. Then we wait for the `unblock` call. debug_assert_eq!(message.target_port_id, Some(port_id)); let port_handle = comp_ctx.get_port_handle(port_id); - comp_ctx.set_port_state(port_handle, PortState::Blocked); + comp_ctx.set_port_state(port_handle, PortState::BlockedDueToPeerChange); let port_info = comp_ctx.get_port(port_handle); let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); @@ -510,7 +515,7 @@ impl CompPDL { ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => { let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap()); let port_info = comp_ctx.get_port(port_handle); - debug_assert!(port_info.state == PortState::Blocked); + debug_assert!(port_info.state == PortState::BlockedDueToPeerChange); let old_peer_id = port_info.peer_comp_id; comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false); @@ -575,7 +580,7 @@ impl CompPDL { fn handle_unblock_port_instruction(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_handle: LocalPortHandle) { let port_info = comp_ctx.get_port_mut(port_handle); let port_id = port_info.self_id; - debug_assert_eq!(port_info.state, PortState::Blocked); + debug_assert!(port_info.state.is_blocked()); port_info.state = PortState::Open; if self.mode == Mode::BlockedPut && port_id == self.mode_port {