Changeset - 971e6293edfb
[Not reviewed]
Cargo.toml
Show inline comments
 
@@ -11,14 +11,13 @@ edition = "2021"
 
[features]
 
default=["internet"]
 
internet=["libc"]
 

	
 
[dependencies]
 

	
 
libc = { version = "^0.2", optional = true } # raw sockets
 
mio = { version = "0.8", features = ["os-poll"] } # cross-platform IO notification queue
 
libc = { version = "^0.2", optional = true } # raw sockets, epoll access
 

	
 
# randomness
 
rand = "0.8.4"
 
rand_pcg = "0.3.1"
 

	
 
[lib]
src/protocol/eval/executor.rs
Show inline comments
 
@@ -200,14 +200,14 @@ pub enum EvalContinuation {
 
    Stepping,
 
    // Returned only in sync mode
 
    BranchInconsistent,
 
    SyncBlockEnd,
 
    NewFork,
 
    BlockFires(PortId),
 
    BlockGet(PortId),
 
    Put(PortId, ValueGroup),
 
    BlockGet(ExpressionId, PortId),
 
    Put(ExpressionId, PortId, ValueGroup),
 
    SelectStart(u32, u32), // (num_cases, num_ports_total)
 
    SelectRegisterPort(u32, u32, PortId), // (case_index, port_index_in_case, port_id)
 
    SelectWait, // wait until select can continue
 
    // Returned only in non-sync mode
 
    ComponentTerminated,
 
    SyncBlockStart,
 
@@ -627,13 +627,13 @@ impl Prompt {
 
                                        },
 
                                        None => {
 
                                            // Don't have the result yet, prepare the expression to
 
                                            // get run again after we've received a message.
 
                                            cur_frame.expr_values.push_front(value.clone());
 
                                            cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id));
 
                                            return Ok(EvalContinuation::BlockGet(port_id));
 
                                            return Ok(EvalContinuation::BlockGet(expr_id, port_id));
 
                                        }
 
                                    }
 
                                },
 
                                Method::Put => {
 
                                    let port_value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let deref_port_value = self.store.maybe_read_ref(&port_value).clone();
 
@@ -654,13 +654,13 @@ impl Prompt {
 
                                    } else {
 
                                        // Prepare to execute again
 
                                        cur_frame.expr_values.push_front(msg_value);
 
                                        cur_frame.expr_values.push_front(port_value);
 
                                        cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id));
 
                                        let value_group = ValueGroup::from_store(&self.store, &[deref_msg_value]);
 
                                        return Ok(EvalContinuation::Put(port_id, value_group));
 
                                        return Ok(EvalContinuation::Put(expr_id, port_id, value_group));
 
                                    }
 
                                },
 
                                Method::Fires => {
 
                                    let port_value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let port_value_deref = self.store.maybe_read_ref(&port_value).clone();
 
                                    let port_id = port_value_deref.as_port_id();
src/runtime/connector.rs
Show inline comments
 
@@ -330,13 +330,13 @@ impl ConnectorPDL {
 
                // that branch is ran again immediately.
 
                self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id);
 
                self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id);
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            EvalContinuation::BlockGet(port_id) => {
 
            EvalContinuation::BlockGet(_expr_id, port_id) => {
 
                // Branch performed a `get()` on a port that does not have a
 
                // received message on that port.
 
                let port_id = PortIdLocal::new(port_id.id);
 

	
 
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                branch.awaiting_port = port_id;
 
@@ -388,13 +388,13 @@ impl ConnectorPDL {
 

	
 
                let left_branch = &mut self.tree[left_id];
 
                left_branch.prepared = PreparedStatement::ForkedExecution(true);
 
                let right_branch = &mut self.tree[right_id];
 
                right_branch.prepared = PreparedStatement::ForkedExecution(false);
 
            }
 
            EvalContinuation::Put(port_id, content) => {
 
            EvalContinuation::Put(_expr_id, port_id, content) => {
 
                // Branch is attempting to send data
 
                let port_id = PortIdLocal::new(port_id.id);
 
                let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                let message = DataMessage{ sync_header, data_header, content };
 
                match comp_ctx.submit_message(Message::Data(message)) {
 
                    Ok(_) => {
src/runtime2/communication.rs
Show inline comments
 
@@ -108,21 +108,23 @@ pub type SyncLocalSolution = Vec<SyncLocalSolutionEntry>;
 
pub struct SyncSolutionGetterPort {
 
    pub self_comp_id: CompId,
 
    pub self_port_id: PortId,
 
    pub peer_comp_id: CompId,
 
    pub peer_port_id: PortId,
 
    pub mapping: u32,
 
    pub failed: bool,
 
}
 

	
 
/// Putter port in a solution. A putter may not be certain about who its peer
 
/// component/port is.
 
#[derive(Debug)]
 
pub struct SyncSolutionPutterPort {
 
    pub self_comp_id: CompId,
 
    pub self_port_id: PortId,
 
    pub mapping: u32,
 
    pub failed: bool,
 
}
 

	
 
#[derive(Debug)]
 
pub struct SyncSolutionChannel {
 
    pub putter: Option<SyncSolutionPutterPort>,
 
    pub getter: Option<SyncSolutionGetterPort>,
 
@@ -178,13 +180,13 @@ pub enum ControlMessageContent {
 
    UnblockPort(PortId),
 
    ClosePort(ControlMessageClosePort),
 
    PortPeerChangedBlock(PortId),
 
    PortPeerChangedUnblock(PortId, CompId),
 
}
 

	
 
#[derive(Debug)]
 
#[derive(Copy, Clone, Debug)]
 
pub struct ControlMessageClosePort {
 
    pub port_to_close: PortId, // ID of the receiving port
 
    pub closed_in_sync_round: bool, // needed to ensure correct handling of errors
 
}
 

	
 
// -----------------------------------------------------------------------------
src/runtime2/component/component.rs
Show inline comments
 
use std::fmt::{Display as FmtDisplay, Result as FmtResult, Formatter};
 

	
 
use crate::protocol::eval::{Prompt, EvalError, ValueGroup, PortId as EvalPortId};
 
use crate::protocol::*;
 
use crate::runtime2::*;
 
use crate::runtime2::communication::*;
 

	
 
use super::{CompCtx, CompPDL, CompId};
 
@@ -14,12 +16,35 @@ pub enum CompScheduling {
 
    Immediate,
 
    Requeue,
 
    Sleep,
 
    Exit,
 
}
 

	
 
/// Potential error emitted by a component
 
pub enum CompError {
 
    /// Error originating from the code executor. Hence has an associated
 
    /// source location.
 
    Executor(EvalError),
 
    /// Error originating from a component, but not necessarily associated with
 
    /// a location in the source.
 
    Component(String), // TODO: Maybe a different embedded value in the future?
 
    /// Pure runtime error. Not necessarily originating from the component
 
    /// itself. Should be treated as a very severe runtime-compromising error.
 
    Runtime(RtError),
 
}
 

	
 
impl FmtDisplay for CompError {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
 
        match self {
 
            CompError::Executor(v) => v.fmt(f),
 
            CompError::Component(v) => v.fmt(f),
 
            CompError::Runtime(v) => v.fmt(f),
 
        }
 
    }
 
}
 

	
 
/// Generic representation of a component (as viewed by a scheduler).
 
pub(crate) trait Component {
 
    /// Called upon the creation of the component. Note that the scheduler
 
    /// context is officially running another component (the component that is
 
    /// creating the new component).
 
    fn on_creation(&mut self, comp_id: CompId, sched_ctx: &SchedulerCtx);
 
@@ -36,58 +61,102 @@ pub(crate) trait Component {
 
    /// Called if the component receives a new message. The component is
 
    /// responsible for deciding where that messages goes.
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message);
 

	
 
    /// Called if the component's routine should be executed. The return value
 
    /// can be used to indicate when the routine should be run again.
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError>;
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling;
 
}
 

	
 
/// Representation of the generic operating mode of a component.
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) enum CompMode {
 
    NonSync, // not in sync mode
 
    Sync, // in sync mode, can interact with other components
 
    SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block
 
    BlockedGet, // blocked because we need to receive a message on a particular port
 
    BlockedPut, // component is blocked because the port is blocked
 
    BlockedSelect, // waiting on message to complete the select statement
 
    StartExit, // temporary state: if encountered then we start the shutdown process
 
    BusyExit, // temporary state: waiting for Acks for all the closed ports
 
    StartExit, // temporary state: if encountered then we start the shutdown process.
 
    BusyExit, // temporary state: waiting for Acks for all the closed ports, potentially waiting for sync round to finish
 
    Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0
 
}
 

	
 
impl CompMode {
 
    pub(crate) fn is_in_sync_block(&self) -> bool {
 
        use CompMode::*;
 

	
 
        match self {
 
            Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => true,
 
            NonSync | StartExit | BusyExit | Exit => false,
 
        }
 
    }
 

	
 
    pub(crate) fn is_busy_exiting(&self) -> bool {
 
        use CompMode::*;
 

	
 
        match self {
 
            NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => false,
 
            StartExit | BusyExit => true,
 
            Exit => false,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum ExitReason {
 
    Termination, // regular termination of component
 
    ErrorInSync,
 
    ErrorNonSync,
 
}
 

	
 
impl ExitReason {
 
    pub(crate) fn is_in_sync(&self) -> bool {
 
        use ExitReason::*;
 

	
 
        match self {
 
            Termination | ErrorNonSync => false,
 
            ErrorInSync => true,
 
        }
 
    }
 

	
 
    pub(crate) fn is_error(&self) -> bool {
 
        use ExitReason::*;
 

	
 
        match self {
 
            Termination => false,
 
            ErrorInSync | ErrorNonSync => true,
 
        }
 
    }
 
}
 

	
 
/// Component execution state: the execution mode along with some descriptive
 
/// fields. Fields are public for ergonomic reasons, use member functions when
 
/// appropriate.
 
pub(crate) struct CompExecState {
 
    pub mode: CompMode,
 
    pub mode_port: PortId, // valid if blocked on a port (put/get)
 
    pub mode_value: ValueGroup, // valid if blocked on a put
 
    pub exit_reason: ExitReason, // valid if in StartExit/BusyExit/Exit mode
 
}
 

	
 
impl CompExecState {
 
    pub(crate) fn new() -> Self {
 
        return Self{
 
            mode: CompMode::NonSync,
 
            mode_port: PortId::new_invalid(),
 
            mode_value: ValueGroup::default(),
 
            exit_reason: ExitReason::Termination,
 
        }
 
    }
 

	
 
    pub(crate) fn set_as_start_exit(&mut self, reason: ExitReason) {
 
        self.mode = CompMode::StartExit;
 
        self.exit_reason = reason;
 
    }
 

	
 
    pub(crate) fn set_as_blocked_get(&mut self, port: PortId) {
 
        self.mode = CompMode::BlockedGet;
 
        self.mode_port = port;
 
        debug_assert!(self.mode_value.values.is_empty());
 
    }
 

	
 
@@ -150,25 +219,32 @@ pub(crate) fn create_component(
 
/// the `ExecState` will become blocked as well. Note that
 
/// `default_handle_control_message` will ensure that the port becomes
 
/// unblocked if so instructed by the receiving component. The returned
 
/// scheduling value must be used.
 
#[must_use]
 
pub(crate) fn default_send_data_message(
 
    exec_state: &mut CompExecState, transmitting_port_id: PortId, value: ValueGroup,
 
    exec_state: &mut CompExecState, transmitting_port_id: PortId,
 
    port_instruction: PortInstruction, value: ValueGroup,
 
    sched_ctx: &SchedulerCtx, consensus: &mut Consensus, comp_ctx: &mut CompCtx
 
) -> Result<CompScheduling, String> { // @nocommit: Something better than Err(String)
 
) -> Result<CompScheduling, (PortInstruction, String)> {
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync);
 

	
 
    let port_handle = comp_ctx.get_port_handle(transmitting_port_id);
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    port_info.last_instruction = port_instruction;
 

	
 
    let port_info = comp_ctx.get_port(port_handle);
 
    debug_assert_eq!(port_info.kind, PortKind::Putter);
 

	
 
    if port_info.state == PortState::Closed {
 
        // Note: normally peer is eventually consistent, but if it has shut down
 
        // then we can be sure it is consistent (I think?)
 
        return Err(format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0))
 
        return Err((
 
            port_info.last_instruction,
 
            format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0)
 
        ))
 
    } else if port_info.state.is_blocked() {
 
        // Port is blocked, so we cannot send
 
        exec_state.set_as_blocked_put(transmitting_port_id, value);
 

	
 
        return Ok(CompScheduling::Sleep);
 
    } else {
 
@@ -242,51 +318,65 @@ pub(crate) fn default_handle_incoming_data_message(
 

	
 
/// Default handling that has been received through a `get`. Will check if any
 
/// more messages are waiting, and if the corresponding port was blocked because
 
/// of full buffers (hence, will use the control layer to make sure the peer
 
/// will become unblocked).
 
pub(crate) fn default_handle_received_data_message(
 
    targeted_port: PortId, slot: &mut Option<DataMessage>, inbox_backup: &mut Vec<DataMessage>,
 
    targeted_port: PortId, port_instruction: PortInstruction,
 
    slot: &mut Option<DataMessage>, inbox_backup: &mut Vec<DataMessage>,
 
    comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer
 
) {
 
) -> Result<(), (PortInstruction, String)> {
 
    debug_assert!(slot.is_none()); // because we've just received from it
 

	
 
    // Check if there are any more messages in the backup buffer
 
    // Modify last-known location where port instruction was retrieved
 
    let port_handle = comp_ctx.get_port_handle(targeted_port);
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    port_info.last_instruction = port_instruction;
 

	
 
    if port_info.state == PortState::Closed {
 
        return Err((
 
            port_info.last_instruction,
 
            format!("Cannot 'get' because the channel is closed"))
 
        );
 
    }
 

	
 
    // Check if there are any more messages in the backup buffer
 
    let port_info = comp_ctx.get_port(port_handle);
 
    for message_index in 0..inbox_backup.len() {
 
        let message = &inbox_backup[message_index];
 
        if message.data_header.target_port == targeted_port {
 
            // One more message, place it in the slot
 
            let message = inbox_backup.remove(message_index);
 
            debug_assert!(port_info.state.is_blocked()); // since we're removing another message from the backup
 
            *slot = Some(message);
 

	
 
            return;
 
            return Ok(());
 
        }
 
    }
 

	
 
    // Did not have any more messages, so if we were blocked, then we need to
 
    // unblock the port now (and inform the peer of this unblocking)
 
    if port_info.state == PortState::BlockedDueToFullBuffers {
 
        comp_ctx.set_port_state(port_handle, PortState::Open);
 
        let (peer_handle, message) = control.cancel_port_blocking(comp_ctx, port_handle);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
    }
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles control messages in the default way. Note that this function may
 
/// take a lot of actions in the name of the caller: pending messages may be
 
/// sent, ports may become blocked/unblocked, etc. So the execution
 
/// (`CompExecState`), control (`ControlLayer`) and consensus (`Consensus`)
 
/// state may all change.
 
pub(crate) fn default_handle_control_message(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus,
 
    message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
 
) -> Result<(), String> { // @nocommit, use something else than Err(String)
 
) -> Result<(), (PortInstruction, String)> {
 
    match message.content {
 
        ControlMessageContent::Ack => {
 
            default_handle_ack(control, message.id, sched_ctx, comp_ctx);
 
        },
 
        ControlMessageContent::BlockPort(port_id) => {
 
            // One of our messages was accepted, but the port should be
 
@@ -299,21 +389,23 @@ pub(crate) fn default_handle_control_message(
 
                comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
 
            }
 
        },
 
        ControlMessageContent::ClosePort(content) => {
 
            // Request to close the port. We immediately comply and remove
 
            // the component handle as well
 
            let port_handle = comp_ctx.get_port_handle(content.port_id);
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            let peer_comp_id = port_info.peer_comp_id;
 
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 
            let port_handle = comp_ctx.get_port_handle(content.port_to_close);
 

	
 
            // We're closing the port, so we will always update the peer of the
 
            // port (in case of error messages)
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            port_info.peer_comp_id = message.sender_comp_id;
 

	
 
            let port_info = comp_ctx.get_port(port_handle);
 
            let peer_comp_id = port_info.peer_comp_id;
 
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
            // One exception to sending an `Ack` is if we just closed the
 
            // port ourselves, meaning that the `ClosePort` messages got
 
            // sent to one another.
 
            if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) {
 
                // The two components (sender and this component) are closing
 
                // the channel at the same time.
 
@@ -331,25 +423,16 @@ pub(crate) fn default_handle_control_message(
 
                // now, but we may error out in the next sync block when we
 
                // try to `put`/`get` on the port. This condition makes sure
 
                // that if we have a successful sync round, followed by the peer
 
                // closing the port, that we don't consider the sync round to
 
                // have failed by mistake.
 
                if content.closed_in_sync_round && exec_state.mode.is_in_sync_block() && port_was_used {
 
                    let error_message = match last_instruction {
 
                        PortInstruction::None => unreachable!(), // port was used
 
                        PortInstruction::NoSource => format!(
 
                            "Peer component (id:{}) shut down, so operation on port cannot have succeeded",
 
                            message.sender_comp_id.0
 
                        ),
 
                        PortInstruction::SourceLocation(source_location) => format!(
 
                            "Peer component (id:{}) shut down, so this operation cannot have succeeded",
 
                            message.sender_comp_id.0
 
                        ),
 
                    };
 

	
 
                    return Err(error_message);
 
                    return Err((
 
                        last_instruction,
 
                        format!("Peer component (id:{}) shut down, so previous communication cannot have succeeded", peer_comp_id.0)
 
                    ));
 
                }
 
            }
 
        },
 
        ControlMessageContent::UnblockPort(port_id) => {
 
            // We were previously blocked (or already closed)
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
@@ -395,19 +478,27 @@ pub(crate) fn default_handle_control_message(
 
/// Handles a component initiating the exiting procedure, and closing all of its
 
/// ports. Should only be called once per component (which is ensured by
 
/// checking and modifying the mode in the execution state).
 
#[must_use]
 
pub(crate) fn default_handle_start_exit(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::StartExit);
 
    sched_ctx.log("Component starting exit");
 
    exec_state.mode = CompMode::BusyExit;
 
    let exit_inside_sync = exec_state.exit_reason.is_in_sync();
 

	
 
    // If exiting while inside sync mode, report to the leader of the current
 
    // round that we've failed.
 
    if exit_inside_sync {
 
        let decision = consensus.notify_sync_end_failure(sched_ctx, comp_ctx);
 
        default_handle_sync_decision(sched_ctx, exec_state, decision, consensus);
 
    }
 

	
 
    // Iterating by index to work around borrowing rules
 
    // Iterating over ports by index to work around borrowing rules
 
    for port_index in 0..comp_ctx.num_ports() {
 
        let port = comp_ctx.get_port_by_index_mut(port_index);
 
        if port.state == PortState::Closed {
 
            // Already closed, or in the process of being closed
 
            continue;
 
        }
 
@@ -415,13 +506,13 @@ pub(crate) fn default_handle_start_exit(
 
        // Mark as closed
 
        let port_id = port.self_id;
 
        port.state = PortState::Closed;
 

	
 
        // Notify peer of closing
 
        let port_handle = comp_ctx.get_port_handle(port_id);
 
        let (peer, message) = control.initiate_port_closing(port_handle, comp_ctx);
 
        let (peer, message) = control.initiate_port_closing(port_handle, exit_inside_sync, comp_ctx);
 
        let peer_info = comp_ctx.get_peer(peer);
 
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
    }
 

	
 
    return CompScheduling::Immediate; // to check if we can shut down immediately
 
}
 
@@ -444,34 +535,74 @@ pub(crate) fn default_handle_busy_exit(
 
    }
 
}
 

	
 
/// Handles a potential synchronous round decision. If there was a decision then
 
/// the `Some(success)` value indicates whether the round succeeded or not.
 
/// Might also end up changing the `ExecState`.
 
///
 
/// Might be called in two cases:
 
/// 1. The component is in regular execution mode, at the end of a sync round,
 
///     and is waiting for a solution to the round.
 
/// 2. The component has encountered an error during a sync round and is
 
///     exiting, hence is waiting for a "Failure" message from the leader.
 
pub(crate) fn default_handle_sync_decision(
 
    exec_state: &mut CompExecState, decision: SyncRoundDecision,
 
    consensus: &mut Consensus
 
    sched_ctx: &SchedulerCtx, exec_state: &mut CompExecState,
 
    decision: SyncRoundDecision, consensus: &mut Consensus
 
) -> Option<bool> {
 
    debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
 
    let success = match decision {
 
        SyncRoundDecision::None => return None,
 
        SyncRoundDecision::Solution => true,
 
        SyncRoundDecision::Failure => false,
 
    };
 

	
 
    debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
 
    debug_assert!(
 
        exec_state.mode == CompMode::SyncEnd || (
 
            exec_state.mode.is_busy_exiting() && exec_state.exit_reason.is_error()
 
        ) || (
 
            exec_state.mode.is_in_sync_block() && decision == SyncRoundDecision::Failure
 
        )
 
    );
 

	
 
    sched_ctx.log(&format!("Handling decision {:?} (in mode: {:?})", decision, exec_state.mode));
 
    consensus.notify_sync_decision(decision);
 
    if success {
 
        // We cannot get a success message if the component has encountered an
 
        // error.
 
        debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
 
        exec_state.mode = CompMode::NonSync;
 
        consensus.notify_sync_decision(decision);
 
        return Some(true);
 
    } else {
 
        exec_state.mode = CompMode::StartExit;
 
        // We may get failure both in all possible cases. But we should only
 
        // modify the execution state if we're not already in exit mode
 
        if !exec_state.mode.is_busy_exiting() {
 
            sched_ctx.error("failed synchronous round, initiating exit");
 
            exec_state.set_as_start_exit(ExitReason::ErrorNonSync);
 
        }
 
        return Some(false);
 
    }
 
}
 

	
 
/// Performs the default action of printing the provided error, and then putting
 
/// the component in the state where it will shut down. Only to be used for
 
/// builtin components: their error message construction is simpler (and more
 
/// common) as they don't have any source code.
 
pub(crate) fn default_handle_error_for_builtin(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx,
 
    location_and_message: (PortInstruction, String)
 
) {
 
    let (_location, message) = location_and_message;
 
    sched_ctx.error(&message);
 

	
 
    let exit_reason = if exec_state.mode.is_in_sync_block() {
 
        ExitReason::ErrorInSync
 
    } else {
 
        ExitReason::ErrorNonSync
 
    };
 

	
 
    exec_state.set_as_start_exit(exit_reason);
 
}
 

	
 
#[inline]
 
pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling {
 
    debug_assert_eq!(_exec_state.mode, CompMode::Exit);
 
    return CompScheduling::Exit;
 
}
src/runtime2/component/component_context.rs
Show inline comments
 
use crate::runtime2::scheduler::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 

	
 
use crate::protocol::ExpressionId;
 

	
 
/// Helper struct to remember when the last operation on the port took place
 
/// Helper struct to remember when the last operation on the port took place.
 
#[derive(Debug, PartialEq, Copy, Clone)]
 
pub enum PortInstruction {
 
    None,
 
    NoSource,
 
    SourceLocation(ExpressionId),
 
}
src/runtime2/component/component_internet.rs
Show inline comments
 
use crate::protocol::eval::{ValueGroup, Value, EvalError};
 
use crate::protocol::eval::{ValueGroup, Value};
 
use crate::runtime2::*;
 
use crate::runtime2::component::{CompCtx, CompId};
 
use crate::runtime2::component::{CompCtx, CompId, PortInstruction};
 
use crate::runtime2::stdlib::internet::*;
 
use crate::runtime2::poll::*;
 

	
 
use super::component::{self, *};
 
use super::control_layer::*;
 
use super::consensus::*;
 
@@ -101,27 +101,29 @@ impl Component for ComponentTcpClient {
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Sync(message) => {
 
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
                component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
 
                component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
 
            },
 
            Message::Control(message) => {
 
                component::default_handle_control_message(
 
                if let Err(location_and_message) = component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                );
 
                ) {
 
                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                }
 
            },
 
            Message::Poll => {
 
                sched_ctx.log("Received polling event");
 
            },
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state));
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedSelect => {
 
                // Not possible: we never enter this state
 
                unreachable!();
 
@@ -129,26 +131,26 @@ impl Component for ComponentTcpClient {
 
            CompMode::NonSync => {
 
                // When in non-sync mode
 
                match &mut self.socket_state {
 
                    SocketState::Connected(_socket) => {
 
                        if self.sync_state == SyncState::FinishSyncThenQuit {
 
                            // Previous request was to let the component shut down
 
                            self.exec_state.mode = CompMode::StartExit;
 
                            self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                        } else {
 
                            // Reset for a new request
 
                            self.sync_state = SyncState::AwaitingCmd;
 
                            self.consensus.notify_sync_start(comp_ctx);
 
                            self.exec_state.mode = CompMode::Sync;
 
                        }
 
                        return Ok(CompScheduling::Immediate);
 
                        return CompScheduling::Immediate;
 
                    },
 
                    SocketState::Error => {
 
                        // Could potentially send an error message to the
 
                        // connected component.
 
                        self.exec_state.mode = CompMode::StartExit;
 
                        return Ok(CompScheduling::Immediate);
 
                        self.exec_state.set_as_start_exit(ExitReason::ErrorNonSync);
 
                        return CompScheduling::Immediate;
 
                    }
 
                }
 
            },
 
            CompMode::Sync => {
 
                // When in sync mode: wait for a command to come in
 
                match self.sync_state {
 
@@ -156,17 +158,22 @@ impl Component for ComponentTcpClient {
 
                        if let Some(message) = &self.inbox_main {
 
                            self.consensus.handle_incoming_data_message(comp_ctx, &message);
 
                            if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, &message) {
 
                                // Check which command we're supposed to execute.
 
                                let message = self.inbox_main.take().unwrap();
 
                                let target_port_id = message.data_header.target_port;
 
                                component::default_handle_received_data_message(
 
                                    target_port_id, &mut self.inbox_main, &mut self.inbox_backup,
 
                                let receive_result = component::default_handle_received_data_message(
 
                                    target_port_id, PortInstruction::NoSource,
 
                                    &mut self.inbox_main, &mut self.inbox_backup,
 
                                    comp_ctx, sched_ctx, &mut self.control
 
                                );
 

	
 
                                if let Err(location_and_message) = receive_result {
 
                                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                                    return CompScheduling::Immediate;
 
                                } else {
 
                                    let (tag_value, embedded_heap_pos) = message.content.values[0].as_union();
 
                                    if tag_value == self.input_union_send_tag_value {
 
                                        // Retrieve bytes from the message
 
                                        self.byte_buffer.clear();
 
                                        let union_content = &message.content.regions[embedded_heap_pos as usize];
 
                                        debug_assert_eq!(union_content.len(), 1);
 
@@ -175,35 +182,36 @@ impl Component for ComponentTcpClient {
 
                                        self.byte_buffer.reserve(array_values.len());
 
                                        for value in array_values {
 
                                            self.byte_buffer.push(value.as_uint8());
 
                                        }
 

	
 
                                        self.sync_state = SyncState::Putting;
 
                                    return Ok(CompScheduling::Immediate);
 
                                        return CompScheduling::Immediate;
 
                                    } else if tag_value == self.input_union_receive_tag_value {
 
                                        // Component requires a `recv`
 
                                        self.sync_state = SyncState::Getting;
 
                                    return Ok(CompScheduling::Immediate);
 
                                        return CompScheduling::Immediate;
 
                                    } else if tag_value == self.input_union_finish_tag_value {
 
                                        // Component requires us to end the sync round
 
                                        self.sync_state = SyncState::FinishSync;
 
                                    return Ok(CompScheduling::Immediate);
 
                                        return CompScheduling::Immediate;
 
                                    } else if tag_value == self.input_union_shutdown_tag_value {
 
                                        // Component wants to close the connection
 
                                        self.sync_state = SyncState::FinishSyncThenQuit;
 
                                    return Ok(CompScheduling::Immediate);
 
                                        return CompScheduling::Immediate;
 
                                    } else {
 
                                        unreachable!("got tag_value {}", tag_value)
 
                                    }
 
                                }
 
                            } else {
 
                                todo!("handle sync failure due to message deadlock");
 
                                return Ok(CompScheduling::Sleep);
 
                                return CompScheduling::Sleep;
 
                            }
 
                        } else {
 
                            self.exec_state.set_as_blocked_get(self.pdl_input_port_id);
 
                            return Ok(CompScheduling::Sleep);
 
                            return CompScheduling::Sleep;
 
                        }
 
                    },
 
                    SyncState::Putting => {
 
                        // We're supposed to send a user-supplied message fully
 
                        // over the socket. But we might end up blocking. In
 
                        // that case the component goes to sleep until it is
 
@@ -213,26 +221,26 @@ impl Component for ComponentTcpClient {
 
                            match socket.send(&self.byte_buffer) {
 
                                Ok(bytes_sent) => {
 
                                    self.byte_buffer.drain(..bytes_sent);
 
                                },
 
                                Err(err) => {
 
                                    if err.kind() == IoErrorKind::WouldBlock {
 
                                        return Ok(CompScheduling::Sleep); // wait until notified
 
                                        return CompScheduling::Sleep; // wait until notified
 
                                    } else {
 
                                        todo!("handle socket.send error {:?}", err)
 
                                    }
 
                                }
 
                            }
 
                        }
 

	
 
                        // If here then we're done putting the data, we can
 
                        // finish the sync round
 
                        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
 
                        let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
                        self.exec_state.mode = CompMode::SyncEnd;
 
                        component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
 
                        return Ok(CompScheduling::Immediate);
 
                        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
 
                        return CompScheduling::Immediate;
 
                    },
 
                    SyncState::Getting => {
 
                        // We're going to try and receive a single message. If
 
                        // this causes us to end up blocking the component
 
                        // goes to sleep until it is polled.
 
                        const BUFFER_SIZE: usize = 1024; // TODO: Move to config
 
@@ -240,46 +248,52 @@ impl Component for ComponentTcpClient {
 
                        let socket = self.socket_state.get_socket();
 
                        self.byte_buffer.resize(BUFFER_SIZE, 0);
 
                        match socket.receive(&mut self.byte_buffer) {
 
                            Ok(num_received) => {
 
                                self.byte_buffer.resize(num_received, 0);
 
                                let message_content = self.bytes_to_data_message_content(&self.byte_buffer);
 
                                let scheduling = component::default_send_data_message(&mut self.exec_state, self.pdl_output_port_id, message_content, sched_ctx, &mut self.consensus, comp_ctx);
 
                                let send_result = component::default_send_data_message(&mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource, message_content, sched_ctx, &mut self.consensus, comp_ctx);
 
                                if let Err(location_and_message) = send_result {
 
                                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                                    return CompScheduling::Immediate;
 
                                } else {
 
                                    let scheduling = send_result.unwrap();
 
                                    self.sync_state = SyncState::AwaitingCmd;
 
                                return Ok(scheduling);
 
                                    return scheduling;
 
                                }
 
                            },
 
                            Err(err) => {
 
                                if err.kind() == IoErrorKind::WouldBlock {
 
                                    return Ok(CompScheduling::Sleep); // wait until polled
 
                                    return CompScheduling::Sleep; // wait until polled
 
                                } else {
 
                                    todo!("handle socket.receive error {:?}", err)
 
                                }
 
                            }
 
                        }
 
                    },
 
                    SyncState::FinishSync | SyncState::FinishSyncThenQuit => {
 
                        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
 
                        let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
                        self.exec_state.mode = CompMode::SyncEnd;
 
                        component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
 
                        return Ok(CompScheduling::Requeue);
 
                        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
 
                        return CompScheduling::Requeue;
 
                    },
 
                }
 
            },
 
            CompMode::BlockedGet => {
 
                // Entered when awaiting a new command
 
                debug_assert_eq!(self.sync_state, SyncState::AwaitingCmd);
 
                return Ok(CompScheduling::Sleep);
 
                return CompScheduling::Sleep;
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedPut =>
 
                return Ok(CompScheduling::Sleep),
 
                return CompScheduling::Sleep,
 
            CompMode::StartExit =>
 
                return Ok(component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx)),
 
                return component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus),
 
            CompMode::BusyExit =>
 
                return Ok(component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx)),
 
                return component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx),
 
            CompMode::Exit =>
 
                return Ok(component::default_handle_exit(&self.exec_state)),
 
                return component::default_handle_exit(&self.exec_state),
 
        }
 
    }
 
}
 

	
 
impl ComponentTcpClient {
 
    pub(crate) fn new(arguments: ValueGroup) -> Self {
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -10,13 +10,13 @@ use crate::protocol::eval::{
 
use crate::runtime2::runtime::CompId;
 
use crate::runtime2::scheduler::SchedulerCtx;
 
use crate::runtime2::communication::*;
 

	
 
use super::component::{
 
    self,
 
    CompExecState, Component, CompScheduling, CompMode,
 
    CompExecState, Component, CompScheduling, CompError, CompMode, ExitReason,
 
    port_id_from_eval, port_id_to_eval
 
};
 
use super::component_context::*;
 
use super::control_layer::*;
 
use super::consensus::Consensus;
 

	
 
@@ -253,163 +253,190 @@ impl Component for CompPDL {
 

	
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Control(message) => {
 
                component::default_handle_control_message(
 
                if let Err(location_and_message) = component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                );
 
                ) {
 
                    self.handle_component_error(sched_ctx, location_and_message);
 
                }
 
            },
 
            Message::Sync(message) => {
 
                self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Poll => {
 
                unreachable!(); // because we never register at the polling thread
 
            }
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        use EvalContinuation as EC;
 

	
 
        sched_ctx.log(&format!("Running component (mode: {:?})", self.exec_state.mode));
 

	
 
        // Depending on the mode don't do anything at all, take some special
 
        // actions, or fall through and run the PDL code.
 
        match self.exec_state.mode {
 
            CompMode::NonSync | CompMode::Sync => {
 
                // continue and run PDL code
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | CompMode::BlockedSelect => {
 
                return Ok(CompScheduling::Sleep);
 
                return CompScheduling::Sleep;
 
            }
 
            CompMode::StartExit => return Ok(component::default_handle_start_exit(
 
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx
 
            )),
 
            CompMode::BusyExit => return Ok(component::default_handle_busy_exit(
 
            CompMode::StartExit => return component::default_handle_start_exit(
 
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus
 
            ),
 
            CompMode::BusyExit => return component::default_handle_busy_exit(
 
                &mut self.exec_state, &self.control, sched_ctx
 
            )),
 
            CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)),
 
            ),
 
            CompMode::Exit => return component::default_handle_exit(&self.exec_state),
 
        }
 

	
 
        let run_result = self.execute_prompt(&sched_ctx);
 
        if let Err(error) = run_result {
 
            // TODO: Cleanup before @nocommit
 
            sched_ctx.error(&format!("{}", error));
 
            let exit_reason = if self.exec_state.mode.is_in_sync_block() {
 
                ExitReason::ErrorInSync
 
            } else {
 
                ExitReason::ErrorNonSync
 
            };
 

	
 
            self.exec_state.set_as_start_exit(exit_reason);
 
            return CompScheduling::Immediate;
 
        }
 

	
 
        let run_result = self.execute_prompt(&sched_ctx)?;
 
        let run_result = run_result.unwrap();
 

	
 
        match run_result {
 
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
 
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
 
            // Results that can be returned in sync mode
 
            EC::SyncBlockEnd => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                self.handle_sync_end(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
                return CompScheduling::Immediate;
 
            },
 
            EC::BlockGet(port_id) => {
 
            EC::BlockGet(expr_id, port_id) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 

	
 
                let port_id = port_id_from_eval(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let port_index = comp_ctx.get_port_index(port_handle);
 
                if let Some(message) = &self.inbox_main[port_index] {
 
                    // Check if we can actually receive the message
 
                    if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
                        // Message was received. Make sure any blocked peers and
 
                        // pending messages are handled.
 
                        let message = self.inbox_main[port_index].take().unwrap();
 
                        component::default_handle_received_data_message(
 
                            port_id, &mut self.inbox_main[port_index], &mut self.inbox_backup,
 
                        let receive_result = component::default_handle_received_data_message(
 
                            port_id, PortInstruction::SourceLocation(expr_id),
 
                            &mut self.inbox_main[port_index], &mut self.inbox_backup,
 
                            comp_ctx, sched_ctx, &mut self.control
 
                        );
 

	
 
                        if let Err(location_and_message) = receive_result {
 
                            self.handle_component_error(sched_ctx, location_and_message);
 
                            return CompScheduling::Immediate
 
                        } else {
 
                            self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
 
                        return Ok(CompScheduling::Immediate);
 
                            return CompScheduling::Immediate;
 
                        }
 
                    } else {
 
                        todo!("handle sync failure due to message deadlock");
 
                        return Ok(CompScheduling::Sleep);
 
                        return CompScheduling::Sleep;
 
                    }
 
                } else {
 
                    // We need to wait
 
                    self.exec_state.set_as_blocked_get(port_id);
 
                    return Ok(CompScheduling::Sleep);
 
                    return CompScheduling::Sleep;
 
                }
 
            },
 
            EC::Put(port_id, value) => {
 
            EC::Put(expr_id, port_id, value) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                sched_ctx.log(&format!("Putting value {:?}", value));
 

	
 
                // Send the message
 
                let target_port_id = port_id_from_eval(port_id);
 
                let scheduling = component::default_send_data_message(
 
                    &mut self.exec_state, target_port_id, value,
 
                let send_result = component::default_send_data_message(
 
                    &mut self.exec_state, target_port_id,
 
                    PortInstruction::SourceLocation(expr_id), value,
 
                    sched_ctx, &mut self.consensus, comp_ctx
 
                );
 

	
 
                if let Err(location_and_message) = send_result {
 
                    self.handle_component_error(sched_ctx, location_and_message);
 
                    return CompScheduling::Immediate;
 
                } else {
 
                    // When `run` is called again (potentially after becoming
 
                    // unblocked) we need to instruct the executor that we performed
 
                    // the `put`
 
                    let scheduling = send_result.unwrap();
 
                    self.exec_ctx.stmt = ExecStmt::PerformedPut;
 
                return Ok(scheduling);
 
                    return scheduling;
 
                }
 
            },
 
            EC::SelectStart(num_cases, _num_ports) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                self.select_state.handle_select_start(num_cases);
 
                return Ok(CompScheduling::Requeue);
 
                return CompScheduling::Requeue;
 
            },
 
            EC::SelectRegisterPort(case_index, port_index, port_id) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                let port_id = port_id_from_eval(port_id);
 
                if let Err(_err) = self.select_state.register_select_case_port(comp_ctx, case_index, port_index, port_id) {
 
                    todo!("handle registering a port multiple times");
 
                }
 
                return Ok(CompScheduling::Immediate);
 
                return CompScheduling::Immediate;
 
            },
 
            EC::SelectWait => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                let select_decision = self.select_state.handle_select_waiting_point(&self.inbox_main, comp_ctx);
 
                if let SelectDecision::Case(case_index) = select_decision {
 
                    // Reached a conclusion, so we can continue immediately
 
                    self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
 
                    self.exec_state.mode = CompMode::Sync;
 
                    return Ok(CompScheduling::Immediate);
 
                    return CompScheduling::Immediate;
 
                } else {
 
                    // No decision yet
 
                    self.exec_state.mode = CompMode::BlockedSelect;
 
                    return Ok(CompScheduling::Sleep);
 
                    return CompScheduling::Sleep;
 
                }
 
            },
 
            // Results that can be returned outside of sync mode
 
            EC::ComponentTerminated => {
 
                self.exec_state.mode = CompMode::StartExit; // next call we'll take care of the exit
 
                return Ok(CompScheduling::Immediate);
 
                self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                return CompScheduling::Immediate;
 
            },
 
            EC::SyncBlockStart => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
                self.handle_sync_start(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
                return CompScheduling::Immediate;
 
            },
 
            EC::NewComponent(definition_id, type_id, arguments) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
                self.create_component_and_transfer_ports(
 
                    sched_ctx, comp_ctx,
 
                    definition_id, type_id, arguments
 
                );
 
                return Ok(CompScheduling::Requeue);
 
                return CompScheduling::Requeue;
 
            },
 
            EC::NewChannel => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 
                let channel = comp_ctx.create_channel();
 
                self.exec_ctx.stmt = ExecStmt::CreatedChannel((
 
                    Value::Output(port_id_to_eval(channel.putter_id)),
 
                    Value::Input(port_id_to_eval(channel.getter_id))
 
                ));
 
                self.inbox_main.push(None);
 
                self.inbox_main.push(None);
 
                return Ok(CompScheduling::Immediate);
 
                return CompScheduling::Immediate;
 
            }
 
        }
 
    }
 
}
 

	
 
impl CompPDL {
 
@@ -466,43 +493,22 @@ impl CompPDL {
 
    /// Handles end of sync. The conclusion to the sync round might arise
 
    /// immediately (and be handled immediately), or might come later through
 
    /// messaging. In any case the component should be scheduled again
 
    /// immediately
 
    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component ending sync mode (now waiting for solution)");
 
        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
 
        let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
        self.exec_state.mode = CompMode::SyncEnd;
 
        self.handle_sync_decision(sched_ctx, comp_ctx, decision);
 
    }
 

	
 
    /// Handles decision from the consensus round. This will cause a change in
 
    /// the internal `Mode`, such that the next call to `run` can take the
 
    /// appropriate next steps.
 
    fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) {
 
        sched_ctx.log(&format!("Handling sync decision: {:?} (in mode {:?})", decision, self.exec_state.mode));
 
        match decision {
 
            SyncRoundDecision::None => {
 
                // No decision yet
 
                return;
 
            },
 
            SyncRoundDecision::Solution => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
 
                self.exec_state.mode = CompMode::NonSync;
 
                self.consensus.notify_sync_decision(decision);
 
            },
 
            SyncRoundDecision::Failure => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
 
                self.exec_state.mode = CompMode::StartExit;
 
            },
 
        }
 
        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
 
    }
 

	
 
    fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component exiting");
 
        sched_ctx.log(&format!("Component exiting (reason: {:?}", self.exec_state.exit_reason));
 
        debug_assert_eq!(self.exec_state.mode, CompMode::StartExit);
 
        self.exec_state.mode = CompMode::BusyExit;
 
        let exit_inside_sync = self.exec_state.exit_reason.is_in_sync();
 

	
 
        // Doing this by index, then retrieving the handle is a bit rediculous,
 
        // but Rust is being Rust with its borrowing rules.
 
        for port_index in 0..comp_ctx.num_ports() {
 
            let port = comp_ctx.get_port_by_index_mut(port_index);
 
            if port.state == PortState::Closed {
 
@@ -513,13 +519,13 @@ impl CompPDL {
 
            // Mark as closed
 
            let port_id = port.self_id;
 
            port.state = PortState::Closed;
 

	
 
            // Notify peer of closing
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
            let (peer, message) = self.control.initiate_port_closing(port_handle, comp_ctx);
 
            let (peer, message) = self.control.initiate_port_closing(port_handle, exit_inside_sync, comp_ctx);
 
            let peer_info = comp_ctx.get_peer(peer);
 
            peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
@@ -557,13 +563,42 @@ impl CompPDL {
 
            }
 
        }
 
    }
 

	
 
    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) {
 
        let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
        self.handle_sync_decision(sched_ctx, comp_ctx, decision);
 
        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
 
    }
 

	
 
    /// Handles an error coming from the generic `component::handle_xxx`
 
    /// functions. Hence accepts argument as a tuple.
 
    fn handle_component_error(&mut self, sched_ctx: &SchedulerCtx, location_and_message: (PortInstruction, String)) {
 
        // Retrieve location and message, display in terminal
 
        let (location, message) = location_and_message;
 
        let error = match location {
 
            PortInstruction::None => CompError::Component(message),
 
            PortInstruction::NoSource => unreachable!(), // for debugging: all in-sync errors are associated with a source location
 
            PortInstruction::SourceLocation(expression_id) => {
 
                let protocol = &sched_ctx.runtime.protocol;
 
                CompError::Executor(EvalError::new_error_at_expr(
 
                    &self.prompt, &protocol.modules, &protocol.heap,
 
                    expression_id, message
 
                ))
 
            }
 
        };
 

	
 
        sched_ctx.error(&format!("{}", error));
 

	
 
        // Set state to handle subsequent error
 
        let exit_reason = if self.exec_state.mode.is_in_sync_block() {
 
            ExitReason::ErrorInSync
 
        } else {
 
            ExitReason::ErrorNonSync
 
        };
 

	
 
        self.exec_state.set_as_start_exit(exit_reason);
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling ports
 
    // -------------------------------------------------------------------------
 

	
src/runtime2/component/component_random.rs
Show inline comments
 
use rand::prelude as random;
 
use rand::RngCore;
 

	
 
use crate::protocol::eval::{ValueGroup, Value, EvalError};
 
use crate::protocol::eval::{ValueGroup, Value};
 
use crate::runtime2::*;
 

	
 
use super::*;
 
use super::component::{self, Component, CompExecState, CompScheduling, CompMode};
 
use super::component::{
 
    self,
 
    Component, CompExecState, CompScheduling,
 
    CompMode, ExitReason
 
};
 
use super::control_layer::*;
 
use super::consensus::*;
 

	
 
/// TODO: Temporary component to figure out what to do with custom components.
 
///     This component sends random numbers between two u32 limits
 
pub struct ComponentRandomU32 {
 
@@ -39,25 +43,27 @@ impl Component for ComponentRandomU32 {
 

	
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
 
        match message {
 
            Message::Data(_message) => unreachable!(),
 
            Message::Sync(message) => {
 
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
                component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
 
                component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
 
            },
 
            Message::Control(message) => {
 
                component::default_handle_control_message(
 
                if let Err(location_and_message) = component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                );
 
                ) {
 
                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                }
 
            },
 
            Message::Poll => unreachable!(),
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        sched_ctx.log(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode));
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedGet | CompMode::BlockedSelect => {
 
                // impossible for this component, no input ports and no select
 
                // blocks
 
@@ -70,61 +76,68 @@ impl Component for ComponentRandomU32 {
 
                if self.random_minimum >= self.random_maximum {
 
                    // Could throw an evaluation error, but lets just panic
 
                    panic!("going to crash 'n burn your system now, please provide valid arguments");
 
                }
 

	
 
                if self.num_sends >= self.max_num_sends {
 
                    self.exec_state.mode = CompMode::StartExit;
 
                    self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                } else {
 
                    sched_ctx.log("Entering sync mode");
 
                    self.did_perform_send = false;
 
                    self.consensus.notify_sync_start(comp_ctx);
 
                    self.exec_state.mode = CompMode::Sync;
 
                }
 

	
 
                return Ok(CompScheduling::Immediate);
 
                return CompScheduling::Immediate;
 
            },
 
            CompMode::Sync => {
 
                // This component just sends a single message, then waits until
 
                // consensus has been reached
 
                if !self.did_perform_send {
 
                    sched_ctx.log("Sending random message");
 
                    let mut random = self.generator.next_u32() - self.random_minimum;
 
                    let random_delta = self.random_maximum - self.random_minimum;
 
                    random %= random_delta;
 
                    random += self.random_minimum;
 
                    let value_group = ValueGroup::new_stack(vec![Value::UInt32(random)]);
 

	
 
                    let scheduling = component::default_send_data_message(
 
                        &mut self.exec_state, self.output_port_id, value_group,
 
                    let send_result = component::default_send_data_message(
 
                        &mut self.exec_state, self.output_port_id,
 
                        PortInstruction::NoSource, value_group,
 
                        sched_ctx, &mut self.consensus, comp_ctx
 
                    );
 

	
 
                    if let Err(location_and_message) = send_result {
 
                        component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                        return CompScheduling::Immediate
 
                    } else {
 
                        // Blocked or not, we set `did_perform_send` to true. If
 
                        // blocked then the moment we become unblocked (and are back
 
                        // at the `Sync` mode) we have sent the message.
 
                        let scheduling = send_result.unwrap();
 
                        self.did_perform_send = true;
 
                        self.num_sends += 1;
 
                    return Ok(scheduling)
 
                        return scheduling
 
                    }
 
                } else {
 
                    // Message was sent, finish this sync round
 
                    sched_ctx.log("Waiting for consensus");
 
                    self.exec_state.mode = CompMode::SyncEnd;
 
                    let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
 
                    component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
 
                    return Ok(CompScheduling::Requeue);
 
                    let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
                    component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
 
                    return CompScheduling::Requeue;
 
                }
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedPut => return Ok(CompScheduling::Sleep),
 
            CompMode::StartExit => return Ok(component::default_handle_start_exit(
 
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx
 
            )),
 
            CompMode::BusyExit => return Ok(component::default_handle_busy_exit(
 
            CompMode::SyncEnd | CompMode::BlockedPut => return CompScheduling::Sleep,
 
            CompMode::StartExit => return component::default_handle_start_exit(
 
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus
 
            ),
 
            CompMode::BusyExit => return component::default_handle_busy_exit(
 
                &mut self.exec_state, &self.control, sched_ctx
 
            )),
 
            CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)),
 
            ),
 
            CompMode::Exit => return component::default_handle_exit(&self.exec_state),
 
        }
 
    }
 
}
 

	
 
impl ComponentRandomU32 {
 
    pub(crate) fn new(arguments: ValueGroup) -> Self {
src/runtime2/component/consensus.rs
Show inline comments
 
@@ -223,13 +223,17 @@ impl SolutionCombiner {
 
        if channel.putter.is_none() || channel.getter.is_none() {
 
            return None;
 
        }
 

	
 
        let putter = channel.putter.as_ref().unwrap();
 
        let getter = channel.getter.as_ref().unwrap();
 
        return Some(putter.mapping == getter.mapping);
 
        return Some(
 
            !putter.failed &&
 
            !getter.failed &&
 
            putter.mapping == getter.mapping
 
        );
 
    }
 

	
 
    /// Determines the global solution if all components have contributed their
 
    /// local solutions.
 
    fn update_solution(&mut self) {
 
        if self.matched_channels == self.solution.channel_mapping.len() {
 
@@ -310,40 +314,30 @@ impl Consensus {
 
        }
 
    }
 

	
 
    /// Notifies the consensus management that the PDL code has reached the end
 
    /// of a sync block. A local solution will be submitted, after which we wait
 
    /// until the participants in the round (hopefully) reach a conclusion.
 
    pub(crate) fn notify_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
 
    pub(crate) fn notify_sync_end_success(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy);
 
        self.mode = Mode::SyncAwaitingSolution;
 

	
 
        // Submit our port mapping as a solution
 
        let mut local_solution = Vec::with_capacity(self.ports.len());
 
        for port in &self.ports {
 
            if let Some(mapping) = port.mapping {
 
                let port_handle = comp_ctx.get_port_handle(port.self_port_id);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                let new_entry = match port_info.kind {
 
                    PortKind::Putter => SyncLocalSolutionEntry::Putter(SyncSolutionPutterPort{
 
                        self_comp_id: comp_ctx.id,
 
                        self_port_id: port_info.self_id,
 
                        mapping
 
                    }),
 
                    PortKind::Getter => SyncLocalSolutionEntry::Getter(SyncSolutionGetterPort{
 
                        self_comp_id: comp_ctx.id,
 
                        self_port_id: port_info.self_id,
 
                        peer_comp_id: port.peer_comp_id,
 
                        peer_port_id: port.peer_port_id,
 
                        mapping
 
                    })
 
                };
 
                local_solution.push(new_entry);
 
            }
 
        let local_solution = self.generate_local_solution(comp_ctx, false);
 
        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution);
 
        return decision;
 
    }
 

	
 
    /// Notifies the consensus management that the component has encountered a
 
    /// critical error during the synchronous round. Hence we should report that
 
    /// we've failed and wait until all the participants have been notified of
 
    /// the error.
 
    pub(crate) fn notify_sync_end_failure(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy);
 
        self.mode = Mode::SyncAwaitingSolution;
 

	
 
        let local_solution = self.generate_local_solution(comp_ctx, true);
 
        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution);
 
        return decision;
 
    }
 

	
 
    /// Notifies that a decision has been reached. Note that the caller should
 
    /// still take the appropriate actions based on the decision it is supplying
 
@@ -377,13 +371,12 @@ impl Consensus {
 

	
 
    /// Handles the arrival of a new data message (needs to be called for every
 
    /// new data message, even though it might not end up being received). This
 
    /// is used to determine peers of `get`ter ports.
 
    // TODO: The use of this function is rather ugly. Find a more robust
 
    //  scheme about owners of `get`ter ports not knowing about their peers.
 
    //  (also, figure out why this was written again, I forgot).
 
    pub(crate) fn handle_incoming_data_message(&mut self, comp_ctx: &CompCtx, message: &DataMessage) {
 
        let target_handle = comp_ctx.get_port_handle(message.data_header.target_port);
 
        let target_index = comp_ctx.get_port_index(target_handle);
 
        let annotation = &mut self.ports[target_index];
 
        debug_assert!(
 
            !annotation.peer_discovered || (
 
@@ -639,15 +632,44 @@ impl Consensus {
 
        if let Some(key) = should_remove {
 
            sched_ctx.runtime.destroy_component(key);
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Creating message headers
 
    // Small utilities
 
    // -------------------------------------------------------------------------
 

	
 
    fn generate_local_solution(&self, comp_ctx: &CompCtx, failed: bool) -> SyncLocalSolution {
 
        let mut local_solution = Vec::with_capacity(self.ports.len());
 
        for port in &self.ports {
 
            if let Some(mapping) = port.mapping {
 
                let port_handle = comp_ctx.get_port_handle(port.self_port_id);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                let new_entry = match port_info.kind {
 
                    PortKind::Putter => SyncLocalSolutionEntry::Putter(SyncSolutionPutterPort{
 
                        self_comp_id: comp_ctx.id,
 
                        self_port_id: port_info.self_id,
 
                        mapping,
 
                        failed
 
                    }),
 
                    PortKind::Getter => SyncLocalSolutionEntry::Getter(SyncSolutionGetterPort{
 
                        self_comp_id: comp_ctx.id,
 
                        self_port_id: port_info.self_id,
 
                        peer_comp_id: port.peer_comp_id,
 
                        peer_port_id: port.peer_port_id,
 
                        mapping,
 
                        failed
 
                    })
 
                };
 
                local_solution.push(new_entry);
 
            }
 
        }
 

	
 
        return local_solution;
 
    }
 

	
 
    fn create_data_header_and_update_mapping(&mut self, port_info: &Port) -> MessageDataHeader {
 
        let mut expected_mapping = Vec::with_capacity(self.ports.len());
 
        let mut port_index = usize::MAX;
 
        for (index, port) in self.ports.iter().enumerate() {
 
            if port.self_port_id == port_info.self_id {
 
                port_index = index; // remember for later updating
src/runtime2/component/control_layer.rs
Show inline comments
 
@@ -212,13 +212,13 @@ impl ControlLayer {
 

	
 
        return None;
 
    }
 

	
 
    /// Initiates the control message procedures for closing a port. Caller must
 
    /// make sure that the port state has already been set to `Closed`.
 
    pub(crate) fn initiate_port_closing(&mut self, port_handle: LocalPortHandle, comp_ctx: &CompCtx) -> (LocalPeerHandle, ControlMessage) {
 
    pub(crate) fn initiate_port_closing(&mut self, port_handle: LocalPortHandle, exit_inside_sync: bool, comp_ctx: &CompCtx) -> (LocalPeerHandle, ControlMessage) {
 
        let port = comp_ctx.get_port(port_handle);
 
        let peer_port_id = port.peer_port_id;
 
        debug_assert!(port.state == PortState::Closed);
 

	
 
        // Construct the port-closing entry
 
        let entry_id = self.take_id();
 
@@ -233,13 +233,16 @@ impl ControlLayer {
 
        return (
 
            peer_handle,
 
            ControlMessage{
 
                id: entry_id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(peer_port_id),
 
                content: ControlMessageContent::ClosePort(peer_port_id),
 
                content: ControlMessageContent::ClosePort(ControlMessageClosePort{
 
                    port_to_close: peer_port_id,
 
                    closed_in_sync_round: exit_inside_sync,
 
                }),
 
            }
 
        );
 
    }
 

	
 
    /// Generates the control message used to indicate to a peer that a port
 
    /// should be blocked (expects the caller to have set the port's state to
src/runtime2/component/mod.rs
Show inline comments
 
@@ -5,13 +5,13 @@ mod consensus;
 
mod component;
 
mod component_random;
 
mod component_internet;
 

	
 
pub(crate) use component::{Component, CompScheduling};
 
pub(crate) use component_pdl::{CompPDL};
 
pub(crate) use component_context::CompCtx;
 
pub(crate) use component_context::{CompCtx, PortInstruction};
 
pub(crate) use control_layer::{ControlId};
 

	
 
use super::scheduler::*;
 
use super::runtime::*;
 

	
 
/// If the component is sleeping, then that flag will be atomically set to
src/runtime2/poll/mod.rs
Show inline comments
 
@@ -169,13 +169,12 @@ impl PollingThread {
 
        };
 

	
 
        return Ok((thread_handle, client_factory));
 
    }
 

	
 
    pub(crate) fn run(&mut self) {
 
        use crate::runtime2::scheduler::SchedulerCtx;
 
        use crate::runtime2::communication::Message;
 

	
 
        const NUM_EVENTS: usize = 256;
 
        const EPOLL_DURATION: time::Duration = time::Duration::from_millis(250);
 

	
 
        // @performance: Lot of improvements possible here, a HashMap is likely
src/runtime2/scheduler.rs
Show inline comments
 
@@ -34,12 +34,17 @@ impl<'a> SchedulerCtx<'a> {
 

	
 
    pub(crate) fn log(&self, text: &str) {
 
        if self.logging_enabled {
 
            println!("[s:{:02}, c:{:03}] {}", self.id, self.comp, text);
 
        }
 
    }
 

	
 
    pub(crate) fn error(&self, text: &str) {
 
        // TODO: Probably not always use colour
 
        println!("[s:{:02}, c:{:03}] \x1b[0;31m{}\x1b[0m", self.id, self.comp, text);
 
    }
 
}
 

	
 
impl Scheduler {
 
    // public interface to thread
 

	
 
    pub fn new(runtime: Arc<RuntimeInner>, polling: PollingClient, scheduler_id: u32, debug_logging: bool) -> Self {
 
@@ -64,13 +69,13 @@ impl Scheduler {
 
            // be re-executed immediately.
 
            let mut new_scheduling = CompScheduling::Immediate;
 
            while let CompScheduling::Immediate = new_scheduling {
 
                while let Some(message) = component.inbox.pop() {
 
                    component.component.handle_message(&mut scheduler_ctx, &mut component.ctx, message);
 
                }
 
                new_scheduling = component.component.run(&mut scheduler_ctx, &mut component.ctx).expect("TODO: Handle error");
 
                new_scheduling = component.component.run(&mut scheduler_ctx, &mut component.ctx);
 
            }
 

	
 
            // Handle the new scheduling
 
            match new_scheduling {
 
                CompScheduling::Immediate => unreachable!(),
 
                CompScheduling::Requeue => { self.runtime.enqueue_work(comp_key); },
src/runtime2/stdlib/internet.rs
Show inline comments
 
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
 
use std::mem::size_of;
 
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
 
use std::io::Error as IoError;
 

	
 
use libc::{
 
    c_int,
 
    sockaddr_in, sockaddr_in6, in_addr, in6_addr,
 
    socket, bind, listen, accept, connect, close,
 
};
 
use mio::{event, Interest, Registry, Token};
 

	
 
use crate::runtime2::poll::{AsFileDescriptor, FileDescriptor};
 

	
 
#[derive(Debug)]
 
pub enum SocketError {
 
    Opening,
0 comments (0 inline, 0 general)