Changeset - a3a2b16408b1
src/protocol/parser/token_parsing.rs
@@ -546,14 +546,12 @@ fn is_reserved_statement_keyword(text: &[u8]) -> bool {
 }
 
 fn is_reserved_expression_keyword(text: &[u8]) -> bool {
     match text {
         KW_LET | KW_CAST |
         KW_LIT_TRUE | KW_LIT_FALSE | KW_LIT_NULL |
-        // TODO: Remove this once global namespace errors work @nocommit
-        // KW_FUNC_GET | KW_FUNC_PUT | KW_FUNC_FIRES | KW_FUNC_CREATE | KW_FUNC_ASSERT | KW_FUNC_LENGTH | KW_FUNC_PRINT => true,
         _ => false,
     }
 }
 
 fn is_reserved_type_keyword(text: &[u8]) -> bool {
     match text {
src/runtime2/communication.rs
@@ -194,32 +194,36 @@ pub struct MessageSyncHeader {
 
 #[derive(Debug)]
 pub enum Message {
     Data(DataMessage),
     Sync(SyncMessage),
     Control(ControlMessage),
+    Poll,
 }
 
 impl Message {
     pub(crate) fn target_port(&self) -> Option<PortId> {
         match self {
             Message::Data(v) =>
                 return Some(v.data_header.target_port),
             Message::Control(v) =>
                 return v.target_port_id,
             Message::Sync(_) =>
                 return None,
+            Message::Poll =>
+                return None,
         }
     }
 
     pub(crate) fn modify_target_port(&mut self, port_id: PortId) {
         match self {
             Message::Data(v) =>
                 v.data_header.target_port = port_id,
             Message::Control(v) =>
                 v.target_port_id = Some(port_id),
             Message::Sync(_) => unreachable!(), // should never be called for this message type
+            Message::Poll => unreachable!(),
         }
     }
 }
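
Note: the new `Poll` variant carries no target port, so it is excluded from rerouting. A minimal standalone sketch (stand-in types, not the crate's own `PortId`/`DataMessage`) of how a reroute table can use `target_port()` and `modify_target_port()`:

    use std::collections::HashMap;

    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    struct PortId(u32);

    #[derive(Debug)]
    enum Message {
        Data { target_port: PortId },
        Poll, // has no target port, mirroring `Message::Poll => return None`
    }

    impl Message {
        fn target_port(&self) -> Option<PortId> {
            match self {
                Message::Data { target_port } => Some(*target_port),
                Message::Poll => None,
            }
        }

        fn modify_target_port(&mut self, port_id: PortId) {
            match self {
                Message::Data { target_port } => *target_port = port_id,
                Message::Poll => unreachable!(), // never rerouted
            }
        }
    }

    /// Rewrites the message's target port if the reroute table has an entry
    /// for it; returns whether a rewrite happened.
    fn reroute(table: &HashMap<PortId, PortId>, message: &mut Message) -> bool {
        match message.target_port().and_then(|p| table.get(&p)) {
            Some(&new_port) => { message.modify_target_port(new_port); true },
            None => false,
        }
    }

    fn main() {
        let mut table = HashMap::new();
        table.insert(PortId(1), PortId(7)); // port 1 moved to a new component as port 7
        let mut msg = Message::Data { target_port: PortId(1) };
        assert!(reroute(&table, &mut msg));
        assert_eq!(msg.target_port(), Some(PortId(7)));
    }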
src/runtime2/component/component.rs
@@ -240,13 +240,13 @@ pub(crate) fn default_handle_start_exit(
         port.state = PortState::Closed;
 
         // Notify peer of closing
         let port_handle = comp_ctx.get_port_handle(port_id);
         let (peer, message) = control.initiate_port_closing(port_handle, comp_ctx);
         let peer_info = comp_ctx.get_peer(peer);
-        peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
+        peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
     }
 
     return CompScheduling::Immediate; // to check if we can shut down immediately
 }
 
 /// Handles a component waiting until all peers are notified that it is quitting
@@ -286,13 +286,13 @@ fn default_handle_ack(
     loop {
         let (action, new_to_ack) = control.handle_ack(to_ack, sched_ctx, comp_ctx);
         match action {
             AckAction::SendMessage(target_comp, message) => {
                 // FIX @NoDirectHandle
                 let mut handle = sched_ctx.runtime.get_component_public(target_comp);
-                handle.send_message(sched_ctx, Message::Control(message), true);
+                handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
                 let _should_remove = handle.decrement_users();
                 debug_assert!(_should_remove.is_none());
             },
             AckAction::ScheduleComponent(to_schedule) => {
                 // FIX @NoDirectHandle
                 let mut handle = sched_ctx.runtime.get_component_public(to_schedule);
@@ -318,13 +318,13 @@ fn default_handle_ack(
 /// Little helper for sending the most common kind of `Ack`
 fn default_send_ack(
     causer_of_ack_id: ControlId, peer_handle: LocalPeerHandle,
     sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx
 ) {
     let peer_info = comp_ctx.get_peer(peer_handle);
-    peer_info.handle.send_message(sched_ctx, Message::Control(ControlMessage{
+    peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(ControlMessage{
         id: causer_of_ack_id,
         sender_comp_id: comp_ctx.id,
         target_port_id: None,
         content: ControlMessageContent::Ack
     }), true);
 }
@@ -347,13 +347,13 @@ fn default_handle_unblock_put(
         let to_send = exec_state.mode_value.take();
         let to_send = consensus.annotate_data_message(comp_ctx, port_info, to_send);
 
         // Retrieve peer to send the message
         let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
         let peer_info = comp_ctx.get_peer(peer_handle);
-        peer_info.handle.send_message(sched_ctx, Message::Data(to_send), true);
+        peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(to_send), true);
 
         exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state.
         exec_state.mode_port = PortId::new_invalid();
     }
 }
src/runtime2/component/component_ip.rs
@@ -42,13 +42,14 @@ impl Component for ComponentRandomU32 {
             },
             Message::Control(message) => {
                 component::default_handle_control_message(
                     &mut self.exec_state, &mut self.control, &mut self.consensus,
                     message, sched_ctx, comp_ctx
                 );
-            }
+            },
+            Message::Poll => unreachable!(),
         }
     }
 
     fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
         sched_ctx.log(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode));
 
@@ -98,13 +99,13 @@ impl Component for ComponentRandomU32 {
 
                         CompScheduling::Sleep
                     } else {
                         let message = self.consensus.annotate_data_message(comp_ctx, port_info, value_group);
                         let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
                         let peer_info = comp_ctx.get_peer(peer_handle);
-                        peer_info.handle.send_message(sched_ctx, Message::Data(message), true);
+                        peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(message), true);
 
                         // Remain in sync mode, but after `did_perform_send` was
                         // set to true.
                         CompScheduling::Immediate
                     };
 
src/runtime2/component/component_pdl.rs
@@ -233,13 +233,13 @@ impl Component for CompPDL {
     }
 
     fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) {
         sched_ctx.log(&format!("handling message: {:#?}", message));
         if let Some(new_target) = self.control.should_reroute(&mut message) {
             let mut target = sched_ctx.runtime.get_component_public(new_target);
-            target.send_message(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
+            target.send_message(&sched_ctx.runtime, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
             let _should_remove = target.decrement_users();
             debug_assert!(_should_remove.is_none());
             return;
         }
 
         match message {
@@ -251,12 +251,15 @@ impl Component for CompPDL {
                     &mut self.exec_state, &mut self.control, &mut self.consensus,
                     message, sched_ctx, comp_ctx
                 );
             },
             Message::Sync(message) => {
                 self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
             },
+            Message::Poll => {
+                unreachable!(); // because we never register at the polling thread
+            }
         }
     }
 
     fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
         use EvalContinuation as EC;
@@ -499,26 +502,26 @@ impl CompPDL {
             port.state = PortState::Closed;
 
             // Notify peer of closing
             let port_handle = comp_ctx.get_port_handle(port_id);
             let (peer, message) = self.control.initiate_port_closing(port_handle, comp_ctx);
             let peer_info = comp_ctx.get_peer(peer);
-            peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
+            peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
         }
     }
 
     // -------------------------------------------------------------------------
     // Handling messages
     // -------------------------------------------------------------------------
 
     fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_handle: LocalPortHandle, value: ValueGroup) {
         let port_info = comp_ctx.get_port(source_port_handle);
         let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
         let peer_info = comp_ctx.get_peer(peer_handle);
         let annotated_message = self.consensus.annotate_data_message(comp_ctx, port_info, value);
-        peer_info.handle.send_message(sched_ctx, Message::Data(annotated_message), true);
+        peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(annotated_message), true);
     }
 
     /// Handles a message that came in through the public inbox. This function
     /// will handle putting it in the correct place, and potentially blocking
     /// the port in case too many messages are being received.
     fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
@@ -560,13 +563,13 @@ impl CompPDL {
         if port_info.state == PortState::Open {
             comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
             let (peer_handle, message) =
                 self.control.initiate_port_blocking(comp_ctx, port_handle);
 
             let peer = comp_ctx.get_peer(peer_handle);
-            peer.handle.send_message(sched_ctx, Message::Control(message), true);
+            peer.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
         }
 
         // But we still need to remember the message, so:
         self.inbox_backup.push(message);
     }
 
@@ -595,13 +598,13 @@ impl CompPDL {
         // Did not have any more messages. So if we were blocked, then we need
         // to send the "unblock" message.
         if port_info.state == PortState::BlockedDueToFullBuffers {
             comp_ctx.set_port_state(port_handle, PortState::Open);
             let (peer_handle, message) = self.control.cancel_port_blocking(comp_ctx, port_handle);
             let peer_info = comp_ctx.get_peer(peer_handle);
-            peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
+            peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
         }
     }
 
     fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) {
         let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
         self.handle_sync_decision(sched_ctx, comp_ctx, decision);
@@ -622,24 +625,25 @@ impl CompPDL {
             created_handle: LocalPortHandle,
             created_id: PortId,
         }
         let mut opened_port_id_pairs = Vec::new();
        let mut closed_port_id_pairs = Vec::new();
 
-        // TODO: @Nocommit
-        let other_proc = &sched_ctx.runtime.protocol.heap[definition_id];
-        let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition];
-
         let reservation = sched_ctx.runtime.start_create_pdl_component();
         let mut created_ctx = CompCtx::new(&reservation);
 
-        println!(
-            "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})",
-            self_proc.identifier.value.as_str(), creator_ctx.id,
-            other_proc.identifier.value.as_str(), reservation.id()
-        );
+        let other_proc = &sched_ctx.runtime.protocol.heap[definition_id];
+        let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition];
+
+        dbg_code!({
+            sched_ctx.log(&format!(
+                "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})",
+                self_proc.identifier.value.as_str(), creator_ctx.id,
+                other_proc.identifier.value.as_str(), reservation.id()
+            ));
+        });
 
         // Take all the ports ID that are in the `args` (and currently belong to
         // the creator component) and translate them into new IDs that are
         // associated with the component we're about to create
         let mut arg_iter = ValueGroupPortIter::new(&mut arguments);
         while let Some(port_reference) = arg_iter.next() {
@@ -795,13 +799,13 @@ impl CompPDL {
                         creator_ctx.id, port_info.peer_port_id, port_info.peer_comp_id,
                         pair.creator_id, pair.created_id, created_ctx.id,
                         schedule_entry_id
                     );
                     let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id);
                     let peer_info = created_ctx.get_peer(peer_handle);
-                    peer_info.handle.send_message(sched_ctx, message, true);
+                    peer_info.handle.send_message(&sched_ctx.runtime, message, true);
                 }
             }
         } else {
             // Peer can be scheduled immediately
             sched_ctx.runtime.enqueue_work(created_key);
         }
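
Note on the blocking scheme above: when a port's buffers fill up, the port is marked `BlockedDueToFullBuffers` and the overflowing message is parked in `inbox_backup`; once the backlog drains, an "unblock" control message reopens the port. A minimal standalone sketch of that state flip, under the assumption of a simple bounded buffer (the real control messages are elided, and FIFO ordering is ignored for brevity):

    #[derive(Debug, PartialEq)]
    enum PortState { Open, BlockedDueToFullBuffers }

    struct BoundedInbox<T> {
        main: Vec<T>,
        backup: Vec<T>, // overflow: "we still need to remember the message"
        capacity: usize,
        state: PortState,
    }

    impl<T> BoundedInbox<T> {
        fn new(capacity: usize) -> Self {
            Self { main: Vec::new(), backup: Vec::new(), capacity, state: PortState::Open }
        }

        /// Returns true if the peer should now be told to block further sends
        /// (the real code calls `initiate_port_blocking` here).
        fn receive(&mut self, value: T) -> bool {
            if self.main.len() < self.capacity {
                self.main.push(value);
                return false;
            }
            self.backup.push(value);
            let newly_blocked = self.state == PortState::Open;
            self.state = PortState::BlockedDueToFullBuffers;
            newly_blocked
        }

        /// Pops a message; the second value is true when the backlog is fully
        /// drained and an unblock should be sent (`cancel_port_blocking`).
        fn take(&mut self) -> (Option<T>, bool) {
            if let Some(value) = self.backup.pop() {
                return (Some(value), false); // still draining the backlog
            }
            let value = self.main.pop();
            let unblock = value.is_none() && self.state == PortState::BlockedDueToFullBuffers;
            if unblock {
                self.state = PortState::Open;
            }
            (value, unblock)
        }
    }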
src/runtime2/component/consensus.rs
@@ -501,25 +501,25 @@ impl Consensus {
                 }
 
                 let message = SyncMessage{
                     sync_header: self.create_sync_header(comp_ctx),
                     content: SyncMessageContent::NotificationOfLeader,
                 };
-                peer.handle.send_message(sched_ctx, Message::Sync(message), true);
+                peer.handle.send_message(&sched_ctx.runtime, Message::Sync(message), true);
             }
 
             self.forward_partial_solution(sched_ctx, comp_ctx);
         } else if header.highest_id.0 < self.highest_id.0 {
             // Sender has a lower ID, so notify it of our higher one
             let message = SyncMessage{
                 sync_header: self.create_sync_header(comp_ctx),
                 content: SyncMessageContent::NotificationOfLeader,
             };
             let peer_handle = comp_ctx.get_peer_handle(header.sending_id);
             let peer_info = comp_ctx.get_peer(peer_handle);
-            peer_info.handle.send_message(sched_ctx, Message::Sync(message), true);
+            peer_info.handle.send_message(&sched_ctx.runtime, Message::Sync(message), true);
         } // else: exactly equal
     }
 
     fn set_annotation(&mut self, source_comp_id: CompId, data_header: &MessageDataHeader) {
         for annotation in self.ports.iter_mut() {
             if annotation.self_port_id == data_header.target_port {
@@ -619,22 +619,22 @@ impl Consensus {
         for peer in peers {
             let mut handle = sched_ctx.runtime.get_component_public(peer);
             let message = Message::Sync(SyncMessage{
                 sync_header: self.create_sync_header(comp_ctx),
                 content: if is_success { SyncMessageContent::GlobalSolution } else { SyncMessageContent::GlobalFailure },
             });
-            handle.send_message(sched_ctx, message, true);
+            handle.send_message(&sched_ctx.runtime, message, true);
             let _should_remove = handle.decrement_users();
             debug_assert!(_should_remove.is_none());
         }
     }
 
     fn send_to_leader(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, message: Message) {
         debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader
         let mut leader_info = sched_ctx.runtime.get_component_public(self.highest_id);
-        leader_info.send_message(sched_ctx, message, true);
+        leader_info.send_message(&sched_ctx.runtime, message, true);
         let should_remove = leader_info.decrement_users();
         if let Some(key) = should_remove {
             sched_ctx.runtime.destroy_component(key);
         }
     }
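
Note: the leader election above picks the component with the highest ID seen so far. A standalone sketch of just that comparison rule (stand-in types; the real code sends `NotificationOfLeader` sync messages for the two unequal cases):

    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    struct CompId(u32);

    #[derive(Debug, PartialEq)]
    enum Notify {
        ForwardPartialSolution,     // adopted a new leader, forward our state
        SenderOfHigherId(CompId),   // tell the sender about the higher ID we know
        Nothing,                    // exactly equal: already agree
    }

    struct Consensus { self_id: CompId, highest_id: CompId }

    impl Consensus {
        fn receive_leader_notification(&mut self, sender: CompId, their_highest: CompId) -> Notify {
            if their_highest > self.highest_id {
                self.highest_id = their_highest;
                Notify::ForwardPartialSolution
            } else if their_highest < self.highest_id {
                Notify::SenderOfHigherId(sender)
            } else {
                Notify::Nothing
            }
        }

        fn is_leader(&self) -> bool {
            self.self_id == self.highest_id
        }
    }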
src/runtime2/component/mod.rs
@@ -13,18 +13,18 @@ pub(crate) use control_layer::{ControlId};
 use super::scheduler::*;
 use super::runtime::*;
 
 /// If the component is sleeping, then that flag will be atomically set to
 /// false. If we're the ones that made that happen then we add it to the work
 /// queue.
-pub(crate) fn wake_up_if_sleeping(sched_ctx: &SchedulerCtx, comp_id: CompId, handle: &CompHandle) {
+pub(crate) fn wake_up_if_sleeping(runtime: &RuntimeInner, comp_id: CompId, handle: &CompHandle) {
     use std::sync::atomic::Ordering;
 
     let should_wake_up = handle.sleeping
         .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
         .is_ok();
 
     if should_wake_up {
         let comp_key = unsafe{ comp_id.upgrade() };
-        sched_ctx.runtime.enqueue_work(comp_key);
+        runtime.enqueue_work(comp_key);
     }
 }
\ No newline at end of file
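
Note: a sketch of both halves of the `sleeping`-flag handoff, assuming the scheduler publishes the flag before its final inbox check (stand-in types; the crate's inbox and work queue differ, and its scheduler is responsible for making the store visible before that check):

    use std::collections::VecDeque;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Mutex};

    struct Component {
        sleeping: AtomicBool,
        inbox: Mutex<VecDeque<u32>>,
    }

    /// Sender side: push a message, then wake the component only if we are the
    /// thread that flipped `sleeping` from true to false, so exactly one waker
    /// enqueues it (mirrors `wake_up_if_sleeping`).
    fn send_and_wake(comp: &Arc<Component>, work_queue: &Mutex<VecDeque<Arc<Component>>>, msg: u32) {
        comp.inbox.lock().unwrap().push_back(msg);
        let should_wake = comp.sleeping
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
            .is_ok();
        if should_wake {
            work_queue.lock().unwrap().push_back(comp.clone());
        }
    }

    /// Scheduler side: declare ourselves sleeping *before* the final inbox
    /// check. Returns true when it is safe to stop running this component:
    /// either it is genuinely asleep, or a racing sender already re-enqueued it.
    fn try_go_to_sleep(comp: &Component) -> bool {
        comp.sleeping.store(true, Ordering::Release);
        if comp.inbox.lock().unwrap().is_empty() {
            return true; // asleep; the next sender's compare_exchange wakes us
        }
        // A message raced in; try to take the flag back ourselves. If the
        // compare_exchange fails, a sender claimed the wake-up and enqueued us.
        comp.sleeping
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
    }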
src/runtime2/error.rs
new file 100644
use std::fmt::{Write, Debug, Display, Formatter as FmtFormatter, Result as FmtResult};

/// Represents an unrecoverable runtime error that is reported to the user (for
/// debugging purposes). Basically a human-readable message with its source
/// location. The error is chainable.
pub struct RtError {
    file: &'static str,
    line: u32,
    message: String,
    cause: Option<Box<RtError>>,
}

impl RtError {
    pub(crate) fn new(file: &'static str, line: u32, message: String) -> RtError {
        return RtError {
            file, line, message, cause: None,
        }
    }

    pub(crate) fn wrap(self, file: &'static str, line: u32, message: String) -> RtError {
        return RtError {
            file, line, message, cause: Some(Box::new(self))
        }
    }
}

impl Display for RtError {
    fn fmt(&self, f: &mut FmtFormatter<'_>) -> FmtResult {
        // Walk the `cause` chain, printing one "[file:line] message" entry per
        // wrapped error, and terminate once the innermost cause is reached.
        let mut error = self;
        loop {
            write!(f, "[{}:{}] {}", error.file, error.line, error.message)?;
            match &error.cause {
                Some(cause) => {
                    writeln!(f, " ...")?;
                    error = cause.as_ref();
                },
                None => {
                    writeln!(f)?;
                    return Ok(());
                },
            }
        }
    }
}

impl Debug for RtError {
    fn fmt(&self, f: &mut FmtFormatter<'_>) -> FmtResult {
        return (self as &dyn Display).fmt(f);
    }
}

macro_rules! rt_error {
    ($fmt:expr) => {
        $crate::runtime2::error::RtError::new(file!(), line!(), $fmt.to_string())
    };
    ($fmt:expr, $($args:expr),*) => {
        $crate::runtime2::error::RtError::new(file!(), line!(), format!($fmt, $($args),*))
    };
}

macro_rules! rt_error_try {
    ($prev:expr, $($fmt_and_args:expr),*) => {
        {
            let result = $prev;
            match result {
                Ok(result) => result,
                Err(result) => return Err(result.wrap(file!(), line!(), format!($($fmt_and_args),*))),
            }
        }
    }
}
\ No newline at end of file
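
Note: hypothetical usage of the two macros, crate-internal only. `rt_error!` builds a leaf error, `rt_error_try!` unwraps a `Result` and wraps the error with extra context (the function names below are invented for the example):

    fn open_config(path: &str) -> Result<String, RtError> {
        if path.is_empty() {
            return Err(rt_error!("no config path supplied"));
        }
        // Pretend the read failed, to show the formatted variant.
        Err(rt_error!("could not read config at '{}'", path))
    }

    fn boot() -> Result<(), RtError> {
        // On Err this early-returns, wrapping the cause with a new frame.
        let _config = rt_error_try!(open_config("runtime.toml"), "failed to boot runtime");
        Ok(())
    }

Printing the returned error walks the `cause` chain, yielding one "[file:line] message" entry per wrapped layer.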
src/runtime2/mod.rs
#[macro_use] mod error;
mod store;
mod runtime;
mod component;
mod communication;
mod scheduler;
mod poll;
mod stdlib;
#[cfg(test)] mod tests;

pub use runtime::Runtime;
pub(crate) use error::RtError;
pub(crate) use scheduler::SchedulerCtx;
pub(crate) use communication::{
    PortId, PortKind, PortState,
    Message, ControlMessage, SyncMessage, DataMessage,
    SyncRoundDecision
};
\ No newline at end of file
src/runtime2/poll/mod.rs
 use libc::{self, c_int};
 
-use std::{io, ptr, time};
+use std::{io, ptr, time, thread};
 use std::sync::Arc;
 use std::sync::atomic::{AtomicU32, Ordering};
 use std::collections::HashMap;
 
 use crate::runtime2::RtError;
 use crate::runtime2::runtime::{CompHandle, RuntimeInner};
 use crate::runtime2::store::queue_mpsc::*;
 
 
 pub(crate) type FileDescriptor = c_int;
 
 pub(crate) trait AsFileDescriptor {
     fn as_file_descriptor(&self) -> FileDescriptor;
 
 }
-pub(crate) struct UserData(u64);
 
 #[inline]
 pub(crate) fn register_polling<F: AsFileDescriptor>(
     poller: &Poller, entity: F, user: UserData, read: bool, write: bool
 ) -> io::Result<()> {
     let file_descriptor = entity.as_file_descriptor();
     return poller.register(file_descriptor, user, read, write);
 }
+#[derive(Copy, Clone)]
+pub(crate) struct UserData(u64);
 
 #[inline]
 pub(crate) fn unregister_polling<F: AsFileDescriptor>(
     poller: &Poller, entity: F
 ) -> io::Result<()> {
     let file_descriptor = entity.as_file_descriptor();
     return poller.unregister(file_descriptor);
 }
 
 // -----------------------------------------------------------------------------
 // Poller
 // -----------------------------------------------------------------------------
 
 #[cfg(unix)]
 pub(crate) struct Poller {
     handle: c_int,
-    events: Vec<libc::epoll_event>
 }
 
 // All of this is gleaned from the `mio` crate.
 #[cfg(unix)]
 impl Poller {
-    pub fn new(event_capacity: usize) -> io::Result<Self> {
-        assert!(event_capacity < i32::MAX as usize); // because of argument to `epoll_wait`.
+    pub fn new() -> io::Result<Self> {
         let handle = syscall_result(unsafe{ libc::epoll_create1(libc::EPOLL_CLOEXEC) })?;
 
         return Ok(Self{
             handle,
-            events: Vec::with_capacity(event_capacity),
         })
     }
 
     fn register(&self, fd: FileDescriptor, user: UserData, read: bool, write: bool) -> io::Result<()> {
         let mut event = libc::epoll_event{
             events: Self::events_from_rw_flags(read, write),
@@ -62,31 +57,38 @@ impl Poller {
             libc::epoll_ctl(self.handle, libc::EPOLL_CTL_DEL, fd, ptr::null_mut())
         })?;
 
         return Ok(());
     }
 
-    pub fn wait(&mut self, timeout: time::Duration) -> io::Result<()> {
+    /// Performs `epoll_wait`, waiting for the provided timeout or until events
+    /// are reported. They are stored in the `events` variable (up to
+    /// `events.cap()` are reported, so ensure it is preallocated).
+    pub fn wait(&self, events: &mut Vec<libc::epoll_event>, timeout: time::Duration) -> io::Result<()> {
         // See `mio` for the reason. Works around a linux bug
         #[cfg(target_pointer_width = "32")]
         const MAX_TIMEOUT: u128 = 1789569;
         #[cfg(not(target_pointer_width = "32"))]
         const MAX_TIMEOUT: u128 = c_int::MAX as u128;
 
-        let mut timeout_millis = timeout.as_millis();
-        if timeout_millis > MAX_TIMEOUT {
-            timeout_millis = -1; // effectively infinite
-        }
+        let timeout_millis = timeout.as_millis();
+        let timeout_millis = if timeout_millis > MAX_TIMEOUT {
+            -1 // effectively infinite
+        } else {
+            timeout_millis as c_int
+        };
 
+        debug_assert!(events.is_empty());
+        debug_assert!(events.capacity() > 0 && events.capacity() < i32::MAX as usize);
         let num_events = syscall_result(unsafe{
-            libc::epoll_wait(self.handle, self.events.as_mut(), self.events.capacity() as i32, timeout_millis)
+            libc::epoll_wait(self.handle, events.as_mut_ptr(), events.capacity() as i32, timeout_millis)
         })?;
 
         unsafe{
             debug_assert!(num_events >= 0);
-            self.events.set_len(num_events as usize);
+            events.set_len(num_events as usize);
        }
 
         return Ok(());
     }
 
     fn events_from_rw_flags(read: bool, write: bool) -> u32 {
@@ -99,19 +101,225 @@ impl Poller {
         }
 
         return events as u32;
     }
 }
 
 #[cfg(unix)]
 impl Drop for Poller {
     fn drop(&mut self) {
         unsafe{ libc::close(self.handle); }
     }
 }
 
 #[inline]
 fn syscall_result(result: c_int) -> io::Result<c_int> {
     if result < 0 {
         return Err(io::Error::last_os_error());
     } else {
         return Ok(result);
     }
 }
 
 #[cfg(not(unix))]
 struct Poller {
 
 }
 
+// -----------------------------------------------------------------------------
+// Polling Thread
+// -----------------------------------------------------------------------------
+
+enum PollCmd {
+    Register(CompHandle, UserData),
+    Unregister(FileDescriptor, UserData),
+    Shutdown,
+}
+
+/// Represents the data needed to build interfaces to the polling thread (which
+/// should happen first) and to create the polling thread itself.
+pub(crate) struct PollingThreadBuilder {
+    poller: Arc<Poller>,
+    generation_counter: Arc<AtomicU32>,
+    queue: QueueDynMpsc<PollCmd>,
+    runtime: Arc<RuntimeInner>,
+    logging_enabled: bool,
+}
+
+impl PollingThreadBuilder {
+    pub(crate) fn new(runtime: Arc<RuntimeInner>, logging_enabled: bool) -> Result<PollingThreadBuilder, RtError> {
+        let poller = Poller::new()
+            .map_err(|e| rt_error!("failed to create poller, because: {}", e))?;
+
+        return Ok(PollingThreadBuilder {
+            poller: Arc::new(poller),
+            generation_counter: Arc::new(AtomicU32::new(0)),
+            queue: QueueDynMpsc::new(64),
+            runtime,
+            logging_enabled,
+        })
+    }
+
+    pub(crate) fn client(&self) -> PollingClient {
+        return PollingClient{
+            poller: self.poller.clone(),
+            generation_counter: self.generation_counter.clone(),
+            queue: self.queue.producer(),
+        }
+    }
+
+    pub(crate) fn into_thread(self) -> (PollingThread, PollingThreadDestroyer) {
+        let destroyer = self.queue.producer();
+
+        return (
+            PollingThread{
+                poller: self.poller,
+                runtime: self.runtime,
+                queue: self.queue,
+                logging_enabled: self.logging_enabled,
+            },
+            PollingThreadDestroyer::new(destroyer)
+        );
+    }
+}
+
+pub(crate) struct PollingThread {
+    poller: Arc<Poller>,
+    runtime: Arc<RuntimeInner>,
+    queue: QueueDynMpsc<PollCmd>,
+    logging_enabled: bool,
+}
+
+impl PollingThread {
+    pub(crate) fn run(&mut self) {
+        use crate::runtime2::scheduler::SchedulerCtx;
+        use crate::runtime2::communication::Message;
+
+        const NUM_EVENTS: usize = 256;
+        const EPOLL_DURATION: time::Duration = time::Duration::from_millis(250);
+
+        // @performance: Lot of improvements possible here, a HashMap is likely
+        // a horrible way to do this.
+        let mut events = Vec::with_capacity(NUM_EVENTS);
+        let mut lookup = HashMap::with_capacity(64);
+        self.log("Starting polling thread");
+
+        loop {
+            // Retrieve events first (because the PollingClient will first
+            // register at epoll, and then push a command into the queue).
+            self.poller.wait(&mut events, EPOLL_DURATION).unwrap();
+
+            // Then handle everything in the command queue.
+            while let Some(command) = self.queue.pop() {
+                match command {
+                    PollCmd::Register(handle, user_data) => {
+                        self.log(&format!("Registering component {:?} as {}", handle.id(), user_data.0));
+                        let key = Self::user_data_as_key(user_data);
+                        debug_assert!(!lookup.contains_key(&key));
+                        lookup.insert(key, handle);
+                    },
+                    PollCmd::Unregister(_file_descriptor, user_data) => {
+                        let key = Self::user_data_as_key(user_data);
+                        debug_assert!(lookup.contains_key(&key));
+                        let mut handle = lookup.remove(&key).unwrap();
+                        self.log(&format!("Unregistering component {:?} as {}", handle.id(), user_data.0));
+                        if let Some(key) = handle.decrement_users() {
+                            self.runtime.destroy_component(key);
+                        }
+                    },
+                    PollCmd::Shutdown => {
+                        // The contract is that all scheduler threads shutdown
+                        // before the polling thread. This happens when all
+                        // components are removed.
+                        self.log("Received shutdown signal");
+                        debug_assert!(lookup.is_empty());
+                        return;
+                    }
+                }
+            }
+
+            // Now process all of the events. Because we might have had a
+            // `Register` command followed by an `Unregister` command (e.g. a
+            // component has died), we might get events that are not associated
+            // with an entry in the lookup.
+            for event in events.drain(..) {
+                let key = event.u64;
+                if let Some(handle) = lookup.get(&key) {
+                    self.log(&format!("Sending poll to {:?} (event: {:x})", handle.id(), event.events));
+                    handle.send_message(&self.runtime, Message::Poll, true);
+                }
+            }
+        }
+    }
+
+    #[inline]
+    fn user_data_as_key(data: UserData) -> u64 {
+        return data.0;
+    }
+
+    fn log(&self, message: &str) {
+        if self.logging_enabled {
+            println!("[polling] {}", message);
+        }
+    }
+}
+
+// bit convoluted, but it works
+pub(crate) struct PollingThreadDestroyer {
+    queue: Option<QueueDynProducer<PollCmd>>,
+}
+
+impl PollingThreadDestroyer {
+    fn new(queue: QueueDynProducer<PollCmd>) -> Self {
+        return Self{ queue: Some(queue) };
+    }
+
+    pub(crate) fn initiate_destruction(&mut self) {
+        self.queue.take().unwrap().push(PollCmd::Shutdown);
+    }
+}
+
+impl Drop for PollingThreadDestroyer {
+    fn drop(&mut self) {
+        debug_assert!(self.queue.is_none());
+    }
+}
+
+pub(crate) struct PollTicket(FileDescriptor, u64);
+
+/// A structure that allows the owner to register components at the polling
+/// thread. Because of assumptions in the communication queue all of these
+/// clients should be dropped before stopping the polling thread.
+pub(crate) struct PollingClient {
+    poller: Arc<Poller>,
+    generation_counter: Arc<AtomicU32>,
+    queue: QueueDynProducer<PollCmd>,
+}
+
+impl PollingClient {
+    fn register<F: AsFileDescriptor>(&self, entity: F, handle: CompHandle, read: bool, write: bool) -> Result<PollTicket, RtError> {
+        let generation = self.generation_counter.fetch_add(1, Ordering::Relaxed);
+        let user_data = user_data_for_component(handle.id().0, generation);
+        self.queue.push(PollCmd::Register(handle, user_data));
+
+        let file_descriptor = entity.as_file_descriptor();
+        self.poller.register(file_descriptor, user_data, read, write)
+            .map_err(|e| rt_error!("failed to register for polling, because: {}", e))?;
+
+        return Ok(PollTicket(file_descriptor, user_data.0));
+    }
+
+    fn unregister(&self, ticket: PollTicket) -> Result<(), RtError> {
+        let file_descriptor = ticket.0;
+        let user_data = UserData(ticket.1);
+        self.queue.push(PollCmd::Unregister(file_descriptor, user_data));
+        self.poller.unregister(file_descriptor)
+            .map_err(|e| rt_error!("failed to unregister polling, because: {}", e))?;
+
+        return Ok(());
    }
+}
+
+#[inline]
+fn user_data_for_component(component_id: u32, generation: u32) -> UserData {
+    return UserData((generation as u64) << 32 | (component_id as u64));
+}
\ No newline at end of file
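
Note: `user_data_for_component` packs the generation counter into the high 32 bits and the component ID into the low 32 bits, so a recycled component ID yields a different lookup key and stale epoll events simply miss the map. The inverse `unpack` helper below is hypothetical, for illustration only:

    fn pack(component_id: u32, generation: u32) -> u64 {
        (generation as u64) << 32 | (component_id as u64)
    }

    fn unpack(user_data: u64) -> (u32, u32) {
        let component_id = (user_data & 0xFFFF_FFFF) as u32;
        let generation = (user_data >> 32) as u32;
        (component_id, generation)
    }

    fn main() {
        let data = pack(42, 3);
        assert_eq!(unpack(data), (42, 3));
    }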
src/runtime2/runtime.rs
 use std::sync::{Arc, Mutex, Condvar};
 use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
+use std::thread;
 use std::collections::VecDeque;
 
 use crate::protocol::*;
+use crate::runtime2::poll::{PollingThreadBuilder, PollingThreadDestroyer};
+use crate::runtime2::RtError;
 
 use super::communication::Message;
 use super::component::{Component, wake_up_if_sleeping, CompPDL, CompCtx};
 use super::store::{ComponentStore, ComponentReservation, QueueDynMpsc, QueueDynProducer};
 use super::scheduler::*;
 
@@ -22,13 +25,13 @@ pub(crate) struct CompKey(pub u32);
 impl CompKey {
     pub(crate) fn downgrade(&self) -> CompId {
         return CompId(self.0);
     }
 }
 
-/// Generational ID of a component
+/// Generational ID of a component.
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
 pub struct CompId(pub u32);
 
 impl CompId {
     pub(crate) fn new_invalid() -> CompId {
         return CompId(u32::MAX);
@@ -77,13 +80,13 @@ pub(crate) struct CompPublic {
 /// Handle to public part of a component. Would be nice if we could
 /// automagically manage the `num_handles` counter. But when it reaches zero we
 /// need to manually remove the handle from the runtime. So we just have debug
 /// code to make sure this actually happens.
 pub(crate) struct CompHandle {
     target: *const CompPublic,
-    id: CompId, // TODO: @Remove after debugging
+    id: CompId,
     #[cfg(debug_assertions)] decremented: bool,
 }
 
 impl CompHandle {
     fn new(id: CompId, public: &CompPublic) -> CompHandle {
         let handle = CompHandle{
@@ -92,20 +95,23 @@ impl CompHandle {
             #[cfg(debug_assertions)] decremented: false,
         };
         handle.increment_users();
         return handle;
     }
 
-    pub(crate) fn send_message(&self, sched_ctx: &SchedulerCtx, message: Message, try_wake_up: bool) {
-        sched_ctx.log(&format!("Sending message to [c:{:03}, wakeup:{}]: {:?}", self.id.0, try_wake_up, message));
+    pub(crate) fn send_message(&self, runtime: &RuntimeInner, message: Message, try_wake_up: bool) {
         self.inbox.push(message);
         if try_wake_up {
-            wake_up_if_sleeping(sched_ctx, self.id, self);
+            wake_up_if_sleeping(runtime, self.id, self);
         }
     }
 
+    pub(crate) fn id(&self) -> CompId {
+        return self.id;
+    }
+
     fn increment_users(&self) {
         let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel);
         debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed
     }
 
     /// Returns the `CompKey` to the component if it should be destroyed
@@ -152,41 +158,60 @@ impl Drop for CompHandle {
 // -----------------------------------------------------------------------------
 // Runtime
 // -----------------------------------------------------------------------------
 
 pub struct Runtime {
     pub(crate) inner: Arc<RuntimeInner>,
-    threads: Vec<std::thread::JoinHandle<()>>,
+    scheduler_threads: Vec<thread::JoinHandle<()>>,
+    polling_destroyer: PollingThreadDestroyer,
+    polling_thread: Option<thread::JoinHandle<()>>,
 }
 
 impl Runtime {
     // TODO: debug_logging should be removed at some point
-    pub fn new(num_threads: u32, debug_logging: bool, protocol_description: ProtocolDescription) -> Runtime {
-        assert!(num_threads > 0, "need a thread to perform work");
+    pub fn new(num_threads: u32, debug_logging: bool, protocol_description: ProtocolDescription) -> Result<Runtime, RtError> {
+        if num_threads == 0 {
+            return Err(rt_error!("need at least one thread to create the runtime"));
+        }
         let runtime_inner = Arc::new(RuntimeInner {
             protocol: protocol_description,
             components: ComponentStore::new(128),
             work_queue: Mutex::new(VecDeque::with_capacity(128)),
             work_condvar: Condvar::new(),
             active_elements: AtomicU32::new(1),
         });
-        let mut runtime = Runtime {
-            inner: runtime_inner,
-            threads: Vec::with_capacity(num_threads as usize),
-        };
+        let polling_builder = rt_error_try!(
+            PollingThreadBuilder::new(runtime_inner.clone(), debug_logging),
+            "failed to build polling thread"
+        );
 
+        let mut scheduler_threads = Vec::with_capacity(num_threads as usize);
 
         for thread_index in 0..num_threads {
-            let mut scheduler = Scheduler::new(runtime.inner.clone(), thread_index, debug_logging);
-            let thread_handle = std::thread::spawn(move || {
+            let mut scheduler = Scheduler::new(
+                runtime_inner.clone(), polling_builder.client(),
+                thread_index, debug_logging
+            );
+            let thread_handle = thread::spawn(move || {
                 scheduler.run();
             });
 
-            runtime.threads.push(thread_handle);
+            scheduler_threads.push(thread_handle);
         }
 
-        return runtime;
+        let (mut poller, polling_destroyer) = polling_builder.into_thread();
+        let polling_thread = thread::spawn(move || {
+            poller.run();
+        });
 
+        return Ok(Runtime{
+            inner: runtime_inner,
+            scheduler_threads,
+            polling_destroyer,
+            polling_thread: Some(polling_thread),
+        });
     }
 
     pub fn create_component(&self, module_name: &[u8], routine_name: &[u8]) -> Result<(), ComponentCreationError> {
         use crate::protocol::eval::ValueGroup;
         let prompt = self.inner.protocol.new_component(
             module_name, routine_name,
@@ -202,15 +227,18 @@ impl Runtime {
     }
 }
 
 impl Drop for Runtime {
     fn drop(&mut self) {
         self.inner.decrement_active_components();
-        for handle in self.threads.drain(..) {
+        for handle in self.scheduler_threads.drain(..) {
             handle.join().expect("join scheduler thread");
         }
 
+        self.polling_destroyer.initiate_destruction();
+        self.polling_thread.take().unwrap().join().expect("join polling thread");
     }
 }
 
 /// Memory that is maintained by "the runtime". In practice it is maintained by
 /// multiple schedulers, and this serves as the common interface to that memory.
 pub(crate) struct RuntimeInner {
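
Note: since `Runtime::new` now returns `Result<Runtime, RtError>`, setup failures (such as `epoll_create1` failing inside `PollingThreadBuilder::new`) surface to the caller instead of panicking. A hypothetical crate-internal caller, assuming these use paths:

    use crate::protocol::ProtocolDescription;
    use crate::runtime2::{Runtime, RtError};

    fn start(pd: ProtocolDescription) -> Result<Runtime, RtError> {
        // `rt_error_try!` wraps any RtError from `Runtime::new` with context.
        let runtime = rt_error_try!(
            Runtime::new(4, false, pd),
            "failed to start runtime with 4 scheduler threads"
        );
        Ok(runtime)
    }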
src/runtime2/scheduler.rs
 use std::sync::Arc;
 use std::sync::atomic::Ordering;
+use crate::runtime2::poll::PollingClient;
 
 use super::component::*;
 use super::runtime::*;
 
 /// Data associated with a scheduler thread
 pub(crate) struct Scheduler {
     runtime: Arc<RuntimeInner>,
+    polling: PollingClient,
     scheduler_id: u32,
     debug_logging: bool,
 }
 
 pub(crate) struct SchedulerCtx<'a> {
     pub runtime: &'a RuntimeInner,
+    pub polling: &'a PollingClient,
     pub id: u32,
     pub comp: u32,
     pub logging_enabled: bool,
 }
 
 impl<'a> SchedulerCtx<'a> {
-    pub fn new(runtime: &'a RuntimeInner, id: u32, logging_enabled: bool) -> Self {
+    pub fn new(runtime: &'a RuntimeInner, polling: &'a PollingClient, id: u32, logging_enabled: bool) -> Self {
         return Self {
             runtime,
+            polling,
             id,
             comp: 0,
             logging_enabled,
         }
     }
 
@@ -35,18 +39,18 @@ impl<'a> SchedulerCtx<'a> {
     }
 }
 
 impl Scheduler {
     // public interface to thread
 
-    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32, debug_logging: bool) -> Self {
-        return Scheduler{ runtime, scheduler_id, debug_logging }
+    pub fn new(runtime: Arc<RuntimeInner>, polling: PollingClient, scheduler_id: u32, debug_logging: bool) -> Self {
+        return Scheduler{ runtime, polling, scheduler_id, debug_logging }
     }
 
     pub fn run(&mut self) {
-        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, self.scheduler_id, self.debug_logging);
+        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, &self.polling, self.scheduler_id, self.debug_logging);
 
         'run_loop: loop {
             // Wait until we have something to do (or need to quit)
             let comp_key = self.runtime.take_work();
             if comp_key.is_none() {
                 break 'run_loop;
src/runtime2/stdlib/internet.rs
@@ -203,26 +203,12 @@ trait AsRawFileDescriptor {
 impl AsRawFileDescriptor for SocketTcpClient {
     fn as_raw_file_descriptor(&self) -> c_int {
         return self.socket_handle;
     }
 }
 
-impl<T: AsRawFileDescriptor> event::Source for T {
-    fn register(&mut self, registry: &Registry, token: Token, interests: Interest) -> std::io::Result<()> {
-        registry.selector().register()
-    }
-
-    fn reregister(&mut self, registry: &Registry, token: Token, interests: Interest) -> std::io::Result<()> {
-        todo!()
-    }
-
-    fn deregister(&mut self, registry: &Registry) -> std::io::Result<()> {
-        todo!()
-    }
-}
-
 /// Performs the `socket` and `bind` calls.
 fn create_and_bind_socket(socket_type: libc::c_int, protocol: libc::c_int, ip: IpAddr, port: u16) -> Result<libc::c_int, SocketError> {
     let family = socket_family_from_ip(ip);
 
     unsafe {
         let socket_handle = socket(family, socket_type, protocol);
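
Note: with the unfinished `mio` `event::Source` impl removed, sockets presumably integrate with the in-crate poller through the `AsFileDescriptor` trait from src/runtime2/poll/mod.rs instead. A sketch of that hookup (the trait and registration function are redeclared here to keep the snippet standalone):

    use libc::c_int;

    pub(crate) type FileDescriptor = c_int;

    pub(crate) trait AsFileDescriptor {
        fn as_file_descriptor(&self) -> FileDescriptor;
    }

    struct SocketTcpClient { socket_handle: c_int }

    impl AsFileDescriptor for SocketTcpClient {
        fn as_file_descriptor(&self) -> FileDescriptor {
            self.socket_handle // same handle the removed mio impl exposed
        }
    }

Registration then goes through `register_polling(&poller, socket, user, read, write)` rather than a mio `Registry`.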
src/runtime2/store/queue_mpsc.rs
@@ -146,17 +146,19 @@ pub struct QueueDynProducer<T> {
 impl<T> QueueDynProducer<T> {
     fn new(consumer: &QueueDynMpsc<T>) -> Self {
         dbg_code!(consumer.inner.dbg.fetch_add(1, Ordering::AcqRel));
         unsafe {
             // If you only knew the power of the dark side! Obi-Wan never told
             // you what happened to your father!
-            let queue: *const _ = std::mem::transmute(consumer.inner.as_ref());
+            let queue = consumer.inner.as_ref() as *const _;
             return Self{ queue };
         }
     }
 
+
+
     pub fn push(&self, value: T) {
         let queue = unsafe{ &*self.queue };
 
         let mut data_lock = queue.data.lock_shared();
         let mut write_index = queue.write_head.load(Ordering::Acquire);
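
Note: the `transmute` is replaced by an equivalent `as *const _` cast; both erase the lifetime of the consumer's inner queue, so the safety argument is unchanged. A minimal standalone sketch of the same pattern and its contract (in the crate, the debug producer count incremented via `dbg_code!` is what checks this in practice):

    struct Inner { value: u32 }

    struct Producer { inner: *const Inner }

    impl Producer {
        /// Safety contract: the consumer's `Inner` must outlive every producer;
        /// the raw pointer carries no lifetime to enforce this.
        fn new(inner: &Inner) -> Producer {
            Producer { inner: inner as *const Inner }
        }

        fn read(&self) -> u32 {
            unsafe { (*self.inner).value }
        }
    }

    fn main() {
        let inner = Inner { value: 7 };
        let producer = Producer::new(&inner);
        assert_eq!(producer.read(), 7);
    }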
src/runtime2/tests/mod.rs
@@ -21,13 +21,13 @@ fn test_component_creation() {
     let pd = ProtocolDescription::parse(b"
     primitive nothing_at_all() {
         s32 a = 5;
         auto b = 5 + a;
     }
     ").expect("compilation");
-    let rt = Runtime::new(1, true, pd);
+    let rt = Runtime::new(1, true, pd).unwrap();
 
     for _i in 0..20 {
         create_component(&rt, "", "nothing_at_all", no_args());
     }
 }
 
@@ -78,13 +78,13 @@ fn test_component_communication() {
         new receiver(i_ormm, 1, 5);
 
         // multiple rounds, multiple messages per round
         new sender(o_mrmm, 5, 5);
         new receiver(i_mrmm, 5, 5);
     }").expect("compilation");
-    let rt = Runtime::new(3, true, pd);
+    let rt = Runtime::new(3, true, pd).unwrap();
     create_component(&rt, "", "constructor", no_args());
 }
 
 #[test]
 fn test_intermediate_messenger() {
     let pd = ProtocolDescription::parse(b"
@@ -127,13 +127,13 @@ fn test_intermediate_messenger() {
         new constructor_template<u64>();
         new constructor_template<s16>();
         new constructor_template<s32>();
         new constructor_template<s64>();
     }
     ").expect("compilation");
-    let rt = Runtime::new(3, true, pd);
+    let rt = Runtime::new(3, true, pd).unwrap();
     create_component(&rt, "", "constructor", no_args());
 }
 
 #[test]
 fn test_simple_select() {
     let pd = ProtocolDescription::parse(b"
@@ -169,21 +169,21 @@ fn test_simple_select() {
                 index += 1;
             }
         }
     }
 
     composite constructor() {
-        auto num_sends = 15;
+        auto num_sends = 1;
         channel tx_a -> rx_a;
         channel tx_b -> rx_b;
         new sender(tx_a, num_sends);
         new receiver(rx_a, rx_b, num_sends);
         new sender(tx_b, num_sends);
     }
     ").expect("compilation");
-    let rt = Runtime::new(3, false, pd);
+    let rt = Runtime::new(3, true, pd).unwrap();
     create_component(&rt, "", "constructor", no_args());
 }
 
 #[test]
 fn test_unguarded_select() {
     let pd = ProtocolDescription::parse(b"
@@ -199,13 +199,13 @@ fn test_unguarded_select() {
         u32 index = 0;
         while (index < 5) {
             sync select { auto v = () -> index += 1; }
         }
     }
     ").expect("compilation");
-    let rt = Runtime::new(3, false, pd);
+    let rt = Runtime::new(3, false, pd).unwrap();
     create_component(&rt, "", "constructor_outside_select", no_args());
     create_component(&rt, "", "constructor_inside_select", no_args());
 }
 
 #[test]
 fn test_empty_select() {
@@ -215,13 +215,13 @@ fn test_empty_select() {
         while (index < 5) {
             sync select {}
             index += 1;
         }
     }
     ").expect("compilation");
-    let rt = Runtime::new(3, false, pd);
+    let rt = Runtime::new(3, false, pd).unwrap();
     create_component(&rt, "", "constructor", no_args());
 }
 
 #[test]
 fn test_random_u32_temporary_thingo() {
     let pd = ProtocolDescription::parse(b"
@@ -241,9 +241,9 @@ fn test_random_u32_temporary_thingo() {
         channel tx -> rx;
         auto num_values = 25;
         new random_u32(tx, 1, 100, num_values);
         new random_taker(rx, num_values);
     }
     ").expect("compilation");
-    let rt = Runtime::new(1, true, pd);
+    let rt = Runtime::new(1, true, pd).unwrap();
     create_component(&rt, "", "constructor", no_args());
 }
\ No newline at end of file