Changeset - e62f669c1e03
[Not reviewed]
0 3 0
mh - 3 years ago 2022-05-09 13:37:54
contact@maxhenger.nl
WIP on refactoring port transmission code
3 files changed with 105 insertions and 6 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component.rs
Show inline comments
 
use std::fmt::{Display as FmtDisplay, Result as FmtResult, Formatter};
 

	
 
use crate::protocol::eval::{Prompt, EvalError, ValueGroup, Value, ValueId, PortId as EvalPortId};
 
use crate::protocol::*;
 
use crate::runtime2::*;
 
use crate::runtime2::communication::*;
 
use crate::runtime2::component::component_pdl::find_ports_in_value_group;
 

	
 
use super::{CompCtx, CompPDL, CompId};
 
use super::component_context::*;
 
use super::component_random::*;
 
use super::component_internet::*;
 
use super::control_layer::*;
 
use super::consensus::*;
 

	
 
pub enum CompScheduling {
 
    Immediate,
 
    Requeue,
 
    Sleep,
 
    Exit,
 
}
 

	
 
/// Potential error emitted by a component
 
pub enum CompError {
 
    /// Error originating from the code executor. Hence has an associated
 
    /// source location.
 
    Executor(EvalError),
 
    /// Error originating from a component, but not necessarily associated with
 
    /// a location in the source.
 
    Component(String), // TODO: Maybe a different embedded value in the future?
 
    /// Pure runtime error. Not necessarily originating from the component
 
    /// itself. Should be treated as a very severe runtime-compromising error.
 
    Runtime(RtError),
 
}
 

	
 
impl FmtDisplay for CompError {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
 
        match self {
 
            CompError::Executor(v) => v.fmt(f),
 
            CompError::Component(v) => v.fmt(f),
 
            CompError::Runtime(v) => v.fmt(f),
 
        }
 
    }
 
}
 

	
 
/// Generic representation of a component (as viewed by a scheduler).
 
pub(crate) trait Component {
 
    /// Called upon the creation of the component. Note that the scheduler
 
    /// context is officially running another component (the component that is
 
    /// creating the new component).
 
    fn on_creation(&mut self, comp_id: CompId, sched_ctx: &SchedulerCtx);
 

	
 
    /// Called when a component crashes or wishes to exit. So is not called
 
    /// right before destruction, other components may still hold a handle to
 
    /// the component and send it messages!
 
    fn on_shutdown(&mut self, sched_ctx: &SchedulerCtx);
 

	
 
    /// Called if the component is created by another component and the messages
 
    /// are being transferred between the two.
 
    fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage);
 

	
 
    /// Called if the component receives a new message. The component is
 
    /// responsible for deciding where that messages goes.
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message);
 

	
 
    /// Called if the component's routine should be executed. The return value
 
    /// can be used to indicate when the routine should be run again.
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling;
 
}
 

	
 
/// Representation of the generic operating mode of a component. Although not
 
/// every state may be used by every kind of (builtin) component, this allows
 
/// writing standard handlers for particular events in a component's lifetime.
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) enum CompMode {
 
    NonSync, // not in sync mode
 
    Sync, // in sync mode, can interact with other components
 
    SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block
 
    BlockedGet, // blocked because we need to receive a message on a particular port
 
    BlockedPut, // component is blocked because the port is blocked
 
    BlockedSelect, // waiting on message to complete the select statement
 
    BlockedPutPorts,// blocked because we're waiting to send a data message containing ports
 
    StartExit, // temporary state: if encountered then we start the shutdown process.
 
    BusyExit, // temporary state: waiting for Acks for all the closed ports, potentially waiting for sync round to finish
 
    Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0
 
}
 

	
 
impl CompMode {
 
    pub(crate) fn is_in_sync_block(&self) -> bool {
 
        use CompMode::*;
 

	
 
        match self {
 
            Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | BlockedPutPorts => true,
 
            NonSync | StartExit | BusyExit | Exit => false,
 
        }
 
    }
 

	
 
