Changeset - 935c576f54d0
[Not reviewed]
0 4 0
MH - 3 years ago 2022-05-11 00:41:52
contact@maxhenger.nl
Fixing rerouting+port-transfer bug
4 files changed with 9 insertions and 4 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
use std::fmt::{Display as FmtDisplay, Result as FmtResult, Formatter};
 

	
 
use crate::protocol::eval::{Prompt, EvalError, ValueGroup, Value, ValueId, PortId as EvalPortId};
 
use crate::protocol::*;
 
use crate::runtime2::*;
 
use crate::runtime2::communication::*;
 

	
 
use super::{CompCtx, CompPDL, CompId};
 
use super::component_context::*;
 
use super::component_random::*;
 
use super::component_internet::*;
 
use super::control_layer::*;
 
use super::consensus::*;
 

	
 
pub enum CompScheduling {
 
    Immediate,
 
    Requeue,
 
    Sleep,
 
    Exit,
 
}
 

	
 
/// Potential error emitted by a component
 
pub enum CompError {
 
    /// Error originating from the code executor. Hence has an associated
 
    /// source location.
 
    Executor(EvalError),
 
    /// Error originating from a component, but not necessarily associated with
 
    /// a location in the source.
 
    Component(String), // TODO: Maybe a different embedded value in the future?
 
    /// Pure runtime error. Not necessarily originating from the component
 
    /// itself. Should be treated as a very severe runtime-compromising error.
 
    Runtime(RtError),
 
}
 

	
 
impl FmtDisplay for CompError {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
 
        match self {
 
            CompError::Executor(v) => v.fmt(f),
 
            CompError::Component(v) => v.fmt(f),
 
            CompError::Runtime(v) => v.fmt(f),
 
        }
 
    }
 
}
 

	
 
/// Generic representation of a component (as viewed by a scheduler).
 
pub(crate) trait Component {
 
    /// Called upon the creation of the component. Note that the scheduler
 
    /// context is officially running another component (the component that is
 
    /// creating the new component).
 
    fn on_creation(&mut self, comp_id: CompId, sched_ctx: &SchedulerCtx);
 

	
 
    /// Called when a component crashes or wishes to exit. So is not called
 
    /// right before destruction, other components may still hold a handle to
 
    /// the component and send it messages!
 
    fn on_shutdown(&mut self, sched_ctx: &SchedulerCtx);
 

	
 
    /// Called if the component is created by another component and the messages
 
    /// are being transferred between the two.
 
    fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage);
 

	
 
    /// Called if the component receives a new message. The component is
 
    /// responsible for deciding where that messages goes.
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message);
 

	
 
    /// Called if the component's routine should be executed. The return value
 
    /// can be used to indicate when the routine should be run again.
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling;
 
}
 

	
 
/// Representation of the generic operating mode of a component. Although not
 
/// every state may be used by every kind of (builtin) component, this allows
 
/// writing standard handlers for particular events in a component's lifetime.
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) enum CompMode {
 
    NonSync, // not in sync mode
 
    Sync, // in sync mode, can interact with other components
 
    SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block
 
    BlockedGet, // blocked because we need to receive a message on a particular port
 
    BlockedPut, // component is blocked because the port is blocked
 
    BlockedSelect, // waiting on message to complete the select statement
 
    BlockedPutPorts,// blocked because we're waiting to send a data message containing ports
 
    StartExit, // temporary state: if encountered then we start the shutdown process.
 
    BusyExit, // temporary state: waiting for Acks for all the closed ports, potentially waiting for sync round to finish
 
    Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0
 
}
 

	
 
impl CompMode {
 
    pub(crate) fn is_in_sync_block(&self) -> bool {
 
        use CompMode::*;
 

	
 
        match self {
 
            Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | BlockedPutPorts => true,
 
            NonSync | StartExit | BusyExit | Exit => false,
 
        }
 
    }
 

	
 
    pub(crate) fn is_busy_exiting(&self) -> bool {
 
        use CompMode::*;
 

	
 
        match self {
 
            NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | BlockedPutPorts => false,
 
            StartExit | BusyExit => true,
 
            Exit => false,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum ExitReason {
 
    Termination, // regular termination of component
 
    ErrorInSync,
 
    ErrorNonSync,
 
}
 

	
 
impl ExitReason {
 
    pub(crate) fn is_in_sync(&self) -> bool {
 
        use ExitReason::*;
 

	
 
        match self {
 
            Termination | ErrorNonSync => false,
 
            ErrorInSync => true,
 
        }
 
    }
 

	
 
    pub(crate) fn is_error(&self) -> bool {
 
        use ExitReason::*;
 

	
 
        match self {
 
            Termination => false,
 
            ErrorInSync | ErrorNonSync => true,
 
        }
 
    }
 
}
 

	
 
/// Component execution state: the execution mode along with some descriptive
 
/// fields. Fields are public for ergonomic reasons, use member functions when
 
/// appropriate.
 
pub(crate) struct CompExecState {
 
    pub mode: CompMode,
 
    pub mode_port: PortId, // valid if blocked on a port (put/get)
 
    pub mode_value: ValueGroup, // valid if blocked on a put
 
    pub exit_reason: ExitReason, // valid if in StartExit/BusyExit/Exit mode
 
}
 

	
 
impl CompExecState {
 
    pub(crate) fn new() -> Self {
 
        return Self{
 
            mode: CompMode::NonSync,
 
            mode_port: PortId::new_invalid(),
 
            mode_value: ValueGroup::default(),
 
            exit_reason: ExitReason::Termination,
 
        }
 
    }
 

	
 
    pub(crate) fn set_as_start_exit(&mut self, reason: ExitReason) {
 
        self.mode = CompMode::StartExit;
 
        self.exit_reason = reason;
 
    }
 

	
 
    pub(crate) fn set_as_blocked_get(&mut self, port: PortId) {
 
        self.mode = CompMode::BlockedGet;
 
        self.mode_port = port;
 
        debug_assert!(self.mode_value.values.is_empty());
 
    }
 

	
 
    pub(crate) fn is_blocked_on_get(&self, port: PortId) -> bool {
 
        return
 
            self.mode == CompMode::BlockedGet &&
 
            self.mode_port == port;
 
    }
 

	
 
    pub(crate) fn set_as_blocked_put(&mut self, port: PortId, value: ValueGroup) {
 
        self.mode = CompMode::BlockedPut;
 
        self.mode_port = port;
 
        self.mode_value = value;
 
    }
 

	
 
    pub(crate) fn is_blocked_on_put(&self, port: PortId) -> bool {
 
        return
 
            self.mode == CompMode::BlockedPut &&
 
            self.mode_port == port;
 
    }
 
}
 

	
 
// TODO: Replace when implementing port sending. Should probably be incorporated
 
//  into CompCtx (and rename CompCtx into CompComms)
 
pub(crate) type InboxMain = Vec<Option<DataMessage>>;
 
pub(crate) type InboxMainRef = [Option<DataMessage>];
 
pub(crate) type InboxBackup = Vec<DataMessage>;
 

	
 
/// Creates a new component based on its definition. Meaning that if it is a
 
/// user-defined component then we set up the PDL code state. Otherwise we
 
/// construct a custom component. This does NOT take care of port and message
 
/// management.
 
pub(crate) fn create_component(
 
    protocol: &ProtocolDescription,
 
    definition_id: ProcedureDefinitionId, type_id: TypeId,
 
    arguments: ValueGroup, num_ports: usize
 
) -> Box<dyn Component> {
 
    let definition = &protocol.heap[definition_id];
 
    debug_assert!(definition.kind == ProcedureKind::Primitive || definition.kind == ProcedureKind::Composite);
 

	
 
    if definition.source.is_builtin() {
 
        // Builtin component
 
        let component: Box<dyn Component> = match definition.source {
 
            ProcedureSource::CompRandomU32 => Box::new(ComponentRandomU32::new(arguments)),
 
            ProcedureSource::CompTcpClient => Box::new(ComponentTcpClient::new(arguments)),
 
            _ => unreachable!(),
 
        };
 

	
 
        return component;
 
    } else {
 
        // User-defined component
 
        let prompt = Prompt::new(
 
            &protocol.types, &protocol.heap,
 
            definition_id, type_id, arguments
 
        );
 
        let component = CompPDL::new(prompt, num_ports);
 
        return Box::new(component);
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Generic component messaging utilities (for sending and receiving)
 
// -----------------------------------------------------------------------------
 

	
 
/// Default handling of sending a data message. In case the port is blocked then
 
/// the `ExecState` will become blocked as well. Note that
 
/// `default_handle_control_message` will ensure that the port becomes
 
/// unblocked if so instructed by the receiving component. The returned
 
/// scheduling value must be used.
 
#[must_use]
 
pub(crate) fn default_send_data_message(
 
    exec_state: &mut CompExecState, transmitting_port_id: PortId,
 
    port_instruction: PortInstruction, value: ValueGroup,
 
    sched_ctx: &SchedulerCtx, consensus: &mut Consensus,
 
    control: &mut ControlLayer, comp_ctx: &mut CompCtx
 
) -> Result<CompScheduling, (PortInstruction, String)> {
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync);
 

	
 
    let port_handle = comp_ctx.get_port_handle(transmitting_port_id);
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    port_info.last_instruction = port_instruction;
 

	
 
    let port_info = comp_ctx.get_port(port_handle);
 
    debug_assert_eq!(port_info.kind, PortKind::Putter);
 

	
 
    if port_info.state.is_closed() {
 
        // Note: normally peer is eventually consistent, but if it has shut down
 
        // then we can be sure it is consistent (I think?)
 
        return Err((
 
            port_info.last_instruction,
 
            format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0)
 
        ))
 
    } else if port_info.state.is_blocked() {
 
        // Port is blocked, so we cannot send
 
        exec_state.set_as_blocked_put(transmitting_port_id, value);
 

	
 
        return Ok(CompScheduling::Sleep);
 
    } else {
 
        // Check if there are any ports that are being transmitted
 
        let mut ports = Vec::new();
 
        find_ports_in_value_group(&value, &mut ports);
 
        if !ports.is_empty() {
 
            prepare_send_message_with_ports(
 
                transmitting_port_id, port_instruction, value, exec_state,
 
                comp_ctx, sched_ctx, control
 
            )?;
 

	
 
            return Ok(CompScheduling::Sleep);
 
        } else {
 
            // Port is not blocked and no ports to transfer: send to the peer
 
            let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
            let peer_info = comp_ctx.get_peer(peer_handle);
 
            let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value);
 
            peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true);
 

	
 
            return Ok(CompScheduling::Immediate);
 
        }
 
    }
 
}
 

	
 
pub(crate) enum IncomingData {
 
    PlacedInSlot,
 
