Changeset - 7c5078fc4f81
[Not reviewed]
0 4 0
MH - 3 years ago 2022-05-14 19:21:46
contact@maxhenger.nl
Rename Cmd in std.internet to ClientCmd, fix port transfer bug
4 files changed with 19 insertions and 19 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
@@ -863,98 +863,98 @@ pub(crate) fn special_create_component(
 
            instantiator_id: instantiator_port_id,
 
            instantiator_handle: instantiator_port_handle,
 
            created_id: created_port_id,
 
            created_handle: created_port_handle,
 
            is_open,
 
        });
 
    }
 

	
 
    // Set peer of the port for the new component
 
    for pair in port_pairs.iter() {
 
        let instantiator_port_info = instantiator_ctx.get_port(pair.instantiator_handle);
 
        let created_port_info = created_ctx.get_port_mut(pair.created_handle);
 

	
 
        // Note: we checked above (in debug mode) that the peer of the port is
 
        // owned by the creator as well, now check if the peer is transferred
 
        // as well.
 
        let created_port_peer_index = port_pairs.iter()
 
            .position(|v| v.instantiator_id == instantiator_port_info.peer_port_id);
 

	
 
        match created_port_peer_index {
 
            Some(created_port_peer_index) => {
 
                // Both ends of the channel are moving to the new component
 
                let peer_pair = &port_pairs[created_port_peer_index];
 
                created_port_info.peer_port_id = peer_pair.created_id;
 
                created_port_info.peer_comp_id = reservation.id();
 
            },
 
            None => {
 
                created_port_info.peer_comp_id = instantiator_ctx.id;
 
                if pair.is_open {
 
                    created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(instantiator_ctx.id));
 
                }
 
            }
 
        }
 
    }
 

	
 
    // Store component in runtime storage and retrieve component fields in their
 
    // name memory location
 
    let (created_key, created_runtime_component) = sched_ctx.runtime.finish_create_component(
 
        reservation, component_instance, created_ctx, false
 
    );
 

	
 
    let created_ctx = &mut created_runtime_component.ctx;
 
    let created_component = &mut created_runtime_component.component;
 
    created_component.on_creation(created_key.downgrade(), sched_ctx);
 

	
 
    // Transfer messages and link instantiator to created component
 
    for pair in port_pairs.iter() {
 
        instantiator_ctx.change_port_peer(sched_ctx, pair.instantiator_handle, None);
 
        instantiator_ctx.remove_port(pair.instantiator_handle);
 
        transfer_messages(inbox_main, inbox_backup, pair, instantiator_ctx, created_ctx, created_component.as_mut());
 
        instantiator_ctx.remove_port(pair.instantiator_handle);
 

	
 
        let created_port_info = created_ctx.get_port(pair.created_handle);
 
        if pair.is_open && created_port_info.peer_comp_id == instantiator_ctx.id {
 
            // Set up channel between instantiator component port, and its peer,
 
            // which is owned by the new component
 
            let instantiator_port_handle = instantiator_ctx.get_port_handle(created_port_info.peer_port_id);
 
            let instantiator_port_info = instantiator_ctx.get_port_mut(instantiator_port_handle);
 
            instantiator_port_info.peer_port_id = created_port_info.self_id;
 
            instantiator_ctx.change_port_peer(sched_ctx, instantiator_port_handle, Some(created_ctx.id));
 
        }
 
    }
 

	
 
    // By definition we did not have any remote peers for the transferred ports,
 
    // so we can schedule the new component immediately
 
    sched_ctx.runtime.enqueue_work(created_key);
 
}
 

	
 
/// Puts the component in an execution state where the specified component will
 
/// end up being created. The component goes through state changes (driven by
 
/// incoming control messages) to make sure that all of the ports that are going
 
/// to be transferred are not in a blocked state. Once finished the component
 
/// returns to the `NonSync` mode.
 
