Changeset - e7e7211531c8
[Not reviewed]
0 5 1
mh - 3 years ago 2022-04-21 08:50:30
contact@maxhenger.nl
Initial error-handling tests
6 files changed with 43 insertions and 4 deletions:
0 comments (0 inline, 0 general)
docs/runtime/sync.md
Show inline comments
 
@@ -107,97 +107,97 @@ However, there are some properties that we can take advantage of:
 

	
 
Note that although the message transmitter may not be certain about its final recipient, the components along the way *are* aware of the routing that is necessary to make the message arrive at the intended target. Combing back to the case where we have a creator `C`, new component `N` and peer `P`. Then `P` will send a message intended for `N`, but arriving at `C`. Here `C` can change the target port and component to `N` (as it is in the process of transferring that port, so knows both its original and new port ID). Once the message arrives and is accepted by the recipient then it is certain about the component and port IDs.
 

	
 
## Sending Sync Messages
 

	
 
Sync messages have the purpose of making sure that consensus is reached about the interactions that took place in all of the participating components' sync blocks. The previous runtime featured speculative execution and a branching execution model: a component could exhibit multiple behaviours, and at the end all components decide which combination of local behaviours constitute a satisfying single global behaviour (if one exists at all). Without speculative execution the model can be a lot simpler.
 

	
 
We'll only shortly discuss the mechanisms that are present in the synchronization algorithm. A component has a local counter, that is reset for each synchronous interaction, that is used when transmitting data messages. Such a message will be annotated with the counter at `N`, after which the component sends the next message with annotation `N+1`. At the same time the component will keep track of a local mapping from port ID to port annotation, we'll call this the port mapping. Likewise, when a component receives a data message it will assign the received annotation in its own port mapping. If two components assign the same annotation to the ports that constitute a channel, then there is an agreeable interaction there.
 

	
 
And so at the end of the synchronous round a component will somehow broadcast its port mapping. Note from the discussion above that a transmitting port's annotation is only associated with that transmitting port, since a transmitting port can only truly ever know its own component/port ID. While the receiving port's annotation knows about the peer's component/port ID as well. And so a component can broadcast `(component ID, port ID, mapping)` for each of its transmitting ports, while it can broadcast `(own component ID, own port ID, peer component ID, peer port ID, mapping)` for each receiving port. Then a recipient of these mappings can match them up and make sure that the mappings agree.
 

	
 
Note that this broadcasting of synchronous messages is essentially a component-to-component operation. However these messages must still be sent over ports anyway (and any port that was used to transmit messages to a particular receiving component will do). There are two reasons:
 

	
 
1. The sync message may need to be rerouted (e.g. a sender quickly fires both a data message and a subsequent sync message while the receiving port is being transferred to a new component), but needs to arrive at the same target as the data message. This is essentially restating that a transmitter never knows about the component ID of the recipient.
 
2. The sync message must not be taken into account by the recipient if it has not accepted any messages from the sender yet. Ofcourse this can be achieved in various ways but a simple way to achieve this is to send the sync message over ports.
 

	
 
## Annotating Data Messages
 

	
 
These port mappings are also sent along when sending data messages. We will not go into details but here the mapping makes sure that messages arrive in the right order, and certain kinds of deadlock or inconsistent protocol behaviour may be detected. This port mapping is checked for consistency by the recipient and, when consistent, the target port is updated with its new mapping.
 

	
 
As we'll send along this mapping we will only consider the ports that are shared between the two components. But in the most general case the transmitting ports of the component do not have knowledge about the peer component. And so the sent port mapping will have to contain the annotation for *all* transmitting ports. Receiving port mappings only have to be sent along if they received a message, and here we can indeed apply filtering. Likewise, if the recipient of a port mapping has not yet received anything on its receiving port, then it cannot be sure about the identity of the sender.
 

	
 
This leads to problems both for ensuring the correct ordering of the messages. For finding consensus it is not. Suppose that a port `a` sends a message to a port `b`. Port `b` does not accept it. Upon trying to find consensus we see that `a` will submit an entry in its port mapping, and `b` does not submit anything at all. Hence no solution can be found, as desired.
 

	
 
For the message ordering we require from the receiver that it confirms that, for all of the shared channels, it has the same mapping as the sender sends along. Suppose a component `A` has ports `a_put` and `b_put`, while a component `B` has ports `a_get` and `b_get` (where `a_put -> a_get` and `b_put -> b_get`). Suppose component `A` sends on `a_put` and `b_put` consecutively. And component `B` only receives from `b_get`. Then since `a_get` did not receive from `a_put` (hence did not learn that component/port ID pair of `a_put` is associated with `a_get`), the component `B` cannot figure out that `a_get` should precede a `b_get`. Likewise component `A` has no way to know that `a_put` and `b_put` are sending to the same component, hence it cannot indicate to component `B` that `a_get` should precede `b_get`.
 

	
 
There are some simple assumptions we can make that makes the problem a little bit easier to think about. Suppose again `a_put -> a_get` and `b_put -> b_get`. Suppose `a_put` is used first, where we send along the mapping of `a_put` and `b_put`. Then we send along `b_put`, again sending along the mapping. Then it is possible for the receiving component to accept the wrong message first (e.g. `b_get`, therefore learning about `b`), but it will be impossible to get from `a_get` later, since that one requires `b_put` (of which we learned that it matches `b_get`) to not have sent any messages.
 

	
 
Without adding any extra overhead (e.g. some kind of discovery round per synchronous interaction), we can take three approaches:
 

	
 
1. We simply don't care. It is impossible for a round where messages are received out of order to complete. Hence we temporarily allow a component to take the wrong actions, therefore wasting some CPU time, and to crash/error afterward.
 
2. We remove the entire concept of ordering of channels at a single component. Channels are always independent entities. This way we truly do not have to care. All we care about is that the messages that have been sent over a channel arrive at the other side.
 
3. We slightly modify the algorithm to detect these problems. This can be done in reasonable fashion, albeit a bit "hacky". For each channel there is a slot to receive messages. Messages wait there until the receiver performs a `get` in the PDL code. So far we've only considered learning about the component/port IDs that constitute a channel the moment they're received with a `get`. The algorithm could be changed to already learn about the peer component/port ID the moment the message arrives in the slot.
 

	
 
