Changeset - d65eb4f44f1a
[Not reviewed]
0 2 0
MH - 3 years ago 2022-05-13 23:50:41
contact@maxhenger.nl
Fix fake tcp test
2 files changed with 5 insertions and 2 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
@@ -792,385 +792,385 @@ pub(crate) fn default_handle_sync_decision(
 
    };
 

	
 
    debug_assert!(
 
        exec_state.mode == CompMode::SyncEnd || (
 
            exec_state.mode.is_busy_exiting() && exec_state.exit_reason.is_error()
 
        ) || (
 
            exec_state.mode.is_in_sync_block() && decision == SyncRoundDecision::Failure
 
        )
 
    );
 

	
 
    sched_ctx.info(&format!("Handling decision {:?} (in mode: {:?})", decision, exec_state.mode));
 
    consensus.notify_sync_decision(decision);
 
    if success {
 
        // We cannot get a success message if the component has encountered an
 
        // error.
 
        for port_index in 0..comp_ctx.num_ports() {
 
            let port_info = comp_ctx.get_port_by_index_mut(port_index);
 
            if port_info.close_at_sync_end {
 
                port_info.state.set(PortStateFlag::Closed);
 
            }
 
            port_info.state.clear(PortStateFlag::Received);
 
        }
 
        debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
 
        exec_state.mode = CompMode::NonSync;
 
        return Some(true);
 
    } else {
 
        // We may get failure both in all possible cases. But we should only
 
        // modify the execution state if we're not already in exit mode
 
        if !exec_state.mode.is_busy_exiting() {
 
            sched_ctx.error("failed synchronous round, initiating exit");
 
            exec_state.set_as_start_exit(ExitReason::ErrorNonSync);
 
        }
 
        return Some(false);
 
    }
 
}
 

	
 

	
 
pub(crate) fn default_start_create_component(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
    control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup,
 
    definition_id: ProcedureDefinitionId, type_id: TypeId, arguments: ValueGroup
 
) {
 
    debug_assert_eq!(exec_state.mode, CompMode::NonSync);
 

	
 
    let mut transferred_ports = Vec::new();
 
    find_ports_in_value_group(&arguments, &mut transferred_ports);
 

	
 
    // Set execution state as waiting until we can create the component. If we
 
    // can do so right away, then we will.
 
    exec_state.set_as_create_component_blocked(definition_id, type_id, arguments);
 
    if ports_not_blocked(comp_ctx, &transferred_ports) {
 
        perform_create_component(exec_state, sched_ctx, comp_ctx, control, inbox_main, inbox_backup);
 
    }
 
}
 

	
 
/// Actually creates a component (and assumes that the caller made sure that
 
/// none of the ports are involved in a blocking operation).
 
