Files @ 1f78496722d1
Branch filter:

Location: CSY/reowolf/src/runtime2/poll/mod.rs - annotation

1f78496722d1 11.0 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
Max Henger
feat: runtime error handling
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
1f78496722d1
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
1f78496722d1
113e4349a706
113e4349a706
113e4349a706
1f78496722d1
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
1f78496722d1
113e4349a706
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
1f78496722d1
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
1f78496722d1
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
use libc::{self, c_int};

use std::{io, ptr, time, thread};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::collections::HashMap;

use crate::runtime2::RtError;
use crate::runtime2::runtime::{CompHandle, RuntimeInner, LogLevel};
use crate::runtime2::store::queue_mpsc::*;


/// Raw OS file descriptor, in the form accepted by the `epoll_*` calls below.
pub(crate) type FileDescriptor = c_int;

/// Implemented by anything that can expose the raw file descriptor that
/// should be registered at the poller.
pub(crate) trait AsFileDescriptor {
    /// Returns the raw descriptor backing `self`.
    fn as_file_descriptor(&self) -> FileDescriptor;
}

/// Opaque 64-bit tag stored in an `epoll_event` and handed back when that
/// event fires (see `user_data_for_component` for how it is built).
#[derive(Clone, Copy)]
pub(crate) struct UserData(u64);

// -----------------------------------------------------------------------------
// Poller
// -----------------------------------------------------------------------------

/// Thin owner of an `epoll` instance; the descriptor is closed on drop.
#[cfg(unix)]
pub(crate) struct Poller {
    /// Descriptor returned by `epoll_create1`.
    handle: c_int,
}

// All of this is gleaned from the `mio` crate.
#[cfg(unix)]
impl Poller {
    /// Creates a new epoll instance with `EPOLL_CLOEXEC` set.
    pub fn new() -> io::Result<Self> {
        // SAFETY: `epoll_create1` takes no pointers; the result is checked.
        let raw = unsafe { libc::epoll_create1(libc::EPOLL_CLOEXEC) };
        syscall_result(raw).map(|handle| Poller { handle })
    }

    /// Adds `fd` to the interest list (edge-triggered), tagging its events
    /// with `user`.
    fn register(&self, fd: FileDescriptor, user: UserData, read: bool, write: bool) -> io::Result<()> {
        let mut registration = libc::epoll_event {
            events: Self::events_from_rw_flags(read, write),
            u64: user.0,
        };
        // SAFETY: `registration` is a fully initialized `epoll_event` that
        // lives for the duration of the call.
        let outcome = unsafe {
            libc::epoll_ctl(self.handle, libc::EPOLL_CTL_ADD, fd, &mut registration)
        };
        syscall_result(outcome).map(|_| ())
    }

    /// Removes `fd` from the interest list.
    fn unregister(&self, fd: FileDescriptor) -> io::Result<()> {
        // SAFETY: `EPOLL_CTL_DEL` ignores the event argument, so null is allowed.
        let outcome = unsafe {
            libc::epoll_ctl(self.handle, libc::EPOLL_CTL_DEL, fd, ptr::null_mut())
        };
        syscall_result(outcome).map(|_| ())
    }

