Changeset - cdb4810532c2
[Not reviewed]
0 3 0
mh - 3 years ago 2022-05-11 15:19:14
contact@maxhenger.nl
Fix rerouting bug for transmitted ports
3 files changed with 76 insertions and 5 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
@@ -556,49 +556,49 @@ pub(crate) fn default_handle_control_message(
 
                // The two components (sender and this component) are closing
 
                // the channel at the same time. So we don't care about the
 
                // content of the `ClosePort` message.
 
                default_handle_ack(exec_state, control, control_id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup);
 
            } else {
 
                // Respond to the message
 
                let port_info = comp_ctx.get_port(port_handle);
 
                let last_instruction = port_info.last_instruction;
 
                let port_has_had_message = port_info.received_message_for_sync;
 
                default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
 
                comp_ctx.change_port_peer(sched_ctx, port_handle, None);
 

	
 
                // Handle any possible error conditions (which boil down to: the
 
                // port has been used, but the peer has died). If not in sync
 
                // mode then we close the port immediately.
 

	
 
                // Note that `port_was_used` does not mean that any messages
 
                // were actually received. It might also mean that e.g. the
 
                // component attempted a `get`, but there were no messages, so
 
                // now it is in the `BlockedGet` state.
 
                let port_was_used = last_instruction != PortInstruction::None;
 

	
 
                if exec_state.mode.is_in_sync_block() {
 
                    let closed_during_sync_round = content.closed_in_sync_round && port_was_used;
 
                    let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message;
 
                    let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message && port_was_used;
 

	
 
                    if closed_during_sync_round || closed_before_sync_round {
 
                        return Err((
 
                            last_instruction,
 
                            format!("Peer component (id:{}) shut down, so communication cannot (have) succeed(ed)", peer_comp_id.0)
 
                        ));
 
                    }
 
                } else {
 
                    let port_info = comp_ctx.get_port_mut(port_handle);
 
                    port_info.state.set(PortStateFlag::Closed);
 
                }
 
            }
 
        },
 
        ControlMessageContent::UnblockPort => {
 
            // We were previously blocked (or already closed)
 
            let port_to_unblock = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_unblock);
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 

	
 
            debug_assert_eq!(port_info.kind, PortKind::Putter);
 
            debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers));
 

	
 
            port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers);
 
            default_handle_recently_unblocked_port(
 
@@ -669,63 +669,71 @@ pub(crate) fn default_handle_sync_start(
 
/// Handles a component that has reached the end of the sync block. This does
 
/// not necessarily mean that the component will go into the `NonSync` mode, as
 
/// it might have to wait for the leader to finish the round for everyone (see
 
/// `default_handle_sync_decision`)
 
pub(crate) fn default_handle_sync_end(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
    consensus: &mut Consensus
 
) {
 
    sched_ctx.info("Component ending sync mode (but possibly waiting for a solution)");
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync);
 
    let decision = consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
    exec_state.mode = CompMode::SyncEnd;
 
    default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus);
 
}
 

	
 
/// Handles a component initiating the exiting procedure, and closing all of its
 
/// ports. Should only be called once per component (which is ensured by
 
/// checking and modifying the mode in the execution state).
 
#[must_use]
 
pub(crate) fn default_handle_start_exit(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::StartExit);
 
    for port_index in 0..comp_ctx.num_ports() {
 
        let port_info = comp_ctx.get_port_by_index_mut(port_index);
 
        if port_info.state.is_blocked() {
 
            return CompScheduling::Sleep;
 
        }
 
    }
 

	
 
    sched_ctx.info(&format!("Component starting exit (reason: {:?})", exec_state.exit_reason));
 
    exec_state.mode = CompMode::BusyExit;
 
    let exit_inside_sync = exec_state.exit_reason.is_in_sync();
 

	
 
    // If exiting while inside sync mode, report to the leader of the current
 
    // round that we've failed.
 
    if exit_inside_sync {
 
        let decision = consensus.notify_sync_end_failure(sched_ctx, comp_ctx);
 
        default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus);
 
    }
 

	
 
    // Iterating over ports by index to work around borrowing rules
 
    for port_index in 0..comp_ctx.num_ports() {
 
        let port = comp_ctx.get_port_by_index_mut(port_index);
 
        if port.state.is_closed() || port.close_at_sync_end {
 
        println!("DEBUG: Considering port:\n{:?}", port);
 
        if port.state.is_closed() || port.state.is_set(PortStateFlag::Transmitted) || port.close_at_sync_end {
 
            // Already closed, or in the process of being closed
 
            continue;
 
        }
 

	
 
        // Mark as closed
 
        let port_id = port.self_id;
 
        port.state.set(PortStateFlag::Closed);
 

	
 
        // Notify peer of closing
 
        let port_handle = comp_ctx.get_port_handle(port_id);
 
        let (peer, message) = control.initiate_port_closing(port_handle, exit_inside_sync, comp_ctx);
 
        let peer_info = comp_ctx.get_peer(peer);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
    }
 

	
 
    return CompScheduling::Immediate; // to check if we can shut down immediately
 
}
 

	
 