pub(crate) fn perform_create_component(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, instantiator_ctx: &mut CompCtx,
 
    control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) {
 
    // Small internal utilities
 
    struct PortPair {
 
        instantiator_id: PortId,
 
        instantiator_handle: LocalPortHandle,
 
        created_id: PortId,
 
        created_handle: LocalPortHandle,
 
        is_open: bool,
 
    }
 

	
 
    // Retrieve ports from the arguments
 
    debug_assert_eq!(exec_state.mode, CompMode::NewComponentBlocked);
 

	
 
    let (procedure_id, procedure_type_id) = exec_state.mode_component;
 
    let mut arguments = exec_state.mode_value.take();
 
    let mut ports = Vec::new();
 
    find_ports_in_value_group(&arguments, &mut ports);
 
    debug_assert!(ports_not_blocked(instantiator_ctx, &ports));
 

	
 
    // Reserve a location for the new component
 
    let reservation = sched_ctx.runtime.start_create_component();
 
    let mut created_ctx = CompCtx::new(&reservation);
 

	
 
    let mut port_pairs = Vec::with_capacity(ports.len());
 

	
 
    // Go over all the ports that will be transferred. Since the ports will get
 
    // a new ID in the new component, we will take care of that here.
 
    for (port_location, instantiator_port_id) in &ports {
 
        // Retrieve port information from instantiator
 
        let instantiator_port_id = *instantiator_port_id;
 
        let instantiator_port_handle = instantiator_ctx.get_port_handle(instantiator_port_id);
 
        let instantiator_port = instantiator_ctx.get_port(instantiator_port_handle);
 

	
 
        // Create port at created component
 
        let created_port_handle = created_ctx.add_port(
 
            instantiator_port.peer_comp_id, instantiator_port.peer_port_id,
 
            instantiator_port.kind, instantiator_port.state
 
        );
 
        let created_port = created_ctx.get_port(created_port_handle);
 
        let created_port_id = created_port.self_id;
 

	
 
        // Modify port ID in the arguments to the new component and store them
 
        // for later access
 
        let is_open = instantiator_port.state.is_open();
 
        port_pairs.push(PortPair{
 
            instantiator_id: instantiator_port_id,
 
            instantiator_handle: instantiator_port_handle,
 
            created_id: created_port_id,
 
            created_handle: created_port_handle,
 
            is_open,
 
        });
 

	
 
        for location in port_location.iter().copied() {
 
            let value = arguments.get_value_mut(location);
 
            match value {
 
                Value::Input(id) => *id = port_id_to_eval(created_port_id),
 
                Value::Output(id) => *id = port_id_to_eval(created_port_id),
 
                _ => unreachable!(),
 
            }
 
        }
 
    }
 

	
 
    // For each of the ports in the newly created component we set the peer to
 
    // the correct value. We will not yet change the peer on the instantiator's
 
    // ports (as we haven't yet stored the new component in the runtime's
 
    // component storage)
 
    let mut created_component_has_remote_peers = false;
 
    for pair in port_pairs.iter() {
 
        let instantiator_port_info = instantiator_ctx.get_port(pair.instantiator_handle);
 
        let created_port_info = created_ctx.get_port_mut(pair.created_handle);
 

	
 
        if created_port_info.peer_comp_id == instantiator_ctx.id {
 
            // The peer of the created component's port seems to be the
 
            // instantiator.
 
            let created_port_peer_index = port_pairs.iter()
 
                .position(|v| v.instantiator_id == instantiator_port_info.peer_port_id);
 

	
 
            match created_port_peer_index {
 
                Some(created_port_peer_index) => {
 
                    // However, the peer port is also moved to the new
 
                    // component, so the complete channel is owned by the new
 
                    // component.
 
                    let peer_pair = &port_pairs[created_port_peer_index];
 
                    created_port_info.peer_port_id = peer_pair.created_id;
 
                    created_port_info.peer_comp_id = reservation.id();
 
                },
 
                None => {
 
                    // Peer port remains with instantiator. However, we cannot
 
                    // set the peer on the instantiator yet, because the new
 
                    // component has not yet been stored in the runtime's
 
                    // component storage. So we do this later
 
                    created_port_info.peer_comp_id = instantiator_ctx.id;
 
                    if pair.is_open {
 
                        created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(instantiator_ctx.id));
 
                    }
 
                }
 
            }
 
        } else {
 
            // Peer is a different component
 
            if pair.is_open {
 
                // And the port is still open, so we need to notify the peer
 
                let peer_handle = instantiator_ctx.get_peer_handle(created_port_info.peer_comp_id);
 
                let peer_info = instantiator_ctx.get_peer(peer_handle);
 
                created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(peer_info.id));
 
                created_component_has_remote_peers = true;
 
            }
 
        }
 
    }
 

	
 
    // Now we store the new component into the runtime's component storage using
 
    // the reservation.
 
    let component = create_component(
 
        &sched_ctx.runtime.protocol, procedure_id, procedure_type_id,
 
        arguments, port_pairs.len()
 
    );
 
    let (created_key, created_runtime_component) = sched_ctx.runtime.finish_create_component(
 
        reservation, component, created_ctx, false
 
    );
 
    let created_ctx = &mut created_runtime_component.ctx;
 
    let created_component = &mut created_runtime_component.component;
 
    created_component.on_creation(created_key.downgrade(), sched_ctx);
 

	
 
    // We now pass along the messages that the instantiator component still has
 
    // that belong to the new component. At the same time we'll take care of
 
    // setting the correct peer of the instantiator component
 
    for pair in port_pairs.iter() {
 
        // Transferring the messages and removing the port from the
 
        // instantiator component
 
        let instantiator_port_index = instantiator_ctx.get_port_index(pair.instantiator_handle);
 
        instantiator_ctx.change_port_peer(sched_ctx, pair.instantiator_handle, None);
 
        instantiator_ctx.remove_port(pair.instantiator_handle);
 

	
 
        if let Some(mut message) = inbox_main[instantiator_port_index].take() {
 
        if let Some(mut message) = inbox_main.remove(instantiator_port_index) {
 
            message.data_header.target_port = pair.created_id;
 
            created_component.adopt_message(created_ctx, message);
 
        }
 

	
 
        let mut message_index = 0;
 
        while message_index < inbox_backup.len() {
 
            let message = &inbox_backup[message_index];
 
            if message.data_header.target_port == pair.instantiator_id {
 
                // Transfer the message
 
                let mut message = inbox_backup.remove(message_index);
 
                message.data_header.target_port = pair.created_id;
 
                created_component.adopt_message(created_ctx, message);
 
            } else {
 
                // Message does not belong to the port pair that we're
 
                // transferring to the new component.
 
                message_index += 1;
 
            }
 
        }
 

	
 
        // Here we take care of the case where the instantiator previously owned
 
        // both ends of the channel, but has transferred one port to the new
 
        // component (hence creating a channel between the instantiator
 
        // component and the new component).
 
        let created_port_info = created_ctx.get_port(pair.created_handle);
 
        if pair.is_open && created_port_info.peer_comp_id == instantiator_ctx.id {
 
            // Note: the port we're receiving here belongs to the instantiator
 
            // and is NOT in the "port_pairs" array.
 
            let instantiator_port_handle = instantiator_ctx.get_port_handle(created_port_info.peer_port_id);
 
            let instantiator_port_info = instantiator_ctx.get_port_mut(instantiator_port_handle);
 
            instantiator_port_info.peer_port_id = created_port_info.self_id;
 
            instantiator_ctx.change_port_peer(sched_ctx, instantiator_port_handle, Some(created_ctx.id));
 
        }
 
    }
 

	
 
    // Finally: if we did move ports around whose peers are different
 
    // components, then we'll initiate the appropriate protocol to notify them.
 
    if created_component_has_remote_peers {
 
        let schedule_entry_id = control.add_schedule_entry(created_ctx.id);
 
        for pair in &port_pairs {
 
            let port_info = created_ctx.get_port(pair.created_handle);
 
            if pair.is_open && port_info.peer_comp_id != instantiator_ctx.id && port_info.peer_comp_id != created_ctx.id {
 
                // Peer component is not the instantiator, and it is not the
 
                // new component itself
 
                let message = control.add_reroute_entry(
 
                    instantiator_ctx.id, port_info.peer_port_id, port_info.peer_comp_id,
 
                    pair.instantiator_id, pair.created_id, created_ctx.id,
 
                    schedule_entry_id
 
                );
 
                let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id);
 
                let peer_info = created_ctx.get_peer(peer_handle);
 

	
 
                peer_info.handle.send_message_logged(sched_ctx, message, true);
 
            }
 
        }
 
    } else {
 
        // We can schedule the component immediately, we do not have to wait
 
        // for any peers: there are none.
 
        sched_ctx.runtime.enqueue_work(created_key);
 
    }
 

	
 
    exec_state.mode = CompMode::NonSync;
 
    exec_state.mode_component = (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid());
 
}
 

	
 