We'll go with the last option in the current implementation. We return to the problematic example above. Note that messages between components are sent in ordered fashion, and `a_put` happens before `b_put`. Then component `B` will first learn that `a_put` is the peer of `a_get`, then it performs the first `get` on the message from `b_put` to `b_get`. This message is annotated with a port mapping that `a_put` has been used before. We're now able to detect at component `B` that we cannot accept `b_get` before `a_get`.
 

	
 
Concluding:
 

	
 
- Every data message that is transmitted needs to contain the port mapping of all `put`ting ports (annotating them appropriately if they have not yet been used). We also need to include the port mapping of all `get`ting ports that have a pending/received message. The port mapping for `put`ting ports will only include their own ID, the port mapping for `get`ting ports will include the IDs of their peer as well.
 
- Every arriving data message will immediately be used to identify the sender as the peer of the corresponding `get`ter port. Since messages between components arrive in order this allows us to detect when the `put`s are in a different order at the sender as the `get`s at the receiver.
 

	
 
## Handling Fatal Component Errors
 

	
 
Components may, during their execution, encounter errors that prevent them from continuing executing their code. For the purposes of this chapter we may consider these to occur during two particular phases of their execution:
 

	
 
1. The error occurred outside of a sync block. Or equivalently (from the point of view of the runtime): the error ocurred inside a sync block, but the component has not interacted with other components through `put`/`get` calls.
 
2. The error occurred inside of a sync block. The component can have performed any number of `put`/`get` calls. But for the sake of discussion we will only discuss the case where we perform:
 
   1. One `put` in the synchronous round.
 
   2. One `get` in the synchronous round.
 

	
 
As a preliminary remark: note that encountering an error is nothing special: the component can simply print an error to `stdout` and stop executing. The handling of the error by peers is of importance! If an interaction is made impossible because a peer has stopped executing, then the component that wishes to perform that interaction should error out itself!
 

	
 
### Handling Errors Outside of a Sync Block
 

	
 
If a component `E` encounters a critical error outside of a sync block. Then we can be sure that if it had a lat synchronous round, that it succeeded. However, there might be future synchronous rounds for component `E`, likewise a peer component `C` might have already put a message in `E`'s inbox.
 

	
 