    /// Performs `epoll_wait`, blocking until events arrive or `timeout`
    /// elapses. Reported events are written into `events`, which must be
    /// empty on entry; at most `events.capacity()` events are reported, so
    /// preallocate it.
    pub fn wait(&self, events: &mut Vec<libc::epoll_event>, timeout: time::Duration) -> io::Result<()> {
        // See `mio` for the reason. Works around a linux bug
        #[cfg(target_pointer_width = "32")]
        const MAX_TIMEOUT: u128 = 1789569;
        #[cfg(not(target_pointer_width = "32"))]
        const MAX_TIMEOUT: u128 = c_int::MAX as u128;

        // Timeouts that do not fit become -1, i.e. wait indefinitely.
        let requested_millis = timeout.as_millis();
        let timeout_millis: c_int = if requested_millis <= MAX_TIMEOUT {
            requested_millis as c_int
        } else {
            -1
        };

        debug_assert!(events.is_empty());
        debug_assert!(events.capacity() > 0 && events.capacity() < i32::MAX as usize);
        let max_events = events.capacity() as i32;

        // SAFETY: the buffer has room for `max_events` entries; the kernel
        // writes at most that many.
        let num_events = syscall_result(unsafe {
            libc::epoll_wait(self.handle, events.as_mut_ptr(), max_events, timeout_millis)
        })?;

        debug_assert!(num_events >= 0);
        // SAFETY: the kernel initialized the first `num_events` entries and
        // `num_events <= capacity`.
        unsafe { events.set_len(num_events as usize); }

        Ok(())
    }

    /// Builds the edge-triggered event mask for the requested directions.
    fn events_from_rw_flags(read: bool, write: bool) -> u32 {
        let read_bits = if read { libc::EPOLLIN | libc::EPOLLRDHUP } else { 0 };
        let write_bits = if write { libc::EPOLLOUT } else { 0 };
        (libc::EPOLLET | read_bits | write_bits) as u32
    }
}

#[cfg(unix)]
impl Drop for Poller {
    /// Closes the epoll descriptor; any error from `close` is ignored.
    fn drop(&mut self) {
        // SAFETY: `handle` came from `epoll_create1` and is closed only here.
        let _ = unsafe { libc::close(self.handle) };
    }
}

/// Converts a raw C return value into an `io::Result`: negative values map to
/// the current `errno` (via `last_os_error`), anything else is passed through.
#[inline]
fn syscall_result(result: c_int) -> io::Result<c_int> {
    match result {
        failed if failed < 0 => Err(io::Error::last_os_error()),
        ok => Ok(ok),
    }
}

/// Placeholder `Poller` for targets other than unix: polling is not
/// implemented there, so this type carries no state and has no methods.
#[cfg(not(unix))]
struct Poller {}

// -----------------------------------------------------------------------------
// Polling Thread
// -----------------------------------------------------------------------------

/// Commands delivered to the polling thread through its MPSC queue.
enum PollCmd {
    /// Route events tagged with this `UserData` to the given component.
    Register(CompHandle, UserData),
    /// Forget the routing entry for this `UserData` and decrement the
    /// component's user count (the descriptor itself is not used by the thread).
    Unregister(FileDescriptor, UserData),
    /// Make the polling thread leave its loop.
    Shutdown,
}

pub struct PollingThread {
    poller: Arc<Poller>,
    runtime: Arc<RuntimeInner>,
    queue: QueueDynMpsc<PollCmd>,
    log_level: LogLevel,
}

impl PollingThread {
    pub(crate) fn new(runtime: Arc<RuntimeInner>, log_level: LogLevel) -> Result<(PollingThreadHandle, PollingClientFactory), RtError> {
        let poller = Poller::new()
            .map_err(|e| rt_error!("failed to create poller, because: {}", e))?;
        let poller = Arc::new(poller);
        let queue = QueueDynMpsc::new(64);
        let queue_producers = queue.producer_factory();

        let mut thread_data = PollingThread{
            poller: poller.clone(),
            runtime: runtime.clone(),
            queue,
            log_level,
        };
        let thread_handle = thread::Builder::new()
            .name(String::from("poller"))
            .spawn(move || { thread_data.run() })
            .map_err(|reason|
                rt_error!("failed to start polling thread, because: {}", reason)
            )?;

        let thread_handle = PollingThreadHandle{
            queue: Some(queue_producers.producer()),
            handle: Some(thread_handle),
        };
        let client_factory = PollingClientFactory{
            poller,
            generation_counter: Arc::new(AtomicU32::new(0)),
            queue_factory: queue_producers,
        };

        return Ok((thread_handle, client_factory));
    }