    SlotFull(DataMessage),
 
}
 

	
 
/// Default handling of receiving a data message. In case there is no room for
 
/// the message it is returned from this function. Note that this function is
 
/// different from PDL code performing a `get` on a port; this is the case where
 
/// the message first arrives at the component.
 
// NOTE: This is supposed to be a somewhat temporary implementation. It would be
 
//  nicest if the sending component can figure out it cannot send any more data.
 
#[must_use]
 
pub(crate) fn default_handle_incoming_data_message(
 
    exec_state: &mut CompExecState, inbox_main: &mut InboxMain,
 
    comp_ctx: &mut CompCtx, incoming_message: DataMessage,
 
    sched_ctx: &SchedulerCtx, control: &mut ControlLayer
 
) -> IncomingData {
 
    let port_handle = comp_ctx.get_port_handle(incoming_message.data_header.target_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 
    comp_ctx.get_port_mut(port_handle).received_message_for_sync = true;
 
    let port_value_slot = &mut inbox_main[port_index];
 
    let target_port_id = incoming_message.data_header.target_port;
 

	
 
    if port_value_slot.is_none() {
 
        // We can put the value in the slot
 
        *port_value_slot = Some(incoming_message);
 

	
 
        // Check if we're blocked on receiving this message.
 
        dbg_code!({
 
            // Our port cannot have been blocked itself, because we're able to
 
            // directly insert the message into its slot.
 
            assert!(!comp_ctx.get_port(port_handle).state.is_blocked());
 
        });
 

	
 
        if exec_state.is_blocked_on_get(target_port_id) {
 
            // Return to normal operation
 
            exec_state.mode = CompMode::Sync;
 
            exec_state.mode_port = PortId::new_invalid();
 
            debug_assert!(exec_state.mode_value.values.is_empty());
 
        }
 

	
 
        return IncomingData::PlacedInSlot
 
    } else {
 
        // Slot is already full, so if the port was previously opened, it will
 
        // now become closed
 
        let port_info = comp_ctx.get_port_mut(port_handle);
 
        if port_info.state.is_open() {
 
            port_info.state.set(PortStateFlag::BlockedDueToFullBuffers);
 

	
 
            let (peer_handle, message) =
 
                control.initiate_port_blocking(comp_ctx, port_handle);
 
            let peer = comp_ctx.get_peer(peer_handle);
 
            peer.handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
        }
 

	
 
        return IncomingData::SlotFull(incoming_message)
 
    }
 
}
 

	
 
pub(crate) enum GetResult {
 
    Received(DataMessage),
 
    NoMessage,
 
    Error((PortInstruction, String)),
 
}
 

	
 
/// Default attempt at trying to receive from a port (i.e. through a `get`, or
 
/// the equivalent operation for a builtin component). `target_port` is the port
 
/// we're trying to receive from, and the `target_port_instruction` is the
 
/// instruction we're attempting on this port.
 
pub(crate) fn default_attempt_get(
 
    exec_state: &mut CompExecState, target_port: PortId, target_port_instruction: PortInstruction,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, sched_ctx: &SchedulerCtx,
 
    comp_ctx: &mut CompCtx, control: &mut ControlLayer, consensus: &mut Consensus
 
) -> GetResult {
 
    let port_handle = comp_ctx.get_port_handle(target_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 

	
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    port_info.last_instruction = target_port_instruction;
 
    if port_info.state.is_closed() {
 
        let peer_id = port_info.peer_comp_id;
 
        return GetResult::Error((
 
            target_port_instruction,
 
            format!("Cannot get from this port, as the peer component (id:{}) closed the port", peer_id.0)
 
        ));
 
    }
 

	
 
    if let Some(message) = &inbox_main[port_index] {
 
        if consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
            // We're allowed to receive this message
 
            let mut message = inbox_main[port_index].take().unwrap();
 
            debug_assert_eq!(target_port, message.data_header.target_port);
 

	
 
            // Note: we can still run into an unrecoverable error when actually
 
            // receiving this message
 
            match default_handle_received_data_message(
 
                target_port, target_port_instruction,
 
                &mut message, inbox_main, inbox_backup,
 
                comp_ctx, sched_ctx, control,
 
            ) {
 
                Ok(()) => return GetResult::Received(message),
 
                Err(location_and_message) => return GetResult::Error(location_and_message)
 
            }
 
        } else {
 
            // We're not allowed to receive this message. This means that the
 
            // receiver is attempting to receive something out of order with
 
            // respect to the sender.
 
            return GetResult::Error((target_port_instruction, String::from(
 
                "Cannot get from this port, as this causes a deadlock. This happens if you `get` in a different order as another component `put`s"
 
            )));
 
        }
 
    } else {
 
        // We don't have a message waiting for us and the port is not blocked.
 
        // So enter the BlockedGet state
 
        exec_state.set_as_blocked_get(target_port);
 
        return GetResult::NoMessage;
 
    }
 
}
 

	
 
/// Default handling that has been received through a `get`. Will check if any
 
/// more messages are waiting, and if the corresponding port was blocked because
 
/// of full buffers (hence, will use the control layer to make sure the peer
 
/// will become unblocked).
 
pub(crate) fn default_handle_received_data_message(
 
    targeted_port: PortId, _port_instruction: PortInstruction, message: &mut DataMessage,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup,
 
    comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer
 
) -> Result<(), (PortInstruction, String)> {
 
    let port_handle = comp_ctx.get_port_handle(targeted_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 
    debug_assert!(inbox_main[port_index].is_none()); // because we've just received from it
 

	
 
    // If we received any ports, add them to the port tracking and inbox struct.
 
    // Then notify the peers that they can continue sending to this port, but
 
    // now at a new address.
 
    for received_port in &mut message.ports {
 
        // Transfer messages to main/backup inbox
 
        let _new_inbox_index = inbox_main.len();
 
        if !received_port.messages.is_empty() {
 
            inbox_main.push(Some(received_port.messages.remove(0)));
 
        }
 
        inbox_backup.extend(received_port.messages.drain(..));
 

	
 
        // Create a new port locally
 
        let mut new_port_state = received_port.state;
 
        new_port_state.set(PortStateFlag::Received);
 
        let new_port_handle = comp_ctx.add_port(
 
            received_port.peer_comp, received_port.peer_port,
 
            received_port.kind, new_port_state
 
        );
 
        debug_assert_eq!(_new_inbox_index, comp_ctx.get_port_index(new_port_handle));
 
        comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(received_port.peer_comp));
 
        let new_port = comp_ctx.get_port(new_port_handle);
 

	
 
        // Replace all references to the port in the received message
 
        for message_location in received_port.locations.iter().copied() {
 
            let value = match message_location {
 
                ValueId::Heap(heap_pos, heap_index) => &mut message.content.regions[heap_pos as usize][heap_index as usize],
 
                ValueId::Stack(stack_index) => &mut message.content.values[stack_index as usize],
 
            };
 

	
 
            match value {
 
                Value::Input(_) => {
 
                    debug_assert_eq!(new_port.kind, PortKind::Getter);
 
                    *value = Value::Input(port_id_to_eval(new_port.self_id));
 
                },
 
                Value::Output(_) => {
 
                    debug_assert_eq!(new_port.kind, PortKind::Putter);
 
                    *value = Value::Output(port_id_to_eval(new_port.self_id));
 
                },
 
                _ => unreachable!(),
 
            }
 
        }
 

	
 
        // Let the peer know that the port can now be used
 
        let peer_handle = comp_ctx.get_peer_handle(new_port.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 

	
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(ControlMessage{
 
            id: ControlId::new_invalid(),
 
            sender_comp_id: comp_ctx.id,
 
            target_port_id: Some(new_port.peer_port_id),
 
            content: ControlMessageContent::PortPeerChangedUnblock(new_port.self_id, comp_ctx.id)
 
        }), true);
 
    }
 

	
 
    // Modify last-known location where port instruction was retrieved
 
    let port_info = comp_ctx.get_port(port_handle);
 
    debug_assert_ne!(port_info.last_instruction, PortInstruction::None); // set by caller
 
    debug_assert!(port_info.state.is_open()); // checked by caller
 

	
 
    // Check if there are any more messages in the backup buffer
 
    for message_index in 0..inbox_backup.len() {
 
        let message = &inbox_backup[message_index];
 
        if message.data_header.target_port == targeted_port {
 
            // One more message, place it in the slot
 
            let message = inbox_backup.remove(message_index);
 
            debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we're removing another message from the backup
 
            inbox_main[port_index] = Some(message);
 

	
 
            return Ok(());
 
        }
 
    }
 

	
 
    // Did not have any more messages, so if we were blocked, then we need to
 
    // unblock the port now (and inform the peer of this unblocking)
 
    if port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers) {
 
        let port_info = comp_ctx.get_port_mut(port_handle);
 
        port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers);
 

	
 
        let (peer_handle, message) = control.cancel_port_blocking(comp_ctx, port_handle);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
    }
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles control messages in the default way. Note that this function may
 
/// take a lot of actions in the name of the caller: pending messages may be
 
/// sent, ports may become blocked/unblocked, etc. So the execution
 
/// (`CompExecState`), control (`ControlLayer`) and consensus (`Consensus`)
 
/// state may all change.
 
pub(crate) fn default_handle_control_message(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus,
 
    message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) -> Result<(), (PortInstruction, String)> {
 
    match message.content {
 
        ControlMessageContent::Ack => {
 
            default_handle_ack(exec_state, control, message.id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup);
 
        },
 
        ControlMessageContent::BlockPort => {
 
            // One of our messages was accepted, but the port should be
 
            // blocked.
 
            let port_to_block = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_block);
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            debug_assert_eq!(port_info.kind, PortKind::Putter);
 
            port_info.state.set(PortStateFlag::BlockedDueToFullBuffers);
 
        },
 
        ControlMessageContent::ClosePort(content) => {
 
            // Request to close the port. We immediately comply and remove
 
            // the component handle as well
 
            let port_to_close = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_close);
 

	
 
            // We're closing the port, so we will always update the peer of the
 
            // port (in case of error messages)
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            port_info.peer_comp_id = message.sender_comp_id;
 
            port_info.close_at_sync_end = true; // might be redundant (we might set it closed now)
 

	
 
            let peer_comp_id = port_info.peer_comp_id;
 
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
            // One exception to sending an `Ack` is if we just closed the
 
            // port ourselves, meaning that the `ClosePort` messages got
 
            // sent to one another.
 
            if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) {
 
                // The two components (sender and this component) are closing
 
                // the channel at the same time. So we don't care about the
 
                // content of the `ClosePort` message.
 
                default_handle_ack(exec_state, control, control_id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup);
 
            } else {
 
                // Respond to the message
 
                let port_info = comp_ctx.get_port(port_handle);
 
                let last_instruction = port_info.last_instruction;
 
                let port_has_had_message = port_info.received_message_for_sync;
 
                default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
 
                comp_ctx.change_port_peer(sched_ctx, port_handle, None);
 

	
 
                // Handle any possible error conditions (which boil down to: the
 
                // port has been used, but the peer has died). If not in sync
 
                // mode then we close the port immediately.
 

	
 
                // Note that `port_was_used` does not mean that any messages
 
                // were actually received. It might also mean that e.g. the
 
                // component attempted a `get`, but there were no messages, so
 
                // now it is in the `BlockedGet` state.
 
                let port_was_used = last_instruction != PortInstruction::None;
 

	
 
                if exec_state.mode.is_in_sync_block() {
 
                    let closed_during_sync_round = content.closed_in_sync_round && port_was_used;
 
                    let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message;
 

	
 
                    if closed_during_sync_round || closed_before_sync_round {
 
                        return Err((
 
                            last_instruction,
 
                            format!("Peer component (id:{}) shut down, so communication cannot (have) succeed(ed)", peer_comp_id.0)
 
                        ));
 
                    }
 
                } else {
 
                    let port_info = comp_ctx.get_port_mut(port_handle);
 
                    port_info.state.set(PortStateFlag::Closed);
 
                }
 
            }
 
        },
 
        ControlMessageContent::UnblockPort => {
 
            // We were previously blocked (or already closed)
 
            let port_to_unblock = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_unblock);
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 

	
 
            debug_assert_eq!(port_info.kind, PortKind::Putter);
 
            debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers));
 

	
 
            port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers);
 
            default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
 
        },
 
        ControlMessageContent::PortPeerChangedBlock => {
 
            // The peer of our port has just changed. So we are asked to
 
            // temporarily block the port (while our original recipient is
 
            // potentially rerouting some of the in-flight messages) and
 
            // Ack. Then we wait for the `unblock` call.
 
            let port_to_change = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_change);
 

	
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            let peer_comp_id = port_info.peer_comp_id;
 
            port_info.state.set(PortStateFlag::BlockedDueToPeerChange);
 
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
            default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
 
        },
 
        ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => {
 
            let port_to_change = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_change);
 
            let port_info = comp_ctx.get_port(port_handle);
 
            debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToPeerChange));
 

	
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            port_info.peer_port_id = new_port_id;
 

	
 
            port_info.state.clear(PortStateFlag::BlockedDueToPeerChange);
 
            comp_ctx.change_port_peer(sched_ctx, port_handle, Some(new_comp_id));
 
            default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
 
        }
 
    }
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles a component entering the synchronous block. Will ensure that the
 
