Changeset - a3a2b16408b1
src/protocol/parser/token_parsing.rs
 
@@ -549,8 +549,6 @@ fn is_reserved_expression_keyword(text: &[u8]) -> bool {
 
    match text {
 
        KW_LET | KW_CAST |
 
        KW_LIT_TRUE | KW_LIT_FALSE | KW_LIT_NULL |
 
        // TODO: Remove this once global namespace errors work @nocommit
 
        // KW_FUNC_GET | KW_FUNC_PUT | KW_FUNC_FIRES | KW_FUNC_CREATE | KW_FUNC_ASSERT | KW_FUNC_LENGTH | KW_FUNC_PRINT => true,
 
        _ => false,
 
    }
 
}
src/runtime2/communication.rs
 
@@ -197,6 +197,7 @@ pub enum Message {
 
    Data(DataMessage),
 
    Sync(SyncMessage),
 
    Control(ControlMessage),
 
    Poll,
 
}
 

	
 
impl Message {
 
@@ -208,6 +209,8 @@ impl Message {
 
                return v.target_port_id,
 
            Message::Sync(_) =>
 
                return None,
 
            Message::Poll =>
 
                return None,
 
        }
 
    }
 

	
 
@@ -218,6 +221,7 @@ impl Message {
 
            Message::Control(v) =>
 
                v.target_port_id = Some(port_id),
 
            Message::Sync(_) => unreachable!(), // should never be called for this message type
 
            Message::Poll => unreachable!(),
 
        }
 
    }
 
}
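
Since the new `Poll` variant carries no payload, every exhaustive match on `Message` elsewhere in the runtime needs an extra arm for it. A minimal sketch of the pattern used throughout this changeset (the handler function itself is illustrative):

    // Sketch: how a component's message handler treats the new variant.
    // Components that never register with the polling thread mark the arm
    // unreachable, exactly as the arms added in this changeset do.
    fn handle(message: Message) {
        match message {
            Message::Data(_) => { /* goes to the sync machinery */ },
            Message::Sync(_) => { /* consensus round bookkeeping */ },
            Message::Control(_) => { /* port setup and teardown */ },
            Message::Poll => unreachable!(), // only sent to polled components
        }
    }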
src/runtime2/component/component.rs
 
@@ -243,7 +243,7 @@ pub(crate) fn default_handle_start_exit(
 
        let port_handle = comp_ctx.get_port_handle(port_id);
 
        let (peer, message) = control.initiate_port_closing(port_handle, comp_ctx);
 
        let peer_info = comp_ctx.get_peer(peer);
 
        peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
 
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
    }
 

	
 
    return CompScheduling::Immediate; // to check if we can shut down immediately
 
@@ -289,7 +289,7 @@ fn default_handle_ack(
 
            AckAction::SendMessage(target_comp, message) => {
 
                // FIX @NoDirectHandle
 
                let mut handle = sched_ctx.runtime.get_component_public(target_comp);
 
                handle.send_message(sched_ctx, Message::Control(message), true);
 
                handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
                let _should_remove = handle.decrement_users();
 
                debug_assert!(_should_remove.is_none());
 
            },
 
@@ -321,7 +321,7 @@ fn default_send_ack(
 
    sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx
 
) {
 
    let peer_info = comp_ctx.get_peer(peer_handle);
 
    peer_info.handle.send_message(sched_ctx, Message::Control(ControlMessage{
 
    peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(ControlMessage{
 
        id: causer_of_ack_id,
 
        sender_comp_id: comp_ctx.id,
 
        target_port_id: None,
 
@@ -350,7 +350,7 @@ fn default_handle_unblock_put(
 
        // Retrieve peer to send the message
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        peer_info.handle.send_message(sched_ctx, Message::Data(to_send), true);
 
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(to_send), true);
 

	
 
        exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state.
 
        exec_state.mode_port = PortId::new_invalid();
src/runtime2/component/component_ip.rs
 
@@ -45,7 +45,8 @@ impl Component for ComponentRandomU32 {
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                );
 
            }
 
            },
 
            Message::Poll => unreachable!(),
 
        }
 
    }
 

	
 
@@ -101,7 +102,7 @@ impl Component for ComponentRandomU32 {
 
                        let message = self.consensus.annotate_data_message(comp_ctx, port_info, value_group);
 
                        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
                        let peer_info = comp_ctx.get_peer(peer_handle);
 
                        peer_info.handle.send_message(sched_ctx, Message::Data(message), true);
 
                        peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(message), true);
 

	
 
                        // Remain in sync mode, but after `did_perform_send` was
 
                        // set to true.
src/runtime2/component/component_pdl.rs
 
@@ -236,7 +236,7 @@ impl Component for CompPDL {
 
        sched_ctx.log(&format!("handling message: {:#?}", message));
 
        if let Some(new_target) = self.control.should_reroute(&mut message) {
 
            let mut target = sched_ctx.runtime.get_component_public(new_target);
 
            target.send_message(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
 
            target.send_message(&sched_ctx.runtime, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
 
            let _should_remove = target.decrement_users();
 
            debug_assert!(_should_remove.is_none());
 
            return;
 
@@ -254,6 +254,9 @@ impl Component for CompPDL {
 
            },
 
            Message::Sync(message) => {
 
                self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Poll => {
 
                unreachable!(); // because we never register at the polling thread
 
            }
 
        }
 
    }
 
@@ -502,7 +505,7 @@ impl CompPDL {
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
            let (peer, message) = self.control.initiate_port_closing(port_handle, comp_ctx);
 
            let peer_info = comp_ctx.get_peer(peer);
 
            peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
 
            peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
        }
 
    }
 

	
 
@@ -515,7 +518,7 @@ impl CompPDL {
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        let annotated_message = self.consensus.annotate_data_message(comp_ctx, port_info, value);
 
        peer_info.handle.send_message(sched_ctx, Message::Data(annotated_message), true);
 
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(annotated_message), true);
 
    }
 

	
 
    /// Handles a message that came in through the public inbox. This function
 
@@ -563,7 +566,7 @@ impl CompPDL {
 
                self.control.initiate_port_blocking(comp_ctx, port_handle);
 

	
 
            let peer = comp_ctx.get_peer(peer_handle);
 
            peer.handle.send_message(sched_ctx, Message::Control(message), true);
 
            peer.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
        }
 

	
 
        // But we still need to remember the message, so:
 
@@ -598,7 +601,7 @@ impl CompPDL {
 
            comp_ctx.set_port_state(port_handle, PortState::Open);
 
            let (peer_handle, message) = self.control.cancel_port_blocking(comp_ctx, port_handle);
 
            let peer_info = comp_ctx.get_peer(peer_handle);
 
            peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
 
            peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
        }
 
    }
 

	
 
@@ -625,18 +628,19 @@ impl CompPDL {
 
        let mut opened_port_id_pairs = Vec::new();
 
        let mut closed_port_id_pairs = Vec::new();
 

	
 
        // TODO: @Nocommit
 
        let other_proc = &sched_ctx.runtime.protocol.heap[definition_id];
 
        let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition];
 

	
 
        let reservation = sched_ctx.runtime.start_create_pdl_component();
 
        let mut created_ctx = CompCtx::new(&reservation);
 

	
 
        println!(
 
        let other_proc = &sched_ctx.runtime.protocol.heap[definition_id];
 
        let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition];
 

	
 
        dbg_code!({
 
            sched_ctx.log(&format!(
 
                "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})",
 
                self_proc.identifier.value.as_str(), creator_ctx.id,
 
                other_proc.identifier.value.as_str(), reservation.id()
 
        );
 
            ));
 
        });
 

	
 
        // Take all the ports ID that are in the `args` (and currently belong to
 
        // the creator component) and translate them into new IDs that are
 
@@ -798,7 +802,7 @@ impl CompPDL {
 
                    );
 
                    let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id);
 
                    let peer_info = created_ctx.get_peer(peer_handle);
 
                    peer_info.handle.send_message(sched_ctx, message, true);
 
                    peer_info.handle.send_message(&sched_ctx.runtime, message, true);
 
                }
 
            }
 
        } else {
src/runtime2/component/consensus.rs
 
@@ -504,7 +504,7 @@ impl Consensus {
 
                    sync_header: self.create_sync_header(comp_ctx),
 
                    content: SyncMessageContent::NotificationOfLeader,
 
                };
 
                peer.handle.send_message(sched_ctx, Message::Sync(message), true);
 
                peer.handle.send_message(&sched_ctx.runtime, Message::Sync(message), true);
 
            }
 

	
 
            self.forward_partial_solution(sched_ctx, comp_ctx);
 
@@ -516,7 +516,7 @@ impl Consensus {
 
            };
 
            let peer_handle = comp_ctx.get_peer_handle(header.sending_id);
 
            let peer_info = comp_ctx.get_peer(peer_handle);
 
            peer_info.handle.send_message(sched_ctx, Message::Sync(message), true);
 
            peer_info.handle.send_message(&sched_ctx.runtime, Message::Sync(message), true);
 
        } // else: exactly equal
 
    }
 

	
 
@@ -622,7 +622,7 @@ impl Consensus {
 
                sync_header: self.create_sync_header(comp_ctx),
 
                content: if is_success { SyncMessageContent::GlobalSolution } else { SyncMessageContent::GlobalFailure },
 
            });
 
            handle.send_message(sched_ctx, message, true);
 
            handle.send_message(&sched_ctx.runtime, message, true);
 
            let _should_remove = handle.decrement_users();
 
            debug_assert!(_should_remove.is_none());
 
        }
 
@@ -631,7 +631,7 @@ impl Consensus {
 
    fn send_to_leader(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, message: Message) {
 
        debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader
 
        let mut leader_info = sched_ctx.runtime.get_component_public(self.highest_id);
 
        leader_info.send_message(sched_ctx, message, true);
 
        leader_info.send_message(&sched_ctx.runtime, message, true);
 
        let should_remove = leader_info.decrement_users();
 
        if let Some(key) = should_remove {
 
            sched_ctx.runtime.destroy_component(key);
src/runtime2/component/mod.rs
 
@@ -16,7 +16,7 @@ use super::runtime::*;
 
/// If the component is sleeping, then that flag will be atomically set to
 
/// false. If we're the ones that made that happen then we add it to the work
 
/// queue.
 
pub(crate) fn wake_up_if_sleeping(sched_ctx: &SchedulerCtx, comp_id: CompId, handle: &CompHandle) {
 
pub(crate) fn wake_up_if_sleeping(runtime: &RuntimeInner, comp_id: CompId, handle: &CompHandle) {
 
    use std::sync::atomic::Ordering;
 

	
 
    let should_wake_up = handle.sleeping
 
@@ -25,6 +25,6 @@ pub(crate) fn wake_up_if_sleeping(sched_ctx: &SchedulerCtx, comp_id: CompId, han
 

	
 
    if should_wake_up {
 
        let comp_key = unsafe{ comp_id.upgrade() };
 
        sched_ctx.runtime.enqueue_work(comp_key);
 
        runtime.enqueue_work(comp_key);
 
    }
 
}
 
\ No newline at end of file
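
For context, a minimal sketch of the sleeping-flag handshake that `wake_up_if_sleeping` performs, assuming `CompHandle::sleeping` is an `AtomicBool` (the field's declaration sits outside this hunk):

    use std::sync::atomic::{AtomicBool, Ordering};

    // Sketch: only the caller that flips `sleeping` from true to false wins
    // the race, so a sleeping component is enqueued exactly once.
    fn try_claim_wakeup(sleeping: &AtomicBool) -> bool {
        sleeping
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok()
    }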
src/runtime2/error.rs
 
new file 100644
 
use std::fmt::{Write, Debug, Display, Formatter as FmtFormatter, Result as FmtResult};
 

	
 
/// Represents an unrecoverable runtime error that is reported to the user (for
 
/// debugging purposes). Basically a human-readable message with its source
 
/// location. The error is chainable.
 
pub struct RtError {
 
    file: &'static str,
 
    line: u32,
 
    message: String,
 
    cause: Option<Box<RtError>>,
 
}
 

	
 
impl RtError {
 
    pub(crate) fn new(file: &'static str, line: u32, message: String) -> RtError {
 
        return RtError {
 
            file, line, message, cause: None,
 
        }
 
    }
 

	
 
    pub(crate) fn wrap(self, file: &'static str, line: u32, message: String) -> RtError {
 
        return RtError {
 
            file, line, message, cause: Some(Box::new(self))
 
        }
 
    }
 
}
 

	
 
impl Display for RtError {

    fn fmt(&self, f: &mut FmtFormatter<'_>) -> FmtResult {

        let mut error = self;

        loop {

            write!(f, "[{}:{}] {}", error.file, error.line, error.message)?;

            match &error.cause {

                Some(cause) => {

                    writeln!(f, " ...")?;

                    error = cause.as_ref();

                },

                None => {

                    writeln!(f)?;

                    return Ok(());

                },

            }

        }

    }

}
 

	
 
impl Debug for RtError {
 
    fn fmt(&self, f: &mut FmtFormatter<'_>) -> FmtResult {
 
        return (self as &dyn Display).fmt(f);
 
    }
 
}
 

	
 
macro_rules! rt_error {
 
    ($fmt:expr) => {
 
        $crate::runtime2::error::RtError::new(file!(), line!(), $fmt.to_string())
 
    };
 
    ($fmt:expr, $($args:expr),*) => {
 
        $crate::runtime2::error::RtError::new(file!(), line!(), format!($fmt, $($args),*))
 
    };
 
}
 

	
 
macro_rules! rt_error_try {
 
    ($prev:expr, $($fmt_and_args:expr),*) => {
 
        {
 
            let result = $prev;
 
            match result {
 
                Ok(result) => result,
 
                Err(result) => return Err(result.wrap(file!(), line!(), format!($($fmt_and_args),*))),
 
            }
 
        }
 
    }
 
}
 
\ No newline at end of file
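
A minimal usage sketch for the new error type and its macros, assuming a caller that itself returns `Result<_, RtError>` (the function names below are illustrative, not part of this changeset):

    fn open_socket() -> Result<i32, RtError> {
        // Pretend the underlying call failed.
        Err(rt_error!("socket creation failed for port {}", 8080))
    }

    fn set_up_networking() -> Result<(), RtError> {
        // `rt_error_try!` passes an Ok value through, and wraps an Err in a
        // new file/line-annotated message, producing a chained RtError.
        let _fd = rt_error_try!(open_socket(), "failed to set up networking");
        Ok(())
    }

Printing the resulting error with `{}` walks the chain, emitting one `[file:line] message` entry per wrapped error.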
src/runtime2/mod.rs
 
#[macro_use] mod error;
 
mod store;
 
mod runtime;
 
mod component;
 
@@ -8,6 +9,7 @@ mod stdlib;
 
#[cfg(test)] mod tests;
 

	
 
pub use runtime::Runtime;
 
pub(crate) use error::RtError;
 
pub(crate) use scheduler::SchedulerCtx;
 
pub(crate) use communication::{
 
    PortId, PortKind, PortState,
src/runtime2/poll/mod.rs
 
use libc::{self, c_int};
 

	
 
use std::{io, ptr, time};
 
use std::{io, ptr, time, thread};
 
use std::sync::Arc;
 
use std::sync::atomic::{AtomicU32, Ordering};
 
use std::collections::HashMap;
 

	
 
use crate::runtime2::RtError;
 
use crate::runtime2::runtime::{CompHandle, RuntimeInner};
 
use crate::runtime2::store::queue_mpsc::*;
 

	
 

	
 
pub(crate) type FileDescriptor = c_int;
 

	
 
@@ -8,40 +16,27 @@ pub(crate) trait AsFileDescriptor {
 
    fn as_file_descriptor(&self) -> FileDescriptor;
 

	
 
}
 
pub(crate) struct UserData(u64);
 

	
 
#[inline]
 
pub(crate) fn register_polling<F: AsFileDescriptor>(
 
    poller: &Poller, entity: F, user: UserData, read: bool, write: bool
 
) -> io::Result<()> {
 
    let file_descriptor = entity.as_file_descriptor();
 
    return poller.register(file_descriptor, user, read, write);
 
}
 
#[derive(Copy, Clone)]
 
pub(crate) struct UserData(u64);
 

	
 
#[inline]
 
pub(crate) fn unregister_polling<F: AsFileDescriptor>(
 
    poller: &Poller, entity: F
 
) -> io::Result<()> {
 
    let file_descriptor = entity.as_file_descriptor();
 
    return poller.unregister(file_descriptor);
 
}
 
// -----------------------------------------------------------------------------
 
// Poller
 
// -----------------------------------------------------------------------------
 

	
 
#[cfg(unix)]
 
pub(crate) struct Poller {
 
    handle: c_int,
 
    events: Vec<libc::epoll_event>
 
}
 

	
 
// All of this is gleaned from the `mio` crate.
 
#[cfg(unix)]
 
impl Poller {
 
    pub fn new(event_capacity: usize) -> io::Result<Self> {
 
        assert!(event_capacity < i32::MAX as usize); // because of argument to `epoll_wait`.
 
    pub fn new() -> io::Result<Self> {
 
        let handle = syscall_result(unsafe{ libc::epoll_create1(libc::EPOLL_CLOEXEC) })?;
 

	
 
        return Ok(Self{
 
            handle,
 
            events: Vec::with_capacity(event_capacity),
 
        })
 
    }
 

	
 
@@ -65,25 +60,32 @@ impl Poller {
 
        return Ok(());
 
    }
 

	
 
    pub fn wait(&mut self, timeout: time::Duration) -> io::Result<()> {
 
    /// Performs `epoll_wait`, waiting for the provided timeout or until events
 
    /// are reported. They are stored in the `events` variable (up to
 
    /// `events.capacity()` events are reported, so ensure it is preallocated).
 
    pub fn wait(&self, events: &mut Vec<libc::epoll_event>, timeout: time::Duration) -> io::Result<()> {
 
        // See `mio` for the reason. Works around a linux bug
 
        #[cfg(target_pointer_width = "32")]
 
        const MAX_TIMEOUT: u128 = 1789569;
 
        #[cfg(not(target_pointer_width = "32"))]
 
        const MAX_TIMEOUT: u128 = c_int::MAX as u128;
 

	
 
        let mut timeout_millis = timeout.as_millis();
 
        if timeout_millis > MAX_TIMEOUT {
 
            timeout_millis = -1; // effectively infinite
 
        }
 
        let timeout_millis = timeout.as_millis();
 
        let timeout_millis = if timeout_millis > MAX_TIMEOUT {
 
            -1 // effectively infinite
 
        } else {
 
            timeout_millis as c_int
 
        };
 

	
 
        debug_assert!(events.is_empty());
 
        debug_assert!(events.capacity() > 0 && events.capacity() < i32::MAX as usize);
 
        let num_events = syscall_result(unsafe{
 
            libc::epoll_wait(self.handle, self.events.as_mut(), self.events.capacity() as i32, timeout_millis)
 
            libc::epoll_wait(self.handle, events.as_mut_ptr(), events.capacity() as i32, timeout_millis)
 
        })?;
 

	
 
        unsafe{
 
            debug_assert!(num_events >= 0);
 
            self.events.set_len(num_events as usize);
 
            events.set_len(num_events as usize);
 
        }
 

	
 
        return Ok(());
 
@@ -102,6 +104,13 @@ impl Poller {
 
    }
 
}
 

	
 
#[cfg(unix)]
 
impl Drop for Poller {
 
    fn drop(&mut self) {
 
        unsafe{ libc::close(self.handle); }
 
    }
 
}
 

	
 
#[inline]
 
fn syscall_result(result: c_int) -> io::Result<c_int> {
 
    if result < 0 {
 
@@ -115,3 +124,202 @@ fn syscall_result(result: c_int) -> io::Result<c_int> {
 
struct Poller {
 

	
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Polling Thread
 
// -----------------------------------------------------------------------------
 

	
 
enum PollCmd {
 
    Register(CompHandle, UserData),
 
    Unregister(FileDescriptor, UserData),
 
    Shutdown,
 
}
 

	
 
/// Represents the data needed to build interfaces to the polling thread (which
 
/// should happen first) and to create the polling thread itself.
 
pub(crate) struct PollingThreadBuilder {
 
    poller: Arc<Poller>,
 
    generation_counter: Arc<AtomicU32>,
 
    queue: QueueDynMpsc<PollCmd>,
 
    runtime: Arc<RuntimeInner>,
 
    logging_enabled: bool,
 
}
 

	
 
impl PollingThreadBuilder {
 
    pub(crate) fn new(runtime: Arc<RuntimeInner>, logging_enabled: bool) -> Result<PollingThreadBuilder, RtError> {
 
        let poller = Poller::new()
 
            .map_err(|e| rt_error!("failed to create poller, because: {}", e))?;
 

	
 
        return Ok(PollingThreadBuilder {
 
            poller: Arc::new(poller),
 
            generation_counter: Arc::new(AtomicU32::new(0)),
 
            queue: QueueDynMpsc::new(64),
 
            runtime,
 
            logging_enabled,
 
        })
 
    }
 

	
 
    pub(crate) fn client(&self) -> PollingClient {
 
        return PollingClient{
 
            poller: self.poller.clone(),
 
            generation_counter: self.generation_counter.clone(),
 
            queue: self.queue.producer(),
 
        }
 
    }
 

	
 
    pub(crate) fn into_thread(self) -> (PollingThread, PollingThreadDestroyer) {
 
        let destroyer = self.queue.producer();
 

	
 
        return (
 
            PollingThread{
 
                poller: self.poller,
 
                runtime: self.runtime,
 
                queue: self.queue,
 
                logging_enabled: self.logging_enabled,
 
            },
 
            PollingThreadDestroyer::new(destroyer)
 
        );
 
    }
 
}
 

	
 
pub(crate) struct PollingThread {
 
    poller: Arc<Poller>,
 
    runtime: Arc<RuntimeInner>,
 
    queue: QueueDynMpsc<PollCmd>,
 
    logging_enabled: bool,
 
}
 

	
 
impl PollingThread {
 
    pub(crate) fn run(&mut self) {
 
        use crate::runtime2::scheduler::SchedulerCtx;
 
        use crate::runtime2::communication::Message;
 

	
 
        const NUM_EVENTS: usize = 256;
 
        const EPOLL_DURATION: time::Duration = time::Duration::from_millis(250);
 

	
 
        // @performance: Lots of improvements are possible here; a HashMap is likely
 
        // a horrible way to do this.
 
        let mut events = Vec::with_capacity(NUM_EVENTS);
 
        let mut lookup = HashMap::with_capacity(64);
 
        self.log("Starting polling thread");
 

	
 
        loop {
 
            // Retrieve events first (because the PollingClient will first
 
            // register at epoll, and then push a command into the queue).
 
            self.poller.wait(&mut events, EPOLL_DURATION).unwrap();
 

	
 
            // Then handle everything in the command queue.
 
            while let Some(command) = self.queue.pop() {
 
                match command {
 
                    PollCmd::Register(handle, user_data) => {
 
                        self.log(&format!("Registering component {:?} as {}", handle.id(), user_data.0));
 
                        let key = Self::user_data_as_key(user_data);
 
                        debug_assert!(!lookup.contains_key(&key));
 
                        lookup.insert(key, handle);
 
                    },
 
                    PollCmd::Unregister(_file_descriptor, user_data) => {
 
                        let key = Self::user_data_as_key(user_data);
 
                        debug_assert!(lookup.contains_key(&key));
 
                        let mut handle = lookup.remove(&key).unwrap();
 
                        self.log(&format!("Unregistering component {:?} as {}", handle.id(), user_data.0));
 
                        if let Some(key) = handle.decrement_users() {
 
                            self.runtime.destroy_component(key);
 
                        }
 
                    },
 
                    PollCmd::Shutdown => {
 
                        // The contract is that all scheduler threads shutdown
 
                        // before the polling thread. This happens when all
 
                        // components are removed.
 
                        self.log("Received shutdown signal");
 
                        debug_assert!(lookup.is_empty());
 
                        return;
 
                    }
 
                }
 
            }
 

	
 
            // Now process all of the events. Because we might have had a
 
            // `Register` command followed by an `Unregister` command (e.g. a
 
            // component has died), we might get events that are not associated
 
            // with an entry in the lookup.
 
            for event in events.drain(..) {
 
                let key = event.u64;
 
                if let Some(handle) = lookup.get(&key) {
 
                    self.log(&format!("Sending poll to {:?} (event: {:x})", handle.id(), event.events));
 
                    handle.send_message(&self.runtime, Message::Poll, true);
 
                }
 
            }
 
        }
 
    }
 

	
 
    #[inline]
 
    fn user_data_as_key(data: UserData) -> u64 {
 
        return data.0;
 
    }
 

	
 
    fn log(&self, message: &str) {
 
        if self.logging_enabled {
 
            println!("[polling] {}", message);
 
        }
 
    }
 
}
 

	
 
// bit convoluted, but it works
 
pub(crate) struct PollingThreadDestroyer {
 
    queue: Option<QueueDynProducer<PollCmd>>,
 
}
 

	
 
impl PollingThreadDestroyer {
 
    fn new(queue: QueueDynProducer<PollCmd>) -> Self {
 
        return Self{ queue: Some(queue) };
 
    }
 

	
 
    pub(crate) fn initiate_destruction(&mut self) {
 
        self.queue.take().unwrap().push(PollCmd::Shutdown);
 
    }
 
}
 

	
 
impl Drop for PollingThreadDestroyer {
 
    fn drop(&mut self) {
 
        debug_assert!(self.queue.is_none());
 
    }
 
}
 

	
 
pub(crate) struct PollTicket(FileDescriptor, u64);
 

	
 
/// A structure that allows the owner to register components at the polling
 
/// thread. Because of assumptions in the communication queue, all of these
 
/// clients should be dropped before stopping the polling thread.
 
pub(crate) struct PollingClient {
 
    poller: Arc<Poller>,
 
    generation_counter: Arc<AtomicU32>,
 
    queue: QueueDynProducer<PollCmd>,
 
}
 

	
 
impl PollingClient {
 
    fn register<F: AsFileDescriptor>(&self, entity: F, handle: CompHandle, read: bool, write: bool) -> Result<PollTicket, RtError> {
 
        let generation = self.generation_counter.fetch_add(1, Ordering::Relaxed);
 
        let user_data = user_data_for_component(handle.id().0, generation);
 
        self.queue.push(PollCmd::Register(handle, user_data));
 

	
 
        let file_descriptor = entity.as_file_descriptor();
 
        self.poller.register(file_descriptor, user_data, read, write)
 
            .map_err(|e| rt_error!("failed to register for polling, because: {}", e))?;
 

	
 
        return Ok(PollTicket(file_descriptor, user_data.0));
 
    }
 

	
 
    fn unregister(&self, ticket: PollTicket) -> Result<(), RtError> {
 
        let file_descriptor = ticket.0;
 
        let user_data = UserData(ticket.1);
 
        self.queue.push(PollCmd::Unregister(file_descriptor, user_data));
 
        self.poller.unregister(file_descriptor)
 
            .map_err(|e| rt_error!("failed to unregister polling, because: {}", e))?;
 

	
 
        return Ok(());
 
    }
 
}
 

	
 
#[inline]
 
fn user_data_for_component(component_id: u32, generation: u32) -> UserData {
 
    return UserData((generation as u64) << 32 | (component_id as u64));
 
}
 
\ No newline at end of file
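
The epoll user data packs the registration generation into the upper 32 bits and the component id into the lower 32 bits. A small sketch of the corresponding unpacking, which the lookup above does implicitly by keying on the full `u64` (the helper name is illustrative):

    // Sketch: inverse of `user_data_for_component`. The low 32 bits carry
    // the component id, the high 32 bits the generation at registration.
    fn component_and_generation(data: u64) -> (u32, u32) {
        let component_id = (data & 0xFFFF_FFFF) as u32;
        let generation = (data >> 32) as u32;
        (component_id, generation)
    }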
src/runtime2/runtime.rs
 
use std::sync::{Arc, Mutex, Condvar};
 
use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
 
use std::thread;
 
use std::collections::VecDeque;
 

	
 
use crate::protocol::*;
 
use crate::runtime2::poll::{PollingThreadBuilder, PollingThreadDestroyer};
 
use crate::runtime2::RtError;
 

	
 
use super::communication::Message;
 
use super::component::{Component, wake_up_if_sleeping, CompPDL, CompCtx};
 
@@ -25,7 +28,7 @@ impl CompKey {
 
    }
 
}
 

	
 
/// Generational ID of a component
 
/// Generational ID of a component.
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub struct CompId(pub u32);
 

	
 
@@ -80,7 +83,7 @@ pub(crate) struct CompPublic {
 
/// code to make sure this actually happens.
 
pub(crate) struct CompHandle {
 
    target: *const CompPublic,
 
    id: CompId, // TODO: @Remove after debugging
 
    id: CompId,
 
    #[cfg(debug_assertions)] decremented: bool,
 
}
 

	
 
@@ -95,14 +98,17 @@ impl CompHandle {
 
        return handle;
 
    }
 

	
 
    pub(crate) fn send_message(&self, sched_ctx: &SchedulerCtx, message: Message, try_wake_up: bool) {
 
        sched_ctx.log(&format!("Sending message to [c:{:03}, wakeup:{}]: {:?}", self.id.0, try_wake_up, message));
 
    pub(crate) fn send_message(&self, runtime: &RuntimeInner, message: Message, try_wake_up: bool) {
 
        self.inbox.push(message);
 
        if try_wake_up {
 
            wake_up_if_sleeping(sched_ctx, self.id, self);
 
            wake_up_if_sleeping(runtime, self.id, self);
 
        }
 
    }
 

	
 
    pub(crate) fn id(&self) -> CompId {
 
        return self.id;
 
    }
 

	
 
    fn increment_users(&self) {
 
        let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel);
 
        debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed
 
@@ -155,13 +161,17 @@ impl Drop for CompHandle {
 

	
 
pub struct Runtime {
 
    pub(crate) inner: Arc<RuntimeInner>,
 
    threads: Vec<std::thread::JoinHandle<()>>,
 
    scheduler_threads: Vec<thread::JoinHandle<()>>,
 
    polling_destroyer: PollingThreadDestroyer,
 
    polling_thread: Option<thread::JoinHandle<()>>,
 
}
 

	
 
impl Runtime {
 
    // TODO: debug_logging should be removed at some point
 
    pub fn new(num_threads: u32, debug_logging: bool, protocol_description: ProtocolDescription) -> Runtime {
 
        assert!(num_threads > 0, "need a thread to perform work");
 
    pub fn new(num_threads: u32, debug_logging: bool, protocol_description: ProtocolDescription) -> Result<Runtime, RtError> {
 
        if num_threads == 0 {
 
            return Err(rt_error!("need at least one thread to create the runtime"));
 
        }
 
        let runtime_inner = Arc::new(RuntimeInner {
 
            protocol: protocol_description,
 
            components: ComponentStore::new(128),
 
@@ -169,21 +179,36 @@ impl Runtime {
 
            work_condvar: Condvar::new(),
 
            active_elements: AtomicU32::new(1),
 
        });
 
        let mut runtime = Runtime {
 
            inner: runtime_inner,
 
            threads: Vec::with_capacity(num_threads as usize),
 
        };
 
        let polling_builder = rt_error_try!(
 
            PollingThreadBuilder::new(runtime_inner.clone(), debug_logging),
 
            "failed to build polling thread"
 
        );
 

	
 
        let mut scheduler_threads = Vec::with_capacity(num_threads as usize);
 

	
 
        for thread_index in 0..num_threads {
 
            let mut scheduler = Scheduler::new(runtime.inner.clone(), thread_index, debug_logging);
 
            let thread_handle = std::thread::spawn(move || {
 
            let mut scheduler = Scheduler::new(
 
                runtime_inner.clone(), polling_builder.client(),
 
                thread_index, debug_logging
 
            );
 
            let thread_handle = thread::spawn(move || {
 
                scheduler.run();
 
            });
 

	
 
            runtime.threads.push(thread_handle);
 
            scheduler_threads.push(thread_handle);
 
        }
 

	
 
        return runtime;
 
        let (mut poller, polling_destroyer) = polling_builder.into_thread();
 
        let polling_thread = thread::spawn(move || {
 
            poller.run();
 
        });
 

	
 
        return Ok(Runtime{
 
            inner: runtime_inner,
 
            scheduler_threads,
 
            polling_destroyer,
 
            polling_thread: Some(polling_thread),
 
        });
 
    }
 

	
 
    pub fn create_component(&self, module_name: &[u8], routine_name: &[u8]) -> Result<(), ComponentCreationError> {
 
@@ -205,9 +230,12 @@ impl Runtime {
 
impl Drop for Runtime {
 
    fn drop(&mut self) {
 
        self.inner.decrement_active_components();
 
        for handle in self.threads.drain(..) {
 
        for handle in self.scheduler_threads.drain(..) {
 
            handle.join().expect("join scheduler thread");
 
        }
 

	
 
        self.polling_destroyer.initiate_destruction();
 
        self.polling_thread.take().unwrap().join().expect("join polling thread");
 
    }
 
}
 

	
src/runtime2/scheduler.rs
 
use std::sync::Arc;
 
use std::sync::atomic::Ordering;
 
use crate::runtime2::poll::PollingClient;
 

	
 
use super::component::*;
 
use super::runtime::*;
 
@@ -7,21 +8,24 @@ use super::runtime::*;
 
/// Data associated with a scheduler thread
 
pub(crate) struct Scheduler {
 
    runtime: Arc<RuntimeInner>,
 
    polling: PollingClient,
 
    scheduler_id: u32,
 
    debug_logging: bool,
 
}
 

	
 
pub(crate) struct SchedulerCtx<'a> {
 
    pub runtime: &'a RuntimeInner,
 
    pub polling: &'a PollingClient,
 
    pub id: u32,
 
    pub comp: u32,
 
    pub logging_enabled: bool,
 
}
 

	
 
impl<'a> SchedulerCtx<'a> {
 
    pub fn new(runtime: &'a RuntimeInner, id: u32, logging_enabled: bool) -> Self {
 
    pub fn new(runtime: &'a RuntimeInner, polling: &'a PollingClient, id: u32, logging_enabled: bool) -> Self {
 
        return Self {
 
            runtime,
 
            polling,
 
            id,
 
            comp: 0,
 
            logging_enabled,
 
@@ -38,12 +42,12 @@ impl<'a> SchedulerCtx<'a> {
 
impl Scheduler {
 
    // public interface to thread
 

	
 
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32, debug_logging: bool) -> Self {
 
        return Scheduler{ runtime, scheduler_id, debug_logging }
 
    pub fn new(runtime: Arc<RuntimeInner>, polling: PollingClient, scheduler_id: u32, debug_logging: bool) -> Self {
 
        return Scheduler{ runtime, polling, scheduler_id, debug_logging }
 
    }
 

	
 
    pub fn run(&mut self) {
 
        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, self.scheduler_id, self.debug_logging);
 
        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, &self.polling, self.scheduler_id, self.debug_logging);
 

	
 
        'run_loop: loop {
 
            // Wait until we have something to do (or need to quit)
src/runtime2/stdlib/internet.rs
 
@@ -206,20 +206,6 @@ impl AsRawFileDescriptor for SocketTcpClient {
 
    }
 
}
 

	
 
impl<T: AsRawFileDescriptor> event::Source for T {
 
    fn register(&mut self, registry: &Registry, token: Token, interests: Interest) -> std::io::Result<()> {
 
        registry.selector().register()
 
    }
 

	
 
    fn reregister(&mut self, registry: &Registry, token: Token, interests: Interest) -> std::io::Result<()> {
 
        todo!()
 
    }
 

	
 
    fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
 
        todo!()
 
    }
 
}
 

	
 
/// Performs the `socket` and `bind` calls.
 
fn create_and_bind_socket(socket_type: libc::c_int, protocol: libc::c_int, ip: IpAddr, port: u16) -> Result<libc::c_int, SocketError> {
 
    let family = socket_family_from_ip(ip);
src/runtime2/store/queue_mpsc.rs
 
@@ -149,11 +149,13 @@ impl<T> QueueDynProducer<T> {
 
        unsafe {
 
            // If you only knew the power of the dark side! Obi-Wan never told
 
            // you what happened to your father!
 
            let queue: *const _ = std::mem::transmute(consumer.inner.as_ref());
 
            let queue = consumer.inner.as_ref() as *const _;
 
            return Self{ queue };
 
        }
 
    }
 

	
 

	
 

	
 
    pub fn push(&self, value: T) {
 
        let queue = unsafe{ &*self.queue };
 

	
src/runtime2/tests/mod.rs
 
@@ -24,7 +24,7 @@ fn test_component_creation() {
 
        auto b = 5 + a;
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, true, pd);
 
    let rt = Runtime::new(1, true, pd).unwrap();
 

	
 
    for _i in 0..20 {
 
        create_component(&rt, "", "nothing_at_all", no_args());
 
@@ -81,7 +81,7 @@ fn test_component_communication() {
 
        new sender(o_mrmm, 5, 5);
 
        new receiver(i_mrmm, 5, 5);
 
    }").expect("compilation");
 
    let rt = Runtime::new(3, true, pd);
 
    let rt = Runtime::new(3, true, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
@@ -130,7 +130,7 @@ fn test_intermediate_messenger() {
 
        new constructor_template<s64>();
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, true, pd);
 
    let rt = Runtime::new(3, true, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
@@ -172,7 +172,7 @@ fn test_simple_select() {
 
    }
 

	
 
    composite constructor() {
 
        auto num_sends = 15;
 
        auto num_sends = 1;
 
        channel tx_a -> rx_a;
 
        channel tx_b -> rx_b;
 
        new sender(tx_a, num_sends);
 
@@ -180,7 +180,7 @@ fn test_simple_select() {
 
        new sender(tx_b, num_sends);
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, false, pd);
 
    let rt = Runtime::new(3, true, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
@@ -202,7 +202,7 @@ fn test_unguarded_select() {
 
        }
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, false, pd);
 
    let rt = Runtime::new(3, false, pd).unwrap();
 
    create_component(&rt, "", "constructor_outside_select", no_args());
 
    create_component(&rt, "", "constructor_inside_select", no_args());
 
}
 
@@ -218,7 +218,7 @@ fn test_empty_select() {
 
        }
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, false, pd);
 
    let rt = Runtime::new(3, false, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
@@ -244,6 +244,6 @@ fn test_random_u32_temporary_thingo() {
 
        new random_taker(rx, num_values);
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, true, pd);
 
    let rt = Runtime::new(1, true, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 
\ No newline at end of file