diff --git a/Cargo.toml b/Cargo.toml index 339f8f5edd7837d35b1115453a8590aac363d184..0111c5509eb738484982e085a3da446386f226d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,8 +14,8 @@ internet=["libc"] [dependencies] -# ffi -libc = { version = "^0.2", optional = true } +libc = { version = "^0.2", optional = true } # raw sockets +mio = { version = "0.8", features = ["os-poll"] } # cross-platform IO notification queue # randomness rand = "0.8.4" diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index 784e46b7807f7126caea5f4594a7c0265924bdc1..4e213a77a4b315c88752ea843bfea22f4fecf53c 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -3,6 +3,7 @@ mod runtime; mod component; mod communication; mod scheduler; +mod poll; mod stdlib; #[cfg(test)] mod tests; diff --git a/src/runtime2/poll/mod.rs b/src/runtime2/poll/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..e9240695432169e620a27f5b5a3f36e0efbbf397 --- /dev/null +++ b/src/runtime2/poll/mod.rs @@ -0,0 +1,117 @@ +use libc::{self, c_int}; + +use std::{io, ptr, time}; + +pub(crate) type FileDescriptor = c_int; + +pub(crate) trait AsFileDescriptor { + fn as_file_descriptor(&self) -> FileDescriptor; + +} +pub(crate) struct UserData(u64); + +#[inline] +pub(crate) fn register_polling( + poller: &Poller, entity: F, user: UserData, read: bool, write: bool +) -> io::Result<()> { + let file_descriptor = entity.as_file_descriptor(); + return poller.register(file_descriptor, user, read, write); +} + +#[inline] +pub(crate) fn unregister_polling( + poller: &Poller, entity: F +) -> io::Result<()> { + let file_descriptor = entity.as_file_descriptor(); + return poller.unregister(file_descriptor); +} + +#[cfg(unix)] +pub(crate) struct Poller { + handle: c_int, + events: Vec +} + +// All of this is gleaned from the `mio` crate. +#[cfg(unix)] +impl Poller { + pub fn new(event_capacity: usize) -> io::Result { + assert!(event_capacity < i32::MAX as usize); // because of argument to `epoll_wait`. + let handle = syscall_result(unsafe{ libc::epoll_create1(libc::EPOLL_CLOEXEC) })?; + + return Ok(Self{ + handle, + events: Vec::with_capacity(event_capacity), + }) + } + + fn register(&self, fd: FileDescriptor, user: UserData, read: bool, write: bool) -> io::Result<()> { + let mut event = libc::epoll_event{ + events: Self::events_from_rw_flags(read, write), + u64: user.0, + }; + syscall_result(unsafe{ + libc::epoll_ctl(self.handle, libc::EPOLL_CTL_ADD, fd, &mut event) + })?; + + return Ok(()); + } + + fn unregister(&self, fd: FileDescriptor) -> io::Result<()> { + syscall_result(unsafe{ + libc::epoll_ctl(self.handle, libc::EPOLL_CTL_DEL, fd, ptr::null_mut()) + })?; + + return Ok(()); + } + + pub fn wait(&mut self, timeout: time::Duration) -> io::Result<()> { + // See `mio` for the reason. Works around a linux bug + #[cfg(target_pointer_width = "32")] + const MAX_TIMEOUT: u128 = 1789569; + #[cfg(not(target_pointer_width = "32"))] + const MAX_TIMEOUT: u128 = c_int::MAX as u128; + + let mut timeout_millis = timeout.as_millis(); + if timeout_millis > MAX_TIMEOUT { + timeout_millis = -1; // effectively infinite + } + + let num_events = syscall_result(unsafe{ + libc::epoll_wait(self.handle, self.events.as_mut(), self.events.capacity() as i32, timeout_millis) + })?; + + unsafe{ + debug_assert!(num_events >= 0); + self.events.set_len(num_events as usize); + } + + return Ok(()); + } + + fn events_from_rw_flags(read: bool, write: bool) -> u32 { + let mut events = libc::EPOLLET; + if read { + events |= libc::EPOLLIN | libc::EPOLLRDHUP; + } + if write { + events |= libc::EPOLLOUT; + } + + return events as u32; + } +} + +#[inline] +fn syscall_result(result: c_int) -> io::Result { + if result < 0 { + return Err(io::Error::last_os_error()); + } else { + return Ok(result); + } +} + +#[cfg(not(unix))] +struct Poller { + +} \ No newline at end of file diff --git a/src/runtime2/stdlib/internet.rs b/src/runtime2/stdlib/internet.rs index 0013332ee7c5c7b50eae94a319637d642214ad80..3071b975015dea74e0ec67b71b91528cab76089b 100644 --- a/src/runtime2/stdlib/internet.rs +++ b/src/runtime2/stdlib/internet.rs @@ -6,6 +6,7 @@ use libc::{ sockaddr_in, sockaddr_in6, in_addr, in6_addr, socket, bind, listen, accept, connect, close, }; +use mio::{event, Interest, Registry, Token}; #[derive(Debug)] pub enum SocketError { @@ -193,7 +194,31 @@ impl Drop for SocketRawRx { } } +// The following is essentially stolen from `mio`'s io_source.rs file. +#[cfg(unix)] +trait AsRawFileDescriptor { + fn as_raw_file_descriptor(&self) -> c_int; +} + +impl AsRawFileDescriptor for SocketTcpClient { + fn as_raw_file_descriptor(&self) -> c_int { + return self.socket_handle; + } +} + +impl event::Source for T { + fn register(&mut self, registry: &Registry, token: Token, interests: Interest) -> std::io::Result<()> { + registry.selector().register() + } + + fn reregister(&mut self, registry: &Registry, token: Token, interests: Interest) -> std::io::Result<()> { + todo!() + } + fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> { + todo!() + } +} /// Performs the `socket` and `bind` calls. fn create_and_bind_socket(socket_type: libc::c_int, protocol: libc::c_int, ip: IpAddr, port: u16) -> Result {