Changeset - 1f78496722d1
Cargo.toml
Show inline comments
 
@@ -14,8 +14,7 @@ internet=["libc"]
 

	
 
[dependencies]
 

	
 
libc = { version = "^0.2", optional = true } # raw sockets
 
mio = { version = "0.8", features = ["os-poll"] } # cross-platform IO notification queue
 
libc = { version = "^0.2", optional = true } # raw sockets, epoll access
 

	
 
# randomness
 
rand = "0.8.4"
docs/runtime/sync.md
Show inline comments
 
@@ -144,3 +144,60 @@ Concluding:
 

	
 
- Every data message that is transmitted needs to contain the port mapping of all `put`ting ports (annotating them appropriately if they have not yet been used). We also need to include the port mapping of all `get`ting ports that have a pending/received message. The port mapping for `put`ting ports will only include their own ID, the port mapping for `get`ting ports will include the IDs of their peer as well.
 
- Every arriving data message will immediately be used to identify the sender as the peer of the corresponding `get`ter port. Since messages between components arrive in order this allows us to detect when the `put`s are in a different order at the sender as the `get`s at the receiver.
 

	
 
## Handling Fatal Component Errors
 

	
 
Components may, during their execution, encounter errors that prevent them from continuing executing their code. For the purposes of this chapter we may consider these to occur during two particular phases of their execution:
 

	
 
1. The error occurred outside of a sync block. Or equivalently (from the point of view of the runtime): the error ocurred inside a sync block, but the component has not interacted with other components through `put`/`get` calls.
 
2. The error occurred inside of a sync block. The component can have performed any number of `put`/`get` calls. But for the sake of discussion we will only discuss the case where we perform:
 
   1. One `put` in the synchronous round.
 
   2. One `get` in the synchronous round.
 

	
 
As a preliminary remark: note that encountering an error is nothing special: the component can simply print an error to `stdout` and stop executing. The handling of the error by peers is of importance! If an interaction is made impossible because a peer has stopped executing, then the component that wishes to perform that interaction should error out itself!
 

	
 
### Handling Errors Outside of a Sync Block
 

	
 
If a component `E` encounters a critical error outside of a sync block. Then we can be sure that if it had a lat synchronous round, that it succeeded. However, there might be future synchronous rounds for component `E`, likewise a peer component `C` might have already put a message in `E`'s inbox.
 

	
 
The requirement for the outside-sync error of `E` is that any future sync interactions by `C` will fail (but, if `C` has no future interactions, it shouldn't fail either!). 
 

	
 
Note that `E` cannot perform `put`/`get` requests, because we're assuming `E` is outside of a sync block. Hence the only possible failing interaction is that `C` has performed a `put`, or is attempting a `get`. In the case the `C` `put`s to `E`, then `E` might not have figured out the identity of `C` yet (see earlier remarks on the eventual consistency of peer detection). Hence `C` is responsible for ensuring its own correct shutdown due to a failing `put`. Likewise for a `get`: `C` cannot receive from `E` if it is failing. So if `C` is waiting on a message to arrive, or if it will call `get` in the future, then `C` must fail as well.
 

	
 
In this case it is sufficient for `E` to send around a `ClosePort` message. As detailed in another chapter of this document. However, a particular race condition might occur. We have assumed that `E` is not in a sync block. But `C` is not aware of this fact. `C` might not be able to distinguish between the following three cases:
 

	
 
1. Regular shutdown: Components `C` and `E` are not in a sync round.
 
   - `E` broadcasts `ClosePort`.
 
   - `C` receives `ClosePort`.
 
2. Shutdown within a sync round, `ClosePort` leads `Solution`: A leader component `L`, peer component `C` and failing component `E`. Assume that all are/were busy in a synchronous round with one another.
 
   - `L` broadcasts `Solution` for the current sync round.
 
   - `E` receives `Solution`, finishes round. 
 
   - `E` encounters an error, so sends `ClosePort` to `C`.
 
   - `C` receives `ClosePort` from `E`.
 
   - `C` receives `Solution` from `L`.
 
3. Shutdown within a sync round, `Solution` leads `ClosePort`: Same components `L`, `C` and `E`.
 
   - `L` broadcasts `Solution` for the current sync round.
 
   - `E` receives `Solution` finishes round.
 
   - `E` encounters an error, so sends `ClosePort` to `C`.
 
   - `C` receives `Solution` from `L`.
 
   - `C` receives `ClosePort` from `E`.
 

	
 
In all described cases `E` encounters an error after finishing a sync round. But from the point of view of `C` it is unsure whether the `ClosePort` message pertains to the current synchronous round or not. In case 1 and 3 nothing is out of the ordinary. But in case 2 we have that `C` is at a particular point in time aware of the `ClosePort` from `E`, but not yet of the `Solution` from `L`. `C` should not fail the sync round, as it is completed, but it is unaware of this fact.
 

	
 
As a rather simple solution, since components that are participating with one another in a sync round move in lock-step at the end of the sync block, we send a boolean along with the `ClosePort`, e.g. `ClosePort(nonsync)`. This boolean indicates whether `E` was inside or outside of a sync block during it encountering an error. Now `C` can distinguish between the three cases: in all cases it agrees that `E` was not in a sync block (and hence: the sync round in cases 2 and 3 can be completed).
 

	
 
### Handling Errors Inside of a Sync Block
 

	
 
If `E` is inside of a sync block. Then it has interacted with other components. Our requirement now is that the sync round fails (and ofcourse, that all of the peers are notified that `E` will no longer be present in the runtime). There are two things that are complicating this type of failure:
 

	
 
1. Suppose that in the successful case of the synchronous interaction, there are a large number of components interacting with one another. Now it might be that `E` fails very early in its sync block, such that it cannot interact with several components. This lack of interaction might cause the single sync block to break up into several smaller sync blocks. Each of these separated regions is supposed to fail.
 
2. Within a particular synchronous interaction we might have that the leader `L` has a reference to the component `E` without it being a direct peer. There is a reference counting system in place that makes sure that `L` can always send messages to `E`. But we still need to make sure that those references stay alive for as long as needed. 
 

	
 
Suppose a synchronous region is (partially) established, and the component `E` encounters a critical error. The two points given above imply that two processes need to be initiated. For the first error-handling process, we simply use the same scheme as described in the case where `E` is not in a synchronous region. However now we broadcast `ClosePort(sync)` instead of `ClosePort(nonsync)` messages. Consider the following two cases: 
 

	
 
1. Component `C` is not part of the same synchronous region as `E`. And component `C` has tried `put`ting to `E`. If `C` receives a `ClosePort(sync)`, then it knows that its interaction should fail. Note: it might be that `E` wasn't planning on `get`ting from `C` in the sync round in which `E` failed, but much later. In that case it still makes sense for `C` to fail; it would have failed in the future. A small inconsistency here (within the current infinitely-deadlocking implementation) is that if `E` would *never* `get` from `C`, then `C` would deadlock instead of crash (one could argue that this implies that deadlocking should lead to crashing through a timeout mechanism).
 
2. Component `C` is not part of the same synchronous region as `E`. And if `E` wouldn't have crashed, then it would've `put` a message to `C`. In this case it is still proper for `C` to crash. The reasoning works the same as above.
 

	
 
So that is to say that this `ClosePort(sync)` causes instant failure of `C` if it has used the closed port in a round without consensus, or if it uses that port in the future. Note that this `ClosePort(sync)` system causes cascading failures throughout the disjoint synchronous regions. This is as intended: once one component's PDL program can no longer be executed, we cannot depend on the discovery of all the peers that constitute the intended synchronous region. So instead we rely on a peer-to-peer mechanism to make sure that every component is notified of failure.
 

	
 
However, while these cascading peer-to-peer `ClosePort(sync)` messages are happily shared around, we still have a leader component somewhere, and components that have not yet been notified of the failure.
 
\ No newline at end of file
src/protocol/eval/executor.rs
Show inline comments
 
@@ -203,10 +203,10 @@ pub enum EvalContinuation {
 
    SyncBlockEnd,
 
    NewFork,
 
    BlockFires(PortId),
 
    BlockGet(PortId),
 
    Put(PortId, ValueGroup),
 
    BlockGet(ExpressionId, PortId),
 
    Put(ExpressionId, PortId, ValueGroup),
 
    SelectStart(u32, u32), // (num_cases, num_ports_total)
 
    SelectRegisterPort(u32, u32, PortId), // (case_index, port_index_in_case, port_id)
 
    SelectRegisterPort(ExpressionId, u32, u32, PortId), // (call_expr_id, case_index, port_index_in_case, port_id)
 
    SelectWait, // wait until select can continue
 
    // Returned only in non-sync mode
 
    ComponentTerminated,
 
@@ -630,7 +630,7 @@ impl Prompt {
 
                                            // get run again after we've received a message.
 
                                            cur_frame.expr_values.push_front(value.clone());
 
                                            cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id));
 
                                            return Ok(EvalContinuation::BlockGet(port_id));
 
                                            return Ok(EvalContinuation::BlockGet(expr_id, port_id));
 
                                        }
 
                                    }
 
                                },
 