pub(crate) fn ports_not_blocked(comp_ctx: &CompCtx, ports: &EncounteredPorts) -> bool {
 
    for (_port_locations, port_id) in ports {
 
        let port_handle = comp_ctx.get_port_handle(*port_id);
 
        let port_info = comp_ctx.get_port(port_handle);
 

	
 
        if port_info.state.is_blocked_due_to_port_change() {
 
            return false;
 
        }
 
    }
 

	
 
    return true;
 
}
 

	
 
/// Performs the default action of printing the provided error, and then putting
 
/// the component in the state where it will shut down. Only to be used for
 
/// builtin components: their error message construction is simpler (and more
 
/// common) as they don't have any source code.
 
pub(crate) fn default_handle_error_for_builtin(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx,
 
    location_and_message: (PortInstruction, String)
 
) {
 
    let (_location, message) = location_and_message;
 
    sched_ctx.error(&message);
 

	
 
    let exit_reason = if exec_state.mode.is_in_sync_block() {
 
        ExitReason::ErrorInSync
 
    } else {
 
        ExitReason::ErrorNonSync
 
    };
 

	
 
    exec_state.set_as_start_exit(exit_reason);
 
}
 

	
 
#[inline]
 
pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling {
 
    debug_assert_eq!(_exec_state.mode, CompMode::Exit);
 
    return CompScheduling::Exit;
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Internal messaging/state utilities
 
// -----------------------------------------------------------------------------
 

	
 
/// Sends a message without any transmitted ports. Does not check if sending
 
/// is actually valid.
 
fn send_message_without_ports(
 
    sending_port_handle: LocalPortHandle, value: ValueGroup,
 
    comp_ctx: &CompCtx, sched_ctx: &SchedulerCtx, consensus: &mut Consensus,
 
) {
 
    let port_info = comp_ctx.get_port(sending_port_handle);
 
    debug_assert!(port_info.state.can_send());
 
    let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
    let peer_info = comp_ctx.get_peer(peer_handle);
 

	
 
    let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value);
 
    peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true);
 
}
 

	
 
