Changeset - 432fcde4e554
[Not reviewed]
0 6 0
MH - 3 years ago 2022-05-15 14:41:30
contact@maxhenger.nl
Fix async component creation error in tcp listener
6 files changed with 73 insertions and 28 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
@@ -1288,13 +1288,12 @@ fn perform_send_message_with_ports_notify_peers(
 

	
 
        // Set the flag for transmission
 
        transmit_port_info.state.set(PortStateFlag::Transmitted);
 

	
 
        // Block the peer of the port
 
        let message = control.create_port_transfer_message(unblock_put_control_id, comp_ctx.id, peer_port_id);
 
        println!("DEBUG: Port transfer message\nControl ID: {:?}\nMessage: {:?}", unblock_put_control_id, message);
 
        let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 

	
 
        peer_info.handle.send_message_logged(sched_ctx, message, true);
 
    }
 

	
 
@@ -1402,13 +1401,12 @@ fn default_handle_ack(
 
                let _should_remove = handle.decrement_users();
 
                debug_assert!(_should_remove.is_none());
 
            },
 
            AckAction::UnblockPutWithPorts => {
 
                // Send the message (containing ports) stored in the component
 
                // execution state to the recipient
 
                println!("DEBUG: Unblocking put with ports");
 
                debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedAwaitingAcks);
 
                exec_state.mode = CompMode::PutPortsBlockedSendingPort;
 
                let port_handle = comp_ctx.get_port_handle(exec_state.mode_port);
 

	
 
                // Little bit of a hack, we didn't really unblock the sending
 
                // port, but this will mesh nicely with waiting for the sending
src/runtime2/component/component_internet.rs
Show inline comments
 
@@ -296,20 +296,36 @@ impl Component for ComponentTcpClient {
 
        }
 
    }
 
}
 

	
 
impl ComponentTcpClient {
 
    pub(crate) fn new(arguments: ValueGroup) -> Self {
 
        debug_assert_eq!(arguments.values.len(), 4);
 
        // Two possible cases here: if the number of arguments is 3, then we
 
        // get: (socket_handle, input_port, output_port). If the number of
 
        // arguments is 4, then we get: (ip, port, input_port, output_port).
 
        assert!(arguments.values.len() == 3 || arguments.values.len() == 4);
 

	
 
        // Parsing arguments
 
        let (ip_address, port) = ip_addr_and_port_from_args(&arguments, 0, 1);
 
        let input_port = component::port_id_from_eval(arguments.values[2].as_input());
 
        let output_port = component::port_id_from_eval(arguments.values[3].as_output());
 
        let (socket, input_port, output_port) = if arguments.values.len() == 3 {
 
            let socket_handle = arguments.values[0].as_sint32();
 
            let socket = SocketTcpClient::new_from_handle(socket_handle);
 

	
 
            let input_port = component::port_id_from_eval(arguments.values[1].as_input());
 
            let output_port = component::port_id_from_eval(arguments.values[2].as_output());
 

	
 
            (socket, input_port, output_port)
 
        } else {
 
            let (ip_address, port) = ip_addr_and_port_from_args(&arguments, 0, 1);
 
            let socket = SocketTcpClient::new(ip_address, port);
 

	
 
            let input_port = component::port_id_from_eval(arguments.values[2].as_input());
 
            let output_port = component::port_id_from_eval(arguments.values[3].as_output());
 

	
 
            (socket, input_port, output_port)
 
        };
 

	
 
        let socket = SocketTcpClient::new(ip_address, port);
 
        if let Err(socket) = socket {
 
            todo!("friendly error reporting: failed to open socket (reason: {:?})", socket);
 
        }
 

	
 
        return Self{
 
            socket_state: ClientSocketState::Connected(socket.unwrap()),
 
@@ -413,13 +429,13 @@ impl ListenerSocketState {
 
            ListenerSocketState::Error => unreachable!(),
 
        }
 
    }
 
}
 

	
 
struct PendingComponent {
 
    client: SocketTcpClient,
 
    client: i32, // OS socket handle
 
    cmd_rx: PortId,
 
    data_tx: PortId,
 
}
 

	
 
enum ListenerSyncState {
 
