diff --git a/src/runtime2/poll/mod.rs b/src/runtime2/poll/mod.rs index e9240695432169e620a27f5b5a3f36e0efbbf397..e6589f21270927a7bf73f25f0360dfac2995632d 100644 --- a/src/runtime2/poll/mod.rs +++ b/src/runtime2/poll/mod.rs @@ -1,6 +1,14 @@ use libc::{self, c_int}; -use std::{io, ptr, time}; +use std::{io, ptr, time, thread}; +use std::sync::Arc; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::collections::HashMap; + +use crate::runtime2::RtError; +use crate::runtime2::runtime::{CompHandle, RuntimeInner}; +use crate::runtime2::store::queue_mpsc::*; + pub(crate) type FileDescriptor = c_int; @@ -8,40 +16,27 @@ pub(crate) trait AsFileDescriptor { fn as_file_descriptor(&self) -> FileDescriptor; } -pub(crate) struct UserData(u64); -#[inline] -pub(crate) fn register_polling( - poller: &Poller, entity: F, user: UserData, read: bool, write: bool -) -> io::Result<()> { - let file_descriptor = entity.as_file_descriptor(); - return poller.register(file_descriptor, user, read, write); -} +#[derive(Copy, Clone)] +pub(crate) struct UserData(u64); -#[inline] -pub(crate) fn unregister_polling( - poller: &Poller, entity: F -) -> io::Result<()> { - let file_descriptor = entity.as_file_descriptor(); - return poller.unregister(file_descriptor); -} +// ----------------------------------------------------------------------------- +// Poller +// ----------------------------------------------------------------------------- #[cfg(unix)] pub(crate) struct Poller { handle: c_int, - events: Vec } // All of this is gleaned from the `mio` crate. #[cfg(unix)] impl Poller { - pub fn new(event_capacity: usize) -> io::Result { - assert!(event_capacity < i32::MAX as usize); // because of argument to `epoll_wait`. + pub fn new() -> io::Result { let handle = syscall_result(unsafe{ libc::epoll_create1(libc::EPOLL_CLOEXEC) })?; return Ok(Self{ handle, - events: Vec::with_capacity(event_capacity), }) } @@ -65,25 +60,32 @@ impl Poller { return Ok(()); } - pub fn wait(&mut self, timeout: time::Duration) -> io::Result<()> { + /// Performs `epoll_wait`, waiting for the provided timeout or until events + /// are reported. They are stored in the `events` variable (up to + /// `events.cap()` are reported, so ensure it is preallocated). + pub fn wait(&self, events: &mut Vec, timeout: time::Duration) -> io::Result<()> { // See `mio` for the reason. Works around a linux bug #[cfg(target_pointer_width = "32")] const MAX_TIMEOUT: u128 = 1789569; #[cfg(not(target_pointer_width = "32"))] const MAX_TIMEOUT: u128 = c_int::MAX as u128; - let mut timeout_millis = timeout.as_millis(); - if timeout_millis > MAX_TIMEOUT { - timeout_millis = -1; // effectively infinite - } + let timeout_millis = timeout.as_millis(); + let timeout_millis = if timeout_millis > MAX_TIMEOUT { + -1 // effectively infinite + } else { + timeout_millis as c_int + }; + debug_assert!(events.is_empty()); + debug_assert!(events.capacity() > 0 && events.capacity() < i32::MAX as usize); let num_events = syscall_result(unsafe{ - libc::epoll_wait(self.handle, self.events.as_mut(), self.events.capacity() as i32, timeout_millis) + libc::epoll_wait(self.handle, events.as_mut_ptr(), events.capacity() as i32, timeout_millis) })?; unsafe{ debug_assert!(num_events >= 0); - self.events.set_len(num_events as usize); + events.set_len(num_events as usize); } return Ok(()); @@ -102,6 +104,13 @@ impl Poller { } } +#[cfg(unix)] +impl Drop for Poller { + fn drop(&mut self) { + unsafe{ libc::close(self.handle); } + } +} + #[inline] fn syscall_result(result: c_int) -> io::Result { if result < 0 { @@ -114,4 +123,203 @@ fn syscall_result(result: c_int) -> io::Result { #[cfg(not(unix))] struct Poller { +} + +// ----------------------------------------------------------------------------- +// Polling Thread +// ----------------------------------------------------------------------------- + +enum PollCmd { + Register(CompHandle, UserData), + Unregister(FileDescriptor, UserData), + Shutdown, +} + +/// Represents the data needed to build interfaces to the polling thread (which +/// should happen first) and to create the polling thread itself. +pub(crate) struct PollingThreadBuilder { + poller: Arc, + generation_counter: Arc, + queue: QueueDynMpsc, + runtime: Arc, + logging_enabled: bool, +} + +impl PollingThreadBuilder { + pub(crate) fn new(runtime: Arc, logging_enabled: bool) -> Result { + let poller = Poller::new() + .map_err(|e| rt_error!("failed to create poller, because: {}", e))?; + + return Ok(PollingThreadBuilder { + poller: Arc::new(poller), + generation_counter: Arc::new(AtomicU32::new(0)), + queue: QueueDynMpsc::new(64), + runtime, + logging_enabled, + }) + } + + pub(crate) fn client(&self) -> PollingClient { + return PollingClient{ + poller: self.poller.clone(), + generation_counter: self.generation_counter.clone(), + queue: self.queue.producer(), + } + } + + pub(crate) fn into_thread(self) -> (PollingThread, PollingThreadDestroyer) { + let destroyer = self.queue.producer(); + + return ( + PollingThread{ + poller: self.poller, + runtime: self.runtime, + queue: self.queue, + logging_enabled: self.logging_enabled, + }, + PollingThreadDestroyer::new(destroyer) + ); + } +} + +pub(crate) struct PollingThread { + poller: Arc, + runtime: Arc, + queue: QueueDynMpsc, + logging_enabled: bool, +} + +impl PollingThread { + pub(crate) fn run(&mut self) { + use crate::runtime2::scheduler::SchedulerCtx; + use crate::runtime2::communication::Message; + + const NUM_EVENTS: usize = 256; + const EPOLL_DURATION: time::Duration = time::Duration::from_millis(250); + + // @performance: Lot of improvements possible here, a HashMap is likely + // a horrible way to do this. + let mut events = Vec::with_capacity(NUM_EVENTS); + let mut lookup = HashMap::with_capacity(64); + self.log("Starting polling thread"); + + loop { + // Retrieve events first (because the PollingClient will first + // register at epoll, and then push a command into the queue). + self.poller.wait(&mut events, EPOLL_DURATION).unwrap(); + + // Then handle everything in the command queue. + while let Some(command) = self.queue.pop() { + match command { + PollCmd::Register(handle, user_data) => { + self.log(&format!("Registering component {:?} as {}", handle.id(), user_data.0)); + let key = Self::user_data_as_key(user_data); + debug_assert!(!lookup.contains_key(&key)); + lookup.insert(key, handle); + }, + PollCmd::Unregister(_file_descriptor, user_data) => { + let key = Self::user_data_as_key(user_data); + debug_assert!(lookup.contains_key(&key)); + let mut handle = lookup.remove(&key).unwrap(); + self.log(&format!("Unregistering component {:?} as {}", handle.id(), user_data.0)); + if let Some(key) = handle.decrement_users() { + self.runtime.destroy_component(key); + } + }, + PollCmd::Shutdown => { + // The contract is that all scheduler threads shutdown + // before the polling thread. This happens when all + // components are removed. + self.log("Received shutdown signal"); + debug_assert!(lookup.is_empty()); + return; + } + } + } + + // Now process all of the events. Because we might have had a + // `Register` command followed by an `Unregister` command (e.g. a + // component has died), we might get events that are not associated + // with an entry in the lookup. + for event in events.drain(..) { + let key = event.u64; + if let Some(handle) = lookup.get(&key) { + self.log(&format!("Sending poll to {:?} (event: {:x})", handle.id(), event.events)); + handle.send_message(&self.runtime, Message::Poll, true); + } + } + } + } + + #[inline] + fn user_data_as_key(data: UserData) -> u64 { + return data.0; + } + + fn log(&self, message: &str) { + if self.logging_enabled { + println!("[polling] {}", message); + } + } +} + +// bit convoluted, but it works +pub(crate) struct PollingThreadDestroyer { + queue: Option>, +} + +impl PollingThreadDestroyer { + fn new(queue: QueueDynProducer) -> Self { + return Self{ queue: Some(queue) }; + } + + pub(crate) fn initiate_destruction(&mut self) { + self.queue.take().unwrap().push(PollCmd::Shutdown); + } +} + +impl Drop for PollingThreadDestroyer { + fn drop(&mut self) { + debug_assert!(self.queue.is_none()); + } +} + +pub(crate) struct PollTicket(FileDescriptor, u64); + +/// A structure that allows the owner to register components at the polling +/// thread. Because of assumptions in the communication queue all of these +/// clients should be dropped before stopping the polling thread. +pub(crate) struct PollingClient { + poller: Arc, + generation_counter: Arc, + queue: QueueDynProducer, +} + +impl PollingClient { + fn register(&self, entity: F, handle: CompHandle, read: bool, write: bool) -> Result { + let generation = self.generation_counter.fetch_add(1, Ordering::Relaxed); + let user_data = user_data_for_component(handle.id().0, generation); + self.queue.push(PollCmd::Register(handle, user_data)); + + let file_descriptor = entity.as_file_descriptor(); + self.poller.register(file_descriptor, user_data, read, write) + .map_err(|e| rt_error!("failed to register for polling, because: {}", e))?; + + return Ok(PollTicket(file_descriptor, user_data.0)); + } + + fn unregister(&self, ticket: PollTicket) -> Result<(), RtError> { + let file_descriptor = ticket.0; + let user_data = UserData(ticket.1); + self.queue.push(PollCmd::Unregister(file_descriptor, user_data)); + self.poller.unregister(file_descriptor) + .map_err(|e| rt_error!("failed to unregister polling, because: {}", e))?; + + return Ok(()); + } +} + +#[inline] +fn user_data_for_component(component_id: u32, generation: u32) -> UserData { + return UserData((generation as u64) << 32 | (component_id as u64)); } \ No newline at end of file