The requirement for the outside-sync error of `E` is that any future sync interactions by `C` will fail (but, if `C` has no future interactions, it shouldn't fail either!). 
 

	
 
Note that `E` cannot perform `put`/`get` requests, because we're assuming `E` is outside of a sync block. Hence the only possible failing interaction is that `C` has performed a `put`, or is attempting a `get`. In the case the `C` `put`s to `E`, then `E` might not have figured out the identity of `C` yet (see earlier remarks on the eventual consistency of peer detection). Hence `C` is responsible for ensuring its own correct shutdown due to a failing `put`. Likewise for a `get`: `C` cannot receive from `E` if it is failing. So if `C` is waiting on a message to arrive, or if it will call `get` in the future, then `C` must fail as well.
 

	
 
In this case it is sufficient for `E` to send around a `ClosePort` message. As detailed in another chapter of this document. However, a particular race condition might occur. We have assumed that `E` is not in a sync block. But `C` is not aware of this fact. `C` might not be able to distinguish between the following three cases:
 

	
 
1. Regular shutdown: Components `C` and `E` are not in a sync round.
 
   - `E` broadcasts `ClosePort`.
 
   - `C` receives `ClosePort`.
 
2. Shutdown within a sync round, `ClosePort` leads `Solution`: A leader component `L`, peer component `C` and failing component `E`. Assume that all are/were busy in a synchronous round with one another.
 
   - `L` broadcasts `Solution` for the current sync round.
 
   - `E` receives `Solution`, finishes round. 
 
   - `E` encounters an error, so sends `ClosePort` to `C`.
 
   - `C` receives `ClosePort` from `E`.
 
   - `C` receives `Solution` from `L`.
 
3. Shutdown within a sync round, `Solution` leads `ClosePort`: Same components `L`, `C` and `E`.
 
   - `L` broadcasts `Solution` for the current sync round.
 
   - `E` receives `Solution` finishes round.
 
   - `E` encounters an error, so sends `ClosePort` to `C`.
 
   - `C` receives `Solution` from `L`.
 
   - `C` receives `ClosePort` from `E`.
 

	
 
In all described cases `E` encounters an error after finishing a sync round. But from the point of view of `C` it is unsure whether the `ClosePort` message pertains to the current synchronous round or not. In case 1 and 3 nothing is out of the ordinary. But in case 2 we have that `C` is at a particular point in time aware of the `ClosePort` from `E`, but not yet of the `Solution` from `L`. `C` should not fail the sync round, as it is completed, but it is unaware of this fact.
 

	
 
As a rather simple solution, since components that are participating with one another in a sync round move in lock-step at the end of the sync block, we send a boolean along with the `ClosePort`, e.g. `ClosePort(nonsync)`. This boolean indicates whether `E` was inside or outside of a sync block during it encountering an error. Now `C` can distinguish between the three cases: in all cases it agrees that `E` was not in a sync block (and hence: the sync round in cases 2 and 3 can be completed).
 

	
 
### Handling Errors Inside of a Sync Block
 

	
 
If `E` is inside of a sync block. Then it has interacted with other components. Our requirement now is that the sync round fails (and ofcourse, that all of the peers are notified that `E` will no longer be present in the runtime). There are two things that are complicating this type of failure:
 

	
 
1. Suppose that in the successful case of the synchronous interaction, there are a large number of components interacting with one another. Now it might be that `E` fails very early in its sync block, such that it cannot interact with several components. This lack of interaction might cause the single sync block to break up into several smaller sync blocks. Each of these separated regions is supposed to fail.
 
2. Within a particular synchronous interaction we might have that the leader `L` has a reference to the component `E` without it being a direct peer. There is a reference counting system in place that makes sure that `L` can always send messages to `E`. But we still need to make sure that those references stay alive for as long as needed. 
 

	
 
Suppose a synchronous region is (partially) established, and the component `E` encounters a critical error. The two points given above imply that two processes need to be initiated. For the first error-handling process, we simply use the same scheme as described in the case where `E` is not in a synchronous region. However now we broadcast `ClosePort(sync)` instead of `ClosePort(nonsync)` messages. Consider the following two cases: 
 

	
 
1. Component `C` is not part of the same synchronous region as `E`. And component `C` has tried `put`ting to `E`. If `C` receives a `ClosePort(sync)`, then it knows that its interaction should fail. Note: it might be that `E` wasn't planning on `get`ting from `C` in the sync round in which `E` failed, but much later. In that case it still makes sense for `C` to fail; it would have failed in the future. A small inconsistency here (within the current infinitely-deadlocking implementation) is that if `E` would *never* `get` from `C`, then `C` would deadlock instead of crash (one could argue that this implies that deadlocking should lead to crashing through a timeout mechanism).
 
2. Component `C` is not part of the same synchronous region as `E`. And if `E` wouldn't have crashed, then it would've `put` a message to `C`. In this case it is still proper for `C` to crash. The reasoning works the same as above.
 

	
 
So that is to say that this `ClosePort(sync)` causes instant failure of `C` if it has used the closed port in a round without consensus, or if it uses that port in the future. Note that this `ClosePort(sync)` system causes cascading failures throughout the disjoint synchronous regions. This is as intended: once one component's PDL program can no longer be executed, we cannot depend on the discovery of all the peers that constitute the intended synchronous region. So instead we rely on a peer-to-peer mechanism to make sure that every component is notified of failure.
 

	
 
However, while these cascading peer-to-peer `ClosePort(sync)` messages are happily shared around, we still have a leader component somewhere, and components that have not yet been notified of the failure. Here we can make several design choices to 
 
\ No newline at end of file
 
However, while these cascading peer-to-peer `ClosePort(sync)` messages are happily shared around, we still have a leader component somewhere, and components that have not yet been notified of the failure.
 
\ No newline at end of file
src/runtime2/component/component.rs
Show inline comments
 
@@ -332,192 +332,193 @@ pub(crate) fn default_handle_received_data_message(
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    port_info.last_instruction = port_instruction;
 

	
 
    if port_info.state == PortState::Closed {
 
        return Err((
 
            port_info.last_instruction,
 
            format!("Cannot 'get' because the channel is closed"))
 
        );
 
    }
 

	
 
    // Check if there are any more messages in the backup buffer
 
    let port_info = comp_ctx.get_port(port_handle);
 
    for message_index in 0..inbox_backup.len() {
 
        let message = &inbox_backup[message_index];
 
        if message.data_header.target_port == targeted_port {
 
            // One more message, place it in the slot
 
            let message = inbox_backup.remove(message_index);
 
            debug_assert!(port_info.state.is_blocked()); // since we're removing another message from the backup
 
            *slot = Some(message);
 

	
 
            return Ok(());
 
        }
 
    }
 

	
 
    // Did not have any more messages, so if we were blocked, then we need to
 
    // unblock the port now (and inform the peer of this unblocking)
 
    if port_info.state == PortState::BlockedDueToFullBuffers {
 
        comp_ctx.set_port_state(port_handle, PortState::Open);
 
        let (peer_handle, message) = control.cancel_port_blocking(comp_ctx, port_handle);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
    }
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles control messages in the default way. Note that this function may
 
/// take a lot of actions in the name of the caller: pending messages may be
 
/// sent, ports may become blocked/unblocked, etc. So the execution
 
/// (`CompExecState`), control (`ControlLayer`) and consensus (`Consensus`)
 
/// state may all change.
 
pub(crate) fn default_handle_control_message(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus,
 
    message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
 
) -> Result<(), (PortInstruction, String)> {
 
    match message.content {
 
        ControlMessageContent::Ack => {
 
            default_handle_ack(control, message.id, sched_ctx, comp_ctx);
 
        },
 
        ControlMessageContent::BlockPort(port_id) => {
 
            // One of our messages was accepted, but the port should be
 
            // blocked.
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
            let port_info = comp_ctx.get_port(port_handle);
 
            debug_assert_eq!(port_info.kind, PortKind::Putter);
 
            if port_info.state == PortState::Open {
 
                // only when open: we don't do this when closed, and we we don't do this if we're blocked due to peer changes
 
                comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
 
            }
 
        },
 
        ControlMessageContent::ClosePort(content) => {
 
            // Request to close the port. We immediately comply and remove
 
            // the component handle as well
 
            let port_handle = comp_ctx.get_port_handle(content.port_to_close);
 

	
 
            // We're closing the port, so we will always update the peer of the
 
            // port (in case of error messages)
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            port_info.peer_comp_id = message.sender_comp_id;
 

	
 
            let port_info = comp_ctx.get_port(port_handle);
 
            let peer_comp_id = port_info.peer_comp_id;
 
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
            // One exception to sending an `Ack` is if we just closed the
 
            // port ourselves, meaning that the `ClosePort` messages got
 
            // sent to one another.
 
            if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) {
 
                // The two components (sender and this component) are closing
 
                // the channel at the same time.
 
                default_handle_ack(control, control_id, sched_ctx, comp_ctx);
 
            } else {
 
                // Respond to the message
 
                let last_instruction = port_info.last_instruction;
 
                let port_was_used = last_instruction != PortInstruction::None;
 
                default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
 
                comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed
 
                comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed
 

	
 
                // Make sure that we've not reached an error condition. Note
 
                // that if this condition is not met, then we don't error out
 
                // now, but we may error out in the next sync block when we
 
                // try to `put`/`get` on the port. This condition makes sure
 
                // that if we have a successful sync round, followed by the peer
 
                // closing the port, that we don't consider the sync round to
 
                // have failed by mistake.
 
                let error_due_to_port_use =  
 
                if content.closed_in_sync_round && exec_state.mode.is_in_sync_block() && port_was_used {
 
                    return Err((
 
                        last_instruction,
 
                        format!("Peer component (id:{}) shut down, so previous communication cannot have succeeded", peer_comp_id.0)
 
                    ));
 
                }
 
            }
 
        },
 
        ControlMessageContent::UnblockPort(port_id) => {
 
            // We were previously blocked (or already closed)
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
            let port_info = comp_ctx.get_port(port_handle);
 
            debug_assert_eq!(port_info.kind, PortKind::Putter);
 
            if port_info.state == PortState::BlockedDueToFullBuffers {
 
                default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
 
            }
 
        },
 
        ControlMessageContent::PortPeerChangedBlock(port_id) => {
 
            // The peer of our port has just changed. So we are asked to
 
            // temporarily block the port (while our original recipient is
 
            // potentially rerouting some of the in-flight messages) and
 
            // Ack. Then we wait for the `unblock` call.
 
            debug_assert_eq!(message.target_port_id, Some(port_id));
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
            comp_ctx.set_port_state(port_handle, PortState::BlockedDueToPeerChange);
 

	
 
            let port_info = comp_ctx.get_port(port_handle);
 
            let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 

	
 
            default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
 
        },
 
        ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => {
 
            let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap());
 
            let port_info = comp_ctx.get_port(port_handle);
 
            debug_assert!(port_info.state == PortState::BlockedDueToPeerChange);
 
            let old_peer_id = port_info.peer_comp_id;
 

	
 
            comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false);
 

	
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            port_info.peer_comp_id = new_comp_id;
 
            port_info.peer_port_id = new_port_id;
 
            comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None);
 
            default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
 
        }
 
    }
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles a component initiating the exiting procedure, and closing all of its
 