    pub(crate) fn is_busy_exiting(&self) -> bool {
 
        use CompMode::*;
 

	
 
        match self {
 
            NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect | BlockedPutPorts => false,
 
            StartExit | BusyExit => true,
 
            Exit => false,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum ExitReason {
 
    Termination, // regular termination of component
 
    ErrorInSync,
 
    ErrorNonSync,
 
}
 

	
 
impl ExitReason {
 
    pub(crate) fn is_in_sync(&self) -> bool {
 
        use ExitReason::*;
 

	
 
        match self {
 
            Termination | ErrorNonSync => false,
 
            ErrorInSync => true,
 
        }
 
    }
 

	
 
    pub(crate) fn is_error(&self) -> bool {
 
        use ExitReason::*;
 

	
 
        match self {
 
            Termination => false,
 
            ErrorInSync | ErrorNonSync => true,
 
        }
 
    }
 
}
 

	
 
/// Component execution state: the execution mode along with some descriptive
 
/// fields. Fields are public for ergonomic reasons, use member functions when
 
/// appropriate.
 
pub(crate) struct CompExecState {
 
    pub mode: CompMode,
 
    pub mode_port: PortId, // valid if blocked on a port (put/get)
 
    pub mode_value: ValueGroup, // valid if blocked on a put
 
    pub exit_reason: ExitReason, // valid if in StartExit/BusyExit/Exit mode
 
}
 

	
 
impl CompExecState {
 
    pub(crate) fn new() -> Self {
 
        return Self{
 
            mode: CompMode::NonSync,
 
            mode_port: PortId::new_invalid(),
 
            mode_value: ValueGroup::default(),
 
            exit_reason: ExitReason::Termination,
 
        }
 
    }
 

	
 
    pub(crate) fn set_as_start_exit(&mut self, reason: ExitReason) {
 
        self.mode = CompMode::StartExit;
 
        self.exit_reason = reason;
 
    }
 

	
 
    pub(crate) fn set_as_blocked_get(&mut self, port: PortId) {
 
        self.mode = CompMode::BlockedGet;
 
        self.mode_port = port;
 
        debug_assert!(self.mode_value.values.is_empty());
 
    }
 

	
 
    pub(crate) fn is_blocked_on_get(&self, port: PortId) -> bool {
 
        return
 
            self.mode == CompMode::BlockedGet &&
 
            self.mode_port == port;
 
    }
 

	
 
    pub(crate) fn set_as_blocked_put(&mut self, port: PortId, value: ValueGroup) {
 
        self.mode = CompMode::BlockedPut;
 
        self.mode_port = port;
 
        self.mode_value = value;
 
    }
 

	
 
    pub(crate) fn is_blocked_on_put(&self, port: PortId) -> bool {
 
        return
 
            self.mode == CompMode::BlockedPut &&
 
            self.mode_port == port;
 
    }
 
}
 

	
 
// TODO: Replace when implementing port sending. Should probably be incorporated
 
//  into CompCtx (and rename CompCtx into CompComms)
 
pub(crate) type InboxMain = Vec<Option<DataMessage>>;
 
pub(crate) type InboxMainRef = [Option<DataMessage>];
 
pub(crate) type InboxBackup = Vec<DataMessage>;
 

	
 
/// Creates a new component based on its definition. Meaning that if it is a
 
/// user-defined component then we set up the PDL code state. Otherwise we
 
/// construct a custom component. This does NOT take care of port and message
 
/// management.
 
pub(crate) fn create_component(
 
    protocol: &ProtocolDescription,
 
    definition_id: ProcedureDefinitionId, type_id: TypeId,
 
    arguments: ValueGroup, num_ports: usize
 
) -> Box<dyn Component> {
 
    let definition = &protocol.heap[definition_id];
 
    debug_assert!(definition.kind == ProcedureKind::Primitive || definition.kind == ProcedureKind::Composite);
 

	
 
    if definition.source.is_builtin() {
 
        // Builtin component
 
        let component: Box<dyn Component> = match definition.source {
 
            ProcedureSource::CompRandomU32 => Box::new(ComponentRandomU32::new(arguments)),
 
            ProcedureSource::CompTcpClient => Box::new(ComponentTcpClient::new(arguments)),
 
            _ => unreachable!(),
 
        };
 

	
 
        return component;
 
    } else {
 
        // User-defined component
 
        let prompt = Prompt::new(
 
            &protocol.types, &protocol.heap,
 
            definition_id, type_id, arguments
 
        );
 
        let component = CompPDL::new(prompt, num_ports);
 
        return Box::new(component);
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Generic component messaging utilities (for sending and receiving)
 
// -----------------------------------------------------------------------------
 

	
 
/// Default handling of sending a data message. In case the port is blocked then
 
/// the `ExecState` will become blocked as well. Note that
 
/// `default_handle_control_message` will ensure that the port becomes
 
/// unblocked if so instructed by the receiving component. The returned
 
/// scheduling value must be used.
 
#[must_use]
 
pub(crate) fn default_send_data_message(
 
    exec_state: &mut CompExecState, transmitting_port_id: PortId,
 
    port_instruction: PortInstruction, value: ValueGroup,
 
    sched_ctx: &SchedulerCtx, consensus: &mut Consensus, comp_ctx: &mut CompCtx
 
) -> Result<CompScheduling, (PortInstruction, String)> {
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync);
 

	
 
    let port_handle = comp_ctx.get_port_handle(transmitting_port_id);
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    port_info.last_instruction = port_instruction;
 

	
 
    let port_info = comp_ctx.get_port(port_handle);
 
    debug_assert_eq!(port_info.kind, PortKind::Putter);
 

	
 
    if port_info.state.is_closed() {
 
        // Note: normally peer is eventually consistent, but if it has shut down
 
        // then we can be sure it is consistent (I think?)
 
        return Err((
 
            port_info.last_instruction,
 
            format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0)
 
        ))
 
    } else if port_info.state.is_blocked() {
 
        // Port is blocked, so we cannot send
 
        exec_state.set_as_blocked_put(transmitting_port_id, value);
 

	
 
        return Ok(CompScheduling::Sleep);
 
    } else {
 
        // Check if there are any ports that are being transmitted
 
        let mut ports = Vec::new();
 
        find_ports_in_value_group(&value, &mut ports);
 
        if !ports.is_empty() {
 

	
 

	
 
            for (value_location, port_id) in ports {
 
                let transmitted_port_handle = comp_ctx.get_port_handle(port_id);
 
                let transmitted_port = comp_ctx.get_port(transmitted_port_handle);
 

	
 
                if transmitted_port.state.is_set(PortStateFlag::Transmitted) {
 
                    // Note: We could also attach where the port has been
 
                    //  transferred
 
                    return Err((
 
                        port_info.last_instruction,
 
                        String::from("Cannot send this message, as it contains a previously transmitted port")
 
                    ));
 
                }
 

	
 
                // Prepare ack for PPC
 
                // Prepare PPC message
 
            }
 
        }
 

	
 
        // Port is not blocked, so send to the peer
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true);
 

	
 
        return Ok(CompScheduling::Immediate);
 
    }
 
}
 

	
 
pub(crate) enum IncomingData {
 
    PlacedInSlot,
 
    SlotFull(DataMessage),
 
}
 

	
 
/// Default handling of receiving a data message. In case there is no room for
 
/// the message it is returned from this function. Note that this function is
 
/// different from PDL code performing a `get` on a port; this is the case where
 
/// the message first arrives at the component.
 
// NOTE: This is supposed to be a somewhat temporary implementation. It would be
 
//  nicest if the sending component can figure out it cannot send any more data.
 
#[must_use]
 
pub(crate) fn default_handle_incoming_data_message(
 
    exec_state: &mut CompExecState, inbox_main: &mut InboxMain,
 
    comp_ctx: &mut CompCtx, incoming_message: DataMessage,
 
    sched_ctx: &SchedulerCtx, control: &mut ControlLayer
 
) -> IncomingData {
 
    let port_handle = comp_ctx.get_port_handle(incoming_message.data_header.target_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 
    comp_ctx.get_port_mut(port_handle).received_message_for_sync = true;
 
    let port_value_slot = &mut inbox_main[port_index];
 
    let target_port_id = incoming_message.data_header.target_port;
 

	
 
    if port_value_slot.is_none() {
 
        // We can put the value in the slot
 
        *port_value_slot = Some(incoming_message);
 

	
 
        // Check if we're blocked on receiving this message.
 
        dbg_code!({
 
            // Our port cannot have been blocked itself, because we're able to
 
            // directly insert the message into its slot.
 
            assert!(!comp_ctx.get_port(port_handle).state.is_blocked());
 
        });
 

	
 
        if exec_state.is_blocked_on_get(target_port_id) {
 
            // Return to normal operation
 
            exec_state.mode = CompMode::Sync;
 
            exec_state.mode_port = PortId::new_invalid();
 
            debug_assert!(exec_state.mode_value.values.is_empty());
 
        }
 

	
 
        return IncomingData::PlacedInSlot
 
    } else {
 
        // Slot is already full, so if the port was previously opened, it will
 
        // now become closed
 
        let port_info = comp_ctx.get_port_mut(port_handle);
 
        if port_info.state.is_open() {
 
            port_info.state.set(PortStateFlag::BlockedDueToFullBuffers);
 

	
 
            let (peer_handle, message) =
 
                control.initiate_port_blocking(comp_ctx, port_handle);
 
            let peer = comp_ctx.get_peer(peer_handle);
 
            peer.handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
        }
 

	
 
        return IncomingData::SlotFull(incoming_message)
 
    }
 
}
 

	
 
pub(crate) enum GetResult {
 
    Received(DataMessage),
 
    NoMessage,
 
    Error((PortInstruction, String)),
 
}
 

	
 
/// Default attempt at trying to receive from a port (i.e. through a `get`, or
 
/// the equivalent operation for a builtin component). `target_port` is the port
 
/// we're trying to receive from, and the `target_port_instruction` is the
 
/// instruction we're attempting on this port.
 
pub(crate) fn default_attempt_get(
 
    exec_state: &mut CompExecState, target_port: PortId, target_port_instruction: PortInstruction,
 
    inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup, sched_ctx: &SchedulerCtx,
 
    comp_ctx: &mut CompCtx, control: &mut ControlLayer, consensus: &mut Consensus
 
) -> GetResult {
 
    let port_handle = comp_ctx.get_port_handle(target_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 

	
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    port_info.last_instruction = target_port_instruction;
 
    if port_info.state.is_closed() {
 
        let peer_id = port_info.peer_comp_id;
 
        return GetResult::Error((
 
            target_port_instruction,
 
            format!("Cannot get from this port, as the peer component (id:{}) closed the port", peer_id.0)
 
        ));
 
    }
 

	
 
    if let Some(message) = &inbox_main[port_index] {
 
        if consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
            // We're allowed to receive this message
 
            let message = inbox_main[port_index].take().unwrap();
 
            debug_assert_eq!(target_port, message.data_header.target_port);
 

	
 
            // Note: we can still run into an unrecoverable error when actually
 
            // receiving this message
 
            match default_handle_received_data_message(
 
                target_port, target_port_instruction, inbox_main, inbox_backup,
 
                comp_ctx, sched_ctx, control,
 
            ) {
 
                Ok(()) => return GetResult::Received(message),
 
                Err(location_and_message) => return GetResult::Error(location_and_message)
 
            }
 
        } else {
 
            // We're not allowed to receive this message. This means that the
 
            // receiver is attempting to receive something out of order with
 
            // respect to the sender.
 
            return GetResult::Error((target_port_instruction, String::from(
 
                "Cannot get from this port, as this causes a deadlock. This happens if you `get` in a different order as another component `put`s"
 
            )));
 
        }
 
    } else {
 
        // We don't have a message waiting for us and the port is not blocked.
 
        // So enter the BlockedGet state
 
        exec_state.set_as_blocked_get(target_port);
 
        return GetResult::NoMessage;
 
    }
 
}
 

	
 
/// Default handling that has been received through a `get`. Will check if any
 
/// more messages are waiting, and if the corresponding port was blocked because
 
/// of full buffers (hence, will use the control layer to make sure the peer
 
/// will become unblocked).
 
pub(crate) fn default_handle_received_data_message(
 
    targeted_port: PortId, port_instruction: PortInstruction,
 
    inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup,
 
    comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer
 
) -> Result<(), (PortInstruction, String)> {
 
    let port_handle = comp_ctx.get_port_handle(targeted_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 
    let slot = &mut inbox_main[port_index];
 
    debug_assert!(slot.is_none()); // because we've just received from it
 

	
 
    // Modify last-known location where port instruction was retrieved
 
    let port_info = comp_ctx.get_port(port_handle);
 
    debug_assert_ne!(port_info.last_instruction, PortInstruction::None); // set by caller
 
    debug_assert!(port_info.state.is_open()); // checked by caller
 

	
 
    // Check if there are any more messages in the backup buffer
 
    for message_index in 0..inbox_backup.len() {
 
        let message = &inbox_backup[message_index];
 
        if message.data_header.target_port == targeted_port {
 
            // One more message, place it in the slot
 
            let message = inbox_backup.remove(message_index);
 
            debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we're removing another message from the backup
 
            *slot = Some(message);
 

	
 
            return Ok(());
 
        }
 
    }
 

	
 
    // Did not have any more messages, so if we were blocked, then we need to
 
    // unblock the port now (and inform the peer of this unblocking)
 
    if port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers) {
 
        let port_info = comp_ctx.get_port_mut(port_handle);
 
        port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers);
 

	
 
        let (peer_handle, message) = control.cancel_port_blocking(comp_ctx, port_handle);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
    }
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles control messages in the default way. Note that this function may
 
/// take a lot of actions in the name of the caller: pending messages may be
 
/// sent, ports may become blocked/unblocked, etc. So the execution
 
/// (`CompExecState`), control (`ControlLayer`) and consensus (`Consensus`)
 
/// state may all change.
 
pub(crate) fn default_handle_control_message(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus,
 
    message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
 
) -> Result<(), (PortInstruction, String)> {
 
    match message.content {
 
        ControlMessageContent::Ack => {
 
            default_handle_ack(control, message.id, sched_ctx, comp_ctx);
 
        },
 
        ControlMessageContent::BlockPort => {
 
            // One of our messages was accepted, but the port should be
 
            // blocked.
 
            let port_to_block = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_block);
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            debug_assert_eq!(port_info.kind, PortKind::Putter);
 
            port_info.state.set(PortStateFlag::BlockedDueToFullBuffers);
 
        },
 
        ControlMessageContent::ClosePort(content) => {
 
            // Request to close the port. We immediately comply and remove
 
            // the component handle as well
 
            let port_to_close = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_close);
 

	
 
            // We're closing the port, so we will always update the peer of the
 
            // port (in case of error messages)
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            port_info.peer_comp_id = message.sender_comp_id;
 
            port_info.close_at_sync_end = true; // might be redundant (we might set it closed now)
 

	
 
            let peer_comp_id = port_info.peer_comp_id;
 
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
            // One exception to sending an `Ack` is if we just closed the
 
            // port ourselves, meaning that the `ClosePort` messages got
 
            // sent to one another.
 
            if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) {
 
                // The two components (sender and this component) are closing
 
                // the channel at the same time. So we don't care about the
 
                // content of the `ClosePort` message.
 
                default_handle_ack(control, control_id, sched_ctx, comp_ctx);
 
            } else {
 
                // Respond to the message
 
                let port_info = comp_ctx.get_port(port_handle);
 
                let last_instruction = port_info.last_instruction;
 
                let port_has_had_message = port_info.received_message_for_sync;
 
                default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
 
                comp_ctx.change_port_peer(sched_ctx, port_handle, None);
 

	
 
                // Handle any possible error conditions (which boil down to: the
 
                // port has been used, but the peer has died). If not in sync
 
                // mode then we close the port immediately.
 

	
 
                // Note that `port_was_used` does not mean that any messages
 
                // were actually received. It might also mean that e.g. the
 
                // component attempted a `get`, but there were no messages, so
 
                // now it is in the `BlockedGet` state.
 
                let port_was_used = last_instruction != PortInstruction::None;
 

	
 
                if exec_state.mode.is_in_sync_block() {
 
                    let closed_during_sync_round = content.closed_in_sync_round && port_was_used;
 
                    let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message;
 

	
 
                    if closed_during_sync_round || closed_before_sync_round {
 
                        return Err((
 
                            last_instruction,
 
                            format!("Peer component (id:{}) shut down, so communication cannot (have) succeed(ed)", peer_comp_id.0)
 
                        ));
 
                    }
 
                } else {
 
                    let port_info = comp_ctx.get_port_mut(port_handle);
 
                    port_info.state.set(PortStateFlag::Closed);
 
                }
 
            }
 
        },
 
        ControlMessageContent::UnblockPort => {
 
            // We were previously blocked (or already closed)
 
            let port_to_unblock = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_unblock);
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 

	
 
            debug_assert_eq!(port_info.kind, PortKind::Putter);
 
            debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers));
 

	
 
            port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers);
 
            default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
 
        },
 
        ControlMessageContent::PortPeerChangedBlock => {
 
            // The peer of our port has just changed. So we are asked to
 
            // temporarily block the port (while our original recipient is
 
            // potentially rerouting some of the in-flight messages) and
 
            // Ack. Then we wait for the `unblock` call.
 
            let port_to_change = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_change);
 

	
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            let peer_comp_id = port_info.peer_comp_id;
 
            port_info.state.set(PortStateFlag::BlockedDueToPeerChange);
 
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
            default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
 
        },
 
        ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => {
 
            let port_to_change = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_change);
 
            let port_info = comp_ctx.get_port(port_handle);
 
            debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToPeerChange));
 
            let old_peer_id = port_info.peer_comp_id;
 

	
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            port_info.peer_port_id = new_port_id;
 

	
 
            port_info.state.clear(PortStateFlag::BlockedDueToPeerChange);
 
            comp_ctx.change_port_peer(sched_ctx, port_handle, Some(new_comp_id));
 
            default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
 
        }
 
    }
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles a component entering the synchronous block. Will ensure that the
 