/// `Consensus` and the `ComponentCtx` are initialized properly.
 
pub(crate) fn default_handle_sync_start(
 
    exec_state: &mut CompExecState, inbox_main: &mut InboxMainRef,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 
) {
 
    sched_ctx.info("Component starting sync mode");
 

	
 
    // If any messages are present for this sync round, set the appropriate flag
 
    // and notify the consensus handler of the present messages
 
    consensus.notify_sync_start(comp_ctx);
 
    for (port_index, message) in inbox_main.iter().enumerate() {
 
        if let Some(message) = message {
 
            consensus.handle_incoming_data_message(comp_ctx, message);
 
            let port_info = comp_ctx.get_port_by_index_mut(port_index);
 
            port_info.received_message_for_sync = true;
 
        }
 
    }
 

	
 
    // Modify execution state
 
    debug_assert_eq!(exec_state.mode, CompMode::NonSync);
 
    exec_state.mode = CompMode::Sync;
 
}
 

	
 
/// Handles a component that has reached the end of the sync block. This does
 
/// not necessarily mean that the component will go into the `NonSync` mode, as
 
/// it might have to wait for the leader to finish the round for everyone (see
 
/// `default_handle_sync_decision`)
 
pub(crate) fn default_handle_sync_end(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
    consensus: &mut Consensus
 
) {
 
    sched_ctx.info("Component ending sync mode (but possibly waiting for a solution)");
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync);
 
    let decision = consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
    exec_state.mode = CompMode::SyncEnd;
 
    default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus);
 
}
 

	
 
/// Handles a component initiating the exiting procedure, and closing all of its
 
/// ports. Should only be called once per component (which is ensured by
 
/// checking and modifying the mode in the execution state).
 
#[must_use]
 
pub(crate) fn default_handle_start_exit(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::StartExit);
 
    sched_ctx.info(&format!("Component starting exit (reason: {:?})", exec_state.exit_reason));
 
    exec_state.mode = CompMode::BusyExit;
 
    let exit_inside_sync = exec_state.exit_reason.is_in_sync();
 

	
 
    // If exiting while inside sync mode, report to the leader of the current
 
    // round that we've failed.
 
    if exit_inside_sync {
 
        let decision = consensus.notify_sync_end_failure(sched_ctx, comp_ctx);
 
        default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus);
 
    }
 

	
 
    // Iterating over ports by index to work around borrowing rules
 
    for port_index in 0..comp_ctx.num_ports() {
 
        let port = comp_ctx.get_port_by_index_mut(port_index);
 
        if port.state.is_closed() || port.close_at_sync_end {
 
            // Already closed, or in the process of being closed
 
            continue;
 
        }
 

	
 
        // Mark as closed
 
        let port_id = port.self_id;
 
        port.state.set(PortStateFlag::Closed);
 

	
 
        // Notify peer of closing
 
        let port_handle = comp_ctx.get_port_handle(port_id);
 
        let (peer, message) = control.initiate_port_closing(port_handle, exit_inside_sync, comp_ctx);
 
        let peer_info = comp_ctx.get_peer(peer);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
    }
 

	
 
    return CompScheduling::Immediate; // to check if we can shut down immediately
 
}
 

	
 
/// Handles a component waiting until all peers are notified that it is quitting
 
/// (i.e. after calling `default_handle_start_exit`).
 
#[must_use]
 
pub(crate) fn default_handle_busy_exit(
 
    exec_state: &mut CompExecState, control: &ControlLayer,
 
    sched_ctx: &SchedulerCtx
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::BusyExit);
 
    if control.has_acks_remaining() {
 
        sched_ctx.info("Component busy exiting, still has `Ack`s remaining");
 
        return CompScheduling::Sleep;
 
    } else {
 
        sched_ctx.info("Component busy exiting, now shutting down");
 
        exec_state.mode = CompMode::Exit;
 
        return CompScheduling::Exit;
 
    }
 
}
 

	
 
/// Handles a potential synchronous round decision. If there was a decision then
 
/// the `Some(success)` value indicates whether the round succeeded or not.
 
/// Might also end up changing the `ExecState`.
 
///
 
/// Might be called in two cases:
 
/// 1. The component is in regular execution mode, at the end of a sync round,
 
///     and is waiting for a solution to the round.
 
/// 2. The component has encountered an error during a sync round and is
 
///     exiting, hence is waiting for a "Failure" message from the leader.
 
pub(crate) fn default_handle_sync_decision(
 
    sched_ctx: &SchedulerCtx, exec_state: &mut CompExecState, comp_ctx: &mut CompCtx,
 
    decision: SyncRoundDecision, consensus: &mut Consensus
 
) -> Option<bool> {
 
    let success = match decision {
 
        SyncRoundDecision::None => return None,
 
        SyncRoundDecision::Solution => true,
 
        SyncRoundDecision::Failure => false,
 
    };
 

	
 
    debug_assert!(
 
        exec_state.mode == CompMode::SyncEnd || (
 
            exec_state.mode.is_busy_exiting() && exec_state.exit_reason.is_error()
 
        ) || (
 
            exec_state.mode.is_in_sync_block() && decision == SyncRoundDecision::Failure
 
        )
 
    );
 

	
 
    sched_ctx.info(&format!("Handling decision {:?} (in mode: {:?})", decision, exec_state.mode));
 
    consensus.notify_sync_decision(decision);
 
    if success {
 
        // We cannot get a success message if the component has encountered an
 
        // error.
 
        for port_index in 0..comp_ctx.num_ports() {
 
            let port_info = comp_ctx.get_port_by_index_mut(port_index);
 
            if port_info.close_at_sync_end {
 
                port_info.state.set(PortStateFlag::Closed);
 
            }
 
            port_info.state.clear(PortStateFlag::Received);
 
        }
 
        debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
 
        exec_state.mode = CompMode::NonSync;
 
        return Some(true);
 
    } else {
 
        // We may get failure both in all possible cases. But we should only
 
        // modify the execution state if we're not already in exit mode
 
        if !exec_state.mode.is_busy_exiting() {
 
            sched_ctx.error("failed synchronous round, initiating exit");
 
            exec_state.set_as_start_exit(ExitReason::ErrorNonSync);
 
        }
 
        return Some(false);
 
    }
 
}
 

	
 
/// Performs the default action of printing the provided error, and then putting
 
/// the component in the state where it will shut down. Only to be used for
 
/// builtin components: their error message construction is simpler (and more
 
/// common) as they don't have any source code.
 
pub(crate) fn default_handle_error_for_builtin(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx,
 
    location_and_message: (PortInstruction, String)
 
) {
 
    let (_location, message) = location_and_message;
 
    sched_ctx.error(&message);
 

	
 
    let exit_reason = if exec_state.mode.is_in_sync_block() {
 
        ExitReason::ErrorInSync
 
    } else {
 
        ExitReason::ErrorNonSync
 
    };
 

	
 
    exec_state.set_as_start_exit(exit_reason);
 
}
 

	
 
#[inline]
 
pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling {
 
    debug_assert_eq!(_exec_state.mode, CompMode::Exit);
 
    return CompScheduling::Exit;
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Internal messaging/state utilities
 
// -----------------------------------------------------------------------------
 

	
 
/// Sends a message without any transmitted ports. Does not check if sending
 
/// is actually valid.
 
fn send_message_without_ports(
 
    sending_port_handle: LocalPortHandle, value: ValueGroup,
 
    comp_ctx: &CompCtx, sched_ctx: &SchedulerCtx, consensus: &mut Consensus,
 
) {
 
    let port_info = comp_ctx.get_port(sending_port_handle);
 
    debug_assert!(port_info.state.can_send());
 
    let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
    let peer_info = comp_ctx.get_peer(peer_handle);
 

	
 
    let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value);
 
    peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true);
 
}
 

	
 
/// Prepares sending a message that contains ports. Only once a particular
 
/// protocol has completed (where we notify all the peers that the ports will
 
/// be transferred) will we actually send the message to the recipient.
 