    AwaitingCmd,
 
@@ -453,12 +469,15 @@ pub struct ComponentTcpListener {
 

	
 
impl Component for ComponentTcpListener {
 
    fn on_creation(&mut self, id: CompId, sched_ctx: &SchedulerCtx) {
 
        // Retrieve type information for the message with ports we're going to send
 
        let pd = &sched_ctx.runtime.protocol;
 

	
 
        self.tcp_client_definition = sched_ctx.runtime.protocol.find_procedure(b"std.internet", b"tcp_client")
 
            .expect("'tcp_client' component in the 'std.internet' module");
 

	
 
        let cmd_type = pd.find_type(b"std.internet", b"ListenerCmd")
 
            .expect("'ListenerCmd' type in the 'std.internet' module");
 
        let cmd_type = cmd_type.as_union();
 

	
 
        self.input_union_accept_tag = cmd_type.get_variant_tag_value(b"Accept").unwrap();
 
        self.input_union_shutdown_tag = cmd_type.get_variant_tag_value(b"Shutdown").unwrap();
 
@@ -538,21 +557,25 @@ impl Component for ComponentTcpListener {
 
                            ListenerSyncState::AcceptCommandReceived |
 
                            ListenerSyncState::AcceptChannelGenerated => unreachable!(),
 
                            ListenerSyncState::AcceptGenerateComponent => {
 
                                // Now that we're outside the sync round, create the tcp client
 
                                // component
 
                                let pending = self.pending_component.take().unwrap();
 
                                let socket_component: Box<dyn Component> = Box::new(ComponentTcpClient::new_with_existing_connection(
 
                                    pending.client, pending.cmd_rx, pending.data_tx
 
                                ));
 
                                component::special_create_component(
 

	
 
                                let arguments = ValueGroup::new_stack(vec![
 
                                    Value::SInt32(pending.client),
 
                                    Value::Input(port_id_to_eval(pending.cmd_rx)),
 
                                    Value::Output(port_id_to_eval(pending.data_tx)),
 
                                ]);
 
                                component::default_start_create_component(
 
                                    &mut self.exec_state, sched_ctx, comp_ctx, &mut self.control,
 
                                    &mut self.inbox_main, &mut self.inbox_backup, socket_component,
 
                                    vec![pending.cmd_rx, pending.data_tx]
 
                                    &mut self.inbox_main, &mut self.inbox_backup,
 
                                    self.tcp_client_definition.0, self.tcp_client_definition.1,
 
                                    arguments
 
                                );
 
                                self.sync_state = ListenerSyncState::AwaitingCmd; // superfluous, see ListenerSyncState.take()
 
                                self.sync_state = ListenerSyncState::AwaitingCmd;
 
                            },
 
                            ListenerSyncState::FinishSyncThenQuit => {
 
                                self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                            },
 
                        }
 

	
 
@@ -593,16 +616,15 @@ impl Component for ComponentTcpListener {
 
                            }
 
                        }
 
                    },
 
                    ListenerSyncState::AcceptCommandReceived => {
 
                        let socket = self.socket_state.get_socket();
 
                        match socket.accept() {
 
                            Ok(client) => {
 
                            Ok(client_handle) => {
 
                                // Create the channels (and the inbox entries, to stay consistent
 
                                // with the expectations from the `component` module's functions)
 
                                let client = client.unwrap();
 
                                let cmd_channel = comp_ctx.create_channel();
 
                                let data_channel = comp_ctx.create_channel();
 

	
 
                                let port_ids = [
 
                                    cmd_channel.putter_id, cmd_channel.getter_id,
 
                                    data_channel.putter_id, data_channel.getter_id,
 
@@ -632,13 +654,13 @@ impl Component for ComponentTcpListener {
 

	
 
                                // Prepare for finishing the consensus round, and once finished,
 
                                // create the tcp client component
 
                                self.sync_state = ListenerSyncState::AcceptChannelGenerated;
 
                                debug_assert!(self.pending_component.is_none());
 
                                self.pending_component = Some(PendingComponent{
 
                                    client,
 
                                    client: client_handle,
 
                                    cmd_rx: cmd_channel.getter_id,
 
                                    data_tx: data_channel.putter_id
 
                                });
 

	
 
                                return CompScheduling::Requeue;
 
                            },
 
@@ -698,12 +720,13 @@ impl ComponentTcpListener {
 
            pending_component: None,
 
            poll_ticket: None,
 
            inbox_main: vec![None, None],
 
            inbox_backup: InboxBackup::new(),
 
            pdl_input_port_id: input_port,
 
            pdl_output_port_id: output_port,
 
            tcp_client_definition: (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid()),
 
            input_union_accept_tag: -1,
 
            input_union_shutdown_tag: -1,
 
            output_struct_tx_index: 0,
 
            output_struct_rx_index: 0,
 
            exec_state: CompExecState::new(),
 
            control: ControlLayer::default(),
src/runtime2/poll/mod.rs
Show inline comments
 
@@ -257,13 +257,13 @@ impl PollingThread {
 
    #[inline]
 
    fn user_data_as_key(data: UserData) -> u64 {
 
        return data.0;
 
    }
 

	
 
    fn log(&self, message: &str) {
 
        if self.log_level >= LogLevel::Info {
 
        if LogLevel::Info >= self.log_level {
 
            println!("[polling] {}", message);
 
        }
 
    }
 
}
 

	
 
// bit convoluted, but it works
src/runtime2/stdlib/internet.rs
Show inline comments
 
@@ -42,19 +42,20 @@ impl SocketTcpClient {
 
        )?;
 
        if !set_socket_blocking(socket_handle, SOCKET_BLOCKING) {
 
            unsafe{ libc::close(socket_handle); }
 
            return Err(SocketError::Modifying);
 
        }
 

	
 
        println!(" CREATE  [{:04}] client", socket_handle);
 
        return Ok(SocketTcpClient{
 
            socket_handle,
 
            is_blocking: SOCKET_BLOCKING,
 
        })
 
    }
 

	
 
    fn new_from_handle(socket_handle: libc::c_int) -> Result<Self, SocketError> {
 
    pub(crate) fn new_from_handle(socket_handle: libc::c_int) -> Result<Self, SocketError> {
 
        if !set_socket_blocking(socket_handle, SOCKET_BLOCKING) {
 
            unsafe{ libc::close(socket_handle); }
 
            return Err(SocketError::Modifying);
 
        }
 

	
 
        return Ok(SocketTcpClient{
 
@@ -89,12 +90,13 @@ impl SocketTcpClient {
 
        return Ok(result as usize);
 
    }
 
}
 

	
 
impl Drop for SocketTcpClient {
 
    fn drop(&mut self) {
 
        println!("DESTRUCT [{:04}] client", self.socket_handle);
 
        debug_assert!(self.socket_handle >= 0);
 
        unsafe{ close(self.socket_handle) };
 
    }
 
}
 

	
 
impl AsFileDescriptor for SocketTcpClient {
 
@@ -126,32 +128,36 @@ impl SocketTcpListener {
 
            if result < 0 {
 
                unsafe{ libc::close(socket_handle); }
 
                return Err(SocketError::Listening);
 
            }
 
        }
 

	
 

	
 
        println!(" CREATE  [{:04}] listener", socket_handle);
 
        return Ok(SocketTcpListener{
 
            socket_handle,
 
            is_blocking: SOCKET_BLOCKING,
 
        });
 
    }
 

	
 
    pub fn accept(&self) -> Result<Result<SocketTcpClient, SocketError>, IoError> {
 
    pub fn accept(&self) -> Result<libc::c_int, IoError> {
 
        let (mut address, mut address_size) = create_sockaddr_in_empty();
 
        let address_pointer = &mut address as *mut sockaddr_in;
 
        let socket_handle = unsafe { accept(self.socket_handle, address_pointer.cast(), &mut address_size) };
 
        if socket_handle < 0 {
 
            return Err(IoError::last_os_error());
 
        }
 

	
 
        return Ok(SocketTcpClient::new_from_handle(socket_handle));
 
        println!(" CREATE  [{:04}] client (from listener)", socket_handle);
 
        return Ok(socket_handle);
 
    }
 
}
 

	
 
impl Drop for SocketTcpListener {
 
    fn drop(&mut self) {
 
        println!("DESTRUCT [{:04}] listener", self.socket_handle);
 
        debug_assert!(self.socket_handle >= 0);
 
        unsafe{ close(self.socket_handle) };
 
    }
 
}
 

	
 
impl AsFileDescriptor for SocketTcpListener {
src/runtime2/tests/internet.rs
Show inline comments
 
@@ -76,16 +76,16 @@ fn test_stdlib_file() {
 
#[test]
 
fn test_tcp_listener_and_client() {
 
    compile_and_create_component("
 
    import std.internet::*;
 

	
 
    func listen_port() -> u16 {
 
        return 2393;
 
        return 2392;
 
    }
 

	
 
    comp server(u32 num_connections) {
 
    comp server(u32 num_connections, in<()> shutdown) {
 
        // Start tcp listener
 
        channel listen_cmd_tx -> listen_cmd_rx;
 
        channel listen_conn_tx -> listen_conn_rx;
 
        new tcp_listener({}, listen_port(), listen_cmd_rx, listen_conn_tx);
 

	
 
        // Fake channels such that we can create a dummy connection variable
 
@@ -110,12 +110,14 @@ fn test_tcp_listener_and_client() {
 
            new echo_machine(new_connection);
 
            connection_counter += 1;
 
        }
 

	
 
        // Shut down the listener
 
        print(\"server: shutting down listener\");
 
        sync auto v = get(shutdown);
 
        sync put(listen_cmd_tx, ListenerCmd::Shutdown);
 
    }
 

	
 
    // Waits for a single TCP byte (to simplify potentially having to
 
    // concatenate requests) and echos it
 
    comp echo_machine(TcpConnection conn) {
 
        auto data_to_echo = {};
 
@@ -134,13 +136,13 @@ fn test_tcp_listener_and_client() {
 

	
 
        // Ask the tcp connection to shut down
 
        print(\"echo: shutting down\");
 
        sync put(conn.tx, ClientCmd::Shutdown);
 
    }
 

	
 
    comp echo_requester(u8 byte_to_send) {
 
    comp echo_requester(u8 byte_to_send, out<()> done) {
 
        channel cmd_tx -> cmd_rx;
 
        channel data_tx -> data_rx;
 
        new tcp_client({127, 0, 0, 1}, listen_port(), cmd_rx, data_tx);
 

	
 
        // Send the message
 
        print(\"requester: sending bytes\");
 
@@ -160,19 +162,35 @@ fn test_tcp_listener_and_client() {
 
            print(\"requester: Oh no! The echo is an otherworldly distorter\");
 
        }
 

	
 
        // Shut down the TCP connection
 
        print(\"requester: shutting down TCP component\");
 
        sync put(cmd_tx, ClientCmd::Shutdown);
 
        sync put(done, ());
 
    }
 

	
 
    comp constructor() {
 
        auto num_connections = 1;
 
        new server(num_connections);
 
        channel shutdown_listener_tx -> shutdown_listener_rx;
 
        new server(num_connections, shutdown_listener_rx);
 

	
 
        auto connection_index = 0;
 
        auto all_done = {};
 
        while (connection_index < num_connections) {
 
            new echo_requester(cast(connection_index));
 
            channel done_tx -> done_rx;
 
            new echo_requester(cast(connection_index), done_tx);
 
            connection_index += 1;
 
            all_done @= {done_rx};
 
        }
 

	
 
        auto counter = 0;
 
        while (counter < length(all_done)) {
 
            print(\"constructor: waiting for requester to exit\");
 
            sync auto v = get(all_done[counter]);
 
            counter += 1;
 
        }
 

	
 
        print(\"constructor: instructing listener to exit\");
 
        sync put(shutdown_listener_tx, ());
 
    }
 
    ", "constructor", no_args());
 
}
 
\ No newline at end of file
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -11,13 +11,13 @@ mod internet;
 
const LOG_LEVEL: LogLevel = LogLevel::Debug;
 
const NUM_THREADS: u32 = 1;
 

	
 
pub(crate) fn compile_and_create_component(source: &str, routine_name: &str, args: ValueGroup) {
 
    let protocol = ProtocolDescription::parse(source.as_bytes())
 
        .expect("successful compilation");
 
    let runtime = Runtime::new(NUM_THREADS, LOG_LEVEL, protocol)
 
    let runtime = Runtime::new(NUM_THREADS, LogLevel::None, protocol)
 
        .expect("successful runtime startup");
 
    create_component(&runtime, "", routine_name, args);
 
}
 

	
 
pub(crate) fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: ValueGroup) {
 
    let prompt = rt.inner.protocol.new_component(
0 comments (0 inline, 0 general)