/// `Consensus` and the `ComponentCtx` are initialized properly.
 
pub(crate) fn default_handle_sync_start(
 
    exec_state: &mut CompExecState, inbox_main: &mut InboxMainRef,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 
) {
 
    sched_ctx.info("Component starting sync mode");
 

	
 
    // If any messages are present for this sync round, set the appropriate flag
 
    // and notify the consensus handler of the present messages
 
    consensus.notify_sync_start(comp_ctx);
 
    for (port_index, message) in inbox_main.iter().enumerate() {
 
        if let Some(message) = message {
 
            consensus.handle_incoming_data_message(comp_ctx, message);
 
            let port_info = comp_ctx.get_port_by_index_mut(port_index);
 
            port_info.received_message_for_sync = true;
 
        }
 
    }
 

	
 
    // Modify execution state
 
    debug_assert_eq!(exec_state.mode, CompMode::NonSync);
 
    exec_state.mode = CompMode::Sync;
 
}
 

	
 
/// Handles a component that has reached the end of the sync block. This does
 
/// not necessarily mean that the component will go into the `NonSync` mode, as
 
/// it might have to wait for the leader to finish the round for everyone (see
 
/// `default_handle_sync_decision`)
 
pub(crate) fn default_handle_sync_end(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
    consensus: &mut Consensus
 
) {
 
    sched_ctx.info("Component ending sync mode (but possibly waiting for a solution)");
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync);
 
    let decision = consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
    exec_state.mode = CompMode::SyncEnd;
 
    default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus);
 
}
 

	
 
