From cdb4810532c29027ce899e2119b08572df51cd09 2022-05-11 15:19:14 From: mh Date: 2022-05-11 15:19:14 Subject: [PATCH] Fix rerouting bug for transmitted ports --- diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 782399c46374828d05bb0b6074b2511e7de8a519..53c362b7058caf4c05f741de40eb19604767d410 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -577,7 +577,7 @@ pub(crate) fn default_handle_control_message( if exec_state.mode.is_in_sync_block() { let closed_during_sync_round = content.closed_in_sync_round && port_was_used; - let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message; + let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message && port_was_used; if closed_during_sync_round || closed_before_sync_round { return Err(( @@ -690,6 +690,13 @@ pub(crate) fn default_handle_start_exit( sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus ) -> CompScheduling { debug_assert_eq!(exec_state.mode, CompMode::StartExit); + for port_index in 0..comp_ctx.num_ports() { + let port_info = comp_ctx.get_port_by_index_mut(port_index); + if port_info.state.is_blocked() { + return CompScheduling::Sleep; + } + } + sched_ctx.info(&format!("Component starting exit (reason: {:?})", exec_state.exit_reason)); exec_state.mode = CompMode::BusyExit; let exit_inside_sync = exec_state.exit_reason.is_in_sync(); @@ -704,7 +711,8 @@ pub(crate) fn default_handle_start_exit( // Iterating over ports by index to work around borrowing rules for port_index in 0..comp_ctx.num_ports() { let port = comp_ctx.get_port_by_index_mut(port_index); - if port.state.is_closed() || port.close_at_sync_end { + println!("DEBUG: Considering port:\n{:?}", port); + if port.state.is_closed() || port.state.is_set(PortStateFlag::Transmitted) || port.close_at_sync_end { // Already closed, or in the process of being closed continue; } @@ -904,7 +912,7 @@ fn prepare_send_message_with_ports( /// all stored in the component's execution state by the /// `prepare_send_message_with_ports` function. Port must be ready to send! fn perform_send_message_with_ports( - exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, consensus: &mut Consensus, + exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup ) { debug_assert_eq!(exec_state.mode, CompMode::BlockedPutPortsReady); @@ -941,7 +949,9 @@ fn perform_send_message_with_ports( peer_port: transmit_port_info.peer_port_id, kind: transmit_port_info.kind, state: transmit_port_state - }) + }); + + comp_ctx.change_port_peer(sched_ctx, transmit_port_handle, None); } // And finally, send the message to the peer diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 2227d58f69243c627a582a07643868bfbbd6ea36..0f2fd6c887a4062a0dff4fccee2df85abde65ce4 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -431,7 +431,6 @@ impl Component for CompPDL { return CompScheduling::Requeue; }, EC::NewChannel => { - debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); debug_assert!(self.exec_ctx.stmt.is_none()); let channel = comp_ctx.create_channel(); self.exec_ctx.stmt = ExecStmt::CreatedChannel(( diff --git a/src/runtime2/tests/transfer_ports.rs b/src/runtime2/tests/transfer_ports.rs index e1234cbe3760456e80e4c73b32daa94df1d285c8..bd34250eec82e7e0e8eb49fbfdc5349a992ad680 100644 --- a/src/runtime2/tests/transfer_ports.rs +++ b/src/runtime2/tests/transfer_ports.rs @@ -18,4 +18,66 @@ fn test_transfer_precreated_port_with_owned_peer() { new port_receiver(b); } ", "constructor", no_args()); +} + +#[test] +fn test_transfer_precreated_port_with_foreign_peer() { + compile_and_create_component(" + primitive port_sender(out> tx, in to_send) { + sync put(tx, to_send); + } + + primitive port_receiver(in> rx) { + sync auto a = get(rx); + } + + composite constructor() { + channel tx -> rx; + channel forgotten -> to_send; + new port_sender(tx, to_send); + new port_receiver(rx); + } + ", "constructor", no_args()); +} + +#[test] +fn test_transfer_synccreated_port() { + compile_and_create_component(" + primitive port_sender(out> tx) { + sync { + channel a -> b; + put(tx, b); + } + } + + primitive port_receiver(in> rx) { + sync auto a = get(rx); + } + + composite constructor() { + channel a -> b; + new port_sender(a); + new port_receiver(b); + } + ", "constructor", no_args()); +} + +#[test] +fn test_transfer_precreated_port_with_owned_peer_back_and_forth() { + +} + +#[test] +fn test_transfer_precreated_port_with_foreign_peer_back_and_forth() { + +} + +#[test] +fn test_transfer_precreated_port_with_owned_peer_and_communication() { + +} + +#[test] +fn test_transfer_precreated_port_with_foreign_peer_and_communication() { + } \ No newline at end of file