Changeset - 3a0142804667
[Not reviewed]
0 1 0
MH - 3 years ago 2022-04-01 17:11:42
contact@maxhenger.nl
WIP on nonblocking before implementing epoll
1 file changed with 94 insertions and 7 deletions:
0 comments (0 inline, 0 general)
src/runtime2/stdlib/internet.rs
Show inline comments
 
@@ -7,12 +7,13 @@ use libc::{
 
    socket, bind, listen, accept, connect, close,
 
};
 

	
 
#[derive(Debug)]
 
pub enum SocketError {
 
    Opening,
 
    Modifying,
 
    Binding,
 
    Listening,
 
    Connecting,
 
    Accepted,
 
    Accepting,
 
}
 
@@ -21,23 +22,31 @@ enum SocketState {
 
    Opened,
 
    Listening,
 
}
 

	
 
/// TCP connection
 
pub struct SocketTcpClient {
 
    socket_handle: libc::c_int
 
    socket_handle: libc::c_int,
 
    is_blocking: bool,
 
}
 

	
 
impl SocketTcpClient {
 
    pub fn new(ip: IpAddr, port: u16) -> Result<Self, SocketError> {
 
        const BLOCKING: bool = false;
 

	
 
        let socket_handle = create_and_connect_socket(
 
            libc::SOCK_STREAM, libc::IPPROTO_TCP, ip, port
 
        )?;
 
        if !set_socket_blocking(socket_handle, BLOCKING) {
 
            unsafe{ libc::close(socket_handle); }
 
            return Err(SocketError::Modifying);
 
        }
 

	
 
        return Ok(SocketTcpClient{
 
            socket_handle,
 
            is_blocking: BLOCKING,
 
        })
 
    }
 

	
 
    pub fn send(&self, message: &[u8]) -> Result<usize, ()> {
 
        let result = unsafe{
 
            let message_pointer = message.as_ptr().cast();
 
@@ -47,23 +56,65 @@ impl SocketTcpClient {
 
            return Err(())
 
        }
 

	
 
        return Ok(result as usize);
 
    }
 

	
 
    /// Receives data from the TCP socket. Returns the number of bytes received.
 
    /// More bytes may be present even thought `used < buffer.len()`.
 
    pub fn receive(&self, buffer: &mut [u8]) -> Result<usize, ()> {
 
        if self.is_blocking {
 
            return self.receive_blocking(buffer);
 
        } else {
 
            return self.receive_nonblocking(buffer);
 
        }
 
    }
 

	
 
    #[inline]
 
    fn receive_blocking(&self, buffer: &mut [u8]) -> Result<usize, ()> {
 
        let result = unsafe {
 
            let message_pointer = buffer.as_mut_ptr().cast();
 
            libc::recv(self.socket_handle, message_pointer, buffer.len() as libc::size_t, 0)
 
            libc::recv(self.socket_handle, message_pointer, buffer.len(), 0)
 
        };
 
        if result < 0 {
 
            return Err(())
 
            return Err(());
 
        }
 

	
 
        return Ok(result as usize);
 
    }
 

	
 
    #[inline]
 
    fn receive_nonblocking(&self, buffer: &mut [u8]) -> Result<usize, ()> {
 
        unsafe {
 
            let mut message_pointer = buffer.as_mut_ptr().cast();
 
            let mut remaining = buffer.len();
 

	
 
            loop {
 
                // Receive more data
 
                let result = libc::recv(self.socket_handle, message_pointer, remaining, 0);
 
                if result < 0 {
 
                    // Check reason
 
                    let errno = std::io::Error::last_os_error().raw_os_error().expect("os error after failed recv");
 
                    if errno == libc::EWOULDBLOCK || errno == libc::EAGAIN {
 
                        return Ok(buffer.len() - remaining);
 
                    } else {
 
                        return Err(());
 
                    }
 
                }
 

	
 
                // Modify pointer and remaining bytes
 
                let received = result as usize;
 
                message_pointer = message_pointer.add(received);
 
                remaining -= received;
 

	
 
                if remaining == 0 {
 
                    return Ok(buffer.len());
 
                }
 
            }
 
        }
 
    }
 
}
 

	
 
impl Drop for SocketTcpClient {
 
    fn drop(&mut self) {
 
        debug_assert!(self.socket_handle >= 0);
 
        unsafe{ close(self.socket_handle) };
 
@@ -245,12 +296,39 @@ fn create_sockaddr_in_v6(ip: Ipv6Addr, port: u16) -> (sockaddr_in6, libc::sockle
 
    };
 
    let address_size = size_of::<sockaddr_in6>();
 

	
 
    return (socket_address, address_size as _);
 
}
 

	
 
#[inline]
 
fn set_socket_blocking(handle: libc::c_int, blocking: bool) -> bool {
 
    if handle < 0 {
 
        return false;
 
    }
 

	
 
    unsafe{
 
        let mut flags = libc::fcntl(handle, libc::F_GETFL, 0);
 
        if flags < 0 {
 
            return false;
 
        }
 

	
 
        if blocking {
 
            flags &= !libc::O_NONBLOCK;
 
        } else {
 
            flags |= libc::O_NONBLOCK;
 
        }
 

	
 
        let result = libc::fcntl(handle, libc::F_SETFL, flags);
 
        if result < 0 {
 
            return false;
 
        }
 
    }
 

	
 
    return true;
 
}
 

	
 
#[inline]
 
fn socket_family_from_ip(ip: IpAddr) -> libc::c_int {
 
    return match ip {
 
        IpAddr::V4(_) => libc::AF_INET,
 
        IpAddr::V6(_) => libc::AF_INET6,
 
    };
 
@@ -264,14 +342,23 @@ fn htons(port: u16) -> u16 {
 
mod tests {
 
    use std::net::*;
 
    use super::*;
 

	
 
    #[test]
 
    fn test_inet_thingo() {
 
        const SIZE: usize = 1024;
 

	
 
        let s = SocketTcpClient::new(IpAddr::V4(Ipv4Addr::new(142, 250, 179, 163)), 80).expect("connect");
 
        s.send(b"GET / HTTP/1.1\r\n\r\n").expect("sending");
 
        let mut buffer = [0;65000];
 
        s.receive(&mut buffer).expect("receiving");
 
        let as_str = String::from_utf8_lossy(&buffer);
 
        println!("Yay! Got:\n{}", as_str);
 
        let mut total = Vec::<u8>::new();
 
        let mut buffer = [0; SIZE];
 
        let mut received = SIZE;
 

	
 
        while received > 0 {
 
            received = s.receive(&mut buffer).expect("receiving");
 
            println!("DEBUG: Received {} bytes", received);
 
            total.extend_from_slice(&buffer[..received]);
 
        }
 
        let as_str = String::from_utf8_lossy(total.as_slice());
 
        println!("Yay! Got {} bytes:\n{}", as_str.len(), as_str);
 
    }
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)