/// Handles a component initiating the exiting procedure, and closing all of its
 
/// ports. Should only be called once per component (which is ensured by
 
/// checking and modifying the mode in the execution state).
 
#[must_use]
 
pub(crate) fn default_handle_start_exit(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::StartExit);
 
    sched_ctx.info(&format!("Component starting exit (reason: {:?})", exec_state.exit_reason));
 
    exec_state.mode = CompMode::BusyExit;
 
    let exit_inside_sync = exec_state.exit_reason.is_in_sync();
 

	
 
    // If exiting while inside sync mode, report to the leader of the current
 
    // round that we've failed.
 
    if exit_inside_sync {
 
        let decision = consensus.notify_sync_end_failure(sched_ctx, comp_ctx);
 
        default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus);
 
    }
 

	
 
    // Iterating over ports by index to work around borrowing rules
 
    for port_index in 0..comp_ctx.num_ports() {
 
        let port = comp_ctx.get_port_by_index_mut(port_index);
 
        if port.state.is_closed() || port.close_at_sync_end {
 
            // Already closed, or in the process of being closed
 
            continue;
 
        }
 

	
 
        // Mark as closed
 
        let port_id = port.self_id;
 
        port.state.set(PortStateFlag::Closed);
 

	
 
        // Notify peer of closing
 
        let port_handle = comp_ctx.get_port_handle(port_id);
 
        let (peer, message) = control.initiate_port_closing(port_handle, exit_inside_sync, comp_ctx);
 
        let peer_info = comp_ctx.get_peer(peer);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
    }
 

	
 
    return CompScheduling::Immediate; // to check if we can shut down immediately
 
}
 

	
 
/// Handles a component waiting until all peers are notified that it is quitting
 
/// (i.e. after calling `default_handle_start_exit`).
 
#[must_use]
 
pub(crate) fn default_handle_busy_exit(
 
    exec_state: &mut CompExecState, control: &ControlLayer,
 
    sched_ctx: &SchedulerCtx
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::BusyExit);
 
    if control.has_acks_remaining() {
 
        sched_ctx.info("Component busy exiting, still has `Ack`s remaining");
 
        return CompScheduling::Sleep;
 
    } else {
 
        sched_ctx.info("Component busy exiting, now shutting down");
 
        exec_state.mode = CompMode::Exit;
 
        return CompScheduling::Exit;
 
    }
 
}
 

	
 
/// Handles a potential synchronous round decision. If there was a decision then
 
/// the `Some(success)` value indicates whether the round succeeded or not.
 
/// Might also end up changing the `ExecState`.
 
///
 
/// Might be called in two cases:
 
/// 1. The component is in regular execution mode, at the end of a sync round,
 
///     and is waiting for a solution to the round.
 
/// 2. The component has encountered an error during a sync round and is
 
///     exiting, hence is waiting for a "Failure" message from the leader.
 
pub(crate) fn default_handle_sync_decision(
 
    sched_ctx: &SchedulerCtx, exec_state: &mut CompExecState, comp_ctx: &mut CompCtx,
 
    decision: SyncRoundDecision, consensus: &mut Consensus
 
) -> Option<bool> {
 
    let success = match decision {
 
        SyncRoundDecision::None => return None,
 
        SyncRoundDecision::Solution => true,
 
        SyncRoundDecision::Failure => false,
 
    };
 

	
 
    debug_assert!(
 
        exec_state.mode == CompMode::SyncEnd || (
 
            exec_state.mode.is_busy_exiting() && exec_state.exit_reason.is_error()
 
        ) || (
 
            exec_state.mode.is_in_sync_block() && decision == SyncRoundDecision::Failure
 
        )
 
    );
 

	
 
    sched_ctx.info(&format!("Handling decision {:?} (in mode: {:?})", decision, exec_state.mode));
 
    consensus.notify_sync_decision(decision);
 
    if success {
 
        // We cannot get a success message if the component has encountered an
 
        // error.
 
        for port_index in 0..comp_ctx.num_ports() {
 
            let port_info = comp_ctx.get_port_by_index_mut(port_index);
 
            if port_info.close_at_sync_end {
 
                port_info.state.set(PortStateFlag::Closed);
 
            }
 
        }
 
        debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
 
        exec_state.mode = CompMode::NonSync;
 
        return Some(true);
 
    } else {
 
        // We may get failure both in all possible cases. But we should only
 
        // modify the execution state if we're not already in exit mode
 
        if !exec_state.mode.is_busy_exiting() {
 
            sched_ctx.error("failed synchronous round, initiating exit");
 
            exec_state.set_as_start_exit(ExitReason::ErrorNonSync);
 
        }
 
        return Some(false);
 
    }
 
}
 

	
 
/// Performs the default action of printing the provided error, and then putting
 
/// the component in the state where it will shut down. Only to be used for
 
/// builtin components: their error message construction is simpler (and more
 
/// common) as they don't have any source code.
 
pub(crate) fn default_handle_error_for_builtin(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx,
 
    location_and_message: (PortInstruction, String)
 
) {
 
    let (_location, message) = location_and_message;
 
    sched_ctx.error(&message);
 

	
 
    let exit_reason = if exec_state.mode.is_in_sync_block() {
 
        ExitReason::ErrorInSync
 
    } else {
 
        ExitReason::ErrorNonSync
 
    };
 

	
 
    exec_state.set_as_start_exit(exit_reason);
 
}
 

	
 
#[inline]
 
pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling {
 
    debug_assert_eq!(_exec_state.mode, CompMode::Exit);
 
    return CompScheduling::Exit;
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Internal messaging/state utilities
 
// -----------------------------------------------------------------------------
 

	
 
/// Sends a message without any transmitted ports. Does not check if sending
 
/// is actually valid.
 
fn send_message_without_ports(
 
    sending_port_handle: LocalPortHandle, value: ValueGroup,
 
    comp_ctx: &CompCtx, sched_ctx: &SchedulerCtx, consensus: &mut Consensus,
 
) {
 
    let port_info = comp_ctx.get_port(sending_port_handle);
 
    debug_assert!(port_info.state.can_send());
 
    let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
    let peer_info = comp_ctx.get_peer(peer_handle);
 

	
 
    let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value);
 
    peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true);
 
}
 

	
 
