// Files @ 4a6883c04294
// Branch filter:
//
// Location: CSY/reowolf/src/runtime2/poll/mod.rs
//
// 4a6883c04294 10.8 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
// mh
// Fix bug related to checking for closed port
use libc::{self, c_int};

use std::{io, ptr, time, thread};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::collections::HashMap;

use crate::runtime2::RtError;
use crate::runtime2::runtime::{CompHandle, RuntimeInner};
use crate::runtime2::store::queue_mpsc::*;


/// Raw OS file descriptor, matching the `c_int` type the libc/epoll calls use.
pub(crate) type FileDescriptor = c_int;

/// Implemented by anything that can expose its underlying file descriptor so
/// that it can be registered at the `Poller`.
pub(crate) trait AsFileDescriptor {
    fn as_file_descriptor(&self) -> FileDescriptor;

}

/// Opaque 64-bit value stored with each epoll registration and echoed back in
/// reported events (see `user_data_for_component` for how it is encoded).
#[derive(Copy, Clone)]
pub(crate) struct UserData(u64);

// -----------------------------------------------------------------------------
// Poller
// -----------------------------------------------------------------------------

/// Thin wrapper around a Linux `epoll` instance. `handle` is the epoll file
/// descriptor itself; it is closed in the `Drop` implementation below.
#[cfg(unix)]
pub(crate) struct Poller {
    handle: c_int,
}

// All of this is gleaned from the `mio` crate.
#[cfg(unix)]
impl Poller {
    /// Creates a new epoll instance. `EPOLL_CLOEXEC` ensures the descriptor
    /// is not leaked into child processes.
    pub fn new() -> io::Result<Self> {
        let handle = syscall_result(unsafe{ libc::epoll_create1(libc::EPOLL_CLOEXEC) })?;

        return Ok(Self{
            handle,
        })
    }

    /// Registers `fd` for edge-triggered readiness notification. `user` is
    /// echoed back verbatim in the `u64` field of any reported event.
    fn register(&self, fd: FileDescriptor, user: UserData, read: bool, write: bool) -> io::Result<()> {
        let mut event = libc::epoll_event{
            events: Self::events_from_rw_flags(read, write),
            u64: user.0,
        };
        syscall_result(unsafe{
            libc::epoll_ctl(self.handle, libc::EPOLL_CTL_ADD, fd, &mut event)
        })?;

        return Ok(());
    }

    /// Removes `fd` from the epoll instance.
    fn unregister(&self, fd: FileDescriptor) -> io::Result<()> {
        syscall_result(unsafe{
            libc::epoll_ctl(self.handle, libc::EPOLL_CTL_DEL, fd, ptr::null_mut())
        })?;

        return Ok(());
    }

    /// Performs `epoll_wait`, waiting for the provided timeout or until events
    /// are reported. They are stored in the `events` variable (up to
    /// `events.cap()` are reported, so ensure it is preallocated).
    ///
    /// If the wait is interrupted by a signal (`EINTR`) it is transparently
    /// restarted; note that retrying may stretch the effective timeout.
    pub fn wait(&self, events: &mut Vec<libc::epoll_event>, timeout: time::Duration) -> io::Result<()> {
        // See `mio` for the reason. Works around a linux bug
        #[cfg(target_pointer_width = "32")]
        const MAX_TIMEOUT: u128 = 1789569;
        #[cfg(not(target_pointer_width = "32"))]
        const MAX_TIMEOUT: u128 = c_int::MAX as u128;

        let timeout_millis = timeout.as_millis();
        let timeout_millis = if timeout_millis > MAX_TIMEOUT {
            -1 // effectively infinite
        } else {
            timeout_millis as c_int
        };

        debug_assert!(events.is_empty());
        debug_assert!(events.capacity() > 0 && events.capacity() < i32::MAX as usize);
        let num_events = loop {
            let result = syscall_result(unsafe{
                libc::epoll_wait(self.handle, events.as_mut_ptr(), events.capacity() as i32, timeout_millis)
            });
            match result {
                Ok(num_events) => break num_events,
                // `EINTR`: a signal interrupted the wait before any events
                // were reported. Previously this bubbled up as an error, which
                // made the polling thread's `unwrap` panic; simply retry.
                Err(error) if error.kind() == io::ErrorKind::Interrupted => continue,
                Err(error) => return Err(error),
            }
        };

        unsafe{
            // SAFETY: the kernel initialized the first `num_events` entries of
            // the buffer we handed to `epoll_wait`.
            debug_assert!(num_events >= 0);
            events.set_len(num_events as usize);
        }

        return Ok(());
    }

    /// Translates read/write interest into an edge-triggered epoll event
    /// mask. `EPOLLRDHUP` is included for readers so that a peer closing its
    /// end is reported as well.
    fn events_from_rw_flags(read: bool, write: bool) -> u32 {
        let mut events = libc::EPOLLET;
        if read {
            events |= libc::EPOLLIN | libc::EPOLLRDHUP;
        }
        if write {
            events |= libc::EPOLLOUT;
        }

        return events as u32;
    }
}