@@ -657,7 +657,7 @@ impl Prompt {
 
                                        cur_frame.expr_values.push_front(port_value);
 
                                        cur_frame.expr_stack.push_back(ExprInstruction::EvalExpr(expr_id));
 
                                        let value_group = ValueGroup::from_store(&self.store, &[deref_msg_value]);
 
                                        return Ok(EvalContinuation::Put(port_id, value_group));
 
                                        return Ok(EvalContinuation::Put(expr_id, port_id, value_group));
 
                                    }
 
                                },
 
                                Method::Fires => {
 
@@ -728,7 +728,7 @@ impl Prompt {
 
                                    // Convert the runtime-variant of a string
 
                                    // into an actual string.
 
                                    let value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let mut is_literal_string = value.get_heap_pos().is_some();
 
                                    let is_literal_string = value.get_heap_pos().is_some();
 
                                    let value = self.store.maybe_read_ref(&value);
 
                                    let value_heap_pos = value.as_string();
 
                                    let elements = &self.store.heap_regions[value_heap_pos as usize].values;
 
@@ -757,7 +757,7 @@ impl Prompt {
 
                                    let port_index = self.store.maybe_read_ref(&cur_frame.expr_values.pop_front().unwrap()).as_uint32();
 
                                    let port_value = self.store.maybe_read_ref(&cur_frame.expr_values.pop_front().unwrap()).as_port_id();
 

	
 
                                    return Ok(EvalContinuation::SelectRegisterPort(case_index, port_index, port_value));
 
                                    return Ok(EvalContinuation::SelectRegisterPort(expr_id, case_index, port_index, port_value));
 
                                },
 
                                Method::SelectWait => {
 
                                    match ctx.performed_select_wait() {
src/protocol/parser/pass_rewriting.rs
Show inline comments
 
@@ -217,7 +217,7 @@ impl Visitor for PassRewriting {
 
                num_ports_expression_id.upcast()
 
            ];
 

	
 
            let call_expression_id = create_ast_call_expr(ctx, self.current_procedure_id, Method::SelectStart, &mut self.expression_buffer, arguments);
 
            let call_expression_id = create_ast_call_expr(ctx, InputSpan::new(), self.current_procedure_id, Method::SelectStart, &mut self.expression_buffer, arguments);
 
            let call_statement_id = create_ast_expression_stmt(ctx, call_expression_id.upcast());
 

	
 
            transformed_stmts.push(call_statement_id.upcast());
 
@@ -232,6 +232,11 @@ impl Visitor for PassRewriting {
 
                let case_num_ports = case.involved_ports.len();
 

	
 
                for case_port_index in 0..case_num_ports {
 
                    // Retrieve original span in case of error reporting for the
 
                    // inserted function call
 
                    let original_get_call_id = ctx.heap[id].cases[case_index].involved_ports[case_port_index].0;
 
                    let original_get_call_span = ctx.heap[original_get_call_id].full_span;
 

	
 
                    // Arguments to runtime call
 
                    let (port_variable_id, port_variable_type) = locals[total_port_index]; // so far this variable contains the temporary variables for the port expressions
 
                    let case_index_expr_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, case_index as u64, ctx.arch.uint32_type_id);
 
@@ -244,7 +249,7 @@ impl Visitor for PassRewriting {
 
                    ];
 

	
 
                    // Create runtime call, then store it
 
                    let runtime_call_expr_id = create_ast_call_expr(ctx, self.current_procedure_id, Method::SelectRegisterCasePort, &mut self.expression_buffer, runtime_call_arguments);
 
                    let runtime_call_expr_id = create_ast_call_expr(ctx, original_get_call_span, self.current_procedure_id, Method::SelectRegisterCasePort, &mut self.expression_buffer, runtime_call_arguments);
 
                    let runtime_call_stmt_id = create_ast_expression_stmt(ctx, runtime_call_expr_id.upcast());
 

	
 
                    transformed_stmts.push(runtime_call_stmt_id.upcast());
 
@@ -261,7 +266,7 @@ impl Visitor for PassRewriting {
 
        locals.push((select_variable_id, select_variable_type));
 

	
 
        {
 
            let runtime_call_expr_id = create_ast_call_expr(ctx, self.current_procedure_id, Method::SelectWait, &mut self.expression_buffer, Vec::new());
 
            let runtime_call_expr_id = create_ast_call_expr(ctx, InputSpan::new(), self.current_procedure_id, Method::SelectWait, &mut self.expression_buffer, Vec::new());
 
            let variable_stmt_id = create_ast_variable_declaration_stmt(ctx, self.current_procedure_id, select_variable_id, select_variable_type, runtime_call_expr_id.upcast());
 
            transformed_stmts.push(variable_stmt_id.upcast().upcast());
 
        }
 
@@ -364,7 +369,12 @@ fn create_ast_variable_expr(ctx: &mut Ctx, containing_procedure_id: ProcedureDef
 
    });
 
}
 

	
 
fn create_ast_call_expr(ctx: &mut Ctx, containing_procedure_id: ProcedureDefinitionId, method: Method, buffer: &mut ScopedBuffer<ExpressionId>, arguments: Vec<ExpressionId>) -> CallExpressionId {
 
/// Creates an AST call expression. The provided expression span is useful for
 
/// the cases where we perform compiler-builtin function calls that, when they
 
/// fail, should provide an error pointing at a specific point in the source
 
/// code. The `containing_procedure_id` is the procedure whose instructions are
 
/// going to contain this new call expression.
 
fn create_ast_call_expr(ctx: &mut Ctx, span: InputSpan, containing_procedure_id: ProcedureDefinitionId, method: Method, buffer: &mut ScopedBuffer<ExpressionId>, arguments: Vec<ExpressionId>) -> CallExpressionId {
 
    let call_type_id = match method {
 
        Method::SelectStart => ctx.arch.void_type_id,
 
        Method::SelectRegisterCasePort => ctx.arch.void_type_id,
 
@@ -377,7 +387,7 @@ fn create_ast_call_expr(ctx: &mut Ctx, containing_procedure_id: ProcedureDefinit
 
    let call_expression_id = ctx.heap.alloc_call_expression(|this| CallExpression{
 
        func_span: InputSpan::new(),
 
        this,
 
        full_span: InputSpan::new(),
 
        full_span: span,
 
        parser_type: ParserType{
 
            elements: Vec::new(),
 
            full_span: InputSpan::new(),
src/runtime/connector.rs
Show inline comments
 
@@ -333,7 +333,7 @@ impl ConnectorPDL {
 

	
 
                return ConnectorScheduling::Immediate;
 
            },
 
            EvalContinuation::BlockGet(port_id) => {
 
            EvalContinuation::BlockGet(_expr_id, port_id) => {
 
                // Branch performed a `get()` on a port that does not have a
 
                // received message on that port.
 
                let port_id = PortIdLocal::new(port_id.id);
 
@@ -391,7 +391,7 @@ impl ConnectorPDL {
 
                let right_branch = &mut self.tree[right_id];
 
                right_branch.prepared = PreparedStatement::ForkedExecution(false);
 
            }
 
            EvalContinuation::Put(port_id, content) => {
 
            EvalContinuation::Put(_expr_id, port_id, content) => {
 
                // Branch is attempting to send data
 
                let port_id = PortIdLocal::new(port_id.id);
 
                let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
src/runtime2/communication.rs
Show inline comments
 
@@ -17,29 +17,6 @@ impl PortId {
 
    }
 
}
 

	
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub enum PortState {
 
    Open,
 
    BlockedDueToPeerChange,
 
    BlockedDueToFullBuffers,
 
    Closed,
 
}
 

	
 
impl PortState {
 
    pub fn is_blocked(&self) -> bool {
 
        match self {
 
            PortState::BlockedDueToPeerChange | PortState::BlockedDueToFullBuffers => true,
 
            PortState::Open | PortState::Closed => false,
 
        }
 
    }
 
}
 

	
 
pub struct Channel {
 
    pub putter_id: PortId,
 
    pub getter_id: PortId,
 
@@ -111,6 +88,7 @@ pub struct SyncSolutionGetterPort {
 
    pub peer_comp_id: CompId,
 
    pub peer_port_id: PortId,
 
    pub mapping: u32,
 
    pub failed: bool,
 
}
 

	
 
/// Putter port in a solution. A putter may not be certain about who its peer
 
@@ -120,6 +98,7 @@ pub struct SyncSolutionPutterPort {
 
    pub self_comp_id: CompId,
 
    pub self_port_id: PortId,
 
    pub mapping: u32,
 
    pub failed: bool,
 
}
 

	
 
#[derive(Debug)]
 
@@ -171,14 +150,21 @@ pub struct ControlMessage {
 
    pub content: ControlMessageContent,
 
}
 

	
 
/// Content of a control message. If the content refers to a port then the
 
/// `target_port_id` field is the one that it refers to.
 
#[derive(Copy, Clone, Debug)]
 
pub enum ControlMessageContent {
 
    Ack,
 
    BlockPort(PortId),
 
    UnblockPort(PortId),
 
    ClosePort(PortId),
 
    PortPeerChangedBlock(PortId),
 
    PortPeerChangedUnblock(PortId, CompId),
 
    BlockPort,
 
    UnblockPort,
 
    ClosePort(ControlMessageClosePort),
 
    PortPeerChangedBlock,
 
    PortPeerChangedUnblock(PortId, CompId), // contains (new_port_id, new_component_id)
 
}
 

	
 
#[derive(Copy, Clone, Debug)]
 
pub struct ControlMessageClosePort {
 
    pub closed_in_sync_round: bool, // needed to ensure correct handling of errors
 
}
 

	
 
// -----------------------------------------------------------------------------
src/runtime2/component/component.rs
Show inline comments
 
use std::fmt::{Display as FmtDisplay, Result as FmtResult, Formatter};
 

	
 
use crate::protocol::eval::{Prompt, EvalError, ValueGroup, PortId as EvalPortId};
 
use crate::protocol::*;
 
use crate::runtime2::*;
 
@@ -17,6 +19,29 @@ pub enum CompScheduling {
 
    Exit,
 
}
 

	
 
/// Potential error emitted by a component
 
pub enum CompError {
 
    /// Error originating from the code executor. Hence has an associated
 
    /// source location.
 
    Executor(EvalError),
 
    /// Error originating from a component, but not necessarily associated with
 
    /// a location in the source.
 
    Component(String), // TODO: Maybe a different embedded value in the future?
 
    /// Pure runtime error. Not necessarily originating from the component
 
    /// itself. Should be treated as a very severe runtime-compromising error.
 
    Runtime(RtError),
 
}
 

	
 
impl FmtDisplay for CompError {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
 
        match self {
 
            CompError::Executor(v) => v.fmt(f),
 
            CompError::Component(v) => v.fmt(f),
 
            CompError::Runtime(v) => v.fmt(f),
 
        }
 
    }
 
}
 

	
 
/// Generic representation of a component (as viewed by a scheduler).
 
pub(crate) trait Component {
 
    /// Called upon the creation of the component. Note that the scheduler
 
@@ -39,10 +64,12 @@ pub(crate) trait Component {
 

	
 
    /// Called if the component's routine should be executed. The return value
 
    /// can be used to indicate when the routine should be run again.
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError>;
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling;
 
}
 

	
 
/// Representation of the generic operating mode of a component.
 
/// Representation of the generic operating mode of a component. Although not
 
/// every state may be used by every kind of (builtin) component, this allows
 
/// writing standard handlers for particular events in a component's lifetime.
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) enum CompMode {
 
    NonSync, // not in sync mode
 
@@ -51,8 +78,8 @@ pub(crate) enum CompMode {
 
    BlockedGet, // blocked because we need to receive a message on a particular port
 
    BlockedPut, // component is blocked because the port is blocked
 
    BlockedSelect, // waiting on message to complete the select statement
 
    StartExit, // temporary state: if encountered then we start the shutdown process
 
    BusyExit, // temporary state: waiting for Acks for all the closed ports
 
    StartExit, // temporary state: if encountered then we start the shutdown process.
 
    BusyExit, // temporary state: waiting for Acks for all the closed ports, potentially waiting for sync round to finish
 
    Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0
 
}
 

	
 
@@ -65,6 +92,43 @@ impl CompMode {
 
            NonSync | StartExit | BusyExit | Exit => false,
 
        }
 
    }
 

	
 
    pub(crate) fn is_busy_exiting(&self) -> bool {
 
        use CompMode::*;
 

	
 
        match self {
 
            NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => false,
 
            StartExit | BusyExit => true,
 
            Exit => false,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum ExitReason {
 
    Termination, // regular termination of component
 
    ErrorInSync,
 
    ErrorNonSync,
 
}
 

	
 
impl ExitReason {
 
    pub(crate) fn is_in_sync(&self) -> bool {
 
        use ExitReason::*;
 

	
 
        match self {
 
            Termination | ErrorNonSync => false,
 
            ErrorInSync => true,
 
        }
 
    }
 

	
 
    pub(crate) fn is_error(&self) -> bool {
 
        use ExitReason::*;
 

	
 
        match self {
 
            Termination => false,
 
            ErrorInSync | ErrorNonSync => true,
 
        }
 
    }
 
}
 

	
 
/// Component execution state: the execution mode along with some descriptive
 
@@ -74,6 +138,7 @@ pub(crate) struct CompExecState {
 
    pub mode: CompMode,
 
    pub mode_port: PortId, // valid if blocked on a port (put/get)
 
    pub mode_value: ValueGroup, // valid if blocked on a put
 
    pub exit_reason: ExitReason, // valid if in StartExit/BusyExit/Exit mode
 
}
 

	
 
impl CompExecState {
 
@@ -82,9 +147,15 @@ impl CompExecState {
 
            mode: CompMode::NonSync,
 
            mode_port: PortId::new_invalid(),
 
            mode_value: ValueGroup::default(),
 
            exit_reason: ExitReason::Termination,
 
        }
 
    }
 

	
 
    pub(crate) fn set_as_start_exit(&mut self, reason: ExitReason) {
 
        self.mode = CompMode::StartExit;
 
        self.exit_reason = reason;
 
    }
 

	
 
    pub(crate) fn set_as_blocked_get(&mut self, port: PortId) {
 
        self.mode = CompMode::BlockedGet;
 
        self.mode_port = port;
 
@@ -110,6 +181,12 @@ impl CompExecState {
 
    }
 
}
 

	
 
// TODO: Replace when implementing port sending. Should probably be incorporated
 
//  into CompCtx (and rename CompCtx into CompComms)
 
pub(crate) type InboxMain = Vec<Option<DataMessage>>;
 
pub(crate) type InboxMainRef = [Option<DataMessage>];
 
pub(crate) type InboxBackup = Vec<DataMessage>;
 

	
 
/// Creates a new component based on its definition. Meaning that if it is a
 
/// user-defined component then we set up the PDL code state. Otherwise we
 
/// construct a custom component. This does NOT take care of port and message
 
@@ -153,28 +230,39 @@ pub(crate) fn create_component(
 
/// scheduling value must be used.
 
#[must_use]
 
pub(crate) fn default_send_data_message(
 
    exec_state: &mut CompExecState, transmitting_port_id: PortId, value: ValueGroup,
 
    exec_state: &mut CompExecState, transmitting_port_id: PortId,
 
    port_instruction: PortInstruction, value: ValueGroup,
 
    sched_ctx: &SchedulerCtx, consensus: &mut Consensus, comp_ctx: &mut CompCtx
 
) -> CompScheduling {
 
) -> Result<CompScheduling, (PortInstruction, String)> {
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync);
 

	
 
    // TODO: Handle closed ports
 
    let port_handle = comp_ctx.get_port_handle(transmitting_port_id);
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    port_info.last_instruction = port_instruction;
 

	
 
    let port_info = comp_ctx.get_port(port_handle);
 
    debug_assert_eq!(port_info.kind, PortKind::Putter);
 
    if port_info.state.is_blocked() {
 

	
 
    if port_info.state.is_closed() {
 
        // Note: normally peer is eventually consistent, but if it has shut down
 
        // then we can be sure it is consistent (I think?)
 
        return Err((
 
            port_info.last_instruction,
 
            format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0)
 
        ))
 
    } else if port_info.state.is_blocked() {
 
        // Port is blocked, so we cannot send
 
        exec_state.set_as_blocked_put(transmitting_port_id, value);
 

	
 
        return CompScheduling::Sleep;
 
        return Ok(CompScheduling::Sleep);
 
    } else {
 
        // Port is not blocked, so send to the peer
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value);
 
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(annotated_message), true);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true);
 

	
 
        return CompScheduling::Immediate;
 
        return Ok(CompScheduling::Immediate);
 
    }
 
}
 

	
 
@@ -191,10 +279,14 @@ pub(crate) enum IncomingData {
 
//  nicest if the sending component can figure out it cannot send any more data.
 
#[must_use]
 
pub(crate) fn default_handle_incoming_data_message(
 
    exec_state: &mut CompExecState, port_value_slot: &mut Option<DataMessage>,
 
    exec_state: &mut CompExecState, inbox_main: &mut InboxMain,
 
    comp_ctx: &mut CompCtx, incoming_message: DataMessage,
 
    sched_ctx: &SchedulerCtx, control: &mut ControlLayer
 
) -> IncomingData {
 
    let port_handle = comp_ctx.get_port_handle(incoming_message.data_header.target_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 
    comp_ctx.get_port_mut(port_handle).received_message_for_sync = true;
 
    let port_value_slot = &mut inbox_main[port_index];
 
    let target_port_id = incoming_message.data_header.target_port;
 

	
 
    if port_value_slot.is_none() {
 
@@ -205,7 +297,6 @@ pub(crate) fn default_handle_incoming_data_message(
 
        dbg_code!({
 
            // Our port cannot have been blocked itself, because we're able to
 
            // directly insert the message into its slot.
 
            let port_handle = comp_ctx.get_port_handle(target_port_id);
 
            assert!(!comp_ctx.get_port(port_handle).state.is_blocked());
 
        });
 

	
 
@@ -220,55 +311,123 @@ pub(crate) fn default_handle_incoming_data_message(
 
    } else {
 
        // Slot is already full, so if the port was previously opened, it will
 
        // now become closed
 
        let port_handle = comp_ctx.get_port_handle(target_port_id);
 
        let port_info = comp_ctx.get_port_mut(port_handle);
 
        debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked()); // i.e. not closed, but will go off if more states are added in the future
 
        if port_info.state.is_open() {
 
            port_info.state.set(PortStateFlag::BlockedDueToFullBuffers);
 

	
 
        if port_info.state == PortState::Open {
 
            comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
 
            let (peer_handle, message) =
 
                control.initiate_port_blocking(comp_ctx, port_handle);
 
            let peer = comp_ctx.get_peer(peer_handle);
 
            peer.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
            peer.handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
        }
 

	
 
        return IncomingData::SlotFull(incoming_message)
 
    }
 
}
 

	
 
pub(crate) enum GetResult {
 
    Received(DataMessage),
 
    NoMessage,
 
    Error((PortInstruction, String)),
 
}
 

	
 
/// Default attempt at trying to receive from a port (i.e. through a `get`, or
 
/// the equivalent operation for a builtin component). `target_port` is the port
 
/// we're trying to receive from, and the `target_port_instruction` is the
 
/// instruction we're attempting on this port.
 
pub(crate) fn default_attempt_get(
 
    exec_state: &mut CompExecState, target_port: PortId, target_port_instruction: PortInstruction,
 
    inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup, sched_ctx: &SchedulerCtx,
 
    comp_ctx: &mut CompCtx, control: &mut ControlLayer, consensus: &mut Consensus
 
) -> GetResult {
 
    let port_handle = comp_ctx.get_port_handle(target_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 

	
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    port_info.last_instruction = target_port_instruction;
 
    if port_info.state.is_closed() {
 
        let peer_id = port_info.peer_comp_id;
 
        return GetResult::Error((
 
            target_port_instruction,
 
            format!("Cannot get from this port, as the peer component (id:{}) closed the port", peer_id.0)
 
        ));
 
    }
 

	
 
    if let Some(message) = &inbox_main[port_index] {
 
        if consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
            // We're allowed to receive this message
 
            let message = inbox_main[port_index].take().unwrap();
 
            debug_assert_eq!(target_port, message.data_header.target_port);
 

	
 
            // Note: we can still run into an unrecoverable error when actually
 
            // receiving this message
 
            match default_handle_received_data_message(
 
                target_port, target_port_instruction, inbox_main, inbox_backup,
 
                comp_ctx, sched_ctx, control,
 
            ) {
 
                Ok(()) => return GetResult::Received(message),
 
                Err(location_and_message) => return GetResult::Error(location_and_message)
 
            }
 
        } else {
 
            // We're not allowed to receive this message. This means that the
 
            // receiver is attempting to receive something out of order with
 
            // respect to the sender.
 
            return GetResult::Error((target_port_instruction, String::from(
 
                "Cannot get from this port, as this causes a deadlock. This happens if you `get` in a different order as another component `put`s"
 
            )));
 
        }
 
    } else {
 
        // We don't have a message waiting for us and the port is not blocked.
 
        // So enter the BlockedGet state
 
        exec_state.set_as_blocked_get(target_port);
 
        return GetResult::NoMessage;
 
    }
 
}
 

	
 
/// Default handling that has been received through a `get`. Will check if any
 
/// more messages are waiting, and if the corresponding port was blocked because
 
/// of full buffers (hence, will use the control layer to make sure the peer
 
/// will become unblocked).
 
pub(crate) fn default_handle_received_data_message(
 
    targeted_port: PortId, slot: &mut Option<DataMessage>, inbox_backup: &mut Vec<DataMessage>,
 
    targeted_port: PortId, port_instruction: PortInstruction,
 
    inbox_main: &mut InboxMainRef, inbox_backup: &mut InboxBackup,
 
    comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer
 
) {
 
) -> Result<(), (PortInstruction, String)> {
 
    let port_handle = comp_ctx.get_port_handle(targeted_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 
    let slot = &mut inbox_main[port_index];
 
    debug_assert!(slot.is_none()); // because we've just received from it
 

	
 
    // Check if there are any more messages in the backup buffer
 
    let port_handle = comp_ctx.get_port_handle(targeted_port);
 
    // Modify last-known location where port instruction was retrieved
 
    let port_info = comp_ctx.get_port(port_handle);
 
    debug_assert_ne!(port_info.last_instruction, PortInstruction::None); // set by caller
 
    debug_assert!(port_info.state.is_open()); // checked by caller
 

	
 
    // Check if there are any more messages in the backup buffer
 
    for message_index in 0..inbox_backup.len() {
 
        let message = &inbox_backup[message_index];
 
        if message.data_header.target_port == targeted_port {
 
            // One more message, place it in the slot
 
            let message = inbox_backup.remove(message_index);
 
            debug_assert!(port_info.state.is_blocked()); // since we're removing another message from the backup
 
            debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we're removing another message from the backup
 
            *slot = Some(message);
 

	
 
            return;
 
            return Ok(());
 
        }
 
    }
 

	
 
    // Did not have any more messages, so if we were blocked, then we need to
 
    // unblock the port now (and inform the peer of this unblocking)
 
    if port_info.state == PortState::BlockedDueToFullBuffers {
 
        comp_ctx.set_port_state(port_handle, PortState::Open);
 
    if port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers) {
 
        let port_info = comp_ctx.get_port_mut(port_handle);
 
        port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers);
 

	
 
        let (peer_handle, message) = control.cancel_port_blocking(comp_ctx, port_handle);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
    }
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles control messages in the default way. Note that this function may
 
@@ -279,78 +438,160 @@ pub(crate) fn default_handle_received_data_message(
 
pub(crate) fn default_handle_control_message(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus,
 
    message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
 
) {
 
) -> Result<(), (PortInstruction, String)> {
 
    match message.content {
 
        ControlMessageContent::Ack => {
 
            default_handle_ack(control, message.id, sched_ctx, comp_ctx);
 
        },
 
        ControlMessageContent::BlockPort(port_id) => {
 
        ControlMessageContent::BlockPort => {
 
            // One of our messages was accepted, but the port should be
 
            // blocked.
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
            let port_info = comp_ctx.get_port(port_handle);
 
            let port_to_block = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_block);
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            debug_assert_eq!(port_info.kind, PortKind::Putter);
 
            if port_info.state == PortState::Open {
 
                // only when open: we don't do this when closed, and we we don't do this if we're blocked due to peer changes
 
                comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
 
            }
 
            port_info.state.set(PortStateFlag::BlockedDueToFullBuffers);
 
        },
 
        ControlMessageContent::ClosePort(port_id) => {
 
        ControlMessageContent::ClosePort(content) => {
 
            // Request to close the port. We immediately comply and remove
 
            // the component handle as well
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
            let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id;
 
            let port_to_close = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_close);
 

	
 
            // We're closing the port, so we will always update the peer of the
 
            // port (in case of error messages)
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            port_info.peer_comp_id = message.sender_comp_id;
 
            port_info.close_at_sync_end = true; // might be redundant (we might set it closed now)
 

	
 
            let peer_comp_id = port_info.peer_comp_id;
 
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
            // One exception to sending an `Ack` is if we just closed the
 
            // port ourselves, meaning that the `ClosePort` messages got
 
            // sent to one another.
 
            if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) {
 
                // The two components (sender and this component) are closing
 
                // the channel at the same time. So we don't care about the
 
                // content of the `ClosePort` message.
 
                default_handle_ack(control, control_id, sched_ctx, comp_ctx);
 
            } else {
 
                // Respond to the message
 
                let port_info = comp_ctx.get_port(port_handle);
 
                let last_instruction = port_info.last_instruction;
 
                let port_has_had_message = port_info.received_message_for_sync;
 
                default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
 
                comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed
 
                comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed
 
                comp_ctx.change_port_peer(sched_ctx, port_handle, None);
 

	
 
                // Handle any possible error conditions (which boil down to: the
 
                // port has been used, but the peer has died). If not in sync
 
                // mode then we close the port immediately.
 

	
 
                // Note that `port_was_used` does not mean that any messages
 
                // were actually received. It might also mean that e.g. the
 
                // component attempted a `get`, but there were no messages, so
 
                // now it is in the `BlockedGet` state.
 
                let port_was_used = last_instruction != PortInstruction::None;
 

	
 
                if exec_state.mode.is_in_sync_block() {
 
                    let closed_during_sync_round = content.closed_in_sync_round && port_was_used;
 
                    let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message;
 

	
 
                    if closed_during_sync_round || closed_before_sync_round {
 
                        return Err((
 
                            last_instruction,
 
                            format!("Peer component (id:{}) shut down, so communication cannot (have) succeed(ed)", peer_comp_id.0)
 
                        ));
 
                    }
 
                } else {
 
                    let port_info = comp_ctx.get_port_mut(port_handle);
 
                    port_info.state.set(PortStateFlag::Closed);
 
                }
 
            }
 
        },
 
        ControlMessageContent::UnblockPort(port_id) => {
 
        ControlMessageContent::UnblockPort => {
 
            // We were previously blocked (or already closed)
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
            let port_info = comp_ctx.get_port(port_handle);
 
            let port_to_unblock = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_unblock);
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 

	
 
            debug_assert_eq!(port_info.kind, PortKind::Putter);
 
            if port_info.state == PortState::BlockedDueToFullBuffers {
 
                default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
 
            }
 
            debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers));
 

	
 
            port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers);
 
            default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
 
        },
 
        ControlMessageContent::PortPeerChangedBlock(port_id) => {
 
        ControlMessageContent::PortPeerChangedBlock => {
 
            // The peer of our port has just changed. So we are asked to
 
            // temporarily block the port (while our original recipient is
 
            // potentially rerouting some of the in-flight messages) and
 
            // Ack. Then we wait for the `unblock` call.
 
            debug_assert_eq!(message.target_port_id, Some(port_id));
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
            comp_ctx.set_port_state(port_handle, PortState::BlockedDueToPeerChange);
 
            let port_to_change = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_change);
 

	
 
            let port_info = comp_ctx.get_port(port_handle);
 
            let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            let peer_comp_id = port_info.peer_comp_id;
 
            port_info.state.set(PortStateFlag::BlockedDueToPeerChange);
 
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
            default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
 
        },
 
        ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => {
 
            let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap());
 
            let port_to_change = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_change);
 
            let port_info = comp_ctx.get_port(port_handle);
 
            debug_assert!(port_info.state == PortState::BlockedDueToPeerChange);
 
            debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToPeerChange));
 
            let old_peer_id = port_info.peer_comp_id;
 

	
 
            comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false);
 

	
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            port_info.peer_comp_id = new_comp_id;
 
            port_info.peer_port_id = new_port_id;
 
            comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None);
 
            default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
 

	
 
            port_info.state.clear(PortStateFlag::BlockedDueToPeerChange);
 
            comp_ctx.change_port_peer(sched_ctx, port_handle, Some(new_comp_id));
 
            default_handle_recently_unblocked_port(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
 
        }
 
    }
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles a component entering the synchronous block. Will ensure that the
 
/// `Consensus` and the `ComponentCtx` are initialized properly.
 
pub(crate) fn default_handle_sync_start(
 
    exec_state: &mut CompExecState, inbox_main: &mut InboxMainRef,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 
) {
 
    sched_ctx.info("Component starting sync mode");
 

	
 
    // If any messages are present for this sync round, set the appropriate flag
 
    // and notify the consensus handler of the present messages
 
    consensus.notify_sync_start(comp_ctx);
 
    for (port_index, message) in inbox_main.iter().enumerate() {
 
        if let Some(message) = message {
 
            consensus.handle_incoming_data_message(comp_ctx, message);
 
            let port_info = comp_ctx.get_port_by_index_mut(port_index);
 
            port_info.received_message_for_sync = true;
 
        }
 
    }
 

	
 
    // Modify execution state
 
    debug_assert_eq!(exec_state.mode, CompMode::NonSync);
 
    exec_state.mode = CompMode::Sync;
 
}
 

	
 
/// Handles a component that has reached the end of the sync block. This does
 
/// not necessarily mean that the component will go into the `NonSync` mode, as
 
/// it might have to wait for the leader to finish the round for everyone (see
 
/// `default_handle_sync_decision`)
 
pub(crate) fn default_handle_sync_end(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
    consensus: &mut Consensus
 
) {
 
    sched_ctx.info("Component ending sync mode (but possibly waiting for a solution)");
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync);
 
    let decision = consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
    exec_state.mode = CompMode::SyncEnd;
 
    default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus);
 
}
 

	
 
/// Handles a component initiating the exiting procedure, and closing all of its
 
@@ -359,29 +600,37 @@ pub(crate) fn default_handle_control_message(
 
#[must_use]
 
pub(crate) fn default_handle_start_exit(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::StartExit);
 
    sched_ctx.log("Component starting exit");
 
    sched_ctx.info(&format!("Component starting exit (reason: {:?})", exec_state.exit_reason));
 
    exec_state.mode = CompMode::BusyExit;
 
    let exit_inside_sync = exec_state.exit_reason.is_in_sync();
 

	
 
    // If exiting while inside sync mode, report to the leader of the current
 
    // round that we've failed.
 
    if exit_inside_sync {
 
        let decision = consensus.notify_sync_end_failure(sched_ctx, comp_ctx);
 
        default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus);
 
    }
 

	
 
    // Iterating by index to work around borrowing rules
 
    // Iterating over ports by index to work around borrowing rules
 
    for port_index in 0..comp_ctx.num_ports() {
 
        let port = comp_ctx.get_port_by_index_mut(port_index);
 
        if port.state == PortState::Closed {
 
        if port.state.is_closed() || port.close_at_sync_end {
 
            // Already closed, or in the process of being closed
 
            continue;
 
        }
 

	
 
        // Mark as closed
 
        let port_id = port.self_id;
 
        port.state = PortState::Closed;
 
        port.state.set(PortStateFlag::Closed);
 

	
 
        // Notify peer of closing
 
        let port_handle = comp_ctx.get_port_handle(port_id);
 
        let (peer, message) = control.initiate_port_closing(port_handle, comp_ctx);
 
        let (peer, message) = control.initiate_port_closing(port_handle, exit_inside_sync, comp_ctx);
 
        let peer_info = comp_ctx.get_peer(peer);
 
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
    }
 

	
 
    return CompScheduling::Immediate; // to check if we can shut down immediately
 
@@ -396,10 +645,10 @@ pub(crate) fn default_handle_busy_exit(
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::BusyExit);
 
    if control.has_acks_remaining() {
 
        sched_ctx.log("Component busy exiting, still has `Ack`s remaining");
 
        sched_ctx.info("Component busy exiting, still has `Ack`s remaining");
 
        return CompScheduling::Sleep;
 
    } else {
 
        sched_ctx.log("Component busy exiting, now shutting down");
 
        sched_ctx.info("Component busy exiting, now shutting down");
 
        exec_state.mode = CompMode::Exit;
 
        return CompScheduling::Exit;
 
    }
 
@@ -408,28 +657,74 @@ pub(crate) fn default_handle_busy_exit(
 
/// Handles a potential synchronous round decision. If there was a decision then
 
/// the `Some(success)` value indicates whether the round succeeded or not.
 
/// Might also end up changing the `ExecState`.
 
///
 
/// Might be called in two cases:
 
/// 1. The component is in regular execution mode, at the end of a sync round,
 
///     and is waiting for a solution to the round.
 
/// 2. The component has encountered an error during a sync round and is
 
///     exiting, hence is waiting for a "Failure" message from the leader.
 
pub(crate) fn default_handle_sync_decision(
 
    exec_state: &mut CompExecState, decision: SyncRoundDecision,
 
    consensus: &mut Consensus
 
    sched_ctx: &SchedulerCtx, exec_state: &mut CompExecState, comp_ctx: &mut CompCtx,
 
    decision: SyncRoundDecision, consensus: &mut Consensus
 
) -> Option<bool> {
 
    debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
 
    let success = match decision {
 
        SyncRoundDecision::None => return None,
 
        SyncRoundDecision::Solution => true,
 
        SyncRoundDecision::Failure => false,
 
    };
 

	
 
    debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
 
    debug_assert!(
 
        exec_state.mode == CompMode::SyncEnd || (
 
            exec_state.mode.is_busy_exiting() && exec_state.exit_reason.is_error()
 
        ) || (
 
            exec_state.mode.is_in_sync_block() && decision == SyncRoundDecision::Failure
 
        )
 
    );
 

	
 
    sched_ctx.info(&format!("Handling decision {:?} (in mode: {:?})", decision, exec_state.mode));
 
    consensus.notify_sync_decision(decision);
 
    if success {
 
        // We cannot get a success message if the component has encountered an
 
        // error.
 
        for port_index in 0..comp_ctx.num_ports() {
 
            let port_info = comp_ctx.get_port_by_index_mut(port_index);
 
            if port_info.close_at_sync_end {
 
                port_info.state.set(PortStateFlag::Closed);
 
            }
 
        }
 
        debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
 
        exec_state.mode = CompMode::NonSync;
 
        consensus.notify_sync_decision(decision);
 
        return Some(true);
 
    } else {
 
        exec_state.mode = CompMode::StartExit;
 
        // We may get failure both in all possible cases. But we should only
 
        // modify the execution state if we're not already in exit mode
 
        if !exec_state.mode.is_busy_exiting() {
 
            sched_ctx.error("failed synchronous round, initiating exit");
 
            exec_state.set_as_start_exit(ExitReason::ErrorNonSync);
 
        }
 
        return Some(false);
 
    }
 
}
 

	
 
/// Performs the default action of printing the provided error, and then putting
 
/// the component in the state where it will shut down. Only to be used for
 
/// builtin components: their error message construction is simpler (and more
 
/// common) as they don't have any source code.
 
pub(crate) fn default_handle_error_for_builtin(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx,
 
    location_and_message: (PortInstruction, String)
 
) {
 
    let (_location, message) = location_and_message;
 
    sched_ctx.error(&message);
 

	
 
    let exit_reason = if exec_state.mode.is_in_sync_block() {
 
        ExitReason::ErrorInSync
 
    } else {
 
        ExitReason::ErrorNonSync
 
    };
 

	
 
    exec_state.set_as_start_exit(exit_reason);
 
}
 

	
 
#[inline]
 
pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling {
 
@@ -454,7 +749,7 @@ fn default_handle_ack(
 
            AckAction::SendMessage(target_comp, message) => {
 
                // FIX @NoDirectHandle
 
                let mut handle = sched_ctx.runtime.get_component_public(target_comp);
 
                handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
                handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
                let _should_remove = handle.decrement_users();
 
                debug_assert!(_should_remove.is_none());
 
            },
 
@@ -486,7 +781,7 @@ fn default_send_ack(
 
    sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx
 
) {
 
    let peer_info = comp_ctx.get_peer(peer_handle);
 
    peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(ControlMessage{
 
    peer_info.handle.send_message_logged(sched_ctx, Message::Control(ControlMessage{
 
        id: causer_of_ack_id,
 
        sender_comp_id: comp_ctx.id,
 
        target_port_id: None,
 
@@ -496,14 +791,13 @@ fn default_send_ack(
 

	
 
/// Handles the unblocking of a putter port. In case there is a pending message
 
/// on that port then it will be sent.
 
fn default_handle_unblock_put(
 
fn default_handle_recently_unblocked_port(
 
    exec_state: &mut CompExecState, consensus: &mut Consensus,
 
    port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
) {
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    let port_id = port_info.self_id;
 
    debug_assert!(port_info.state.is_blocked());
 
    port_info.state = PortState::Open;
 
    debug_assert!(!port_info.state.is_blocked()); // should have been done by the caller
 

	
 
    if exec_state.is_blocked_on_put(port_id) {
 
        // Annotate the message that we're going to send
 
@@ -515,7 +809,7 @@ fn default_handle_unblock_put(
 
        // Retrieve peer to send the message
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(to_send), true);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Data(to_send), true);
 

	
 
        exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state.
 
        exec_state.mode_port = PortId::new_invalid();
src/runtime2/component/component_context.rs
Show inline comments
 
use std::fmt::{Debug, Formatter, Result as FmtResult};
 

	
 
use crate::runtime2::scheduler::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 

	
 
use crate::protocol::ExpressionId;
 

	
 
/// Helper struct to remember when the last operation on the port took place.
 
#[derive(Debug, PartialEq, Copy, Clone)]
 
pub enum PortInstruction {
 
    None,
 
    NoSource,
 
    SourceLocation(ExpressionId),
 
}
 

	
 
/// Directionality of a port
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
/// Bitflags for port
 
// TODO: Incorporate remaining flags from `Port` struct
 
#[repr(u32)]
 
#[derive(Debug, Copy, Clone)]
 
pub enum PortStateFlag {
 
    Closed = 0x01, // If not closed, then the port is open
 
    BlockedDueToPeerChange = 0x02, // busy changing peers, hence use of port is temporarily blocked
 
    BlockedDueToFullBuffers = 0x04,
 
}
 

	
 
#[derive(Copy, Clone)]
 
pub struct PortState {
 
    flags: u32
 
}
 

	
 
impl PortState {
 
    pub(crate) fn new() -> PortState {
 
        return PortState{ flags: 0 }
 
    }
 

	
 
    // high-level
 

	
 
    #[inline]
 
    pub fn is_open(&self) -> bool {
 
        return !self.is_closed();
 
    }
 

	
 
    #[inline]
 
    pub fn is_closed(&self) -> bool {
 
        return self.is_set(PortStateFlag::Closed);
 
    }
 

	
 
    #[inline]
 
    pub fn is_blocked(&self) -> bool {
 
        return
 
            self.is_set(PortStateFlag::BlockedDueToPeerChange) ||
 
            self.is_set(PortStateFlag::BlockedDueToFullBuffers);
 
    }
 

	
 
    // lower-level utils
 
    #[inline]
 
    pub fn set(&mut self, flag: PortStateFlag) {
 
        self.flags |= flag as u32;
 
    }
 

	
 
    #[inline]
 
    pub fn clear(&mut self, flag: PortStateFlag) {
 
        self.flags &= !(flag as u32);
 
    }
 

	
 
    #[inline]
 
    pub const fn is_set(&self, flag: PortStateFlag) -> bool {
 
        return (self.flags & (flag as u32)) != 0;
 
    }
 
}
 

	
 
impl Debug for PortState {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
 
        use PortStateFlag::*;
 

	
 
        let mut s = f.debug_struct("PortState");
 
        for (flag_name, flag_value) in &[
 
            ("closed", Closed),
 
            ("blocked_peer_change", BlockedDueToPeerChange),
 
            ("blocked_full_buffers", BlockedDueToFullBuffers)
 
        ] {
 
            s.field(flag_name, &self.is_set(*flag_value));
 
        }
 

	
 
        return s.finish();
 
    }
 
}
 

	
 
#[derive(Debug)]
 
pub struct Port {
 
    // Identifiers
 
    pub self_id: PortId,
 
    pub peer_comp_id: CompId, // eventually consistent
 
    pub peer_port_id: PortId, // eventually consistent
 
    // Generic operating state
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    #[cfg(debug_assertions)] pub(crate) associated_with_peer: bool,
 
    // State tracking for error detection and error handling
 
    pub last_instruction: PortInstruction, // used during sync round to detect port-closed-during-sync errors
 
    pub received_message_for_sync: bool, // used during sync round to detect port-closed-before-sync errors
 
    pub close_at_sync_end: bool, // set during sync round when receiving a port-closed-after-sync message
 
    pub(crate) associated_with_peer: bool,
 
}
 

	
 
pub struct Peer {
 
@@ -56,17 +154,23 @@ impl CompCtx {
 
            self_id: putter_id,
 
            peer_port_id: getter_id,
 
            kind: PortKind::Putter,
 
            state: PortState::Open,
 
            state: PortState::new(),
 
            peer_comp_id: self.id,
 
            #[cfg(debug_assertions)] associated_with_peer: false,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            associated_with_peer: false,
 
        });
 
        self.ports.push(Port{
 
            self_id: getter_id,
 
            peer_port_id: putter_id,
 
            kind: PortKind::Getter,
 
            state: PortState::Open,
 
            state: PortState::new(),
 
            peer_comp_id: self.id,
 
            #[cfg(debug_assertions)] associated_with_peer: false,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            associated_with_peer: false,
 
        });
 

	
 
        return Channel{ putter_id, getter_id };
 
@@ -77,7 +181,10 @@ impl CompCtx {
 
        let self_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id, peer_comp_id, peer_port_id, kind, state,
 
            #[cfg(debug_assertions)] associated_with_peer: false,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            associated_with_peer: false,
 
        });
 
        return LocalPortHandle(self_id);
 
    }
 
@@ -90,66 +197,53 @@ impl CompCtx {
 
        return port;
 
    }
 

	
 
    /// Adds a new peer. This must be called for every port, no matter the
 
    /// component the channel is connected to. If a `CompHandle` is supplied,
 
    /// then it will be used to add the peer. Otherwise it will be retrieved
 
    /// from the runtime using its ID.
 
    pub(crate) fn add_peer(&mut self, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, peer_comp_id: CompId, handle: Option<&CompHandle>) {
 
        let self_id = self.id;
 
        let port = self.get_port_mut(port_handle);
 
        debug_assert_eq!(port.peer_comp_id, peer_comp_id);
 
        dbg_code!(assert!(!port.associated_with_peer));
 
        if !Self::requires_peer_reference(port, self_id, false) {
 
            return;
 
        }
 

	
 
        dbg_code!(port.associated_with_peer = true);
 
        match self.get_peer_index_by_id(peer_comp_id) {
 
            Some(peer_index) => {
 
    /// Changes a peer
 
    pub(crate) fn change_port_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, new_peer_comp_id: Option<CompId>) {
 
        // If port is currently associated with a peer, then remove that peer
 
        let port_index = self.get_port_index(port_handle);
 
        let port = &mut self.ports[port_index];
 
        let port_is_closed = port.state.is_closed();
 
        if port.associated_with_peer {
 
            // Remove old peer association
 
            port.associated_with_peer = false;
 
            let peer_comp_id = port.peer_comp_id;
 
            let peer_index = self.get_peer_index_by_id(peer_comp_id).unwrap();
 
            let peer = &mut self.peers[peer_index];
 
                peer.num_associated_ports += 1;
 
            },
 
            None => {
 
                let handle = match handle {
 
                    Some(handle) => handle.clone(),
 
                    None => sched_ctx.runtime.get_component_public(peer_comp_id)
 
                };
 
                self.peers.push(Peer{
 
                    id: peer_comp_id,
 
                    num_associated_ports: 1,
 
                    handle,
 
                });
 
            }
 
        }
 
    }
 

	
 
    /// Removes a peer associated with a port.
 
    pub(crate) fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, peer_id: CompId, also_remove_if_closed: bool) {
 
        let self_id = self.id;
 
        let port = self.get_port_mut(port_handle);
 
        debug_assert_eq!(port.peer_comp_id, peer_id);
 
        if !Self::requires_peer_reference(port, self_id, also_remove_if_closed) {
 
            return;
 
        }
 

	
 
        dbg_code!(assert!(port.associated_with_peer));
 
        dbg_code!(port.associated_with_peer = false);
 
        let peer_index = self.get_peer_index_by_id(peer_id).unwrap();
 
        let peer = &mut self.peers[peer_index];
 
            peer.num_associated_ports -= 1;
 
            if peer.num_associated_ports == 0 {
 
                let mut peer = self.peers.remove(peer_index);
 
                if let Some(key) = peer.handle.decrement_users() {
 
                debug_assert_ne!(key.downgrade(), self.id); // should be upheld by the code that shuts down a component
 
                    sched_ctx.runtime.destroy_component(key);
 
                }
 
            }
 
        }
 

	
 
    pub(crate) fn set_port_state(&mut self, port_handle: LocalPortHandle, new_state: PortState) {
 
        let port_info = self.get_port_mut(port_handle);
 
        debug_assert_ne!(port_info.state, PortState::Closed); // because then we do not expect to change the state
 
        port_info.state = new_state;
 
        // If there is a new peer, then set it as the peer associated with the
 
        // port
 
        if let Some(peer_id) = new_peer_comp_id {
 
            let port = &mut self.ports[port_index];
 
            port.peer_comp_id = peer_id;
 

	
 
            if peer_id != self.id && !port_is_closed {
 
                port.associated_with_peer = true;
 

	
 
                match self.get_peer_index_by_id(peer_id) {
 
                    Some(index) => {
 
                        let peer = &mut self.peers[index];
 
                        peer.num_associated_ports += 1;
 
                    },
 
                    None => {
 
                        let handle = sched_ctx.runtime.get_component_public(peer_id);
 
                        self.peers.push(Peer {
 
                            id: peer_id,
 
                            num_associated_ports: 1,
 
                            handle
 
                        })
 
                    }
 
                }
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn get_port_handle(&self, port_id: PortId) -> LocalPortHandle {
 
@@ -215,7 +309,7 @@ impl CompCtx {
 

	
 
    #[inline]
 
    fn requires_peer_reference(port: &Port, self_id: CompId, required_if_closed: bool) -> bool {
 
        return (port.state != PortState::Closed || required_if_closed) && port.peer_comp_id != self_id;
 
        return (!port.state.is_closed() || required_if_closed) && port.peer_comp_id != self_id;
 
    }
 

	
 
    fn must_get_port_index(&self, handle: LocalPortHandle) -> usize {
src/runtime2/component/component_internet.rs
Show inline comments
 
use crate::protocol::eval::{ValueGroup, Value, EvalError};
 
use crate::protocol::eval::{ValueGroup, Value};
 
use crate::runtime2::*;
 
use crate::runtime2::component::{CompCtx, CompId};
 
use crate::runtime2::component::{CompCtx, CompId, PortInstruction};
 
use crate::runtime2::stdlib::internet::*;
 
use crate::runtime2::poll::*;
 

	
 
@@ -41,8 +41,8 @@ pub struct ComponentTcpClient {
 
    socket_state: SocketState,
 
    sync_state: SyncState,
 
    poll_ticket: Option<PollTicket>,
 
    inbox_main: Option<DataMessage>,
 
    inbox_backup: Vec<DataMessage>,
 
    inbox_main: InboxMain,
 
    inbox_backup: InboxBackup,
 
    pdl_input_port_id: PortId, // input from PDL, so transmitted over socket
 
    pdl_output_port_id: PortId, // output towards PDL, so received over socket
 
    input_union_send_tag_value: i64,
 
@@ -90,8 +90,9 @@ impl Component for ComponentTcpClient {
 
    }
 

	
 
    fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, message: DataMessage) {
 
        if self.inbox_main.is_none() {
 
            self.inbox_main = Some(message);
 
        let slot = &mut self.inbox_main[0];
 
        if slot.is_none() {
 
            *slot = Some(message);
 
        } else {
 
            self.inbox_backup.push(message);
 
        }
 
@@ -104,22 +105,24 @@ impl Component for ComponentTcpClient {
 
            },
 
            Message::Sync(message) => {
 
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
                component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
 
                component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
 
            },
 
            Message::Control(message) => {
 
                component::default_handle_control_message(
 
                if let Err(location_and_message) = component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                );
 
                ) {
 
                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                }
 
            },
 
            Message::Poll => {
 
                sched_ctx.log("Received polling event");
 
                sched_ctx.info("Received polling event");
 
            },
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
        sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state));
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        sched_ctx.info(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state));
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedSelect => {
 
@@ -132,20 +135,21 @@ impl Component for ComponentTcpClient {
 
                    SocketState::Connected(_socket) => {
 
                        if self.sync_state == SyncState::FinishSyncThenQuit {
 
                            // Previous request was to let the component shut down
 
                            self.exec_state.mode = CompMode::StartExit;
 
                            self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                        } else {
 
                            // Reset for a new request
 
                            self.sync_state = SyncState::AwaitingCmd;
 
                            self.consensus.notify_sync_start(comp_ctx);
 
                            self.exec_state.mode = CompMode::Sync;
 
                            component::default_handle_sync_start(
 
                                &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus
 
                            );
 
                        }
 
                        return Ok(CompScheduling::Immediate);
 
                        return CompScheduling::Immediate;
 
                    },
 
                    SocketState::Error => {
 
                        // Could potentially send an error message to the
 
                        // connected component.
 
                        self.exec_state.mode = CompMode::StartExit;
 
                        return Ok(CompScheduling::Immediate);
 
                        self.exec_state.set_as_start_exit(ExitReason::ErrorNonSync);
 
                        return CompScheduling::Immediate;
 
                    }
 
                }
 
            },
 
@@ -153,17 +157,12 @@ impl Component for ComponentTcpClient {
 
                // When in sync mode: wait for a command to come in
 
                match self.sync_state {
 
                    SyncState::AwaitingCmd => {
 
                        if let Some(message) = &self.inbox_main {
 
                            self.consensus.handle_incoming_data_message(comp_ctx, &message);
 
                            if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, &message) {
 
                                // Check which command we're supposed to execute.
 
                                let message = self.inbox_main.take().unwrap();
 
                                let target_port_id = message.data_header.target_port;
 
                                component::default_handle_received_data_message(
 
                                    target_port_id, &mut self.inbox_main, &mut self.inbox_backup,
 
                                    comp_ctx, sched_ctx, &mut self.control
 
                                );
 

	
 
                        match component::default_attempt_get(
 
                            &mut self.exec_state, self.pdl_input_port_id, PortInstruction::NoSource,
 
                            &mut self.inbox_main, &mut self.inbox_backup, sched_ctx, comp_ctx,
 
                            &mut self.control, &mut self.consensus
 
                        ) {
 
                            GetResult::Received(message) => {
 
                                let (tag_value, embedded_heap_pos) = message.content.values[0].as_union();
 
                                if tag_value == self.input_union_send_tag_value {
 
                                    // Retrieve bytes from the message
 
@@ -178,29 +177,28 @@ impl Component for ComponentTcpClient {
 
                                    }
 

	
 
                                    self.sync_state = SyncState::Putting;
 
                                    return Ok(CompScheduling::Immediate);
 
                                } else if tag_value == self.input_union_receive_tag_value {
 
                                    // Component requires a `recv`
 
                                    self.sync_state = SyncState::Getting;
 
                                    return Ok(CompScheduling::Immediate);
 
                                } else if tag_value == self.input_union_finish_tag_value {
 
                                    // Component requires us to end the sync round
 
                                    self.sync_state = SyncState::FinishSync;
 
                                    return Ok(CompScheduling::Immediate);
 
                                } else if tag_value == self.input_union_shutdown_tag_value {
 
                                    // Component wants to close the connection
 
                                    self.sync_state = SyncState::FinishSyncThenQuit;
 
                                    return Ok(CompScheduling::Immediate);
 
                                } else {
 
                                    unreachable!("got tag_value {}", tag_value)
 
                                }
 
                            } else {
 
                                todo!("handle sync failure due to message deadlock");
 
                                return Ok(CompScheduling::Sleep);
 

	
 
                                return CompScheduling::Immediate;
 
                            },
 
                            GetResult::NoMessage => {
 
                                return CompScheduling::Sleep;
 
                            },
 
                            GetResult::Error(location_and_message) => {
 
                                component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                                return CompScheduling::Immediate;
 
                            }
 
                        } else {
 
                            self.exec_state.set_as_blocked_get(self.pdl_input_port_id);
 
                            return Ok(CompScheduling::Sleep);
 
                        }
 
                    },
 
                    SyncState::Putting => {
 
@@ -216,7 +214,7 @@ impl Component for ComponentTcpClient {
 
                                },
 
                                Err(err) => {
 
                                    if err.kind() == IoErrorKind::WouldBlock {
 
                                        return Ok(CompScheduling::Sleep); // wait until notified
 
                                        return CompScheduling::Sleep; // wait until notified
 
                                    } else {
 
                                        todo!("handle socket.send error {:?}", err)
 
                                    }
 
@@ -226,10 +224,8 @@ impl Component for ComponentTcpClient {
 

	
 
                        // If here then we're done putting the data, we can
 
                        // finish the sync round
 
                        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
 
                        self.exec_state.mode = CompMode::SyncEnd;
 
                        component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
 
                        return Ok(CompScheduling::Immediate);
 
                        component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
 
                        return CompScheduling::Requeue;
 
                    },
 
                    SyncState::Getting => {
 
                        // We're going to try and receive a single message. If
 
@@ -243,13 +239,19 @@ impl Component for ComponentTcpClient {
 
                            Ok(num_received) => {
 
                                self.byte_buffer.resize(num_received, 0);
 
                                let message_content = self.bytes_to_data_message_content(&self.byte_buffer);
 
                                let scheduling = component::default_send_data_message(&mut self.exec_state, self.pdl_output_port_id, message_content, sched_ctx, &mut self.consensus, comp_ctx);
 
                                let send_result = component::default_send_data_message(&mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource, message_content, sched_ctx, &mut self.consensus, comp_ctx);
 
                                if let Err(location_and_message) = send_result {
 
                                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                                    return CompScheduling::Immediate;
 
                                } else {
 
                                    let scheduling = send_result.unwrap();
 
                                    self.sync_state = SyncState::AwaitingCmd;
 
                                return Ok(scheduling);
 
                                    return scheduling;
 
                                }
 
                            },
 
                            Err(err) => {
 
                                if err.kind() == IoErrorKind::WouldBlock {
 
                                    return Ok(CompScheduling::Sleep); // wait until polled
 
                                    return CompScheduling::Sleep; // wait until polled
 
                                } else {
 
                                    todo!("handle socket.receive error {:?}", err)
 
                                }
 
@@ -257,26 +259,24 @@ impl Component for ComponentTcpClient {
 
                        }
 
                    },
 
                    SyncState::FinishSync | SyncState::FinishSyncThenQuit => {
 
                        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
 
                        self.exec_state.mode = CompMode::SyncEnd;
 
                        component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
 
                        return Ok(CompScheduling::Requeue);
 
                        component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
 
                        return CompScheduling::Requeue;
 
                    },
 
                }
 
            },
 
            CompMode::BlockedGet => {
 
                // Entered when awaiting a new command
 
                debug_assert_eq!(self.sync_state, SyncState::AwaitingCmd);
 
                return Ok(CompScheduling::Sleep);
 
                return CompScheduling::Sleep;
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedPut =>
 
                return Ok(CompScheduling::Sleep),
 
                return CompScheduling::Sleep,
 
            CompMode::StartExit =>
 
                return Ok(component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx)),
 
                return component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus),
 
            CompMode::BusyExit =>
 
                return Ok(component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx)),
 
                return component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx),
 
            CompMode::Exit =>
 
                return Ok(component::default_handle_exit(&self.exec_state)),
 
                return component::default_handle_exit(&self.exec_state),
 
        }
 
    }
 
}
 
@@ -311,7 +311,7 @@ impl ComponentTcpClient {
 
            socket_state: SocketState::Connected(socket.unwrap()),
 
            sync_state: SyncState::AwaitingCmd,
 
            poll_ticket: None,
 
            inbox_main: None,
 
            inbox_main: vec![None],
 
            inbox_backup: Vec::new(),
 
            input_union_send_tag_value: -1,
 
            input_union_receive_tag_value: -1,
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -13,7 +13,8 @@ use crate::runtime2::communication::*;
 

	
 
use super::component::{
 
    self,
 
    CompExecState, Component, CompScheduling, CompMode,
 
    InboxMain, InboxBackup, GetResult,
 
    CompExecState, Component, CompScheduling, CompError, CompMode, ExitReason,
 
    port_id_from_eval, port_id_to_eval
 
};
 
use super::component_context::*;
 
@@ -107,8 +108,6 @@ enum SelectDecision {
 
    Case(u32), // contains case index, should be passed along to PDL code
 
}
 

	
 
type InboxMain = Vec<Option<DataMessage>>;
 

	
 
impl SelectState {
 
    fn new() -> Self {
 
        return Self{
 
@@ -245,7 +244,7 @@ impl Component for CompPDL {
 
        // sched_ctx.log(&format!("handling message: {:?}", message));
 
        if let Some(new_target) = self.control.should_reroute(&mut message) {
 
            let mut target = sched_ctx.runtime.get_component_public(new_target); // TODO: @NoDirectHandle
 
            target.send_message(&sched_ctx.runtime, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
 
            target.send_message_logged(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
 
            let _should_remove = target.decrement_users();
 
            debug_assert!(_should_remove.is_none());
 
            return;
 
@@ -256,10 +255,12 @@ impl Component for CompPDL {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Control(message) => {
 
                component::default_handle_control_message(
 
                if let Err(location_and_message) = component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                );
 
                ) {
 
                    self.handle_generic_component_error(sched_ctx, location_and_message);
 
                }
 
            },
 
            Message::Sync(message) => {
 
                self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
 
@@ -270,10 +271,10 @@ impl Component for CompPDL {
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        use EvalContinuation as EC;
 

	
 
        sched_ctx.log(&format!("Running component (mode: {:?})", self.exec_state.mode));
 
        sched_ctx.info(&format!("Running component (mode: {:?})", self.exec_state.mode));
 

	
 
        // Depending on the mode don't do anything at all, take some special
 
        // actions, or fall through and run the PDL code.
 
@@ -282,87 +283,115 @@ impl Component for CompPDL {
 
                // continue and run PDL code
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | CompMode::BlockedSelect => {
 
                return Ok(CompScheduling::Sleep);
 
                return CompScheduling::Sleep;
 
            }
 
            CompMode::StartExit => return Ok(component::default_handle_start_exit(
 
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx
 
            )),
 
            CompMode::BusyExit => return Ok(component::default_handle_busy_exit(
 
            CompMode::StartExit => return component::default_handle_start_exit(
 
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus
 
            ),
 
            CompMode::BusyExit => return component::default_handle_busy_exit(
 
                &mut self.exec_state, &self.control, sched_ctx
 
            )),
 
            CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)),
 
            ),
 
            CompMode::Exit => return component::default_handle_exit(&self.exec_state),
 
        }
 

	
 
        let run_result = self.execute_prompt(&sched_ctx);
 
        if let Err(error) = run_result {
 
            self.handle_component_error(sched_ctx, CompError::Executor(error));
 
            return CompScheduling::Immediate;
 
        }
 

	
 
        let run_result = self.execute_prompt(&sched_ctx)?;
 
        let run_result = run_result.unwrap();
 

	
 
        match run_result {
 
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
 
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
 
            // Results that can be returned in sync mode
 
            EC::SyncBlockEnd => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                self.handle_sync_end(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
                component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
 
                return CompScheduling::Immediate;
 
            },
 
            EC::BlockGet(port_id) => {
 
            EC::BlockGet(expr_id, port_id) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 

	
 
                let port_id = port_id_from_eval(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let port_index = comp_ctx.get_port_index(port_handle);
 
                if let Some(message) = &self.inbox_main[port_index] {
 
                    // Check if we can actually receive the message
 
                    if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
                        // Message was received. Make sure any blocked peers and
 
                        // pending messages are handled.
 
                        let message = self.inbox_main[port_index].take().unwrap();
 
                        component::default_handle_received_data_message(
 
                            port_id, &mut self.inbox_main[port_index], &mut self.inbox_backup,
 
                            comp_ctx, sched_ctx, &mut self.control
 
                        );
 

	
 
                match component::default_attempt_get(
 
                    &mut self.exec_state, port_id, PortInstruction::SourceLocation(expr_id),
 
                    &mut self.inbox_main, &mut self.inbox_backup, sched_ctx, comp_ctx,
 
                    &mut self.control, &mut self.consensus
 
                ) {
 
                    GetResult::Received(message) => {
 
                        self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
 
                        return Ok(CompScheduling::Immediate);
 
                    } else {
 
                        todo!("handle sync failure due to message deadlock");
 
                        return Ok(CompScheduling::Sleep);
 
                        return CompScheduling::Immediate;
 
                    },
 
                    GetResult::NoMessage => {
 
                        return CompScheduling::Sleep;
 
                    },
 
                    GetResult::Error(location_and_message) => {
 
                        self.handle_generic_component_error(sched_ctx, location_and_message);
 
                        return CompScheduling::Immediate;
 
                    }
 
                } else {
 
                    // We need to wait
 
                    self.exec_state.set_as_blocked_get(port_id);
 
                    return Ok(CompScheduling::Sleep);
 
                }
 
            },
 
            EC::Put(port_id, value) => {
 
            EC::Put(expr_id, port_id, value) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                sched_ctx.log(&format!("Putting value {:?}", value));
 
                sched_ctx.info(&format!("Putting value {:?}", value));
 

	
 
                // Send the message
 
                let target_port_id = port_id_from_eval(port_id);
 
                let scheduling = component::default_send_data_message(
 
                    &mut self.exec_state, target_port_id, value,
 
                let send_result = component::default_send_data_message(
 
                    &mut self.exec_state, target_port_id,
 
                    PortInstruction::SourceLocation(expr_id), value,
 
                    sched_ctx, &mut self.consensus, comp_ctx
 
                );
 

	
 
                if let Err(location_and_message) = send_result {
 
                    self.handle_generic_component_error(sched_ctx, location_and_message);
 
                    return CompScheduling::Immediate;
 
                } else {
 
                    // When `run` is called again (potentially after becoming
 
                    // unblocked) we need to instruct the executor that we performed
 
                    // the `put`
 
                    let scheduling = send_result.unwrap();
 
                    self.exec_ctx.stmt = ExecStmt::PerformedPut;
 
                return Ok(scheduling);
 
                    return scheduling;
 
                }
 
            },
 
            EC::SelectStart(num_cases, _num_ports) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                self.select_state.handle_select_start(num_cases);
 
                return Ok(CompScheduling::Requeue);
 
                return CompScheduling::Requeue;
 
            },
 
            EC::SelectRegisterPort(case_index, port_index, port_id) => {
 
            EC::SelectRegisterPort(expr_id, case_index, port_index, port_id) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                let port_id = port_id_from_eval(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 

	
 
                // Note: we register the "last_instruction" here already. This
 
                // way if we get a `ClosePort` message, the condition to fail
 
                // the synchronous round is satisfied.
 
                let port_info = comp_ctx.get_port_mut(port_handle);
 
                port_info.last_instruction = PortInstruction::SourceLocation(expr_id);
 
                let port_is_closed = port_info.state.is_closed();
 

	
 
                // Register port as part of select guard
 
                if let Err(_err) = self.select_state.register_select_case_port(comp_ctx, case_index, port_index, port_id) {
 
                    todo!("handle registering a port multiple times");
 
                }
 
                return Ok(CompScheduling::Immediate);
 
                    // Failure occurs if a port is used twice in the same guard
 
                    let protocol = &sched_ctx.runtime.protocol;
 
                    self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr(
 
                        &self.prompt, &protocol.modules, &protocol.heap, expr_id,
 
                        String::from("Cannot have the one port appear in the same guard twice")
 
                    )));
 
                } else if port_is_closed {
 
                    // Port is closed
 
                    let peer_id = comp_ctx.get_port(port_handle).peer_comp_id;
 
                    let protocol = &sched_ctx.runtime.protocol;
 
                    self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr(
 
                        &self.prompt, &protocol.modules, &protocol.heap, expr_id,
 
                        format!("Cannot register port, as the peer component (id:{}) has shut down", peer_id.0)
 
                    )));
 
                }
 

	
 
                return CompScheduling::Immediate;
 
            },
 
            EC::SelectWait => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
@@ -371,22 +400,23 @@ impl Component for CompPDL {
 
                    // Reached a conclusion, so we can continue immediately
 
                    self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
 
                    self.exec_state.mode = CompMode::Sync;
 
                    return Ok(CompScheduling::Immediate);
 
                    return CompScheduling::Immediate;
 
                } else {
 
                    // No decision yet
 
                    self.exec_state.mode = CompMode::BlockedSelect;
 
                    return Ok(CompScheduling::Sleep);
 
                    return CompScheduling::Sleep;
 
                }
 
            },
 
            // Results that can be returned outside of sync mode
 
            EC::ComponentTerminated => {
 
                self.exec_state.mode = CompMode::StartExit; // next call we'll take care of the exit
 
                return Ok(CompScheduling::Immediate);
 
                self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                return CompScheduling::Immediate;
 
            },
 
            EC::SyncBlockStart => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
                self.handle_sync_start(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
                component::default_handle_sync_start(
 
                    &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus
 
                );
 
                return CompScheduling::Immediate;
 
            },
 
            EC::NewComponent(definition_id, type_id, arguments) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
@@ -394,7 +424,7 @@ impl Component for CompPDL {
 
                    sched_ctx, comp_ctx,
 
                    definition_id, type_id, arguments
 
                );
 
                return Ok(CompScheduling::Requeue);
 
                return CompScheduling::Requeue;
 
            },
 
            EC::NewChannel => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
@@ -406,7 +436,7 @@ impl Component for CompPDL {
 
                ));
 
                self.inbox_main.push(None);
 
                self.inbox_main.push(None);
 
                return Ok(CompScheduling::Immediate);
 
                return CompScheduling::Immediate;
 
            }
 
        }
 
    }
 
@@ -451,77 +481,6 @@ impl CompPDL {
 
        return Ok(step_result)
 
    }
 

	
 
    fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component starting sync mode");
 
        self.consensus.notify_sync_start(comp_ctx);
 
        for message in self.inbox_main.iter() {
 
            if let Some(message) = message {
 
                self.consensus.handle_incoming_data_message(comp_ctx, message);
 
            }
 
        }
 
        debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
        self.exec_state.mode = CompMode::Sync;
 
    }
 

	
 
    /// Handles end of sync. The conclusion to the sync round might arise
 
    /// immediately (and be handled immediately), or might come later through
 
    /// messaging. In any case the component should be scheduled again
 
    /// immediately
 
    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component ending sync mode (now waiting for solution)");
 
        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
 
        self.exec_state.mode = CompMode::SyncEnd;
 
        self.handle_sync_decision(sched_ctx, comp_ctx, decision);
 
    }
 

	
 
    /// Handles decision from the consensus round. This will cause a change in
 
    /// the internal `Mode`, such that the next call to `run` can take the
 
    /// appropriate next steps.
 
    fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) {
 
        sched_ctx.log(&format!("Handling sync decision: {:?} (in mode {:?})", decision, self.exec_state.mode));
 
        match decision {
 
            SyncRoundDecision::None => {
 
                // No decision yet
 
                return;
 
            },
 
            SyncRoundDecision::Solution => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
 
                self.exec_state.mode = CompMode::NonSync;
 
                self.consensus.notify_sync_decision(decision);
 
            },
 
            SyncRoundDecision::Failure => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
 
                self.exec_state.mode = CompMode::StartExit;
 
            },
 
        }
 
    }
 

	
 
    fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component exiting");
 
        debug_assert_eq!(self.exec_state.mode, CompMode::StartExit);
 
        self.exec_state.mode = CompMode::BusyExit;
 

	
 
        // Doing this by index, then retrieving the handle is a bit rediculous,
 
        // but Rust is being Rust with its borrowing rules.
 
        for port_index in 0..comp_ctx.num_ports() {
 
            let port = comp_ctx.get_port_by_index_mut(port_index);
 
            if port.state == PortState::Closed {
 
                // Already closed, or in the process of being closed
 
                continue;
 
            }
 

	
 
            // Mark as closed
 
            let port_id = port.self_id;
 
            port.state = PortState::Closed;
 

	
 
            // Notify peer of closing
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
            let (peer, message) = self.control.initiate_port_closing(port_handle, comp_ctx);
 
            let peer_info = comp_ctx.get_peer(peer);
 
            peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling messages
 
    // -------------------------------------------------------------------------
 
@@ -537,10 +496,8 @@ impl CompPDL {
 
            self.consensus.handle_incoming_data_message(comp_ctx, &message);
 
        }
 

	
 
        let port_handle = comp_ctx.get_port_handle(message.data_header.target_port);
 
        let port_index = comp_ctx.get_port_index(port_handle);
 
        match component::default_handle_incoming_data_message(
 
            &mut self.exec_state, &mut self.inbox_main[port_index], comp_ctx, message,
 
            &mut self.exec_state, &mut self.inbox_main, comp_ctx, message,
 
            sched_ctx, &mut self.control
 
        ) {
 
            IncomingData::PlacedInSlot => {
 
@@ -560,7 +517,40 @@ impl CompPDL {
 

	
 
    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) {
 
        let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
        self.handle_sync_decision(sched_ctx, comp_ctx, decision);
 
        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
 
    }
 

	
 
    /// Handles an error coming from the generic `component::handle_xxx`
 
    /// functions. Hence accepts argument as a tuple.
 
    fn handle_generic_component_error(&mut self, sched_ctx: &SchedulerCtx, location_and_message: (PortInstruction, String)) {
 
        // Retrieve location and message, display in terminal
 
        let (location, message) = location_and_message;
 
        let error = match location {
 
            PortInstruction::None => CompError::Component(message),
 
            PortInstruction::NoSource => unreachable!(), // for debugging: all in-sync errors are associated with a source location
 
            PortInstruction::SourceLocation(expression_id) => {
 
                let protocol = &sched_ctx.runtime.protocol;
 
                CompError::Executor(EvalError::new_error_at_expr(
 
                    &self.prompt, &protocol.modules, &protocol.heap,
 
                    expression_id, message
 
                ))
 
            }
 
        };
 

	
 
        self.handle_component_error(sched_ctx, error);
 
    }
 

	
 
    fn handle_component_error(&mut self, sched_ctx: &SchedulerCtx, error: CompError) {
 
        sched_ctx.error(&format!("{}", error));
 

	
 
        // Set state to handle subsequent error
 
        let exit_reason = if self.exec_state.mode.is_in_sync_block() {
 
            ExitReason::ErrorInSync
 
        } else {
 
            ExitReason::ErrorNonSync
 
        };
 

	
 
        self.exec_state.set_as_start_exit(exit_reason);
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
@@ -590,9 +580,8 @@ impl CompPDL {
 
        let reservation = sched_ctx.runtime.start_create_pdl_component();
 
        let mut created_ctx = CompCtx::new(&reservation);
 

	
 
        let other_proc = &sched_ctx.runtime.protocol.heap[definition_id];
 
        let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition];
 

	
 
        // let other_proc = &sched_ctx.runtime.protocol.heap[definition_id];
 
        // let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition];
 
        // dbg_code!({
 
        //     sched_ctx.log(&format!(
 
        //         "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})",
 
@@ -624,7 +613,7 @@ impl CompPDL {
 
                created_id: created_port_id,
 
            };
 

	
 
            if creator_port.state == PortState::Closed {
 
            if creator_port.state.is_closed() {
 
                closed_port_id_pairs.push(port_id_pair)
 
            } else {
 
                opened_port_id_pairs.push(port_id_pair);
 
@@ -653,23 +642,24 @@ impl CompPDL {
 
            let created_port_info = created_ctx.get_port_mut(pair.created_handle);
 

	
 
            if created_port_info.peer_comp_id == creator_ctx.id {
 
                // Port peer is owned by the creator as well
 
                // Peer of the transferred port is the component that is
 
                // creating the new component.
 
                let created_peer_port_index = opened_port_id_pairs
 
                    .iter()
 
                    .position(|v| v.creator_id == creator_port_info.peer_port_id);
 
                match created_peer_port_index {
 
                    Some(created_peer_port_index) => {
 
                        // Peer port moved to the new component as well. So
 
                        // adjust IDs appropriately.
 
                        // Addendum to the above comment: but that port is also
 
                        // moving to the new component
 
                        let peer_pair = &opened_port_id_pairs[created_peer_port_index];
 
                        created_port_info.peer_port_id = peer_pair.created_id;
 
                        created_port_info.peer_comp_id = reservation.id();
 
                        todo!("either add 'self peer', or remove that idea from Ctx altogether")
 
                        todo!("either add 'self peer', or remove that idea from Ctx altogether");`
 
                    },
 
                    None => {
 
                        // Peer port remains with creator component.
 
                        created_port_info.peer_comp_id = creator_ctx.id;
 
                        created_ctx.add_peer(pair.created_handle, sched_ctx, creator_ctx.id, None);
 
                        created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(creator_ctx.id));
 
                    }
 
                }
 
            } else {
 
@@ -677,7 +667,7 @@ impl CompPDL {
 
                // appropriate messages later
 
                let peer_handle = creator_ctx.get_peer_handle(created_port_info.peer_comp_id);
 
                let peer_info = creator_ctx.get_peer(peer_handle);
 
                created_ctx.add_peer(pair.created_handle, sched_ctx, peer_info.id, Some(&peer_info.handle));
 
                created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(peer_info.id));
 
                created_component_has_remote_peers = true;
 
            }
 
        }
 
@@ -697,10 +687,8 @@ impl CompPDL {
 
        // potentially remove the peer component.
 
        for pair in opened_port_id_pairs.iter() {
 
            // Remove peer if appropriate
 
            let creator_port_info = creator_ctx.get_port(pair.creator_handle);
 
            let creator_port_index = creator_ctx.get_port_index(pair.creator_handle);
 
            let creator_peer_comp_id = creator_port_info.peer_comp_id;
 
            creator_ctx.remove_peer(sched_ctx, pair.creator_handle, creator_peer_comp_id, false);
 
            creator_ctx.change_port_peer(sched_ctx, pair.creator_handle, None);
 
            creator_ctx.remove_port(pair.creator_handle);
 

	
 
            // Transfer any messages
 
@@ -722,28 +710,42 @@ impl CompPDL {
 
                }
 
            }
 

	
 
            // Handle potential channel between creator and created component
 
            let created_port_info = component.ctx.get_port(pair.created_handle);
 

	
 
            if created_port_info.peer_comp_id == creator_ctx.id {
 
                // This handles the creation of a channel between the creator
 
                // component and the newly created component. So if the creator
 
                // had a `a -> b` channel, and `b` is moved to the new
 
                // component, then `a` needs to set its peer component.
 
                let peer_port_handle = creator_ctx.get_port_handle(created_port_info.peer_port_id);
 
                let peer_port_info = creator_ctx.get_port_mut(peer_port_handle);
 
                peer_port_info.peer_comp_id = component.ctx.id;
 
                peer_port_info.peer_port_id = created_port_info.self_id;
 
                creator_ctx.add_peer(peer_port_handle, sched_ctx, component.ctx.id, None);
 
                creator_ctx.change_port_peer(sched_ctx, peer_port_handle, Some(component.ctx.id));
 
            }
 
        }
 

	
 
        // Do the same for the closed ports
 
        // Do the same for the closed ports. Note that we might still have to
 
        // transfer messages that cause the new owner of the port to fail.
 
        for pair in closed_port_id_pairs.iter() {
 
            let port_index = creator_ctx.get_port_index(pair.creator_handle);
 
            creator_ctx.remove_port(pair.creator_handle);
 
            let _removed_message = self.inbox_main.remove(port_index);
 
            if let Some(mut message) = self.inbox_main.remove(port_index) {
 
                message.data_header.target_port = pair.created_id;
 
                component.component.adopt_message(&mut component.ctx, message);
 
            }
 

	
 
            // In debug mode: since we've closed the port we shouldn't have any
 
            // messages for that port.
 
            debug_assert!(_removed_message.is_none());
 
            debug_assert!(!self.inbox_backup.iter().any(|v| v.data_header.target_port == pair.creator_id));
 
            let mut message_index = 0;
 
            while message_index < self.inbox_backup.len() {
 
                let message = &self.inbox_backup[message_index];
 
                if message.data_header.target_port == pair.created_id {
 
                    // Transfer message
 
                    let mut message = self.inbox_backup.remove(message_index);
 
                    message.data_header.target_port = pair.created_id;
 
                    component.component.adopt_message(&mut component.ctx, message);
 
                } else {
 
                    message_index += 1;
 
                }
 
            }
 
        }
 

	
 
        // By now all ports and messages have been transferred. If there are any
 
@@ -762,7 +764,7 @@ impl CompPDL {
 
                    );
 
                    let peer_handle = created_ctx.get_peer_handle(port_info.peer_comp_id);
 
                    let peer_info = created_ctx.get_peer(peer_handle);
 
                    peer_info.handle.send_message(&sched_ctx.runtime, message, true);
 
                    peer_info.handle.send_message_logged(sched_ctx, message, true);
 
                }
 
            }
 
        } else {
src/runtime2/component/component_random.rs
Show inline comments
 
use rand::prelude as random;
 
use rand::RngCore;
 

	
 
use crate::protocol::eval::{ValueGroup, Value, EvalError};
 
use crate::protocol::eval::{ValueGroup, Value};
 
use crate::runtime2::*;
 

	
 
use super::*;
 
use super::component::{self, Component, CompExecState, CompScheduling, CompMode};
 
use super::component::{
 
    self,
 
    Component, CompExecState, CompScheduling,
 
    CompMode, ExitReason
 
};
 
use super::control_layer::*;
 
use super::consensus::*;
 

	
 
@@ -42,20 +46,22 @@ impl Component for ComponentRandomU32 {
 
            Message::Data(_message) => unreachable!(),
 
            Message::Sync(message) => {
 
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
                component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
 
                component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
 
            },
 
            Message::Control(message) => {
 
                component::default_handle_control_message(
 
                if let Err(location_and_message) = component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                );
 
                ) {
 
                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                }
 
            },
 
            Message::Poll => unreachable!(),
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
        sched_ctx.log(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode));
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        sched_ctx.info(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode));
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedGet | CompMode::BlockedSelect => {
 
@@ -73,55 +79,60 @@ impl Component for ComponentRandomU32 {
 
                }
 

	
 
                if self.num_sends >= self.max_num_sends {
 
                    self.exec_state.mode = CompMode::StartExit;
 
                    self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                } else {
 
                    sched_ctx.log("Entering sync mode");
 
                    sched_ctx.info("Entering sync mode");
 
                    self.did_perform_send = false;
 
                    self.consensus.notify_sync_start(comp_ctx);
 
                    self.exec_state.mode = CompMode::Sync;
 
                    component::default_handle_sync_start(
 
                        &mut self.exec_state, &mut [], sched_ctx, comp_ctx, &mut self.consensus
 
                    );
 
                }
 

	
 
                return Ok(CompScheduling::Immediate);
 
                return CompScheduling::Immediate;
 
            },
 
            CompMode::Sync => {
 
                // This component just sends a single message, then waits until
 
                // consensus has been reached
 
                if !self.did_perform_send {
 
                    sched_ctx.log("Sending random message");
 
                    sched_ctx.info("Sending random message");
 
                    let mut random = self.generator.next_u32() - self.random_minimum;
 
                    let random_delta = self.random_maximum - self.random_minimum;
 
                    random %= random_delta;
 
                    random += self.random_minimum;
 
                    let value_group = ValueGroup::new_stack(vec![Value::UInt32(random)]);
 

	
 
                    let scheduling = component::default_send_data_message(
 
                        &mut self.exec_state, self.output_port_id, value_group,
 
                    let send_result = component::default_send_data_message(
 
                        &mut self.exec_state, self.output_port_id,
 
                        PortInstruction::NoSource, value_group,
 
                        sched_ctx, &mut self.consensus, comp_ctx
 
                    );
 

	
 
                    if let Err(location_and_message) = send_result {
 
                        component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                        return CompScheduling::Immediate
 
                    } else {
 
                        // Blocked or not, we set `did_perform_send` to true. If
 
                        // blocked then the moment we become unblocked (and are back
 
                        // at the `Sync` mode) we have sent the message.
 
                        let scheduling = send_result.unwrap();
 
                        self.did_perform_send = true;
 
                        self.num_sends += 1;
 
                    return Ok(scheduling)
 
                        return scheduling
 
                    }
 
                } else {
 
                    // Message was sent, finish this sync round
 
                    sched_ctx.log("Waiting for consensus");
 
                    self.exec_state.mode = CompMode::SyncEnd;
 
                    let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
 
                    component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
 
                    return Ok(CompScheduling::Requeue);
 
                    component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
 
                    return CompScheduling::Requeue;
 
                }
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedPut => return Ok(CompScheduling::Sleep),
 
            CompMode::StartExit => return Ok(component::default_handle_start_exit(
 
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx
 
            )),
 
            CompMode::BusyExit => return Ok(component::default_handle_busy_exit(
 
            CompMode::SyncEnd | CompMode::BlockedPut => return CompScheduling::Sleep,
 
            CompMode::StartExit => return component::default_handle_start_exit(
 
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus
 
            ),
 
            CompMode::BusyExit => return component::default_handle_busy_exit(
 
                &mut self.exec_state, &self.control, sched_ctx
 
            )),
 
            CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)),
 
            ),
 
            CompMode::Exit => return component::default_handle_exit(&self.exec_state),
 
        }
 
    }
 
}
src/runtime2/component/consensus.rs
Show inline comments
 
@@ -226,7 +226,11 @@ impl SolutionCombiner {
 

	
 
        let putter = channel.putter.as_ref().unwrap();
 
        let getter = channel.getter.as_ref().unwrap();
 
        return Some(putter.mapping == getter.mapping);
 
        return Some(
 
            !putter.failed &&
 
            !getter.failed &&
 
            putter.mapping == getter.mapping
 
        );
 
    }
 

	
 
    /// Determines the global solution if all components have contributed their
 
@@ -313,35 +317,25 @@ impl Consensus {
 
    /// Notifies the consensus management that the PDL code has reached the end
 
    /// of a sync block. A local solution will be submitted, after which we wait
 
    /// until the participants in the round (hopefully) reach a conclusion.
 
    pub(crate) fn notify_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
 
    pub(crate) fn notify_sync_end_success(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy);
 
        self.mode = Mode::SyncAwaitingSolution;
 

	
 
        // Submit our port mapping as a solution
 
        let mut local_solution = Vec::with_capacity(self.ports.len());
 
        for port in &self.ports {
 
            if let Some(mapping) = port.mapping {
 
                let port_handle = comp_ctx.get_port_handle(port.self_port_id);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                let new_entry = match port_info.kind {
 
                    PortKind::Putter => SyncLocalSolutionEntry::Putter(SyncSolutionPutterPort{
 
                        self_comp_id: comp_ctx.id,
 
                        self_port_id: port_info.self_id,
 
                        mapping
 
                    }),
 
                    PortKind::Getter => SyncLocalSolutionEntry::Getter(SyncSolutionGetterPort{
 
                        self_comp_id: comp_ctx.id,
 
                        self_port_id: port_info.self_id,
 
                        peer_comp_id: port.peer_comp_id,
 
                        peer_port_id: port.peer_port_id,
 
                        mapping
 
                    })
 
                };
 
                local_solution.push(new_entry);
 
            }
 
        let local_solution = self.generate_local_solution(comp_ctx, false);
 
        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution, false);
 
        return decision;
 
    }
 

	
 
        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution);
 
    /// Notifies the consensus management that the component has encountered a
 
    /// critical error during the synchronous round. Hence we should report that
 
    /// we've failed and wait until all the participants have been notified of
 
    /// the error.
 
    pub(crate) fn notify_sync_end_failure(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy);
 
        self.mode = Mode::SyncAwaitingSolution;
 

	
 
        let local_solution = self.generate_local_solution(comp_ctx, true);
 
        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution, true);
 
        return decision;
 
    }
 

	
 
@@ -380,7 +374,6 @@ impl Consensus {
 
    /// is used to determine peers of `get`ter ports.
 
    // TODO: The use of this function is rather ugly. Find a more robust
 
    //  scheme about owners of `get`ter ports not knowing about their peers.
 
    //  (also, figure out why this was written again, I forgot).
 
    pub(crate) fn handle_incoming_data_message(&mut self, comp_ctx: &CompCtx, message: &DataMessage) {
 
        let target_handle = comp_ctx.get_port_handle(message.data_header.target_port);
 
        let target_index = comp_ctx.get_port_index(target_handle);
 
@@ -464,7 +457,7 @@ impl Consensus {
 
                return SyncRoundDecision::None;
 
            },
 
            SyncMessageContent::LocalSolution(solution_generator_id, local_solution) => {
 
                return self.handle_local_solution(sched_ctx, comp_ctx, solution_generator_id, local_solution);
 
                return self.handle_local_solution(sched_ctx, comp_ctx, solution_generator_id, local_solution, false);
 
            },
 
            SyncMessageContent::PartialSolution(partial_solution) => {
 
                return self.handle_partial_solution(sched_ctx, comp_ctx, partial_solution);
 
@@ -507,7 +500,7 @@ impl Consensus {
 
                    sync_header: self.create_sync_header(comp_ctx),
 
                    content: SyncMessageContent::NotificationOfLeader,
 
                };
 
                peer.handle.send_message(&sched_ctx.runtime, Message::Sync(message), true);
 
                peer.handle.send_message_logged(sched_ctx, Message::Sync(message), true);
 
            }
 

	
 
            self.forward_partial_solution(sched_ctx, comp_ctx);
 
@@ -519,7 +512,7 @@ impl Consensus {
 
            };
 
            let peer_handle = comp_ctx.get_peer_handle(header.sending_id);
 
            let peer_info = comp_ctx.get_peer(peer_handle);
 
            peer_info.handle.send_message(&sched_ctx.runtime, Message::Sync(message), true);
 
            peer_info.handle.send_message_logged(sched_ctx, Message::Sync(message), true);
 
        } // else: exactly equal
 
    }
 

	
 
@@ -558,12 +551,18 @@ impl Consensus {
 
        }));
 
    }
 

	
 
    fn handle_local_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, solution_sender_id: CompId, solution: SyncLocalSolution) -> SyncRoundDecision {
 
    fn handle_local_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, solution_sender_id: CompId, solution: SyncLocalSolution, fail_if_empty: bool) -> SyncRoundDecision {
 
        if self.highest_id == comp_ctx.id {
 
            // We are the leader
 
            self.solution.combine_with_local_solution(solution_sender_id, solution);
 
            let round_decision = self.solution.get_decision();
 
            let mut round_decision = self.solution.get_decision();
 
            if round_decision != SyncRoundDecision::None {
 
                if fail_if_empty && self.solution.matched_channels == 0 {
 
                    // TODO: Not sure about this, bit of a hack. Situation is that a component
 
                    //  cannot interact with other components, but it is in a sync round, and has
 
                    //  failed that sync round.
 
                    round_decision = SyncRoundDecision::Failure;
 
                }
 
                self.broadcast_decision(sched_ctx, comp_ctx, round_decision);
 
            }
 
            return round_decision;
 
@@ -625,16 +624,16 @@ impl Consensus {
 
                sync_header: self.create_sync_header(comp_ctx),
 
                content: if is_success { SyncMessageContent::GlobalSolution } else { SyncMessageContent::GlobalFailure },
 
            });
 
            handle.send_message(&sched_ctx.runtime, message, true);
 
            handle.send_message_logged(sched_ctx, message, true);
 
            let _should_remove = handle.decrement_users();
 
            debug_assert!(_should_remove.is_none());
 
        }
 
    }
 

	
 
    fn send_to_leader(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, message: Message) {
 
        debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader
 
        debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader, // TODO: @NoDirectHandle
 
        let mut leader_info = sched_ctx.runtime.get_component_public(self.highest_id);
 
        leader_info.send_message(&sched_ctx.runtime, message, true);
 
        leader_info.send_message_logged(sched_ctx, message, true);
 
        let should_remove = leader_info.decrement_users();
 
        if let Some(key) = should_remove {
 
            sched_ctx.runtime.destroy_component(key);
 
@@ -642,9 +641,38 @@ impl Consensus {
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Creating message headers
 
    // Small utilities
 
    // -------------------------------------------------------------------------
 

	
 
    fn generate_local_solution(&self, comp_ctx: &CompCtx, failed: bool) -> SyncLocalSolution {
 
        let mut local_solution = Vec::with_capacity(self.ports.len());
 
        for port in &self.ports {
 
            if let Some(mapping) = port.mapping {
 
                let port_handle = comp_ctx.get_port_handle(port.self_port_id);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                let new_entry = match port_info.kind {
 
                    PortKind::Putter => SyncLocalSolutionEntry::Putter(SyncSolutionPutterPort{
 
                        self_comp_id: comp_ctx.id,
 
                        self_port_id: port_info.self_id,
 
                        mapping,
 
                        failed
 
                    }),
 
                    PortKind::Getter => SyncLocalSolutionEntry::Getter(SyncSolutionGetterPort{
 
                        self_comp_id: comp_ctx.id,
 
                        self_port_id: port_info.self_id,
 
                        peer_comp_id: port.peer_comp_id,
 
                        peer_port_id: port.peer_port_id,
 
                        mapping,
 
                        failed
 
                    })
 
                };
 
                local_solution.push(new_entry);
 
            }
 
        }
 

	
 
        return local_solution;
 
    }
 

	
 
    fn create_data_header_and_update_mapping(&mut self, port_info: &Port) -> MessageDataHeader {
 
        let mut expected_mapping = Vec::with_capacity(self.ports.len());
 
        let mut port_index = usize::MAX;
src/runtime2/component/control_layer.rs
Show inline comments
 
@@ -124,10 +124,8 @@ impl ControlLayer {
 
                // If a closed port is Ack'd, then we remove the reference to
 
                // that component.
 
                let port_handle = comp_ctx.get_port_handle(closed_port);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                let port_peer_comp_id = port_info.peer_comp_id;
 
                debug_assert_eq!(port_info.state, PortState::Closed);
 
                comp_ctx.remove_peer(sched_ctx, port_handle, port_peer_comp_id, true); // remove if closed
 
                debug_assert!(comp_ctx.get_port(port_handle).state.is_closed());
 
                comp_ctx.change_port_peer(sched_ctx, port_handle, None);
 

	
 
                return (AckAction::None, None);
 
            }
 
@@ -191,7 +189,7 @@ impl ControlLayer {
 
            id: entry_id,
 
            sender_comp_id: creator_comp_id,
 
            target_port_id: Some(source_port_id),
 
            content: ControlMessageContent::PortPeerChangedBlock(source_port_id)
 
            content: ControlMessageContent::PortPeerChangedBlock
 
        })
 
    }
 

	
 
@@ -215,10 +213,10 @@ impl ControlLayer {
 

	
 
    /// Initiates the control message procedures for closing a port. Caller must
 
    /// make sure that the port state has already been set to `Closed`.
 
    pub(crate) fn initiate_port_closing(&mut self, port_handle: LocalPortHandle, comp_ctx: &CompCtx) -> (LocalPeerHandle, ControlMessage) {
 
    pub(crate) fn initiate_port_closing(&mut self, port_handle: LocalPortHandle, exit_inside_sync: bool, comp_ctx: &CompCtx) -> (LocalPeerHandle, ControlMessage) {
 
        let port = comp_ctx.get_port(port_handle);
 
        let peer_port_id = port.peer_port_id;
 
        debug_assert!(port.state == PortState::Closed);
 
        debug_assert!(port.state.is_closed());
 

	
 
        // Construct the port-closing entry
 
        let entry_id = self.take_id();
 
@@ -236,7 +234,9 @@ impl ControlLayer {
 
                id: entry_id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(peer_port_id),
 
                content: ControlMessageContent::ClosePort(peer_port_id),
 
                content: ControlMessageContent::ClosePort(ControlMessageClosePort{
 
                    closed_in_sync_round: exit_inside_sync,
 
                }),
 
            }
 
        );
 
    }
 
@@ -247,7 +247,7 @@ impl ControlLayer {
 
    pub(crate) fn initiate_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) {
 
        let port_info = comp_ctx.get_port(port_handle);
 
        debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're telling the putter to block
 
        debug_assert_eq!(port_info.state, PortState::BlockedDueToFullBuffers); // contract with caller
 
        debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers)); // contract with caller
 

	
 
        let peer_port_id = port_info.peer_port_id;
 
        let peer_comp_id = port_info.peer_comp_id;
 
@@ -258,8 +258,8 @@ impl ControlLayer {
 
            ControlMessage{
 
                id: ControlId::new_invalid(),
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(port_info.peer_port_id),
 
                content: ControlMessageContent::BlockPort(peer_port_id),
 
                target_port_id: Some(peer_port_id),
 
                content: ControlMessageContent::BlockPort,
 
            }
 
        );
 
    }
 
@@ -269,7 +269,6 @@ impl ControlLayer {
 
    pub(crate) fn cancel_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) {
 
        let port_info = comp_ctx.get_port(port_handle);
 
        debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're initiating the unblocking
 
        debug_assert_eq!(port_info.state, PortState::Open); // contract with caller, the locally stored entry ensures we were blocked before
 

	
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 

	
 
@@ -279,7 +278,7 @@ impl ControlLayer {
 
                id: ControlId::new_invalid(),
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(port_info.peer_port_id),
 
                content: ControlMessageContent::UnblockPort(port_info.peer_port_id)
 
                content: ControlMessageContent::UnblockPort,
 
            }
 
        );
 
    }
src/runtime2/component/mod.rs
Show inline comments
 
@@ -8,7 +8,7 @@ mod component_internet;
 

	
 
pub(crate) use component::{Component, CompScheduling};
 
pub(crate) use component_pdl::{CompPDL};
 
pub(crate) use component_context::CompCtx;
 
pub(crate) use component_context::{CompCtx, PortInstruction};
 
pub(crate) use control_layer::{ControlId};
 

	
 
use super::scheduler::*;
src/runtime2/mod.rs
Show inline comments
 
@@ -12,7 +12,7 @@ pub use runtime::Runtime;
 
pub(crate) use error::RtError;
 
pub(crate) use scheduler::SchedulerCtx;
 
pub(crate) use communication::{
 
    PortId, PortKind, PortState,
 
    PortId,
 
    Message, ControlMessage, SyncMessage, DataMessage,
 
    SyncRoundDecision
 
};
 
\ No newline at end of file
src/runtime2/poll/mod.rs
Show inline comments
 
@@ -6,7 +6,7 @@ use std::sync::atomic::{AtomicU32, Ordering};
 
use std::collections::HashMap;
 

	
 
use crate::runtime2::RtError;
 
use crate::runtime2::runtime::{CompHandle, RuntimeInner};
 
use crate::runtime2::runtime::{CompHandle, RuntimeInner, LogLevel};
 
use crate::runtime2::store::queue_mpsc::*;
 

	
 

	
 
@@ -139,11 +139,11 @@ pub struct PollingThread {
 
    poller: Arc<Poller>,
 
    runtime: Arc<RuntimeInner>,
 
    queue: QueueDynMpsc<PollCmd>,
 
    logging_enabled: bool,
 
    log_level: LogLevel,
 
}
 

	
 
impl PollingThread {
 
    pub(crate) fn new(runtime: Arc<RuntimeInner>, logging_enabled: bool) -> Result<(PollingThreadHandle, PollingClientFactory), RtError> {
 
    pub(crate) fn new(runtime: Arc<RuntimeInner>, log_level: LogLevel) -> Result<(PollingThreadHandle, PollingClientFactory), RtError> {
 
        let poller = Poller::new()
 
            .map_err(|e| rt_error!("failed to create poller, because: {}", e))?;
 
        let poller = Arc::new(poller);
 
@@ -154,9 +154,14 @@ impl PollingThread {
 
            poller: poller.clone(),
 
            runtime: runtime.clone(),
 
            queue,
 
            logging_enabled,
 
            log_level,
 
        };
 
        let thread_handle = thread::spawn(move || { thread_data.run() });
 
        let thread_handle = thread::Builder::new()
 
            .name(String::from("poller"))
 
            .spawn(move || { thread_data.run() })
 
            .map_err(|reason|
 
                rt_error!("failed to start polling thread, because: {}", reason)
 
            )?;
 

	
 
        let thread_handle = PollingThreadHandle{
 
            queue: Some(queue_producers.producer()),
 
@@ -172,7 +177,6 @@ impl PollingThread {
 
    }
 

	
 
    pub(crate) fn run(&mut self) {
 
        use crate::runtime2::scheduler::SchedulerCtx;
 
        use crate::runtime2::communication::Message;
 

	
 
        const NUM_EVENTS: usize = 256;
 
@@ -239,7 +243,7 @@ impl PollingThread {
 
    }
 

	
 
    fn log(&self, message: &str) {
 
        if self.logging_enabled {
 
        if self.log_level >= LogLevel::Info {
 
            println!("[polling] {}", message);
 
        }
 
    }
src/runtime2/runtime.rs
Show inline comments
 
@@ -12,6 +12,13 @@ use super::component::{Component, wake_up_if_sleeping, CompPDL, CompCtx};
 
use super::store::{ComponentStore, ComponentReservation, QueueDynMpsc, QueueDynProducer};
 
use super::scheduler::*;
 

	
 
#[derive(PartialOrd, PartialEq, Copy, Clone)]
 
pub enum LogLevel {
 
    Debug, // all logging (includes messages)
 
    Info, // rough logging
 
    None, // no logging
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Component
 
// -----------------------------------------------------------------------------
 
@@ -105,6 +112,12 @@ impl CompHandle {
 
        }
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn send_message_logged(&self, sched_ctx: &SchedulerCtx, message: Message, try_wake_up: bool) {
 
        sched_ctx.debug(&format!("Sending message to comp:{} ... {:?}", self.id.0, message));
 
        self.send_message(&sched_ctx.runtime, message, try_wake_up);
 
    }
 

	
 
    pub(crate) fn id(&self) -> CompId {
 
        return self.id;
 
    }
 
@@ -167,7 +180,7 @@ pub struct Runtime {
 

	
 
impl Runtime {
 
    // TODO: debug_logging should be removed at some point
 
    pub fn new(num_threads: u32, debug_logging: bool, protocol_description: ProtocolDescription) -> Result<Runtime, RtError> {
 
    pub fn new(num_threads: u32, log_level: LogLevel, protocol_description: ProtocolDescription) -> Result<Runtime, RtError> {
 
        if num_threads == 0 {
 
            return Err(rt_error!("need at least one thread to create the runtime"));
 
        }
 
@@ -179,7 +192,7 @@ impl Runtime {
 
            active_elements: AtomicU32::new(1),
 
        });
 
        let (polling_handle, polling_clients) = rt_error_try!(
 
            PollingThread::new(runtime_inner.clone(), debug_logging),
 
            PollingThread::new(runtime_inner.clone(), log_level),
 
            "failed to build polling thread"
 
        );
 

	
 
@@ -188,11 +201,20 @@ impl Runtime {
 
        for thread_index in 0..num_threads {
 
            let mut scheduler = Scheduler::new(
 
                runtime_inner.clone(), polling_clients.client(),
 
                thread_index, debug_logging
 
                thread_index, log_level
 
            );
 
            let thread_handle = thread::spawn(move || {
 

	
 
            let thread_handle = thread::Builder::new()
 
                .name(format!("scheduler:{}", thread_index))
 
                .spawn(move || {
 
                    scheduler.run();
 
            });
 
                })
 
                .map_err(|reason|
 
                    rt_error!(
 
                        "failed to spawn scheduler thread {}, because: {}",
 
                        thread_index, reason
 
                    )
 
                )?;
 

	
 
            scheduler_threads.push(thread_handle);
 
        }
src/runtime2/scheduler.rs
Show inline comments
 
@@ -10,7 +10,7 @@ pub(crate) struct Scheduler {
 
    runtime: Arc<RuntimeInner>,
 
    polling: PollingClient,
 
    scheduler_id: u32,
 
    debug_logging: bool,
 
    log_level: LogLevel,
 
}
 

	
 
pub(crate) struct SchedulerCtx<'a> {
 
@@ -18,36 +18,48 @@ pub(crate) struct SchedulerCtx<'a> {
 
    pub polling: &'a PollingClient,
 
    pub id: u32,
 
    pub comp: u32,
 
    pub logging_enabled: bool,
 
    pub log_level: LogLevel,
 
}
 

	
 
impl<'a> SchedulerCtx<'a> {
 
    pub fn new(runtime: &'a RuntimeInner, polling: &'a PollingClient, id: u32, logging_enabled: bool) -> Self {
 
    pub fn new(runtime: &'a RuntimeInner, polling: &'a PollingClient, id: u32, log_level: LogLevel) -> Self {
 
        return Self {
 
            runtime,
 
            polling,
 
            id,
 
            comp: 0,
 
            logging_enabled,
 
            log_level,
 
        }
 
    }
 

	
 
    pub(crate) fn log(&self, text: &str) {
 
        if self.logging_enabled {
 
    pub(crate) fn debug(&self, text: &str) {
 
        // TODO: Probably not always use colour
 
        if LogLevel::Debug >= self.log_level {
 
            println!("[s:{:02}, c:{:03}] \x1b[0;36m{}\x1b[0m", self.id, self.comp, text);
 
        }
 
    }
 

	
 
    pub(crate) fn info(&self, text: &str) {
 
        if LogLevel::Info >= self.log_level {
 
            println!("[s:{:02}, c:{:03}] {}", self.id, self.comp, text);
 
        }
 
    }
 

	
 
    pub(crate) fn error(&self, text: &str) {
 
        // TODO: Probably not always use colour
 
        println!("[s:{:02}, c:{:03}] \x1b[0;31m{}\x1b[0m", self.id, self.comp, text);
 
    }
 
}
 

	
 
impl Scheduler {
 
    // public interface to thread
 

	
 
    pub fn new(runtime: Arc<RuntimeInner>, polling: PollingClient, scheduler_id: u32, debug_logging: bool) -> Self {
 
        return Scheduler{ runtime, polling, scheduler_id, debug_logging }
 
    pub fn new(runtime: Arc<RuntimeInner>, polling: PollingClient, scheduler_id: u32, log_level: LogLevel) -> Self {
 
        return Scheduler{ runtime, polling, scheduler_id, log_level }
 
    }
 

	
 
    pub fn run(&mut self) {
 
        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, &self.polling, self.scheduler_id, self.debug_logging);
 
        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, &self.polling, self.scheduler_id, self.log_level);
 

	
 
        'run_loop: loop {
 
            // Wait until we have something to do (or need to quit)
 
@@ -67,7 +79,7 @@ impl Scheduler {
 
                while let Some(message) = component.inbox.pop() {
 
                    component.component.handle_message(&mut scheduler_ctx, &mut component.ctx, message);
 
                }
 
                new_scheduling = component.component.run(&mut scheduler_ctx, &mut component.ctx).expect("TODO: Handle error");
 
                new_scheduling = component.component.run(&mut scheduler_ctx, &mut component.ctx);
 
            }
 

	
 
            // Handle the new scheduling
src/runtime2/stdlib/internet.rs
Show inline comments
 
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
 
use std::mem::size_of;
 
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
 
use std::io::Error as IoError;
 

	
 
use libc::{
 
    c_int,
 
    sockaddr_in, sockaddr_in6, in_addr, in6_addr,
 
    socket, bind, listen, accept, connect, close,
 
};
 
use mio::{event, Interest, Registry, Token};
 

	
 
use crate::runtime2::poll::{AsFileDescriptor, FileDescriptor};
 

	
src/runtime2/tests/error_handling.rs
Show inline comments
 
new file 100644
 
use super::*;
 

	
 
#[test]
 
fn test_unconnected_component_error() {
 
    compile_and_create_component("
 
    primitive interact_with_noone() {
 
        u8[] array = { 5 };
 
        auto value = array[1];
 
    }", "interact_with_noone", no_args());
 
}
 

	
 
#[test]
 
fn test_connected_uncommunicating_component_error() {
 
    compile_and_create_component("
 
    primitive crashing_and_burning(out<u32> unused) {
 
        u8[] array = { 1337 };
 
        auto value = array[1337];
 
    }
 
    primitive sitting_idly_waiting(in<u32> never_providing) {
 
        sync auto a = get(never_providing);
 
    }
 
    composite constructor() {
 
        // Test one way
 
        // channel a -> b;
 
        // new sitting_idly_waiting(b);
 
        // new crashing_and_burning(a);
 

	
 
        // And the other way around
 
        channel c -> d;
 
        new crashing_and_burning(c);
 
        new sitting_idly_waiting(d);
 
    }", "constructor", no_args())
 
}
 

	
 
#[test]
 
fn test_connected_communicating_component_error() {
 
    compile_and_create_component("
 
    primitive send_and_fail(out<u32> tx) {
 
        u8[] array = {};
 
        sync {
 
            put(tx, 0);
 
            array[0] = 5;
 
        }
 
    }
 
    primitive receive_once(in<u32> rx) {
 
        sync auto a = get(rx);
 
    }
 
    composite constructor() {
 
        channel a -> b;
 
        new send_and_fail(a);
 
        new receive_once(b);
 

	
 
        channel c -> d;
 
        new receive_once(d);
 
        new send_and_fail(c);
 
    }
 
    ", "constructor", no_args())
 
}
 

	
 
#[test]
 
fn test_failing_after_successful_sync() {
 
    compile_and_create_component("
 
    primitive put_and_fail(out<u8> tx) { sync put(tx, 1); u8 a = {}[0]; }
 
    primitive get_and_fail(in<u8> rx) { sync auto a = get(rx); u8 a = {}[0]; }
 
    primitive put_and_exit(out<u8> tx) { sync put(tx, 2); }
 
    primitive get_and_exit(in<u8> rx) { sync auto a = get(rx); }
 

	
 
    composite constructor() {
 
        {
 
            channel a -> b;
 
            new put_and_fail(a);
 
            new get_and_exit(b);
 
        }
 
        {
 
            channel a -> b;
 
            new get_and_exit(b);
 
            new put_and_fail(a);
 
        }
 
        {
 
            channel a -> b;
 
            new put_and_exit(a);
 
            new get_and_fail(b);
 
        }
 
        {
 
            channel a -> b;
 
            new get_and_fail(b);
 
            new put_and_exit(a);
 
        }
 
    }
 
    ", "constructor", no_args());
 
}
 
\ No newline at end of file
src/runtime2/tests/mod.rs
Show inline comments
 
@@ -3,7 +3,20 @@ use crate::protocol::eval::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::component::{CompCtx, CompPDL};
 

	
 
fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: ValueGroup) {
 
mod error_handling;
 

	
 
const LOG_LEVEL: LogLevel = LogLevel::Debug;
 
const NUM_THREADS: u32 = 4;
 

	
 
pub(crate) fn compile_and_create_component(source: &str, routine_name: &str, args: ValueGroup) {
 
    let protocol = ProtocolDescription::parse(source.as_bytes())
 
        .expect("successful compilation");
 
    let runtime = Runtime::new(NUM_THREADS, LOG_LEVEL, protocol)
 
        .expect("successful runtime startup");
 
    create_component(&runtime, "", routine_name, args);
 
}
 

	
 
pub(crate) fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: ValueGroup) {
 
    let prompt = rt.inner.protocol.new_component(
 
        module_name.as_bytes(), routine_name.as_bytes(), args
 
    ).expect("create prompt");
 
@@ -14,7 +27,7 @@ fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: V
 
    rt.inner.enqueue_work(key);
 
}
 

	
 
fn no_args() -> ValueGroup { ValueGroup::new_stack(Vec::new()) }
 
pub(crate) fn no_args() -> ValueGroup { ValueGroup::new_stack(Vec::new()) }
 

	
 
#[test]
 
fn test_component_creation() {
 
@@ -24,7 +37,7 @@ fn test_component_creation() {
 
        auto b = 5 + a;
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, true, pd).unwrap();
 
    let rt = Runtime::new(1, LOG_LEVEL, pd).unwrap();
 

	
 
    for _i in 0..20 {
 
        create_component(&rt, "", "nothing_at_all", no_args());
 
@@ -81,7 +94,7 @@ fn test_component_communication() {
 
        new sender(o_mrmm, 5, 5);
 
        new receiver(i_mrmm, 5, 5);
 
    }").expect("compilation");
 
    let rt = Runtime::new(3, true, pd).unwrap();
 
    let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
@@ -130,7 +143,7 @@ fn test_intermediate_messenger() {
 
        new constructor_template<s64>();
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, true, pd).unwrap();
 
    let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
@@ -180,7 +193,7 @@ fn test_simple_select() {
 
        new sender(tx_b, num_sends);
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, true, pd).unwrap();
 
    let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
@@ -202,7 +215,7 @@ fn test_unguarded_select() {
 
        }
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, false, pd).unwrap();
 
    let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap();
 
    create_component(&rt, "", "constructor_outside_select", no_args());
 
    create_component(&rt, "", "constructor_inside_select", no_args());
 
}
 
@@ -218,7 +231,7 @@ fn test_empty_select() {
 
        }
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, false, pd).unwrap();
 
    let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
@@ -244,7 +257,7 @@ fn test_random_u32_temporary_thingo() {
 
        new random_taker(rx, num_values);
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, true, pd).unwrap();
 
    let rt = Runtime::new(1, LOG_LEVEL, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
@@ -374,6 +387,6 @@ fn test_sending_receiving_union() {
 
        new client(cmd_tx, data_rx, num_rounds);
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, false, pd).unwrap();
 
    let rt = Runtime::new(1, LOG_LEVEL, pd).unwrap();
 
    create_component(&rt, "", "main", no_args());
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)