/// ports. Should only be called once per component (which is ensured by
 
/// checking and modifying the mode in the execution state).
 
#[must_use]
 
pub(crate) fn default_handle_start_exit(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::StartExit);
 
    sched_ctx.log("Component starting exit");
 
    exec_state.mode = CompMode::BusyExit;
 
    let exit_inside_sync = exec_state.exit_reason.is_in_sync();
 

	
 
    // If exiting while inside sync mode, report to the leader of the current
 
    // round that we've failed.
 
    if exit_inside_sync {
 
        let decision = consensus.notify_sync_end_failure(sched_ctx, comp_ctx);
 
        default_handle_sync_decision(sched_ctx, exec_state, decision, consensus);
 
    }
 

	
 
    // Iterating over ports by index to work around borrowing rules
 
    for port_index in 0..comp_ctx.num_ports() {
 
        let port = comp_ctx.get_port_by_index_mut(port_index);
 
        if port.state == PortState::Closed {
 
            // Already closed, or in the process of being closed
 
            continue;
 
        }
 

	
 
        // Mark as closed
 
        let port_id = port.self_id;
 
        port.state = PortState::Closed;
 

	
 
        // Notify peer of closing
 
        let port_handle = comp_ctx.get_port_handle(port_id);
 
        let (peer, message) = control.initiate_port_closing(port_handle, exit_inside_sync, comp_ctx);
 
        let peer_info = comp_ctx.get_peer(peer);
 
        peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
    }
 

	
 
    return CompScheduling::Immediate; // to check if we can shut down immediately
 
}
 

	
 
/// Handles a component waiting until all peers are notified that it is quitting
 
/// (i.e. after calling `default_handle_start_exit`).
 
#[must_use]
 
