diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index 4d508f2545e458c5747fd9fdd2eed7fe9d269b92..8d2f67a13e88cf98e39c0edd0eddb4eb2043896c 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -24,11 +24,15 @@ impl SocketState { } } +// ----------------------------------------------------------------------------- +// ComponentTcpClient +// ----------------------------------------------------------------------------- + /// States from the point of view of the component that is connecting to this /// TCP component (i.e. from the point of view of attempting to interface with /// a socket). #[derive(PartialEq, Debug)] -enum SyncState { +enum ClientSyncState { AwaitingCmd, Getting, Putting, @@ -39,7 +43,7 @@ enum SyncState { pub struct ComponentTcpClient { // Properties for the tcp socket socket_state: SocketState, - sync_state: SyncState, + sync_state: ClientSyncState, poll_ticket: Option, inbox_main: InboxMain, inbox_backup: InboxBackup, @@ -137,12 +141,12 @@ impl Component for ComponentTcpClient { // When in non-sync mode match &mut self.socket_state { SocketState::Connected(_socket) => { - if self.sync_state == SyncState::FinishSyncThenQuit { + if self.sync_state == ClientSyncState::FinishSyncThenQuit { // Previous request was to let the component shut down self.exec_state.set_as_start_exit(ExitReason::Termination); } else { // Reset for a new request - self.sync_state = SyncState::AwaitingCmd; + self.sync_state = ClientSyncState::AwaitingCmd; component::default_handle_sync_start( &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus ); @@ -160,7 +164,7 @@ impl Component for ComponentTcpClient { CompMode::Sync => { // When in sync mode: wait for a command to come in match self.sync_state { - SyncState::AwaitingCmd => { + ClientSyncState::AwaitingCmd => { match component::default_attempt_get( &mut self.exec_state, self.pdl_input_port_id, PortInstruction::NoSource, &mut self.inbox_main, &mut self.inbox_backup, sched_ctx, comp_ctx, @@ -180,16 +184,16 @@ impl Component for ComponentTcpClient { self.byte_buffer.push(value.as_uint8()); } - self.sync_state = SyncState::Putting; + self.sync_state = ClientSyncState::Putting; } else if tag_value == self.input_union_receive_tag_value { // Component requires a `recv` - self.sync_state = SyncState::Getting; + self.sync_state = ClientSyncState::Getting; } else if tag_value == self.input_union_finish_tag_value { // Component requires us to end the sync round - self.sync_state = SyncState::FinishSync; + self.sync_state = ClientSyncState::FinishSync; } else if tag_value == self.input_union_shutdown_tag_value { // Component wants to close the connection - self.sync_state = SyncState::FinishSyncThenQuit; + self.sync_state = ClientSyncState::FinishSyncThenQuit; } else { unreachable!("got tag_value {}", tag_value) } @@ -205,7 +209,7 @@ impl Component for ComponentTcpClient { } } }, - SyncState::Putting => { + ClientSyncState::Putting => { // We're supposed to send a user-supplied message fully // over the socket. But we might end up blocking. In // that case the component goes to sleep until it is @@ -231,7 +235,7 @@ impl Component for ComponentTcpClient { component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); return CompScheduling::Requeue; }, - SyncState::Getting => { + ClientSyncState::Getting => { // We're going to try and receive a single message. If // this causes us to end up blocking the component // goes to sleep until it is polled. @@ -253,7 +257,7 @@ impl Component for ComponentTcpClient { return CompScheduling::Immediate; } else { let scheduling = send_result.unwrap(); - self.sync_state = SyncState::AwaitingCmd; + self.sync_state = ClientSyncState::AwaitingCmd; return scheduling; } }, @@ -266,7 +270,7 @@ impl Component for ComponentTcpClient { } } }, - SyncState::FinishSync | SyncState::FinishSyncThenQuit => { + ClientSyncState::FinishSync | ClientSyncState::FinishSyncThenQuit => { component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); return CompScheduling::Requeue; }, @@ -274,7 +278,7 @@ impl Component for ComponentTcpClient { }, CompMode::BlockedGet => { // Entered when awaiting a new command - debug_assert_eq!(self.sync_state, SyncState::AwaitingCmd); + debug_assert_eq!(self.sync_state, ClientSyncState::AwaitingCmd); return CompScheduling::Sleep; }, CompMode::SyncEnd | CompMode::BlockedPut => @@ -317,7 +321,7 @@ impl ComponentTcpClient { return Self{ socket_state: SocketState::Connected(socket.unwrap()), - sync_state: SyncState::AwaitingCmd, + sync_state: ClientSyncState::AwaitingCmd, poll_ticket: None, inbox_main: vec![None], inbox_backup: Vec::new(), @@ -379,4 +383,9 @@ impl ComponentTcpClient { return value_group; } -} \ No newline at end of file +} + +// ----------------------------------------------------------------------------- +// ComponentTcpListener +// ----------------------------------------------------------------------------- + diff --git a/src/runtime2/stdlib/internet.rs b/src/runtime2/stdlib/internet.rs index 19235bcc6ba9280a75aa6770ad44135da2f89485..88605e5d9808734a936ecfeb38b77f2e79f46ffa 100644 --- a/src/runtime2/stdlib/internet.rs +++ b/src/runtime2/stdlib/internet.rs @@ -26,7 +26,9 @@ enum SocketState { Listening, } -/// TCP connection +const SOCKET_BLOCKING: bool = false; + +/// TCP (client) connection pub struct SocketTcpClient { socket_handle: libc::c_int, is_blocking: bool, @@ -34,19 +36,30 @@ pub struct SocketTcpClient { impl SocketTcpClient { pub fn new(ip: IpAddr, port: u16) -> Result { - const BLOCKING: bool = false; let socket_handle = create_and_connect_socket( libc::SOCK_STREAM, libc::IPPROTO_TCP, ip, port )?; - if !set_socket_blocking(socket_handle, BLOCKING) { + if !set_socket_blocking(socket_handle, SOCKET_BLOCKING) { unsafe{ libc::close(socket_handle); } return Err(SocketError::Modifying); } return Ok(SocketTcpClient{ socket_handle, - is_blocking: BLOCKING, + is_blocking: SOCKET_BLOCKING, + }) + } + + fn new_from_handle(socket_handle: libc::c_int) -> Result { + if !set_socket_blocking(socket_handle, SOCKET_BLOCKING) { + unsafe{ libc::close(socket_handle); } + return Err(SocketError::Modifying); + } + + return Ok(SocketTcpClient{ + socket_handle, + is_blocking: SOCKET_BLOCKING, }) } @@ -90,6 +103,57 @@ impl AsFileDescriptor for SocketTcpClient { } } +/// TCP listener. Yielding new connections +pub struct SocketTcpListener { + socket_handle: libc::c_int, + is_blocking: bool, +} + +impl SocketTcpListener { + pub fn new(ip: IpAddr, port: u16) -> Result { + // Create and bind + let socket_handle = create_and_bind_socket( + libc::SOCK_STREAM, libc::IPPROTO_TCP, ip, port + )?; + if !set_socket_blocking(socket_handle, SOCKET_BLOCKING) { + unsafe{ libc::close(socket_handle); } + return Err(SocketError::Modifying); + } + + // Listen + unsafe { + let result = listen(socket_handle, libc::SOMAXCONN); + if result < 0 { + unsafe{ libc::close(socket_handle); } + return Err(SocketError::Listening); + } + } + + return Ok(SocketTcpListener{ + socket_handle, + is_blocking: SOCKET_BLOCKING, + }); + } + + pub fn accept(&self) -> Result, IoError> { + let (mut address, mut address_size) = create_sockaddr_in_empty(); + let address_pointer = &mut address as *mut sockaddr_in; + let socket_handle = unsafe { accept(self.socket_handle, address_pointer.cast(), &mut address_size) }; + if socket_handle < 0 { + return Err(IoError::last_os_error()); + } + + return Ok(SocketTcpClient::new_from_handle(socket_handle)); + } +} + +impl Drop for SocketTcpListener { + fn drop(&mut self) { + debug_assert!(self.socket_handle >= 0); + unsafe{ close(self.socket_handle) }; + } +} + /// Raw socket receiver. Essentially a listener that accepts a single connection struct SocketRawRx { listen_handle: c_int, @@ -235,6 +299,18 @@ fn create_and_connect_socket(socket_type: libc::c_int, protocol: libc::c_int, ip } } +#[inline] +fn create_sockaddr_in_empty() -> (sockaddr_in, libc::socklen_t) { + let socket_address = sockaddr_in{ + sin_family: 0, + sin_port: 0, + sin_addr: in_addr { s_addr: 0 }, + sin_zero: [0; 8], + }; + let address_size = size_of::(); + + return (socket_address, address_size as _); +} #[inline] fn create_sockaddr_in_v4(ip: Ipv4Addr, port: u16) -> (sockaddr_in, libc::socklen_t) { let address = unsafe{ @@ -316,4 +392,4 @@ fn socket_family_from_ip(ip: IpAddr) -> libc::c_int { #[inline] fn htons(port: u16) -> u16 { return port.to_be(); -} \ No newline at end of file +}