pub(crate) fn default_start_create_component(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
    control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup,
 
    definition_id: ProcedureDefinitionId, type_id: TypeId, arguments: ValueGroup
 
) {
 
    debug_assert_eq!(exec_state.mode, CompMode::NonSync);
 

	
 
    let mut transferred_ports = Vec::new();
 
    find_ports_in_value_group(&arguments, &mut transferred_ports);
 

	
 
    // Set execution state as waiting until we can create the component. If we
 
    // can do so right away, then we will.
 
    exec_state.set_as_create_component_blocked(definition_id, type_id, arguments);
 
    if ports_not_blocked(comp_ctx, &transferred_ports) {
 
        perform_create_component(exec_state, sched_ctx, comp_ctx, control, inbox_main, inbox_backup);
 
    }
 
}
 

	
 
/// Actually creates a component (and assumes that the caller made sure that
 
/// none of the ports are involved in a blocking operation).
 
pub(crate) fn perform_create_component(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, instantiator_ctx: &mut CompCtx,
 
    control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) {
 
    // Retrieve ports from the arguments
 
    debug_assert_eq!(exec_state.mode, CompMode::NewComponentBlocked);
 
@@ -1030,98 +1030,98 @@ pub(crate) fn perform_create_component(
 
                    // component.
 
                    let peer_pair = &port_pairs[created_port_peer_index];
 
                    created_port_info.peer_port_id = peer_pair.created_id;
 
                    created_port_info.peer_comp_id = reservation.id();
 
                },
 
                None => {
 
                    // Peer port remains with instantiator. However, we cannot
 
                    // set the peer on the instantiator yet, because the new
 
                    // component has not yet been stored in the runtime's
 
                    // component storage. So we do this later
 
                    created_port_info.peer_comp_id = instantiator_ctx.id;
 
                    if pair.is_open {
 
                        created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(instantiator_ctx.id));
 
                    }
 
                }
 
            }
 
        } else {
 
            // Peer is a different component
 
            if pair.is_open {
 
                // And the port is still open, so we need to notify the peer
 
                let peer_handle = instantiator_ctx.get_peer_handle(created_port_info.peer_comp_id);
 
                let peer_info = instantiator_ctx.get_peer(peer_handle);
 
                created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(peer_info.id));
 
                created_component_has_remote_peers = true;
 
            }
 
        }
 
    }
 

	
 
    // Now we store the new component into the runtime's component storage using
 
    // the reservation.
 
    let component = create_component(
 
        &sched_ctx.runtime.protocol, procedure_id, procedure_type_id,
 
        arguments, port_pairs.len()
 
    );
 
    let (created_key, created_runtime_component) = sched_ctx.runtime.finish_create_component(
 
        reservation, component, created_ctx, false
 
    );
 
    let created_ctx = &mut created_runtime_component.ctx;
 
    let created_component = &mut created_runtime_component.component;
 
    created_component.on_creation(created_key.downgrade(), sched_ctx);
 

	
 
    // We now pass along the messages that the instantiator component still has
 
    // that belong to the new component. At the same time we'll take care of
 
    // setting the correct peer of the instantiator component
 
    for pair in port_pairs.iter() {
 
        // Transferring the messages and removing the port from the
 
        // instantiator component
 
        instantiator_ctx.change_port_peer(sched_ctx, pair.instantiator_handle, None);
 
        instantiator_ctx.remove_port(pair.instantiator_handle);
 
        transfer_messages(inbox_main, inbox_backup, pair, instantiator_ctx, created_ctx, created_component.as_mut());
 
        instantiator_ctx.remove_port(pair.instantiator_handle);
 

	
 
        // Here we take care of the case where the instantiator previously owned
 
        // both ends of the channel, but has transferred one port to the new
 
        // component (hence creating a channel between the instantiator
 
        // component and the new component).
 
        let created_port_info = created_ctx.get_port(pair.created_handle);
 
        if pair.is_open && created_port_info.peer_comp_id == instantiator_ctx.id {
 
            // Note: the port we're receiving here belongs to the instantiator
 
            // and is NOT in the "port_pairs" array.
 
            let instantiator_port_handle = instantiator_ctx.get_port_handle(created_port_info.peer_port_id);
 
            let instantiator_port_info = instantiator_ctx.get_port_mut(instantiator_port_handle);
 
            instantiator_port_info.peer_port_id = created_port_info.self_id;
 
            instantiator_ctx.change_port_peer(sched_ctx, instantiator_port_handle, Some(created_ctx.id));
 
        }
 
    }
 

	
 
    // Finally: if we did move ports around whose peers are different
 
    // components, then we'll initiate the appropriate protocol to notify them.
 
    if created_component_has_remote_peers {
 
        let schedule_entry_id = control.add_schedule_entry(created_ctx.id);
 
        for pair in &port_pairs {
 
            let port_info = created_ctx.get_port(pair.created_handle);
 
            if pair.is_open && port_info.peer_comp_id != instantiator_ctx.id && port_info.peer_comp_id != created_ctx.id {
 
                // Peer component is not the instantiator, and it is not the
 
                // new component itself
 
                let message = control.add_reroute_entry(
 
                    instantiator_ctx.id, port_info.peer_port_id, port_info.peer_comp_id,
 
                    pair.instantiator_id, pair.created_id, created_ctx.id,
 
                    schedule_entry_id
 
                );
 
                let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id);
 
                let peer_info = created_ctx.get_peer(peer_handle);
 

	
 
                peer_info.handle.send_message_logged(sched_ctx, message, true);
 
            }
 
        }
 
    } else {
 
        // We can schedule the component immediately, we do not have to wait
 
        // for any peers: there are none.
 
        sched_ctx.runtime.enqueue_work(created_key);
 
    }
 

	
 
    exec_state.mode = CompMode::NonSync;
 
    exec_state.mode_component = (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid());
 
}
 

	
 