pub(crate) fn default_handle_busy_exit(
src/runtime2/component/component_internet.rs
Show inline comments
 
@@ -114,192 +114,194 @@ impl Component for ComponentTcpClient {
 
                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                }
 
            },
 
            Message::Poll => {
 
                sched_ctx.log("Received polling event");
 
            },
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        sched_ctx.log(&format!("Running component ComponentTcpClient (mode: {:?}, sync state: {:?})", self.exec_state.mode, self.sync_state));
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedSelect => {
 
                // Not possible: we never enter this state
 
                unreachable!();
 
            },
 
            CompMode::NonSync => {
 
                // When in non-sync mode
 
                match &mut self.socket_state {
 
                    SocketState::Connected(_socket) => {
 
                        if self.sync_state == SyncState::FinishSyncThenQuit {
 
                            // Previous request was to let the component shut down
 
                            self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                        } else {
 
                            // Reset for a new request
 
                            self.sync_state = SyncState::AwaitingCmd;
 
                            self.consensus.notify_sync_start(comp_ctx);
 
                            self.exec_state.mode = CompMode::Sync;
 
                        }
 
                        return CompScheduling::Immediate;
 
                    },
 
                    SocketState::Error => {
 
                        // Could potentially send an error message to the
 
                        // connected component.
 
                        self.exec_state.set_as_start_exit(ExitReason::ErrorNonSync);
 
                        return CompScheduling::Immediate;
 
                    }
 
                }
 
            },
 
            CompMode::Sync => {
 
                // When in sync mode: wait for a command to come in
 
                match self.sync_state {
 
                    SyncState::AwaitingCmd => {
 
                        if let Some(message) = &self.inbox_main {
 
                            self.consensus.handle_incoming_data_message(comp_ctx, &message);
 
                            if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, &message) {
 
                                // Check which command we're supposed to execute.
 
                                let message = self.inbox_main.take().unwrap();
 
                                let target_port_id = message.data_header.target_port;
 
                                let receive_result = component::default_handle_received_data_message(
 
                                    target_port_id, PortInstruction::NoSource,
 
                                    &mut self.inbox_main, &mut self.inbox_backup,
 
                                    comp_ctx, sched_ctx, &mut self.control
 
                                );
 

	
 
                                if let Err(location_and_message) = receive_result {
 
                                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                                    return CompScheduling::Immediate;
 
                                } else {
 
                                    let (tag_value, embedded_heap_pos) = message.content.values[0].as_union();
 
                                    if tag_value == self.input_union_send_tag_value {
 
                                        // Retrieve bytes from the message
 
                                        self.byte_buffer.clear();
 
                                        let union_content = &message.content.regions[embedded_heap_pos as usize];
 
                                        debug_assert_eq!(union_content.len(), 1);
 
                                        let array_heap_pos = union_content[0].as_array();
 
                                        let array_values = &message.content.regions[array_heap_pos as usize];
 
                                        self.byte_buffer.reserve(array_values.len());
 
                                        for value in array_values {
 
                                            self.byte_buffer.push(value.as_uint8());
 
                                        }
 

	
 
                                        self.sync_state = SyncState::Putting;
 
                                        return CompScheduling::Immediate;
 
                                    } else if tag_value == self.input_union_receive_tag_value {
 
                                        // Component requires a `recv`
 
                                        self.sync_state = SyncState::Getting;
 
                                        return CompScheduling::Immediate;
 
                                    } else if tag_value == self.input_union_finish_tag_value {
 
                                        // Component requires us to end the sync round
 
                                        self.sync_state = SyncState::FinishSync;
 
                                        return CompScheduling::Immediate;
 
                                    } else if tag_value == self.input_union_shutdown_tag_value {
 
                                        // Component wants to close the connection
 
                                        self.sync_state = SyncState::FinishSyncThenQuit;
 
                                        return CompScheduling::Immediate;
 
                                    } else {
 
                                        unreachable!("got tag_value {}", tag_value)
 
                                    }
 
                                }
 
                            } else {
 
                                todo!("handle sync failure due to message deadlock");
 
                                return CompScheduling::Sleep;
 
                            }
 
                        } else {
 
                            let port_handle = comp_ctx.get_port_handle(self.pdl_input_port_id);
 
                            comp_ctx.get_port_mut(port_handle).last_instruction = PortInstruction::NoSource;
 
                            self.exec_state.set_as_blocked_get(self.pdl_input_port_id);
 
                            return CompScheduling::Sleep;
 
                        }
 
                    },
 
                    SyncState::Putting => {
 
                        // We're supposed to send a user-supplied message fully
 
                        // over the socket. But we might end up blocking. In
 
                        // that case the component goes to sleep until it is
 
                        // polled.
 
                        let socket = self.socket_state.get_socket();
 
                        while !self.byte_buffer.is_empty() {
 
                            match socket.send(&self.byte_buffer) {
 
                                Ok(bytes_sent) => {
 
                                    self.byte_buffer.drain(..bytes_sent);
 
                                },
 
                                Err(err) => {
 
                                    if err.kind() == IoErrorKind::WouldBlock {
 
                                        return CompScheduling::Sleep; // wait until notified
 
                                    } else {
 
                                        todo!("handle socket.send error {:?}", err)
 
                                    }
 
                                }
 
                            }
 
                        }
 

	
 
                        // If here then we're done putting the data, we can
 
                        // finish the sync round
 
                        let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
                        self.exec_state.mode = CompMode::SyncEnd;
 
                        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
 
                        return CompScheduling::Immediate;
 
                    },
 
                    SyncState::Getting => {
 
                        // We're going to try and receive a single message. If
 
                        // this causes us to end up blocking the component
 
                        // goes to sleep until it is polled.
 
                        const BUFFER_SIZE: usize = 1024; // TODO: Move to config
 

	
 
                        let socket = self.socket_state.get_socket();
 
                        self.byte_buffer.resize(BUFFER_SIZE, 0);
 
                        match socket.receive(&mut self.byte_buffer) {
 
                            Ok(num_received) => {
 
                                self.byte_buffer.resize(num_received, 0);
 
                                let message_content = self.bytes_to_data_message_content(&self.byte_buffer);
 
                                let send_result = component::default_send_data_message(&mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource, message_content, sched_ctx, &mut self.consensus, comp_ctx);
 
                                if let Err(location_and_message) = send_result {
 
                                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
 
                                    return CompScheduling::Immediate;
 
                                } else {
 
                                    let scheduling = send_result.unwrap();
 
                                    self.sync_state = SyncState::AwaitingCmd;
 
                                    return scheduling;
 
                                }
 
                            },
 
                            Err(err) => {
 
                                if err.kind() == IoErrorKind::WouldBlock {
 
                                    return CompScheduling::Sleep; // wait until polled
 
                                } else {
 
                                    todo!("handle socket.receive error {:?}", err)
 
                                }
 
                            }
 
                        }
 
                    },
 
                    SyncState::FinishSync | SyncState::FinishSyncThenQuit => {
 
                        let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
                        self.exec_state.mode = CompMode::SyncEnd;
 
                        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
 
                        return CompScheduling::Requeue;
 
                    },
 
                }
 
            },
 
            CompMode::BlockedGet => {
 
                // Entered when awaiting a new command
 
                debug_assert_eq!(self.sync_state, SyncState::AwaitingCmd);
 
                return CompScheduling::Sleep;
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedPut =>
 
                return CompScheduling::Sleep,
 
            CompMode::StartExit =>
 
                return component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus),
 
            CompMode::BusyExit =>
 
                return component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx),
 
            CompMode::Exit =>
 
                return component::default_handle_exit(&self.exec_state),
 
        }
 
    }
 
}
 

	
 
impl ComponentTcpClient {
 
