Files @ a3a2b16408b1
Branch filter:

Location: CSY/reowolf/src/runtime2/poll/mod.rs - annotation

a3a2b16408b1 10.5 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
mh
Initial polling thread implementation
2c1fa43903ac
2c1fa43903ac
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
a3a2b16408b1
a3a2b16408b1
2c1fa43903ac
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
a3a2b16408b1
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
2c1fa43903ac
a3a2b16408b1
a3a2b16408b1
2c1fa43903ac
a3a2b16408b1
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
a3a2b16408b1
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
2c1fa43903ac
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
a3a2b16408b1
2c1fa43903ac
use libc::{self, c_int};

use std::{io, ptr, time, thread};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::collections::HashMap;

use crate::runtime2::RtError;
use crate::runtime2::runtime::{CompHandle, RuntimeInner};
use crate::runtime2::store::queue_mpsc::*;


/// Raw file descriptor type, as passed to the `epoll_ctl` calls in this module.
pub(crate) type FileDescriptor = c_int;

/// Implemented by entities that can hand out a [`FileDescriptor`]. This is the
/// bound `PollingClient::register` places on the entity it registers.
pub(crate) trait AsFileDescriptor {
    /// Returns the file descriptor belonging to this entity.
    fn as_file_descriptor(&self) -> FileDescriptor;

}

/// 64-bit value stored in the `u64` field of the `epoll_event` when a file
/// descriptor is registered. `user_data_for_component` builds it from a
/// generation number (upper 32 bits) and a component ID (lower 32 bits); the
/// polling thread uses the raw value as its lookup key.
#[derive(Copy, Clone)]
pub(crate) struct UserData(u64);

// -----------------------------------------------------------------------------
// Poller
// -----------------------------------------------------------------------------

/// Thin wrapper around an epoll instance.
// NOTE(review): gated on `unix`, but the implementation uses epoll, which is
// Linux-specific. Confirm the intended set of target platforms.
#[cfg(unix)]
pub(crate) struct Poller {
    /// File descriptor returned by `epoll_create1`; closed in `Drop`.
    handle: c_int,
}

// All of this is gleaned from the `mio` crate.
#[cfg(unix)]
impl Poller {
    /// Creates a new epoll instance with the close-on-exec flag set.
    ///
    /// # Errors
    /// Returns the OS error if `epoll_create1` fails.
    pub fn new() -> io::Result<Self> {
        // SAFETY: `epoll_create1` takes no pointer arguments; the result is
        // checked by `syscall_result`.
        let handle = syscall_result(unsafe{ libc::epoll_create1(libc::EPOLL_CLOEXEC) })?;

        return Ok(Self{
            handle,
        })
    }

    /// Adds `fd` to the epoll instance. Events reported for it will carry
    /// `user` in their `u64` field. `read`/`write` select the readiness kinds
    /// of interest (see `events_from_rw_flags`).
    ///
    /// # Errors
    /// Returns the OS error if `epoll_ctl` fails.
    fn register(&self, fd: FileDescriptor, user: UserData, read: bool, write: bool) -> io::Result<()> {
        let mut event = libc::epoll_event{
            events: Self::events_from_rw_flags(read, write),
            u64: user.0,
        };
        // SAFETY: `event` is a live, initialized local for the whole call.
        syscall_result(unsafe{
            libc::epoll_ctl(self.handle, libc::EPOLL_CTL_ADD, fd, &mut event)
        })?;

        return Ok(());
    }

    /// Removes `fd` from the epoll instance.
    ///
    /// # Errors
    /// Returns the OS error if `epoll_ctl` fails.
    fn unregister(&self, fd: FileDescriptor) -> io::Result<()> {
        // SAFETY: no memory is handed to the kernel; the event pointer is null
        // for `EPOLL_CTL_DEL`.
        syscall_result(unsafe{
            libc::epoll_ctl(self.handle, libc::EPOLL_CTL_DEL, fd, ptr::null_mut())
        })?;

        return Ok(());
    }