/// Prepares sending a message that contains ports. Only once a particular
 
/// protocol has completed (where we notify all the peers that the ports will
 
/// be transferred) will we actually send the message to the recipient.
 
fn start_send_message_with_ports(
 
    sending_port_id: PortId, sending_port_instruction: PortInstruction, value: ValueGroup,
 
    exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx,
 
    control: &mut ControlLayer
 
) -> Result<(), (PortInstruction, String)> {
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync); // busy in sync, trying to send
 

	
 
    // Retrieve ports we're going to transfer
 
    let sending_port_handle = comp_ctx.get_port_handle(sending_port_id);
 
    let sending_port_info = comp_ctx.get_port_mut(sending_port_handle);
 
    sending_port_info.last_instruction = sending_port_instruction;
 

	
 
    let mut transmit_ports = Vec::new();
 
    find_ports_in_value_group(&value, &mut transmit_ports);
 
    debug_assert!(!transmit_ports.is_empty()); // required from caller
 

	
 
    // Enter the state where we'll wait until all transferred ports are not
 
    // blocked.
 
    exec_state.set_as_blocked_put_with_ports(sending_port_id, value);
 

	
 
    if ports_not_blocked(comp_ctx, &transmit_ports) {
 
        // Ports are not blocked, so we can send them right away.
 
        perform_send_message_with_ports_notify_peers(
 
            exec_state, comp_ctx, sched_ctx, control, transmit_ports
 
        )?;
 
    } // else: wait until they become unblocked
 

	
 
    return Ok(())
 
}
 

	
 
