Changeset - 5e53e3e9d68e
[Not reviewed]
src/protocol/eval/mod.rs
Show inline comments
 
@@ -24,8 +24,7 @@ pub(crate) mod value;
 
pub(crate) mod store;
 
pub(crate) mod executor;
 
pub(crate) mod error;
 

	
 
pub use error::EvalError;
 
pub use value::{PortId, Value, ValueGroup};
 
pub use value::{PortId, Value, ValueId, ValueGroup};
 
pub use executor::{EvalContinuation, EvalResult, Prompt};
 

	
src/protocol/eval/value.rs
Show inline comments
 
@@ -261,6 +261,14 @@ impl ValueGroup {
 
        }
 
    }
 

	
 
    /// Retrieves a mutable reference to the value given its ValueId.
 
    pub(crate) fn get_value_mut(&mut self, id: ValueId) -> &mut Value {
 
        match id {
 
            ValueId::Stack(pos) => return &mut self.values[pos as usize],
 
            ValueId::Heap(heap_pos, pos) => return &mut self.regions[heap_pos as usize][pos as usize],
 
        }
 
    }
 

	
 
    fn provide_value(&self, value: &Value, to_store: &mut Store) -> Value {
 
        if let Some(from_heap_pos) = value.get_heap_pos() {
 
            let from_heap_pos = from_heap_pos as usize;
src/protocol/parser/mod.rs
Show inline comments
 
@@ -327,7 +327,13 @@ impl Parser {
 
        // Make sure directory exists
 
        let path = Path::new(&base_path);
 
        if !path.exists() {
 
            return Err(format!("std lib root directory '{}' does not exist", base_path));
 
            let from_env_message = if from_env {
 
                format!(" (retrieved from the environment variable '{}')", REOWOLF_PATH_ENV)
 
            } else {
 
                String::new()
 
            };
 

	
 
            return Err(format!("std lib root directory '{}'{} does not exist", base_path, from_env_message));
 
        }
 

	
 
        // Try to load all standard library files. We might need a more unified
src/runtime2/communication.rs
Show inline comments
 
use crate::protocol::eval::*;
 
use crate::protocol::eval::value::ValueId;
 
use super::runtime::*;
 
use super::component::*;
 

	
 
@@ -31,6 +32,7 @@ pub struct DataMessage {
 
    pub data_header: MessageDataHeader,
 
    pub sync_header: MessageSyncHeader,
 
    pub content: ValueGroup,
 
    pub ports: Vec<TransmittedPort>,
 
}
 

	
 
#[derive(Debug)]
 
@@ -61,6 +63,16 @@ pub struct MessageDataHeader {
 
    pub target_port: PortId,
 
}
 

	
 
#[derive(Debug)]
 
pub struct TransmittedPort {
 
    pub locations: Vec<ValueId>, // within `content`
 
    pub messages: Vec<DataMessage>, // owned by previous component
 
    pub peer_comp: CompId,
 
    pub peer_port: PortId,
 
    pub kind: PortKind,
 
    pub state: PortState,
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Sync messages
 
// -----------------------------------------------------------------------------
src/runtime2/component/component.rs
Show inline comments
 
/*
 
 * Default toolkit for creating components. Contains handlers for initiating and
 
 * responding to various events.
 
 */
 

	
 
use std::fmt::{Display as FmtDisplay, Result as FmtResult, Formatter};
 

	
 
use crate::protocol::eval::{Prompt, EvalError, ValueGroup, PortId as EvalPortId};
 
use crate::protocol::eval::{Prompt, EvalError, ValueGroup, Value, ValueId, PortId as EvalPortId};
 
use crate::protocol::*;
 
use crate::runtime2::*;
 
use crate::runtime2::communication::*;
 
@@ -78,6 +83,10 @@ pub(crate) enum CompMode {
 
    BlockedGet, // blocked because we need to receive a message on a particular port
 
    BlockedPut, // component is blocked because the port is blocked
 
    BlockedSelect, // waiting on message to complete the select statement
 
    PutPortsBlockedTransferredPorts, // sending a message with ports, those sent ports are (partly) blocked
 
    PutPortsBlockedAwaitingAcks, // sent out PPC message for blocking transferred ports, now awaiting Acks
 
    PutPortsBlockedSendingPort, // sending a message with ports, message sent through a still-blocked port
 
    NewComponentBlocked, // waiting until ports are in the appropriate state to create a new component
 
    StartExit, // temporary state: if encountered then we start the shutdown process.
 
    BusyExit, // temporary state: waiting for Acks for all the closed ports, potentially waiting for sync round to finish
 
    Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0
 
@@ -88,8 +97,11 @@ impl CompMode {
 
        use CompMode::*;
 

	
 
        match self {
 
            Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => true,
 
            NonSync | StartExit | BusyExit | Exit => false,
 
            Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect |
 
                PutPortsBlockedTransferredPorts |
 
                PutPortsBlockedAwaitingAcks |
 
                PutPortsBlockedSendingPort => true,
 
            NonSync | NewComponentBlocked | StartExit | BusyExit | Exit => false,
 
        }
 
    }
 

	
 
@@ -97,7 +109,11 @@ impl CompMode {
 
        use CompMode::*;
 

	
 
        match self {
 
            NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => false,
 
            NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect |
 
            PutPortsBlockedTransferredPorts |
 
            PutPortsBlockedAwaitingAcks |
 
            PutPortsBlockedSendingPort |
 
                NewComponentBlocked => false,
 
            StartExit | BusyExit => true,
 
            Exit => false,
 
        }
 
@@ -138,6 +154,7 @@ pub(crate) struct CompExecState {
 
    pub mode: CompMode,
 
    pub mode_port: PortId, // valid if blocked on a port (put/get)
 
    pub mode_value: ValueGroup, // valid if blocked on a put
 
    pub mode_component: (ProcedureDefinitionId, TypeId),
 
    pub exit_reason: ExitReason, // valid if in StartExit/BusyExit/Exit mode
 
}
 

	
 
@@ -147,6 +164,7 @@ impl CompExecState {
 
            mode: CompMode::NonSync,
 
            mode_port: PortId::new_invalid(),
 
            mode_value: ValueGroup::default(),
 
            mode_component: (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid()),
 
            exit_reason: ExitReason::Termination,
 
        }
 
    }
 
@@ -162,23 +180,42 @@ impl CompExecState {
 
        debug_assert!(self.mode_value.values.is_empty());
 
    }
 

	
 
    pub(crate) fn set_as_create_component_blocked(
 
        &mut self, proc_id: ProcedureDefinitionId, type_id: TypeId,
 
        arguments: ValueGroup
 
    ) {
 
        self.mode = CompMode::NewComponentBlocked;
 
        self.mode_value = arguments;
 
        self.mode_component = (proc_id, type_id);
 
    }
 

	
 
    pub(crate) fn is_blocked_on_get(&self, port: PortId) -> bool {
 
        return
 
            self.mode == CompMode::BlockedGet &&
 
            self.mode_port == port;
 
    }
 

	
 
    pub(crate) fn set_as_blocked_put(&mut self, port: PortId, value: ValueGroup) {
 
    pub(crate) fn set_as_blocked_put_without_ports(&mut self, port: PortId, value: ValueGroup) {
 
        self.mode = CompMode::BlockedPut;
 
        self.mode_port = port;
 
        self.mode_value = value;
 
    }
 

	
 
    pub(crate) fn is_blocked_on_put(&self, port: PortId) -> bool {
 
    pub(crate) fn set_as_blocked_put_with_ports(&mut self, port: PortId, value: ValueGroup) {
 
        self.mode = CompMode::PutPortsBlockedTransferredPorts;
 
        self.mode_port = port;
 
        self.mode_value = value;
 
    }
 

	
 
    pub(crate) fn is_blocked_on_put_without_ports(&self, port: PortId) -> bool {
 
        return
 
            self.mode == CompMode::BlockedPut &&
 
            self.mode_port == port;
 
    }
 

	
 
    pub(crate) fn is_blocked_on_create_component(&self) -> bool {
 
        return self.mode == CompMode::NewComponentBlocked;
 
    }
 
}
 

	
 
// TODO: Replace when implementing port sending. Should probably be incorporated
 
@@ -232,7 +269,8 @@ pub(crate) fn create_component(
 
pub(crate) fn default_send_data_message(
 
    exec_state: &mut CompExecState, transmitting_port_id: PortId,
 
    port_instruction: PortInstruction, value: ValueGroup,
 
    sched_ctx: &SchedulerCtx, consensus: &mut Consensus, comp_ctx: &mut CompCtx
 
    sched_ctx: &SchedulerCtx, consensus: &mut Consensus,
 
    control: &mut ControlLayer, comp_ctx: &mut CompCtx
 
) -> Result<CompScheduling, (PortInstruction, String)> {
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync);
 

	
 
@@ -243,6 +281,9 @@ pub(crate) fn default_send_data_message(
 
    let port_info = comp_ctx.get_port(port_handle);
 
    debug_assert_eq!(port_info.kind, PortKind::Putter);
 

	
 
    let mut ports = Vec::new();
 
    find_ports_in_value_group(&value, &mut ports);
 

	
 
    if port_info.state.is_closed() {
 
        // Note: normally peer is eventually consistent, but if it has shut down
 
        // then we can be sure it is consistent (I think?)
 
@@ -250,13 +291,20 @@ pub(crate) fn default_send_data_message(
 
            port_info.last_instruction,
 
            format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0)
 
        ))
 
    } else if !ports.is_empty() {
 
        start_send_message_with_ports(
 
            transmitting_port_id, port_instruction, value, exec_state,
 
            comp_ctx, sched_ctx, control
 
        )?;
 

	
 
        return Ok(CompScheduling::Sleep);
 
    } else if port_info.state.is_blocked() {
 
        // Port is blocked, so we cannot send
 
        exec_state.set_as_blocked_put(transmitting_port_id, value);
 
        exec_state.set_as_blocked_put_without_ports(transmitting_port_id, value);
 

	
 
        return Ok(CompScheduling::Sleep);
 
    } else {
 
        // Port is not blocked, so send to the peer
 
        // Port is not blocked and no ports to transfer: send to the peer
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value);
 
@@ -337,7 +385,7 @@ pub(crate) enum GetResult {
 
/// instruction we're attempting on this port.
 
pub(crate) fn default_attempt_get(
 
    exec_state: &mut CompExecState, target_port: PortId, target_port_instruction: PortInstruction,
 
    inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup, sched_ctx: &SchedulerCtx,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, sched_ctx: &SchedulerCtx,
 
    comp_ctx: &mut CompCtx, control: &mut ControlLayer, consensus: &mut Consensus
 
) -> GetResult {
 
    let port_handle = comp_ctx.get_port_handle(target_port);
 
@@ -356,14 +404,15 @@ pub(crate) fn default_attempt_get(
 
    if let Some(message) = &inbox_main[port_index] {
 
        if consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
            // We're allowed to receive this message
 
            let message = inbox_main[port_index].take().unwrap();
 
            let mut message = inbox_main[port_index].take().unwrap();
 
            debug_assert_eq!(target_port, message.data_header.target_port);
 

	
 
            // Note: we can still run into an unrecoverable error when actually
 
            // receiving this message
 
            match default_handle_received_data_message(
 
                target_port, target_port_instruction, inbox_main, inbox_backup,
 
                comp_ctx, sched_ctx, control,
 
                target_port, target_port_instruction,
 
                &mut message, inbox_main, inbox_backup,
 
                comp_ctx, sched_ctx, control, consensus
 
            ) {
 
                Ok(()) => return GetResult::Received(message),
 
                Err(location_and_message) => return GetResult::Error(location_and_message)
 
@@ -389,14 +438,70 @@ pub(crate) fn default_attempt_get(
 
/// of full buffers (hence, will use the control layer to make sure the peer
 
/// will become unblocked).
 
pub(crate) fn default_handle_received_data_message(
 
    targeted_port: PortId, port_instruction: PortInstruction,
 
    inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup,
 
    comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer
 
    targeted_port: PortId, _port_instruction: PortInstruction, message: &mut DataMessage,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup,
 
    comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer,
 
    consensus: &mut Consensus
 
) -> Result<(), (PortInstruction, String)> {
 
    let port_handle = comp_ctx.get_port_handle(targeted_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 
    let slot = &mut inbox_main[port_index];
 
    debug_assert!(slot.is_none()); // because we've just received from it
 
    debug_assert!(inbox_main[port_index].is_none()); // because we've just received from it
 

	
 
    // If we received any ports, add them to the port tracking and inbox struct.
 
    // Then notify the peers that they can continue sending to this port, but
 
    // now at a new address.
 
    for received_port in &mut message.ports {
 
        // Transfer messages to main/backup inbox
 
        let _new_inbox_index = inbox_main.len();
 
        if !received_port.messages.is_empty() {
 
            inbox_main.push(Some(received_port.messages.remove(0)));
 
            inbox_backup.extend(received_port.messages.drain(..));
 
        } else {
 
            inbox_main.push(None);
 
        }
 

	
 
        // Create a new port locally
 
        let mut new_port_state = received_port.state;
 
        new_port_state.set(PortStateFlag::Received);
 
        let new_port_handle = comp_ctx.add_port(
 
            received_port.peer_comp, received_port.peer_port,
 
            received_port.kind, new_port_state
 
        );
 
        debug_assert_eq!(_new_inbox_index, comp_ctx.get_port_index(new_port_handle));
 
        comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(received_port.peer_comp));
 
        let new_port = comp_ctx.get_port(new_port_handle);
 

	
 
        // Add the port tho the consensus
 
        consensus.notify_received_port(_new_inbox_index, new_port_handle, comp_ctx);
 

	
 
        // Replace all references to the port in the received message
 
        for message_location in received_port.locations.iter().copied() {
 
            let value = message.content.get_value_mut(message_location);
 

	
 
            match value {
 
                Value::Input(_) => {
 
                    debug_assert_eq!(new_port.kind, PortKind::Getter);
 
                    *value = Value::Input(port_id_to_eval(new_port.self_id));
 
                },
 
                Value::Output(_) => {
 
                    debug_assert_eq!(new_port.kind, PortKind::Putter);
 
                    *value = Value::Output(port_id_to_eval(new_port.self_id));
 
                },
 
                _ => unreachable!(),
 
            }
 
        }
 

	
 
        // Let the peer know that the port can now be used
 
        let peer_handle = comp_ctx.get_peer_handle(new_port.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 

	
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(ControlMessage{
 
            id: ControlId::new_invalid(),
 
            sender_comp_id: comp_ctx.id,
 
            target_port_id: Some(new_port.peer_port_id),
 
            content: ControlMessageContent::PortPeerChangedUnblock(new_port.self_id, comp_ctx.id)
 
        }), true);
 
    }
 

	
 
    // Modify last-known location where port instruction was retrieved
 
    let port_info = comp_ctx.get_port(port_handle);
 
@@ -410,7 +515,7 @@ pub(crate) fn default_handle_received_data_message(
 
            // One more message, place it in the slot
 
            let message = inbox_backup.remove(message_index);
 
            debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we're removing another message from the backup
 
            *slot = Some(message);
 
            inbox_main[port_index] = Some(message);
 

	
 
            return Ok(());
 
        }
 
@@ -437,11 +542,12 @@ pub(crate) fn default_handle_received_data_message(
 
/// state may all change.
 
pub(crate) fn default_handle_control_message(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus,
 
    message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
 
    message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) -> Result<(), (PortInstruction, String)> {
 
    match message.content {
 
        ControlMessageContent::Ack => {
 
            default_handle_ack(control, message.id, sched_ctx, comp_ctx);
 
            default_handle_ack(exec_state, control, message.id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup)?;
 
        },
 
        ControlMessageContent::BlockPort => {
 
            // One of our messages was accepted, but the port should be
 
@@ -474,7 +580,7 @@ pub(crate) fn default_handle_control_message(
 
                // The two components (sender and this component) are closing
 
                // the channel at the same time. So we don't care about the
 
                // content of the `ClosePort` message.
 
                default_handle_ack(control, control_id, sched_ctx, comp_ctx);
 
                default_handle_ack(exec_state, control, control_id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup)?;
 
            } else {
 
                // Respond to the message
 
                let port_info = comp_ctx.get_port(port_handle);
 
@@ -495,7 +601,7 @@ pub(crate) fn default_handle_control_message(
 

	
 
                if exec_state.mode.is_in_sync_block() {
 
                    let closed_during_sync_round = content.closed_in_sync_round && port_was_used;
 
                    let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message;
 
                    let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message && port_was_used;
 

	
 
                    if closed_during_sync_round || closed_before_sync_round {
 
                        return Err((
 
@@ -519,7 +625,10 @@ pub(crate) fn default_handle_control_message(
 
            debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers));
 

	
 
            port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers);
 
            default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
 
            default_handle_recently_unblocked_port(
 
                exec_state, control, consensus, port_handle, sched_ctx,
 
                comp_ctx, inbox_main, inbox_backup
 
            )?;
 
        },
 
        ControlMessageContent::PortPeerChangedBlock => {
 
            // The peer of our port has just changed. So we are asked to
 
@@ -541,14 +650,16 @@ pub(crate) fn default_handle_control_message(
 
            let port_handle = comp_ctx.get_port_handle(port_to_change);
 
            let port_info = comp_ctx.get_port(port_handle);
 
            debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToPeerChange));
 
            let old_peer_id = port_info.peer_comp_id;
 

	
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            port_info.peer_port_id = new_port_id;
 

	
 
            port_info.state.clear(PortStateFlag::BlockedDueToPeerChange);
 
            comp_ctx.change_port_peer(sched_ctx, port_handle, Some(new_comp_id));
 
            default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
 
            default_handle_recently_unblocked_port(
 
                exec_state, control, consensus, port_handle, sched_ctx,
 
                comp_ctx, inbox_main, inbox_backup
 
            )?;
 
        }
 
    }
 

	
 
@@ -603,6 +714,13 @@ pub(crate) fn default_handle_start_exit(
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::StartExit);
 
    for port_index in 0..comp_ctx.num_ports() {
 
        let port_info = comp_ctx.get_port_by_index_mut(port_index);
 
        if port_info.state.is_blocked() {
 
            return CompScheduling::Sleep;
 
        }
 
    }
 

	
 
    sched_ctx.info(&format!("Component starting exit (reason: {:?})", exec_state.exit_reason));
 
    exec_state.mode = CompMode::BusyExit;
 
    let exit_inside_sync = exec_state.exit_reason.is_in_sync();
 
@@ -617,7 +735,8 @@ pub(crate) fn default_handle_start_exit(
 
    // Iterating over ports by index to work around borrowing rules
 
    for port_index in 0..comp_ctx.num_ports() {
 
        let port = comp_ctx.get_port_by_index_mut(port_index);
 
        if port.state.is_closed() || port.close_at_sync_end {
 
        println!("DEBUG: Considering port:\n{:?}", port);
 
        if port.state.is_closed() || port.state.is_set(PortStateFlag::Transmitted) || port.close_at_sync_end {
 
            // Already closed, or in the process of being closed
 
            continue;
 
        }
 
@@ -691,6 +810,7 @@ pub(crate) fn default_handle_sync_decision(
 
            if port_info.close_at_sync_end {
 
                port_info.state.set(PortStateFlag::Closed);
 
            }
 
            port_info.state.clear(PortStateFlag::Received);
 
        }
 
        debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
 
        exec_state.mode = CompMode::NonSync;
 
@@ -706,6 +826,240 @@ pub(crate) fn default_handle_sync_decision(
 
    }
 
}
 

	
 

	
 
pub(crate) fn default_start_create_component(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
    control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup,
 
    definition_id: ProcedureDefinitionId, type_id: TypeId, arguments: ValueGroup
 
) {
 
    debug_assert_eq!(exec_state.mode, CompMode::NonSync);
 

	
 
    let mut transferred_ports = Vec::new();
 
    find_ports_in_value_group(&arguments, &mut transferred_ports);
 

	
 
    // Set execution state as waiting until we can create the component. If we
 
    // can do so right away, then we will.
 
    exec_state.set_as_create_component_blocked(definition_id, type_id, arguments);
 
    if ports_not_blocked(comp_ctx, &transferred_ports) {
 
        perform_create_component(exec_state, sched_ctx, comp_ctx, control, inbox_main, inbox_backup);
 
    }
 
}
 

	
 
/// Actually creates a component (and assumes that the caller made sure that
 
/// none of the ports are involved in a blocking operation).
 
pub(crate) fn perform_create_component(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, instantiator_ctx: &mut CompCtx,
 
    control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) {
 
    // Small internal utilities
 
    struct PortPair {
 
        instantiator_id: PortId,
 
        instantiator_handle: LocalPortHandle,
 
        created_id: PortId,
 
        created_handle: LocalPortHandle,
 
        is_open: bool,
 
    }
 

	
 
    // Retrieve ports from the arguments
 
    debug_assert_eq!(exec_state.mode, CompMode::NewComponentBlocked);
 

	
 
    let (procedure_id, procedure_type_id) = exec_state.mode_component;
 
    let mut arguments = exec_state.mode_value.take();
 
    let mut ports = Vec::new();
 
    find_ports_in_value_group(&arguments, &mut ports);
 
    debug_assert!(ports_not_blocked(instantiator_ctx, &ports));
 

	
 
    // Reserve a location for the new component
 
    let reservation = sched_ctx.runtime.start_create_component();
 
    let mut created_ctx = CompCtx::new(&reservation);
 

	
 
    let mut port_pairs = Vec::with_capacity(ports.len());
 

	
 
    // Go over all the ports that will be transferred. Since the ports will get
 
    // a new ID in the new component, we will take care of that here.
 
    for (port_location, instantiator_port_id) in &ports {
 
        // Retrieve port information from instantiator
 
        let instantiator_port_id = *instantiator_port_id;
 
        let instantiator_port_handle = instantiator_ctx.get_port_handle(instantiator_port_id);
 
        let instantiator_port = instantiator_ctx.get_port(instantiator_port_handle);
 

	
 
        // Create port at created component
 
        let created_port_handle = created_ctx.add_port(
 
            instantiator_port.peer_comp_id, instantiator_port.peer_port_id,
 
            instantiator_port.kind, instantiator_port.state
 
        );
 
        let created_port = created_ctx.get_port(created_port_handle);
 
        let created_port_id = created_port.self_id;
 

	
 
        // Modify port ID in the arguments to the new component and store them
 
        // for later access
 
        let is_open = instantiator_port.state.is_open();
 
        port_pairs.push(PortPair{
 
            instantiator_id: instantiator_port_id,
 
            instantiator_handle: instantiator_port_handle,
 
            created_id: created_port_id,
 
            created_handle: created_port_handle,
 
            is_open,
 
        });
 

	
 
        for location in port_location.iter().copied() {
 
            let value = arguments.get_value_mut(location);
 
            match value {
 
                Value::Input(id) => *id = port_id_to_eval(created_port_id),
 
                Value::Output(id) => *id = port_id_to_eval(created_port_id),
 
                _ => unreachable!(),
 
            }
 
        }
 
    }
 

	
 
    // For each of the ports in the newly created component we set the peer to
 
    // the correct value. We will not yet change the peer on the instantiator's
 
    // ports (as we haven't yet stored the new component in the runtime's
 
    // component storage)
 
    let mut created_component_has_remote_peers = false;
 
    for pair in port_pairs.iter() {
 
        let instantiator_port_info = instantiator_ctx.get_port(pair.instantiator_handle);
 
        let created_port_info = created_ctx.get_port_mut(pair.created_handle);
 

	
 
        if created_port_info.peer_comp_id == instantiator_ctx.id {
 
            // The peer of the created component's port seems to be the
 
            // instantiator.
 
            let created_port_peer_index = port_pairs.iter()
 
                .position(|v| v.instantiator_id == instantiator_port_info.peer_port_id);
 

	
 
            match created_port_peer_index {
 
                Some(created_port_peer_index) => {
 
                    // However, the peer port is also moved to the new
 
                    // component, so the complete channel is owned by the new
 
                    // component.
 
                    let peer_pair = &port_pairs[created_port_peer_index];
 
                    created_port_info.peer_port_id = peer_pair.created_id;
 
                    created_port_info.peer_comp_id = reservation.id();
 
                },
 
                None => {
 
                    // Peer port remains with instantiator. However, we cannot
 
                    // set the peer on the instantiator yet, because the new
 
                    // component has not yet been stored in the runtime's
 
                    // component storage. So we do this later
 
                    created_port_info.peer_comp_id = instantiator_ctx.id;
 
                    if pair.is_open {
 
                        created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(instantiator_ctx.id));
 
                    }
 
                }
 
            }
 
        } else {
 
            // Peer is a different component
 
            if pair.is_open {
 
                // And the port is still open, so we need to notify the peer
 
                let peer_handle = instantiator_ctx.get_peer_handle(created_port_info.peer_comp_id);
 
                let peer_info = instantiator_ctx.get_peer(peer_handle);
 
                created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(peer_info.id));
 
                created_component_has_remote_peers = true;
 
            }
 
        }
 
    }
 

	
 
    // Now we store the new component into the runtime's component storage using
 
    // the reservation.
 
    let component = create_component(
 
        &sched_ctx.runtime.protocol, procedure_id, procedure_type_id,
 
        arguments, port_pairs.len()
 
    );
 
    let (created_key, created_runtime_component) = sched_ctx.runtime.finish_create_component(
 
        reservation, component, created_ctx, false
 
    );
 
    let created_ctx = &mut created_runtime_component.ctx;
 
    let created_component = &mut created_runtime_component.component;
 
    created_component.on_creation(created_key.downgrade(), sched_ctx);
 

	
 
    // We now pass along the messages that the instantiator component still has
 
    // that belong to the new component. At the same time we'll take care of
 
    // setting the correct peer of the instantiator component
 
    for pair in port_pairs.iter() {
 
        // Transferring the messages and removing the port from the
 
        // instantiator component
 
        let instantiator_port_index = instantiator_ctx.get_port_index(pair.instantiator_handle);
 
        instantiator_ctx.change_port_peer(sched_ctx, pair.instantiator_handle, None);
 
        instantiator_ctx.remove_port(pair.instantiator_handle);
 

	
 
        if let Some(mut message) = inbox_main[instantiator_port_index].take() {
 
            message.data_header.target_port = pair.created_id;
 
            created_component.adopt_message(created_ctx, message);
 
        }
 

	
 
        let mut message_index = 0;
 
        while message_index < inbox_backup.len() {
 
            let message = &inbox_backup[message_index];
 
            if message.data_header.target_port == pair.instantiator_id {
 
                // Transfer the message
 
                let mut message = inbox_backup.remove(message_index);
 
                message.data_header.target_port = pair.created_id;
 
                created_component.adopt_message(created_ctx, message);
 
            } else {
 
                // Message does not belong to the port pair that we're
 
                // transferring to the new component.
 
                message_index += 1;
 
            }
 
        }
 

	
 
        // Here we take care of the case where the instantiator previously owned
 
        // both ends of the channel, but has transferred one port to the new
 
        // component (hence creating a channel between the instantiator
 
        // component and the new component).
 
        let created_port_info = created_ctx.get_port(pair.created_handle);
 
        if pair.is_open && created_port_info.peer_comp_id == instantiator_ctx.id {
 
            // Note: the port we're receiving here belongs to the instantiator
 
            // and is NOT in the "port_pairs" array.
 
            let instantiator_port_handle = instantiator_ctx.get_port_handle(created_port_info.peer_port_id);
 
            let instantiator_port_info = instantiator_ctx.get_port_mut(instantiator_port_handle);
 
            instantiator_port_info.peer_port_id = created_port_info.self_id;
 
            instantiator_ctx.change_port_peer(sched_ctx, instantiator_port_handle, Some(created_ctx.id));
 
        }
 
    }
 

	
 
    // Finally: if we did move ports around whose peers are different
 
    // components, then we'll initiate the appropriate protocol to notify them.
 
    if created_component_has_remote_peers {
 
        let schedule_entry_id = control.add_schedule_entry(created_ctx.id);
 
        for pair in &port_pairs {
 
            let port_info = created_ctx.get_port(pair.created_handle);
 
            if pair.is_open && port_info.peer_comp_id != instantiator_ctx.id && port_info.peer_comp_id != created_ctx.id {
 
                // Peer component is not the instantiator, and it is not the
 
                // new component itself
 
                let message = control.add_reroute_entry(
 
                    instantiator_ctx.id, port_info.peer_port_id, port_info.peer_comp_id,
 
                    pair.instantiator_id, pair.created_id, created_ctx.id,
 
                    schedule_entry_id
 
                );
 
                let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id);
 
                let peer_info = created_ctx.get_peer(peer_handle);
 

	
 
                peer_info.handle.send_message_logged(sched_ctx, message, true);
 
            }
 
        }
 
    } else {
 
        // We can schedule the component immediately, we do not have to wait
 
        // for any peers: there are none.
 
        sched_ctx.runtime.enqueue_work(created_key);
 
    }
 

	
 
    exec_state.mode = CompMode::NonSync;
 
    exec_state.mode_component = (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid());
 
}
 

	
 
pub(crate) fn ports_not_blocked(comp_ctx: &CompCtx, ports: &EncounteredPorts) -> bool {
 
    for (_port_locations, port_id) in ports {
 
        let port_handle = comp_ctx.get_port_handle(*port_id);
 
        let port_info = comp_ctx.get_port(port_handle);
 

	
 
        if port_info.state.is_blocked_due_to_port_change() {
 
            return false;
 
        }
 
    }
 

	
 
    return true;
 
}
 

	
 
/// Performs the default action of printing the provided error, and then putting
 
/// the component in the state where it will shut down. Only to be used for
 
/// builtin components: their error message construction is simpler (and more
 
@@ -736,13 +1090,185 @@ pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling
 
// Internal messaging/state utilities
 
// -----------------------------------------------------------------------------
 

	
 
/// Sends a message without any transmitted ports. Does not check if sending
 
/// is actually valid.
 
fn send_message_without_ports(
 
    sending_port_handle: LocalPortHandle, value: ValueGroup,
 
    comp_ctx: &CompCtx, sched_ctx: &SchedulerCtx, consensus: &mut Consensus,
 
) {
 
    let port_info = comp_ctx.get_port(sending_port_handle);
 
    debug_assert!(port_info.state.can_send());
 
    let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
    let peer_info = comp_ctx.get_peer(peer_handle);
 

	
 
    let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value);
 
    peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true);
 
}
 

	
 
/// Prepares sending a message that contains ports. Only once a particular
 
/// protocol has completed (where we notify all the peers that the ports will
 
/// be transferred) will we actually send the message to the recipient.
 
fn start_send_message_with_ports(
 
    sending_port_id: PortId, sending_port_instruction: PortInstruction, value: ValueGroup,
 
    exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx,
 
    control: &mut ControlLayer
 
) -> Result<(), (PortInstruction, String)> {
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync); // busy in sync, trying to send
 

	
 
    // Retrieve ports we're going to transfer
 
    let sending_port_handle = comp_ctx.get_port_handle(sending_port_id);
 
    let sending_port_info = comp_ctx.get_port_mut(sending_port_handle);
 
    sending_port_info.last_instruction = sending_port_instruction;
 

	
 
    let mut transmit_ports = Vec::new();
 
    find_ports_in_value_group(&value, &mut transmit_ports);
 
    debug_assert!(!transmit_ports.is_empty()); // required from caller
 

	
 
    // Enter the state where we'll wait until all transferred ports are not
 
    // blocked.
 
    exec_state.set_as_blocked_put_with_ports(sending_port_id, value);
 

	
 
    if ports_not_blocked(comp_ctx, &transmit_ports) {
 
        // Ports are not blocked, so we can send them right away.
 
        perform_send_message_with_ports_notify_peers(
 
            exec_state, comp_ctx, sched_ctx, control, transmit_ports
 
        )?;
 
    } // else: wait until they become unblocked
 

	
 
    return Ok(())
 
}
 

	
 
fn perform_send_message_with_ports_notify_peers(
 
    exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx,
 
    control: &mut ControlLayer, transmit_ports: EncounteredPorts
 
) -> Result<(), (PortInstruction, String)> {
 
    // Check we're in the correct state in debug mode
 
    debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedTransferredPorts);
 
    debug_assert!(ports_not_blocked(comp_ctx, &transmit_ports));
 

	
 
    // Set up the final Ack that triggers us to send our final message
 
    let unblock_put_control_id = control.add_unblock_put_with_ports_entry();
 
    for (_, port_id) in &transmit_ports {
 
        let transmit_port_handle = comp_ctx.get_port_handle(*port_id);
 
        let transmit_port_info = comp_ctx.get_port_mut(transmit_port_handle);
 
        let peer_comp_id = transmit_port_info.peer_comp_id;
 
        let peer_port_id = transmit_port_info.peer_port_id;
 

	
 

	
 
        // Note: we checked earlier that we are currently in sync mode. Now we
 
        // will check if we've already used the port we're about to transmit.
 
        if !transmit_port_info.last_instruction.is_none() {
 
            let sending_port_handle = comp_ctx.get_port_handle(exec_state.mode_port);
 
            let sending_port_instruction = comp_ctx.get_port(sending_port_handle).last_instruction;
 
            return Err((
 
                sending_port_instruction,
 
                String::from("Cannot transmit one of the ports in this message, as it is used in this sync round")
 
            ));
 
        }
 

	
 
        if transmit_port_info.state.is_set(PortStateFlag::Transmitted) {
 
            let sending_port_handle = comp_ctx.get_port_handle(exec_state.mode_port);
 
            let sending_port_instruction = comp_ctx.get_port(sending_port_handle).last_instruction;
 
            return Err((
 
                sending_port_instruction,
 
                String::from("Cannot transmit one of the ports in this message, as that port is already transmitted")
 
            ));
 
        }
 

	
 
        // Set the flag for transmission
 
        transmit_port_info.state.set(PortStateFlag::Transmitted);
 

	
 
        // Block the peer of the port
 
        let message = control.create_port_transfer_message(unblock_put_control_id, comp_ctx.id, peer_port_id);
 
        println!("DEBUG: Port transfer message\nControl ID: {:?}\nMessage: {:?}", unblock_put_control_id, message);
 
        let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 

	
 
        peer_info.handle.send_message_logged(sched_ctx, message, true);
 
    }
 

	
 
    // We've set up the protocol, once all the PPC's are blocked we are supposed
 
    // to transfer the message to the recipient. So store it temporarily
 
    exec_state.mode = CompMode::PutPortsBlockedAwaitingAcks;
 

	
 
    return Ok(());
 
}
 

	
 
/// Performs the transmission of a data message that contains ports. These were
 
/// all stored in the component's execution state by the
 
/// `prepare_send_message_with_ports` function. Port must be ready to send!
 
fn perform_send_message_with_ports_to_receiver(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) -> Result<(), (PortInstruction, String)> {
 
    debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedSendingPort);
 

	
 
    // Find all ports again
 
    let mut transmit_ports = Vec::new();
 
    find_ports_in_value_group(&exec_state.mode_value, &mut transmit_ports);
 

	
 
    // Retrieve the port over which we're going to send the message
 
    let port_handle = comp_ctx.get_port_handle(exec_state.mode_port);
 
    let port_info = comp_ctx.get_port(port_handle);
 

	
 
    if !port_info.state.is_open() {
 
        return Err((
 
            port_info.last_instruction,
 
            String::from("cannot send over this port, as it is closed")
 
        ));
 
    }
 

	
 
    debug_assert!(!port_info.state.is_blocked_due_to_port_change()); // caller should have checked this
 
    let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 

	
 
    // Change state back to its default
 
    exec_state.mode = CompMode::Sync;
 
    let message_value = exec_state.mode_value.take();
 
    exec_state.mode_port = PortId::new_invalid();
 

	
 
    // Annotate the data message
 
    let mut annotated_message = consensus.annotate_data_message(comp_ctx, port_info, message_value);
 

	
 
    // And further enhance the message by adding data about the ports that are
 
    // being transferred
 
    for (port_locations, transmit_port_id) in transmit_ports {
 
        let transmit_port_handle = comp_ctx.get_port_handle(transmit_port_id);
 
        let transmit_port_info = comp_ctx.get_port(transmit_port_handle);
 

	
 
        let transmit_messages = take_port_messages(comp_ctx, transmit_port_id, inbox_main, inbox_backup);
 

	
 
        let mut transmit_port_state = transmit_port_info.state;
 
        debug_assert!(transmit_port_state.is_set(PortStateFlag::Transmitted));
 
        transmit_port_state.clear(PortStateFlag::Transmitted);
 

	
 
        annotated_message.ports.push(TransmittedPort{
 
            locations: port_locations,
 
            messages: transmit_messages,
 
            peer_comp: transmit_port_info.peer_comp_id,
 
            peer_port: transmit_port_info.peer_port_id,
 
            kind: transmit_port_info.kind,
 
            state: transmit_port_state
 
        });
 

	
 
        comp_ctx.change_port_peer(sched_ctx, transmit_port_handle, None);
 
    }
 

	
 
    // And finally, send the message to the peer
 
    let peer_info = comp_ctx.get_peer(peer_handle);
 
    peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true);
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles an `Ack` for the control layer.
 
fn default_handle_ack(
 
    control: &mut ControlLayer, control_id: ControlId,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
 
) {
 
    exec_state: &mut CompExecState, control: &mut ControlLayer, control_id: ControlId,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) -> Result<(), (PortInstruction, String)>{
 
    // Since an `Ack` may cause another one, handle them in a loop
 
    let mut to_ack = control_id;
 

	
 
    loop {
 
        let (action, new_to_ack) = control.handle_ack(to_ack, sched_ctx, comp_ctx);
 
        match action {
 
@@ -765,6 +1291,22 @@ fn default_handle_ack(
 
                let _should_remove = handle.decrement_users();
 
                debug_assert!(_should_remove.is_none());
 
            },
 
            AckAction::UnblockPutWithPorts => {
 
                // Send the message (containing ports) stored in the component
 
                // execution state to the recipient
 
                println!("DEBUG: Unblocking put with ports");
 
                debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedAwaitingAcks);
 
                exec_state.mode = CompMode::PutPortsBlockedSendingPort;
 
                let port_handle = comp_ctx.get_port_handle(exec_state.mode_port);
 

	
 
                // Little bit of a hack, we didn't really unblock the sending
 
                // port, but this will mesh nicely with waiting for the sending
 
                // port to become unblocked.
 
                default_handle_recently_unblocked_port(
 
                    exec_state, control, consensus, port_handle, sched_ctx,
 
                    comp_ctx, inbox_main, inbox_backup
 
                )?;
 
            },
 
            AckAction::None => {}
 
        }
 

	
 
@@ -773,6 +1315,8 @@ fn default_handle_ack(
 
            None => break,
 
        }
 
    }
 

	
 
    return Ok(());
 
}
 

	
 
/// Little helper for sending the most common kind of `Ack`
 
@@ -790,16 +1334,26 @@ fn default_send_ack(
 
}
 

	
 
/// Handles the unblocking of a putter port. In case there is a pending message
 
/// on that port then it will be sent.
 
/// on that port then it will be sent. There are two reasons for calling this
 
/// function: either a port was blocked (i.e. the Blocked state flag was
 
/// cleared), or the component is ready to send a message containing ports
 
/// (stored in the execution state). In this latter case we might still have
 
/// a blocked port.
 
fn default_handle_recently_unblocked_port(
 
    exec_state: &mut CompExecState, consensus: &mut Consensus,
 
    exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus,
 
    port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
) {
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) -> Result<(), (PortInstruction, String)> {
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    let port_id = port_info.self_id;
 
    debug_assert!(!port_info.state.is_blocked()); // should have been done by the caller
 

	
 
    if exec_state.is_blocked_on_put(port_id) {
 
    if port_info.state.is_blocked() {
 
        // Port is still blocked. We wait until the next control message where
 
        // we unblock the port.
 
        return Ok(());
 
    }
 

	
 
    if exec_state.is_blocked_on_put_without_ports(port_id) {
 
        // Annotate the message that we're going to send
 
        let port_info = comp_ctx.get_port(port_handle); // for immutable access
 
        debug_assert_eq!(port_info.kind, PortKind::Putter);
 
@@ -811,9 +1365,36 @@ fn default_handle_recently_unblocked_port(
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Data(to_send), true);
 

	
 
        exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state.
 
        // Return to the regular execution mode
 
        exec_state.mode = CompMode::Sync;
 
        exec_state.mode_port = PortId::new_invalid();
 
    } else if exec_state.mode == CompMode::PutPortsBlockedTransferredPorts {
 
        // We are waiting until all of the transferred ports become unblocked,
 
        // check so here.
 
        let mut transfer_ports = Vec::new();
 
        find_ports_in_value_group(&exec_state.mode_value, &mut transfer_ports);
 
        if ports_not_blocked(comp_ctx, &transfer_ports) {
 
            perform_send_message_with_ports_notify_peers(
 
                exec_state, comp_ctx, sched_ctx, control, transfer_ports
 
            )?;
 
        }
 
    } else if exec_state.mode == CompMode::PutPortsBlockedSendingPort && exec_state.mode_port == port_id {
 
        // We checked above that the port became unblocked, so we can send the
 
        // message
 
        perform_send_message_with_ports_to_receiver(
 
            exec_state, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup
 
        )?;
 
    } else if exec_state.is_blocked_on_create_component() {
 
        let mut ports = Vec::new();
 
        find_ports_in_value_group(&exec_state.mode_value, &mut ports);
 
        if ports_not_blocked(comp_ctx, &ports) {
 
            perform_create_component(
 
                exec_state, sched_ctx, comp_ctx, control, inbox_main, inbox_backup
 
            );
 
        }
 
    }
 

	
 
    return Ok(());
 
}
 

	
 
#[inline]
 
@@ -825,3 +1406,77 @@ pub(crate) fn port_id_from_eval(port_id: EvalPortId) -> PortId {
 
pub(crate) fn port_id_to_eval(port_id: PortId) -> EvalPortId {
 
    return EvalPortId{ id: port_id.0 };
 
}
 

	
 
// TODO: Optimize double vec
 
type EncounteredPorts = Vec<(Vec<ValueId>, PortId)>;
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut EncounteredPorts) {
 
    // Helper to check a value for a port and recurse if needed.
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, value_location: ValueId, ports: &mut EncounteredPorts) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                let cur_port = PortId(port_id.id);
 
                for prev_port in ports.iter_mut() {
 
                    if prev_port.1 == cur_port {
 
                        // Already added
 
                        prev_port.0.push(value_location);
 
                        return;
 
                    }
 
                }
 

	
 
                ports.push((vec![value_location], cur_port));
 
            },
 
            Value::Array(heap_pos) |
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
 
            Value::Union(_, heap_pos) => {
 
                // Reference to some dynamic thing which might contain ports,
 
                // so recurse
 
                let heap_region = &group.regions[*heap_pos as usize];
 
                for (value_index, embedded_value) in heap_region.iter().enumerate() {
 
                    let value_location = ValueId::Heap(*heap_pos, value_index as u32);
 
                    find_port_in_value(group, embedded_value, value_location, ports);
 
                }
 
            },
 
            _ => {}, // values we don't care about
 
        }
 
    }
 

	
 
    // Clear the ports, then scan all the available values
 
    ports.clear();
 
    for (value_index, value) in value_group.values.iter().enumerate() {
 
        find_port_in_value(value_group, value, ValueId::Stack(value_index as u32), ports);
 
    }
 
}
 

	
 
/// Goes through the inbox of a component and takes out all the messages that
 
/// are targeted at a specific port
 
pub(crate) fn take_port_messages(
 
    comp_ctx: &CompCtx, port_id: PortId,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) -> Vec<DataMessage> {
 
    let mut messages = Vec::new();
 
    let port_handle = comp_ctx.get_port_handle(port_id);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 

	
 
    if let Some(message) = inbox_main[port_index].take() {
 
        messages.push(message);
 
    }
 

	
 
    let mut message_index = 0;
 
    while message_index < inbox_backup.len() {
 
        let message = &inbox_backup[message_index];
 
        if message.data_header.target_port == port_id {
 
            let message = inbox_backup.remove(message_index);
 
            messages.push(message);
 
        } else {
 
            message_index += 1;
 
        }
 
    }
 

	
 
    return messages;
 
}
 
\ No newline at end of file
src/runtime2/component/component_context.rs
Show inline comments
 
@@ -14,6 +14,15 @@ pub enum PortInstruction {
 
    SourceLocation(ExpressionId),
 
}
 

	
 
impl PortInstruction {
 
    pub fn is_none(&self) -> bool {
 
        match self {
 
            PortInstruction::None => return true,
 
            _ => return false,
 
        }
 
    }
 
}
 

	
 
/// Directionality of a port
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub enum PortKind {
 
@@ -29,6 +38,8 @@ pub enum PortStateFlag {
 
    Closed = 0x01, // If not closed, then the port is open
 
    BlockedDueToPeerChange = 0x02, // busy changing peers, hence use of port is temporarily blocked
 
    BlockedDueToFullBuffers = 0x04,
 
    Transmitted = 0x08, // Transmitted, so cannot be used anymore
 
    Received = 0x10, // Received, so cannot be used yet, only after the sync round
 
}
 

	
 
#[derive(Copy, Clone)]
 
@@ -48,6 +59,14 @@ impl PortState {
 
        return !self.is_closed();
 
    }
 

	
 
    #[inline]
 
    pub fn can_send(&self) -> bool {
 
        return
 
            !self.is_set(PortStateFlag::Closed) &&
 
            !self.is_set(PortStateFlag::Transmitted) &&
 
            !self.is_set(PortStateFlag::Received);
 
    }
 

	
 
    #[inline]
 
    pub fn is_closed(&self) -> bool {
 
        return self.is_set(PortStateFlag::Closed);
 
@@ -60,6 +79,11 @@ impl PortState {
 
            self.is_set(PortStateFlag::BlockedDueToFullBuffers);
 
    }
 

	
 
    #[inline]
 
    pub fn is_blocked_due_to_port_change(&self) -> bool {
 
        return self.is_set(PortStateFlag::BlockedDueToPeerChange);
 
    }
 

	
 
    // lower-level utils
 
    #[inline]
 
    pub fn set(&mut self, flag: PortStateFlag) {
 
@@ -85,7 +109,8 @@ impl Debug for PortState {
 
        for (flag_name, flag_value) in &[
 
            ("closed", Closed),
 
            ("blocked_peer_change", BlockedDueToPeerChange),
 
            ("blocked_full_buffers", BlockedDueToFullBuffers)
 
            ("blocked_full_buffers", BlockedDueToFullBuffers),
 
            ("transmitted", Transmitted),
 
        ] {
 
            s.field(flag_name, &self.is_set(*flag_value));
 
        }
 
@@ -176,7 +201,7 @@ impl CompCtx {
 
        return Channel{ putter_id, getter_id };
 
    }
 

	
 
    /// Adds a new port. Make sure to call `add_peer` afterwards.
 
    /// Adds a new port. Make sure to call `change_peer` afterwards.
 
    pub(crate) fn add_port(&mut self, peer_comp_id: CompId, peer_port_id: PortId, kind: PortKind, state: PortState) -> LocalPortHandle {
 
        let self_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
@@ -189,7 +214,28 @@ impl CompCtx {
 
        return LocalPortHandle(self_id);
 
    }
 

	
 
    /// Removes a port. Make sure you called `remove_peer` first.
 
    /// Adds a self-reference. Called by the runtime/scheduler
 
    pub(crate) fn add_self_reference(&mut self, self_handle: CompHandle) {
 
        debug_assert_eq!(self.id, self_handle.id());
 
        debug_assert!(self.get_peer_index_by_id(self.id).is_none());
 
        self.peers.push(Peer{
 
            id: self.id,
 
            num_associated_ports: 0,
 
            handle: self_handle
 
        });
 
    }
 

	
 
    /// Removes a self-reference. Called by the runtime/scheduler
 
    pub(crate) fn remove_self_reference(&mut self) -> Option<CompKey> {
 
        let self_index = self.get_peer_index_by_id(self.id).unwrap();
 
        let peer = &mut self.peers[self_index];
 
        let maybe_comp_key = peer.handle.decrement_users();
 
        self.peers.remove(self_index);
 

	
 
        return maybe_comp_key;
 
    }
 

	
 
    /// Removes a port. Make sure you called `change_peer` first.
 
    pub(crate) fn remove_port(&mut self, port_handle: LocalPortHandle) -> Port {
 
        let port_index = self.must_get_port_index(port_handle);
 
        let port = self.ports.remove(port_index);
 
@@ -307,11 +353,6 @@ impl CompCtx {
 
    // Local utilities
 
    // -------------------------------------------------------------------------
 

	
 
    #[inline]
 
    fn requires_peer_reference(port: &Port, self_id: CompId, required_if_closed: bool) -> bool {
 
        return (!port.state.is_closed() || required_if_closed) && port.peer_comp_id != self_id;
 
    }
 

	
 
    fn must_get_port_index(&self, handle: LocalPortHandle) -> usize {
 
        for (index, port) in self.ports.iter().enumerate() {
 
            if port.self_id == handle.0 {
src/runtime2/component/component_internet.rs
Show inline comments
 
@@ -110,7 +110,7 @@ impl Component for ComponentTcpClient {
 
            Message::Control(message) => {
 
                if let Err(location_and_message) = component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                    message, sched_ctx, comp_ctx, &mut self.inbox_main, &mut self.inbox_backup
 
                ) {
 
                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                }
 
@@ -125,7 +125,11 @@ impl Component for ComponentTcpClient {
 
        sched_ctx.info(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state));
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedSelect => {
 
            CompMode::BlockedSelect |
 
            CompMode::PutPortsBlockedTransferredPorts |
 
            CompMode::PutPortsBlockedAwaitingAcks |
 
            CompMode::PutPortsBlockedSendingPort |
 
            CompMode::NewComponentBlocked => {
 
                // Not possible: we never enter this state
 
                unreachable!();
 
            },
 
@@ -239,7 +243,11 @@ impl Component for ComponentTcpClient {
 
                            Ok(num_received) => {
 
                                self.byte_buffer.resize(num_received, 0);
 
                                let message_content = self.bytes_to_data_message_content(&self.byte_buffer);
 
                                let send_result = component::default_send_data_message(&mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource, message_content, sched_ctx, &mut self.consensus, comp_ctx);
 
                                let send_result = component::default_send_data_message(
 
                                    &mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource,
 
                                    message_content, sched_ctx, &mut self.consensus, &mut self.control, comp_ctx
 
                                );
 

	
 
                                if let Err(location_and_message) = send_result {
 
                                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                                    return CompScheduling::Immediate;
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -218,7 +218,7 @@ pub(crate) struct CompPDL {
 
    // Should be same length as the number of ports. Corresponding indices imply
 
    // message is intended for that port.
 
    pub inbox_main: InboxMain,
 
    pub inbox_backup: Vec<DataMessage>,
 
    pub inbox_backup: InboxBackup,
 
}
 

	
 
impl Component for CompPDL {
 
@@ -241,15 +241,17 @@ impl Component for CompPDL {
 
    }
 

	
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) {
 
        // sched_ctx.log(&format!("handling message: {:?}", message));
 
        sched_ctx.debug(&format!("handling message: {:?}", message));
 
        if let Some(new_target) = self.control.should_reroute(&mut message) {
 
            let mut target = sched_ctx.runtime.get_component_public(new_target); // TODO: @NoDirectHandle
 
            sched_ctx.debug(&format!("rerouting to: {:?}", new_target));
 
            target.send_message_logged(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
 
            let _should_remove = target.decrement_users();
 
            debug_assert!(_should_remove.is_none());
 
            return;
 
        }
 

	
 
        sched_ctx.debug("handling message myself");
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
@@ -257,7 +259,7 @@ impl Component for CompPDL {
 
            Message::Control(message) => {
 
                if let Err(location_and_message) = component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                    message, sched_ctx, comp_ctx, &mut self.inbox_main, &mut self.inbox_backup
 
                ) {
 
                    self.handle_generic_component_error(sched_ctx, location_and_message);
 
                }
 
@@ -282,7 +284,10 @@ impl Component for CompPDL {
 
            CompMode::NonSync | CompMode::Sync => {
 
                // continue and run PDL code
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | CompMode::BlockedSelect => {
 
            CompMode::SyncEnd | CompMode::BlockedGet |
 
            CompMode::BlockedPut | CompMode::BlockedSelect | CompMode::PutPortsBlockedTransferredPorts |
 
            CompMode::PutPortsBlockedAwaitingAcks | CompMode::PutPortsBlockedSendingPort |
 
            CompMode::NewComponentBlocked => {
 
                return CompScheduling::Sleep;
 
            }
 
            CompMode::StartExit => return component::default_handle_start_exit(
 
@@ -342,7 +347,7 @@ impl Component for CompPDL {
 
                let send_result = component::default_send_data_message(
 
                    &mut self.exec_state, target_port_id,
 
                    PortInstruction::SourceLocation(expr_id), value,
 
                    sched_ctx, &mut self.consensus, comp_ctx
 
                    sched_ctx, &mut self.consensus, &mut self.control, comp_ctx
 
                );
 
                if let Err(location_and_message) = send_result {
 
                    self.handle_generic_component_error(sched_ctx, location_and_message);
 
@@ -420,14 +425,14 @@ impl Component for CompPDL {
 
            },
 
            EC::NewComponent(definition_id, type_id, arguments) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
                self.create_component_and_transfer_ports(
 
                    sched_ctx, comp_ctx,
 
                component::default_start_create_component(
 
                    &mut self.exec_state, sched_ctx, comp_ctx, &mut self.control,
 
                    &mut self.inbox_main, &mut self.inbox_backup,
 
                    definition_id, type_id, arguments
 
                );
 
                return CompScheduling::Requeue;
 
            },
 
            EC::NewChannel => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 
                let channel = comp_ctx.create_channel();
 
                self.exec_ctx.stmt = ExecStmt::CreatedChannel((
 
@@ -552,343 +557,4 @@ impl CompPDL {
 

	
 
        self.exec_state.set_as_start_exit(exit_reason);
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling ports
 
    // -------------------------------------------------------------------------
 

	
 
    /// Creates a new component and transfers ports. Because of the stepwise
 
    /// process in which memory is allocated, ports are transferred, messages
 
    /// are exchanged, component lifecycle methods are called, etc. This
 
    /// function facilitates a lot of implicit assumptions (e.g. when the
 
    /// `Component::on_creation` method is called, the component is already
 
    /// registered at the runtime).
 
    fn create_component_and_transfer_ports(
 
        &mut self,
 
        sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx,
 
        definition_id: ProcedureDefinitionId, type_id: TypeId, mut arguments: ValueGroup
 
    ) {
 
        struct PortPair{
 
            creator_handle: LocalPortHandle,
 
            creator_id: PortId,
 
            created_handle: LocalPortHandle,
 
            created_id: PortId,
 
        }
 
        let mut opened_port_id_pairs = Vec::new();
 
        let mut closed_port_id_pairs = Vec::new();
 

	
 
        let reservation = sched_ctx.runtime.start_create_pdl_component();
 
        let mut created_ctx = CompCtx::new(&reservation);
 

	
 
        // let other_proc = &sched_ctx.runtime.protocol.heap[definition_id];
 
        // let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition];
 
        // dbg_code!({
 
        //     sched_ctx.log(&format!(
 
        //         "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})",
 
        //         self_proc.identifier.value.as_str(), creator_ctx.id,
 
        //         other_proc.identifier.value.as_str(), reservation.id()
 
        //     ));
 
        // });
 

	
 
        // Take all the ports ID that are in the `args` (and currently belong to
 
        // the creator component) and translate them into new IDs that are
 
        // associated with the component we're about to create
 
        let mut arg_iter = ValueGroupPortIter::new(&mut arguments);
 
        while let Some(port_reference) = arg_iter.next() {
 
            // Create port entry for new component
 
            let creator_port_id = port_reference.id;
 
            let creator_port_handle = creator_ctx.get_port_handle(creator_port_id);
 
            let creator_port = creator_ctx.get_port(creator_port_handle);
 
            let created_port_handle = created_ctx.add_port(
 
                creator_port.peer_comp_id, creator_port.peer_port_id,
 
                creator_port.kind, creator_port.state
 
            );
 
            let created_port = created_ctx.get_port(created_port_handle);
 
            let created_port_id = created_port.self_id;
 

	
 
            let port_id_pair = PortPair {
 
                creator_handle: creator_port_handle,
 
                creator_id: creator_port_id,
 
                created_handle: created_port_handle,
 
                created_id: created_port_id,
 
            };
 

	
 
            if creator_port.state.is_closed() {
 
                closed_port_id_pairs.push(port_id_pair)
 
            } else {
 
                opened_port_id_pairs.push(port_id_pair);
 
            }
 

	
 
            // Modify value in arguments (bit dirty, but double vec in ValueGroup causes lifetime issues)
 
            let arg_value = if let Some(heap_pos) = port_reference.heap_pos {
 
                &mut arg_iter.group.regions[heap_pos][port_reference.index]
 
            } else {
 
                &mut arg_iter.group.values[port_reference.index]
 
            };
 
            match arg_value {
 
                Value::Input(id) => *id = port_id_to_eval(created_port_id),
 
                Value::Output(id) => *id = port_id_to_eval(created_port_id),
 
                _ => unreachable!(),
 
            }
 
        }
 

	
 
        // For each transferred port pair set their peer components to the
 
        // correct values. This will only change the values for the ports of
 
        // the new component.
 
        let mut created_component_has_remote_peers = false;
 

	
 
        for pair in opened_port_id_pairs.iter() {
 
            let creator_port_info = creator_ctx.get_port(pair.creator_handle);
 
            let created_port_info = created_ctx.get_port_mut(pair.created_handle);
 

	
 
            if created_port_info.peer_comp_id == creator_ctx.id {
 
                // Peer of the transferred port is the component that is
 
                // creating the new component.
 
                let created_peer_port_index = opened_port_id_pairs
 
                    .iter()
 
                    .position(|v| v.creator_id == creator_port_info.peer_port_id);
 
                match created_peer_port_index {
 
                    Some(created_peer_port_index) => {
 
                        // Addendum to the above comment: but that port is also
 
                        // moving to the new component
 
                        let peer_pair = &opened_port_id_pairs[created_peer_port_index];
 
                        created_port_info.peer_port_id = peer_pair.created_id;
 
                        created_port_info.peer_comp_id = reservation.id();
 
                        todo!("either add 'self peer', or remove that idea from Ctx altogether");`
 
                    },
 
                    None => {
 
                        // Peer port remains with creator component.
 
                        created_port_info.peer_comp_id = creator_ctx.id;
 
                        created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(creator_ctx.id));
 
                    }
 
                }
 
            } else {
 
                // Peer is a different component. We'll deal with sending the
 
                // appropriate messages later
 
                let peer_handle = creator_ctx.get_peer_handle(created_port_info.peer_comp_id);
 
                let peer_info = creator_ctx.get_peer(peer_handle);
 
                created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(peer_info.id));
 
                created_component_has_remote_peers = true;
 
            }
 
        }
 

	
 
        // We'll now actually turn our reservation for a new component into an
 
        // actual component. Note that we initialize it as "not sleeping" as
 
        // its initial scheduling might be performed based on `Ack`s in response
 
        // to message exchanges between remote peers.
 
        let total_num_ports = opened_port_id_pairs.len() + closed_port_id_pairs.len();
 
        let component = component::create_component(&sched_ctx.runtime.protocol, definition_id, type_id, arguments, total_num_ports);
 
        let (created_key, component) = sched_ctx.runtime.finish_create_pdl_component(
 
            reservation, component, created_ctx, false,
 
        );
 
        component.component.on_creation(created_key.downgrade(), sched_ctx);
 

	
 
        // Now modify the creator's ports: remove every transferred port and
 
        // potentially remove the peer component.
 
        for pair in opened_port_id_pairs.iter() {
 
            // Remove peer if appropriate
 
            let creator_port_index = creator_ctx.get_port_index(pair.creator_handle);
 
            creator_ctx.change_port_peer(sched_ctx, pair.creator_handle, None);
 
            creator_ctx.remove_port(pair.creator_handle);
 

	
 
            // Transfer any messages
 
            if let Some(mut message) = self.inbox_main.remove(creator_port_index) {
 
                message.data_header.target_port = pair.created_id;
 
                component.component.adopt_message(&mut component.ctx, message)
 
            }
 

	
 
            let mut message_index = 0;
 
            while message_index < self.inbox_backup.len() {
 
                let message = &self.inbox_backup[message_index];
 
                if message.data_header.target_port == pair.creator_id {
 
                    // transfer message
 
                    let mut message = self.inbox_backup.remove(message_index);
 
                    message.data_header.target_port = pair.created_id;
 
                    component.component.adopt_message(&mut component.ctx, message);
 
                } else {
 
                    message_index += 1;
 
                }
 
            }
 

	
 
            let created_port_info = component.ctx.get_port(pair.created_handle);
 
            if created_port_info.peer_comp_id == creator_ctx.id {
 
                // This handles the creation of a channel between the creator
 
                // component and the newly created component. So if the creator
 
                // had a `a -> b` channel, and `b` is moved to the new
 
                // component, then `a` needs to set its peer component.
 
                let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id);
 
                let peer_port_info = creator_ctx.get_port_mut(peer_port_handle);
 
                peer_port_info.peer_comp_id = component.ctx.id;
 
                peer_port_info.peer_port_id = created_port_info.self_id;
 
                creator_ctx.change_port_peer(sched_ctx, peer_port_handle, Some(component.ctx.id));
 
            }
 
        }
 

	
 
        // Do the same for the closed ports. Note that we might still have to
 
        // transfer messages that cause the new owner of the port to fail.
 
        for pair in closed_port_id_pairs.iter() {
 
            let port_index = creator_ctx.get_port_index(pair.creator_handle);
 
            creator_ctx.remove_port(pair.creator_handle);
 
            if let Some(mut message) = self.inbox_main.remove(port_index) {
 
                message.data_header.target_port = pair.created_id;
 
                component.component.adopt_message(&mut component.ctx, message);
 
            }
 

	
 
            let mut message_index = 0;
 
            while message_index < self.inbox_backup.len() {
 
                let message = &self.inbox_backup[message_index];
 
                if message.data_header.target_port == pair.created_id {
 
                    // Transfer message
 
                    let mut message = self.inbox_backup.remove(message_index);
 
                    message.data_header.target_port = pair.created_id;
 
                    component.component.adopt_message(&mut component.ctx, message);
 
                } else {
 
                    message_index += 1;
 
                }
 
            }
 
        }
 

	
 
        // By now all ports and messages have been transferred. If there are any
 
        // peers that need to be notified about this new component, then we
 
        // initiate the protocol that will notify everyone here.
 
        if created_component_has_remote_peers {
 
            let created_ctx = &component.ctx;
 
            let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id);
 
            for pair in opened_port_id_pairs.iter() {
 
                let port_info = created_ctx.get_port(pair.created_handle);
 
                if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id {
 
                    let message = self.control.add_reroute_entry(
 
                        creator_ctx.id, port_info.peer_port_id, port_info.peer_comp_id,
 
                        pair.creator_id, pair.created_id, created_ctx.id,
 
                        schedule_entry_id
 
                    );
 
                    let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id);
 
                    let peer_info = created_ctx.get_peer(peer_handle);
 
                    peer_info.handle.send_message_logged(sched_ctx, message, true);
 
                }
 
            }
 
        } else {
 
            // Peer can be scheduled immediately
 
            sched_ctx.runtime.enqueue_work(created_key);
 
        }
 
    }
 
}
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortId>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortId>) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                let cur_port = PortId(port_id.id);
 
                for prev_port in ports.iter() {
 
                    if *prev_port == cur_port {
 
                        // Already added
 
                        return;
 
                    }
 
                }
 

	
 
                ports.push(cur_port);
 
            },
 
            Value::Array(heap_pos) |
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
 
            Value::Union(_, heap_pos) => {
 
                // Reference to some dynamic thing which might contain ports,
 
                // so recurse
 
                let heap_region = &group.regions[*heap_pos as usize];
 
                for embedded_value in heap_region {
 
                    find_port_in_value(group, embedded_value, ports);
 
                }
 
            },
 
            _ => {}, // values we don't care about
 
        }
 
    }
 

	
 
    // Clear the ports, then scan all the available values
 
    ports.clear();
 
    for value in &value_group.values {
 
        find_port_in_value(value_group, value, ports);
 
    }
 
}
 

	
 
struct ValueGroupPortIter<'a> {
 
    group: &'a mut ValueGroup,
 
    heap_stack: Vec<(usize, usize)>,
 
    index: usize,
 
}
 

	
 
impl<'a> ValueGroupPortIter<'a> {
 
    fn new(group: &'a mut ValueGroup) -> Self {
 
        return Self{ group, heap_stack: Vec::new(), index: 0 }
 
    }
 
}
 

	
 
struct ValueGroupPortRef {
 
    id: PortId,
 
    heap_pos: Option<usize>, // otherwise: on stack
 
    index: usize,
 
}
 

	
 
impl<'a> Iterator for ValueGroupPortIter<'a> {
 
    type Item = ValueGroupPortRef;
 

	
 
    fn next(&mut self) -> Option<Self::Item> {
 
        // Enter loop that keeps iterating until a port is found
 
        loop {
 
            if let Some(pos) = self.heap_stack.last() {
 
                let (heap_pos, region_index) = *pos;
 
                if region_index >= self.group.regions[heap_pos].len() {
 
                    self.heap_stack.pop();
 
                    continue;
 
                }
 

	
 
                let value = &self.group.regions[heap_pos][region_index];
 
                self.heap_stack.last_mut().unwrap().1 += 1;
 

	
 
                match value {
 
                    Value::Input(id) | Value::Output(id) => {
 
                        let id = PortId(id.id);
 
                        return Some(ValueGroupPortRef{
 
                            id,
 
                            heap_pos: Some(heap_pos),
 
                            index: region_index,
 
                        });
 
                    },
 
                    _ => {},
 
                }
 

	
 
                if let Some(heap_pos) = value.get_heap_pos() {
 
                    self.heap_stack.push((heap_pos as usize, 0));
 
                }
 
            } else {
 
                if self.index >= self.group.values.len() {
 
                    return None;
 
                }
 

	
 
                let value = &mut self.group.values[self.index];
 
                self.index += 1;
 

	
 
                match value {
 
                    Value::Input(id) | Value::Output(id) => {
 
                        let id = PortId(id.id);
 
                        return Some(ValueGroupPortRef{
 
                            id,
 
                            heap_pos: None,
 
                            index: self.index - 1
 
                        });
 
                    },
 
                    _ => {},
 
                }
 

	
 
                // Not a port, check if we need to enter a heap region
 
                if let Some(heap_pos) = value.get_heap_pos() {
 
                    self.heap_stack.push((heap_pos as usize, 0));
 
                } // else: just consider the next value
 
            }
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/component_random.rs
Show inline comments
 
@@ -2,14 +2,10 @@ use rand::prelude as random;
 
use rand::RngCore;
 

	
 
use crate::protocol::eval::{ValueGroup, Value};
 
use crate::runtime2::*;
 
use crate::runtime2::communication::*;
 

	
 
use super::*;
 
use super::component::{
 
    self,
 
    Component, CompExecState, CompScheduling,
 
    CompMode, ExitReason
 
};
 
use super::component::*;
 
use super::control_layer::*;
 
use super::consensus::*;
 

	
 
@@ -28,6 +24,8 @@ pub struct ComponentRandomU32 {
 
    did_perform_send: bool, // when in sync mode
 
    control: ControlLayer,
 
    consensus: Consensus,
 
    inbox_main: InboxMain, // not used
 
    inbox_backup: InboxBackup, // not used
 
}
 

	
 
impl Component for ComponentRandomU32 {
 
@@ -51,7 +49,7 @@ impl Component for ComponentRandomU32 {
 
            Message::Control(message) => {
 
                if let Err(location_and_message) = component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                    message, sched_ctx, comp_ctx, &mut self.inbox_main, &mut self.inbox_backup
 
                ) {
 
                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                }
 
@@ -64,7 +62,12 @@ impl Component for ComponentRandomU32 {
 
        sched_ctx.info(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode));
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedGet | CompMode::BlockedSelect => {
 
            CompMode::BlockedGet |
 
            CompMode::BlockedSelect |
 
            CompMode::PutPortsBlockedTransferredPorts |
 
            CompMode::PutPortsBlockedAwaitingAcks |
 
            CompMode::PutPortsBlockedSendingPort |
 
            CompMode::NewComponentBlocked => {
 
                // impossible for this component, no input ports and no select
 
                // blocks
 
                unreachable!();
 
@@ -104,7 +107,8 @@ impl Component for ComponentRandomU32 {
 
                    let send_result = component::default_send_data_message(
 
                        &mut self.exec_state, self.output_port_id,
 
                        PortInstruction::NoSource, value_group,
 
                        sched_ctx, &mut self.consensus, comp_ctx
 
                        sched_ctx, &mut self.consensus, &mut self.control,
 
                        comp_ctx
 
                    );
 

	
 
                    if let Err(location_and_message) = send_result {
 
@@ -157,6 +161,8 @@ impl ComponentRandomU32 {
 
            did_perform_send: false,
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
            inbox_main: Vec::new(),
 
            inbox_backup: Vec::new(),
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/consensus.rs
Show inline comments
 
@@ -355,6 +355,20 @@ impl Consensus {
 
        self.solution.clear();
 
    }
 

	
 
    pub(crate) fn notify_received_port(&mut self, _expected_index: usize, port_handle: LocalPortHandle, comp_ctx: &CompCtx) {
 
        debug_assert_eq!(_expected_index, self.ports.len());
 
        let port_info = comp_ctx.get_port(port_handle);
 
        self.ports.push(PortAnnotation{
 
            self_comp_id: comp_ctx.id,
 
            self_port_id: port_info.self_id,
 
            peer_comp_id: port_info.peer_comp_id,
 
            peer_port_id: port_info.peer_port_id,
 
            peer_discovered: false,
 
            mapping: None,
 
            kind: port_info.kind,
 
        });
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling inbound and outbound messages
 
    // -------------------------------------------------------------------------
 
@@ -366,7 +380,10 @@ impl Consensus {
 
        let data_header = self.create_data_header_and_update_mapping(port_info);
 
        let sync_header = self.create_sync_header(comp_ctx);
 

	
 
        return DataMessage{ data_header, sync_header, content };
 
        return DataMessage{
 
            data_header, sync_header, content,
 
            ports: Vec::new()
 
        };
 
    }
 

	
 
    /// Handles the arrival of a new data message (needs to be called for every
src/runtime2/component/control_layer.rs
Show inline comments
 
@@ -5,12 +5,12 @@ use crate::runtime2::component::*;
 
use super::component_context::*;
 

	
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub(crate) struct ControlId(u32);
 
pub(crate) struct ControlId(pub(crate) u32);
 

	
 
impl ControlId {
 
    /// Like other invalid IDs, this one doesn't care any significance, but is
 
    /// just set at u32::MAX to hopefully bring out bugs sooner.
 
    fn new_invalid() -> Self {
 
    pub(crate) fn new_invalid() -> Self {
 
        return ControlId(u32::MAX);
 
    }
 
}
 
@@ -25,6 +25,7 @@ enum ControlContent {
 
    PeerChange(ContentPeerChange),
 
    ScheduleComponent(CompId),
 
    ClosedPort(PortId),
 
    UnblockPutWithPorts
 
}
 

	
 
struct ContentPeerChange {
 
@@ -45,6 +46,7 @@ pub(crate) enum AckAction {
 
    None,
 
    SendMessage(CompId, ControlMessage),
 
    ScheduleComponent(CompId),
 
    UnblockPutWithPorts,
 
}
 

	
 
/// Handling/sending control messages.
 
@@ -128,6 +130,9 @@ impl ControlLayer {
 
                comp_ctx.change_port_peer(sched_ctx, port_handle, None);
 

	
 
                return (AckAction::None, None);
 
            },
 
            ControlContent::UnblockPutWithPorts => {
 
                return (AckAction::UnblockPutWithPorts, None);
 
            }
 
        }
 
    }
 
@@ -137,7 +142,7 @@ impl ControlLayer {
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Port transfer (due to component creation)
 
    // Port transfer
 
    // -------------------------------------------------------------------------
 

	
 
    /// Adds an entry that, when completely ack'd, will schedule a component.
 
@@ -152,15 +157,20 @@ impl ControlLayer {
 
        return entry_id;
 
    }
 

	
 
    /// Removes a schedule entry. Only used if the caller preemptively called
 
    /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`,
 
    /// hence the `ack_countdown` in the scheduling entry is at 0.
 
    pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) {
 
        let index = self.get_entry_index_by_id(schedule_entry_id).unwrap();
 
        debug_assert_eq!(self.entries[index].ack_countdown, 0);
 
        self.entries.remove(index);
 
    /// Adds an entry that returns the similarly named Ack action
 
    pub(crate) fn add_unblock_put_with_ports_entry(&mut self) -> ControlId {
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 0, // incremented by calls to `add_reroute_entry`
 
            content: ControlContent::UnblockPutWithPorts,
 
        });
 

	
 
        return entry_id;
 
    }
 

	
 
    /// Adds a rerouting entry (used to ensure all messages will end up at a
 
    /// newly created component). Used when creating a new component.
 
    pub(crate) fn add_reroute_entry(
 
        &mut self, creator_comp_id: CompId,
 
        source_port_id: PortId, source_comp_id: CompId,
 
@@ -193,6 +203,24 @@ impl ControlLayer {
 
        })
 
    }
 

	
 
    /// Creates a PortPeerChanged message (and increments ack-counter on a
 
    /// pre-created control entry) that is used as a preliminary step before
 
    /// transferring a port over a channel.
 
    pub(crate) fn create_port_transfer_message(
 
        &mut self, associated_control_id: ControlId,
 
        sender_comp_id: CompId, target_port_id: PortId
 
    ) -> Message {
 
        let entry_index = self.get_entry_index_by_id(associated_control_id).unwrap();
 
        self.entries[entry_index].ack_countdown += 1;
 

	
 
        return Message::Control(ControlMessage{
 
            id: associated_control_id,
 
            sender_comp_id,
 
            target_port_id: Some(target_port_id),
 
            content: ControlMessageContent::PortPeerChangedBlock
 
        })
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Blocking, unblocking, and closing ports
 
    // -------------------------------------------------------------------------
src/runtime2/component/mod.rs
Show inline comments
 
@@ -8,7 +8,7 @@ mod component_internet;
 

	
 
pub(crate) use component::{Component, CompScheduling};
 
pub(crate) use component_pdl::{CompPDL};
 
pub(crate) use component_context::{CompCtx, PortInstruction};
 
pub(crate) use component_context::{CompCtx, PortInstruction, PortKind, PortState, PortStateFlag};
 
pub(crate) use control_layer::{ControlId};
 

	
 
use super::scheduler::*;
src/runtime2/error.rs
Show inline comments
 
use std::fmt::{Write, Debug, Display, Formatter as FmtFormatter, Result as FmtResult};
 
use std::fmt::{Debug, Display, Formatter as FmtFormatter, Result as FmtResult};
 

	
 
/// Represents an unrecoverable runtime error that is reported to the user (for
 
/// debugging purposes). Basically a human-readable message with its source
src/runtime2/poll/mod.rs
Show inline comments
 
@@ -177,6 +177,7 @@ impl PollingThread {
 
    }
 

	
 
    pub(crate) fn run(&mut self) {
 
        use std::io::ErrorKind;
 
        use crate::runtime2::communication::Message;
 

	
 
        const NUM_EVENTS: usize = 256;
 
@@ -191,7 +192,23 @@ impl PollingThread {
 
        loop {
 
            // Retrieve events first (because the PollingClient will first
 
            // register at epoll, and then push a command into the queue).
 
            self.poller.wait(&mut events, EPOLL_DURATION).unwrap();
 
            loop {
 
                let wait_result = self.poller.wait(&mut events, EPOLL_DURATION);
 
                match wait_result {
 
                    Ok(()) => break,
 
                    Err(reason) => {
 
                        match reason.kind() {
 
                            ErrorKind::Interrupted => {
 
                                // Happens when we're debugging and set a break-
 
                                // point, we want to continue waiting
 
                            },
 
                            _ => {
 
                                panic!("failed to poll: {}", reason);
 
                            }
 
                        }
 
                    }
 
                }
 
            }
 

	
 
            // Then handle everything in the command queue.
 
            while let Some(command) = self.queue.pop() {
src/runtime2/runtime.rs
Show inline comments
 
@@ -74,7 +74,8 @@ pub(crate) struct RuntimeComp {
 
    pub exiting: bool,
 
}
 

	
 
/// Should contain everything that is accessible in a thread-safe manner
 
/// Should contain everything that is accessible in a thread-safe manner. May
 
/// NOT contain non-threadsafe fields.
 
// TODO: Do something about the `num_handles` thing. This needs to be a bit more
 
//  "foolproof" to lighten the mental burden of using the `num_handles`
 
//  variable.
 
@@ -95,12 +96,19 @@ pub(crate) struct CompHandle {
 
}
 

	
 
impl CompHandle {
 
    fn new(id: CompId, public: &CompPublic) -> CompHandle {
 
        let handle = CompHandle{
 
    /// Creates a new component handle and does not increment the reference
 
    /// counter.
 
    fn new_unincremented(id: CompId, public: &CompPublic) -> CompHandle {
 
        return CompHandle{
 
            target: public,
 
            id,
 
            #[cfg(debug_assertions)] decremented: false,
 
        };
 
    }
 

	
 
    /// Creates a new component handle and increments the reference counter.
 
    fn new(id: CompId, public: &CompPublic) -> CompHandle {
 
        let mut handle = Self::new_unincremented(id, public);
 
        handle.increment_users();
 
        return handle;
 
    }
 
@@ -232,10 +240,10 @@ impl Runtime {
 
            module_name, routine_name,
 
            ValueGroup::new_stack(Vec::new())
 
        )?;
 
        let reserved = self.inner.start_create_pdl_component();
 
        let reserved = self.inner.start_create_component();
 
        let ctx = CompCtx::new(&reserved);
 
        let component = Box::new(CompPDL::new(prompt, 0));
 
        let (key, _) = self.inner.finish_create_pdl_component(reserved, component, ctx, false);
 
        let (key, _) = self.inner.finish_create_component(reserved, component, ctx, false);
 
        self.inner.enqueue_work(key);
 

	
 
        return Ok(())
 
@@ -284,22 +292,24 @@ impl RuntimeInner {
 

	
 
    // Creating/destroying components
 

	
 
    pub(crate) fn start_create_pdl_component(&self) -> CompReserved {
 
    pub(crate) fn start_create_component(&self) -> CompReserved {
 
        self.increment_active_components();
 
        let reservation = self.components.reserve();
 
        return CompReserved{ reservation };
 
    }
 

	
 
    pub(crate) fn finish_create_pdl_component(
 
    pub(crate) fn finish_create_component(
 
        &self, reserved: CompReserved,
 
        component: Box<dyn Component>, mut context: CompCtx, initially_sleeping: bool,
 
    ) -> (CompKey, &mut RuntimeComp) {
 
        // Construct runtime component
 
        let inbox_queue = QueueDynMpsc::new(16);
 
        let inbox_producer = inbox_queue.producer();
 

	
 
        let _id = reserved.id();
 
        context.id = reserved.id();
 
        let component = RuntimeComp {
 
        let component_id = reserved.id();
 
        context.id = component_id;
 

	
 
        let mut component = RuntimeComp {
 
            public: CompPublic{
 
                sleeping: AtomicBool::new(initially_sleeping),
 
                num_handles: AtomicU32::new(1), // the component itself acts like a handle
 
@@ -311,10 +321,17 @@ impl RuntimeInner {
 
            exiting: false,
 
        };
 

	
 
        // Submit created component into storage.
 
        let index = self.components.submit(reserved.reservation, component);
 
        debug_assert_eq!(index, _id.0);
 
        debug_assert_eq!(index, component_id.0);
 
        let component = self.components.get_mut(index);
 

	
 
        // Bit messy, but here we create the reference of a component to itself,
 
        // the `num_handles` being initialized to `1` above, and add it to the
 
        // component context.
 
        let self_handle = CompHandle::new_unincremented(component_id, &component.public);
 
        component.ctx.add_self_reference(self_handle);
 

	
 
        return (CompKey(index), component);
 
    }
 

	
src/runtime2/scheduler.rs
Show inline comments
 
@@ -89,7 +89,7 @@ impl Scheduler {
 
                CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); },
 
                CompScheduling::Exit => {
 
                    component.component.on_shutdown(&scheduler_ctx);
 
                    self.mark_component_as_exiting(&scheduler_ctx, component);
 
                    self.mark_component_as_exiting(&scheduler_ctx, comp_key, component);
 
                }
 
            }
 
        }
 
@@ -120,16 +120,14 @@ impl Scheduler {
 
    /// Marks the component as exiting by removing the reference it holds to
 
    /// itself. Afterward the component will enter "normal" sleeping mode (if it
 
    /// has not yet been destroyed)
 
    fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) {
 
    fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, comp_key: CompKey, component: &mut RuntimeComp) {
 
        // If we didn't yet decrement our reference count, do so now
 
        let comp_key = unsafe{ component.ctx.id.upgrade() };
 

	
 
        if !component.exiting {
 
            component.exiting = true;
 

	
 
            let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel);
 
            let new_count = old_count - 1;
 
            if new_count == 0 {
 
            let maybe_comp_key = component.ctx.remove_self_reference();
 
            if let Some(_comp_key) = maybe_comp_key {
 
                debug_assert_eq!(_comp_key.0, comp_key.0);
 
                sched_ctx.runtime.destroy_component(comp_key);
 
                return;
 
            }
src/runtime2/tests/messaging.rs
Show inline comments
 
new file 100644
 
use super::*;
 

	
 

	
 
#[test]
 
fn test_component_communication() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive sender(out<u32> o, u32 outside_loops, u32 inside_loops) {
 
        u32 outside_index = 0;
 
        while (outside_index < outside_loops) {
 
            u32 inside_index = 0;
 
            sync while (inside_index < inside_loops) {
 
                put(o, inside_index);
 
                inside_index += 1;
 
            }
 
            outside_index += 1;
 
        }
 
    }
 

	
 
    primitive receiver(in<u32> i, u32 outside_loops, u32 inside_loops) {
 
        u32 outside_index = 0;
 
        while (outside_index < outside_loops) {
 
            u32 inside_index = 0;
 
            sync while (inside_index < inside_loops) {
 
                auto val = get(i);
 
                while (val != inside_index) {} // infinite loop if incorrect value is received
 
                inside_index += 1;
 
            }
 
            outside_index += 1;
 
        }
 
    }
 

	
 
    composite constructor() {
 
        channel o_orom -> i_orom;
 
        channel o_mrom -> i_mrom;
 
        channel o_ormm -> i_ormm;
 
        channel o_mrmm -> i_mrmm;
 

	
 
        // one round, one message per round
 
        new sender(o_orom, 1, 1);
 
        new receiver(i_orom, 1, 1);
 

	
 
        // multiple rounds, one message per round
 
        new sender(o_mrom, 5, 1);
 
        new receiver(i_mrom, 5, 1);
 

	
 
        // one round, multiple messages per round
 
        new sender(o_ormm, 1, 5);
 
        new receiver(i_ormm, 1, 5);
 

	
 
        // multiple rounds, multiple messages per round
 
        new sender(o_mrmm, 5, 5);
 
        new receiver(i_mrmm, 5, 5);
 
    }").expect("compilation");
 
    let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_send_to_self() {
 
    compile_and_create_component("
 
    primitive insane_in_the_membrane() {
 
        channel a -> b;
 
        sync {
 
            put(a, 1);
 
            auto v = get(b);
 
            while (v != 1) {}
 
        }
 
    }
 
    ", "insane_in_the_membrane", no_args());
 
}
 

	
 
#[test]
 
fn test_intermediate_messenger() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive receiver<T>(in<T> rx, u32 num) {
 
        auto index = 0;
 
        while (index < num) {
 
            sync { auto v = get(rx); }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive middleman<T>(in<T> rx, out<T> tx, u32 num) {
 
        auto index = 0;
 
        while (index < num) {
 
            sync { put(tx, get(rx)); }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive sender<T>(out<T> tx, u32 num) {
 
        auto index = 0;
 
        while (index < num) {
 
            sync put(tx, 1337);
 
            index += 1;
 
        }
 
    }
 

	
 
    composite constructor_template<T>() {
 
        auto num = 0;
 
        channel<T> tx_a -> rx_a;
 
        channel tx_b -> rx_b;
 
        new sender(tx_a, 3);
 
        new middleman(rx_a, tx_b, 3);
 
        new receiver(rx_b, 3);
 
    }
 

	
 
    composite constructor() {
 
        new constructor_template<u16>();
 
        new constructor_template<u32>();
 
        new constructor_template<u64>();
 
        new constructor_template<s16>();
 
        new constructor_template<s32>();
 
        new constructor_template<s64>();
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -3,10 +3,12 @@ use crate::protocol::eval::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::component::{CompCtx, CompPDL};
 

	
 
mod messaging;
 
mod error_handling;
 
mod transfer_ports;
 

	
 
const LOG_LEVEL: LogLevel = LogLevel::Debug;
 
const NUM_THREADS: u32 = 4;
 
const NUM_THREADS: u32 = 1;
 

	
 
pub(crate) fn compile_and_create_component(source: &str, routine_name: &str, args: ValueGroup) {
 
    let protocol = ProtocolDescription::parse(source.as_bytes())
 
@@ -20,10 +22,10 @@ pub(crate) fn create_component(rt: &Runtime, module_name: &str, routine_name: &s
 
    let prompt = rt.inner.protocol.new_component(
 
        module_name.as_bytes(), routine_name.as_bytes(), args
 
    ).expect("create prompt");
 
    let reserved = rt.inner.start_create_pdl_component();
 
    let reserved = rt.inner.start_create_component();
 
    let ctx = CompCtx::new(&reserved);
 
    let component = Box::new(CompPDL::new(prompt, 0));
 
    let (key, _) = rt.inner.finish_create_pdl_component(reserved, component, ctx, false);
 
    let (key, _) = rt.inner.finish_create_component(reserved, component, ctx, false);
 
    rt.inner.enqueue_work(key);
 
}
 

	
 
@@ -44,109 +46,6 @@ fn test_component_creation() {
 
    }
 
}
 

	
 
#[test]
 
fn test_component_communication() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive sender(out<u32> o, u32 outside_loops, u32 inside_loops) {
 
        u32 outside_index = 0;
 
        while (outside_index < outside_loops) {
 
            u32 inside_index = 0;
 
            sync while (inside_index < inside_loops) {
 
                put(o, inside_index);
 
                inside_index += 1;
 
            }
 
            outside_index += 1;
 
        }
 
    }
 

	
 
    primitive receiver(in<u32> i, u32 outside_loops, u32 inside_loops) {
 
        u32 outside_index = 0;
 
        while (outside_index < outside_loops) {
 
            u32 inside_index = 0;
 
            sync while (inside_index < inside_loops) {
 
                auto val = get(i);
 
                while (val != inside_index) {} // infinite loop if incorrect value is received
 
                inside_index += 1;
 
            }
 
            outside_index += 1;
 
        }
 
    }
 

	
 
    composite constructor() {
 
        channel o_orom -> i_orom;
 
        channel o_mrom -> i_mrom;
 
        channel o_ormm -> i_ormm;
 
        channel o_mrmm -> i_mrmm;
 

	
 
        // one round, one message per round
 
        new sender(o_orom, 1, 1);
 
        new receiver(i_orom, 1, 1);
 

	
 
        // multiple rounds, one message per round
 
        new sender(o_mrom, 5, 1);
 
        new receiver(i_mrom, 5, 1);
 

	
 
        // one round, multiple messages per round
 
        new sender(o_ormm, 1, 5);
 
        new receiver(i_ormm, 1, 5);
 

	
 
        // multiple rounds, multiple messages per round
 
        new sender(o_mrmm, 5, 5);
 
        new receiver(i_mrmm, 5, 5);
 
    }").expect("compilation");
 
    let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_intermediate_messenger() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive receiver<T>(in<T> rx, u32 num) {
 
        auto index = 0;
 
        while (index < num) {
 
            sync { auto v = get(rx); }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive middleman<T>(in<T> rx, out<T> tx, u32 num) {
 
        auto index = 0;
 
        while (index < num) {
 
            sync { put(tx, get(rx)); }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive sender<T>(out<T> tx, u32 num) {
 
        auto index = 0;
 
        while (index < num) {
 
            sync put(tx, 1337);
 
            index += 1;
 
        }
 
    }
 

	
 
    composite constructor_template<T>() {
 
        auto num = 0;
 
        channel<T> tx_a -> rx_a;
 
        channel tx_b -> rx_b;
 
        new sender(tx_a, 3);
 
        new middleman(rx_a, tx_b, 3);
 
        new receiver(rx_b, 3);
 
    }
 

	
 
    composite constructor() {
 
        new constructor_template<u16>();
 
        new constructor_template<u32>();
 
        new constructor_template<u64>();
 
        new constructor_template<s16>();
 
        new constructor_template<s32>();
 
        new constructor_template<s64>();
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_simple_select() {
 
    let pd = ProtocolDescription::parse(b"
src/runtime2/tests/transfer_ports.rs
Show inline comments
 
new file 100644
 
use super::*;
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_owned_peer() {
 
    compile_and_create_component("
 
    primitive port_sender(out<in<u32>> tx) {
 
        channel a -> b;
 
        sync put(tx, b);
 
    }
 

	
 
    primitive port_receiver(in<in<u32>> rx) {
 
        sync auto a = get(rx);
 
    }
 

	
 
    composite constructor() {
 
        channel a -> b;
 
        new port_sender(a);
 
        new port_receiver(b);
 
    }
 
    ", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_foreign_peer() {
 
    compile_and_create_component("
 
    primitive port_sender(out<in<u32>> tx, in<u32> to_send) {
 
        sync put(tx, to_send);
 
    }
 

	
 
    primitive port_receiver(in<in<u32>> rx) {
 
        sync auto a = get(rx);
 
    }
 

	
 
    composite constructor() {
 
        channel tx -> rx;
 
        channel forgotten -> to_send;
 
        new port_sender(tx, to_send);
 
        new port_receiver(rx);
 
    }
 
    ", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_transfer_synccreated_port() {
 
    compile_and_create_component("
 
    primitive port_sender(out<in<u32>> tx) {
 
        sync {
 
            channel a -> b;
 
            put(tx, b);
 
        }
 
    }
 

	
 
    primitive port_receiver(in<in<u32>> rx) {
 
        sync auto a = get(rx);
 
    }
 

	
 
    composite constructor() {
 
        channel a -> b;
 
        new port_sender(a);
 
        new port_receiver(b);
 
    }
 
    ", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_owned_peer_and_communication() {
 
    compile_and_create_component("
 
    primitive port_sender(out<in<u32>> tx) {
 
        channel a -> b;
 
        sync put(tx, b);
 
        sync put(a, 1337);
 
    }
 

	
 
    primitive port_receiver(in<in<u32>> rx) {
 
        channel a -> b; // this is stupid, but we need to have a variable to use
 
        sync b = get(rx);
 
        u32 value = 0;
 
        sync value = get(b);
 
        while (value != 1337) {}
 
    }
 
    composite constructor() {
 
        channel a -> b;
 
        new port_sender(a);
 
        new port_receiver(b);
 
    }
 
    ", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_foreign_peer_and_communication() {
 
    compile_and_create_component("
 
    primitive port_sender(out<in<u32>> tx, in<u32> to_send) {
 
        sync put(tx, to_send);
 
    }
 

	
 
    primitive message_transmitter(out<u32> tx) {
 
        sync put(tx, 1337);
 
    }
 

	
 
    primitive port_receiver(in<in<u32>> rx) {
 
        channel unused -> b;
 
        sync b = get(rx);
 
        u32 value = 0;
 
        sync value = get(b);
 
        while (value != 1337) {}
 
    }
 

	
 
    composite constructor() {
 
        channel port_tx -> port_rx;
 
        channel value_tx -> value_rx;
 
        new port_sender(port_tx, value_rx);
 
        new port_receiver(port_rx);
 
        new message_transmitter(value_tx);
 
    }
 
    ", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_owned_peer_back_and_forth() {
 
    compile_and_create_component("
 
    primitive port_send_and_receive(out<in<u32>> tx, in<in<u32>> rx) {
 
        channel a -> b;
 
        sync {
 
            put(tx, b);
 
            b = get(rx);
 
        }
 
    }
 

	
 
    primitive port_receive_and_send(in<in<u32>> rx, out<in<u32>> tx) {
 
        channel unused -> transferred; // same problem as in different tests
 
        sync {
 
            transferred = get(rx);
 
            put(tx, transferred);
 
        }
 
    }
 

	
 
    composite constructor() {
 
        channel port_tx_forward -> port_rx_forward;
 
        channel port_tx_backward -> port_rx_backward;
 

	
 
        new port_send_and_receive(port_tx_forward, port_rx_backward);
 
        new port_receive_and_send(port_rx_forward, port_tx_backward);
 
    }", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_transfer_precreated_port_with_foreign_peer_back_and_forth_and_communication() {
 
    compile_and_create_component("
 
    primitive port_send_and_receive(out<in<u32>> tx, in<in<u32>> rx, in<u32> to_transfer) {
 
        sync {
 
            put(tx, to_transfer);
 
            to_transfer = get(rx);
 
        }
 
        sync {
 
            auto value = get(to_transfer);
 
            while (value != 1337) {}
 
        }
 
    }
 

	
 
    primitive port_receive_and_send(in<in<u32>> rx, out<in<u32>> tx) {
 
        channel unused -> transferred;
 
        sync {
 
            transferred = get(rx);
 
            put(tx, transferred);
 
        }
 
    }
 

	
 
    primitive value_sender(out<u32> tx) {
 
        sync put(tx, 1337);
 
    }
 

	
 
    composite constructor() {
 
        channel port_tx_forward -> port_rx_forward;
 
        channel port_tx_backward -> port_rx_backward;
 
        channel message_tx -> message_rx;
 
        new port_send_and_receive(port_tx_forward, port_rx_backward, message_rx);
 
        new port_receive_and_send(port_rx_forward, port_tx_backward);
 
        new value_sender(message_tx);
 
    }
 
    ", "constructor", no_args());
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)