diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 0b03aa14729445e04e7c8faf891b13aae1d111e8..d5712961e4599ebea7ed0b73bbdf3aa03a2d2c81 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -412,7 +412,7 @@ pub(crate) fn default_attempt_get( match default_handle_received_data_message( target_port, target_port_instruction, &mut message, inbox_main, inbox_backup, - comp_ctx, sched_ctx, control, + comp_ctx, sched_ctx, control, consensus ) { Ok(()) => return GetResult::Received(message), Err(location_and_message) => return GetResult::Error(location_and_message) @@ -440,7 +440,8 @@ pub(crate) fn default_attempt_get( pub(crate) fn default_handle_received_data_message( targeted_port: PortId, _port_instruction: PortInstruction, message: &mut DataMessage, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, - comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer + comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer, + consensus: &mut Consensus ) -> Result<(), (PortInstruction, String)> { let port_handle = comp_ctx.get_port_handle(targeted_port); let port_index = comp_ctx.get_port_index(port_handle); @@ -470,6 +471,9 @@ pub(crate) fn default_handle_received_data_message( comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(received_port.peer_comp)); let new_port = comp_ctx.get_port(new_port_handle); + // Add the port tho the consensus + consensus.notify_received_port(_new_inbox_index, new_port_handle, comp_ctx); + // Replace all references to the port in the received message for message_location in received_port.locations.iter().copied() { let value = message.content.get_value_mut(message_location); diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs index 6fce74730a9d6cd923207f7239b804b1580cc005..dc11dd2c27b065ab112be629ac1ce3d31144a575 100644 --- a/src/runtime2/component/consensus.rs +++ b/src/runtime2/component/consensus.rs @@ -355,6 +355,20 @@ impl Consensus { self.solution.clear(); } + pub(crate) fn notify_received_port(&mut self, _expected_index: usize, port_handle: LocalPortHandle, comp_ctx: &CompCtx) { + debug_assert_eq!(_expected_index, self.ports.len()); + let port_info = comp_ctx.get_port(port_handle); + self.ports.push(PortAnnotation{ + self_comp_id: comp_ctx.id, + self_port_id: port_info.self_id, + peer_comp_id: port_info.peer_comp_id, + peer_port_id: port_info.peer_port_id, + peer_discovered: false, + mapping: None, + kind: port_info.kind, + }); + } + // ------------------------------------------------------------------------- // Handling inbound and outbound messages // ------------------------------------------------------------------------- diff --git a/src/runtime2/tests/transfer_ports.rs b/src/runtime2/tests/transfer_ports.rs index 133687f7f6bc76edc29ffa311376f4ef87327d78..e84a5b2bb5880230bc0048c5bee8ccd04270a2c5 100644 --- a/src/runtime2/tests/transfer_ports.rs +++ b/src/runtime2/tests/transfer_ports.rs @@ -62,16 +62,6 @@ fn test_transfer_synccreated_port() { ", "constructor", no_args()); } -#[test] -fn test_transfer_precreated_port_with_owned_peer_back_and_forth() { - -} - -#[test] -fn test_transfer_precreated_port_with_foreign_peer_back_and_forth() { - -} - #[test] fn test_transfer_precreated_port_with_owned_peer_and_communication() { compile_and_create_component(" @@ -123,4 +113,69 @@ fn test_transfer_precreated_port_with_foreign_peer_and_communication() { new message_transmitter(value_tx); } ", "constructor", no_args()); +} + +#[test] +fn test_transfer_precreated_port_with_owned_peer_back_and_forth() { + compile_and_create_component(" + primitive port_send_and_receive(out> tx, in> rx) { + channel a -> b; + sync { + put(tx, b); + b = get(rx); + } + } + + primitive port_receive_and_send(in> rx, out> tx) { + channel unused -> transferred; // same problem as in different tests + sync { + transferred = get(rx); + put(tx, transferred); + } + } + + composite constructor() { + channel port_tx_forward -> port_rx_forward; + channel port_tx_backward -> port_rx_backward; + + new port_send_and_receive(port_tx_forward, port_rx_backward); + new port_receive_and_send(port_rx_forward, port_tx_backward); + }", "constructor", no_args()); +} + +#[test] +fn test_transfer_precreated_port_with_foreign_peer_back_and_forth_and_communication() { + compile_and_create_component(" + primitive port_send_and_receive(out> tx, in> rx, in to_transfer) { + sync { + put(tx, to_transfer); + to_transfer = get(rx); + } + sync { + auto value = get(to_transfer); + while (value != 1337) {} + } + } + + primitive port_receive_and_send(in> rx, out> tx) { + channel unused -> transferred; + sync { + transferred = get(rx); + put(tx, transferred); + } + } + + primitive value_sender(out tx) { + sync put(tx, 1337); + } + + composite constructor() { + channel port_tx_forward -> port_rx_forward; + channel port_tx_backward -> port_rx_backward; + channel message_tx -> message_rx; + new port_send_and_receive(port_tx_forward, port_rx_backward, message_rx); + new port_receive_and_send(port_rx_forward, port_tx_backward); + new value_sender(message_tx); + } + ", "constructor", no_args()); } \ No newline at end of file