Changeset - 339b493050e2
[Not reviewed]
0 9 0
MH - 3 years ago 2022-05-14 21:18:24
contact@maxhenger.nl
WIP on tcp listener/socket test
9 files changed with 202 insertions and 50 deletions:
0 comments (0 inline, 0 general)
src/protocol/ast.rs
Show inline comments
 
@@ -1083,12 +1083,13 @@ pub enum ProcedureSource {
 
    FuncSelectStart,
 
    FuncSelectRegisterCasePort,
 
    FuncSelectWait,
 
    // Builtin components, available to user
 
    CompRandomU32, // TODO: Remove, temporary thing
 
    CompTcpClient,
 
    CompTcpListener,
 
}
 

	
 
impl ProcedureSource {
 
    pub(crate) fn is_builtin(&self) -> bool {
 
        match self {
 
            ProcedureSource::FuncUserDefined | ProcedureSource::CompUserDefined => false,
 
@@ -1849,23 +1850,24 @@ pub enum Method {
 
    SelectStart, // SelectStart(total_num_cases, total_num_ports)
 
    SelectRegisterCasePort, // SelectRegisterCasePort(case_index, port_index, port_id)
 
    SelectWait, // SelectWait() -> u32
 
    // Builtin component,
 
    ComponentRandomU32,
 
    ComponentTcpClient,
 
    ComponentTcpListener,
 
    // User-defined
 
    UserFunction,
 
    UserComponent,
 
}
 

	
 
impl Method {
 
    pub(crate) fn is_public_builtin(&self) -> bool {
 
        use Method::*;
 
        match self {
 
            Get | Put | Fires | Create | Length | Assert | Print => true,
 
            ComponentRandomU32 | ComponentTcpClient => true,
 
            ComponentRandomU32 | ComponentTcpClient | ComponentTcpListener => true,
 
            _ => false,
 
        }
 
    }
 

	
 
    pub(crate) fn is_user_defined(&self) -> bool {
 
        use Method::*;
src/protocol/eval/executor.rs
Show inline comments
 
@@ -767,13 +767,13 @@ impl Prompt {
 
                                        None => {
 
                                            cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr.this.upcast()));
 
                                            return Ok(EvalContinuation::SelectWait)
 
                                        },
 
                                    }
 
                                },
 
                                Method::ComponentRandomU32 | Method::ComponentTcpClient => {
 
                                Method::ComponentRandomU32 | Method::ComponentTcpClient | Method::ComponentTcpListener => {
 
                                    debug_assert_eq!(heap[expr.procedure].parameters.len(), cur_frame.expr_values.len());
 
                                    debug_assert_eq!(heap[cur_frame.position].as_new().expression, expr.this);
 
                                },
 
                                Method::UserComponent => {
 
                                    // This is actually handled by the evaluation
 
                                    // of the statement.
src/protocol/mod.rs
Show inline comments
 
@@ -157,12 +157,38 @@ impl ProtocolDescription {
 
        return Some(TypeInspector{
 
            heap: definition,
 
            type_table: type_monomorph
 
        });
 
    }
 

	
 
    /// Again a somewhat temporary method. Can be used by components to look up
 
    /// the definition of a particular procedure. Intended use is to find the
 
    /// DefinitionId/TypeId of builtin components.
 
    pub(crate) fn find_procedure(&self, module_name: &[u8], proc_name: &[u8]) -> Option<(ProcedureDefinitionId, TypeId)> {
 
        // Lookup type definition in module
 
        let root_id = self.lookup_module_root(module_name)?;
 
        let module = &self.heap[root_id];
 
        let definition_id = module.get_definition_by_ident(&self.heap, proc_name)?;
 
        let definition = &self.heap[definition_id];
 

	
 
        // Make sure the procedure is not polymorphic
 
        if !definition.poly_vars().is_empty() {
 
            return None;
 
        }
 
        if !definition.is_procedure() {
 
            return None;
 
        }
 

	
 
        // Lookup in type table
 
        let definition = definition.as_procedure();
 
        let type_parts = [ConcreteTypePart::Component(definition.this, 0)];
 
        let type_id = self.types.get_monomorph_type_id(&definition.this.upcast(), &type_parts)
 
            .expect("type ID for non-polymorphic procedure");
 
        return Some((definition.this, type_id));
 
    }
 

	
 
    fn lookup_module_root(&self, module_name: &[u8]) -> Option<RootId> {
 
        for module in self.modules.iter() {
 
            match &module.name {
 
                Some(name) => if name.as_bytes() == module_name {
 
                    return Some(module.root_id);
 
                },
src/protocol/parser/pass_definitions.rs
Show inline comments
 
@@ -375,12 +375,13 @@ impl PassDefinitions {
 
                ("std.global", "create") => ProcedureSource::FuncCreate,
 
                ("std.global", "length") => ProcedureSource::FuncLength,
 
                ("std.global", "assert") => ProcedureSource::FuncAssert,
 
                ("std.global", "print") => ProcedureSource::FuncPrint,
 
                ("std.random", "random_u32") => ProcedureSource::CompRandomU32,
 
                ("std.internet", "tcp_client") => ProcedureSource::CompTcpClient,
 
                ("std.internet", "tcp_listener") => ProcedureSource::CompTcpListener,
 
                _ => panic!(
 
                    "compiler error: unknown builtin procedure '{}' in module '{}'",
 
                    procedure_name, module_name
 
                ),
 
            };
 

	
 
@@ -1677,12 +1678,13 @@ impl PassDefinitions {
 
                                    ProcedureSource::FuncCreate => Method::Create,
 
                                    ProcedureSource::FuncLength => Method::Length,
 
                                    ProcedureSource::FuncAssert => Method::Assert,
 
                                    ProcedureSource::FuncPrint => Method::Print,
 
                                    ProcedureSource::CompRandomU32 => Method::ComponentRandomU32,
 
                                    ProcedureSource::CompTcpClient => Method::ComponentTcpClient,
 
                                    ProcedureSource::CompTcpListener => Method::ComponentTcpListener,
 
                                    _ => todo!("other procedure sources"),
 
                                };
 

	
 
                                // Function call: consume the arguments
 
                                let func_span = parser_type.full_span;
 
                                let mut full_span = func_span;
src/protocol/parser/pass_validation_linking.rs
Show inline comments
 
@@ -1155,13 +1155,14 @@ impl Visitor for PassValidationLinking {
 
            },
 
            Method::Print => {},
 
            Method::SelectStart
 
            | Method::SelectRegisterCasePort
 
            | Method::SelectWait => unreachable!(), // not usable by programmer directly
 
            Method::ComponentRandomU32
 
            | Method::ComponentTcpClient => {
 
            | Method::ComponentTcpClient
 
            | Method::ComponentTcpListener => {
 
                expecting_wrapping_new_stmt = true;
 
            },
 
            Method::UserFunction => {}
 
            Method::UserComponent => {
 
                expecting_wrapping_new_stmt = true;
 
            },
src/runtime2/component/component.rs
Show inline comments
 
@@ -238,12 +238,13 @@ pub(crate) fn create_component(
 

	
 
    if definition.source.is_builtin() {
 
        // Builtin component
 
        let component: Box<dyn Component> = match definition.source {
 
            ProcedureSource::CompRandomU32 => Box::new(ComponentRandomU32::new(arguments)),
 
            ProcedureSource::CompTcpClient => Box::new(ComponentTcpClient::new(arguments)),
 
            ProcedureSource::CompTcpListener => Box::new(ComponentTcpListener::new(arguments)),
 
            _ => unreachable!(),
 
        };
 

	
 
        return component;
 
    } else {
 
        // User-defined component
src/runtime2/component/component_internet.rs
Show inline comments
 
@@ -5,14 +5,16 @@ use crate::runtime2::stdlib::internet::*;
 
use crate::runtime2::poll::*;
 

	
 
use super::component::{self, *};
 
use super::control_layer::*;
 
use super::consensus::*;
 

	
 

	
 
use std::io::ErrorKind as IoErrorKind;
 
use std::net::{IpAddr, Ipv4Addr};
 
use crate::protocol::{ProcedureDefinitionId, TypeId};
 

	
 
// -----------------------------------------------------------------------------
 
// ComponentTcpClient
 
// -----------------------------------------------------------------------------
 

	
 
enum ClientSocketState {
 
@@ -410,49 +412,38 @@ impl ListenerSocketState {
 
            ListenerSocketState::Connected(v) => return v,
 
            ListenerSocketState::Error => unreachable!(),
 
        }
 
    }
 
}
 

	
 
struct PendingComponent {
 
    client: SocketTcpClient,
 
    cmd_rx: PortId,
 
    data_tx: PortId,
 
}
 

	
 
enum ListenerSyncState {
 
    AwaitingCmd,
 
    AcceptCommandReceived,
 
    AcceptChannelGenerated{ client: SocketTcpClient, cmd_rx: PortId, data_tx: PortId },
 
    AcceptCommandReceived, // just received `Accept` command
 
    AcceptChannelGenerated, // created channel, waiting to end the sync round
 
    AcceptGenerateComponent, // sync ended, back in non-sync, now generate component
 
    FinishSyncThenQuit,
 
}
 

	
 
impl ListenerSyncState {
 
    // Bit of a hacky solution: keeps the listener sync state intact, except
 
    // if it is `AcceptChannelGenerated`, that will be replaced with
 
    // `AwaitingCmd`. The reason is to move the `client` out.
 
    fn take(&mut self) -> ListenerSyncState {
 
        use ListenerSyncState::*;
 

	
 
        match self {
 
            AwaitingCmd => return AwaitingCmd,
 
            AcceptCommandReceived => return AcceptCommandReceived,
 
            FinishSyncThenQuit => return FinishSyncThenQuit,
 
            AcceptChannelGenerated{ .. } => {
 
                let mut swapped = ListenerSyncState::AwaitingCmd;
 
                std::mem::swap(self, &mut swapped);
 
                return swapped;
 
            }
 
        }
 
    }
 
}
 

	
 
pub struct ComponentTcpListener {
 
    // Properties for the tcp socket
 
    socket_state: ListenerSocketState,
 
    sync_state: ListenerSyncState,
 
    pending_component: Option<PendingComponent>,
 
    poll_ticket: Option<PollTicket>,
 
    inbox_main: InboxMain,
 
    inbox_backup: InboxBackup,
 
    pdl_input_port_id: PortId, // input port, receives commands
 
    pdl_output_port_id: PortId, // output port, sends connections
 
    // Information about union tags
 
    // Type information extracted from protocol
 
    tcp_client_definition: (ProcedureDefinitionId, TypeId),
 
    input_union_accept_tag: i64,
 
    input_union_shutdown_tag: i64,
 
    output_struct_rx_index: usize,
 
    output_struct_tx_index: usize,
 
    // Generic component state
 
    exec_state: CompExecState,
 
@@ -500,13 +491,15 @@ impl Component for ComponentTcpListener {
 
    fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) {
 
        unreachable!();
 
    }
 

	
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
 
        match message {
 
            Message::Data(_message) => unreachable!(),
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Sync(message) => {
 
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
                component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
 
            },
 
            Message::Control(message) => {
 
                if let Err(location_and_message) = component::default_handle_control_message(
 
@@ -520,40 +513,44 @@ impl Component for ComponentTcpListener {
 
                sched_ctx.info("Received polling event");
 
            },
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        sched_ctx.info(&format!("Running comonent ComponentTcpListener (mode: {:?})", self.exec_state.mode));
 
        sched_ctx.info(&format!("Running component ComponentTcpListener (mode: {:?})", self.exec_state.mode));
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedSelect
 
                => unreachable!(),
 
            CompMode::PutPortsBlockedTransferredPorts |
 
            CompMode::PutPortsBlockedAwaitingAcks |
 
            CompMode::PutPortsBlockedSendingPort |
 
            CompMode::NewComponentBlocked
 
                => return CompScheduling::Sleep,
 
            CompMode::NonSync => {
 
                match &self.socket_state {
 
                    ListenerSocketState::Connected(_socket) => {
 
                        match self.sync_state.take() {
 
                        match self.sync_state {
 
                            ListenerSyncState::AwaitingCmd => {
 
                                component::default_handle_sync_start(
 
                                    &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus
 
                                );
 
                            },
 
                            ListenerSyncState::AcceptCommandReceived => unreachable!(),
 
                            ListenerSyncState::AcceptChannelGenerated{ client, cmd_rx, data_tx } => {
 
                            ListenerSyncState::AcceptCommandReceived |
 
                            ListenerSyncState::AcceptChannelGenerated => unreachable!(),
 
                            ListenerSyncState::AcceptGenerateComponent => {
 
                                // Now that we're outside the sync round, create the tcp client
 
                                // component
 
                                let socket_component: Box<dyn Component> = Box::new(ComponentTcpClient::new_with_existing_connection(client, cmd_rx, data_tx));
 
                                let pending = self.pending_component.take().unwrap();
 
                                let socket_component: Box<dyn Component> = Box::new(ComponentTcpClient::new_with_existing_connection(
 
                                    pending.client, pending.cmd_rx, pending.data_tx
 
                                ));
 
                                component::special_create_component(
 
                                    &mut self.exec_state, sched_ctx, comp_ctx, &mut self.control,
 
                                    &mut self.inbox_main, &mut self.inbox_backup, socket_component,
 
                                    vec![cmd_rx, data_tx]
 
                                    vec![pending.cmd_rx, pending.data_tx]
 
                                );
 
                                self.sync_state = ListenerSyncState::AwaitingCmd; // superfluous, see ListenerSyncState.take()
 
                            },
 
                            ListenerSyncState::FinishSyncThenQuit => {
 
                                self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                            },
 
@@ -616,54 +613,57 @@ impl Component for ComponentTcpListener {
 
                                    self.inbox_main.push(None);
 
                                    self.consensus.notify_of_new_port(expected_port_index, port_handle, comp_ctx);
 
                                }
 

	
 
                                // Construct the message containing the appropriate ports that will
 
                                // be sent to the component commanding this listener.
 
                                let mut values = ValueGroup::new_stack(vec![
 
                                    Value::Unassigned, Value::Unassigned
 
                                ]);
 
                                values.values[self.output_struct_tx_index] = Value::Output(port_id_to_eval(cmd_channel.putter_id));
 
                                values.values[self.output_struct_rx_index] = Value::Input(port_id_to_eval(data_channel.getter_id));
 
                                let mut values = ValueGroup::new_stack(Vec::with_capacity(1));
 
                                values.values.push(Value::Struct(0));
 
                                values.regions.push(vec![Value::Unassigned, Value::Unassigned]);
 
                                values.regions[0][self.output_struct_tx_index] = Value::Output(port_id_to_eval(cmd_channel.putter_id));
 
                                values.regions[0][self.output_struct_rx_index] = Value::Input(port_id_to_eval(data_channel.getter_id));
 
                                if let Err(location_and_message) = component::default_send_data_message(
 
                                    &mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource, values,
 
                                    sched_ctx, &mut self.consensus, &mut self.control, comp_ctx
 
                                ) {
 
                                    component::default_handle_error_for_builtin(
 
                                        &mut self.exec_state, sched_ctx, location_and_message
 
                                    );
 
                                }
 

	
 
                                // And finish the consensus round
 
                                component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
 

	
 
                                // Enter a state such that when we leave the consensus round and
 
                                // go back to the nonsync state, that we will actually create the
 
                                // tcp client component.
 
                                self.sync_state = ListenerSyncState::AcceptChannelGenerated {
 
                                // Prepare for finishing the consensus round, and once finished,
 
                                // create the tcp client component
 
                                self.sync_state = ListenerSyncState::AcceptChannelGenerated;
 
                                debug_assert!(self.pending_component.is_none());
 
                                self.pending_component = Some(PendingComponent{
 
                                    client,
 
                                    cmd_rx: cmd_channel.getter_id,
 
                                    data_tx: data_channel.putter_id
 
                                };
 
                                });
 

	
 
                                return CompScheduling::Requeue;
 
                            },
 
                            Err(err) => {
 
                                if err.kind() == IoErrorKind::WouldBlock {
 
                                    return CompScheduling::Sleep;
 
                                } else {
 
                                    todo!("handle listener.accept error {:?}", err)
 
                                }
 
                            }
 
                        }
 
                    },
 
                    ListenerSyncState::AcceptChannelGenerated{ .. } => unreachable!(),
 
                    ListenerSyncState::FinishSyncThenQuit => {
 
                    ListenerSyncState::AcceptChannelGenerated => {
 
                        component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
 
                        self.sync_state = ListenerSyncState::AcceptGenerateComponent;
 
                        return CompScheduling::Requeue;
 
                    }
 
                    ListenerSyncState::FinishSyncThenQuit => {
 
                        component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
 
                        return CompScheduling::Requeue;
 
                    },
 
                    ListenerSyncState::AcceptGenerateComponent => unreachable!(),
 
                }
 
            },
 
            CompMode::BlockedGet => {
 
                return CompScheduling::Sleep;
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedPut
 
@@ -692,12 +692,13 @@ impl ComponentTcpListener {
 
            todo!("friendly error reporting: failed to open socket (reason: {:?})", socket);
 
        }
 

	
 
        return Self {
 
            socket_state: ListenerSocketState::Connected(socket.unwrap()),
 
            sync_state: ListenerSyncState::AwaitingCmd,
 
            pending_component: None,
 
            poll_ticket: None,
 
            inbox_main: vec![None, None],
 
            inbox_backup: InboxBackup::new(),
 
            pdl_input_port_id: input_port,
 
            pdl_output_port_id: output_port,
 
            input_union_accept_tag: -1,
 
@@ -706,12 +707,27 @@ impl ComponentTcpListener {
 
            output_struct_rx_index: 0,
 
            exec_state: CompExecState::new(),
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
        }
 
    }
 

	
 
    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
 
        if self.exec_state.mode.is_in_sync_block() {
 
            self.consensus.handle_incoming_data_message(comp_ctx, &message);
 
        }
 

	
 
        match component::default_handle_incoming_data_message(
 
            &mut self.exec_state, &mut self.inbox_main, comp_ctx, message, sched_ctx, &mut self.control
 
        ) {
 
            IncomingData::PlacedInSlot => {},
 
            IncomingData::SlotFull(message) => {
 
                self.inbox_backup.push(message);
 
            }
 
        }
 
    }
 
}
 

	
 
fn ip_addr_and_port_from_args(
 
    arguments: &ValueGroup, ip_index: usize, port_index: usize
 
) -> (IpAddr, u16) {
 
    debug_assert!(ip_index < arguments.values.len());
src/runtime2/tests/internet.rs
Show inline comments
 
@@ -69,6 +69,110 @@ fn test_stdlib_file() {
 
        }
 

	
 
        new fake_client(connection);
 
    }
 
    ", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_tcp_listener_and_client() {
 
    compile_and_create_component("
 
    import std.internet::*;
 

	
 
    func listen_port() -> u16 {
 
        return 2393;
 
    }
 

	
 
    comp server(u32 num_connections) {
 
        // Start tcp listener
 
        channel listen_cmd_tx -> listen_cmd_rx;
 
        channel listen_conn_tx -> listen_conn_rx;
 
        new tcp_listener({}, listen_port(), listen_cmd_rx, listen_conn_tx);
 

	
 
        // Fake channels such that we can create a dummy connection variable
 
        channel client_cmd_tx -> unused_client_cmd_rx;
 
        channel unused_client_data_tx -> client_data_rx;
 
        auto new_connection = TcpConnection{
 
            tx: client_cmd_tx,
 
            rx: client_data_rx,
 
        };
 

	
 
        auto connection_counter = 0;
 
        while (connection_counter < num_connections) {
 
            // Wait until we get a connection
 
            print(\"server: waiting for an accepted connection\");
 
            sync {
 
                put(listen_cmd_tx, ListenerCmd::Accept);
 
                new_connection = get(listen_conn_rx);
 
            }
 

	
 
            // We have a new connection, spawn an 'echoer' for it
 
            print(\"server: spawning an echo'ing component\");
 
            new echo_machine(new_connection);
 
            connection_counter += 1;
 
        }
 

	
 
        // Shut down the listener
 
        print(\"server: shutting down listener\");
 
    }
 

	
 
    // Waits for a single TCP byte (to simplify potentially having to
 
    // concatenate requests) and echos it
 
    comp echo_machine(TcpConnection conn) {
 
        auto data_to_echo = {};
 

	
 
        // Wait for a message
 
        sync {
 
            print(\"echo: receiving data\");
 
            put(conn.tx, ClientCmd::Receive);
 
            data_to_echo = get(conn.rx);
 
            put(conn.tx, ClientCmd::Finish);
 
        }
 

	
 
        // Echo the message
 
        print(\"echo: sending back data\");
 
        sync put(conn.tx, ClientCmd::Send(data_to_echo));
 

	
 
        // Ask the tcp connection to shut down
 
        print(\"echo: shutting down\");
 
        sync put(conn.tx, ClientCmd::Shutdown);
 
    }
 

	
 
    comp echo_requester(u8 byte_to_send) {
 
        channel cmd_tx -> cmd_rx;
 
        channel data_tx -> data_rx;
 
        new tcp_client({127, 0, 0, 1}, listen_port(), cmd_rx, data_tx);
 

	
 
        // Send the message
 
        print(\"requester: sending bytes\");
 
        sync put(cmd_tx, ClientCmd::Send({ byte_to_send }));
 

	
 
        // Receive the echo'd byte
 
        auto received_byte = byte_to_send + 1;
 
        sync {
 
            print(\"requester: receiving echo response\");
 
            put(cmd_tx, ClientCmd::Receive);
 
            received_byte = get(data_rx)[0];
 
            put(cmd_tx, ClientCmd::Finish);
 
        }
 

	
 
        // Silly check, as always
 
        while (byte_to_send != received_byte) {
 
            print(\"requester: Oh no! The echo is an otherworldly distorter\");
 
        }
 

	
 
        // Shut down the TCP connection
 
        print(\"requester: shutting down TCP component\");
 
        sync put(cmd_tx, ClientCmd::Shutdown);
 
    }
 

	
 
    comp constructor() {
 
        auto num_connections = 1;
 
        new server(num_connections);
 

	
 
        auto connection_index = 0;
 
        while (connection_index < num_connections) {
 
            new echo_requester(cast(connection_index));
 
        }
 
    }
 
    ", "constructor", no_args());
 
}
 
\ No newline at end of file
std/std.internet.pdl
Show inline comments
 
@@ -18,9 +18,9 @@ union ListenerCmd {
 

	
 
struct TcpConnection {
 
    out<ClientCmd> tx,
 
    in<u8[]> rx,
 
}
 

	
 
/* comp tcp_listener(u8[] ip, u16 port, in<ListenerCmd> cmds, out<TcpConnection> rx) {
 
comp tcp_listener(u8[] ip, u16 port, in<ListenerCmd> cmds, out<TcpConnection> rx) {
 
    #builtin
 
} */
 
\ No newline at end of file
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)