use libc::{self, c_int}; use std::{io, ptr, time, thread}; use std::sync::Arc; use std::sync::atomic::{AtomicU32, Ordering}; use std::collections::HashMap; use crate::runtime2::RtError; use crate::runtime2::runtime::{CompHandle, RuntimeInner, LogLevel}; use crate::runtime2::store::queue_mpsc::*; pub(crate) type FileDescriptor = c_int; pub(crate) trait AsFileDescriptor { fn as_file_descriptor(&self) -> FileDescriptor; } #[derive(Copy, Clone)] pub(crate) struct UserData(u64); // ----------------------------------------------------------------------------- // Poller // ----------------------------------------------------------------------------- #[cfg(unix)] pub(crate) struct Poller { handle: c_int, } // All of this is gleaned from the `mio` crate. #[cfg(unix)] impl Poller { pub fn new() -> io::Result { let handle = syscall_result(unsafe{ libc::epoll_create1(libc::EPOLL_CLOEXEC) })?; return Ok(Self{ handle, }) } fn register(&self, fd: FileDescriptor, user: UserData, read: bool, write: bool) -> io::Result<()> { let mut event = libc::epoll_event{ events: Self::events_from_rw_flags(read, write), u64: user.0, }; syscall_result(unsafe{ libc::epoll_ctl(self.handle, libc::EPOLL_CTL_ADD, fd, &mut event) })?; return Ok(()); } fn unregister(&self, fd: FileDescriptor) -> io::Result<()> { syscall_result(unsafe{ libc::epoll_ctl(self.handle, libc::EPOLL_CTL_DEL, fd, ptr::null_mut()) })?; return Ok(()); } /// Performs `epoll_wait`, waiting for the provided timeout or until events /// are reported. They are stored in the `events` variable (up to /// `events.cap()` are reported, so ensure it is preallocated). pub fn wait(&self, events: &mut Vec, timeout: time::Duration) -> io::Result<()> { // See `mio` for the reason. Works around a linux bug #[cfg(target_pointer_width = "32")] const MAX_TIMEOUT: u128 = 1789569; #[cfg(not(target_pointer_width = "32"))] const MAX_TIMEOUT: u128 = c_int::MAX as u128; let timeout_millis = timeout.as_millis(); let timeout_millis = if timeout_millis > MAX_TIMEOUT { -1 // effectively infinite } else { timeout_millis as c_int }; debug_assert!(events.is_empty()); debug_assert!(events.capacity() > 0 && events.capacity() < i32::MAX as usize); let num_events = syscall_result(unsafe{ libc::epoll_wait(self.handle, events.as_mut_ptr(), events.capacity() as i32, timeout_millis) })?; unsafe{ debug_assert!(num_events >= 0); events.set_len(num_events as usize); } return Ok(()); } fn events_from_rw_flags(read: bool, write: bool) -> u32 { let mut events = libc::EPOLLET; if read { events |= libc::EPOLLIN | libc::EPOLLRDHUP; } if write { events |= libc::EPOLLOUT; } return events as u32; } } #[cfg(unix)] impl Drop for Poller { fn drop(&mut self) { unsafe{ libc::close(self.handle); } } } #[inline] fn syscall_result(result: c_int) -> io::Result { if result < 0 { return Err(io::Error::last_os_error()); } else { return Ok(result); } } #[cfg(not(unix))] struct Poller { // Not implemented for OS's other than unix } // ----------------------------------------------------------------------------- // Polling Thread // ----------------------------------------------------------------------------- enum PollCmd { Register(CompHandle, UserData), Unregister(FileDescriptor, UserData), Shutdown, } pub struct PollingThread { poller: Arc, runtime: Arc, queue: QueueDynMpsc, log_level: LogLevel, } impl PollingThread { pub(crate) fn new(runtime: Arc, log_level: LogLevel) -> Result<(PollingThreadHandle, PollingClientFactory), RtError> { let poller = Poller::new() .map_err(|e| rt_error!("failed to create poller, because: {}", e))?; let poller = Arc::new(poller); let queue = QueueDynMpsc::new(64); let queue_producers = queue.producer_factory(); let mut thread_data = PollingThread{ poller: poller.clone(), runtime: runtime.clone(), queue, log_level, }; let thread_handle = thread::Builder::new() .name(String::from("poller")) .spawn(move || { thread_data.run() }) .map_err(|reason| rt_error!("failed to start polling thread, because: {}", reason) )?; let thread_handle = PollingThreadHandle{ queue: Some(queue_producers.producer()), handle: Some(thread_handle), }; let client_factory = PollingClientFactory{ poller, generation_counter: Arc::new(AtomicU32::new(0)), queue_factory: queue_producers, }; return Ok((thread_handle, client_factory)); } pub(crate) fn run(&mut self) { use crate::runtime2::communication::Message; const NUM_EVENTS: usize = 256; const EPOLL_DURATION: time::Duration = time::Duration::from_millis(250); // @performance: Lot of improvements possible here, a HashMap is likely // a horrible way to do this. let mut events = Vec::with_capacity(NUM_EVENTS); let mut lookup = HashMap::with_capacity(64); self.log("Starting polling thread"); loop { // Retrieve events first (because the PollingClient will first // register at epoll, and then push a command into the queue). self.poller.wait(&mut events, EPOLL_DURATION).unwrap(); // Then handle everything in the command queue. while let Some(command) = self.queue.pop() { match command { PollCmd::Register(handle, user_data) => { self.log(&format!("Registering component {:?} as {}", handle.id(), user_data.0)); let key = Self::user_data_as_key(user_data); debug_assert!(!lookup.contains_key(&key)); lookup.insert(key, handle); }, PollCmd::Unregister(_file_descriptor, user_data) => { let key = Self::user_data_as_key(user_data); debug_assert!(lookup.contains_key(&key)); let mut handle = lookup.remove(&key).unwrap(); self.log(&format!("Unregistering component {:?} as {}", handle.id(), user_data.0)); if let Some(key) = handle.decrement_users() { self.runtime.destroy_component(key); } }, PollCmd::Shutdown => { // The contract is that all scheduler threads shutdown // before the polling thread. This happens when all // components are removed. self.log("Received shutdown signal"); debug_assert!(lookup.is_empty()); return; } } } // Now process all of the events. Because we might have had a // `Register` command followed by an `Unregister` command (e.g. a // component has died), we might get events that are not associated // with an entry in the lookup. for event in events.drain(..) { let key = event.u64; if let Some(handle) = lookup.get(&key) { let events = event.events; self.log(&format!("Sending poll to {:?} (event: {:x})", handle.id(), events)); handle.send_message(&self.runtime, Message::Poll, true); } } } } #[inline] fn user_data_as_key(data: UserData) -> u64 { return data.0; } fn log(&self, message: &str) { if self.log_level >= LogLevel::Info { println!("[polling] {}", message); } } } // bit convoluted, but it works pub(crate) struct PollingThreadHandle { // requires Option, because: queue: Option>, // destructor needs to be called handle: Option>, // we need to call `join` } impl PollingThreadHandle { pub(crate) fn shutdown(&mut self) -> thread::Result<()> { debug_assert!(self.handle.is_some(), "polling thread already destroyed"); self.queue.take().unwrap().push(PollCmd::Shutdown); return self.handle.take().unwrap().join(); } } impl Drop for PollingThreadHandle { fn drop(&mut self) { debug_assert!(self.queue.is_none() && self.handle.is_none()); } } // oh my god, now I'm writing factory objects. I'm not feeling too well pub(crate) struct PollingClientFactory { poller: Arc, generation_counter: Arc, queue_factory: QueueDynProducerFactory, } impl PollingClientFactory { pub(crate) fn client(&self) -> PollingClient { return PollingClient{ poller: self.poller.clone(), generation_counter: self.generation_counter.clone(), queue: self.queue_factory.producer(), }; } } pub(crate) struct PollTicket(FileDescriptor, u64); /// A structure that allows the owner to register components at the polling /// thread. Because of assumptions in the communication queue all of these /// clients should be dropped before stopping the polling thread. pub(crate) struct PollingClient { poller: Arc, generation_counter: Arc, queue: QueueDynProducer, } impl PollingClient { pub(crate) fn register(&self, entity: &F, handle: CompHandle, read: bool, write: bool) -> Result { let generation = self.generation_counter.fetch_add(1, Ordering::Relaxed); let user_data = user_data_for_component(handle.id().0, generation); self.queue.push(PollCmd::Register(handle, user_data)); let file_descriptor = entity.as_file_descriptor(); self.poller.register(file_descriptor, user_data, read, write) .map_err(|e| rt_error!("failed to register for polling, because: {}", e))?; return Ok(PollTicket(file_descriptor, user_data.0)); } pub(crate) fn unregister(&self, ticket: PollTicket) -> Result<(), RtError> { let file_descriptor = ticket.0; let user_data = UserData(ticket.1); self.queue.push(PollCmd::Unregister(file_descriptor, user_data)); self.poller.unregister(file_descriptor) .map_err(|e| rt_error!("failed to unregister polling, because: {}", e))?; return Ok(()); } } #[inline] fn user_data_for_component(component_id: u32, generation: u32) -> UserData { return UserData((generation as u64) << 32 | (component_id as u64)); }