#[cfg(unix)]
impl Drop for Poller {
    fn drop(&mut self) {
        // Only the epoll file descriptor is owned here; file descriptors
        // registered through `register` are not closed by the poller.
        unsafe{ libc::close(self.handle); }
    }
}

/// Turns a raw syscall return value into an `io::Result`: negative values are
/// converted into the calling thread's `last_os_error`, all other values are
/// passed through unchanged.
#[inline]
fn syscall_result(result: c_int) -> io::Result<c_int> {
    if result >= 0 {
        Ok(result)
    } else {
        Err(io::Error::last_os_error())
    }
}

/// Stub so the type name exists on non-unix targets; no polling functionality
/// is provided there.
#[cfg(not(unix))]
struct Poller {
    // Not implemented for OS's other than unix
}

// -----------------------------------------------------------------------------
// Polling Thread
// -----------------------------------------------------------------------------

/// Commands pushed into the polling thread's MPSC queue by `PollingClient`s
/// and by the `PollingThreadHandle`.
enum PollCmd {
    /// Start tracking a component: its handle is stored under the given user
    /// data so epoll events can be routed back to it.
    Register(CompHandle, UserData),
    /// Stop tracking the component stored under the given user data. The file
    /// descriptor travels along but is currently unused by the thread.
    Unregister(FileDescriptor, UserData),
    /// Ask the polling thread to exit its loop.
    Shutdown,
}

/// State owned by the polling thread itself: the shared epoll instance, the
/// runtime (needed to deliver messages and destroy components), and the
/// consumer side of the command queue.
pub struct PollingThread {
    poller: Arc<Poller>,
    runtime: Arc<RuntimeInner>,
    queue: QueueDynMpsc<PollCmd>,
    logging_enabled: bool, // when true, progress is printed to stdout
}

impl PollingThread {
    pub(crate) fn new(runtime: Arc<RuntimeInner>, logging_enabled: bool) -> Result<(PollingThreadHandle, PollingClientFactory), RtError> {
        let poller = Poller::new()
            .map_err(|e| rt_error!("failed to create poller, because: {}", e))?;
        let poller = Arc::new(poller);
        let queue = QueueDynMpsc::new(64);
        let queue_producers = queue.producer_factory();

        let mut thread_data = PollingThread{
            poller: poller.clone(),
            runtime: runtime.clone(),
            queue,
            logging_enabled,
        };
        let thread_handle = thread::spawn(move || { thread_data.run() });

        let thread_handle = PollingThreadHandle{
            queue: Some(queue_producers.producer()),
            handle: Some(thread_handle),
        };
        let client_factory = PollingClientFactory{
            poller,
            generation_counter: Arc::new(AtomicU32::new(0)),
            queue_factory: queue_producers,
        };

        return Ok((thread_handle, client_factory));
    }

    pub(crate) fn run(&mut self) {
        use crate::runtime2::communication::Message;

        const NUM_EVENTS: usize = 256;
        const EPOLL_DURATION: time::Duration = time::Duration::from_millis(250);

        // @performance: Lot of improvements possible here, a HashMap is likely
        // a horrible way to do this.
        let mut events = Vec::with_capacity(NUM_EVENTS);
        let mut lookup = HashMap::with_capacity(64);
        self.log("Starting polling thread");

        loop {
            // Retrieve events first (because the PollingClient will first
            // register at epoll, and then push a command into the queue).
            self.poller.wait(&mut events, EPOLL_DURATION).unwrap();

            // Then handle everything in the command queue.
            while let Some(command) = self.queue.pop() {
                match command {
                    PollCmd::Register(handle, user_data) => {
                        self.log(&format!("Registering component {:?} as {}", handle.id(), user_data.0));
                        let key = Self::user_data_as_key(user_data);
                        debug_assert!(!lookup.contains_key(&key));
                        lookup.insert(key, handle);
                    },
                    PollCmd::Unregister(_file_descriptor, user_data) => {
                        let key = Self::user_data_as_key(user_data);
                        debug_assert!(lookup.contains_key(&key));
                        let mut handle = lookup.remove(&key).unwrap();
                        self.log(&format!("Unregistering component {:?} as {}", handle.id(), user_data.0));
                        if let Some(key) = handle.decrement_users() {
                            self.runtime.destroy_component(key);
                        }
                    },
                    PollCmd::Shutdown => {
                        // The contract is that all scheduler threads shutdown
                        // before the polling thread. This happens when all
                        // components are removed.
                        self.log("Received shutdown signal");
                        debug_assert!(lookup.is_empty());
                        return;
                    }
                }
            }

            // Now process all of the events. Because we might have had a
            // `Register` command followed by an `Unregister` command (e.g. a
            // component has died), we might get events that are not associated
            // with an entry in the lookup.
            for event in events.drain(..) {
                let key = event.u64;
                if let Some(handle) = lookup.get(&key) {
                    let events = event.events;
                    self.log(&format!("Sending poll to {:?} (event: {:x})", handle.id(), events));
                    handle.send_message(&self.runtime, Message::Poll, true);
                }
            }
        }
    }

