Changeset - cdb4810532c2
[Not reviewed]
0 3 0
mh - 3 years ago 2022-05-11 15:19:14
contact@maxhenger.nl
Fix rerouting bug for transmitted ports
3 files changed with 76 insertions and 5 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
@@ -577,7 +577,7 @@ pub(crate) fn default_handle_control_message(
 

	
 
                if exec_state.mode.is_in_sync_block() {
 
                    let closed_during_sync_round = content.closed_in_sync_round && port_was_used;
 
                    let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message;
 
                    let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message && port_was_used;
 

	
 
                    if closed_during_sync_round || closed_before_sync_round {
 
                        return Err((
 
@@ -690,6 +690,13 @@ pub(crate) fn default_handle_start_exit(
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::StartExit);
 
    for port_index in 0..comp_ctx.num_ports() {
 
        let port_info = comp_ctx.get_port_by_index_mut(port_index);
 
        if port_info.state.is_blocked() {
 
            return CompScheduling::Sleep;
 
        }
 
    }
 

	
 
    sched_ctx.info(&format!("Component starting exit (reason: {:?})", exec_state.exit_reason));
 
    exec_state.mode = CompMode::BusyExit;
 
    let exit_inside_sync = exec_state.exit_reason.is_in_sync();
 
@@ -704,7 +711,8 @@ pub(crate) fn default_handle_start_exit(
 
    // Iterating over ports by index to work around borrowing rules
 
    for port_index in 0..comp_ctx.num_ports() {
 
        let port = comp_ctx.get_port_by_index_mut(port_index);
 
        if port.state.is_closed() || port.close_at_sync_end {
 
        println!("DEBUG: Considering port:\n{:?}", port);
 
        if port.state.is_closed() || port.state.is_set(PortStateFlag::Transmitted) || port.close_at_sync_end {
 
            // Already closed, or in the process of being closed
 
            continue;
 
        }
 
@@ -904,7 +912,7 @@ fn prepare_send_message_with_ports(
 
/// all stored in the component's execution state by the
 
/// `prepare_send_message_with_ports` function. Port must be ready to send!
 
fn perform_send_message_with_ports(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, consensus: &mut Consensus,
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) {
 
    debug_assert_eq!(exec_state.mode, CompMode::BlockedPutPortsReady);
 
@@ -941,7 +949,9 @@ fn perform_send_message_with_ports(
 
            peer_port: transmit_port_info.peer_port_id,
 
            kind: transmit_port_info.kind,
 
            state: transmit_port_state
 
        })
 
        });
 

	
 
        comp_ctx.change_port_peer(sched_ctx, transmit_port_handle, None);
 
    }
 

	
 
    // And finally, send the message to the peer
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -431,7 +431,6 @@ impl Component for CompPDL {
 
                return CompScheduling::Requeue;
 
            },
 
            EC::NewChannel => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 
                let channel = comp_ctx.create_channel();
 
                self.exec_ctx.stmt = ExecStmt::CreatedChannel((
src/runtime2/tests/transfer_ports.rs
Show inline comments
 
@@ -19,3 +19,65 @@ fn test_transfer_precreated_port_with_owned_peer() {
 
    }
 
    ", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_foreign_peer() {
 
    compile_and_create_component("
 
    primitive port_sender(out<in<u32>> tx, in<u32> to_send) {
 
        sync put(tx, to_send);
 
    }
 

	
 
    primitive port_receiver(in<in<u32>> rx) {
 
        sync auto a = get(rx);
 
    }
 

	
 
    composite constructor() {
 
        channel tx -> rx;
 
        channel forgotten -> to_send;
 
        new port_sender(tx, to_send);
 
        new port_receiver(rx);
 
    }
 
    ", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_transfer_synccreated_port() {
 
    compile_and_create_component("
 
    primitive port_sender(out<in<u32>> tx) {
 
        sync {
 
            channel a -> b;
 
            put(tx, b);
 
        }
 
    }
 

	
 
    primitive port_receiver(in<in<u32>> rx) {
 
        sync auto a = get(rx);
 
    }
 

	
 
    composite constructor() {
 
        channel a -> b;
 
        new port_sender(a);
 
        new port_receiver(b);
 
    }
 
    ", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_owned_peer_back_and_forth() {
 

	
 
}
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_foreign_peer_back_and_forth() {
 

	
 
}
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_owned_peer_and_communication() {
 

	
 
}
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_foreign_peer_and_communication() {
 

	
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)