fn prepare_send_message_with_ports(
 
    sending_port_id: PortId, sending_port_instruction: PortInstruction, value: ValueGroup,
 
    exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx,
 
    control: &mut ControlLayer
 
) -> Result<(), (PortInstruction, String)> {
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync); // busy in sync, trying to send
 

	
 
    let sending_port_handle = comp_ctx.get_port_handle(sending_port_id);
 
    let sending_port_info = comp_ctx.get_port_mut(sending_port_handle);
 
    sending_port_info.last_instruction = sending_port_instruction;
 

	
 
    let mut transmit_ports = Vec::new();
 
    find_ports_in_value_group(&value, &mut transmit_ports);
 
    debug_assert!(!transmit_ports.is_empty()); // requisite for calling this function
 

	
 
    // Set up the final Ack that triggers us to send our final message
 
    let unblock_put_control_id = control.add_unblock_put_with_ports_entry();
 
    for (_, port_id) in &transmit_ports {
 
        let transmit_port_handle = comp_ctx.get_port_handle(*port_id);
 
        let transmit_port_info = comp_ctx.get_port_mut(transmit_port_handle);
 
        let peer_comp_id = transmit_port_info.peer_comp_id;
 
        let peer_port_id = transmit_port_info.peer_port_id;
 

	
 
        // Note: we checked earlier that we are currently in sync mode. Now we
 
        // will check if we've already used the port we're about to transmit.
 
        if !transmit_port_info.last_instruction.is_none() {
 
            return Err((
 
                sending_port_instruction,
 
                String::from("Cannot transmit one of the ports in this message, as it is used in this sync round")
 
            ));
 
        }
 

	
 
        if transmit_port_info.state.is_set(PortStateFlag::Transmitted) {
 
            return Err((
 
                sending_port_instruction,
 
                String::from("Cannot transmit one of the ports in this message, as that port is already transmitted")
 
            ));
 
        }
 

	
 
        // Set the flag for transmission
 
        transmit_port_info.state.set(PortStateFlag::Transmitted);
 

	
 
        // Block the peer of the port
 
        let message = control.create_port_transfer_message(unblock_put_control_id, comp_ctx.id, peer_port_id);
 
        println!("DEBUG: Port transfer message\nControl ID: {:?}\nMessage: {:?}", unblock_put_control_id, message);
 
        let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 

	
 
        peer_info.handle.send_message_logged(sched_ctx, message, true);
 
    }
 

	
 
    // We've set up the protocol, once all the PPC's are blocked we are supposed
 
    // to transfer the message to the recipient. So store it temporarily
 
    exec_state.mode = CompMode::BlockedPutPorts;
 
    exec_state.mode_port = sending_port_id;
 
    exec_state.mode_value = value;
 

	
 
    return Ok(());
 
}
 

	
 
/// Performs the transmission of a data message that contains ports. These were
 
/// all stored in the component's execution state by the
 
/// `prepare_send_message_with_ports` function.
 
fn perform_send_message_with_ports(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, consensus: &mut Consensus,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) {
 
    debug_assert_eq!(exec_state.mode, CompMode::BlockedPutPorts);
 

	
 
    // Find all ports again
 
    let mut transmit_ports = Vec::new();
 
    find_ports_in_value_group(&exec_state.mode_value, &mut transmit_ports);
 

	
 
    let port_handle = comp_ctx.get_port_handle(exec_state.mode_port);
 
    let port_info = comp_ctx.get_port(port_handle);
 
    let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 

	
 
    // Annotate the data message
 
    let message_value = exec_state.mode_value.take();
 
    let mut annotated_message = consensus.annotate_data_message(comp_ctx, port_info, message_value);
 

	
 
    // And further enhance the message by adding data about the ports that are
 
    // being transferred
 
    for (port_locations, transmit_port_id) in transmit_ports {
 
        let transmit_port_handle = comp_ctx.get_port_handle(transmit_port_id);
 
        let transmit_port_info = comp_ctx.get_port(transmit_port_handle);
 

	
 
        let transmit_messages = take_port_messages(comp_ctx, transmit_port_id, inbox_main, inbox_backup);
 

	
 
        let mut transmit_port_state = transmit_port_info.state;
 
        debug_assert!(transmit_port_state.is_set(PortStateFlag::Transmitted));
 
        transmit_port_state.clear(PortStateFlag::Transmitted);
 

	
 
        annotated_message.ports.push(TransmittedPort{
 
            locations: port_locations,
 
            messages: transmit_messages,
 
            peer_comp: transmit_port_info.peer_comp_id,
 
            peer_port: transmit_port_info.peer_port_id,
 
            kind: transmit_port_info.kind,
 
            state: transmit_port_state
 
        })
 
    }
 

	
 
    // And finally, send the message to the peer
 
    let peer_info = comp_ctx.get_peer(peer_handle);
 
    peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true);
 
}
 

	
 
/// Handles an `Ack` for the control layer.
 
fn default_handle_ack(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer, control_id: ControlId,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) {
 
    // Since an `Ack` may cause another one, handle them in a loop
 
    let mut to_ack = control_id;
 

	
 
    loop {
 
        let (action, new_to_ack) = control.handle_ack(to_ack, sched_ctx, comp_ctx);
 
        match action {
 
            AckAction::SendMessage(target_comp, message) => {
 
                // FIX @NoDirectHandle
 
                let mut handle = sched_ctx.runtime.get_component_public(target_comp);
 
                handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
                let _should_remove = handle.decrement_users();
 
                debug_assert!(_should_remove.is_none());
 
            },
 
            AckAction::ScheduleComponent(to_schedule) => {
 
                // FIX @NoDirectHandle
 
                let mut handle = sched_ctx.runtime.get_component_public(to_schedule);
 

	
 
                // Note that the component is intentionally not
 
                // sleeping, so we just wake it up
 
                debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire));
 
                let key = unsafe { to_schedule.upgrade() };
 
                sched_ctx.runtime.enqueue_work(key);
 
                let _should_remove = handle.decrement_users();
 
                debug_assert!(_should_remove.is_none());
 
            },
 
            AckAction::UnblockPutWithPorts => {
 
                // Send the message (containing ports) to the recipient
 
                println!("DEBUG: Unblocking put with ports");
 
                perform_send_message_with_ports(exec_state, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup);
 
                exec_state.mode = CompMode::Sync;
 
                exec_state.mode_port = PortId::new_invalid();
 
            },
 
            AckAction::None => {}
 
        }
 

	
 
        match new_to_ack {
 
            Some(new_to_ack) => to_ack = new_to_ack,
 
            None => break,
 
        }
 
    }
 
}
 

	
 
/// Little helper for sending the most common kind of `Ack`
 
fn default_send_ack(
 
    causer_of_ack_id: ControlId, peer_handle: LocalPeerHandle,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx
 
) {
 
    let peer_info = comp_ctx.get_peer(peer_handle);
 
    peer_info.handle.send_message_logged(sched_ctx, Message::Control(ControlMessage{
 
        id: causer_of_ack_id,
 
        sender_comp_id: comp_ctx.id,
 
        target_port_id: None,
 
        content: ControlMessageContent::Ack
 
    }), true);
 
}
 

	
 
/// Handles the unblocking of a putter port. In case there is a pending message
 
/// on that port then it will be sent.
 
fn default_handle_recently_unblocked_port(
 
    exec_state: &mut CompExecState, consensus: &mut Consensus,
 
    port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
) {
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    let port_id = port_info.self_id;
 
    debug_assert!(!port_info.state.is_blocked()); // should have been done by the caller
 

	
 
    if exec_state.is_blocked_on_put(port_id) {
 
        // Return to the regular execution mode
 
        exec_state.mode = CompMode::Sync;
 
        exec_state.mode_port = PortId::new_invalid();
 

	
 
        // Annotate the message that we're going to send
 
        let port_info = comp_ctx.get_port(port_handle); // for immutable access
 
        debug_assert_eq!(port_info.kind, PortKind::Putter);
 
        let to_send = exec_state.mode_value.take();
 
        let to_send = consensus.annotate_data_message(comp_ctx, port_info, to_send);
 

	
 
        // Retrieve peer to send the message
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Data(to_send), true);
 

	
 
        exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state.
 
        exec_state.mode_port = PortId::new_invalid();
 
    }
 
}
 

	
 
#[inline]
 
pub(crate) fn port_id_from_eval(port_id: EvalPortId) -> PortId {
 
    return PortId(port_id.id);
 
}
 

	
 
#[inline]
 
pub(crate) fn port_id_to_eval(port_id: PortId) -> EvalPortId {
 
    return EvalPortId{ id: port_id.0 };
 
}
 

	
 
// TODO: Optimize double vec
 
type EncounteredPorts = Vec<(Vec<ValueId>, PortId)>;
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut EncounteredPorts) {
 
    // Helper to check a value for a port and recurse if needed.
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, value_location: ValueId, ports: &mut EncounteredPorts) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                let cur_port = PortId(port_id.id);
 
                for prev_port in ports.iter_mut() {
 
                    if prev_port.1 == cur_port {
 
                        // Already added
 
                        prev_port.0.push(value_location);
 
                        return;
 
                    }
 
                }
 

	
 
                ports.push((vec![value_location], cur_port));
 
            },
 
            Value::Array(heap_pos) |
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
 
            Value::Union(_, heap_pos) => {
 
                // Reference to some dynamic thing which might contain ports,
 
                // so recurse
 
                let heap_region = &group.regions[*heap_pos as usize];
 
                for (value_index, embedded_value) in heap_region.iter().enumerate() {
 
                    let value_location = ValueId::Heap(*heap_pos, value_index as u32);
 
                    find_port_in_value(group, embedded_value, value_location, ports);
 
                }
 
            },
 
            _ => {}, // values we don't care about
 
        }
 
    }
 

	
 
    // Clear the ports, then scan all the available values
 
    ports.clear();
 
    for (value_index, value) in value_group.values.iter().enumerate() {
 
        find_port_in_value(value_group, value, ValueId::Stack(value_index as u32), ports);
 
    }
 
}
 

	
 
/// Goes through the inbox of a component and takes out all the messages that
 
/// are targeted at a specific port
 
pub(crate) fn take_port_messages(
 
    comp_ctx: &CompCtx, port_id: PortId,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) -> Vec<DataMessage> {
 
    let mut messages = Vec::new();
 
    let port_handle = comp_ctx.get_port_handle(port_id);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 

	
 
    if let Some(message) = inbox_main[port_index].take() {
 
        messages.push(message);
 
    }
 

	
 
    let mut message_index = 0;
 
    while message_index < inbox_backup.len() {
 
        let message = &inbox_backup[message_index];
 
        if message.data_header.target_port == port_id {
 
            let message = inbox_backup.remove(message_index);
 
            messages.push(message);
 
        } else {
 
            message_index += 1;
 
        }
 
    }
 

	
 
    return messages;
 
}
 
\ No newline at end of file
src/runtime2/component/component_pdl.rs
Show inline comments
 
use crate::random::Random;
 
use crate::protocol::*;
 
use crate::protocol::ast::ProcedureDefinitionId;
 
use crate::protocol::eval::{
 
    PortId as EvalPortId, Prompt,
 
    ValueGroup, Value,
 
    EvalContinuation, EvalResult, EvalError
 
};
 

	
 
use crate::runtime2::runtime::CompId;
 
use crate::runtime2::scheduler::SchedulerCtx;
 