fn perform_send_message_with_ports_notify_peers(
 
    exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx,
 
    control: &mut ControlLayer, transmit_ports: EncounteredPorts
 
) -> Result<(), (PortInstruction, String)> {
 
    // Check we're in the correct state in debug mode
 
    debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedTransferredPorts);
 
    debug_assert!(ports_not_blocked(comp_ctx, &transmit_ports));
 

	
 
    // Set up the final Ack that triggers us to send our final message
 
    let unblock_put_control_id = control.add_unblock_put_with_ports_entry();
 
    for (_, port_id) in &transmit_ports {
 
        let transmit_port_handle = comp_ctx.get_port_handle(*port_id);
 
        let transmit_port_info = comp_ctx.get_port_mut(transmit_port_handle);
 
        let peer_comp_id = transmit_port_info.peer_comp_id;
 
        let peer_port_id = transmit_port_info.peer_port_id;
 

	
 

	
 
        // Note: we checked earlier that we are currently in sync mode. Now we
 
        // will check if we've already used the port we're about to transmit.
 
        if !transmit_port_info.last_instruction.is_none() {
 
            let sending_port_handle = comp_ctx.get_port_handle(exec_state.mode_port);
 
            let sending_port_instruction = comp_ctx.get_port(sending_port_handle).last_instruction;
 
            return Err((
 
                sending_port_instruction,
 
                String::from("Cannot transmit one of the ports in this message, as it is used in this sync round")
 
            ));
 
        }
 

	
 
        if transmit_port_info.state.is_set(PortStateFlag::Transmitted) {
 
            let sending_port_handle = comp_ctx.get_port_handle(exec_state.mode_port);
 
            let sending_port_instruction = comp_ctx.get_port(sending_port_handle).last_instruction;
 
            return Err((
 
                sending_port_instruction,
 
                String::from("Cannot transmit one of the ports in this message, as that port is already transmitted")
 
            ));
 
        }
 

	
src/runtime2/tests/internet.rs
Show inline comments
 
use super::*;
 

	
 
// silly test to make sure that the PDL will never be an issue when doing TCP
 
// stuff with the actual components
 
#[test]
 
fn test_stdlib_file() {
 
    compile_and_create_component("
 
    import std.internet as inet;
 

	
 
    comp fake_listener_once(out<inet::TcpConnection> tx) {
 
        channel cmd_tx -> cmd_rx;
 
        channel data_tx -> data_rx;
 
        new fake_socket(cmd_rx, data_tx);
 
        sync put(tx, inet::TcpConnection{
 
            tx: cmd_tx,
 
            rx: data_rx,
 
        });
 
    }
 

	
 
    comp fake_socket(in<inet::Cmd> cmds, out<u8[]> tx) {
 
        auto to_send = {};
 

	
 
        auto shutdown = false;
 
        while (!shutdown) {
 
            auto keep_going = true;
 
            sync {
 
                while (keep_going) {
 
                    auto cmd = get(cmds);
 
                    if (let inet::Cmd::Send(data) = cmd) {
 
                        to_send = data;
 
                        keep_going = false;
 
                    } else if (let inet::Cmd::Receive = cmd) {
 
                        put(tx, to_send);
 
                    } else if (let inet::Cmd::Finish = cmd) {
 
                        keep_going = false;
 
                    } else if (let inet::Cmd::Shutdown = cmd) {
 
                        keep_going = false;
 
                        shutdown = true;
 
                    }
 
                }
 
            }
 
        }
 
    }
 

	
 
    comp fake_client(inet::TcpConnection conn) {
 
        sync put(conn.tx, inet::Cmd::Send({1, 3, 3, 7}));
 
        sync {
 
            put(conn.tx, inet::Cmd::Receive);
 
            auto val = get(conn.rx);
 
            while (val[0] != 1 || val[1] != 3 || val[2] != 3 || val[3] != 7) {}
 
            while (val[0] != 1 || val[1] != 3 || val[2] != 3 || val[3] != 7) {
 
                print(\"this is going very wrong\");
 
            }
 
            put(conn.tx, inet::Cmd::Finish);
 
        }
 
        sync put(conn.tx, inet::Cmd::Shutdown);
 
    }
 

	
 
    comp constructor() {
 
        channel conn_tx -> conn_rx;
 
        new fake_listener_once(conn_tx);
 

	
 
        // Same crap as before:
 
        channel cmd_tx -> unused_cmd_rx;
 
        channel unused_data_tx -> data_rx;
 
        auto connection = inet::TcpConnection{ tx: cmd_tx, rx: data_rx };
 

	
 
        sync {
 
            connection = get(conn_rx);
 
        }
 

	
 
        new fake_client(connection);
 
    }
 
    ", "constructor", no_args());
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)