Changeset - fb814548c7d5
[Not reviewed]
src/protocol/ast.rs
Show inline comments
 
@@ -1083,12 +1083,13 @@ pub enum ProcedureSource {
 
    // Buitlin functions, not available to user
 
    FuncSelectStart,
 
    FuncSelectRegisterCasePort,
 
    FuncSelectWait,
 
    // Builtin components, available to user
 
    CompRandomU32, // TODO: Remove, temporary thing
 
    CompTcpClient,
 
}
 

	
 
impl ProcedureSource {
 
    pub(crate) fn is_builtin(&self) -> bool {
 
        match self {
 
            ProcedureSource::FuncUserDefined | ProcedureSource::CompUserDefined => false,
 
@@ -1849,23 +1850,24 @@ pub enum Method {
 
    // Builtin function, not accessible by programmer
 
    SelectStart, // SelectStart(total_num_cases, total_num_ports)
 
    SelectRegisterCasePort, // SelectRegisterCasePort(case_index, port_index, port_id)
 
    SelectWait, // SelectWait() -> u32
 
    // Builtin component,
 
    ComponentRandomU32,
 
    ComponentTcpClient,
 
    // User-defined
 
    UserFunction,
 
    UserComponent,
 
}
 

	
 
impl Method {
 
    pub(crate) fn is_public_builtin(&self) -> bool {
 
        use Method::*;
 
        match self {
 
            Get | Put | Fires | Create | Length | Assert | Print => true,
 
            ComponentRandomU32 => true,
 
            ComponentRandomU32 | ComponentTcpClient => true,
 
            _ => false,
 
        }
 
    }
 

	
 
    pub(crate) fn is_user_defined(&self) -> bool {
 
        use Method::*;
src/protocol/eval/executor.rs
Show inline comments
 
@@ -752,13 +752,13 @@ impl Prompt {
 
                                        None => {
 
                                            cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr.this.upcast()));
 
                                            return Ok(EvalContinuation::SelectWait)
 
                                        },
 
                                    }
 
                                },
 
                                Method::ComponentRandomU32 => {
 
                                Method::ComponentRandomU32 | Method::ComponentTcpClient => {
 
                                    debug_assert_eq!(heap[expr.procedure].parameters.len(), cur_frame.expr_values.len());
 
                                    debug_assert_eq!(heap[cur_frame.position].as_new().expression, expr.this);
 
                                },
 
                                Method::UserComponent => {
 
                                    // This is actually handled by the evaluation
 
                                    // of the statement.
src/protocol/parser/mod.rs
Show inline comments
 
@@ -295,14 +295,15 @@ impl Parser {
 
    fn feed_standard_library(&mut self) -> Result<(), String> {
 
        use std::env;
 
        use std::path::{Path, PathBuf};
 
        use std::fs;
 

	
 
        // Pair is (name, add_to_global_namespace)
 
        const FILES: [(&'static str, bool); 2] = [
 
        const FILES: [(&'static str, bool); 3] = [
 
            ("std.global.pdl", true),
 
            ("std.internet.pdl", false),
 
            ("std.random.pdl", false),
 
        ];
 

	
 
        // Determine base directory
 
        let (base_path, from_env) = if let Ok(path) = env::var(REOWOLF_PATH_ENV) {
 
            // Path variable is set
src/protocol/parser/pass_definitions.rs
Show inline comments
 
@@ -374,12 +374,13 @@ impl PassDefinitions {
 
                ("std.global", "fires") => ProcedureSource::FuncFires,
 
                ("std.global", "create") => ProcedureSource::FuncCreate,
 
                ("std.global", "length") => ProcedureSource::FuncLength,
 
                ("std.global", "assert") => ProcedureSource::FuncAssert,
 
                ("std.global", "print") => ProcedureSource::FuncPrint,
 
                ("std.random", "random_u32") => ProcedureSource::CompRandomU32,
 
                ("std.internet", "tcp_client") => ProcedureSource::CompTcpClient,
 
                _ => panic!(
 
                    "compiler error: unknown builtin procedure '{}' in module '{}'",
 
                    procedure_name, module_name
 
                ),
 
            };
 

	
 
@@ -1655,12 +1656,13 @@ impl PassDefinitions {
 
                                    ProcedureSource::FuncFires => Method::Fires,
 
                                    ProcedureSource::FuncCreate => Method::Create,
 
                                    ProcedureSource::FuncLength => Method::Length,
 
                                    ProcedureSource::FuncAssert => Method::Assert,
 
                                    ProcedureSource::FuncPrint => Method::Print,
 
                                    ProcedureSource::CompRandomU32 => Method::ComponentRandomU32,
 
                                    ProcedureSource::CompTcpClient => Method::ComponentTcpClient,
 
                                    _ => todo!("other procedure sources"),
 
                                };
 

	
 
                                // Function call: consume the arguments
 
                                let func_span = parser_type.full_span;
 
                                let mut full_span = func_span;
src/protocol/parser/pass_validation_linking.rs
Show inline comments
 
@@ -1157,13 +1157,14 @@ impl Visitor for PassValidationLinking {
 
                }
 
            },
 
            Method::Print => {},
 
            Method::SelectStart
 
            | Method::SelectRegisterCasePort
 
            | Method::SelectWait => unreachable!(), // not usable by programmer directly
 
            Method::ComponentRandomU32 => {
 
            Method::ComponentRandomU32
 
            | Method::ComponentTcpClient => {
 
                expecting_wrapping_new_stmt = true;
 
            },
 
            Method::UserFunction => {}
 
            Method::UserComponent => {
 
                expecting_wrapping_new_stmt = true;
 
            },
src/runtime2/component/component.rs
Show inline comments
 
use crate::protocol::eval::{Prompt, EvalError, ValueGroup, PortId as EvalPortId};
 
use crate::protocol::*;
 
use crate::runtime2::*;
 
use crate::runtime2::communication::*;
 

	
 
use super::{CompCtx, CompPDL};
 
use super::{CompCtx, CompPDL, CompId};
 
use super::component_context::*;
 
use super::component_random::*;
 
use super::component_internet::*;
 
use super::control_layer::*;
 
use super::consensus::*;
 

	
 
pub enum CompScheduling {
 
    Immediate,
 
    Requeue,
 
@@ -16,13 +17,13 @@ pub enum CompScheduling {
 
    Exit,
 
}
 

	
 
/// Generic representation of a component (as viewed by a scheduler).
 
pub(crate) trait Component {
 
    /// Called upon the creation of the component.
 
    fn on_creation(&mut self, sched_ctx: &SchedulerCtx);
 
    fn on_creation(&mut self, comp_id: CompId, sched_ctx: &SchedulerCtx);
 

	
 
    /// Called if the component is created by another component and the messages
 
    /// are being transferred between the two.
 
    fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage);
 

	
 
    /// Called if the component receives a new message. The component is
 
@@ -113,14 +114,15 @@ pub(crate) fn create_component(
 
) -> Box<dyn Component> {
 
    let definition = &protocol.heap[definition_id];
 
    debug_assert!(definition.kind == ProcedureKind::Primitive || definition.kind == ProcedureKind::Composite);
 

	
 
    if definition.source.is_builtin() {
 
        // Builtin component
 
        let component = match definition.source {
 
        let component: Box<dyn Component> = match definition.source {
 
            ProcedureSource::CompRandomU32 => Box::new(ComponentRandomU32::new(arguments)),
 
            ProcedureSource::CompTcpClient => Box::new(ComponentTcpClient::new(arguments)),
 
            _ => unreachable!(),
 
        };
 

	
 
        return component;
 
    } else {
 
        // User-defined component
src/runtime2/component/component_internet.rs
Show inline comments
 
use crate::protocol::eval::{ValueGroup, Value, EvalError};
 
use crate::runtime2::*;
 
use crate::runtime2::component::CompCtx;
 
use crate::runtime2::component::{CompCtx, CompId};
 
use crate::runtime2::stdlib::internet::*;
 
use crate::runtime2::poll::*;
 

	
 
use super::component::{self, *};
 
use super::control_layer::*;
 
use super::consensus::*;
 

	
 
use std::io::ErrorKind as IoErrorKind;
 
@@ -35,38 +36,52 @@ enum SyncState {
 
}
 

	
 
pub struct ComponentTcpClient {
 
    // Properties for the tcp socket
 
    socket_state: SocketState,
 
    sync_state: SyncState,
 
    poll_ticket: Option<PollTicket>,
 
    inbox_main: Option<DataMessage>,
 
    inbox_backup: Vec<DataMessage>,
 
    pdl_input_port_id: PortId, // input from PDL, so transmitted over socket
 
    pdl_output_port_id: PortId, // output towards PDL, so received over socket
 
    input_union_send_tag_value: i64,
 
    input_union_receive_tag_value: i64,
 
    input_union_finish_tag_value: i64,
 
    input_union_shutdown_tag_value: i64,
 
    // Generic component state
 
    exec_state: CompExecState,
 
    control: ControlLayer,
 
    consensus: Consensus,
 
    // Temporary variables
 
    byte_buffer: Vec<u8>,
 
}
 

	
 
impl Component for ComponentTcpClient {
 
    fn on_creation(&mut self, sched_ctx: &SchedulerCtx) {
 
    fn on_creation(&mut self, id: CompId, sched_ctx: &SchedulerCtx) {
 
        // Retrieve type information for messages we're going to receive
 
        let pd = &sched_ctx.runtime.protocol;
 
        let cmd_type = pd.find_type(b"std.internet", b"Cmd")
 
            .expect("'Cmd' type in the 'std.internet' module");
 
        let cmd_type = cmd_type
 
            .as_union();
 

	
 
        self.input_union_send_tag_value = cmd_type.get_variant_tag_value(b"Send").unwrap();
 
        self.input_union_receive_tag_value = cmd_type.get_variant_tag_value(b"Receive").unwrap();
 
        self.input_union_finish_tag_value = cmd_type.get_variant_tag_value(b"Finish").unwrap();
 
        self.input_union_shutdown_tag_value = cmd_type.get_variant_tag_value(b"Shutdown").unwrap();
 

	
 
        // Register socket for async events
 
        if let SocketState::Connected(socket) = &self.socket_state {
 
            let self_handle = sched_ctx.runtime.get_component_public(id);
 
            let poll_ticket = sched_ctx.polling.register(socket, self_handle, true, true)
 
                .expect("registering tcp component");
 

	
 
            debug_assert!(self.poll_ticket.is_none());
 
            self.poll_ticket = Some(poll_ticket);
 
        }
 
    }
 

	
 
    fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) {
 
        if self.inbox_main.is_none() {
 
            self.inbox_main = Some(message);
 
        } else {
 
@@ -147,12 +162,15 @@ impl Component for ComponentTcpClient {
 
                                    self.sync_state = SyncState::Getting;
 
                                    return Ok(CompScheduling::Immediate);
 
                                } else if tag_value == self.input_union_finish_tag_value {
 
                                    // Component requires us to end the sync round
 
                                    let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
 
                                    component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
 
                                } else if tag_value == self.input_union_shutdown_tag_value {
 
                                    // Component wants to close the connection
 
                                    todo!("implement clean shutdown, don't forget to unregister to poll ticket");
 
                                }
 
                            } else {
 
                                todo!("handle sync failure due to message deadlock");
 
                                return Ok(CompScheduling::Sleep);
 
                            }
 
                        } else {
 
@@ -265,17 +283,19 @@ impl ComponentTcpClient {
 
            todo!("friendly error reporting: failed to open socket {:?}", socket);
 
        }
 

	
 
        return Self{
 
            socket_state: SocketState::Connected(socket.unwrap()),
 
            sync_state: SyncState::AwaitingCmd,
 
            poll_ticket: None,
 
            inbox_main: None,
 
            inbox_backup: Vec::new(),
 
            input_union_send_tag_value: -1,
 
            input_union_receive_tag_value: -1,
 
            input_union_finish_tag_value: -1,
 
            input_union_shutdown_tag_value: -1,
 
            pdl_input_port_id: input_port,
 
            pdl_output_port_id: output_port,
 
            exec_state: CompExecState::new(),
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
            byte_buffer: Vec::new(),
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -4,12 +4,13 @@ use crate::protocol::ast::ProcedureDefinitionId;
 
use crate::protocol::eval::{
 
    PortId as EvalPortId, Prompt,
 
    ValueGroup, Value,
 
    EvalContinuation, EvalResult, EvalError
 
};
 

	
 
use crate::runtime2::runtime::CompId;
 
use crate::runtime2::scheduler::SchedulerCtx;
 
use crate::runtime2::communication::*;
 

	
 
use super::component::{
 
    self,
 
    CompExecState, Component, CompScheduling, CompMode,
 
@@ -219,13 +220,13 @@ pub(crate) struct CompPDL {
 
    // message is intended for that port.
 
    pub inbox_main: InboxMain,
 
    pub inbox_backup: Vec<DataMessage>,
 
}
 

	
 
impl Component for CompPDL {
 
    fn on_creation(&mut self, _sched_ctx: &SchedulerCtx) {
 
    fn on_creation(&mut self, _id: CompId, _sched_ctx: &SchedulerCtx) {
 
        // Intentionally empty
 
    }
 

	
 
    fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage) {
 
        let port_handle = comp_ctx.get_port_handle(message.data_header.target_port);
 
        let port_index = comp_ctx.get_port_index(port_handle);
 
@@ -588,12 +589,18 @@ impl CompPDL {
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling ports
 
    // -------------------------------------------------------------------------
 

	
 
    /// Creates a new component and transfers ports. Because of the stepwise
 
    /// process in which memory is allocated, ports are transferred, messages
 
    /// are exchanged, component lifecycle methods are called, etc. This
 
    /// function facilitates a lot of implicit assumptions (e.g. when the
 
    /// `Component::on_creation` method is called, the component is already
 
    /// registered at the runtime).
 
    fn create_component_and_transfer_ports(
 
        &mut self,
 
        sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx,
 
        definition_id: ProcedureDefinitionId, type_id: TypeId, mut arguments: ValueGroup
 
    ) {
 
        struct PortPair{
 
@@ -706,13 +713,13 @@ impl CompPDL {
 
        // to message exchanges between remote peers.
 
        let total_num_ports = opened_port_id_pairs.len() + closed_port_id_pairs.len();
 
        let component = component::create_component(&sched_ctx.runtime.protocol, definition_id, type_id, arguments, total_num_ports);
 
        let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component(
 
            reservation, component, created_ctx, false,
 
        );
 
        component.component.on_creation(sched_ctx);
 
        component.component.on_creation(created_key.downgrade(), sched_ctx);
 

	
 
        // Now modify the creator's ports: remove every transferred port and
 
        // potentially remove the peer component.
 
        for pair in opened_port_id_pairs.iter() {
 
            // Remove peer if appropriate
 
            let creator_port_info = creator_ctx.get_port(pair.creator_handle);
src/runtime2/component/component_random.rs
Show inline comments
 
@@ -24,13 +24,13 @@ pub struct ComponentRandomU32 {
 
    did_perform_send: bool, // when in sync mode
 
    control: ControlLayer,
 
    consensus: Consensus,
 
}
 

	
 
impl Component for ComponentRandomU32 {
 
    fn on_creation(&mut self, _sched_ctx: &SchedulerCtx) {
 
    fn on_creation(&mut self, _id: CompId, _sched_ctx: &SchedulerCtx) {
 
    }
 

	
 
    fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) {
 
        // Impossible since this component does not have any input ports in its
 
        // signature.
 
        unreachable!();
src/runtime2/poll/mod.rs
Show inline comments
 
@@ -292,25 +292,25 @@ pub(crate) struct PollingClient {
 
    poller: Arc<Poller>,
 
    generation_counter: Arc<AtomicU32>,
 
    queue: QueueDynProducer<PollCmd>,
 
}
 

	
 
impl PollingClient {
 
    fn register<F: AsFileDescriptor>(&self, entity: F, handle: CompHandle, read: bool, write: bool) -> Result<PollTicket, RtError> {
 
    pub(crate) fn register<F: AsFileDescriptor>(&self, entity: &F, handle: CompHandle, read: bool, write: bool) -> Result<PollTicket, RtError> {
 
        let generation = self.generation_counter.fetch_add(1, Ordering::Relaxed);
 
        let user_data = user_data_for_component(handle.id().0, generation);
 
        self.queue.push(PollCmd::Register(handle, user_data));
 

	
 
        let file_descriptor = entity.as_file_descriptor();
 
        self.poller.register(file_descriptor, user_data, read, write)
 
            .map_err(|e| rt_error!("failed to register for polling, because: {}", e))?;
 

	
 
        return Ok(PollTicket(file_descriptor, user_data.0));
 
    }
 

	
 
    fn unregister(&self, ticket: PollTicket) -> Result<(), RtError> {
 
    pub(crate) fn unregister(&self, ticket: PollTicket) -> Result<(), RtError> {
 
        let file_descriptor = ticket.0;
 
        let user_data = UserData(ticket.1);
 
        self.queue.push(PollCmd::Unregister(file_descriptor, user_data));
 
        self.poller.unregister(file_descriptor)
 
            .map_err(|e| rt_error!("failed to unregister polling, because: {}", e))?;
 

	
src/runtime2/runtime.rs
Show inline comments
 
@@ -303,18 +303,21 @@ impl RuntimeInner {
 

	
 
    pub(crate) fn get_component_public(&self, id: CompId) -> CompHandle {
 
        let component = self.components.get(id.0);
 
        return CompHandle::new(id, &component.public);
 
    }
 

	
 
    /// Will remove a component and its memory from the runtime. May only be
 
    /// called if the necessary conditions for destruction have been met.
 
    pub(crate) fn destroy_component(&self, key: CompKey) {
 
        dbg_code!({
 
            let component = self.get_component(key);
 
            debug_assert!(component.exiting);
 
            debug_assert_eq!(component.public.num_handles.load(Ordering::Acquire), 0);
 
        });
 

	
 
        self.decrement_active_components();
 
        self.components.destroy(key.0);
 
    }
 

	
 
    // Tracking number of active interfaces and the active components
 

	
src/runtime2/stdlib/internet.rs
Show inline comments
 
@@ -6,12 +6,14 @@ use libc::{
 
    c_int,
 
    sockaddr_in, sockaddr_in6, in_addr, in6_addr,
 
    socket, bind, listen, accept, connect, close,
 
};
 
use mio::{event, Interest, Registry, Token};
 

	
 
use crate::runtime2::poll::{AsFileDescriptor, FileDescriptor};
 

	
 
#[derive(Debug)]
 
pub enum SocketError {
 
    Opening,
 
    Modifying,
 
    Binding,
 
    Listening,
 
@@ -80,12 +82,18 @@ impl Drop for SocketTcpClient {
 
    fn drop(&mut self) {
 
        debug_assert!(self.socket_handle >= 0);
 
        unsafe{ close(self.socket_handle) };
 
    }
 
}
 

	
 
impl AsFileDescriptor for SocketTcpClient {
 
    fn as_file_descriptor(&self) -> FileDescriptor {
 
        return self.socket_handle;
 
    }
 
}
 

	
 
/// Raw socket receiver. Essentially a listener that accepts a single connection
 
struct SocketRawRx {
 
    listen_handle: c_int,
 
    accepted_handle: c_int,
 
}
 

	
 
@@ -306,31 +314,7 @@ fn socket_family_from_ip(ip: IpAddr) -> libc::c_int {
 
    };
 
}
 

	
 
#[inline]
 
fn htons(port: u16) -> u16 {
 
    return port.to_be();
 
}
 

	
 
mod tests {
 
    use std::net::*;
 
    use super::*;
 

	
 
    // #[test] @nocommit Remove this
 
    // fn test_inet_thingo() {
 
    //     const SIZE: usize = 1024;
 
    //
 
    //     let s = SocketTcpClient::new(IpAddr::V4(Ipv4Addr::new(142, 250, 179, 163)), 80).expect("connect");
 
    //     s.send(b"GET / HTTP/1.1\r\n\r\n").expect("sending");
 
    //     let mut total = Vec::<u8>::new();
 
    //     let mut buffer = [0; SIZE];
 
    //     let mut received = SIZE;
 
    //
 
    //     while received > 0 {
 
    //         received = s.receive(&mut buffer).expect("receiving");
 
    //         println!("DEBUG: Received {} bytes", received);
 
    //         total.extend_from_slice(&buffer[..received]);
 
    //     }
 
    //     let as_str = String::from_utf8_lossy(total.as_slice());
 
    //     println!("Yay! Got {} bytes:\n{}", as_str.len(), as_str);
 
    // }
 
}
 
\ No newline at end of file
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -243,7 +243,7 @@ fn test_random_u32_temporary_thingo() {
 
        new random_u32(tx, 1, 100, num_values);
 
        new random_taker(rx, num_values);
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, true, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 
\ No newline at end of file
 
}
std/std.internet.pdl
Show inline comments
 
#module std.internet
 

	
 
union Cmd {
 
    Send(u8[]),
 
    Receive,
 
    Finish,
 
    Shutdown,
 
}
 

	
 
primitive tcp_client(u8[] ip, u16 port, in<Cmd> cmds, out<u8[]> rx) {
 
    #builtin
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)