use crate::runtime2::communication::*;
 

	
 
use super::component::{
 
    self,
 
    InboxMain, InboxBackup, GetResult,
 
    CompExecState, Component, CompScheduling, CompError, CompMode, ExitReason,
 
    port_id_from_eval, port_id_to_eval
 
};
 
use super::component_context::*;
 
use super::control_layer::*;
 
use super::consensus::Consensus;
 

	
 
pub enum ExecStmt {
 
    CreatedChannel((Value, Value)),
 
    PerformedPut,
 
    PerformedGet(ValueGroup),
 
    PerformedSelectWait(u32),
 
    None,
 
}
 

	
 
impl ExecStmt {
 
    fn take(&mut self) -> ExecStmt {
 
        let mut value = ExecStmt::None;
 
        std::mem::swap(self, &mut value);
 
        return value;
 
    }
 

	
 
    fn is_none(&self) -> bool {
 
        match self {
 
            ExecStmt::None => return true,
 
            _ => return false,
 
        }
 
    }
 
}
 

	
 
pub struct ExecCtx {
 
    stmt: ExecStmt,
 
}
 

	
 
impl RunContext for ExecCtx {
 
    fn performed_put(&mut self, _port: EvalPortId) -> bool {
 
        match self.stmt.take() {
 
            ExecStmt::None => return false,
 
            ExecStmt::PerformedPut => return true,
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    fn performed_get(&mut self, _port: EvalPortId) -> Option<ValueGroup> {
 
        match self.stmt.take() {
 
            ExecStmt::None => return None,
 
            ExecStmt::PerformedGet(value) => return Some(value),
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    fn fires(&mut self, _port: EvalPortId) -> Option<Value> {
 
        todo!("remove fires")
 
    }
 

	
 
    fn performed_fork(&mut self) -> Option<bool> {
 
        todo!("remove fork")
 
    }
 

	
 
    fn created_channel(&mut self) -> Option<(Value, Value)> {
 
        match self.stmt.take() {
 
            ExecStmt::None => return None,
 
            ExecStmt::CreatedChannel(ports) => return Some(ports),
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    fn performed_select_wait(&mut self) -> Option<u32> {
 
        match self.stmt.take() {
 
            ExecStmt::None => return None,
 
            ExecStmt::PerformedSelectWait(selected_case) => Some(selected_case),
 
            _v => unreachable!(),
 
        }
 
    }
 
}
 

	
 
struct SelectCase {
 
    involved_ports: Vec<LocalPortHandle>,
 
}
 

	
 
// TODO: @Optimize, flatten cases into single array, have index-pointers to next case
 
struct SelectState {
 
    cases: Vec<SelectCase>,
 
    next_case: u32,
 
    num_cases: u32,
 
    random: Random,
 
    candidates_workspace: Vec<usize>,
 
}
 

	
 
enum SelectDecision {
 
    None,
 
    Case(u32), // contains case index, should be passed along to PDL code
 
}
 

	
 
impl SelectState {
 
    fn new() -> Self {
 
        return Self{
 
            cases: Vec::new(),
 
            next_case: 0,
 
            num_cases: 0,
 
            random: Random::new(),
 
            candidates_workspace: Vec::new(),
 
        }
 
    }
 

	
 
    fn handle_select_start(&mut self, num_cases: u32) {
 
        self.cases.clear();
 
        self.next_case = 0;
 
        self.num_cases = num_cases;
 
    }
 

	
 
    /// Register a port as belonging to a particular case. As for correctness of
 
    /// PDL code one cannot register the same port twice, this function might
 
    /// return an error
 
    fn register_select_case_port(&mut self, comp_ctx: &CompCtx, case_index: u32, _port_index: u32, port_id: PortId) -> Result<(), PortId> {
 
        // Retrieve case and port handle
 
        self.ensure_at_case(case_index);
 
        let cur_case = &mut self.cases[case_index as usize];
 
        let port_handle = comp_ctx.get_port_handle(port_id);
 
        debug_assert_eq!(cur_case.involved_ports.len(), _port_index as usize);
 

	
 
        // Make sure port wasn't added before, we disallow having the same port
 
        // in the same select guard twice.
 
        if cur_case.involved_ports.contains(&port_handle) {
 
            return Err(port_id);
 
        }
 

	
 
        cur_case.involved_ports.push(port_handle);
 
        return Ok(());
 
    }
 

	
 
    /// Notification that all ports have been registered and we should now wait
 
    /// until the appropriate messages have come in.
 
    fn handle_select_waiting_point(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision {
 
        if self.num_cases != self.next_case {
 
            // This happens when there are >=1 select cases written at the end
 
            // of the select block.
 
            self.ensure_at_case(self.num_cases - 1);
 
        }
 

	
 
        return self.has_decision(inbox, comp_ctx);
 
    }
 

	
 
    fn handle_updated_inbox(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision {
 
        return self.has_decision(inbox, comp_ctx);
 
    }
 

	
 
    /// Internal helper, pushes empty cases inbetween last case and provided new
 
    /// case index.
 
    fn ensure_at_case(&mut self, new_case_index: u32) {
 
        // Push an empty case for all intermediate cases that were not
 
        // registered with a port.
 
        debug_assert!(new_case_index >= self.next_case && new_case_index < self.num_cases);
 
        for _ in self.next_case..new_case_index + 1 {
 
            self.cases.push(SelectCase{ involved_ports: Vec::new() });
 
        }
 
        self.next_case = new_case_index + 1;
 
    }
 

	
 
    /// Checks if a decision can be reached
 
    fn has_decision(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision {
 
        self.candidates_workspace.clear();
 
        if self.cases.is_empty() {
 
            // If there are no cases then we can immediately reach a "bogus
 
            // decision".
 
            return SelectDecision::Case(0);
 
        }
 

	
 
        // Need to check for valid case
 
        'case_loop: for (case_index, case) in self.cases.iter().enumerate() {
 
            for port_handle in case.involved_ports.iter().copied() {
 
                let port_index = comp_ctx.get_port_index(port_handle);
 
                if inbox[port_index].is_none() {
 
                    // Condition not satisfied
 
                    continue 'case_loop;
 
                }
 
            }
 

	
 
            // If here then the case guard is satisfied
 
            self.candidates_workspace.push(case_index);
 
        }
 

	
 
        if self.candidates_workspace.is_empty() {
 
            return SelectDecision::None;
 
        } else {
 
            let candidate_index = self.random.get_u64() as usize % self.candidates_workspace.len();
 
            return SelectDecision::Case(self.candidates_workspace[candidate_index] as u32);
 
        }
 
    }
 
}
 

	
 
pub(crate) struct CompPDL {
 
    pub exec_state: CompExecState,
 
    select_state: SelectState,
 
    pub prompt: Prompt,
 
    pub control: ControlLayer,
 
    pub consensus: Consensus,
 
    pub sync_counter: u32,
 
    pub exec_ctx: ExecCtx,
 
    // TODO: Temporary field, simulates future plans of having one storage place
 
    //  reserved per port.
 
    // Should be same length as the number of ports. Corresponding indices imply
 
    // message is intended for that port.
 
    pub inbox_main: InboxMain,
 
    pub inbox_backup: InboxBackup,
 
}
 

	
 
impl Component for CompPDL {
 
    fn on_creation(&mut self, _id: CompId, _sched_ctx: &SchedulerCtx) {
 
        // Intentionally empty
 
    }
 

	
 
    fn on_shutdown(&mut self, _sched_ctx: &SchedulerCtx) {
 
        // Intentionally empty
 
    }
 

	
 
    fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage) {
 
        let port_handle = comp_ctx.get_port_handle(message.data_header.target_port);
 
        let port_index = comp_ctx.get_port_index(port_handle);
 
        if self.inbox_main[port_index].is_none() {
 
            self.inbox_main[port_index] = Some(message);
 
        } else {
 
            self.inbox_backup.push(message);
 
        }
 
    }
 

	
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) {
 
        // sched_ctx.log(&format!("handling message: {:?}", message));
 
        sched_ctx.debug(&format!("handling message: {:?}", message));
 
        if let Some(new_target) = self.control.should_reroute(&mut message) {
 
            let mut target = sched_ctx.runtime.get_component_public(new_target); // TODO: @NoDirectHandle
 
            sched_ctx.debug(&format!("rerouting to: {:?}", new_target));
 
            target.send_message_logged(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
 
            let _should_remove = target.decrement_users();
 
            debug_assert!(_should_remove.is_none());
 
            return;
 
        }
 

	
 
        sched_ctx.debug("handling message myself");
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Control(message) => {
 
                if let Err(location_and_message) = component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx, &mut self.inbox_main, &mut self.inbox_backup
 
                ) {
 
                    self.handle_generic_component_error(sched_ctx, location_and_message);
 
                }
 
            },
 
            Message::Sync(message) => {
 
                self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Poll => {
 
                unreachable!(); // because we never register at the polling thread
 
            }
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        use EvalContinuation as EC;
 

	
 
        sched_ctx.info(&format!("Running component (mode: {:?})", self.exec_state.mode));
 

	
 
        // Depending on the mode don't do anything at all, take some special
 
        // actions, or fall through and run the PDL code.
 
        match self.exec_state.mode {
 
            CompMode::NonSync | CompMode::Sync => {
 
                // continue and run PDL code
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut |
 
            CompMode::BlockedSelect | CompMode::BlockedPutPorts => {
 
                return CompScheduling::Sleep;
 
            }
 
            CompMode::StartExit => return component::default_handle_start_exit(
 
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus
 
            ),
 
            CompMode::BusyExit => return component::default_handle_busy_exit(
 
                &mut self.exec_state, &self.control, sched_ctx
 
            ),
 
            CompMode::Exit => return component::default_handle_exit(&self.exec_state),
 
        }
 

	
 
        let run_result = self.execute_prompt(&sched_ctx);
 
        if let Err(error) = run_result {
 
            self.handle_component_error(sched_ctx, CompError::Executor(error));
 
            return CompScheduling::Immediate;
 
        }
 

	
 
        let run_result = run_result.unwrap();
 

	
 
        match run_result {
 
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
 
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
 
            // Results that can be returned in sync mode
 
            EC::SyncBlockEnd => {
 
                component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
 
                return CompScheduling::Immediate;
 
            },
 
            EC::BlockGet(expr_id, port_id) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 

	
 
                let port_id = port_id_from_eval(port_id);
 
                match component::default_attempt_get(
 
                    &mut self.exec_state, port_id, PortInstruction::SourceLocation(expr_id),
 
                    &mut self.inbox_main, &mut self.inbox_backup, sched_ctx, comp_ctx,
 
                    &mut self.control, &mut self.consensus
 
                ) {
 
                    GetResult::Received(message) => {
 
                        self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
 
                        return CompScheduling::Immediate;
 
                    },
 
                    GetResult::NoMessage => {
 
                        return CompScheduling::Sleep;
 
                    },
 
                    GetResult::Error(location_and_message) => {
 
                        self.handle_generic_component_error(sched_ctx, location_and_message);
 
                        return CompScheduling::Immediate;
 
                    }
 
                }
 
            },
 
            EC::Put(expr_id, port_id, value) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                sched_ctx.info(&format!("Putting value {:?}", value));
 

	
 
                // Send the message
 
                let target_port_id = port_id_from_eval(port_id);
 
                let send_result = component::default_send_data_message(
 
                    &mut self.exec_state, target_port_id,
 
                    PortInstruction::SourceLocation(expr_id), value,
 
                    sched_ctx, &mut self.consensus, &mut self.control, comp_ctx
 
                );
 
                if let Err(location_and_message) = send_result {
 
                    self.handle_generic_component_error(sched_ctx, location_and_message);
 
                    return CompScheduling::Immediate;
 
                } else {
 
                    // When `run` is called again (potentially after becoming
 
                    // unblocked) we need to instruct the executor that we performed
 
                    // the `put`
 
                    let scheduling = send_result.unwrap();
 
                    self.exec_ctx.stmt = ExecStmt::PerformedPut;
 
                    return scheduling;
 
                }
 
            },
 
            EC::SelectStart(num_cases, _num_ports) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                self.select_state.handle_select_start(num_cases);
 
                return CompScheduling::Requeue;
 
            },
 
            EC::SelectRegisterPort(expr_id, case_index, port_index, port_id) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                let port_id = port_id_from_eval(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 

	
 
                // Note: we register the "last_instruction" here already. This
 
                // way if we get a `ClosePort` message, the condition to fail
 
                // the synchronous round is satisfied.
 
                let port_info = comp_ctx.get_port_mut(port_handle);
 
                port_info.last_instruction = PortInstruction::SourceLocation(expr_id);
 
                let port_is_closed = port_info.state.is_closed();
 

	
 
                // Register port as part of select guard
 
                if let Err(_err) = self.select_state.register_select_case_port(comp_ctx, case_index, port_index, port_id) {
 
                    // Failure occurs if a port is used twice in the same guard
 
                    let protocol = &sched_ctx.runtime.protocol;
 
                    self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr(
 
                        &self.prompt, &protocol.modules, &protocol.heap, expr_id,
 
                        String::from("Cannot have the one port appear in the same guard twice")
 
                    )));
 
                } else if port_is_closed {
 
                    // Port is closed
 
                    let peer_id = comp_ctx.get_port(port_handle).peer_comp_id;
 
                    let protocol = &sched_ctx.runtime.protocol;
 
                    self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr(
 
                        &self.prompt, &protocol.modules, &protocol.heap, expr_id,
 
                        format!("Cannot register port, as the peer component (id:{}) has shut down", peer_id.0)
 
                    )));
 
                }
 

	
 
                return CompScheduling::Immediate;
 
            },
 
            EC::SelectWait => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                let select_decision = self.select_state.handle_select_waiting_point(&self.inbox_main, comp_ctx);
 
                if let SelectDecision::Case(case_index) = select_decision {
 
                    // Reached a conclusion, so we can continue immediately
 
                    self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
 
                    self.exec_state.mode = CompMode::Sync;
 
                    return CompScheduling::Immediate;
 
                } else {
 
                    // No decision yet
 
                    self.exec_state.mode = CompMode::BlockedSelect;
 
                    return CompScheduling::Sleep;
 
                }
 
            },
 
            // Results that can be returned outside of sync mode
 
            EC::ComponentTerminated => {
 
                self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                return CompScheduling::Immediate;
 
            },
 
            EC::SyncBlockStart => {
 
                component::default_handle_sync_start(
 
                    &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus
 
                );
 
                return CompScheduling::Immediate;
 
            },
 
            EC::NewComponent(definition_id, type_id, arguments) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
                self.create_component_and_transfer_ports(
 
                    sched_ctx, comp_ctx,
 
                    definition_id, type_id, arguments
 
                );
 
                return CompScheduling::Requeue;
 
            },
 
            EC::NewChannel => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 
                let channel = comp_ctx.create_channel();
 
                self.exec_ctx.stmt = ExecStmt::CreatedChannel((
 
                    Value::Output(port_id_to_eval(channel.putter_id)),
 
                    Value::Input(port_id_to_eval(channel.getter_id))
 
                ));
 
                self.inbox_main.push(None);
 
                self.inbox_main.push(None);
 
                return CompScheduling::Immediate;
 
            }
 
        }
 
    }
 
}
 

	
 