    /// Performs `epoll_wait`, waiting for the provided timeout or until events
    /// are reported. They are stored in the `events` variable (up to
    /// `events.capacity()` are reported, so ensure it is preallocated).
    ///
    /// A non-zero timeout is rounded up to whole milliseconds, so that a
    /// sub-millisecond timeout does not degrade into a non-blocking poll. A
    /// timeout larger than the supported maximum blocks indefinitely.
    ///
    /// # Errors
    /// Returns the OS error if `epoll_wait` fails (this includes interruption
    /// by a signal). In that case `events` is left empty.
    pub fn wait(&self, events: &mut Vec<libc::epoll_event>, timeout: time::Duration) -> io::Result<()> {
        // See `mio` for the reason. Works around a linux bug
        #[cfg(target_pointer_width = "32")]
        const MAX_TIMEOUT: u128 = 1789569;
        #[cfg(not(target_pointer_width = "32"))]
        const MAX_TIMEOUT: u128 = c_int::MAX as u128;

        // `Duration::as_millis` truncates; round up so only an explicit zero
        // timeout results in a non-blocking call.
        let timeout_millis = timeout
            .checked_add(time::Duration::from_nanos(999_999))
            .unwrap_or(timeout)
            .as_millis();
        let timeout_millis = if timeout_millis > MAX_TIMEOUT {
            -1 // effectively infinite
        } else {
            timeout_millis as c_int
        };

        debug_assert!(events.is_empty());
        debug_assert!(events.capacity() > 0 && events.capacity() < i32::MAX as usize);
        // Never report stale entries, even if the caller forgot to drain.
        events.clear();
        // Clamp instead of wrapping if the capacity does not fit in a `c_int`.
        let max_events = events.capacity().min(c_int::MAX as usize) as c_int;

        // SAFETY: the buffer behind `events` has room for `events.capacity()`
        // entries and `max_events` never exceeds that.
        let num_events = syscall_result(unsafe{
            libc::epoll_wait(self.handle, events.as_mut_ptr(), max_events, timeout_millis)
        })?;

        // SAFETY: `epoll_wait` succeeded, so it wrote `num_events` entries
        // (non-negative, at most `max_events`) into the buffer.
        unsafe{
            debug_assert!(num_events >= 0);
            events.set_len(num_events as usize);
        }

        return Ok(());
    }

    /// Builds the epoll event mask. Edge-triggered mode (`EPOLLET`) is always
    /// set; `read` adds `EPOLLIN | EPOLLRDHUP` and `write` adds `EPOLLOUT`.
    fn events_from_rw_flags(read: bool, write: bool) -> u32 {
        let mut events = libc::EPOLLET;
        if read {
            events |= libc::EPOLLIN | libc::EPOLLRDHUP;
        }
        if write {
            events |= libc::EPOLLOUT;
        }

        return events as u32;
    }
}

#[cfg(unix)]
impl Drop for Poller {
    /// Closes the epoll file descriptor held in `handle`.
    fn drop(&mut self) {
        // SAFETY: `close` only receives the descriptor value, no memory is
        // shared with the kernel. A failure cannot be reported from `drop`,
        // so the return value is deliberately discarded.
        let _ = unsafe{ libc::close(self.handle) };
    }
}

/// Converts the return value of a libc call into an `io::Result`: negative
/// values become the last OS error, anything else is passed through.
#[inline]
fn syscall_result(result: c_int) -> io::Result<c_int> {
    match result {
        code if code < 0 => Err(io::Error::last_os_error()),
        code => Ok(code),
    }
}

// Placeholder for non-unix targets: no fields and, in this file, no methods.
// NOTE(review): `PollingThreadBuilder::new` calls `Poller::new()`, which this
// placeholder does not provide. Presumably non-unix builds are unsupported for
// now; confirm.
#[cfg(not(unix))]
struct Poller {

}

// -----------------------------------------------------------------------------
// Polling Thread
// -----------------------------------------------------------------------------

/// Commands sent through the queue to the polling thread.
enum PollCmd {
    /// Start delivering `Message::Poll` to the component whenever an event
    /// tagged with the given user data is reported.
    Register(CompHandle, UserData),
    /// Forget the component registered under the given user data. The file
    /// descriptor is carried along but not used by `PollingThread::run`.
    Unregister(FileDescriptor, UserData),
    /// Makes `PollingThread::run` return.
    Shutdown,
}

/// Represents the data needed to build interfaces to the polling thread (which
/// should happen first) and to create the polling thread itself.
pub(crate) struct PollingThreadBuilder {
    /// Epoll wrapper shared between the polling thread and all clients.
    poller: Arc<Poller>,
    /// Source of the generation numbers clients put into their `UserData`.
    generation_counter: Arc<AtomicU32>,
    /// Command queue; clients get producers, the polling thread consumes.
    queue: QueueDynMpsc<PollCmd>,
    /// Handed to the polling thread, which uses it to send messages to and
    /// destroy components.
    runtime: Arc<RuntimeInner>,
    /// Whether the polling thread prints log lines.
    logging_enabled: bool,
}