    #[inline]
    fn user_data_as_key(data: UserData) -> u64 {
        return data.0;
    }

    fn log(&self, message: &str) {
        if self.logging_enabled {
            println!("[polling] {}", message);
        }
    }
}

// bit convoluted, but it works
/// Owner-side handle to the polling thread. Both fields are `Option` because
/// `shutdown` must consume them (push the final command, then `join`), while
/// `Drop` verifies that this has actually happened.
pub(crate) struct PollingThreadHandle {
    // requires Option, because:
    queue: Option<QueueDynProducer<PollCmd>>, // destructor needs to be called
    handle: Option<thread::JoinHandle<()>>, // we need to call `join`
}

impl PollingThreadHandle {
    /// Sends the shutdown command to the polling thread and waits for it to
    /// finish. May only be called once; it consumes both internal fields.
    pub(crate) fn shutdown(&mut self) -> thread::Result<()> {
        debug_assert!(self.handle.is_some(), "polling thread already destroyed");
        let queue = self.queue.take().unwrap();
        queue.push(PollCmd::Shutdown);
        let join_handle = self.handle.take().unwrap();
        join_handle.join()
    }
}

impl Drop for PollingThreadHandle {
    fn drop(&mut self) {
        // Dropping the handle without an explicit `shutdown()` call is a
        // programming error; `shutdown` empties both fields.
        debug_assert!(self.queue.is_none() && self.handle.is_none());
    }
}

// oh my god, now I'm writing factory objects. I'm not feeling too well
/// Creates `PollingClient`s that all share the same epoll instance,
/// generation counter and command-queue producer factory.
pub(crate) struct PollingClientFactory {
    poller: Arc<Poller>,
    generation_counter: Arc<AtomicU32>,
    queue_factory: QueueDynProducerFactory<PollCmd>,
}

impl PollingClientFactory {
    /// Hands out a new client sharing the poller, the generation counter and
    /// the command queue with every other client from this factory.
    pub(crate) fn client(&self) -> PollingClient {
        let poller = self.poller.clone();
        let generation_counter = self.generation_counter.clone();
        let queue = self.queue_factory.producer();

        PollingClient{ poller, generation_counter, queue }
    }
}

pub(crate) struct PollTicket(FileDescriptor, u64);

/// A structure that allows the owner to register components at the polling
/// thread. Because of assumptions in the communication queue all of these
/// clients should be dropped before stopping the polling thread.
pub(crate) struct PollingClient {
    poller: Arc<Poller>, // shared epoll instance
    generation_counter: Arc<AtomicU32>, // shared counter making user data unique
    queue: QueueDynProducer<PollCmd>, // producer side of the polling thread's queue
}

impl PollingClient {
    /// Registers `entity`'s file descriptor at the polling thread on behalf of
    /// the component owning `handle`. The returned ticket must be passed to
    /// `unregister` later.
    pub(crate) fn register<F: AsFileDescriptor>(&self, entity: &F, handle: CompHandle, read: bool, write: bool) -> Result<PollTicket, RtError> {
        // The generation counter keeps the user data unique even when a
        // component id is reused.
        let generation = self.generation_counter.fetch_add(1, Ordering::Relaxed);
        let user_data = user_data_for_component(handle.id().0, generation);

        // Push the command before registering at epoll: this guarantees the
        // polling thread observes the `Register` command before any event for
        // this file descriptor can be reported.
        self.queue.push(PollCmd::Register(handle, user_data));

        let file_descriptor = entity.as_file_descriptor();
        if let Err(e) = self.poller.register(file_descriptor, user_data, read, write) {
            // Roll back the queued registration. Without this, the polling
            // thread would keep the component handle in its lookup map forever
            // and its user count would never be decremented.
            self.queue.push(PollCmd::Unregister(file_descriptor, user_data));
            return Err(rt_error!("failed to register for polling, because: {}", e));
        }

        return Ok(PollTicket(file_descriptor, user_data.0));
    }

    /// Undoes a `register` call: removes the file descriptor from epoll and
    /// tells the polling thread to drop the component handle.
    pub(crate) fn unregister(&self, ticket: PollTicket) -> Result<(), RtError> {
        let file_descriptor = ticket.0;
        let user_data = UserData(ticket.1);
        self.queue.push(PollCmd::Unregister(file_descriptor, user_data));
        self.poller.unregister(file_descriptor)
            .map_err(|e| rt_error!("failed to unregister polling, because: {}", e))?;

        return Ok(());
    }
}

/// Packs a component id and a registration generation into the 64-bit user
/// data value: generation in the upper 32 bits, component id in the lower 32.
#[inline]
fn user_data_for_component(component_id: u32, generation: u32) -> UserData {
    let packed = ((generation as u64) << 32) | (component_id as u64);
    UserData(packed)
}