diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 94a11291b2605b9209da0974ff3a32044e0a32b0..e3bbc5105e1884f3a66e3cd2002165d27c8b10bc 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -867,6 +867,7 @@ fn prepare_send_message_with_ports( // Block the peer of the port let message = control.create_port_transfer_message(unblock_put_control_id, comp_ctx.id, peer_port_id); + println!("DEBUG: Port transfer message\nControl ID: {:?}\nMessage: {:?}", unblock_put_control_id, message); let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); let peer_info = comp_ctx.get_peer(peer_handle); @@ -938,6 +939,7 @@ fn default_handle_ack( ) { // Since an `Ack` may cause another one, handle them in a loop let mut to_ack = control_id; + loop { let (action, new_to_ack) = control.handle_ack(to_ack, sched_ctx, comp_ctx); match action { @@ -962,6 +964,7 @@ fn default_handle_ack( }, AckAction::UnblockPutWithPorts => { // Send the message (containing ports) to the recipient + println!("DEBUG: Unblocking put with ports"); perform_send_message_with_ports(exec_state, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup); exec_state.mode = CompMode::Sync; exec_state.mode_port = PortId::new_invalid(); diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index b9ea01cc12e8b5278cdd85698c98e7ff25dae917..72e1a7dd27cf4167fa96d46a68a74763d6e6d111 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -241,15 +241,17 @@ impl Component for CompPDL { } fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) { - // sched_ctx.log(&format!("handling message: {:?}", message)); + sched_ctx.debug(&format!("handling message: {:?}", message)); if let Some(new_target) = self.control.should_reroute(&mut message) { let mut target = sched_ctx.runtime.get_component_public(new_target); // TODO: @NoDirectHandle + sched_ctx.debug(&format!("rerouting to: {:?}", new_target)); target.send_message_logged(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks let _should_remove = target.decrement_users(); debug_assert!(_should_remove.is_none()); return; } + sched_ctx.debug("handling message myself"); match message { Message::Data(message) => { self.handle_incoming_data_message(sched_ctx, comp_ctx, message); diff --git a/src/runtime2/component/control_layer.rs b/src/runtime2/component/control_layer.rs index ccce27eca977256b682599edd1db0dc6e7946604..9eabc0cd5f0534f877c7251c46b1296ac11bfbf4 100644 --- a/src/runtime2/component/control_layer.rs +++ b/src/runtime2/component/control_layer.rs @@ -5,7 +5,7 @@ use crate::runtime2::component::*; use super::component_context::*; #[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub(crate) struct ControlId(u32); +pub(crate) struct ControlId(pub(crate) u32); impl ControlId { /// Like other invalid IDs, this one doesn't care any significance, but is @@ -162,7 +162,7 @@ impl ControlLayer { let entry_id = self.take_id(); self.entries.push(ControlEntry{ id: entry_id, - ack_countdown: 1, // incremented by calls to `add_reroute_entry` + ack_countdown: 0, // incremented by calls to `add_reroute_entry` content: ControlContent::UnblockPutWithPorts, }); diff --git a/src/runtime2/tests/transfer_ports.rs b/src/runtime2/tests/transfer_ports.rs index 5fa876e7a88677c0571bc8579df721893da1e865..e1234cbe3760456e80e4c73b32daa94df1d285c8 100644 --- a/src/runtime2/tests/transfer_ports.rs +++ b/src/runtime2/tests/transfer_ports.rs @@ -1,7 +1,7 @@ use super::*; #[test] -fn test_transfer_precreated_port_without_using() { +fn test_transfer_precreated_port_with_owned_peer() { compile_and_create_component(" primitive port_sender(out> tx) { channel a -> b;