    pub(crate) fn run(&mut self) {
        use crate::runtime2::communication::Message;

        const NUM_EVENTS: usize = 256;
        const EPOLL_DURATION: time::Duration = time::Duration::from_millis(250);

        // @performance: Lot of improvements possible here, a HashMap is likely
        // a horrible way to do this.
        let mut events = Vec::with_capacity(NUM_EVENTS);
        let mut lookup = HashMap::with_capacity(64);
        self.log("Starting polling thread");

        loop {
            // Retrieve events first (because the PollingClient will first
            // register at epoll, and then push a command into the queue).
            self.poller.wait(&mut events, EPOLL_DURATION).unwrap();

            // Then handle everything in the command queue.
            while let Some(command) = self.queue.pop() {
                match command {
                    PollCmd::Register(handle, user_data) => {
                        self.log(&format!("Registering component {:?} as {}", handle.id(), user_data.0));
                        let key = Self::user_data_as_key(user_data);
                        debug_assert!(!lookup.contains_key(&key));
                        lookup.insert(key, handle);
                    },
                    PollCmd::Unregister(_file_descriptor, user_data) => {
                        let key = Self::user_data_as_key(user_data);
                        debug_assert!(lookup.contains_key(&key));
                        let mut handle = lookup.remove(&key).unwrap();
                        self.log(&format!("Unregistering component {:?} as {}", handle.id(), user_data.0));
                        if let Some(key) = handle.decrement_users() {
                            self.runtime.destroy_component(key);
                        }
                    },
                    PollCmd::Shutdown => {
                        // The contract is that all scheduler threads shutdown
                        // before the polling thread. This happens when all
                        // components are removed.
                        self.log("Received shutdown signal");
                        debug_assert!(lookup.is_empty());
                        return;
                    }
                }
            }

            // Now process all of the events. Because we might have had a
            // `Register` command followed by an `Unregister` command (e.g. a
            // component has died), we might get events that are not associated
            // with an entry in the lookup.
            for event in events.drain(..) {
                let key = event.u64;
                if let Some(handle) = lookup.get(&key) {
                    let events = event.events;
                    self.log(&format!("Sending poll to {:?} (event: {:x})", handle.id(), events));
                    handle.send_message(&self.runtime, Message::Poll, true);
                }
            }
        }
    }

    #[inline]
    fn user_data_as_key(data: UserData) -> u64 {
        return data.0;
    }

    fn log(&self, message: &str) {
        if self.log_level >= LogLevel::Info {
            println!("[polling] {}", message);
        }
    }
}

/// Owner-side handle of the polling thread, used to stop it via `shutdown`.
///
/// Both fields are `Option`s so `shutdown` can move them out through
/// `&mut self`. The queue producer has to be dropped, and
/// `JoinHandle::join` consumes the join handle.
pub(crate) struct PollingThreadHandle {
    queue: Option<QueueDynProducer<PollCmd>>,
    handle: Option<thread::JoinHandle<()>>,
}

impl PollingThreadHandle {
    /// Sends `PollCmd::Shutdown`, drops this handle's queue producer, and
    /// blocks until the polling thread has exited.
    ///
    /// # Panics
    /// Panics if called more than once.
    pub(crate) fn shutdown(&mut self) -> thread::Result<()> {
        debug_assert!(self.handle.is_some(), "polling thread already destroyed");
        let producer = self.queue.take().unwrap();
        producer.push(PollCmd::Shutdown);
        // Release our producer before joining the thread.
        drop(producer);

        let join_handle = self.handle.take().unwrap();
        join_handle.join()
    }
}

impl Drop for PollingThreadHandle {
    /// In debug builds, checks that `shutdown` ran before the handle is dropped.
    fn drop(&mut self) {
        debug_assert!(self.queue.is_none() && self.handle.is_none());
    }
}

