diff --git a/src/runtime2/poll/mod.rs b/src/runtime2/poll/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..75cf8e9a90c00b6f9d785db8c8ce75221c69f0a3 --- /dev/null +++ b/src/runtime2/poll/mod.rs @@ -0,0 +1,324 @@ +use libc::{self, c_int}; + +use std::{io, ptr, time, thread}; +use std::sync::Arc; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::collections::HashMap; + +use crate::runtime2::RtError; +use crate::runtime2::runtime::{CompHandle, RuntimeInner}; +use crate::runtime2::store::queue_mpsc::*; + + +pub(crate) type FileDescriptor = c_int; + +pub(crate) trait AsFileDescriptor { + fn as_file_descriptor(&self) -> FileDescriptor; + +} + +#[derive(Copy, Clone)] +pub(crate) struct UserData(u64); + +// ----------------------------------------------------------------------------- +// Poller +// ----------------------------------------------------------------------------- + +#[cfg(unix)] +pub(crate) struct Poller { + handle: c_int, +} + +// All of this is gleaned from the `mio` crate. +#[cfg(unix)] +impl Poller { + pub fn new() -> io::Result { + let handle = syscall_result(unsafe{ libc::epoll_create1(libc::EPOLL_CLOEXEC) })?; + + return Ok(Self{ + handle, + }) + } + + fn register(&self, fd: FileDescriptor, user: UserData, read: bool, write: bool) -> io::Result<()> { + let mut event = libc::epoll_event{ + events: Self::events_from_rw_flags(read, write), + u64: user.0, + }; + syscall_result(unsafe{ + libc::epoll_ctl(self.handle, libc::EPOLL_CTL_ADD, fd, &mut event) + })?; + + return Ok(()); + } + + fn unregister(&self, fd: FileDescriptor) -> io::Result<()> { + syscall_result(unsafe{ + libc::epoll_ctl(self.handle, libc::EPOLL_CTL_DEL, fd, ptr::null_mut()) + })?; + + return Ok(()); + } + + /// Performs `epoll_wait`, waiting for the provided timeout or until events + /// are reported. They are stored in the `events` variable (up to + /// `events.cap()` are reported, so ensure it is preallocated). + pub fn wait(&self, events: &mut Vec, timeout: time::Duration) -> io::Result<()> { + // See `mio` for the reason. Works around a linux bug + #[cfg(target_pointer_width = "32")] + const MAX_TIMEOUT: u128 = 1789569; + #[cfg(not(target_pointer_width = "32"))] + const MAX_TIMEOUT: u128 = c_int::MAX as u128; + + let timeout_millis = timeout.as_millis(); + let timeout_millis = if timeout_millis > MAX_TIMEOUT { + -1 // effectively infinite + } else { + timeout_millis as c_int + }; + + debug_assert!(events.is_empty()); + debug_assert!(events.capacity() > 0 && events.capacity() < i32::MAX as usize); + let num_events = syscall_result(unsafe{ + libc::epoll_wait(self.handle, events.as_mut_ptr(), events.capacity() as i32, timeout_millis) + })?; + + unsafe{ + debug_assert!(num_events >= 0); + events.set_len(num_events as usize); + } + + return Ok(()); + } + + fn events_from_rw_flags(read: bool, write: bool) -> u32 { + let mut events = libc::EPOLLET; + if read { + events |= libc::EPOLLIN | libc::EPOLLRDHUP; + } + if write { + events |= libc::EPOLLOUT; + } + + return events as u32; + } +} + +#[cfg(unix)] +impl Drop for Poller { + fn drop(&mut self) { + unsafe{ libc::close(self.handle); } + } +} + +#[inline] +fn syscall_result(result: c_int) -> io::Result { + if result < 0 { + return Err(io::Error::last_os_error()); + } else { + return Ok(result); + } +} + +#[cfg(not(unix))] +struct Poller { + // Not implemented for OS's other than unix +} + +// ----------------------------------------------------------------------------- +// Polling Thread +// ----------------------------------------------------------------------------- + +enum PollCmd { + Register(CompHandle, UserData), + Unregister(FileDescriptor, UserData), + Shutdown, +} + +pub struct PollingThread { + poller: Arc, + runtime: Arc, + queue: QueueDynMpsc, + logging_enabled: bool, +} + +impl PollingThread { + pub(crate) fn new(runtime: Arc, logging_enabled: bool) -> Result<(PollingThreadHandle, PollingClientFactory), RtError> { + let poller = Poller::new() + .map_err(|e| rt_error!("failed to create poller, because: {}", e))?; + let poller = Arc::new(poller); + let queue = QueueDynMpsc::new(64); + let queue_producers = queue.producer_factory(); + + let mut thread_data = PollingThread{ + poller: poller.clone(), + runtime: runtime.clone(), + queue, + logging_enabled, + }; + let thread_handle = thread::spawn(move || { thread_data.run() }); + + let thread_handle = PollingThreadHandle{ + queue: Some(queue_producers.producer()), + handle: Some(thread_handle), + }; + let client_factory = PollingClientFactory{ + poller, + generation_counter: Arc::new(AtomicU32::new(0)), + queue_factory: queue_producers, + }; + + return Ok((thread_handle, client_factory)); + } + + pub(crate) fn run(&mut self) { + use crate::runtime2::scheduler::SchedulerCtx; + use crate::runtime2::communication::Message; + + const NUM_EVENTS: usize = 256; + const EPOLL_DURATION: time::Duration = time::Duration::from_millis(250); + + // @performance: Lot of improvements possible here, a HashMap is likely + // a horrible way to do this. + let mut events = Vec::with_capacity(NUM_EVENTS); + let mut lookup = HashMap::with_capacity(64); + self.log("Starting polling thread"); + + loop { + // Retrieve events first (because the PollingClient will first + // register at epoll, and then push a command into the queue). + self.poller.wait(&mut events, EPOLL_DURATION).unwrap(); + + // Then handle everything in the command queue. + while let Some(command) = self.queue.pop() { + match command { + PollCmd::Register(handle, user_data) => { + self.log(&format!("Registering component {:?} as {}", handle.id(), user_data.0)); + let key = Self::user_data_as_key(user_data); + debug_assert!(!lookup.contains_key(&key)); + lookup.insert(key, handle); + }, + PollCmd::Unregister(_file_descriptor, user_data) => { + let key = Self::user_data_as_key(user_data); + debug_assert!(lookup.contains_key(&key)); + let mut handle = lookup.remove(&key).unwrap(); + self.log(&format!("Unregistering component {:?} as {}", handle.id(), user_data.0)); + if let Some(key) = handle.decrement_users() { + self.runtime.destroy_component(key); + } + }, + PollCmd::Shutdown => { + // The contract is that all scheduler threads shutdown + // before the polling thread. This happens when all + // components are removed. + self.log("Received shutdown signal"); + debug_assert!(lookup.is_empty()); + return; + } + } + } + + // Now process all of the events. Because we might have had a + // `Register` command followed by an `Unregister` command (e.g. a + // component has died), we might get events that are not associated + // with an entry in the lookup. + for event in events.drain(..) { + let key = event.u64; + if let Some(handle) = lookup.get(&key) { + let events = event.events; + self.log(&format!("Sending poll to {:?} (event: {:x})", handle.id(), events)); + handle.send_message(&self.runtime, Message::Poll, true); + } + } + } + } + + #[inline] + fn user_data_as_key(data: UserData) -> u64 { + return data.0; + } + + fn log(&self, message: &str) { + if self.logging_enabled { + println!("[polling] {}", message); + } + } +} + +// bit convoluted, but it works +pub(crate) struct PollingThreadHandle { + // requires Option, because: + queue: Option>, // destructor needs to be called + handle: Option>, // we need to call `join` +} + +impl PollingThreadHandle { + pub(crate) fn shutdown(&mut self) -> thread::Result<()> { + debug_assert!(self.handle.is_some(), "polling thread already destroyed"); + self.queue.take().unwrap().push(PollCmd::Shutdown); + return self.handle.take().unwrap().join(); + } +} + +impl Drop for PollingThreadHandle { + fn drop(&mut self) { + debug_assert!(self.queue.is_none() && self.handle.is_none()); + } +} + +// oh my god, now I'm writing factory objects. I'm not feeling too well +pub(crate) struct PollingClientFactory { + poller: Arc, + generation_counter: Arc, + queue_factory: QueueDynProducerFactory, +} + +impl PollingClientFactory { + pub(crate) fn client(&self) -> PollingClient { + return PollingClient{ + poller: self.poller.clone(), + generation_counter: self.generation_counter.clone(), + queue: self.queue_factory.producer(), + }; + } +} + +pub(crate) struct PollTicket(FileDescriptor, u64); + +/// A structure that allows the owner to register components at the polling +/// thread. Because of assumptions in the communication queue all of these +/// clients should be dropped before stopping the polling thread. +pub(crate) struct PollingClient { + poller: Arc, + generation_counter: Arc, + queue: QueueDynProducer, +} + +impl PollingClient { + pub(crate) fn register(&self, entity: &F, handle: CompHandle, read: bool, write: bool) -> Result { + let generation = self.generation_counter.fetch_add(1, Ordering::Relaxed); + let user_data = user_data_for_component(handle.id().0, generation); + self.queue.push(PollCmd::Register(handle, user_data)); + + let file_descriptor = entity.as_file_descriptor(); + self.poller.register(file_descriptor, user_data, read, write) + .map_err(|e| rt_error!("failed to register for polling, because: {}", e))?; + + return Ok(PollTicket(file_descriptor, user_data.0)); + } + + pub(crate) fn unregister(&self, ticket: PollTicket) -> Result<(), RtError> { + let file_descriptor = ticket.0; + let user_data = UserData(ticket.1); + self.queue.push(PollCmd::Unregister(file_descriptor, user_data)); + self.poller.unregister(file_descriptor) + .map_err(|e| rt_error!("failed to unregister polling, because: {}", e))?; + + return Ok(()); + } +} + +#[inline] +fn user_data_for_component(component_id: u32, generation: u32) -> UserData { + return UserData((generation as u64) << 32 | (component_id as u64)); +} \ No newline at end of file