Changeset - ebea15dffde4
[Not reviewed]
src/protocol/parser/mod.rs
Show inline comments
 
@@ -318,25 +318,31 @@ impl Parser {
 
                    path.push_str("./");
 
                    path.push_str(REOWOLF_PATH_DIR);
 
                    path
 
                }
 
            };
 

	
 
            (path, false)
 
        };
 

	
 
        // Make sure directory exists
 
        let path = Path::new(&base_path);
 
        if !path.exists() {
 
            return Err(format!("std lib root directory '{}' does not exist", base_path));
 
            let from_env_message = if from_env {
 
                format!(" (retrieved from the environment variable '{}')", REOWOLF_PATH_ENV)
 
            } else {
 
                String::new()
 
            };
 

	
 
            return Err(format!("std lib root directory '{}'{} does not exist", base_path, from_env_message));
 
        }
 

	
 
        // Try to load all standard library files. We might need a more unified
 
        // way to do this in the future (i.e. a "std" package, containing all
 
        // of the modules)
 
        let mut file_path = PathBuf::new();
 
        let mut first_file = true;
 

	
 
        for (file, add_to_global_namespace) in FILES {
 
            file_path.clear();
 
            file_path.push(path);
 
            file_path.push(file);
src/runtime2/component/component.rs
Show inline comments
 
use std::fmt::{Display as FmtDisplay, Result as FmtResult, Formatter};
 

	
 
use crate::protocol::eval::{Prompt, EvalError, ValueGroup, Value, ValueId, PortId as EvalPortId};
 
use crate::protocol::*;
 
use crate::runtime2::*;
 
use crate::runtime2::communication::*;
 
use crate::runtime2::component::component_pdl::find_ports_in_value_group;
 

	
 
use super::{CompCtx, CompPDL, CompId};
 
use super::component_context::*;
 
use super::component_random::*;
 
use super::component_internet::*;
 
use super::control_layer::*;
 
use super::consensus::*;
 

	
 
pub enum CompScheduling {
 
    Immediate,
 
    Requeue,
 
    Sleep,
 
@@ -225,25 +224,26 @@ pub(crate) fn create_component(
 
// Generic component messaging utilities (for sending and receiving)
 
// -----------------------------------------------------------------------------
 

	
 
/// Default handling of sending a data message. In case the port is blocked then
 
/// the `ExecState` will become blocked as well. Note that
 
/// `default_handle_control_message` will ensure that the port becomes
 
/// unblocked if so instructed by the receiving component. The returned
 
/// scheduling value must be used.
 
#[must_use]
 
pub(crate) fn default_send_data_message(
 
    exec_state: &mut CompExecState, transmitting_port_id: PortId,
 
    port_instruction: PortInstruction, value: ValueGroup,
 
    sched_ctx: &SchedulerCtx, consensus: &mut Consensus, comp_ctx: &mut CompCtx
 
    sched_ctx: &SchedulerCtx, consensus: &mut Consensus,
 
    control: &mut ControlLayer, comp_ctx: &mut CompCtx
 
) -> Result<CompScheduling, (PortInstruction, String)> {
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync);
 

	
 
    let port_handle = comp_ctx.get_port_handle(transmitting_port_id);
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    port_info.last_instruction = port_instruction;
 

	
 
    let port_info = comp_ctx.get_port(port_handle);
 
    debug_assert_eq!(port_info.kind, PortKind::Putter);
 

	
 
    if port_info.state.is_closed() {
 
        // Note: normally peer is eventually consistent, but if it has shut down
 
@@ -253,51 +253,39 @@ pub(crate) fn default_send_data_message(
 
            format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0)
 
        ))
 
    } else if port_info.state.is_blocked() {
 
        // Port is blocked, so we cannot send
 
        exec_state.set_as_blocked_put(transmitting_port_id, value);
 

	
 
        return Ok(CompScheduling::Sleep);
 
    } else {
 
        // Check if there are any ports that are being transmitted
 
        let mut ports = Vec::new();
 
        find_ports_in_value_group(&value, &mut ports);
 
        if !ports.is_empty() {
 
            prepare_send_message_with_ports(
 
                transmitting_port_id, port_instruction, value, exec_state,
 
                comp_ctx, sched_ctx, control
 
            )?;
 

	
 
            return Ok(CompScheduling::Sleep);
 
        } else {
 
            // Port is not blocked and no ports to transfer: send to the peer
 
            let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
            let peer_info = comp_ctx.get_peer(peer_handle);
 
            let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value);
 
            peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true);
 

	
 
            for (value_location, port_id) in ports {
 
                let transmitted_port_handle = comp_ctx.get_port_handle(port_id);
 
                let transmitted_port = comp_ctx.get_port(transmitted_port_handle);
 

	
 
                if transmitted_port.state.is_set(PortStateFlag::Transmitted) {
 
                    // Note: We could also attach where the port has been
 
                    //  transferred
 
                    return Err((
 
                        port_info.last_instruction,
 
                        String::from("Cannot send this message, as it contains a previously transmitted port")
 
                    ));
 
                }
 

	
 
                // Prepare ack for PPC
 
                // Prepare PPC message
 
            }
 
            return Ok(CompScheduling::Immediate);
 
        }
 

	
 
        // Port is not blocked, so send to the peer
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true);
 

	
 
        return Ok(CompScheduling::Immediate);
 
    }
 
}
 

	
 
pub(crate) enum IncomingData {
 
    PlacedInSlot,
 
    SlotFull(DataMessage),
 
}
 

	
 
/// Default handling of receiving a data message. In case there is no room for
 
/// the message it is returned from this function. Note that this function is
 
/// different from PDL code performing a `get` on a port; this is the case where
 
/// the message first arrives at the component.
 
@@ -407,56 +395,56 @@ pub(crate) fn default_attempt_get(
 
        // We don't have a message waiting for us and the port is not blocked.
 
        // So enter the BlockedGet state
 
        exec_state.set_as_blocked_get(target_port);
 
        return GetResult::NoMessage;
 
    }
 
}
 

	
 
/// Default handling that has been received through a `get`. Will check if any
 
/// more messages are waiting, and if the corresponding port was blocked because
 
/// of full buffers (hence, will use the control layer to make sure the peer
 
/// will become unblocked).
 