/// Handles a component waiting until all peers are notified that it is quitting
 
/// (i.e. after calling `default_handle_start_exit`).
 
#[must_use]
 
pub(crate) fn default_handle_busy_exit(
 
    exec_state: &mut CompExecState, control: &ControlLayer,
 
    sched_ctx: &SchedulerCtx
 
@@ -883,86 +891,88 @@ fn prepare_send_message_with_ports(
 

	
 
        // Set the flag for transmission
 
        transmit_port_info.state.set(PortStateFlag::Transmitted);
 

	
 
        // Block the peer of the port
 
        let message = control.create_port_transfer_message(unblock_put_control_id, comp_ctx.id, peer_port_id);
 
        println!("DEBUG: Port transfer message\nControl ID: {:?}\nMessage: {:?}", unblock_put_control_id, message);
 
        let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 

	
 
        peer_info.handle.send_message_logged(sched_ctx, message, true);
 
    }
 

	
 
    // We've set up the protocol, once all the PPC's are blocked we are supposed
 
    // to transfer the message to the recipient. So store it temporarily
 
    exec_state.set_as_blocked_put_with_ports(sending_port_id, value);
 

	
 
    return Ok(());
 
}
 

	
 
/// Performs the transmission of a data message that contains ports. These were
 
/// all stored in the component's execution state by the
 
/// `prepare_send_message_with_ports` function. Port must be ready to send!
 
fn perform_send_message_with_ports(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, consensus: &mut Consensus,
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) {
 
    debug_assert_eq!(exec_state.mode, CompMode::BlockedPutPortsReady);
 

	
 
    // Find all ports again
 
    let mut transmit_ports = Vec::new();
 
    find_ports_in_value_group(&exec_state.mode_value, &mut transmit_ports);
 

	
 
    let port_handle = comp_ctx.get_port_handle(exec_state.mode_port);
 
    let port_info = comp_ctx.get_port(port_handle);
 
    debug_assert!(port_info.state.can_send() && !port_info.state.is_blocked());
 
    let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 

	
 
    // Annotate the data message
 
    let message_value = exec_state.mode_value.take();
 
    let mut annotated_message = consensus.annotate_data_message(comp_ctx, port_info, message_value);
 

	
 
    // And further enhance the message by adding data about the ports that are
 
    // being transferred
 
    for (port_locations, transmit_port_id) in transmit_ports {
 
        let transmit_port_handle = comp_ctx.get_port_handle(transmit_port_id);
 
        let transmit_port_info = comp_ctx.get_port(transmit_port_handle);
 

	
 
        let transmit_messages = take_port_messages(comp_ctx, transmit_port_id, inbox_main, inbox_backup);
 

	
 
        let mut transmit_port_state = transmit_port_info.state;
 
        debug_assert!(transmit_port_state.is_set(PortStateFlag::Transmitted));
 
        transmit_port_state.clear(PortStateFlag::Transmitted);
 

	
 
        annotated_message.ports.push(TransmittedPort{
 
            locations: port_locations,
 
            messages: transmit_messages,
 
            peer_comp: transmit_port_info.peer_comp_id,
 
            peer_port: transmit_port_info.peer_port_id,
 
            kind: transmit_port_info.kind,
 
            state: transmit_port_state
 
        })
 
        });
 

	
 
        comp_ctx.change_port_peer(sched_ctx, transmit_port_handle, None);
 
    }
 

	
 
    // And finally, send the message to the peer
 
    let peer_info = comp_ctx.get_peer(peer_handle);
 
    peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true);
 
}
 

	
 