    pub(crate) fn new(arguments: ValueGroup) -> Self {
 
        use std::net::{IpAddr, Ipv4Addr};
 

	
 
        debug_assert_eq!(arguments.values.len(), 4);
 

	
 
        // Parsing arguments
 
        let ip_heap_pos = arguments.values[0].as_array();
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -225,192 +225,194 @@ pub(crate) struct CompPDL {
 
impl Component for CompPDL {
 
    fn on_creation(&mut self, _id: CompId, _sched_ctx: &SchedulerCtx) {
 
        // Intentionally empty
 
    }
 

	
 
    fn on_shutdown(&mut self, _sched_ctx: &SchedulerCtx) {
 
        // Intentionally empty
 
    }
 

	
 
    fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage) {
 
        let port_handle = comp_ctx.get_port_handle(message.data_header.target_port);
 
        let port_index = comp_ctx.get_port_index(port_handle);
 
        if self.inbox_main[port_index].is_none() {
 
            self.inbox_main[port_index] = Some(message);
 
        } else {
 
            self.inbox_backup.push(message);
 
        }
 
    }
 

	
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) {
 
        // sched_ctx.log(&format!("handling message: {:?}", message));
 
        if let Some(new_target) = self.control.should_reroute(&mut message) {
 
            let mut target = sched_ctx.runtime.get_component_public(new_target); // TODO: @NoDirectHandle
 
            target.send_message(&sched_ctx.runtime, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
 
            let _should_remove = target.decrement_users();
 
            debug_assert!(_should_remove.is_none());
 
            return;
 
        }
 

	
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Control(message) => {
 
                if let Err(location_and_message) = component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                ) {
 
                    self.handle_generic_component_error(sched_ctx, location_and_message);
 
                }
 
            },
 
            Message::Sync(message) => {
 
                self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Poll => {
 
                unreachable!(); // because we never register at the polling thread
 
            }
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        use EvalContinuation as EC;
 

	
 
        sched_ctx.log(&format!("Running component (mode: {:?})", self.exec_state.mode));
 

	
 
        // Depending on the mode don't do anything at all, take some special
 
        // actions, or fall through and run the PDL code.
 
        match self.exec_state.mode {
 
            CompMode::NonSync | CompMode::Sync => {
 
                // continue and run PDL code
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | CompMode::BlockedSelect => {
 
                return CompScheduling::Sleep;
 
            }
 
            CompMode::StartExit => return component::default_handle_start_exit(
 
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus
 
            ),
 
            CompMode::BusyExit => return component::default_handle_busy_exit(
 
                &mut self.exec_state, &self.control, sched_ctx
 
            ),
 
            CompMode::Exit => return component::default_handle_exit(&self.exec_state),
 
        }
 

	
 
        let run_result = self.execute_prompt(&sched_ctx);
 
        if let Err(error) = run_result {
 
            self.handle_component_error(sched_ctx, CompError::Executor(error));
 
            return CompScheduling::Immediate;
 
        }
 

	
 
        let run_result = run_result.unwrap();
 

	
 
        match run_result {
 
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
 
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
 
            // Results that can be returned in sync mode
 
            EC::SyncBlockEnd => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                self.handle_sync_end(sched_ctx, comp_ctx);
 
                return CompScheduling::Immediate;
 
            },
 
            EC::BlockGet(expr_id, port_id) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 

	
 
                let port_id = port_id_from_eval(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                comp_ctx.get_port_mut(port_handle).last_instruction = PortInstruction::SourceLocation(expr_id);
 

	
 
                let port_index = comp_ctx.get_port_index(port_handle);
 
                if let Some(message) = &self.inbox_main[port_index] {
 
                    // Check if we can actually receive the message
 
                    if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
                        // Message was received. Make sure any blocked peers and
 
                        // pending messages are handled.
 
                        let message = self.inbox_main[port_index].take().unwrap();
 
                        let receive_result = component::default_handle_received_data_message(
 
                            port_id, PortInstruction::SourceLocation(expr_id),
 
                            &mut self.inbox_main[port_index], &mut self.inbox_backup,
 
                            comp_ctx, sched_ctx, &mut self.control
 
                        );
 
                        if let Err(location_and_message) = receive_result {
 
                            self.handle_generic_component_error(sched_ctx, location_and_message);
 
                            return CompScheduling::Immediate
 
                        } else {
 
                            self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
 
                            return CompScheduling::Immediate;
 
                        }
 
                    } else {
 
                        todo!("handle sync failure due to message deadlock");
 
                        return CompScheduling::Sleep;
 
                    }
 
                } else {
 
                    // We need to wait
 
                    self.exec_state.set_as_blocked_get(port_id);
 
                    return CompScheduling::Sleep;
 
                }
 
            },
 
            EC::Put(expr_id, port_id, value) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                sched_ctx.log(&format!("Putting value {:?}", value));
 

	
 
                // Send the message
 
                let target_port_id = port_id_from_eval(port_id);
 
                let send_result = component::default_send_data_message(
 
                    &mut self.exec_state, target_port_id,
 
                    PortInstruction::SourceLocation(expr_id), value,
 
                    sched_ctx, &mut self.consensus, comp_ctx
 
                );
 
                if let Err(location_and_message) = send_result {
 
                    self.handle_generic_component_error(sched_ctx, location_and_message);
 
                    return CompScheduling::Immediate;
 
                } else {
 
                    // When `run` is called again (potentially after becoming
 
                    // unblocked) we need to instruct the executor that we performed
 
                    // the `put`
 
                    let scheduling = send_result.unwrap();
 
                    self.exec_ctx.stmt = ExecStmt::PerformedPut;
 
                    return scheduling;
 
                }
 
            },
 
            EC::SelectStart(num_cases, _num_ports) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                self.select_state.handle_select_start(num_cases);
 
                return CompScheduling::Requeue;
 
            },
 
            EC::SelectRegisterPort(case_index, port_index, port_id) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                let port_id = port_id_from_eval(port_id);
 
                if let Err(_err) = self.select_state.register_select_case_port(comp_ctx, case_index, port_index, port_id) {
 
                    todo!("handle registering a port multiple times");
 
                }
 
                return CompScheduling::Immediate;
 
            },
 
            EC::SelectWait => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                let select_decision = self.select_state.handle_select_waiting_point(&self.inbox_main, comp_ctx);
 
                if let SelectDecision::Case(case_index) = select_decision {
 
                    // Reached a conclusion, so we can continue immediately
 
                    self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
 
                    self.exec_state.mode = CompMode::Sync;
 
                    return CompScheduling::Immediate;
 
                } else {
 
                    // No decision yet
 
                    self.exec_state.mode = CompMode::BlockedSelect;
 
                    return CompScheduling::Sleep;
 
                }
 
            },
 
            // Results that can be returned outside of sync mode
 
            EC::ComponentTerminated => {
 
                self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                return CompScheduling::Immediate;
 
            },
 
            EC::SyncBlockStart => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
                self.handle_sync_start(sched_ctx, comp_ctx);
 
                return CompScheduling::Immediate;
 
            },
 
            EC::NewComponent(definition_id, type_id, arguments) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
                self.create_component_and_transfer_ports(
 
                    sched_ctx, comp_ctx,
 
                    definition_id, type_id, arguments
 
                );
 
                return CompScheduling::Requeue;
 