pub(crate) fn default_handle_received_data_message(
 
    targeted_port: PortId, port_instruction: PortInstruction, message: &mut DataMessage,
 
    targeted_port: PortId, _port_instruction: PortInstruction, message: &mut DataMessage,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup,
 
    comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer
 
) -> Result<(), (PortInstruction, String)> {
 
    let port_handle = comp_ctx.get_port_handle(targeted_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 
    let slot = &mut inbox_main[port_index];
 
    debug_assert!(slot.is_none()); // because we've just received from it
 
    debug_assert!(inbox_main[port_index].is_none()); // because we've just received from it
 

	
 
    // If we received any ports, add them to the port tracking and inbox struct.
 
    // Then notify the peers that they can continue sending to this port, but
 
    // now at a new address.
 
    for received_port in &mut message.ports {
 
        // Transfer messages to main/backup inbox
 
        let new_inbox_index = inbox_main.len();
 
        let _new_inbox_index = inbox_main.len();
 
        if !received_port.messages.is_empty() {
 
            inbox_main.push(Some(received_port.messages.remove(0)));
 
        }
 
        inbox_backup.extend(received_port.messages.drain(..));
 

	
 
        // Create a new port locally
 
        let mut new_port_state = received_port.state;
 
        new_port_state.set(PortStateFlag::Received);
 
        let new_port_handle = comp_ctx.add_port(
 
            received_port.peer_comp, received_port.peer_port,
 
            received_port.kind, new_port_state
 
        );
 
        debug_assert_eq!(_new_inbox_index, comp_ctx.get_port_index(new_port_handle));
 
        comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(received_port.peer_comp));
 
        let new_port = comp_ctx.get_port(new_port_handle);
 
        comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(new_port.peer_comp_id));
 

	
 
        // Replace all references to the port in the received message
 
        for message_location in received_port.locations {
 
        for message_location in received_port.locations.iter().copied() {
 
            let value = match message_location {
 
                ValueId::Heap(heap_pos, heap_index) => &mut message.content.regions[heap_pos as usize][heap_index as usize],
 
                ValueId::Stack(stack_index) => &mut message.content.values[stack_index as usize],
 
            };
 

	
 
            match value {
 
                Value::Input(_) => {
 
                    debug_assert_eq!(new_port.kind, PortKind::Getter);
 
                    *value = Value::Input(port_id_to_eval(new_port.self_id));
 
                },
 
                Value::Output(_) => {
 
                    debug_assert_eq!(new_port.kind, PortKind::Putter);
 
@@ -465,41 +453,41 @@ pub(crate) fn default_handle_received_data_message(
 
                _ => unreachable!(),
 
            }
 
        }
 

	
 
        // Let the peer know that the port can now be used
 
        let peer_handle = comp_ctx.get_peer_handle(new_port.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 

	
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(ControlMessage{
 
            id: ControlId::new_invalid(),
 
            sender_comp_id: comp_ctx.id,
 
            target_port_id: Some(new_port.peer_port_id),
 
            content: ControlMessageContent::PortPeerChangedUnblock(new_port.id, comp_ctx.id)
 
            content: ControlMessageContent::PortPeerChangedUnblock(new_port.self_id, comp_ctx.id)
 
        }), true);
 
    }
 

	
 
    // Modify last-known location where port instruction was retrieved
 
    let port_info = comp_ctx.get_port(port_handle);
 
    debug_assert_ne!(port_info.last_instruction, PortInstruction::None); // set by caller
 
    debug_assert!(port_info.state.is_open()); // checked by caller
 

	
 
    // Check if there are any more messages in the backup buffer
 
    for message_index in 0..inbox_backup.len() {
 
        let message = &inbox_backup[message_index];
 
        if message.data_header.target_port == targeted_port {
 
            // One more message, place it in the slot
 
            let message = inbox_backup.remove(message_index);
 
            debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we're removing another message from the backup
 
            *slot = Some(message);
 
            inbox_main[port_index] = Some(message);
 

	
 
            return Ok(());
 
        }
 
    }
 

	
 
    // Did not have any more messages, so if we were blocked, then we need to
 
    // unblock the port now (and inform the peer of this unblocking)
 
    if port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers) {
 
        let port_info = comp_ctx.get_port_mut(port_handle);
 
        port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers);
 

	
 
        let (peer_handle, message) = control.cancel_port_blocking(comp_ctx, port_handle);
 
@@ -513,25 +501,25 @@ pub(crate) fn default_handle_received_data_message(
 
/// Handles control messages in the default way. Note that this function may
 
/// take a lot of actions in the name of the caller: pending messages may be
 
/// sent, ports may become blocked/unblocked, etc. So the execution
 
/// (`CompExecState`), control (`ControlLayer`) and consensus (`Consensus`)
 
/// state may all change.
 
pub(crate) fn default_handle_control_message(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus,
 
    message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) -> Result<(), (PortInstruction, String)> {
 
    match message.content {
 
        ControlMessageContent::Ack => {
 
            default_handle_ack(exec_state, control, message.id, sched_ctx, comp_ctx, inbox_main, inbox_backup);
 
            default_handle_ack(exec_state, control, message.id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup);
 
        },
 
        ControlMessageContent::BlockPort => {
 
            // One of our messages was accepted, but the port should be
 
            // blocked.
 
            let port_to_block = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_block);
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            debug_assert_eq!(port_info.kind, PortKind::Putter);
 
            port_info.state.set(PortStateFlag::BlockedDueToFullBuffers);
 
        },
 
        ControlMessageContent::ClosePort(content) => {
 
            // Request to close the port. We immediately comply and remove
 
@@ -546,25 +534,25 @@ pub(crate) fn default_handle_control_message(
 
            port_info.close_at_sync_end = true; // might be redundant (we might set it closed now)
 

	
 
            let peer_comp_id = port_info.peer_comp_id;
 
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
            // One exception to sending an `Ack` is if we just closed the
 
            // port ourselves, meaning that the `ClosePort` messages got
 
            // sent to one another.
 
            if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) {
 
                // The two components (sender and this component) are closing
 
                // the channel at the same time. So we don't care about the
 
                // content of the `ClosePort` message.
 
                default_handle_ack(exec_state, control, control_id, sched_ctx, comp_ctx, inbox_main, inbox_backup);
 
                default_handle_ack(exec_state, control, control_id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup);
 
            } else {
 
                // Respond to the message
 
                let port_info = comp_ctx.get_port(port_handle);
 
                let last_instruction = port_info.last_instruction;
 
                let port_has_had_message = port_info.received_message_for_sync;
 
                default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
 
                comp_ctx.change_port_peer(sched_ctx, port_handle, None);
 

	
 
                // Handle any possible error conditions (which boil down to: the
 
                // port has been used, but the peer has died). If not in sync
 
                // mode then we close the port immediately.
 

	
 
@@ -613,25 +601,24 @@ pub(crate) fn default_handle_control_message(
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            let peer_comp_id = port_info.peer_comp_id;
 
            port_info.state.set(PortStateFlag::BlockedDueToPeerChange);
 
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
            default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
 
        },
 
        ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => {
 
            let port_to_change = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_change);
 
            let port_info = comp_ctx.get_port(port_handle);
 
            debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToPeerChange));
 
            let old_peer_id = port_info.peer_comp_id;
 

	
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            port_info.peer_port_id = new_port_id;
 

	
 
            port_info.state.clear(PortStateFlag::BlockedDueToPeerChange);
 
            comp_ctx.change_port_peer(sched_ctx, port_handle, Some(new_comp_id));
 
            default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
 
        }
 
    }
 

	
 
    return Ok(());
 
}
 
@@ -763,24 +750,25 @@ pub(crate) fn default_handle_sync_decision(
 
    );
 

	
 
    sched_ctx.info(&format!("Handling decision {:?} (in mode: {:?})", decision, exec_state.mode));
 
    consensus.notify_sync_decision(decision);
 
    if success {
 
        // We cannot get a success message if the component has encountered an
 
        // error.
 
        for port_index in 0..comp_ctx.num_ports() {
 
            let port_info = comp_ctx.get_port_by_index_mut(port_index);
 
            if port_info.close_at_sync_end {
 
                port_info.state.set(PortStateFlag::Closed);
 
            }
 
            port_info.state.clear(PortStateFlag::Received);
 
        }
 
        debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
 
        exec_state.mode = CompMode::NonSync;
 
        return Some(true);
 
    } else {
 
        // We may get failure both in all possible cases. But we should only
 
        // modify the execution state if we're not already in exit mode
 
        if !exec_state.mode.is_busy_exiting() {
 
            sched_ctx.error("failed synchronous round, initiating exit");
 
            exec_state.set_as_start_exit(ExitReason::ErrorNonSync);
 
        }
 
        return Some(false);
 
@@ -908,26 +896,26 @@ fn perform_send_message_with_ports(
 
    find_ports_in_value_group(&exec_state.mode_value, &mut transmit_ports);
 

	
 
    let port_handle = comp_ctx.get_port_handle(exec_state.mode_port);
 
    let port_info = comp_ctx.get_port(port_handle);
 
    let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 

	
 
    // Annotate the data message
 
    let message_value = exec_state.mode_value.take();
 
    let mut annotated_message = consensus.annotate_data_message(comp_ctx, port_info, message_value);
 

	
 
    // And further enhance the message by adding data about the ports that are
 
    // being transferred
 
    for (port_locations, port_id) in transmit_ports {
 
        let transmit_port_handle = comp_ctx.get_port_handle(port_id);
 
    for (port_locations, transmit_port_id) in transmit_ports {
 
        let transmit_port_handle = comp_ctx.get_port_handle(transmit_port_id);
 
        let transmit_port_info = comp_ctx.get_port(transmit_port_handle);
 

	
 
        let transmit_messages = take_port_messages(comp_ctx, transmit_port_id, inbox_main, inbox_backup);
 

	
 
        let mut transmit_port_state = transmit_port_info.state;
 
        debug_assert!(transmit_port_state.is_set(PortStateFlag::Transmitted));
 
        transmit_port_state.clear(PortStateFlag::Transmitted);
 

	
 
        annotated_message.ports.push(TransmittedPort{
 
            locations: port_locations,
 
            messages: transmit_messages,
 
            peer_comp: transmit_port_info.peer_comp_id,
 
@@ -936,26 +924,26 @@ fn perform_send_message_with_ports(
 
            state: transmit_port_state
 
        })
 
    }
 

	
 
    // And finally, send the message to the peer
 
    let peer_info = comp_ctx.get_peer(peer_handle);
 
    peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true);
 
}
 

	
 
/// Handles an `Ack` for the control layer.
 
fn default_handle_ack(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer, control_id: ControlId,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, inbox_main: &mut InboxMain,
 
    inbox_backup: &mut InboxBackup
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) {
 
    // Since an `Ack` may cause another one, handle them in a loop
 
    let mut to_ack = control_id;
 
    loop {
 
        let (action, new_to_ack) = control.handle_ack(to_ack, sched_ctx, comp_ctx);
 
        match action {
 
            AckAction::SendMessage(target_comp, message) => {
 
                // FIX @NoDirectHandle
 
                let mut handle = sched_ctx.runtime.get_component_public(target_comp);
 
                handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
                let _should_remove = handle.decrement_users();
 
                debug_assert!(_should_remove.is_none());
 
@@ -1075,25 +1063,25 @@ pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut En
 
                let heap_region = &group.regions[*heap_pos as usize];
 
                for (value_index, embedded_value) in heap_region.iter().enumerate() {
 
                    let value_location = ValueId::Heap(*heap_pos, value_index as u32);
 
                    find_port_in_value(group, embedded_value, value_location, ports);
 
                }
 
            },
 
            _ => {}, // values we don't care about
 
        }
 
    }
 

	
 
    // Clear the ports, then scan all the available values
 
    ports.clear();
 
    for (value_index, value) in &value_group.values.iter().enumerate() {
 
    for (value_index, value) in value_group.values.iter().enumerate() {
 
        find_port_in_value(value_group, value, ValueId::Stack(value_index as u32), ports);
 
    }
 
}
 

	
 
/// Goes through the inbox of a component and takes out all the messages that
 
/// are targeted at a specific port
 
pub(crate) fn take_port_messages(
 
    comp_ctx: &CompCtx, port_id: PortId,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) -> Vec<DataMessage> {
 
    let mut messages = Vec::new();
 
    let port_handle = comp_ctx.get_port_handle(port_id);
src/runtime2/component/component_context.rs
Show inline comments
 
@@ -187,38 +187,59 @@ impl CompCtx {
 
            kind: PortKind::Getter,
 
            state: PortState::new(),
 
            peer_comp_id: self.id,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            associated_with_peer: false,
 
        });
 

	
 
        return Channel{ putter_id, getter_id };
 
    }
 

	
 
    /// Adds a new port. Make sure to call `add_peer` afterwards.
 
    /// Adds a new port. Make sure to call `change_peer` afterwards.
 
    pub(crate) fn add_port(&mut self, peer_comp_id: CompId, peer_port_id: PortId, kind: PortKind, state: PortState) -> LocalPortHandle {
 
        let self_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id, peer_comp_id, peer_port_id, kind, state,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            associated_with_peer: false,
 
        });
 
        return LocalPortHandle(self_id);
 
    }
 

	
 
    /// Removes a port. Make sure you called `remove_peer` first.
 
    /// Adds a self-reference. Called by the runtime/scheduler
 
    pub(crate) fn add_self_reference(&mut self, self_handle: CompHandle) {
 
        debug_assert_eq!(self.id, self_handle.id());
 
        debug_assert!(self.get_peer_index_by_id(self.id).is_none());
 
        self.peers.push(Peer{
 
            id: self.id,
 
            num_associated_ports: 0,
 
            handle: self_handle
 
        });
 
    }
 

	
 
    /// Removes a self-reference. Called by the runtime/scheduler
 
    pub(crate) fn remove_self_reference(&mut self) -> Option<CompKey> {
 
        let self_index = self.get_peer_index_by_id(self.id).unwrap();
 
        let peer = &mut self.peers[self_index];
 
        let maybe_comp_key = peer.handle.decrement_users();
 
        self.peers.remove(self_index);
 

	
 
        return maybe_comp_key;
 
    }
 

	
 
    /// Removes a port. Make sure you called `change_peer` first.
 
    pub(crate) fn remove_port(&mut self, port_handle: LocalPortHandle) -> Port {
 
        let port_index = self.must_get_port_index(port_handle);
 
        let port = self.ports.remove(port_index);
 
        dbg_code!(assert!(!port.associated_with_peer));
 
        return port;
 
    }
 

	
 
    /// Changes a peer
 
    pub(crate) fn change_port_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, new_peer_comp_id: Option<CompId>) {
 
        // If port is currently associated with a peer, then remove that peer
 
        let port_index = self.get_port_index(port_handle);
 
        let port = &mut self.ports[port_index];
src/runtime2/component/component_internet.rs
Show inline comments
 
@@ -101,40 +101,40 @@ impl Component for ComponentTcpClient {
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Sync(message) => {
 
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
                component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
 
            },
 
            Message::Control(message) => {
 
                if let Err(location_and_message) = component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                    message, sched_ctx, comp_ctx, &mut self.inbox_main, &mut self.inbox_backup
 
                ) {
 
                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                }
 
            },
 
            Message::Poll => {
 
                sched_ctx.info("Received polling event");
 
            },
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        sched_ctx.info(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state));
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedSelect => {
 
            CompMode::BlockedSelect | CompMode::BlockedPutPorts => {
 
                // Not possible: we never enter this state
 
                unreachable!();
 
            },
 
            CompMode::NonSync => {
 
                // When in non-sync mode
 
                match &mut self.socket_state {
 
                    SocketState::Connected(_socket) => {
 
                        if self.sync_state == SyncState::FinishSyncThenQuit {
 
                            // Previous request was to let the component shut down
 
                            self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                        } else {
 
                            // Reset for a new request
 
@@ -230,25 +230,29 @@ impl Component for ComponentTcpClient {
 
                    SyncState::Getting => {
 
                        // We're going to try and receive a single message. If
 
                        // this causes us to end up blocking the component
 
                        // goes to sleep until it is polled.
 
                        const BUFFER_SIZE: usize = 1024; // TODO: Move to config
 

	
 
                        let socket = self.socket_state.get_socket();
 
                        self.byte_buffer.resize(BUFFER_SIZE, 0);
 
                        match socket.receive(&mut self.byte_buffer) {
 
                            Ok(num_received) => {
 
                                self.byte_buffer.resize(num_received, 0);
 
                                let message_content = self.bytes_to_data_message_content(&self.byte_buffer);
 
                                let send_result = component::default_send_data_message(&mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource, message_content, sched_ctx, &mut self.consensus, comp_ctx);
 
                                let send_result = component::default_send_data_message(
 
                                    &mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource,
 
                                    message_content, sched_ctx, &mut self.consensus, &mut self.control, comp_ctx
 
                                );
 

	
 
                                if let Err(location_and_message) = send_result {
 
                                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                                    return CompScheduling::Immediate;
 
                                } else {
 
                                    let scheduling = send_result.unwrap();
 
                                    self.sync_state = SyncState::AwaitingCmd;
 
                                    return scheduling;
 
                                }
 
                            },
 
                            Err(err) => {
 
                                if err.kind() == IoErrorKind::WouldBlock {
 
                                    return CompScheduling::Sleep; // wait until polled
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -209,25 +209,25 @@ pub(crate) struct CompPDL {
 
    pub exec_state: CompExecState,
 
    select_state: SelectState,
 
    pub prompt: Prompt,
 
    pub control: ControlLayer,
 
    pub consensus: Consensus,
 
    pub sync_counter: u32,
 
    pub exec_ctx: ExecCtx,
 
    // TODO: Temporary field, simulates future plans of having one storage place
 
    //  reserved per port.
 
    // Should be same length as the number of ports. Corresponding indices imply
 
    // message is intended for that port.
 
    pub inbox_main: InboxMain,
 
    pub inbox_backup: Vec<DataMessage>,
 
    pub inbox_backup: InboxBackup,
 
}
 

	
 
impl Component for CompPDL {
 
    fn on_creation(&mut self, _id: CompId, _sched_ctx: &SchedulerCtx) {
 
        // Intentionally empty
 
    }
 

	
 
    fn on_shutdown(&mut self, _sched_ctx: &SchedulerCtx) {
 
        // Intentionally empty
 
    }
 

	
 
    fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage) {
 
@@ -248,50 +248,51 @@ impl Component for CompPDL {
 
            let _should_remove = target.decrement_users();
 
            debug_assert!(_should_remove.is_none());
 
            return;
 
        }
 

	
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Control(message) => {
 
                if let Err(location_and_message) = component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                    message, sched_ctx, comp_ctx, &mut self.inbox_main, &mut self.inbox_backup
 
                ) {
 
                    self.handle_generic_component_error(sched_ctx, location_and_message);
 
                }
 
            },
 
            Message::Sync(message) => {
 
                self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Poll => {
 
                unreachable!(); // because we never register at the polling thread
 
            }
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        use EvalContinuation as EC;
 

	
 
        sched_ctx.info(&format!("Running component (mode: {:?})", self.exec_state.mode));
 

	
 
        // Depending on the mode don't do anything at all, take some special
 
        // actions, or fall through and run the PDL code.
 
        match self.exec_state.mode {
 
            CompMode::NonSync | CompMode::Sync => {
 
                // continue and run PDL code
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | CompMode::BlockedSelect => {
 
            CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut |
 
            CompMode::BlockedSelect | CompMode::BlockedPutPorts => {
 
                return CompScheduling::Sleep;
 
            }
 
            CompMode::StartExit => return component::default_handle_start_exit(
 
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus
 
            ),
 
            CompMode::BusyExit => return component::default_handle_busy_exit(
 
                &mut self.exec_state, &self.control, sched_ctx
 
            ),
 
            CompMode::Exit => return component::default_handle_exit(&self.exec_state),
 
        }
 

	
 
        let run_result = self.execute_prompt(&sched_ctx);
 
@@ -333,25 +334,25 @@ impl Component for CompPDL {
 
                    }
 
                }
 
            },
 
            EC::Put(expr_id, port_id, value) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                sched_ctx.info(&format!("Putting value {:?}", value));
 

	
 
                // Send the message
 
                let target_port_id = port_id_from_eval(port_id);
 
                let send_result = component::default_send_data_message(
 
                    &mut self.exec_state, target_port_id,
 
                    PortInstruction::SourceLocation(expr_id), value,
 
                    sched_ctx, &mut self.consensus, comp_ctx
 
                    sched_ctx, &mut self.consensus, &mut self.control, comp_ctx
 
                );
 
                if let Err(location_and_message) = send_result {
 
                    self.handle_generic_component_error(sched_ctx, location_and_message);
 
                    return CompScheduling::Immediate;
 
                } else {
 
                    // When `run` is called again (potentially after becoming
 
                    // unblocked) we need to instruct the executor that we performed
 
                    // the `put`
 
                    let scheduling = send_result.unwrap();
 
                    self.exec_ctx.stmt = ExecStmt::PerformedPut;
 
                    return scheduling;
 
                }
 
@@ -568,36 +569,36 @@ impl CompPDL {
 
        sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx,
 
        definition_id: ProcedureDefinitionId, type_id: TypeId, mut arguments: ValueGroup
 
    ) {
 
        struct PortPair{
 
            creator_handle: LocalPortHandle,
 
            creator_id: PortId,
 
            created_handle: LocalPortHandle,
 
            created_id: PortId,
 
        }
 
        let mut opened_port_id_pairs = Vec::new();
 
        let mut closed_port_id_pairs = Vec::new();
 

	
 
        let reservation = sched_ctx.runtime.start_create_pdl_component();
 
        let reservation = sched_ctx.runtime.start_create_component();
 
        let mut created_ctx = CompCtx::new(&reservation);
 

	
 
        // let other_proc = &sched_ctx.runtime.protocol.heap[definition_id];
 
        // let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition];
 
        // dbg_code!({
 
        //     sched_ctx.log(&format!(
 
        //         "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})",
 
        //         self_proc.identifier.value.as_str(), creator_ctx.id,
 
        //         other_proc.identifier.value.as_str(), reservation.id()
 
        //     ));
 
        // });
 
        let other_proc = &sched_ctx.runtime.protocol.heap[definition_id];
 
        let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition];
 
        dbg_code!({
 
            sched_ctx.info(&format!(
 
                "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})",
 
                self_proc.identifier.value.as_str(), creator_ctx.id,
 
                other_proc.identifier.value.as_str(), reservation.id()
 
            ));
 
        });
 

	
 
        // Take all the ports ID that are in the `args` (and currently belong to
 
        // the creator component) and translate them into new IDs that are
 
        // associated with the component we're about to create
 
        let mut arg_iter = ValueGroupPortIter::new(&mut arguments);
 
        while let Some(port_reference) = arg_iter.next() {
 
            // Create port entry for new component
 
            let creator_port_id = port_reference.id;
 
            let creator_port_handle = creator_ctx.get_port_handle(creator_port_id);
 
            let creator_port = creator_ctx.get_port(creator_port_handle);
 
            let created_port_handle = created_ctx.add_port(
 
                creator_port.peer_comp_id, creator_port.peer_port_id,
 
@@ -645,49 +646,49 @@ impl CompPDL {
 
                // Peer of the transferred port is the component that is
 
                // creating the new component.
 
                let created_peer_port_index = opened_port_id_pairs
 
                    .iter()
 
                    .position(|v| v.creator_id == creator_port_info.peer_port_id);
 
                match created_peer_port_index {
 
                    Some(created_peer_port_index) => {
 
                        // Addendum to the above comment: but that port is also
 
                        // moving to the new component
 
                        let peer_pair = &opened_port_id_pairs[created_peer_port_index];
 
                        created_port_info.peer_port_id = peer_pair.created_id;
 
                        created_port_info.peer_comp_id = reservation.id();
 
                        todo!("either add 'self peer', or remove that idea from Ctx altogether");`
 
                        todo!("either add 'self peer', or remove that idea from Ctx altogether");
 
                    },
 
                    None => {
 
                        // Peer port remains with creator component.
 
                        created_port_info.peer_comp_id = creator_ctx.id;
 
                        created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(creator_ctx.id));
 
                    }
 
                }
 
            } else {
 
                // Peer is a different component. We'll deal with sending the
 
                // appropriate messages later
 
                let peer_handle = creator_ctx.get_peer_handle(created_port_info.peer_comp_id);
 
                let peer_info = creator_ctx.get_peer(peer_handle);
 
                created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(peer_info.id));
 
                created_component_has_remote_peers = true;
 
            }
 
        }
 

	
 
        // We'll now actually turn our reservation for a new component into an
 
        // actual component. Note that we initialize it as "not sleeping" as
 
        // its initial scheduling might be performed based on `Ack`s in response
 
        // to message exchanges between remote peers.
 
        let total_num_ports = opened_port_id_pairs.len() + closed_port_id_pairs.len();
 
        let component = component::create_component(&sched_ctx.runtime.protocol, definition_id, type_id, arguments, total_num_ports);
 
        let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component(
 
        let (created_key, component) = sched_ctx.runtime.finish_create_component(
 
            reservation, component, created_ctx, false,
 
        );
 
        component.component.on_creation(created_key.downgrade(), sched_ctx);
 

	
 
        // Now modify the creator's ports: remove every transferred port and
 
        // potentially remove the peer component.
 
        for pair in opened_port_id_pairs.iter() {
 
            // Remove peer if appropriate
 
            let creator_port_index = creator_ctx.get_port_index(pair.creator_handle);
 
            creator_ctx.change_port_peer(sched_ctx, pair.creator_handle, None);
 
            creator_ctx.remove_port(pair.creator_handle);
 

	
src/runtime2/component/component_random.rs
Show inline comments
 
use rand::prelude as random;
 
use rand::RngCore;
 

	
 
use crate::protocol::eval::{ValueGroup, Value};
 
use crate::runtime2::*;
 
use crate::runtime2::communication::*;
 

	
 
use super::*;
 
use super::component::{
 
    self,
 
    Component, CompExecState, CompScheduling,
 
    CompMode, ExitReason
 
};
 
use super::component::*;
 
use super::control_layer::*;
 
use super::consensus::*;
 

	
 
/// TODO: Temporary component to figure out what to do with custom components.
 
///     This component sends random numbers between two u32 limits
 
pub struct ComponentRandomU32 {
 
    // Properties for this specific component
 
    output_port_id: PortId,
 
    random_minimum: u32,
 
    random_maximum: u32,
 
    num_sends: u32,
 
    max_num_sends: u32,
 
    generator: random::ThreadRng,
 
    // Generic state-tracking
 
    exec_state: CompExecState,
 
    did_perform_send: bool, // when in sync mode
 
    control: ControlLayer,
 
    consensus: Consensus,
 
    inbox_main: InboxMain, // not used
 
    inbox_backup: InboxBackup, // not used
 
}
 

	
 
impl Component for ComponentRandomU32 {
 
    fn on_creation(&mut self, _id: CompId, _sched_ctx: &SchedulerCtx) {}
 

	
 
    fn on_shutdown(&mut self, sched_ctx: &SchedulerCtx) {}
 

	
 
    fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) {
 
        // Impossible since this component does not have any input ports in its
 
        // signature.
 
        unreachable!();
 
    }
 

	
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
 
        match message {
 
            Message::Data(_message) => unreachable!(),
 
            Message::Sync(message) => {
 
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
                component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
 
            },
 
            Message::Control(message) => {
 
                if let Err(location_and_message) = component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                    message, sched_ctx, comp_ctx, &mut self.inbox_main, &mut self.inbox_backup
 
                ) {
 
                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                }
 
            },
 
            Message::Poll => unreachable!(),
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        sched_ctx.info(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode));
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedGet | CompMode::BlockedSelect => {
 
            CompMode::BlockedGet | CompMode::BlockedSelect | CompMode::BlockedPutPorts => {
 
                // impossible for this component, no input ports and no select
 
                // blocks
 
                unreachable!();
 
            }
 
            CompMode::NonSync => {
 
                // If in non-sync mode then we check if the arguments make sense
 
                // (at some point in the future, this is just a testing
 
                // component).
 
                if self.random_minimum >= self.random_maximum {
 
                    // Could throw an evaluation error, but lets just panic
 
                    panic!("going to crash 'n burn your system now, please provide valid arguments");
 
                }
 
@@ -95,25 +93,26 @@ impl Component for ComponentRandomU32 {
 
                // consensus has been reached
 
                if !self.did_perform_send {
 
                    sched_ctx.info("Sending random message");
 
                    let mut random = self.generator.next_u32() - self.random_minimum;
 
                    let random_delta = self.random_maximum - self.random_minimum;
 
                    random %= random_delta;
 
                    random += self.random_minimum;
 
                    let value_group = ValueGroup::new_stack(vec![Value::UInt32(random)]);
 

	
 
                    let send_result = component::default_send_data_message(
 
                        &mut self.exec_state, self.output_port_id,
 
                        PortInstruction::NoSource, value_group,
 
                        sched_ctx, &mut self.consensus, comp_ctx
 
                        sched_ctx, &mut self.consensus, &mut self.control,
 
                        comp_ctx
 
                    );
 

	
 
                    if let Err(location_and_message) = send_result {
 
                        component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                        return CompScheduling::Immediate
 
                    } else {
 
                        // Blocked or not, we set `did_perform_send` to true. If
 
                        // blocked then the moment we become unblocked (and are back
 
                        // at the `Sync` mode) we have sent the message.
 
                        let scheduling = send_result.unwrap();
 
                        self.did_perform_send = true;
 
                        self.num_sends += 1;
 
@@ -148,15 +147,17 @@ impl ComponentRandomU32 {
 

	
 
        return Self{
 
            output_port_id: port_id,
 
            random_minimum: minimum,
 
            random_maximum: maximum,
 
            num_sends: 0,
 
            max_num_sends: num_sends,
 
            generator: random::thread_rng(),
 
            exec_state: CompExecState::new(),
 
            did_perform_send: false,
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
            inbox_main: Vec::new(),
 
            inbox_backup: Vec::new(),
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/consensus.rs
Show inline comments
 
@@ -357,25 +357,28 @@ impl Consensus {
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling inbound and outbound messages
 
    // -------------------------------------------------------------------------
 

	
 
    /// Prepares a set of values to be sent of a channel.
 
    pub(crate) fn annotate_data_message(&mut self, comp_ctx: &CompCtx, port_info: &Port, content: ValueGroup) -> DataMessage {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy); // can only send between sync start and sync end
 
        debug_assert!(self.ports.iter().any(|v| v.self_port_id == port_info.self_id));
 
        let data_header = self.create_data_header_and_update_mapping(port_info);
 
        let sync_header = self.create_sync_header(comp_ctx);
 

	
 
        return DataMessage{ data_header, sync_header, content };
 
        return DataMessage{
 
            data_header, sync_header, content,
 
            ports: Vec::new()
 
        };
 
    }
 

	
 
    /// Handles the arrival of a new data message (needs to be called for every
 
    /// new data message, even though it might not end up being received). This
 
    /// is used to determine peers of `get`ter ports.
 
    // TODO: The use of this function is rather ugly. Find a more robust
 
    //  scheme about owners of `get`ter ports not knowing about their peers.
 
    pub(crate) fn handle_incoming_data_message(&mut self, comp_ctx: &CompCtx, message: &DataMessage) {
 
        let target_handle = comp_ctx.get_port_handle(message.data_header.target_port);
 
        let target_index = comp_ctx.get_port_index(target_handle);
 
        let annotation = &mut self.ports[target_index];
 
        debug_assert!(
src/runtime2/error.rs
Show inline comments
 
use std::fmt::{Write, Debug, Display, Formatter as FmtFormatter, Result as FmtResult};
 
use std::fmt::{Debug, Display, Formatter as FmtFormatter, Result as FmtResult};
 

	
 
/// Represents an unrecoverable runtime error that is reported to the user (for
 
/// debugging purposes). Basically a human-readable message with its source
 
/// location. The error is chainable.
 
pub struct RtError {
 
    file: &'static str,
 
    line: u32,
 
    message: String,
 
    cause: Option<Box<RtError>>,
 
}
 

	
 
impl RtError {
src/runtime2/poll/mod.rs
Show inline comments
 
@@ -168,39 +168,56 @@ impl PollingThread {
 
            handle: Some(thread_handle),
 
        };
 
        let client_factory = PollingClientFactory{
 
            poller,
 
            generation_counter: Arc::new(AtomicU32::new(0)),
 
            queue_factory: queue_producers,
 
        };
 

	
 
        return Ok((thread_handle, client_factory));
 
    }
 

	
 
    pub(crate) fn run(&mut self) {
 
        use std::io::ErrorKind;
 
        use crate::runtime2::communication::Message;
 

	
 
        const NUM_EVENTS: usize = 256;
 
        const EPOLL_DURATION: time::Duration = time::Duration::from_millis(250);
 

	
 
        // @performance: Lot of improvements possible here, a HashMap is likely
 
        // a horrible way to do this.
 
        let mut events = Vec::with_capacity(NUM_EVENTS);
 
        let mut lookup = HashMap::with_capacity(64);
 
        self.log("Starting polling thread");
 

	
 
        loop {
 
            // Retrieve events first (because the PollingClient will first
 
            // register at epoll, and then push a command into the queue).
 
            self.poller.wait(&mut events, EPOLL_DURATION).unwrap();
 
            loop {
 
                let wait_result = self.poller.wait(&mut events, EPOLL_DURATION);
 
                match wait_result {
 
                    Ok(()) => break,
 
                    Err(reason) => {
 
                        match reason.kind() {
 
                            ErrorKind::Interrupted => {
 
                                // Happens when we're debugging and set a break-
 
                                // point, we want to continue waiting
 
                            },
 
                            _ => {
 
                                panic!("failed to poll: {}", reason);
 
                            }
 
                        }
 
                    }
 
                }
 
            }
 

	
 
            // Then handle everything in the command queue.
 
            while let Some(command) = self.queue.pop() {
 
                match command {
 
                    PollCmd::Register(handle, user_data) => {
 
                        self.log(&format!("Registering component {:?} as {}", handle.id(), user_data.0));
 
                        let key = Self::user_data_as_key(user_data);
 
                        debug_assert!(!lookup.contains_key(&key));
 
                        lookup.insert(key, handle);
 
                    },
 
                    PollCmd::Unregister(_file_descriptor, user_data) => {
 
                        let key = Self::user_data_as_key(user_data);
src/runtime2/runtime.rs
Show inline comments
 
@@ -65,51 +65,59 @@ impl CompReserved {
 

	
 
/// Representation of a runtime component. Contains the bookkeeping variables
 
/// for the schedulers, the publicly accessible fields, and the private fields
 
/// that should only be accessed by the thread running the component's routine.
 
pub(crate) struct RuntimeComp {
 
    pub public: CompPublic,
 
    pub component: Box<dyn Component>,
 
    pub ctx: CompCtx,
 
    pub inbox: QueueDynMpsc<Message>,
 
    pub exiting: bool,
 
}
 

	
 
/// Should contain everything that is accessible in a thread-safe manner
 
/// Should contain everything that is accessible in a thread-safe manner. May
 
/// NOT contain non-threadsafe fields.
 
// TODO: Do something about the `num_handles` thing. This needs to be a bit more
 
//  "foolproof" to lighten the mental burden of using the `num_handles`
 
//  variable.
 
pub(crate) struct CompPublic {
 
    pub sleeping: AtomicBool,
 
    pub num_handles: AtomicU32, // manually modified (!)
 
    inbox: QueueDynProducer<Message>,
 
}
 

	
 
/// Handle to public part of a component. Would be nice if we could
 
/// automagically manage the `num_handles` counter. But when it reaches zero we
 
/// need to manually remove the handle from the runtime. So we just have debug
 
/// code to make sure this actually happens.
 
pub(crate) struct CompHandle {
 
    target: *const CompPublic,
 
    id: CompId,
 
    #[cfg(debug_assertions)] decremented: bool,
 
}
 

	
 
impl CompHandle {
 
    fn new(id: CompId, public: &CompPublic) -> CompHandle {
 
        let handle = CompHandle{
 
    /// Creates a new component handle and does not increment the reference
 
    /// counter.
 
    fn new_unincremented(id: CompId, public: &CompPublic) -> CompHandle {
 
        return CompHandle{
 
            target: public,
 
            id,
 
            #[cfg(debug_assertions)] decremented: false,
 
        };
 
    }
 

	
 
    /// Creates a new component handle and increments the reference counter.
 
    fn new(id: CompId, public: &CompPublic) -> CompHandle {
 
        let mut handle = Self::new_unincremented(id, public);
 
        handle.increment_users();
 
        return handle;
 
    }
 

	
 
    pub(crate) fn send_message(&self, runtime: &RuntimeInner, message: Message, try_wake_up: bool) {
 
        self.inbox.push(message);
 
        if try_wake_up {
 
            wake_up_if_sleeping(runtime, self.id, self);
 
        }
 
    }
 

	
 
    #[inline]
 
@@ -223,28 +231,28 @@ impl Runtime {
 
            inner: runtime_inner,
 
            scheduler_threads,
 
            polling_handle,
 
        });
 
    }
 

	
 
    pub fn create_component(&self, module_name: &[u8], routine_name: &[u8]) -> Result<(), ComponentCreationError> {
 
        use crate::protocol::eval::ValueGroup;
 
        let prompt = self.inner.protocol.new_component(
 
            module_name, routine_name,
 
            ValueGroup::new_stack(Vec::new())
 
        )?;
 
        let reserved = self.inner.start_create_pdl_component();
 
        let reserved = self.inner.start_create_component();
 
        let ctx = CompCtx::new(&reserved);
 
        let component = Box::new(CompPDL::new(prompt, 0));
 
        let (key, _) = self.inner.finish_create_pdl_component(reserved, component, ctx, false);
 
        let (key, _) = self.inner.finish_create_component(reserved, component, ctx, false);
 
        self.inner.enqueue_work(key);
 

	
 
        return Ok(())
 
    }
 
}
 

	
 
impl Drop for Runtime {
 
    fn drop(&mut self) {
 
        self.inner.decrement_active_components();
 
        for handle in self.scheduler_threads.drain(..) {
 
            handle.join().expect("join scheduler thread");
 
        }
 
@@ -275,55 +283,64 @@ impl RuntimeInner {
 
        // We have work, or the schedulers should exit.
 
        return lock.pop_front();
 
    }
 

	
 
    pub(crate) fn enqueue_work(&self, key: CompKey) {
 
        let mut lock = self.work_queue.lock().unwrap();
 
        lock.push_back(key);
 
        self.work_condvar.notify_one();
 
    }
 

	
 
    // Creating/destroying components
 

	
 
    pub(crate) fn start_create_pdl_component(&self) -> CompReserved {
 
    pub(crate) fn start_create_component(&self) -> CompReserved {
 
        self.increment_active_components();
 
        let reservation = self.components.reserve();
 
        return CompReserved{ reservation };
 
    }
 

	
 
    pub(crate) fn finish_create_pdl_component(
 
    pub(crate) fn finish_create_component(
 
        &self, reserved: CompReserved,
 
        component: Box<dyn Component>, mut context: CompCtx, initially_sleeping: bool,
 
    ) -> (CompKey, &mut RuntimeComp) {
 
        // Construct runtime component
 
        let inbox_queue = QueueDynMpsc::new(16);
 
        let inbox_producer = inbox_queue.producer();
 

	
 
        let _id = reserved.id();
 
        context.id = reserved.id();
 
        let component = RuntimeComp {
 
        let component_id = reserved.id();
 
        context.id = component_id;
 

	
 
        let mut component = RuntimeComp {
 
            public: CompPublic{
 
                sleeping: AtomicBool::new(initially_sleeping),
 
                num_handles: AtomicU32::new(1), // the component itself acts like a handle
 
                inbox: inbox_producer,
 
            },
 
            component,
 
            ctx: context,
 
            inbox: inbox_queue,
 
            exiting: false,
 
        };
 

	
 
        // Submit created component into storage.
 
        let index = self.components.submit(reserved.reservation, component);
 
        debug_assert_eq!(index, _id.0);
 
        debug_assert_eq!(index, component_id.0);
 
        let component = self.components.get_mut(index);
 

	
 
        // Bit messy, but here we create the reference of a component to itself,
 
        // the `num_handles` being initialized to `1` above, and add it to the
 
        // component context.
 
        let self_handle = CompHandle::new_unincremented(component_id, &component.public);
 
        component.ctx.add_self_reference(self_handle);
 

	
 
        return (CompKey(index), component);
 
    }
 

	
 
    pub(crate) fn get_component(&self, key: CompKey) -> &mut RuntimeComp {
 
        let component = self.components.get_mut(key.0);
 
        return component;
 
    }
 

	
 
    pub(crate) fn get_component_public(&self, id: CompId) -> CompHandle {
 
        let component = self.components.get(id.0);
 
        return CompHandle::new(id, &component.public);
 
    }
src/runtime2/scheduler.rs
Show inline comments
 
@@ -80,25 +80,25 @@ impl Scheduler {
 
                    component.component.handle_message(&mut scheduler_ctx, &mut component.ctx, message);
 
                }
 
                new_scheduling = component.component.run(&mut scheduler_ctx, &mut component.ctx);
 
            }
 

	
 
            // Handle the new scheduling
 
            match new_scheduling {
 
                CompScheduling::Immediate => unreachable!(),
 
                CompScheduling::Requeue => { self.runtime.enqueue_work(comp_key); },
 
                CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); },
 
                CompScheduling::Exit => {
 
                    component.component.on_shutdown(&scheduler_ctx);
 
                    self.mark_component_as_exiting(&scheduler_ctx, component);
 
                    self.mark_component_as_exiting(&scheduler_ctx, comp_key, component);
 
                }
 
            }
 
        }
 
    }
 

	
 
    // local utilities
 

	
 
    /// Marks component as sleeping, if after marking itself as sleeping the
 
    /// inbox contains messages then the component will be immediately
 
    /// rescheduled. After calling this function the component should not be
 
    /// executed anymore.
 
    fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) {
 
@@ -111,31 +111,29 @@ impl Scheduler {
 
                .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
 
                .is_ok();
 

	
 
            if should_reschedule {
 
                self.runtime.enqueue_work(key);
 
            }
 
        }
 
    }
 

	
 
    /// Marks the component as exiting by removing the reference it holds to
 
    /// itself. Afterward the component will enter "normal" sleeping mode (if it
 
    /// has not yet been destroyed)
 
    fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) {
 
    fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, comp_key: CompKey, component: &mut RuntimeComp) {
 
        // If we didn't yet decrement our reference count, do so now
 
        let comp_key = unsafe{ component.ctx.id.upgrade() };
 

	
 
        if !component.exiting {
 
            component.exiting = true;
 

	
 
            let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel);
 
            let new_count = old_count - 1;
 
            if new_count == 0 {
 
            let maybe_comp_key = component.ctx.remove_self_reference();
 
            if let Some(_comp_key) = maybe_comp_key {
 
                debug_assert_eq!(_comp_key.0, comp_key.0);
 
                sched_ctx.runtime.destroy_component(comp_key);
 
                return;
 
            }
 
        }
 

	
 
        // Enter "regular" sleeping mode
 
        self.mark_component_as_sleeping(comp_key, component);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/tests/messaging.rs
Show inline comments
 
new file 100644
 
use super::*;
 

	
 

	
 
#[test]
 
fn test_component_communication() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive sender(out<u32> o, u32 outside_loops, u32 inside_loops) {
 
        u32 outside_index = 0;
 
        while (outside_index < outside_loops) {
 
            u32 inside_index = 0;
 
            sync while (inside_index < inside_loops) {
 
                put(o, inside_index);
 
                inside_index += 1;
 
            }
 
            outside_index += 1;
 
        }
 
    }
 

	
 
    primitive receiver(in<u32> i, u32 outside_loops, u32 inside_loops) {
 
        u32 outside_index = 0;
 
        while (outside_index < outside_loops) {
 
            u32 inside_index = 0;
 
            sync while (inside_index < inside_loops) {
 
                auto val = get(i);
 
                while (val != inside_index) {} // infinite loop if incorrect value is received
 
                inside_index += 1;
 
            }
 
            outside_index += 1;
 
        }
 
    }
 

	
 
    composite constructor() {
 
        channel o_orom -> i_orom;
 
        channel o_mrom -> i_mrom;
 
        channel o_ormm -> i_ormm;
 
        channel o_mrmm -> i_mrmm;
 

	
 
        // one round, one message per round
 
        new sender(o_orom, 1, 1);
 
        new receiver(i_orom, 1, 1);
 

	
 
        // multiple rounds, one message per round
 
        new sender(o_mrom, 5, 1);
 
        new receiver(i_mrom, 5, 1);
 

	
 
        // one round, multiple messages per round
 
        new sender(o_ormm, 1, 5);
 
        new receiver(i_ormm, 1, 5);
 

	
 
        // multiple rounds, multiple messages per round
 
        new sender(o_mrmm, 5, 5);
 
        new receiver(i_mrmm, 5, 5);
 
    }").expect("compilation");
 
    let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_send_to_self() {
 
    compile_and_create_component("
 
    primitive insane_in_the_membrane() {
 
        channel a -> b;
 
        sync {
 
            put(a, 1);
 
            auto v = get(b);
 
            while (v != 1) {}
 
        }
 
    }
 
    ", "insane_in_the_membrane", no_args());
 
}
 

	
 
#[test]
 
fn test_intermediate_messenger() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive receiver<T>(in<T> rx, u32 num) {
 
        auto index = 0;
 
        while (index < num) {
 
            sync { auto v = get(rx); }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive middleman<T>(in<T> rx, out<T> tx, u32 num) {
 
        auto index = 0;
 
        while (index < num) {
 
            sync { put(tx, get(rx)); }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive sender<T>(out<T> tx, u32 num) {
 
        auto index = 0;
 
        while (index < num) {
 
            sync put(tx, 1337);
 
            index += 1;
 
        }
 
    }
 

	
 
    composite constructor_template<T>() {
 
        auto num = 0;
 
        channel<T> tx_a -> rx_a;
 
        channel tx_b -> rx_b;
 
        new sender(tx_a, 3);
 
        new middleman(rx_a, tx_b, 3);
 
        new receiver(rx_b, 3);
 
    }
 

	
 
    composite constructor() {
 
        new constructor_template<u16>();
 
        new constructor_template<u32>();
 
        new constructor_template<u64>();
 
        new constructor_template<s16>();
 
        new constructor_template<s32>();
 
        new constructor_template<s64>();
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
src/runtime2/tests/mod.rs
Show inline comments
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::component::{CompCtx, CompPDL};
 

	
 
mod messaging;
 
mod error_handling;
 
mod transfer_ports;
 

	
 
const LOG_LEVEL: LogLevel = LogLevel::Debug;
 
const NUM_THREADS: u32 = 4;
 
const NUM_THREADS: u32 = 1;
 

	
 
pub(crate) fn compile_and_create_component(source: &str, routine_name: &str, args: ValueGroup) {
 
    let protocol = ProtocolDescription::parse(source.as_bytes())
 
        .expect("successful compilation");
 
    let runtime = Runtime::new(NUM_THREADS, LOG_LEVEL, protocol)
 
        .expect("successful runtime startup");
 
    create_component(&runtime, "", routine_name, args);
 
}
 

	
 
pub(crate) fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: ValueGroup) {
 
    let prompt = rt.inner.protocol.new_component(
 
        module_name.as_bytes(), routine_name.as_bytes(), args
 
    ).expect("create prompt");
 
    let reserved = rt.inner.start_create_pdl_component();
 
    let reserved = rt.inner.start_create_component();
 
    let ctx = CompCtx::new(&reserved);
 
    let component = Box::new(CompPDL::new(prompt, 0));
 
    let (key, _) = rt.inner.finish_create_pdl_component(reserved, component, ctx, false);
 
    let (key, _) = rt.inner.finish_create_component(reserved, component, ctx, false);
 
    rt.inner.enqueue_work(key);
 
}
 

	
 
pub(crate) fn no_args() -> ValueGroup { ValueGroup::new_stack(Vec::new()) }
 

	
 
#[test]
 
fn test_component_creation() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive nothing_at_all() {
 
        s32 a = 5;
 
        auto b = 5 + a;
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, LOG_LEVEL, pd).unwrap();
 

	
 
    for _i in 0..20 {
 
        create_component(&rt, "", "nothing_at_all", no_args());
 
    }
 
}
 

	
 
#[test]
 
fn test_component_communication() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive sender(out<u32> o, u32 outside_loops, u32 inside_loops) {
 
        u32 outside_index = 0;
 
        while (outside_index < outside_loops) {
 
            u32 inside_index = 0;
 
            sync while (inside_index < inside_loops) {
 
                put(o, inside_index);
 
                inside_index += 1;
 
            }
 
            outside_index += 1;
 
        }
 
    }
 

	
 
    primitive receiver(in<u32> i, u32 outside_loops, u32 inside_loops) {
 
        u32 outside_index = 0;
 
        while (outside_index < outside_loops) {
 
            u32 inside_index = 0;
 
            sync while (inside_index < inside_loops) {
 
                auto val = get(i);
 
                while (val != inside_index) {} // infinite loop if incorrect value is received
 
                inside_index += 1;
 
            }
 
            outside_index += 1;
 
        }
 
    }
 

	
 
    composite constructor() {
 
        channel o_orom -> i_orom;
 
        channel o_mrom -> i_mrom;
 
        channel o_ormm -> i_ormm;
 
        channel o_mrmm -> i_mrmm;
 

	
 
        // one round, one message per round
 
        new sender(o_orom, 1, 1);
 
        new receiver(i_orom, 1, 1);
 

	
 
        // multiple rounds, one message per round
 
        new sender(o_mrom, 5, 1);
 
        new receiver(i_mrom, 5, 1);
 

	
 
        // one round, multiple messages per round
 
        new sender(o_ormm, 1, 5);
 
        new receiver(i_ormm, 1, 5);
 

	
 
        // multiple rounds, multiple messages per round
 
        new sender(o_mrmm, 5, 5);
 
        new receiver(i_mrmm, 5, 5);
 
    }").expect("compilation");
 
    let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_intermediate_messenger() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive receiver<T>(in<T> rx, u32 num) {
 
        auto index = 0;
 
        while (index < num) {
 
            sync { auto v = get(rx); }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive middleman<T>(in<T> rx, out<T> tx, u32 num) {
 
        auto index = 0;
 
        while (index < num) {
 
            sync { put(tx, get(rx)); }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive sender<T>(out<T> tx, u32 num) {
 
        auto index = 0;
 
        while (index < num) {
 
            sync put(tx, 1337);
 
            index += 1;
 
        }
 
    }
 

	
 
    composite constructor_template<T>() {
 
        auto num = 0;
 
        channel<T> tx_a -> rx_a;
 
        channel tx_b -> rx_b;
 
        new sender(tx_a, 3);
 
        new middleman(rx_a, tx_b, 3);
 
        new receiver(rx_b, 3);
 
    }
 

	
 
    composite constructor() {
 
        new constructor_template<u16>();
 
        new constructor_template<u32>();
 
        new constructor_template<u64>();
 
        new constructor_template<s16>();
 
        new constructor_template<s32>();
 
        new constructor_template<s64>();
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_simple_select() {
 
    let pd = ProtocolDescription::parse(b"
 
    func infinite_assert<T>(T val, T expected) -> () {
 
        while (val != expected) { print(\"nope!\"); }
 
        return ();
 
    }
 

	
 
    primitive receiver(in<u32> in_a, in<u32> in_b, u32 num_sends) {
 
        auto num_from_a = 0;
 
        auto num_from_b = 0;
 
        while (num_from_a + num_from_b < 2 * num_sends) {
src/runtime2/tests/transfer_ports.rs
Show inline comments
 
new file 100644
 
use super::*;
 

	
 
#[test]
 
fn test_transfer_precreated_port_without_using() {
 
    compile_and_create_component("
 
    primitive port_sender(out<in<u32>> tx) {
 
        channel a -> b;
 
        sync put(tx, b);
 
    }
 

	
 
    primitive port_receiver(in<in<u32>> rx) {
 
        sync auto a = get(rx);
 
    }
 

	
 
    composite constructor() {
 
        channel a -> b;
 
        new port_sender(a);
 
        new port_receiver(b);
 
    }
 
    ", "constructor", no_args());
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)