impl CompPDL {
 
    pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self {
 
        let mut inbox_main = Vec::new();
 
        inbox_main.reserve(num_ports);
 
        for _ in 0..num_ports {
 
            inbox_main.push(None);
 
        }
 

	
 
        return Self{
 
            exec_state: CompExecState::new(),
 
            select_state: SelectState::new(),
 
            prompt: initial_state,
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
            sync_counter: 0,
 
            exec_ctx: ExecCtx{
 
                stmt: ExecStmt::None,
 
            },
 
            inbox_main,
 
            inbox_backup: Vec::new(),
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Running component and handling changes in global component state
 
    // -------------------------------------------------------------------------
 

	
 
    fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult {
 
        let mut step_result = EvalContinuation::Stepping;
 
        while let EvalContinuation::Stepping = step_result {
 
            step_result = self.prompt.step(
 
                &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap,
 
                &sched_ctx.runtime.protocol.modules, &mut self.exec_ctx,
 
            )?;
 
        }
 

	
 
        return Ok(step_result)
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling messages
 
    // -------------------------------------------------------------------------
 

	
 
    /// Handles a message that came in through the public inbox. This function
 
    /// will handle putting it in the correct place, and potentially blocking
 
    /// the port in case too many messages are being received.
 
    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
 
        use component::IncomingData;
 

	
 
        // Whatever we do, glean information from headers in message
 
        if self.exec_state.mode.is_in_sync_block() {
 
            self.consensus.handle_incoming_data_message(comp_ctx, &message);
 
        }
 

	
 
        match component::default_handle_incoming_data_message(
 
            &mut self.exec_state, &mut self.inbox_main, comp_ctx, message,
 
            sched_ctx, &mut self.control
 
        ) {
 
            IncomingData::PlacedInSlot => {
 
                if self.exec_state.mode == CompMode::BlockedSelect {
 
                    let select_decision = self.select_state.handle_updated_inbox(&self.inbox_main, comp_ctx);
 
                    if let SelectDecision::Case(case_index) = select_decision {
 
                        self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
 
                        self.exec_state.mode = CompMode::Sync;
 
                    }
 
                }
 
            },
 
            IncomingData::SlotFull(message) => {
 
                self.inbox_backup.push(message);
 
            }
 
        }
 
    }
 

	
 
    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) {
 
        let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
 
    }
 

	
 
    /// Handles an error coming from the generic `component::handle_xxx`
 
    /// functions. Hence accepts argument as a tuple.
 
    fn handle_generic_component_error(&mut self, sched_ctx: &SchedulerCtx, location_and_message: (PortInstruction, String)) {
 
        // Retrieve location and message, display in terminal
 
        let (location, message) = location_and_message;
 
        let error = match location {
 
            PortInstruction::None => CompError::Component(message),
 
            PortInstruction::NoSource => unreachable!(), // for debugging: all in-sync errors are associated with a source location
 
            PortInstruction::SourceLocation(expression_id) => {
 
                let protocol = &sched_ctx.runtime.protocol;
 
                CompError::Executor(EvalError::new_error_at_expr(
 
                    &self.prompt, &protocol.modules, &protocol.heap,
 
                    expression_id, message
 
                ))
 
            }
 
        };
 

	
 
        self.handle_component_error(sched_ctx, error);
 
    }
 

	
 
    fn handle_component_error(&mut self, sched_ctx: &SchedulerCtx, error: CompError) {
 
        sched_ctx.error(&format!("{}", error));
 

	
 
        // Set state to handle subsequent error
 
        let exit_reason = if self.exec_state.mode.is_in_sync_block() {
 
            ExitReason::ErrorInSync
 
        } else {
 
            ExitReason::ErrorNonSync
 
        };
 

	
 
        self.exec_state.set_as_start_exit(exit_reason);
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling ports
 
    // -------------------------------------------------------------------------
 

	
 
    /// Creates a new component and transfers ports. Because of the stepwise
 
    /// process in which memory is allocated, ports are transferred, messages
 
    /// are exchanged, component lifecycle methods are called, etc. This
 
    /// function facilitates a lot of implicit assumptions (e.g. when the
 
    /// `Component::on_creation` method is called, the component is already
 
    /// registered at the runtime).
 
    fn create_component_and_transfer_ports(
 
        &mut self,
 
        sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx,
 
        definition_id: ProcedureDefinitionId, type_id: TypeId, mut arguments: ValueGroup
 
    ) {
 
        struct PortPair{
 
            creator_handle: LocalPortHandle,
 
            creator_id: PortId,
 
            created_handle: LocalPortHandle,
 
            created_id: PortId,
 
        }
 
        let mut opened_port_id_pairs = Vec::new();
 
        let mut closed_port_id_pairs = Vec::new();
 

	
 
        let reservation = sched_ctx.runtime.start_create_component();
 
        let mut created_ctx = CompCtx::new(&reservation);
 

	
 
        let other_proc = &sched_ctx.runtime.protocol.heap[definition_id];
 
        let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition];
 
        dbg_code!({
 
            sched_ctx.info(&format!(
 
                "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})",
 
                self_proc.identifier.value.as_str(), creator_ctx.id,
 
                other_proc.identifier.value.as_str(), reservation.id()
 
            ));
 
        });
 

	
 
        // Take all the ports ID that are in the `args` (and currently belong to
 
        // the creator component) and translate them into new IDs that are
 
        // associated with the component we're about to create
 
        let mut arg_iter = ValueGroupPortIter::new(&mut arguments);
 
        while let Some(port_reference) = arg_iter.next() {
 
            // Create port entry for new component
 
            let creator_port_id = port_reference.id;
 
            let creator_port_handle = creator_ctx.get_port_handle(creator_port_id);
 
            let creator_port = creator_ctx.get_port(creator_port_handle);
 
            let created_port_handle = created_ctx.add_port(
 
                creator_port.peer_comp_id, creator_port.peer_port_id,
 
                creator_port.kind, creator_port.state
 
            );
 
            let created_port = created_ctx.get_port(created_port_handle);
 
            let created_port_id = created_port.self_id;
 

	
 
            let port_id_pair = PortPair {
 
                creator_handle: creator_port_handle,
 
                creator_id: creator_port_id,
 
                created_handle: created_port_handle,
 
                created_id: created_port_id,
 
            };
 

	
 
            if creator_port.state.is_closed() {
 
                closed_port_id_pairs.push(port_id_pair)
 
            } else {
 
                opened_port_id_pairs.push(port_id_pair);
 
            }
 

	
 
            // Modify value in arguments (bit dirty, but double vec in ValueGroup causes lifetime issues)
 
            let arg_value = if let Some(heap_pos) = port_reference.heap_pos {
 
                &mut arg_iter.group.regions[heap_pos][port_reference.index]
 
            } else {
 
                &mut arg_iter.group.values[port_reference.index]
 
            };
 
            match arg_value {
 
                Value::Input(id) => *id = port_id_to_eval(created_port_id),
 
                Value::Output(id) => *id = port_id_to_eval(created_port_id),
 
                _ => unreachable!(),
 
            }
 
        }
 

	
 
        // For each transferred port pair set their peer components to the
 
        // correct values. This will only change the values for the ports of
 
        // the new component.
 
        let mut created_component_has_remote_peers = false;
 

	
 
        for pair in opened_port_id_pairs.iter() {
 
            let creator_port_info = creator_ctx.get_port(pair.creator_handle);
 
            let created_port_info = created_ctx.get_port_mut(pair.created_handle);
 

	
 
            if created_port_info.peer_comp_id == creator_ctx.id {
 
                // Peer of the transferred port is the component that is
 
                // creating the new component.
 
                let created_peer_port_index = opened_port_id_pairs
 
                    .iter()
 
                    .position(|v| v.creator_id == creator_port_info.peer_port_id);
 
                match created_peer_port_index {
 
                    Some(created_peer_port_index) => {
 
                        // Addendum to the above comment: but that port is also
 
                        // moving to the new component
 
                        let peer_pair = &opened_port_id_pairs[created_peer_port_index];
 
                        created_port_info.peer_port_id = peer_pair.created_id;
 
                        created_port_info.peer_comp_id = reservation.id();
 
                        todo!("either add 'self peer', or remove that idea from Ctx altogether");
 
                    },
 
                    None => {
 
                        // Peer port remains with creator component.
 
                        created_port_info.peer_comp_id = creator_ctx.id;
 
                        created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(creator_ctx.id));
 
                    }
 
                }
 
            } else {
 
                // Peer is a different component. We'll deal with sending the
 
                // appropriate messages later
 
                let peer_handle = creator_ctx.get_peer_handle(created_port_info.peer_comp_id);
 
                let peer_info = creator_ctx.get_peer(peer_handle);
 
                created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(peer_info.id));
 
                created_component_has_remote_peers = true;
 
            }
 
        }
 

	
 