fn send_message_with_ports(
 
    sending_port_handle: LocalPortHandle, sending_port_instruction: PortInstruction,
 
    value: ValueGroup,
 
    exec_state: &mut CompExecState, comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx,
 
    control: &mut ControlLayer, consensus: &mut Consensus
 
) -> Result<(), (PortInstruction, String)> {
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync); // busy in sync, trying to send
 

	
 
    let sending_port_info = comp_ctx.get_port_mut(sending_port_handle);
 
    sending_port_info.last_instruction = sending_port_instruction;
 

	
 
    let mut transmit_ports = Vec::new();
 
    find_ports_in_value_group(&value, &mut transmit_ports);
 
    debug_assert!(!transmit_ports.is_empty()); // requisite for calling this function
 

	
 
    // Set up the final Ack that triggers us to send our final message
 
    let unblock_put_control_id = control.add_unblock_put_with_ports_entry();
 
    for (_, port_id) in &transmit_ports {
 
        let transmit_port_handle = comp_ctx.get_port_handle(*port_id);
 
        let transmit_port_info = comp_ctx.get_port_mut(transmit_port_handle);
 

	
 
        // Note: we checked earlier that we are currently in sync mode. Now we
 
        // will check if we've already used the port we're about to transmit.
 
        if !transmit_port_info.last_instruction.is_none() {
 
            return Err((
 
                sending_port_instruction,
 
                String::from("Cannot transmit one of the ports, as it is used in this sync round")
 
            ));
 
        }
 

	
 
        if transmit_port_info.state.is_set(PortStateFlag::Transmitted) {
 
            return Err((
 
                sending_port_instruction,
 
                String::from("Cannot transmit one of the ports, as that port is already transmitted")
 
            ));
 
        }
 

	
 
        transmit_port_info.state.set(PortStateFlag::Transmitted);
 

	
 
    }
 

	
 
    let port_info = comp_ctx.get_port(sending_port_handle);
 

	
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles an `Ack` for the control layer.
 
fn default_handle_ack(
 
    control: &mut ControlLayer, control_id: ControlId,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
 
) {
 
    // Since an `Ack` may cause another one, handle them in a loop
 
    let mut to_ack = control_id;
 
    loop {
 
        let (action, new_to_ack) = control.handle_ack(to_ack, sched_ctx, comp_ctx);
 
        match action {
 
            AckAction::SendMessage(target_comp, message) => {
 
                // FIX @NoDirectHandle
 
                let mut handle = sched_ctx.runtime.get_component_public(target_comp);
 
                handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
                let _should_remove = handle.decrement_users();
 
                debug_assert!(_should_remove.is_none());
 
            },
 
            AckAction::ScheduleComponent(to_schedule) => {
 
                // FIX @NoDirectHandle
 
                let mut handle = sched_ctx.runtime.get_component_public(to_schedule);
 

	
 
                // Note that the component is intentionally not
 
                // sleeping, so we just wake it up
 
                debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire));
 
                let key = unsafe { to_schedule.upgrade() };
 
                sched_ctx.runtime.enqueue_work(key);
 
                let _should_remove = handle.decrement_users();
 
                debug_assert!(_should_remove.is_none());
 
            },
 
            AckAction::None => {}
 
        }
 

	
 
        match new_to_ack {
 
            Some(new_to_ack) => to_ack = new_to_ack,
 
            None => break,
 
        }
 
    }
 
}
 

	
 
/// Little helper for sending the most common kind of `Ack`
 
fn default_send_ack(
 
    causer_of_ack_id: ControlId, peer_handle: LocalPeerHandle,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx
 
) {
 
    let peer_info = comp_ctx.get_peer(peer_handle);
 
    peer_info.handle.send_message_logged(sched_ctx, Message::Control(ControlMessage{
 
        id: causer_of_ack_id,
 
        sender_comp_id: comp_ctx.id,
 
        target_port_id: None,
 
        content: ControlMessageContent::Ack
 
    }), true);
 
}
 

	
 
/// Handles the unblocking of a putter port. In case there is a pending message
 
/// on that port then it will be sent.
 
fn default_handle_recently_unblocked_port(
 
    exec_state: &mut CompExecState, consensus: &mut Consensus,
 
    port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
) {
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    let port_id = port_info.self_id;
 
    debug_assert!(!port_info.state.is_blocked()); // should have been done by the caller
 

	
 
    if exec_state.is_blocked_on_put(port_id) {
 
        // Return to the regular execution mode
 
        exec_state.mode = CompMode::Sync;
 
        exec_state.mode_port = PortId::new_invalid();
 

	
 
        // Annotate the message that we're going to send
 
        let port_info = comp_ctx.get_port(port_handle); // for immutable access
 
        debug_assert_eq!(port_info.kind, PortKind::Putter);
 
        let to_send = exec_state.mode_value.take();
 
        let to_send = consensus.annotate_data_message(comp_ctx, port_info, to_send);
 

	
 
        // Retrieve peer to send the message
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Data(to_send), true);
 

	
 
        exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state.
 
        exec_state.mode_port = PortId::new_invalid();
 
    }
 
}
 

	
 
#[inline]
 
pub(crate) fn port_id_from_eval(port_id: EvalPortId) -> PortId {
 
    return PortId(port_id.id);
 
}
 

	
 
#[inline]
 
pub(crate) fn port_id_to_eval(port_id: PortId) -> EvalPortId {
 
    return EvalPortId{ id: port_id.0 };
 
}
 

	
 
// TODO: Optimize double vec
 