@@ -485,193 +487,193 @@ impl CompPDL {
 
    /// Handles end of sync. The conclusion to the sync round might arise
 
    /// immediately (and be handled immediately), or might come later through
 
    /// messaging. In any case the component should be scheduled again
 
    /// immediately
 
    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component ending sync mode (now waiting for solution)");
 
        let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
        self.exec_state.mode = CompMode::SyncEnd;
 
        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
 
    }
 

	
 
    fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log(&format!("Component exiting (reason: {:?}", self.exec_state.exit_reason));
 
        debug_assert_eq!(self.exec_state.mode, CompMode::StartExit);
 
        self.exec_state.mode = CompMode::BusyExit;
 
        let exit_inside_sync = self.exec_state.exit_reason.is_in_sync();
 

	
 
        // Doing this by index, then retrieving the handle is a bit rediculous,
 
        // but Rust is being Rust with its borrowing rules.
 
        for port_index in 0..comp_ctx.num_ports() {
 
            let port = comp_ctx.get_port_by_index_mut(port_index);
 
            if port.state == PortState::Closed {
 
                // Already closed, or in the process of being closed
 
                continue;
 
            }
 

	
 
            // Mark as closed
 
            let port_id = port.self_id;
 
            port.state = PortState::Closed;
 

	
 
            // Notify peer of closing
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
            let (peer, message) = self.control.initiate_port_closing(port_handle, exit_inside_sync, comp_ctx);
 
            let peer_info = comp_ctx.get_peer(peer);
 
            peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling messages
 
    // -------------------------------------------------------------------------
 

	
 
    /// Handles a message that came in through the public inbox. This function
 
    /// will handle putting it in the correct place, and potentially blocking
 
    /// the port in case too many messages are being received.
 
    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
 
        use component::IncomingData;
 

	
 
        // Whatever we do, glean information from headers in message
 
        if self.exec_state.mode.is_in_sync_block() {
 
            self.consensus.handle_incoming_data_message(comp_ctx, &message);
 
        }
 

	
 
        let port_handle = comp_ctx.get_port_handle(message.data_header.target_port);
 
        let port_index = comp_ctx.get_port_index(port_handle);
 
        match component::default_handle_incoming_data_message(
 
            &mut self.exec_state, &mut self.inbox_main[port_index], comp_ctx, message,
 
            sched_ctx, &mut self.control
 
        ) {
 
            IncomingData::PlacedInSlot => {
 
                if self.exec_state.mode == CompMode::BlockedSelect {
 
                    let select_decision = self.select_state.handle_updated_inbox(&self.inbox_main, comp_ctx);
 
                    if let SelectDecision::Case(case_index) = select_decision {
 
                        self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
 
                        self.exec_state.mode = CompMode::Sync;
 
                    }
 
                }
 
            },
 
            IncomingData::SlotFull(message) => {
 
                self.inbox_backup.push(message);
 
            }
 
        }
 
    }
 

	
 
    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) {
 
        let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
 
    }
 

	
 
    /// Handles an error coming from the generic `component::handle_xxx`
 
    /// functions. Hence accepts argument as a tuple.
 
    fn handle_generic_component_error(&mut self, sched_ctx: &SchedulerCtx, location_and_message: (PortInstruction, String)) {
 
        // Retrieve location and message, display in terminal
 
        let (location, message) = location_and_message;
 
        let error = match location {
 
            PortInstruction::None => CompError::Component(message),
 
            PortInstruction::NoSource => unreachable!(), // for debugging: all in-sync errors are associated with a source location
 
            PortInstruction::SourceLocation(expression_id) => {
 
                let protocol = &sched_ctx.runtime.protocol;
 
                CompError::Executor(EvalError::new_error_at_expr(
 
                    &self.prompt, &protocol.modules, &protocol.heap,
 
                    expression_id, message
 
                ))
 
            }
 
        };
 

	
 
        self.handle_component_error(sched_Ctx, error);
 
        self.handle_component_error(sched_ctx, error);
 
    }
 

	
 
    fn handle_component_error(&mut self, sched_ctx: &SchedulerCtx, error: CompError) {
 
        sched_ctx.error(&format!("{}", error));
 

	
 
        // Set state to handle subsequent error
 
        let exit_reason = if self.exec_state.mode.is_in_sync_block() {
 
            ExitReason::ErrorInSync
 
        } else {
 
            ExitReason::ErrorNonSync
 
        };
 

	
 
        self.exec_state.set_as_start_exit(exit_reason);
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling ports
 
    // -------------------------------------------------------------------------
 

	
 
    /// Creates a new component and transfers ports. Because of the stepwise
 
    /// process in which memory is allocated, ports are transferred, messages
 
    /// are exchanged, component lifecycle methods are called, etc. This
 
    /// function facilitates a lot of implicit assumptions (e.g. when the
 
    /// `Component::on_creation` method is called, the component is already
 
    /// registered at the runtime).
 
    fn create_component_and_transfer_ports(
 
        &mut self,
 
        sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx,
 
        definition_id: ProcedureDefinitionId, type_id: TypeId, mut arguments: ValueGroup
 
    ) {
 
        struct PortPair{
 
            creator_handle: LocalPortHandle,
 
            creator_id: PortId,
 
            created_handle: LocalPortHandle,
 
            created_id: PortId,
 
        }
 
        let mut opened_port_id_pairs = Vec::new();
 
        let mut closed_port_id_pairs = Vec::new();
 

	
 
        let reservation = sched_ctx.runtime.start_create_pdl_component();
 
        let mut created_ctx = CompCtx::new(&reservation);
 

	
 
        let other_proc = &sched_ctx.runtime.protocol.heap[definition_id];
 
        let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition];
 

	
 
        // dbg_code!({
 
        //     sched_ctx.log(&format!(
 
        //         "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})",
 
        //         self_proc.identifier.value.as_str(), creator_ctx.id,
 
        //         other_proc.identifier.value.as_str(), reservation.id()
 
        //     ));
 
        // });
 

	
 
        // Take all the ports ID that are in the `args` (and currently belong to
 
        // the creator component) and translate them into new IDs that are
 
        // associated with the component we're about to create
 
        let mut arg_iter = ValueGroupPortIter::new(&mut arguments);
 
        while let Some(port_reference) = arg_iter.next() {
 
            // Create port entry for new component
 
            let creator_port_id = port_reference.id;
 
            let creator_port_handle = creator_ctx.get_port_handle(creator_port_id);
 
            let creator_port = creator_ctx.get_port(creator_port_handle);
 
            let created_port_handle = created_ctx.add_port(
 
                creator_port.peer_comp_id, creator_port.peer_port_id,
 
                creator_port.kind, creator_port.state
 
            );
 
            let created_port = created_ctx.get_port(created_port_handle);
 
            let created_port_id = created_port.self_id;
 

	
 
            let port_id_pair = PortPair {
 
                creator_handle: creator_port_handle,
 
                creator_id: creator_port_id,
 
                created_handle: created_port_handle,
 
                created_id: created_port_id,
 
            };
 

	
 
            if creator_port.state == PortState::Closed {
 
                closed_port_id_pairs.push(port_id_pair)
 
            } else {
 
                opened_port_id_pairs.push(port_id_pair);
 
            }
 

	
 
            // Modify value in arguments (bit dirty, but double vec in ValueGroup causes lifetime issues)
 
            let arg_value = if let Some(heap_pos) = port_reference.heap_pos {
 
                &mut arg_iter.group.regions[heap_pos][port_reference.index]
 
            } else {
 
                &mut arg_iter.group.values[port_reference.index]
 
            };
 
            match arg_value {
 
                Value::Input(id) => *id = port_id_to_eval(created_port_id),
 
                Value::Output(id) => *id = port_id_to_eval(created_port_id),
 
                _ => unreachable!(),
 
            }
 
        }
 

	
 
        // For each transferred port pair set their peer components to the
src/runtime2/tests/error_handling.rs
Show inline comments
 
new file 100644
 
use super::*;
 

	
 
#[test]
 
fn test_unconnected_component_error() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive interact_with_noone() {
 
        u8[] array = { 5 };
 
        auto value = array[1];
 
    }
 
    ").unwrap();
 
    let rt = Runtime::new(1, true, pd).unwrap();
 
    create_component(&rt, "", "interact_with_noone", no_args());
 
}
 

	
 