        // We'll now actually turn our reservation for a new component into an
 
        // actual component. Note that we initialize it as "not sleeping" as
 
        // its initial scheduling might be performed based on `Ack`s in response
 
        // to message exchanges between remote peers.
 
        let total_num_ports = opened_port_id_pairs.len() + closed_port_id_pairs.len();
 
        let component = component::create_component(&sched_ctx.runtime.protocol, definition_id, type_id, arguments, total_num_ports);
 
        let (created_key, component) = sched_ctx.runtime.finish_create_component(
 
            reservation, component, created_ctx, false,
 
        );
 
        component.component.on_creation(created_key.downgrade(), sched_ctx);
 

	
 
        // Now modify the creator's ports: remove every transferred port and
 
        // potentially remove the peer component.
 
        for pair in opened_port_id_pairs.iter() {
 
            // Remove peer if appropriate
 
            let creator_port_index = creator_ctx.get_port_index(pair.creator_handle);
 
            creator_ctx.change_port_peer(sched_ctx, pair.creator_handle, None);
 
            creator_ctx.remove_port(pair.creator_handle);
 

	
 
            // Transfer any messages
 
            if let Some(mut message) = self.inbox_main.remove(creator_port_index) {
 
                message.data_header.target_port = pair.created_id;
 
                component.component.adopt_message(&mut component.ctx, message)
 
            }
 

	
 
            let mut message_index = 0;
 
            while message_index < self.inbox_backup.len() {
 
                let message = &self.inbox_backup[message_index];
 
                if message.data_header.target_port == pair.creator_id {
 
                    // transfer message
 
                    let mut message = self.inbox_backup.remove(message_index);
 
                    message.data_header.target_port = pair.created_id;
 
                    component.component.adopt_message(&mut component.ctx, message);
 
                } else {
 
                    message_index += 1;
 
                }
 
            }
 

	
 
            let created_port_info = component.ctx.get_port(pair.created_handle);
 
            if created_port_info.peer_comp_id == creator_ctx.id {
 
                // This handles the creation of a channel between the creator
 
                // component and the newly created component. So if the creator
 
                // had a `a -> b` channel, and `b` is moved to the new
 
                // component, then `a` needs to set its peer component.
 
                let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id);
 
                let peer_port_info = creator_ctx.get_port_mut(peer_port_handle);
 
                peer_port_info.peer_comp_id = component.ctx.id;
 
                peer_port_info.peer_port_id = created_port_info.self_id;
 
                creator_ctx.change_port_peer(sched_ctx, peer_port_handle, Some(component.ctx.id));
 
            }
 
        }
 

	
 
        // Do the same for the closed ports. Note that we might still have to
 
        // transfer messages that cause the new owner of the port to fail.
 
        for pair in closed_port_id_pairs.iter() {
 
            let port_index = creator_ctx.get_port_index(pair.creator_handle);
 
            creator_ctx.remove_port(pair.creator_handle);
 
            if let Some(mut message) = self.inbox_main.remove(port_index) {
 
                message.data_header.target_port = pair.created_id;
 
                component.component.adopt_message(&mut component.ctx, message);
 
            }
 

	
 
            let mut message_index = 0;
 
            while message_index < self.inbox_backup.len() {
 
                let message = &self.inbox_backup[message_index];
 
                if message.data_header.target_port == pair.created_id {
 
                    // Transfer message
 
                    let mut message = self.inbox_backup.remove(message_index);
 
                    message.data_header.target_port = pair.created_id;
 
                    component.component.adopt_message(&mut component.ctx, message);
 
                } else {
 
                    message_index += 1;
 
                }
 
            }
 
        }
 

	
 
        // By now all ports and messages have been transferred. If there are any
 
        // peers that need to be notified about this new component, then we
 
        // initiate the protocol that will notify everyone here.
 
        if created_component_has_remote_peers {
 
            let created_ctx = &component.ctx;
 
            let schedule_entry_id = self.control.add_schedule_entry(created_ctx.id);
 
            for pair in opened_port_id_pairs.iter() {
 
                let port_info = created_ctx.get_port(pair.created_handle);
 
                if port_info.peer_comp_id != creator_ctx.id && port_info.peer_comp_id != created_ctx.id {
 
                    let message = self.control.add_reroute_entry(
 
                        creator_ctx.id, port_info.peer_port_id, port_info.peer_comp_id,
 
                        pair.creator_id, pair.created_id, created_ctx.id,
 
                        schedule_entry_id
 
                    );
 
                    let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id);
 
                    let peer_info = created_ctx.get_peer(peer_handle);
 
                    peer_info.handle.send_message_logged(sched_ctx, message, true);
 
                }
 
            }
 
        } else {
 
            // Peer can be scheduled immediately
 
            sched_ctx.runtime.enqueue_work(created_key);
 
        }
 
    }
 
}
 

	
 
struct ValueGroupPortIter<'a> {
 
    group: &'a mut ValueGroup,
 
    heap_stack: Vec<(usize, usize)>,
 
    index: usize,
 
}
 

	
 
impl<'a> ValueGroupPortIter<'a> {
 
    fn new(group: &'a mut ValueGroup) -> Self {
 
        return Self{ group, heap_stack: Vec::new(), index: 0 }
 
    }
 
}
 

	
 
struct ValueGroupPortRef {
 
    id: PortId,
 
    heap_pos: Option<usize>, // otherwise: on stack
 
    index: usize,
 
}
 

	
 
impl<'a> Iterator for ValueGroupPortIter<'a> {
 
    type Item = ValueGroupPortRef;
 

	
 