/// Hands out `PollingClient`s that share one poller, one generation counter
/// and one command queue.
pub(crate) struct PollingClientFactory {
    poller: Arc<Poller>,
    generation_counter: Arc<AtomicU32>,
    queue_factory: QueueDynProducerFactory<PollCmd>,
}

impl PollingClientFactory {
    /// Creates a new client with its own producer for the command queue.
    pub(crate) fn client(&self) -> PollingClient {
        let poller = Arc::clone(&self.poller);
        let generation_counter = Arc::clone(&self.generation_counter);
        let queue = self.queue_factory.producer();
        PollingClient { poller, generation_counter, queue }
    }
}

/// Receipt for a successful registration: the registered file descriptor and
/// the raw user data it was registered under. Pass it to
/// `PollingClient::unregister` to undo the registration.
pub(crate) struct PollTicket(FileDescriptor, u64);

/// A structure that allows the owner to register components at the polling
/// thread. Because of assumptions in the communication queue all of these
/// clients should be dropped before stopping the polling thread.
pub(crate) struct PollingClient {
    poller: Arc<Poller>,
    generation_counter: Arc<AtomicU32>,
    queue: QueueDynProducer<PollCmd>,
}

impl PollingClient {
    /// Registers `entity`'s file descriptor at the poller so that readiness
    /// events are forwarded to the component behind `handle`.
    ///
    /// Every call uses a fresh generation, so repeated registrations of the
    /// same component get distinct user data.
    ///
    /// # Errors
    /// Returns an `RtError` if the descriptor cannot be added to epoll. In
    /// that case the already-queued registration is rolled back, and the
    /// polling thread releases `handle` as if `unregister` had been called.
    pub(crate) fn register<F: AsFileDescriptor>(&self, entity: &F, handle: CompHandle, read: bool, write: bool) -> Result<PollTicket, RtError> {
        let generation = self.generation_counter.fetch_add(1, Ordering::Relaxed);
        let user_data = user_data_for_component(handle.id().0, generation);

        // Queue the command *before* adding the descriptor to epoll, so the
        // polling thread knows the user data before any event for it is reported.
        self.queue.push(PollCmd::Register(handle, user_data));

        let file_descriptor = entity.as_file_descriptor();
        if let Err(reason) = self.poller.register(file_descriptor, user_data, read, write) {
            // The polling thread now owns `handle` through the queued
            // `Register`. Queue the matching `Unregister` so the entry is
            // removed again and the user count is decremented, instead of
            // leaking the handle in the lookup table.
            self.queue.push(PollCmd::Unregister(file_descriptor, user_data));
            return Err(rt_error!("failed to register for polling, because: {}", reason));
        }

        return Ok(PollTicket(file_descriptor, user_data.0));
    }

    /// Undoes a registration made by `register`.
    ///
    /// The `Unregister` command is queued before the descriptor is removed
    /// from epoll. The polling thread therefore drops its lookup entry and
    /// decrements the component's user count even if the removal fails.
    ///
    /// # Errors
    /// Returns an `RtError` if the descriptor could not be removed from epoll.
    pub(crate) fn unregister(&self, ticket: PollTicket) -> Result<(), RtError> {
        let file_descriptor = ticket.0;
        let user_data = UserData(ticket.1);
        self.queue.push(PollCmd::Unregister(file_descriptor, user_data));
        self.poller.unregister(file_descriptor)
            .map_err(|e| rt_error!("failed to unregister polling, because: {}", e))?;

        return Ok(());
    }
}

/// Packs `generation` into the upper 32 bits and `component_id` into the lower
/// 32 bits of the epoll user data. Registrations with different generations
/// therefore never share a key.
#[inline]
fn user_data_for_component(component_id: u32, generation: u32) -> UserData {
    let upper = u64::from(generation) << 32;
    let lower = u64::from(component_id);
    UserData(upper | lower)
}