type EncounteredPorts = Vec<(Vec<ValueId>, PortId)>;
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<(ValueId, PortId)>) {
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut EncounteredPorts) {
 
    // Helper to check a value for a port and recurse if needed.
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, value_location: ValueId, ports: &mut Vec<(ValueId, PortId)>) {
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, value_location: ValueId, ports: &mut EncounteredPorts) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                let cur_port = PortId(port_id.id);
 
                for prev_port in ports.iter() {
 
                    if *prev_port == cur_port.1 {
 
                for prev_port in ports.iter_mut() {
 
                    if prev_port.1 == cur_port {
 
                        // Already added
 
                        prev_port.0.push(value_location);
 
                        return;
 
                    }
 
                }
 

	
 
                ports.push((value_location, cur_port));
 
                ports.push((vec![value_location], cur_port));
 
            },
 
            Value::Array(heap_pos) |
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
 
            Value::Union(_, heap_pos) => {
 
                // Reference to some dynamic thing which might contain ports,
 
                // so recurse
 
                let heap_region = &group.regions[*heap_pos as usize];
 
                for (value_index, embedded_value) in heap_region.iter().enumerate() {
 
                    let value_location = ValueId::Heap(*heap_pos, value_index as u32);
 
                    find_port_in_value(group, embedded_value, value_location, ports);
 
                }
 
            },
 
            _ => {}, // values we don't care about
 
        }
 
    }
 

	
 
    // Clear the ports, then scan all the available values
 
    ports.clear();
 
    for (value_index, value) in &value_group.values.iter().enumerate() {
 
        find_port_in_value(value_group, value, ValueId::Stack(value_index as u32), ports);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/component_context.rs
Show inline comments
 
use std::fmt::{Debug, Formatter, Result as FmtResult};
 

	
 
use crate::runtime2::scheduler::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 

	
 
use crate::protocol::ExpressionId;
 

	
 
/// Helper struct to remember when the last operation on the port took place.
 
#[derive(Debug, PartialEq, Copy, Clone)]
 
pub enum PortInstruction {
 
    None,
 
    NoSource,
 
    SourceLocation(ExpressionId),
 
}
 

	
 
impl PortInstruction {
 
    pub fn is_none(&self) -> bool {
 
        match self {
 
            PortInstruction::None => return true,
 
            _ => return false,
 
        }
 
    }
 
}
 

	
 
/// Directionality of a port
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
/// Bitflags for port
 
// TODO: Incorporate remaining flags from `Port` struct
 
#[repr(u32)]
 
#[derive(Debug, Copy, Clone)]
 
pub enum PortStateFlag {
 
    Closed = 0x01, // If not closed, then the port is open
 
    BlockedDueToPeerChange = 0x02, // busy changing peers, hence use of port is temporarily blocked
 
    BlockedDueToFullBuffers = 0x04,
 
    Transmitted, // Transmitted, so cannot be used anymore
 
    Transmitted = 0x08, // Transmitted, so cannot be used anymore
 
}
 

	
 
#[derive(Copy, Clone)]
 
pub struct PortState {
 
    flags: u32
 
}
 

	
 
impl PortState {
 
    pub(crate) fn new() -> PortState {
 
        return PortState{ flags: 0 }
 
    }
 

	
 
    // high-level
 

	
 
    #[inline]
 
    pub fn is_open(&self) -> bool {
 
        return !self.is_closed();
 
    }
 

	
 
    #[inline]
 
    pub fn can_send(&self) -> bool {
 
        return
 
            !self.is_set(PortStateFlag::Closed) &&
 
            !self.is_set(PortStateFlag::Transmitted);
 
    }
 

	
 
    #[inline]
 
    pub fn is_closed(&self) -> bool {
 
        return self.is_set(PortStateFlag::Closed);
 
    }
 

	
 
    #[inline]
 
    pub fn is_blocked(&self) -> bool {
 
        return
 
            self.is_set(PortStateFlag::BlockedDueToPeerChange) ||
 
            self.is_set(PortStateFlag::BlockedDueToFullBuffers);
 
    }
 

	
 
    // lower-level utils
 
    #[inline]
 
    pub fn set(&mut self, flag: PortStateFlag) {
 
        self.flags |= flag as u32;
 
    }
 

	
 
    #[inline]
 
    pub fn clear(&mut self, flag: PortStateFlag) {
 
        self.flags &= !(flag as u32);
 
    }
 

	
 
    #[inline]
 
    pub const fn is_set(&self, flag: PortStateFlag) -> bool {
 
        return (self.flags & (flag as u32)) != 0;
 
    }
 
}
 

	
 
impl Debug for PortState {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
 
        use PortStateFlag::*;
 

	
 
        let mut s = f.debug_struct("PortState");
 
        for (flag_name, flag_value) in &[
 
            ("closed", Closed),
 
            ("blocked_peer_change", BlockedDueToPeerChange),
 
            ("blocked_full_buffers", BlockedDueToFullBuffers)
 
        ] {
 
            s.field(flag_name, &self.is_set(*flag_value));
 
        }
 

	
 
        return s.finish();
 
    }
 
}
 

	
 
#[derive(Debug)]
 
pub struct Port {
 
    // Identifiers
 
    pub self_id: PortId,
 
    pub peer_comp_id: CompId, // eventually consistent
 
    pub peer_port_id: PortId, // eventually consistent
 
    // Generic operating state
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    // State tracking for error detection and error handling
 
    pub last_instruction: PortInstruction, // used during sync round to detect port-closed-during-sync errors
 
    pub received_message_for_sync: bool, // used during sync round to detect port-closed-before-sync errors
 
    pub close_at_sync_end: bool, // set during sync round when receiving a port-closed-after-sync message
 
    pub(crate) associated_with_peer: bool,
 
}
 

	
 
pub struct Peer {
 
    pub id: CompId,
 
    pub num_associated_ports: u32,
 
    pub(crate) handle: CompHandle,
 
}
 

	
 
/// Port and peer management structure. Will keep a local reference counter to
 
/// the ports associate with peers, additionally manages the atomic reference
 
/// counter associated with the peers' component handles.
 
pub struct CompCtx {
 
    pub id: CompId,
 
    ports: Vec<Port>,
 
    peers: Vec<Peer>,
 
    port_id_counter: u32,
 
}
 

	
 
#[derive(Copy, Clone, PartialEq, Eq)]
 
pub struct LocalPortHandle(PortId);
 

	
 
#[derive(Copy, Clone)]
 
pub struct LocalPeerHandle(CompId);
 

	
 
impl CompCtx {
 
    /// Creates a new component context based on a reserved entry in the
 
    /// component store. This reservation is used such that we already know our
 
    /// assigned ID.
 
    pub(crate) fn new(reservation: &CompReserved) -> Self {
 
        return Self{
 
            id: reservation.id(),
 
            ports: Vec::new(),
 
            peers: Vec::new(),
 
            port_id_counter: 0,
 
        }
 
    }
 

	
 
    /// Creates a new channel that is fully owned by the component associated
 
    /// with this context.
 