impl PollingThreadBuilder {
    /// Sets up the poller, the generation counter (starting at 0) and the
    /// command queue that the polling thread and its clients will share.
    ///
    /// Fails with an `RtError` when the poller cannot be created.
    pub(crate) fn new(runtime: Arc<RuntimeInner>, logging_enabled: bool) -> Result<PollingThreadBuilder, RtError> {
        match Poller::new() {
            Ok(poller) => Ok(PollingThreadBuilder {
                poller: Arc::new(poller),
                generation_counter: Arc::new(AtomicU32::new(0)),
                queue: QueueDynMpsc::new(64),
                runtime,
                logging_enabled,
            }),
            Err(e) => Err(rt_error!("failed to create poller, because: {}", e)),
        }
    }

    /// Hands out a new client that shares this builder's poller and
    /// generation counter and owns its own producer end of the command queue.
    pub(crate) fn client(&self) -> PollingClient {
        let poller = Arc::clone(&self.poller);
        let generation_counter = Arc::clone(&self.generation_counter);
        let queue = self.queue.producer();

        PollingClient{ poller, generation_counter, queue }
    }

    /// Consumes the builder, yielding the polling thread together with the
    /// destroyer that is used to shut it down again.
    pub(crate) fn into_thread(self) -> (PollingThread, PollingThreadDestroyer) {
        // The destroyer needs its own producer, taken before the queue moves
        // into the thread.
        let shutdown_producer = self.queue.producer();
        let Self{ poller, runtime, queue, logging_enabled, .. } = self;

        let polling_thread = PollingThread{ poller, runtime, queue, logging_enabled };
        (polling_thread, PollingThreadDestroyer::new(shutdown_producer))
    }
}

/// State of the polling thread; `run` contains its main loop.
pub(crate) struct PollingThread {
    /// Epoll wrapper, shared with the `PollingClient`s.
    poller: Arc<Poller>,
    /// Used to send messages to components and to destroy them.
    runtime: Arc<RuntimeInner>,
    /// Consumer side of the command queue.
    queue: QueueDynMpsc<PollCmd>,
    /// When set, `log` prints its messages to stdout.
    logging_enabled: bool,
}

impl PollingThread {
    pub(crate) fn run(&mut self) {
        use crate::runtime2::scheduler::SchedulerCtx;
        use crate::runtime2::communication::Message;

        const NUM_EVENTS: usize = 256;
        const EPOLL_DURATION: time::Duration = time::Duration::from_millis(250);

        // @performance: Lot of improvements possible here, a HashMap is likely
        // a horrible way to do this.
        let mut events = Vec::with_capacity(NUM_EVENTS);
        let mut lookup = HashMap::with_capacity(64);
        self.log("Starting polling thread");

        loop {
            // Retrieve events first (because the PollingClient will first
            // register at epoll, and then push a command into the queue).
            self.poller.wait(&mut events, EPOLL_DURATION).unwrap();

            // Then handle everything in the command queue.
            while let Some(command) = self.queue.pop() {
                match command {
                    PollCmd::Register(handle, user_data) => {
                        self.log(&format!("Registering component {:?} as {}", handle.id(), user_data.0));
                        let key = Self::user_data_as_key(user_data);
                        debug_assert!(!lookup.contains_key(&key));
                        lookup.insert(key, handle);
                    },
                    PollCmd::Unregister(_file_descriptor, user_data) => {
                        let key = Self::user_data_as_key(user_data);
                        debug_assert!(lookup.contains_key(&key));
                        let mut handle = lookup.remove(&key).unwrap();
                        self.log(&format!("Unregistering component {:?} as {}", handle.id(), user_data.0));
                        if let Some(key) = handle.decrement_users() {
                            self.runtime.destroy_component(key);
                        }
                    },
                    PollCmd::Shutdown => {
                        // The contract is that all scheduler threads shutdown
                        // before the polling thread. This happens when all
                        // components are removed.
                        self.log("Received shutdown signal");
                        debug_assert!(lookup.is_empty());
                        return;
                    }
                }
            }

            // Now process all of the events. Because we might have had a
            // `Register` command followed by an `Unregister` command (e.g. a
            // component has died), we might get events that are not associated
            // with an entry in the lookup.
            for event in events.drain(..) {
                let key = event.u64;
                if let Some(handle) = lookup.get(&key) {
                    self.log(&format!("Sending poll to {:?} (event: {:x})", handle.id(), event.events));
                    handle.send_message(&self.runtime, Message::Poll, true);
                }
            }
        }
    }

    #[inline]
    fn user_data_as_key(data: UserData) -> u64 {
        return data.0;
    }