    fn next(&mut self) -> Option<Self::Item> {
 
        // Enter loop that keeps iterating until a port is found
 
        loop {
 
            if let Some(pos) = self.heap_stack.last() {
 
                let (heap_pos, region_index) = *pos;
 
                if region_index >= self.group.regions[heap_pos].len() {
 
                    self.heap_stack.pop();
 
                    continue;
 
                }
 

	
 
                let value = &self.group.regions[heap_pos][region_index];
 
                self.heap_stack.last_mut().unwrap().1 += 1;
 

	
 
                match value {
 
                    Value::Input(id) | Value::Output(id) => {
 
                        let id = PortId(id.id);
 
                        return Some(ValueGroupPortRef{
 
                            id,
 
                            heap_pos: Some(heap_pos),
 
                            index: region_index,
 
                        });
 
                    },
 
                    _ => {},
 
                }
 

	
 
                if let Some(heap_pos) = value.get_heap_pos() {
 
                    self.heap_stack.push((heap_pos as usize, 0));
 
                }
 
            } else {
 
                if self.index >= self.group.values.len() {
 
                    return None;
 
                }
 

	
 
                let value = &mut self.group.values[self.index];
 
                self.index += 1;
 

	
 
                match value {
 
                    Value::Input(id) | Value::Output(id) => {
 
                        let id = PortId(id.id);
 
                        return Some(ValueGroupPortRef{
 
                            id,
 
                            heap_pos: None,
 
                            index: self.index - 1
 
                        });
 
                    },
 
                    _ => {},
 
                }
 

	
 
                // Not a port, check if we need to enter a heap region
 
                if let Some(heap_pos) = value.get_heap_pos() {
 
                    self.heap_stack.push((heap_pos as usize, 0));
 
                } // else: just consider the next value
 
            }
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/control_layer.rs
Show inline comments
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 
use crate::runtime2::component::*;
 

	
 
use super::component_context::*;
 

	
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub(crate) struct ControlId(u32);
 
pub(crate) struct ControlId(pub(crate) u32);
 

	
 
impl ControlId {
 
    /// Like other invalid IDs, this one doesn't care any significance, but is
 
    /// just set at u32::MAX to hopefully bring out bugs sooner.
 
    pub(crate) fn new_invalid() -> Self {
 
        return ControlId(u32::MAX);
 
    }
 
}
 

	
 
struct ControlEntry {
 
    id: ControlId,
 
    ack_countdown: u32,
 
    content: ControlContent,
 
}
 

	
 
enum ControlContent {
 
    PeerChange(ContentPeerChange),
 
    ScheduleComponent(CompId),
 
    ClosedPort(PortId),
 
    UnblockPutWithPorts
 
}
 

	
 
struct ContentPeerChange {
 
    source_port: PortId,
 
    source_comp: CompId,
 
    old_target_port: PortId,
 
    new_target_port: PortId,
 
    new_target_comp: CompId,
 
    schedule_entry_id: ControlId,
 
}
 

	
 
struct ControlClosedPort {
 
    closed_port: PortId,
 
    exit_entry_id: Option<ControlId>,
 
}
 

	
 
pub(crate) enum AckAction {
 
    None,
 
    SendMessage(CompId, ControlMessage),
 
    ScheduleComponent(CompId),
 
    UnblockPutWithPorts,
 
}
 

	
 
/// Handling/sending control messages.
 
pub(crate) struct ControlLayer {
 
    id_counter: ControlId,
 
    entries: Vec<ControlEntry>,
 
}
 

	
 
impl ControlLayer {
 
    pub(crate) fn should_reroute(&self, message: &mut Message) -> Option<CompId> {
 
        // Safety note: rerouting should occur during the time when we're
 
        // notifying a peer of a new component. During this period that
 
        // component hasn't been executed yet, so cannot have died yet.
 
        // FIX @NoDirectHandle
 
        let target_port = message.target_port();
 
        if target_port.is_none() {
 
            return None;
 
        }
 

	
 
        let target_port = target_port.unwrap();
 
        for entry in &self.entries {
 
            if let ControlContent::PeerChange(entry) = &entry.content {
 
                if entry.old_target_port == target_port {
 
                    message.modify_target_port(entry.new_target_port);
 
                    return Some(entry.new_target_comp);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    /// Handles an acknowledgement. The returned action must be performed by the
 
    /// caller. The optionally returned `ControlId` must be used and passed to
 
    /// `handle_ack` again.
 
    pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> (AckAction, Option<ControlId>) {
 
        let entry_index = self.get_entry_index_by_id(entry_id).unwrap();
 
        let entry = &mut self.entries[entry_index];
 
        debug_assert!(entry.ack_countdown > 0);
 

	
 
        entry.ack_countdown -= 1;
 
        if entry.ack_countdown != 0 {
 
            return (AckAction::None, None);
 
        }
 

	
 
        let entry = self.entries.remove(entry_index);
 

	
 
        // All `Ack`s received, take action based on the kind of entry
 
        match entry.content {
 
            ControlContent::PeerChange(content) => {
 
                // If change of peer is ack'd. Then we are certain we have
 
                // rerouted all of the messages, and the sender's port can now
 
                // be unblocked again.
 
                let target_comp_id = content.source_comp;
 
                let message_to_send = ControlMessage{
 
                    id: ControlId::new_invalid(),
 
                    sender_comp_id: comp_ctx.id,
 
                    target_port_id: Some(content.source_port),
 
                    content: ControlMessageContent::PortPeerChangedUnblock(
 
                        content.new_target_port,
 
                        content.new_target_comp
 
                    )
 
                };
 
                let to_ack = content.schedule_entry_id;
 

	
 
                return (
 
                    AckAction::SendMessage(target_comp_id, message_to_send),
 
                    Some(to_ack)
 
                );
 
            },
 
            ControlContent::ScheduleComponent(to_schedule) => {
 
                // If all change-of-peers are `Ack`d, then we're ready to
 
                // schedule the component!
 
                return (AckAction::ScheduleComponent(to_schedule), None);
 
            },
 
            ControlContent::ClosedPort(closed_port) => {
 
                // If a closed port is Ack'd, then we remove the reference to
 
                // that component.
 
                let port_handle = comp_ctx.get_port_handle(closed_port);
 
                debug_assert!(comp_ctx.get_port(port_handle).state.is_closed());
 
                comp_ctx.change_port_peer(sched_ctx, port_handle, None);
 

	
 
                return (AckAction::None, None);
 
            },
 
            ControlContent::UnblockPutWithPorts => {
 
                return (AckAction::UnblockPutWithPorts, None);
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn has_acks_remaining(&self) -> bool {
 
        return !self.entries.is_empty();
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Port transfer
 
    // -------------------------------------------------------------------------
 

	
 
    /// Adds an entry that, when completely ack'd, will schedule a component.
 
    pub(crate) fn add_schedule_entry(&mut self, to_schedule_id: CompId) -> ControlId {
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 0, // incremented by calls to `add_reroute_entry`
 
            content: ControlContent::ScheduleComponent(to_schedule_id),
 
        });
 

	
 
        return entry_id;
 
    }
 

	
 
    /// Adds an entry that returns the similarly named Ack action
 
    pub(crate) fn add_unblock_put_with_ports_entry(&mut self) -> ControlId {
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 1, // incremented by calls to `add_reroute_entry`
 
            ack_countdown: 0, // incremented by calls to `add_reroute_entry`
 
            content: ControlContent::UnblockPutWithPorts,
 
        });
 

	
 
        return entry_id;
 
    }
 

	
 
    /// Adds a rerouting entry (used to ensure all messages will end up at a
 
    /// newly created component). Used when creating a new component.
 
    pub(crate) fn add_reroute_entry(
 
        &mut self, creator_comp_id: CompId,
 
        source_port_id: PortId, source_comp_id: CompId,
 
        old_target_port_id: PortId, new_target_port_id: PortId, new_comp_id: CompId,
 
        schedule_entry_id: ControlId,
 
    ) -> Message {
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 1,
 
            content: ControlContent::PeerChange(ContentPeerChange{
 
                source_port: source_port_id,
 
                source_comp: source_comp_id,
 
                old_target_port: old_target_port_id,
 
                new_target_port: new_target_port_id,
 
                new_target_comp: new_comp_id,
 
                schedule_entry_id,
 
            }),
 
        });
 

	
 
        // increment counter on schedule entry
 
        let entry_index = self.get_entry_index_by_id(schedule_entry_id).unwrap();
 
        self.entries[entry_index].ack_countdown += 1;
 

	
 
        return Message::Control(ControlMessage{
 
            id: entry_id,
 
            sender_comp_id: creator_comp_id,
 
            target_port_id: Some(source_port_id),
 
            content: ControlMessageContent::PortPeerChangedBlock
 
        })
 
    }
 

	
 
    /// Creates a PortPeerChanged message (and increments ack-counter on a
 
    /// pre-created control entry) that is used as a preliminary step before
 
    /// transferring a port over a channel.
 
    pub(crate) fn create_port_transfer_message(
 
        &mut self, associated_control_id: ControlId,
 
        sender_comp_id: CompId, target_port_id: PortId
 
    ) -> Message {
 
        let entry_index = self.get_entry_index_by_id(associated_control_id).unwrap();
 
        self.entries[entry_index].ack_countdown += 1;
 

	
 
        return Message::Control(ControlMessage{
 
            id: associated_control_id,
 
            sender_comp_id,
 
            target_port_id: Some(target_port_id),
 
            content: ControlMessageContent::PortPeerChangedBlock
 
        })
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Blocking, unblocking, and closing ports
 
    // -------------------------------------------------------------------------
 

	
 
    pub(crate) fn has_close_port_entry(&self, port_handle: LocalPortHandle, comp_ctx: &CompCtx) -> Option<ControlId> {
 
        let port = comp_ctx.get_port(port_handle);
 
        let port_id = port.self_id;
 
        for entry in self.entries.iter() {
 
            if let ControlContent::ClosedPort(entry_port_id) = &entry.content {
 
                if *entry_port_id == port_id {
 
                    return Some(entry.id);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    /// Initiates the control message procedures for closing a port. Caller must
 
    /// make sure that the port state has already been set to `Closed`.
 
    pub(crate) fn initiate_port_closing(&mut self, port_handle: LocalPortHandle, exit_inside_sync: bool, comp_ctx: &CompCtx) -> (LocalPeerHandle, ControlMessage) {
 
        let port = comp_ctx.get_port(port_handle);
 
        let peer_port_id = port.peer_port_id;
 
        debug_assert!(port.state.is_closed());
 

	
 
        // Construct the port-closing entry
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 1,
 
            content: ControlContent::ClosedPort(port.self_id),
 
        });
 

	
 
        // Return the messages notifying peer of the closed port
 
        let peer_handle = comp_ctx.get_peer_handle(port.peer_comp_id);
 
        return (
 
            peer_handle,
 
            ControlMessage{
 
                id: entry_id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(peer_port_id),
 
                content: ControlMessageContent::ClosePort(ControlMessageClosePort{
 
                    closed_in_sync_round: exit_inside_sync,
 
                }),
 
            }
 
        );
 
    }
 

	
 
    /// Generates the control message used to indicate to a peer that a port
 
    /// should be blocked (expects the caller to have set the port's state to
 
    /// blocked).
 
    pub(crate) fn initiate_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) {
 
        let port_info = comp_ctx.get_port(port_handle);
 
        debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're telling the putter to block
 
        debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers)); // contract with caller
 

	
 
        let peer_port_id = port_info.peer_port_id;
 
        let peer_comp_id = port_info.peer_comp_id;
 
        let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
        return (
 
            peer_handle,
 
            ControlMessage{
 
                id: ControlId::new_invalid(),
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(peer_port_id),
 
                content: ControlMessageContent::BlockPort,
 
            }
 
        );
 
    }
 

	
 
    /// Generates a messages used to indicate to a peer that a port should be
 
    /// unblocked again.
 
    pub(crate) fn cancel_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) {
 
        let port_info = comp_ctx.get_port(port_handle);
 
        debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're initiating the unblocking
 

	
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 

	
 
        return (
 
            peer_handle,
 
            ControlMessage{
 
                id: ControlId::new_invalid(),
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(port_info.peer_port_id),
 
                content: ControlMessageContent::UnblockPort,
 
            }
 
        );
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Internal utilities
 
    // -------------------------------------------------------------------------
 

	
 
    fn take_id(&mut self) -> ControlId {
 
        let id = self.id_counter;
 
        self.id_counter.0 = self.id_counter.0.wrapping_add(1);
 
        return id;
 
    }
 

	
 
    fn get_entry_index_by_id(&self, entry_id: ControlId) -> Option<usize> {
 
        for (index, entry) in self.entries.iter().enumerate() {
 
            if entry.id == entry_id {
 
                return Some(index);
 
            }
 
        }
 

	
 
        return None;
 
    }
 
}
 

	
 
impl Default for ControlLayer {
 
    fn default() -> Self {
 
        return ControlLayer{
 
            id_counter: ControlId(0),
 
            entries: Vec::new(),
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/tests/transfer_ports.rs
Show inline comments
 
use super::*;
 

	
 
#[test]
 
fn test_transfer_precreated_port_without_using() {
 
fn test_transfer_precreated_port_with_owned_peer() {
 
    compile_and_create_component("
 
    primitive port_sender(out<in<u32>> tx) {
 
        channel a -> b;
 
        sync put(tx, b);
 
    }
 

	
 
    primitive port_receiver(in<in<u32>> rx) {
 
        sync auto a = get(rx);
 
    }
 

	
 
    composite constructor() {
 
        channel a -> b;
 
        new port_sender(a);
 
        new port_receiver(b);
 
    }
 
    ", "constructor", no_args());
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)