Changeset - 432fcde4e554
[Not reviewed]
0 6 0
MH - 3 years ago 2022-05-15 14:41:30
contact@maxhenger.nl
Fix async component creation error in tcp listener
6 files changed with 70 insertions and 25 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
@@ -1291,7 +1291,6 @@ fn perform_send_message_with_ports_notify_peers(
 

	
 
        // Block the peer of the port
 
        let message = control.create_port_transfer_message(unblock_put_control_id, comp_ctx.id, peer_port_id);
 
        println!("DEBUG: Port transfer message\nControl ID: {:?}\nMessage: {:?}", unblock_put_control_id, message);
 
        let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 

	
 
@@ -1405,7 +1404,6 @@ fn default_handle_ack(
 
            AckAction::UnblockPutWithPorts => {
 
                // Send the message (containing ports) stored in the component
 
                // execution state to the recipient
 
                println!("DEBUG: Unblocking put with ports");
 
                debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedAwaitingAcks);
 
                exec_state.mode = CompMode::PutPortsBlockedSendingPort;
 
                let port_handle = comp_ctx.get_port_handle(exec_state.mode_port);
src/runtime2/component/component_internet.rs
Show inline comments
 
@@ -299,14 +299,30 @@ impl Component for ComponentTcpClient {
 

	
 
impl ComponentTcpClient {
 
    pub(crate) fn new(arguments: ValueGroup) -> Self {
 
        debug_assert_eq!(arguments.values.len(), 4);
 
        // Two possible cases here: if the number of arguments is 3, then we
 
        // get: (socket_handle, input_port, output_port). If the number of
 
        // arguments is 4, then we get: (ip, port, input_port, output_port).
 
        assert!(arguments.values.len() == 3 || arguments.values.len() == 4);
 

	
 
        // Parsing arguments
 
        let (socket, input_port, output_port) = if arguments.values.len() == 3 {
 
            let socket_handle = arguments.values[0].as_sint32();
 
            let socket = SocketTcpClient::new_from_handle(socket_handle);
 

	
 
            let input_port = component::port_id_from_eval(arguments.values[1].as_input());
 
            let output_port = component::port_id_from_eval(arguments.values[2].as_output());
 

	
 
            (socket, input_port, output_port)
 
        } else {
 
            let (ip_address, port) = ip_addr_and_port_from_args(&arguments, 0, 1);
 
            let socket = SocketTcpClient::new(ip_address, port);
 

	
 
            let input_port = component::port_id_from_eval(arguments.values[2].as_input());
 
            let output_port = component::port_id_from_eval(arguments.values[3].as_output());
 

	
 
        let socket = SocketTcpClient::new(ip_address, port);
 
            (socket, input_port, output_port)
 
        };
 

	
 
        if let Err(socket) = socket {
 
            todo!("friendly error reporting: failed to open socket (reason: {:?})", socket);
 
        }
 
@@ -416,7 +432,7 @@ impl ListenerSocketState {
 
}
 

	
 
struct PendingComponent {
 
    client: SocketTcpClient,
 
    client: i32, // OS socket handle
 
    cmd_rx: PortId,
 
    data_tx: PortId,
 
}
 
@@ -456,6 +472,9 @@ impl Component for ComponentTcpListener {
 
        // Retrieve type information for the message with ports we're going to send
 
        let pd = &sched_ctx.runtime.protocol;
 

	
 
        self.tcp_client_definition = sched_ctx.runtime.protocol.find_procedure(b"std.internet", b"tcp_client")
 
            .expect("'tcp_client' component in the 'std.internet' module");
 

	
 
        let cmd_type = pd.find_type(b"std.internet", b"ListenerCmd")
 
            .expect("'ListenerCmd' type in the 'std.internet' module");
 
        let cmd_type = cmd_type.as_union();
 
@@ -541,15 +560,19 @@ impl Component for ComponentTcpListener {
 
                                // Now that we're outside the sync round, create the tcp client
 
                                // component
 
                                let pending = self.pending_component.take().unwrap();
 
                                let socket_component: Box<dyn Component> = Box::new(ComponentTcpClient::new_with_existing_connection(
 
                                    pending.client, pending.cmd_rx, pending.data_tx
 
                                ));
 
                                component::special_create_component(
 

	
 
                                let arguments = ValueGroup::new_stack(vec![
 
                                    Value::SInt32(pending.client),
 
                                    Value::Input(port_id_to_eval(pending.cmd_rx)),
 
                                    Value::Output(port_id_to_eval(pending.data_tx)),
 
                                ]);
 
                                component::default_start_create_component(
 
                                    &mut self.exec_state, sched_ctx, comp_ctx, &mut self.control,
 
                                    &mut self.inbox_main, &mut self.inbox_backup, socket_component,
 
                                    vec![pending.cmd_rx, pending.data_tx]
 
                                    &mut self.inbox_main, &mut self.inbox_backup,
 
                                    self.tcp_client_definition.0, self.tcp_client_definition.1,
 
                                    arguments
 
                                );
 
                                self.sync_state = ListenerSyncState::AwaitingCmd; // superfluous, see ListenerSyncState.take()
 
                                self.sync_state = ListenerSyncState::AwaitingCmd;
 
                            },
 
                            ListenerSyncState::FinishSyncThenQuit => {
 
                                self.exec_state.set_as_start_exit(ExitReason::Termination);
 
@@ -596,10 +619,9 @@ impl Component for ComponentTcpListener {
 
                    ListenerSyncState::AcceptCommandReceived => {
 
                        let socket = self.socket_state.get_socket();
 
                        match socket.accept() {
 
                            Ok(client) => {
 
                            Ok(client_handle) => {
 
                                // Create the channels (and the inbox entries, to stay consistent
 
                                // with the expectations from the `component` module's functions)
 
                                let client = client.unwrap();
 
                                let cmd_channel = comp_ctx.create_channel();
 
                                let data_channel = comp_ctx.create_channel();
 

	
 
@@ -635,7 +657,7 @@ impl Component for ComponentTcpListener {
 
                                self.sync_state = ListenerSyncState::AcceptChannelGenerated;
 
                                debug_assert!(self.pending_component.is_none());
 
                                self.pending_component = Some(PendingComponent{
 
                                    client,
 
                                    client: client_handle,
 
                                    cmd_rx: cmd_channel.getter_id,
 
                                    data_tx: data_channel.putter_id
 
                                });
 
@@ -701,6 +723,7 @@ impl ComponentTcpListener {
 
            inbox_backup: InboxBackup::new(),
 
            pdl_input_port_id: input_port,
 
            pdl_output_port_id: output_port,
 
            tcp_client_definition: (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid()),
 
            input_union_accept_tag: -1,
 
            input_union_shutdown_tag: -1,
 
            output_struct_tx_index: 0,
src/runtime2/poll/mod.rs
Show inline comments
 
@@ -260,7 +260,7 @@ impl PollingThread {
 
    }
 

	
 
    fn log(&self, message: &str) {
 
        if self.log_level >= LogLevel::Info {
 
        if LogLevel::Info >= self.log_level {
 
            println!("[polling] {}", message);
 
        }
 
    }
src/runtime2/stdlib/internet.rs
Show inline comments
 
@@ -45,13 +45,14 @@ impl SocketTcpClient {
 
            return Err(SocketError::Modifying);
 
        }
 

	
 
        println!(" CREATE  [{:04}] client", socket_handle);
 
        return Ok(SocketTcpClient{
 
            socket_handle,
 
            is_blocking: SOCKET_BLOCKING,
 
        })
 
    }
 

	
 
    fn new_from_handle(socket_handle: libc::c_int) -> Result<Self, SocketError> {
 
    pub(crate) fn new_from_handle(socket_handle: libc::c_int) -> Result<Self, SocketError> {
 
        if !set_socket_blocking(socket_handle, SOCKET_BLOCKING) {
 
            unsafe{ libc::close(socket_handle); }
 
            return Err(SocketError::Modifying);
 
@@ -92,6 +93,7 @@ impl SocketTcpClient {
 

	
 
impl Drop for SocketTcpClient {
 
    fn drop(&mut self) {
 
        println!("DESTRUCT [{:04}] client", self.socket_handle);
 
        debug_assert!(self.socket_handle >= 0);
 
        unsafe{ close(self.socket_handle) };
 
    }
 
@@ -129,13 +131,15 @@ impl SocketTcpListener {
 
            }
 
        }
 

	
 

	
 
        println!(" CREATE  [{:04}] listener", socket_handle);
 
        return Ok(SocketTcpListener{
 
            socket_handle,
 
            is_blocking: SOCKET_BLOCKING,
 
        });
 
    }
 

	
 
    pub fn accept(&self) -> Result<Result<SocketTcpClient, SocketError>, IoError> {
 
    pub fn accept(&self) -> Result<libc::c_int, IoError> {
 
        let (mut address, mut address_size) = create_sockaddr_in_empty();
 
        let address_pointer = &mut address as *mut sockaddr_in;
 
        let socket_handle = unsafe { accept(self.socket_handle, address_pointer.cast(), &mut address_size) };
 
@@ -143,12 +147,14 @@ impl SocketTcpListener {
 
            return Err(IoError::last_os_error());
 
        }
 

	
 
        return Ok(SocketTcpClient::new_from_handle(socket_handle));
 
        println!(" CREATE  [{:04}] client (from listener)", socket_handle);
 
        return Ok(socket_handle);
 
    }
 
}
 

	
 
impl Drop for SocketTcpListener {
 
    fn drop(&mut self) {
 
        println!("DESTRUCT [{:04}] listener", self.socket_handle);
 
        debug_assert!(self.socket_handle >= 0);
 
        unsafe{ close(self.socket_handle) };
 
    }
src/runtime2/tests/internet.rs
Show inline comments
 
@@ -79,10 +79,10 @@ fn test_tcp_listener_and_client() {
 
    import std.internet::*;
 

	
 
    func listen_port() -> u16 {
 
        return 2393;
 
        return 2392;
 
    }
 

	
 
    comp server(u32 num_connections) {
 
    comp server(u32 num_connections, in<()> shutdown) {
 
        // Start tcp listener
 
        channel listen_cmd_tx -> listen_cmd_rx;
 
        channel listen_conn_tx -> listen_conn_rx;
 
@@ -113,6 +113,8 @@ fn test_tcp_listener_and_client() {
 

	
 
        // Shut down the listener
 
        print(\"server: shutting down listener\");
 
        sync auto v = get(shutdown);
 
        sync put(listen_cmd_tx, ListenerCmd::Shutdown);
 
    }
 

	
 
    // Waits for a single TCP byte (to simplify potentially having to
 
@@ -137,7 +139,7 @@ fn test_tcp_listener_and_client() {
 
        sync put(conn.tx, ClientCmd::Shutdown);
 
    }
 

	
 
    comp echo_requester(u8 byte_to_send) {
 
    comp echo_requester(u8 byte_to_send, out<()> done) {
 
        channel cmd_tx -> cmd_rx;
 
        channel data_tx -> data_rx;
 
        new tcp_client({127, 0, 0, 1}, listen_port(), cmd_rx, data_tx);
 
@@ -163,16 +165,32 @@ fn test_tcp_listener_and_client() {
 
        // Shut down the TCP connection
 
        print(\"requester: shutting down TCP component\");
 
        sync put(cmd_tx, ClientCmd::Shutdown);
 
        sync put(done, ());
 
    }
 

	
 
    comp constructor() {
 
        auto num_connections = 1;
 
        new server(num_connections);
 
        channel shutdown_listener_tx -> shutdown_listener_rx;
 
        new server(num_connections, shutdown_listener_rx);
 

	
 
        auto connection_index = 0;
 
        auto all_done = {};
 
        while (connection_index < num_connections) {
 
            new echo_requester(cast(connection_index));
 
            channel done_tx -> done_rx;
 
            new echo_requester(cast(connection_index), done_tx);
 
            connection_index += 1;
 
            all_done @= {done_rx};
 
        }
 

	
 
        auto counter = 0;
 
        while (counter < length(all_done)) {
 
            print(\"constructor: waiting for requester to exit\");
 
            sync auto v = get(all_done[counter]);
 
            counter += 1;
 
        }
 

	
 
        print(\"constructor: instructing listener to exit\");
 
        sync put(shutdown_listener_tx, ());
 
    }
 
    ", "constructor", no_args());
 
}
 
\ No newline at end of file
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -14,7 +14,7 @@ const NUM_THREADS: u32 = 1;
 
pub(crate) fn compile_and_create_component(source: &str, routine_name: &str, args: ValueGroup) {
 
    let protocol = ProtocolDescription::parse(source.as_bytes())
 
        .expect("successful compilation");
 
    let runtime = Runtime::new(NUM_THREADS, LOG_LEVEL, protocol)
 
    let runtime = Runtime::new(NUM_THREADS, LogLevel::None, protocol)
 
        .expect("successful runtime startup");
 
    create_component(&runtime, "", routine_name, args);
 
}
0 comments (0 inline, 0 general)