    pub(crate) fn create_channel(&mut self) -> Channel {
 
        let putter_id = PortId(self.take_port_id());
 
        let getter_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id: putter_id,
 
            peer_port_id: getter_id,
 
            kind: PortKind::Putter,
 
            state: PortState::new(),
 
            peer_comp_id: self.id,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            associated_with_peer: false,
 
        });
 
        self.ports.push(Port{
 
            self_id: getter_id,
 
            peer_port_id: putter_id,
 
            kind: PortKind::Getter,
 
            state: PortState::new(),
 
            peer_comp_id: self.id,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            associated_with_peer: false,
 
        });
 

	
 
        return Channel{ putter_id, getter_id };
 
    }
 

	
 
    /// Adds a new port. Make sure to call `add_peer` afterwards.
 
    pub(crate) fn add_port(&mut self, peer_comp_id: CompId, peer_port_id: PortId, kind: PortKind, state: PortState) -> LocalPortHandle {
 
        let self_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id, peer_comp_id, peer_port_id, kind, state,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            associated_with_peer: false,
 
        });
 
        return LocalPortHandle(self_id);
 
    }
 

	
 
    /// Removes a port. Make sure you called `remove_peer` first.
 
    pub(crate) fn remove_port(&mut self, port_handle: LocalPortHandle) -> Port {
 
        let port_index = self.must_get_port_index(port_handle);
 
        let port = self.ports.remove(port_index);
 
        dbg_code!(assert!(!port.associated_with_peer));
 
        return port;
 
    }
 

	
 
    /// Changes a peer
 
    pub(crate) fn change_port_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, new_peer_comp_id: Option<CompId>) {
 
        // If port is currently associated with a peer, then remove that peer
 
        let port_index = self.get_port_index(port_handle);
 
        let port = &mut self.ports[port_index];
 
        let port_is_closed = port.state.is_closed();
 
        if port.associated_with_peer {
 
            // Remove old peer association
 
            port.associated_with_peer = false;
 
            let peer_comp_id = port.peer_comp_id;
 
            let peer_index = self.get_peer_index_by_id(peer_comp_id).unwrap();
 
            let peer = &mut self.peers[peer_index];
 

	
 
            peer.num_associated_ports -= 1;
 
            if peer.num_associated_ports == 0 {
 
                let mut peer = self.peers.remove(peer_index);
 
                if let Some(key) = peer.handle.decrement_users() {
 
                    sched_ctx.runtime.destroy_component(key);
 
                }
 
            }
 
        }
 

	
 
        // If there is a new peer, then set it as the peer associated with the
 
        // port
 
        if let Some(peer_id) = new_peer_comp_id {
 
            let port = &mut self.ports[port_index];
 
            port.peer_comp_id = peer_id;
 

	
 
            if peer_id != self.id && !port_is_closed {
 
                port.associated_with_peer = true;
 

	
 
                match self.get_peer_index_by_id(peer_id) {
 
                    Some(index) => {
 
                        let peer = &mut self.peers[index];
 
                        peer.num_associated_ports += 1;
 
                    },
 
                    None => {
 
                        let handle = sched_ctx.runtime.get_component_public(peer_id);
 
                        self.peers.push(Peer {
 
                            id: peer_id,
 
                            num_associated_ports: 1,
 
                            handle
 
                        })
 
                    }
 
                }
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn get_port_handle(&self, port_id: PortId) -> LocalPortHandle {
 
        return LocalPortHandle(port_id);
 
    }
 

	
 
    // should perhaps be revised, used in main inbox
 
    pub(crate) fn get_port_index(&self, port_handle: LocalPortHandle) -> usize {
 
        return self.must_get_port_index(port_handle);
 
    }
 

	
 
    pub(crate) fn get_peer_handle(&self, peer_id: CompId) -> LocalPeerHandle {
 
        return LocalPeerHandle(peer_id);
 
    }
 

	
 
    pub(crate) fn get_port(&self, port_handle: LocalPortHandle) -> &Port {
 
        let index = self.must_get_port_index(port_handle);
 
        return &self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_mut(&mut self, port_handle: LocalPortHandle) -> &mut Port {
 
        let index = self.must_get_port_index(port_handle);
 
        return &mut self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_by_index_mut(&mut self, index: usize) -> &mut Port {
 
        return &mut self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_peer(&self, peer_handle: LocalPeerHandle) -> &Peer {
 
        let index = self.must_get_peer_index(peer_handle);
 
        return &self.peers[index];
 
    }
 

	
 
    pub(crate) fn get_peer_mut(&mut self, peer_handle: LocalPeerHandle) -> &mut Peer {
 
        let index = self.must_get_peer_index(peer_handle);
 
        return &mut self.peers[index];
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn iter_ports(&self) -> impl Iterator<Item=&Port> {
 
        return self.ports.iter();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn iter_ports_mut(&mut self) -> impl Iterator<Item=&mut Port> {
 
        return self.ports.iter_mut();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn iter_peers(&self) -> impl Iterator<Item=&Peer> {
 
        return self.peers.iter();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn num_ports(&self) -> usize {
 
        return self.ports.len();
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Local utilities
 
    // -------------------------------------------------------------------------
 

	
 
    fn must_get_port_index(&self, handle: LocalPortHandle) -> usize {
 
        for (index, port) in self.ports.iter().enumerate() {
 
            if port.self_id == handle.0 {
 
                return index;
 
            }
 
        }
 

	
 
        unreachable!()
 
    }
 

	
 
    fn must_get_peer_index(&self, handle: LocalPeerHandle) -> usize {
 
        for (index, peer) in self.peers.iter().enumerate() {
 
            if peer.id == handle.0 {
 
                return index;
 
            }
 
        }
 

	
 
        unreachable!()
 
    }
 

	
 
    fn get_peer_index_by_id(&self, comp_id: CompId) -> Option<usize> {
 
        for (index, peer) in self.peers.iter().enumerate() {
 
            if peer.id == comp_id {
 
                return Some(index);
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn take_port_id(&mut self) -> u32 {
 
        let port_id = self.port_id_counter;
 
        self.port_id_counter = self.port_id_counter.wrapping_add(1);
 
        return port_id;
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/control_layer.rs
Show inline comments
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 
use crate::runtime2::component::*;
 

	
 
use super::component_context::*;
 

	
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub(crate) struct ControlId(u32);
 

	
 
impl ControlId {
 
    /// Like other invalid IDs, this one doesn't care any significance, but is
 
    /// just set at u32::MAX to hopefully bring out bugs sooner.
 
    fn new_invalid() -> Self {
 
        return ControlId(u32::MAX);
 
    }
 
}
 

	
 
struct ControlEntry {
 
    id: ControlId,
 
    ack_countdown: u32,
 
    content: ControlContent,
 
}
 

	
 
enum ControlContent {
 
    PeerChange(ContentPeerChange),
 
    ScheduleComponent(CompId),
 
    ClosedPort(PortId),
 
    UnblockPutWithPorts
 
}
 

	
 
struct ContentPeerChange {
 
    source_port: PortId,
 
    source_comp: CompId,
 
    old_target_port: PortId,
 
    new_target_port: PortId,
 
    new_target_comp: CompId,
 
    schedule_entry_id: ControlId,
 
}
 

	
 
struct ControlClosedPort {
 
    closed_port: PortId,
 
    exit_entry_id: Option<ControlId>,
 
}
 

	
 
pub(crate) enum AckAction {
 
    None,
 
    SendMessage(CompId, ControlMessage),
 
    ScheduleComponent(CompId),
 
    UnblockPutWithPorts,
 
}
 

	
 
/// Handling/sending control messages.
 
pub(crate) struct ControlLayer {
 
    id_counter: ControlId,
 
    entries: Vec<ControlEntry>,
 
}
 

	
 
impl ControlLayer {
 
    pub(crate) fn should_reroute(&self, message: &mut Message) -> Option<CompId> {
 
        // Safety note: rerouting should occur during the time when we're
 
        // notifying a peer of a new component. During this period that
 
        // component hasn't been executed yet, so cannot have died yet.
 
        // FIX @NoDirectHandle
 
        let target_port = message.target_port();
 
        if target_port.is_none() {
 
            return None;
 
        }
 

	
 
        let target_port = target_port.unwrap();
 
        for entry in &self.entries {
 
            if let ControlContent::PeerChange(entry) = &entry.content {
 
                if entry.old_target_port == target_port {
 
                    message.modify_target_port(entry.new_target_port);
 
                    return Some(entry.new_target_comp);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    /// Handles an acknowledgement. The returned action must be performed by the
 
    /// caller. The optionally returned `ControlId` must be used and passed to
 
    /// `handle_ack` again.
 
    pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> (AckAction, Option<ControlId>) {
 
        let entry_index = self.get_entry_index_by_id(entry_id).unwrap();
 
        let entry = &mut self.entries[entry_index];
 
        debug_assert!(entry.ack_countdown > 0);
 

	
 
        entry.ack_countdown -= 1;
 
        if entry.ack_countdown != 0 {
 
            return (AckAction::None, None);
 
        }
 

	
 
        let entry = self.entries.remove(entry_index);
 

	
 
        // All `Ack`s received, take action based on the kind of entry
 
        match entry.content {
 
            ControlContent::PeerChange(content) => {
 
                // If change of peer is ack'd. Then we are certain we have
 
                // rerouted all of the messages, and the sender's port can now
 
                // be unblocked again.
 
                let target_comp_id = content.source_comp;
 
                let message_to_send = ControlMessage{
 
                    id: ControlId::new_invalid(),
 
                    sender_comp_id: comp_ctx.id,
 
                    target_port_id: Some(content.source_port),
 
                    content: ControlMessageContent::PortPeerChangedUnblock(
 
                        content.new_target_port,
 
                        content.new_target_comp
 
                    )
 
                };
 
                let to_ack = content.schedule_entry_id;
 

	
 
                return (
 
                    AckAction::SendMessage(target_comp_id, message_to_send),
 
                    Some(to_ack)
 
                );
 
            },
 
            ControlContent::ScheduleComponent(to_schedule) => {
 
                // If all change-of-peers are `Ack`d, then we're ready to
 
                // schedule the component!
 
                return (AckAction::ScheduleComponent(to_schedule), None);
 
            },
 
            ControlContent::ClosedPort(closed_port) => {
 
                // If a closed port is Ack'd, then we remove the reference to
 
                // that component.
 
                let port_handle = comp_ctx.get_port_handle(closed_port);
 
                debug_assert!(comp_ctx.get_port(port_handle).state.is_closed());
 
                comp_ctx.change_port_peer(sched_ctx, port_handle, None);
 

	
 
                return (AckAction::None, None);
 
            },
 
            ControlContent::UnblockPutWithPorts => {
 
                return (AckAction::UnblockPutWithPorts, None);
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn has_acks_remaining(&self) -> bool {
 
        return !self.entries.is_empty();
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Port transfer (due to component creation)
 
    // -------------------------------------------------------------------------
 

	
 
    /// Adds an entry that, when completely ack'd, will schedule a component.
 
    pub(crate) fn add_schedule_entry(&mut self, to_schedule_id: CompId) -> ControlId {
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 0, // incremented by calls to `add_reroute_entry`
 
            content: ControlContent::ScheduleComponent(to_schedule_id),
 
        });
 

	
 
        return entry_id;
 
    }
 

	
 
    /// Adds an entry that returns the similarly named Ack action
 
    pub(crate) fn add_unblock_put_with_ports_entry(&mut self) -> ControlId {
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 1, // incremented by calls to `add_reroute_entry`
 
            content: ControlContent::UnblockPutWithPorts,
 
        });
 

	
 
        return entry_id;
 
    }
 

	
 
    /// Removes a schedule entry. Only used if the caller preemptively called
 
    /// `add_schedule_entry`, but ended up not calling `add_reroute_entry`,
 
    /// hence the `ack_countdown` in the scheduling entry is at 0.
 
    pub(crate) fn remove_schedule_entry(&mut self, schedule_entry_id: ControlId) {
 
        let index = self.get_entry_index_by_id(schedule_entry_id).unwrap();
 
        debug_assert_eq!(self.entries[index].ack_countdown, 0);
 
        self.entries.remove(index);
 
    }
 

	
 
    pub(crate) fn add_reroute_entry(
 
        &mut self, creator_comp_id: CompId,
 
        source_port_id: PortId, source_comp_id: CompId,
 
        old_target_port_id: PortId, new_target_port_id: PortId, new_comp_id: CompId,
 
        schedule_entry_id: ControlId,
 
    ) -> Message {
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 1,
 
            content: ControlContent::PeerChange(ContentPeerChange{
 
                source_port: source_port_id,
 
                source_comp: source_comp_id,
 
                old_target_port: old_target_port_id,
 
                new_target_port: new_target_port_id,
 
                new_target_comp: new_comp_id,
 
                schedule_entry_id,
 
            }),
 
        });
 

	
 
        // increment counter on schedule entry
 
        let entry_index = self.get_entry_index_by_id(schedule_entry_id).unwrap();
 
        self.entries[entry_index].ack_countdown += 1;
 

	
 
        return Message::Control(ControlMessage{
 
            id: entry_id,
 
            sender_comp_id: creator_comp_id,
 
            target_port_id: Some(source_port_id),
 
            content: ControlMessageContent::PortPeerChangedBlock
 
        })
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Blocking, unblocking, and closing ports
 
    // -------------------------------------------------------------------------
 

	
 
    pub(crate) fn has_close_port_entry(&self, port_handle: LocalPortHandle, comp_ctx: &CompCtx) -> Option<ControlId> {
 
        let port = comp_ctx.get_port(port_handle);
 
        let port_id = port.self_id;
 
        for entry in self.entries.iter() {
 
            if let ControlContent::ClosedPort(entry_port_id) = &entry.content {
 
                if *entry_port_id == port_id {
 
                    return Some(entry.id);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    /// Initiates the control message procedures for closing a port. Caller must
 
    /// make sure that the port state has already been set to `Closed`.
 
    pub(crate) fn initiate_port_closing(&mut self, port_handle: LocalPortHandle, exit_inside_sync: bool, comp_ctx: &CompCtx) -> (LocalPeerHandle, ControlMessage) {
 
        let port = comp_ctx.get_port(port_handle);
 
        let peer_port_id = port.peer_port_id;
 
        debug_assert!(port.state.is_closed());
 

	
 
        // Construct the port-closing entry
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 1,
 
            content: ControlContent::ClosedPort(port.self_id),
 
        });
 

	
 
        // Return the messages notifying peer of the closed port
 
        let peer_handle = comp_ctx.get_peer_handle(port.peer_comp_id);
 
        return (
 
            peer_handle,
 
            ControlMessage{
 
                id: entry_id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(peer_port_id),
 
                content: ControlMessageContent::ClosePort(ControlMessageClosePort{
 
                    closed_in_sync_round: exit_inside_sync,
 
                }),
 
            }
 
        );
 
    }
 

	
 
    /// Generates the control message used to indicate to a peer that a port
 
    /// should be blocked (expects the caller to have set the port's state to
 
    /// blocked).
 
    pub(crate) fn initiate_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) {
 
        let port_info = comp_ctx.get_port(port_handle);
 
        debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're telling the putter to block
 
        debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers)); // contract with caller
 

	
 
        let peer_port_id = port_info.peer_port_id;
 
        let peer_comp_id = port_info.peer_comp_id;
 
        let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
        return (
 
            peer_handle,
 
            ControlMessage{
 
                id: ControlId::new_invalid(),
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(peer_port_id),
 
                content: ControlMessageContent::BlockPort,
 
            }
 
        );
 
    }
 

	
 
    /// Generates a messages used to indicate to a peer that a port should be
 
    /// unblocked again.
 
    pub(crate) fn cancel_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) {
 
        let port_info = comp_ctx.get_port(port_handle);
 
        debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're initiating the unblocking
 

	
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 

	
 
        return (
 
            peer_handle,
 
            ControlMessage{
 
                id: ControlId::new_invalid(),
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(port_info.peer_port_id),
 
                content: ControlMessageContent::UnblockPort,
 
            }
 
        );
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Internal utilities
 
    // -------------------------------------------------------------------------
 

	
 
    fn take_id(&mut self) -> ControlId {
 
        let id = self.id_counter;
 
        self.id_counter.0 = self.id_counter.0.wrapping_add(1);
 
        return id;
 
    }
 

	
 
    fn get_entry_index_by_id(&self, entry_id: ControlId) -> Option<usize> {
 
        for (index, entry) in self.entries.iter().enumerate() {
 
            if entry.id == entry_id {
 
                return Some(index);
 
            }
 
        }
 

	
 
        return None;
 
    }
 
}
 

	
 
impl Default for ControlLayer {
 
    fn default() -> Self {
 
        return ControlLayer{
 
            id_counter: ControlId(0),
 
            entries: Vec::new(),
 
        }
 
    }
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)