/// Handles an `Ack` for the control layer.
 
fn default_handle_ack(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer, control_id: ControlId,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) {
 
    // Since an `Ack` may cause another one, handle them in a loop
 
    let mut to_ack = control_id;
 

	
 
    loop {
 
        let (action, new_to_ack) = control.handle_ack(to_ack, sched_ctx, comp_ctx);
 
        match action {
 
            AckAction::SendMessage(target_comp, message) => {
 
                // FIX @NoDirectHandle
 
                let mut handle = sched_ctx.runtime.get_component_public(target_comp);
 
                handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
                let _should_remove = handle.decrement_users();
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -410,49 +410,48 @@ impl Component for CompPDL {
 
                    self.exec_state.mode = CompMode::BlockedSelect;
 
                    return CompScheduling::Sleep;
 
                }
 
            },
 
            // Results that can be returned outside of sync mode
 
            EC::ComponentTerminated => {
 
                self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                return CompScheduling::Immediate;
 
            },
 
            EC::SyncBlockStart => {
 
                component::default_handle_sync_start(
 
                    &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus
 
                );
 
                return CompScheduling::Immediate;
 
            },
 
            EC::NewComponent(definition_id, type_id, arguments) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
                self.create_component_and_transfer_ports(
 
                    sched_ctx, comp_ctx,
 
                    definition_id, type_id, arguments
 
                );
 
                return CompScheduling::Requeue;
 
            },
 
            EC::NewChannel => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 
                let channel = comp_ctx.create_channel();
 
                self.exec_ctx.stmt = ExecStmt::CreatedChannel((
 
                    Value::Output(port_id_to_eval(channel.putter_id)),
 
                    Value::Input(port_id_to_eval(channel.getter_id))
 
                ));
 
                self.inbox_main.push(None);
 
                self.inbox_main.push(None);
 
                return CompScheduling::Immediate;
 
            }
 
        }
 
    }
 
}
 

	
 
impl CompPDL {
 
    pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self {
 
        let mut inbox_main = Vec::new();
 
        inbox_main.reserve(num_ports);
 
        for _ in 0..num_ports {
 
            inbox_main.push(None);
 
        }
 

	
 
        return Self{
 
            exec_state: CompExecState::new(),
src/runtime2/tests/transfer_ports.rs
Show inline comments
 
use super::*;
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_owned_peer() {
 
    compile_and_create_component("
 
    primitive port_sender(out<in<u32>> tx) {
 
        channel a -> b;
 
        sync put(tx, b);
 
    }
 

	
 
    primitive port_receiver(in<in<u32>> rx) {
 
        sync auto a = get(rx);
 
    }
 

	
 
    composite constructor() {
 
        channel a -> b;
 
        new port_sender(a);
 
        new port_receiver(b);
 
    }
 
    ", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_foreign_peer() {
 
    compile_and_create_component("
 
    primitive port_sender(out<in<u32>> tx, in<u32> to_send) {
 
        sync put(tx, to_send);
 
    }
 

	
 
    primitive port_receiver(in<in<u32>> rx) {
 
        sync auto a = get(rx);
 
    }
 

	
 
    composite constructor() {
 
        channel tx -> rx;
 
        channel forgotten -> to_send;
 
        new port_sender(tx, to_send);
 
        new port_receiver(rx);
 
    }
 
    ", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_transfer_synccreated_port() {
 
    compile_and_create_component("
 
    primitive port_sender(out<in<u32>> tx) {
 
        sync {
 
            channel a -> b;
 
            put(tx, b);
 
        }
 
    }
 

	
 
    primitive port_receiver(in<in<u32>> rx) {
 
        sync auto a = get(rx);
 
    }
 

	
 
    composite constructor() {
 
        channel a -> b;
 
        new port_sender(a);
 
        new port_receiver(b);
 
    }
 
    ", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_owned_peer_back_and_forth() {
 

	
 
}
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_foreign_peer_back_and_forth() {
 

	
 
}
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_owned_peer_and_communication() {
 

	
 
}
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_foreign_peer_and_communication() {
 

	
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)