diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 05afe71f5e22a5443118a142d3c7f0b74c469540..f5e74559dca3ec8e1c6e26f10e21079df43d5b7c 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -1291,7 +1291,6 @@ fn perform_send_message_with_ports_notify_peers( // Block the peer of the port let message = control.create_port_transfer_message(unblock_put_control_id, comp_ctx.id, peer_port_id); - println!("DEBUG: Port transfer message\nControl ID: {:?}\nMessage: {:?}", unblock_put_control_id, message); let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); let peer_info = comp_ctx.get_peer(peer_handle); @@ -1405,7 +1404,6 @@ fn default_handle_ack( AckAction::UnblockPutWithPorts => { // Send the message (containing ports) stored in the component // execution state to the recipient - println!("DEBUG: Unblocking put with ports"); debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedAwaitingAcks); exec_state.mode = CompMode::PutPortsBlockedSendingPort; let port_handle = comp_ctx.get_port_handle(exec_state.mode_port); diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index f57eb821a93f58b66d8c991e5add2d4b4bbf1540..4cd9fce6c2e84daa3058cee36774c70eba424b43 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -299,14 +299,30 @@ impl Component for ComponentTcpClient { impl ComponentTcpClient { pub(crate) fn new(arguments: ValueGroup) -> Self { - debug_assert_eq!(arguments.values.len(), 4); + // Two possible cases here: if the number of arguments is 3, then we + // get: (socket_handle, input_port, output_port). If the number of + // arguments is 4, then we get: (ip, port, input_port, output_port). + assert!(arguments.values.len() == 3 || arguments.values.len() == 4); // Parsing arguments - let (ip_address, port) = ip_addr_and_port_from_args(&arguments, 0, 1); - let input_port = component::port_id_from_eval(arguments.values[2].as_input()); - let output_port = component::port_id_from_eval(arguments.values[3].as_output()); + let (socket, input_port, output_port) = if arguments.values.len() == 3 { + let socket_handle = arguments.values[0].as_sint32(); + let socket = SocketTcpClient::new_from_handle(socket_handle); + + let input_port = component::port_id_from_eval(arguments.values[1].as_input()); + let output_port = component::port_id_from_eval(arguments.values[2].as_output()); + + (socket, input_port, output_port) + } else { + let (ip_address, port) = ip_addr_and_port_from_args(&arguments, 0, 1); + let socket = SocketTcpClient::new(ip_address, port); + + let input_port = component::port_id_from_eval(arguments.values[2].as_input()); + let output_port = component::port_id_from_eval(arguments.values[3].as_output()); + + (socket, input_port, output_port) + }; - let socket = SocketTcpClient::new(ip_address, port); if let Err(socket) = socket { todo!("friendly error reporting: failed to open socket (reason: {:?})", socket); } @@ -416,7 +432,7 @@ impl ListenerSocketState { } struct PendingComponent { - client: SocketTcpClient, + client: i32, // OS socket handle cmd_rx: PortId, data_tx: PortId, } @@ -456,6 +472,9 @@ impl Component for ComponentTcpListener { // Retrieve type information for the message with ports we're going to send let pd = &sched_ctx.runtime.protocol; + self.tcp_client_definition = sched_ctx.runtime.protocol.find_procedure(b"std.internet", b"tcp_client") + .expect("'tcp_client' component in the 'std.internet' module"); + let cmd_type = pd.find_type(b"std.internet", b"ListenerCmd") .expect("'ListenerCmd' type in the 'std.internet' module"); let cmd_type = cmd_type.as_union(); @@ -541,15 +560,19 @@ impl Component for ComponentTcpListener { // Now that we're outside the sync round, create the tcp client // component let pending = self.pending_component.take().unwrap(); - let socket_component: Box = Box::new(ComponentTcpClient::new_with_existing_connection( - pending.client, pending.cmd_rx, pending.data_tx - )); - component::special_create_component( + + let arguments = ValueGroup::new_stack(vec![ + Value::SInt32(pending.client), + Value::Input(port_id_to_eval(pending.cmd_rx)), + Value::Output(port_id_to_eval(pending.data_tx)), + ]); + component::default_start_create_component( &mut self.exec_state, sched_ctx, comp_ctx, &mut self.control, - &mut self.inbox_main, &mut self.inbox_backup, socket_component, - vec![pending.cmd_rx, pending.data_tx] + &mut self.inbox_main, &mut self.inbox_backup, + self.tcp_client_definition.0, self.tcp_client_definition.1, + arguments ); - self.sync_state = ListenerSyncState::AwaitingCmd; // superfluous, see ListenerSyncState.take() + self.sync_state = ListenerSyncState::AwaitingCmd; }, ListenerSyncState::FinishSyncThenQuit => { self.exec_state.set_as_start_exit(ExitReason::Termination); @@ -596,10 +619,9 @@ impl Component for ComponentTcpListener { ListenerSyncState::AcceptCommandReceived => { let socket = self.socket_state.get_socket(); match socket.accept() { - Ok(client) => { + Ok(client_handle) => { // Create the channels (and the inbox entries, to stay consistent // with the expectations from the `component` module's functions) - let client = client.unwrap(); let cmd_channel = comp_ctx.create_channel(); let data_channel = comp_ctx.create_channel(); @@ -635,7 +657,7 @@ impl Component for ComponentTcpListener { self.sync_state = ListenerSyncState::AcceptChannelGenerated; debug_assert!(self.pending_component.is_none()); self.pending_component = Some(PendingComponent{ - client, + client: client_handle, cmd_rx: cmd_channel.getter_id, data_tx: data_channel.putter_id }); @@ -701,6 +723,7 @@ impl ComponentTcpListener { inbox_backup: InboxBackup::new(), pdl_input_port_id: input_port, pdl_output_port_id: output_port, + tcp_client_definition: (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid()), input_union_accept_tag: -1, input_union_shutdown_tag: -1, output_struct_tx_index: 0, diff --git a/src/runtime2/poll/mod.rs b/src/runtime2/poll/mod.rs index 86225ca65fe06aead14e60095c4592e216bf3638..097bfb83d707e17cf517c655598a7c25c546a86d 100644 --- a/src/runtime2/poll/mod.rs +++ b/src/runtime2/poll/mod.rs @@ -260,7 +260,7 @@ impl PollingThread { } fn log(&self, message: &str) { - if self.log_level >= LogLevel::Info { + if LogLevel::Info >= self.log_level { println!("[polling] {}", message); } } diff --git a/src/runtime2/stdlib/internet.rs b/src/runtime2/stdlib/internet.rs index f1e52354ea015e4d79ed598efb5178670b2a088d..d32e849a1b76464ae208cb9e11c18789b56e5ea9 100644 --- a/src/runtime2/stdlib/internet.rs +++ b/src/runtime2/stdlib/internet.rs @@ -45,13 +45,14 @@ impl SocketTcpClient { return Err(SocketError::Modifying); } + println!(" CREATE [{:04}] client", socket_handle); return Ok(SocketTcpClient{ socket_handle, is_blocking: SOCKET_BLOCKING, }) } - fn new_from_handle(socket_handle: libc::c_int) -> Result { + pub(crate) fn new_from_handle(socket_handle: libc::c_int) -> Result { if !set_socket_blocking(socket_handle, SOCKET_BLOCKING) { unsafe{ libc::close(socket_handle); } return Err(SocketError::Modifying); @@ -92,6 +93,7 @@ impl SocketTcpClient { impl Drop for SocketTcpClient { fn drop(&mut self) { + println!("DESTRUCT [{:04}] client", self.socket_handle); debug_assert!(self.socket_handle >= 0); unsafe{ close(self.socket_handle) }; } @@ -129,13 +131,15 @@ impl SocketTcpListener { } } + + println!(" CREATE [{:04}] listener", socket_handle); return Ok(SocketTcpListener{ socket_handle, is_blocking: SOCKET_BLOCKING, }); } - pub fn accept(&self) -> Result, IoError> { + pub fn accept(&self) -> Result { let (mut address, mut address_size) = create_sockaddr_in_empty(); let address_pointer = &mut address as *mut sockaddr_in; let socket_handle = unsafe { accept(self.socket_handle, address_pointer.cast(), &mut address_size) }; @@ -143,12 +147,14 @@ impl SocketTcpListener { return Err(IoError::last_os_error()); } - return Ok(SocketTcpClient::new_from_handle(socket_handle)); + println!(" CREATE [{:04}] client (from listener)", socket_handle); + return Ok(socket_handle); } } impl Drop for SocketTcpListener { fn drop(&mut self) { + println!("DESTRUCT [{:04}] listener", self.socket_handle); debug_assert!(self.socket_handle >= 0); unsafe{ close(self.socket_handle) }; } diff --git a/src/runtime2/tests/internet.rs b/src/runtime2/tests/internet.rs index 4859ca49ef963f54db2e9dbf0140f0707c4fe145..6cb03b954005b551198ecc3add8df9f94a6e9786 100644 --- a/src/runtime2/tests/internet.rs +++ b/src/runtime2/tests/internet.rs @@ -79,10 +79,10 @@ fn test_tcp_listener_and_client() { import std.internet::*; func listen_port() -> u16 { - return 2393; + return 2392; } - comp server(u32 num_connections) { + comp server(u32 num_connections, in<()> shutdown) { // Start tcp listener channel listen_cmd_tx -> listen_cmd_rx; channel listen_conn_tx -> listen_conn_rx; @@ -113,6 +113,8 @@ fn test_tcp_listener_and_client() { // Shut down the listener print(\"server: shutting down listener\"); + sync auto v = get(shutdown); + sync put(listen_cmd_tx, ListenerCmd::Shutdown); } // Waits for a single TCP byte (to simplify potentially having to @@ -137,7 +139,7 @@ fn test_tcp_listener_and_client() { sync put(conn.tx, ClientCmd::Shutdown); } - comp echo_requester(u8 byte_to_send) { + comp echo_requester(u8 byte_to_send, out<()> done) { channel cmd_tx -> cmd_rx; channel data_tx -> data_rx; new tcp_client({127, 0, 0, 1}, listen_port(), cmd_rx, data_tx); @@ -163,16 +165,32 @@ fn test_tcp_listener_and_client() { // Shut down the TCP connection print(\"requester: shutting down TCP component\"); sync put(cmd_tx, ClientCmd::Shutdown); + sync put(done, ()); } comp constructor() { auto num_connections = 1; - new server(num_connections); + channel shutdown_listener_tx -> shutdown_listener_rx; + new server(num_connections, shutdown_listener_rx); auto connection_index = 0; + auto all_done = {}; while (connection_index < num_connections) { - new echo_requester(cast(connection_index)); + channel done_tx -> done_rx; + new echo_requester(cast(connection_index), done_tx); + connection_index += 1; + all_done @= {done_rx}; } + + auto counter = 0; + while (counter < length(all_done)) { + print(\"constructor: waiting for requester to exit\"); + sync auto v = get(all_done[counter]); + counter += 1; + } + + print(\"constructor: instructing listener to exit\"); + sync put(shutdown_listener_tx, ()); } ", "constructor", no_args()); } \ No newline at end of file diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index 493bb3c46ee383e5ce2fb327a85bf7eec90af413..b3988d25db7fc32e3077a29ad6b0d4589e2ade98 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -14,7 +14,7 @@ const NUM_THREADS: u32 = 1; pub(crate) fn compile_and_create_component(source: &str, routine_name: &str, args: ValueGroup) { let protocol = ProtocolDescription::parse(source.as_bytes()) .expect("successful compilation"); - let runtime = Runtime::new(NUM_THREADS, LOG_LEVEL, protocol) + let runtime = Runtime::new(NUM_THREADS, LogLevel::None, protocol) .expect("successful runtime startup"); create_component(&runtime, "", routine_name, args); }