#[test]
 
fn test_connected_uncommunicating_component_error() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive crashing_and_burning(out<u32> unused) {
 
        u8[] array = { 1337 };
 
        auto value = array[1337];
 
    }
 
    primitive sitting_idly_waiting(in<u32> never_providing) {
 
        sync auto a = get(never_providing);
 
    }
 
    composite constructor() {
 
        channel a -> b;
 
        new sitting_idly_waiting(b);
 
        new crashing_and_burning(a);
 
    }").unwrap();
 
    let rt = Runtime::new(1, true, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 
\ No newline at end of file
src/runtime2/tests/mod.rs
Show inline comments
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::component::{CompCtx, CompPDL};
 

	
 
fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: ValueGroup) {
 
mod error_handling;
 

	
 
pub(crate) fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: ValueGroup) {
 
    let prompt = rt.inner.protocol.new_component(
 
        module_name.as_bytes(), routine_name.as_bytes(), args
 
    ).expect("create prompt");
 
    let reserved = rt.inner.start_create_pdl_component();
 
    let ctx = CompCtx::new(&reserved);
 
    let component = Box::new(CompPDL::new(prompt, 0));
 
    let (key, _) = rt.inner.finish_create_pdl_component(reserved, component, ctx, false);
 
    rt.inner.enqueue_work(key);
 
}
 

	
 
fn no_args() -> ValueGroup { ValueGroup::new_stack(Vec::new()) }
 
pub(crate) fn no_args() -> ValueGroup { ValueGroup::new_stack(Vec::new()) }
 

	
 
#[test]
 
fn test_component_creation() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive nothing_at_all() {
 
        s32 a = 5;
 
        auto b = 5 + a;
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, true, pd).unwrap();
 

	
 
    for _i in 0..20 {
 
        create_component(&rt, "", "nothing_at_all", no_args());
 
    }
 
}
 

	
 
#[test]
 
fn test_component_communication() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive sender(out<u32> o, u32 outside_loops, u32 inside_loops) {
 
        u32 outside_index = 0;
 
        while (outside_index < outside_loops) {
 
            u32 inside_index = 0;
 
            sync while (inside_index < inside_loops) {
 
                put(o, inside_index);
 
                inside_index += 1;
 
            }
 
            outside_index += 1;
 
        }
 
    }
 

	
 
    primitive receiver(in<u32> i, u32 outside_loops, u32 inside_loops) {
 
        u32 outside_index = 0;
 
        while (outside_index < outside_loops) {
 
            u32 inside_index = 0;
 
            sync while (inside_index < inside_loops) {
 
                auto val = get(i);
 
                while (val != inside_index) {} // infinite loop if incorrect value is received
 
                inside_index += 1;
 
            }
 
            outside_index += 1;
 
        }
 
    }
 

	
 
    composite constructor() {
 
        channel o_orom -> i_orom;
 
        channel o_mrom -> i_mrom;
 
        channel o_ormm -> i_ormm;
 
        channel o_mrmm -> i_mrmm;
 

	
 
        // one round, one message per round
 
        new sender(o_orom, 1, 1);
 
        new receiver(i_orom, 1, 1);
 

	
 
        // multiple rounds, one message per round
 
        new sender(o_mrom, 5, 1);
 
        new receiver(i_mrom, 5, 1);
 

	
 
        // one round, multiple messages per round
 
        new sender(o_ormm, 1, 5);
 
        new receiver(i_ormm, 1, 5);
 

	
 
        // multiple rounds, multiple messages per round
 
        new sender(o_mrmm, 5, 5);
 
        new receiver(i_mrmm, 5, 5);
 
    }").expect("compilation");
 
    let rt = Runtime::new(3, true, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_intermediate_messenger() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive receiver<T>(in<T> rx, u32 num) {
 
        auto index = 0;
 
        while (index < num) {
 
            sync { auto v = get(rx); }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive middleman<T>(in<T> rx, out<T> tx, u32 num) {
 
        auto index = 0;
 
        while (index < num) {
 
            sync { put(tx, get(rx)); }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive sender<T>(out<T> tx, u32 num) {
 
        auto index = 0;
 
        while (index < num) {
 
            sync put(tx, 1337);
 
            index += 1;
 
        }
 
    }
0 comments (0 inline, 0 general)