    fn log(&self, message: &str) {
        if self.logging_enabled {
            println!("[polling] {}", message);
        }
    }
}

// Bit convoluted, but it works: the destroyer holds a producer for the polling
// thread's command queue until `initiate_destruction` uses it to send the
// `Shutdown` command, after which the field is `None`.
pub(crate) struct PollingThreadDestroyer {
    queue: Option<QueueDynProducer<PollCmd>>,
}

impl PollingThreadDestroyer {
    /// Wraps the producer that will later carry the shutdown command.
    fn new(queue: QueueDynProducer<PollCmd>) -> Self {
        Self{ queue: Some(queue) }
    }

    /// Sends `PollCmd::Shutdown` to the polling thread and gives up the
    /// producer. Panics when called a second time, as the producer is gone.
    pub(crate) fn initiate_destruction(&mut self) {
        let producer = self.queue.take().unwrap();
        producer.push(PollCmd::Shutdown);
    }
}

impl Drop for PollingThreadDestroyer {
    /// In debug builds, checks that `initiate_destruction` was called before
    /// the destroyer is dropped (it leaves `queue` as `None`).
    fn drop(&mut self) {
        debug_assert!(self.queue.is_none());
    }
}

/// Returned by `PollingClient::register` and consumed by
/// `PollingClient::unregister`. Holds the registered file descriptor and the
/// raw `UserData` value it was registered with.
pub(crate) struct PollTicket(FileDescriptor, u64);

/// A structure that allows the owner to register components at the polling
/// thread. Because of assumptions in the communication queue all of these
/// clients should be dropped before stopping the polling thread.
pub(crate) struct PollingClient {
    /// Epoll wrapper shared with the polling thread; file descriptors are
    /// added and removed directly by the client.
    poller: Arc<Poller>,
    /// Shared counter that provides the generation part of each `UserData`.
    generation_counter: Arc<AtomicU32>,
    /// Producer side of the polling thread's command queue.
    queue: QueueDynProducer<PollCmd>,
}

impl PollingClient {
    /// Registers `entity`'s file descriptor for polling on behalf of the
    /// component behind `handle`. `read`/`write` select the readiness events
    /// of interest. On success the returned ticket must later be passed to
    /// `unregister`.
    ///
    /// # Errors
    /// Returns an `RtError` if the file descriptor could not be added to the
    /// poller. In that case the registration at the polling thread is undone
    /// again, so nothing is left behind.
    fn register<F: AsFileDescriptor>(&self, entity: F, handle: CompHandle, read: bool, write: bool) -> Result<PollTicket, RtError> {
        let generation = self.generation_counter.fetch_add(1, Ordering::Relaxed);
        let user_data = user_data_for_component(handle.id().0, generation);

        // The command is queued *before* the descriptor is added to epoll: the
        // polling thread waits for events first and pops commands afterwards,
        // so it will always know the component by the time an event shows up.
        self.queue.push(PollCmd::Register(handle, user_data));

        let file_descriptor = entity.as_file_descriptor();
        if let Err(e) = self.poller.register(file_descriptor, user_data, read, write) {
            // The polling thread already owns the handle (or soon will). No
            // ticket is returned on failure, so nobody else could ever remove
            // it: queue the matching `Unregister` ourselves.
            self.queue.push(PollCmd::Unregister(file_descriptor, user_data));
            return Err(rt_error!("failed to register for polling, because: {}", e));
        }

        return Ok(PollTicket(file_descriptor, user_data.0));
    }

    /// Undoes a previous `register`: tells the polling thread to forget the
    /// component and removes the file descriptor from the poller.
    ///
    /// # Errors
    /// Returns an `RtError` if the file descriptor could not be removed from
    /// the poller. The `Unregister` command has been queued regardless.
    fn unregister(&self, ticket: PollTicket) -> Result<(), RtError> {
        let file_descriptor = ticket.0;
        let user_data = UserData(ticket.1);
        self.queue.push(PollCmd::Unregister(file_descriptor, user_data));
        self.poller.unregister(file_descriptor)
            .map_err(|e| rt_error!("failed to unregister polling, because: {}", e))?;

        return Ok(());
    }
}

/// Packs a generation number and a component ID into one `UserData` value:
/// the generation occupies the upper 32 bits, the component ID the lower 32.
#[inline]
fn user_data_for_component(component_id: u32, generation: u32) -> UserData {
    let upper_bits = u64::from(generation) << 32;
    let lower_bits = u64::from(component_id);
    UserData(upper_bits | lower_bits)
}