#[inline]
 
pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling {
src/runtime2/tests/internet.rs
Show inline comments
 
use super::*;
 

	
 
// silly test to make sure that the PDL will never be an issue when doing TCP
 
// stuff with the actual components
 
#[test]
 
fn test_stdlib_file() {
 
    compile_and_create_component("
 
    import std.internet as inet;
 

	
 
    comp fake_listener_once(out<inet::TcpConnection> tx) {
 
        channel cmd_tx -> cmd_rx;
 
        channel data_tx -> data_rx;
 
        new fake_socket(cmd_rx, data_tx);
 
        sync put(tx, inet::TcpConnection{
 
            tx: cmd_tx,
 
            rx: data_rx,
 
        });
 
    }
 

	
 
    comp fake_socket(in<inet::Cmd> cmds, out<u8[]> tx) {
 
    comp fake_socket(in<inet::ClientCmd> cmds, out<u8[]> tx) {
 
        auto to_send = {};
 

	
 
        auto shutdown = false;
 
        while (!shutdown) {
 
            auto keep_going = true;
 
            sync {
 
                while (keep_going) {
 
                    auto cmd = get(cmds);
 
                    if (let inet::Cmd::Send(data) = cmd) {
 
                    if (let inet::ClientCmd::Send(data) = cmd) {
 
                        to_send = data;
 
                        keep_going = false;
 
                    } else if (let inet::Cmd::Receive = cmd) {
 
                    } else if (let inet::ClientCmd::Receive = cmd) {
 
                        put(tx, to_send);
 
                    } else if (let inet::Cmd::Finish = cmd) {
 
                    } else if (let inet::ClientCmd::Finish = cmd) {
 
                        keep_going = false;
 
                    } else if (let inet::Cmd::Shutdown = cmd) {
 
                    } else if (let inet::ClientCmd::Shutdown = cmd) {
 
                        keep_going = false;
 
                        shutdown = true;
 
                    }
 
                }
 
            }
 
        }
 
    }
 

	
 
    comp fake_client(inet::TcpConnection conn) {
 
        sync put(conn.tx, inet::Cmd::Send({1, 3, 3, 7}));
 
        sync put(conn.tx, inet::ClientCmd::Send({1, 3, 3, 7}));
 
        sync {
 
            put(conn.tx, inet::Cmd::Receive);
 
            put(conn.tx, inet::ClientCmd::Receive);
 
            auto val = get(conn.rx);
 
            while (val[0] != 1 || val[1] != 3 || val[2] != 3 || val[3] != 7) {
 
                print(\"this is going very wrong\");
 
            }
 
            put(conn.tx, inet::Cmd::Finish);
 
            put(conn.tx, inet::ClientCmd::Finish);
 
        }
 
        sync put(conn.tx, inet::Cmd::Shutdown);
 
        sync put(conn.tx, inet::ClientCmd::Shutdown);
 
    }
 

	
 
    comp constructor() {
 
        channel conn_tx -> conn_rx;
 
        new fake_listener_once(conn_tx);
 

	
 
        // Same crap as before:
 
        channel cmd_tx -> unused_cmd_rx;
 
        channel unused_data_tx -> data_rx;
 
        auto connection = inet::TcpConnection{ tx: cmd_tx, rx: data_rx };
 

	
 
        sync {
 
            connection = get(conn_rx);
 
        }
 

	
 
        new fake_client(connection);
 
    }
 
    ", "constructor", no_args());
 
}
 
\ No newline at end of file
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -121,155 +121,155 @@ fn test_unguarded_select() {
 
}
 

	
 
#[test]
 
fn test_empty_select() {
 
    let pd = ProtocolDescription::parse(b"
 
    comp constructor() {
 
        u32 index = 0;
 
        while (index < 5) {
 
            sync select {}
 
            index += 1;
 
        }
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_random_u32_temporary_thingo() {
 
    let pd = ProtocolDescription::parse(b"
 
    import std.random::random_u32;
 

	
 
    comp random_taker(in<u32> generator, u32 num_values) {
 
        auto i = 0;
 
        while (i < num_values) {
 
            sync {
 
                auto a = get(generator);
 
            }
 
            i += 1;
 
        }
 
    }
 

	
 
    comp constructor() {
 
        channel tx -> rx;
 
        auto num_values = 25;
 
        new random_u32(tx, 1, 100, num_values);
 
        new random_taker(rx, num_values);
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, LOG_LEVEL, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_tcp_socket_http_request() {
 
    let _pd = ProtocolDescription::parse(b"
 
    import std.internet::*;
 

	
 
    comp requester(out<Cmd> cmd_tx, in<u8[]> data_rx) {
 
    comp requester(out<ClientCmd> cmd_tx, in<u8[]> data_rx) {
 
        print(\"*** TCPSocket: Sending request\");
 
        sync {
 
            put(cmd_tx, Cmd::Send(b\"GET / HTTP/1.1\\r\\n\\r\\n\"));
 
            put(cmd_tx, ClientCmd::Send(b\"GET / HTTP/1.1\\r\\n\\r\\n\"));
 
        }
 

	
 
        print(\"*** TCPSocket: Receiving response\");
 
        auto buffer = {};
 
        auto done_receiving = false;
 
        sync while (!done_receiving) {
 
            put(cmd_tx, Cmd::Receive);
 
            put(cmd_tx, ClientCmd::Receive);
 
            auto data = get(data_rx);
 
            buffer @= data;
 

	
 
            // Completely crap detection of end-of-document. But here we go, we
 
            // try to detect the trailing </html>. Proper way would be to parse
 
            // for 'content-length' or 'content-encoding'
 
            s32 index = 0;
 
            s32 partial_length = cast(length(data) - 7);
 
            while (index < partial_length) {
 
                // No string conversion yet, so check byte buffer one byte at
 
                // a time.
 
                auto c1 = data[index];
 
                if (c1 == cast('<')) {
 
                    auto c2 = data[index + 1];
 
                    auto c3 = data[index + 2];
 
                    auto c4 = data[index + 3];
 
                    auto c5 = data[index + 4];
 
                    auto c6 = data[index + 5];
 
                    auto c7 = data[index + 6];
 
                    if ( // i.e. if (data[index..] == '</html>'
 
                        c2 == cast('/') && c3 == cast('h') && c4 == cast('t') &&
 
                        c5 == cast('m') && c6 == cast('l') && c7 == cast('>')
 
                    ) {
 
                        print(\"*** TCPSocket: Detected </html>\");
 
                        put(cmd_tx, Cmd::Finish);
 
                        put(cmd_tx, ClientCmd::Finish);
 
                        done_receiving = true;
 
                    }
 
                }
 
                index += 1;
 
            }
 
        }
 

	
 
        print(\"*** TCPSocket: Requesting shutdown\");
 
        sync {
 
            put(cmd_tx, Cmd::Shutdown);
 
            put(cmd_tx, ClientCmd::Shutdown);
 
        }
 
    }
 

	
 
    comp main() {
 
        channel cmd_tx -> cmd_rx;
 
        channel data_tx -> data_rx;
 
        new tcp_client({142, 250, 179, 163}, 80, cmd_rx, data_tx); // port 80 of google
 
        new requester(cmd_tx, data_rx);
 
    }
 
    ").expect("compilation");
 

	
 
    // This test is disabled because it performs a HTTP request to google.
 
    // let rt = Runtime::new(1, true, pd).unwrap();
 
    // let rt = Runtime::new(1, LOG_LEVEL, _pd).unwrap();
 
    // create_component(&rt, "", "main", no_args());
 
}
 

	
 
#[test]
 
fn test_sending_receiving_union() {
 
    let pd = ProtocolDescription::parse(b"
 
    union Cmd {
 
        Set(u8[]),
 
        Get,
 
        Shutdown,
 
    }
 

	
 
    comp database(in<Cmd> rx, out<u8[]> tx) {
 
        auto stored = {};
 
        auto done = false;
 
        while (!done) {
 
            sync {
 
                auto command = get(rx);
 
                if (let Cmd::Set(bytes) = command) {
 
                    print(\"database: storing value\");
 
                    stored = bytes;
 
                } else if (let Cmd::Get = command) {
 
                    print(\"database: returning value\");
 
                    put(tx, stored);
 
                } else if (let Cmd::Shutdown = command) {
 
                    print(\"database: shutting down\");
 
                    done = true;
 
                } else while (true) print(\"impossible\"); // no other case possible
 
            }
 
        }
 
    }
 

	
 
    comp client(out<Cmd> tx, in<u8[]> rx, u32 num_rounds) {
 
        auto round = 0;
 
        while (round < num_rounds) {
 
            auto set_value = b\"hello there\";
 
            print(\"client: putting a value\");
 
            sync put(tx, Cmd::Set(set_value));
 

	
 
            auto retrieved = {};
 
            print(\"client: retrieving what was sent\");
 
            sync {
 
                put(tx, Cmd::Get);
 
                retrieved = get(rx);
 
            }
 

	
 
            if (set_value != retrieved) while (true) print(\"wrong!\");
 

	
std/std.internet.pdl
Show inline comments
 
#module std.internet
 

	
 
union ClientCmd {
 
    Send(u8[]),
 
    Receive,
 
    Finish,
 
    Shutdown,
 
}
 

	
 
comp tcp_client(u8[] ip, u16 port, in<Cmd> cmds, out<u8[]> rx) {
 
comp tcp_client(u8[] ip, u16 port, in<ClientCmd> cmds, out<u8[]> rx) {
 
    #builtin
 
}
 

	
 
union ListenerCmd {
 
    Accept,
 
    Shutdown,
 
}
 

	
 
struct TcpConnection {
 
    out<Cmd> tx,
 
    out<ClientCmd> tx,
 
    in<u8[]> rx,
 
}
 

	
 
/* comp tcp_listener(u8[] ip, u16 port, in<ListenerCmd> cmds, out<TcpConnection> rx) {
 
    #builtin
 
} */
 
\ No newline at end of file
0 comments (0 inline, 0 general)