Changeset - 11fd959b348a
[Not reviewed]
docs/runtime/sync.md
Show inline comments
 
# Synchronous Communication
 
# Synchronous Communication and Component Orchestration
 

	
 
## 
 
\ No newline at end of file
 
## Introduction
 

	
 
The Reowolf runtime consists of a system that allows multiple components to run within their own thread of execution. These components are able to exchange messages with one another. Components are capable of creating other components, and of creating channels. We may visualise the runtime as a cloud of all kinds of components, connected by the communication channels between them, hence a kind of communication graph.
 

	
 
With this version of the runtime there were several main drivers. For performance reasons we want:
 

	
 
- As little centralized information as possible (because centralization of information implies synchronization to access it).
 
- As much parallelism as possible (when information *must* be exchanged, then make sure as little components are affected as possible).
 

	
 
To keep the complexity of the runtime to a reasonable minimum, the following requirements have to be met as well:
 

	
 
- A component may terminate, therefore potentially not being able to participate in synchronous communication or receive messages. Experimentation showed that the system that ensures that termination is broadcast to all peers should be kept simple (earlier systems depended on a state-machine with assumptions about the order in which messages were exchanged, which greatly complicates the messaging subsystem of the runtime).
 
- Messages should arrive in order (with some exceptions). As we'll see later in this document we have different types of messages. Reasoning about a component's operational state becomes much simpler if we can assume that the transmission of messages between components is ordered.
 

	
 
As we will see there are several types of messages that can be exchanged. Among them we have:
 

	
 
- Data messages: these messages contain the data that is "transmitted from and to PDL code". For each `put` a data message is annotated by the runtime and sent along to the receiving component, which will then hopefully retrieve the data with a `get`. These messages are conceptually sent over channels.
 
- Sync messages: these messages are sent between components to communicate their consensus-state. These messages are not necessarily associated with channels.
 
- Control messages: these messages are sent between components to ensure that the entire runtime is reliably facilitating data exchange. That is: they ensure that the language is working as intended. As an example: sending a port to a different component requires a bit of bookkeeping to ensure that all involved components are aware of the port exchange.
 

	
 
The remainder of this document tries to describe the various technical aspects of synchronous communication and component orchestration.
 

	
 
## Brief Description of Schedulers
 

	
 
Each component conceptually has its own thread of execution. It is executing a program with at any given point in time a particular memory state. In reality there are a limited number of threads that execute components. Making sure that components are scheduled correctly is based on the fact that components are generally executing programs that are blocked at some point: a message needs to be received, or a port is blocked so we cannot send any information. At that point a component is considered "sleeping". Should another component, scheduled on a particular thread, send a message to this sleeping component, then it is "woken up" by putting it into the execution queue.
 

	
 
The job of the scheduler is then to: execute a component scheduled for execution, wait until a component is scheduled, or shut down in case there are no more components to execute.
 

	
 
The details of the execution queue (currently rather simplistically implemented) is not of interest. What is of interest is that a component can only be scheduled once.
 

	
 
## Creation of Channels
 

	
 
Within PDL code it is possible to create new channels. And so a component will always (that is to say: for now) create both endpoints of the channel, hence own both endpoints of the channel upon creation. Identifiers for these ports are generated locally (we don't want to resolve to having to synchronize on some kind of global port ID counter).
 

	
 
As these IDs are generated locally there is no real technical challenge, but note that ports at different components may have the same port ID.
 

	
 
## Creation of Components
 

	
 
Within PDL code it is possible to create components. Upon their creation they can be given endpoints of channels. Hence at component creation we are changing the configuration of the communication graph. All of the relevant components need to be informed about the port changing ownership.
 

	
 
Here we run into a plethora of problems. The other endpoint might have been given away to another created component. The other endpoint may have already been used in communication, such that we already have messages underway for the port we're trying to give to a newly created component. We may have that the local port ID assigned by the creating component is not the same as the local port ID that the newly created component may want to assign to it. We may have that this port has been passed along multiple times already, etc.
 

	
 
We cannot help that messages have already arrived, or are in transit, for the transferred port. But we can make some assumptions that simplify the transfer of ports. As a first one, we have that the creating component decides when the created component is scheduled for execution. We'll choose not to execute it initially, such that we can be sure that it will not send messages over its ports the moment is created. To further simplify the problem, we have assumed that messages arrive in order. So although messages might still be underway for the transferred ports, if we ask the sender to stop sending, and the sender blocks the port and acknowledges that it has received this command. Then the moment the creator receives the acknowledgement it is certain that it has received all messages intended for the transferred ports. We'll also disallow the creation of components during synchronous interactions.
 

	
 
And so here we have our first control protocol. If a port is transferred then we might have:
 

	
 
1. That the peer port is transferred to the new component as well. All is fine and we can take care of the exchange immediately.
 
2. That the peer port stays with the creating component. Here all is fine as well, everything is running in a single thread of execution so we diligently do our bookkeeping on the data associated with the port and the channel and we can transfer the port.
 
3. The peer port is already owned by a different component. Here we need to have a slightly more complicated protocol
 

	
 
In this last case we take the following actions, `C` for creating component, `N` for newly created component, and `P` for the peer component that holds the other port of the same channel.
 

	
 
1. `C` transfers the port to the newly created component `N`, and ask it to come up with a new ID for that port. The port had an ID that was decided by its old owner `C`, and now has one that is agreeable with component `N`.
 
2. `C` sends a control message `PeerChangeBlockPort` to the peer `P`.
 
3. `P` receives the `PeerChangeBlockPort` message. It causes the peer port to be temporarily blocked. `P` may still continue executing its code, but the moment it wishes to send something over this port it is forced to block its execution. In response `P` sends an `Acknowledge` message back to `C`.
 
4. `C` waits for the `Acknowledge` message of `C`. Since the `Acknowledge` message was sent after the last data message that `P` sent to the port under consideration, and because `P` has blocked that port, we are certain that we received all messages. We transfer these messages to `N`.
 
5. Note that there may be multiple ports being transferred from `C` to `N`, each with a potentially different peer. And so here `C` waits until steps 2 through 4 are completed for all of the transferred ports.
 
6. Once `C` has received all of the `Acknowledge` messages it was waiting for, it will send a `PeerChangeUnblockPort` message to each peer `P`. This message contains the new port ID, such that `P` can unblock its port, and continue sending messages over this channel, but now correctly arriving at `N`. Additionally, `C` will now schedule the new component `N`.
 

	
 
There is a bit of extra overhead here with the `PeerChangeBlockPort` -> `Acknowledge` -> `PeerChangeUnblockPort` sequence with respect to other possible schemes. But this one allows `P` to continue executing its code as long as it does not use its blocked port. It also ensures that messages arrive in order: they're all collected by `C`, given to `N`, and only then may `P` continue creating messages to `N`, hence arriving after the earlier messages have been handed off to `N`.
 

	
 
As we'll later introduce, ports can end up being closed. When a port is closed the above procedure is not initiated. A port cannot be reopened, and once a port is closed its peer is also notified that the channel cannot be used anymore. As such, it is not needed (and perhaps impossible, if the memory backing the peer component has already been freed) to notify that the port will change peer.
 

	
 
## Managing the Lifetime of Components
 

	
 
Components are created by other components or by the API. Note that the API may for intents and purposes be viewed as a component. It may own and create channels, and it may create components. Component start, like a function, executing at the top of their program listing and may end in two ways. Either they encounter an unrecoverable error, or they neatly reach the end of their execution.
 

	
 
In either case we want to free up the memory that was occupied by the component during its execution. And we want to make sure that any kind of pending message/action is correctly handled. Apart from that the component contains somekind of message inbox, whose memory is associated with that component. Hence we want to make sure that all peers are aware that they're no longer able to send messages to a terminated component.
 

	
 
A small interlude before we continue: trying to take care of unrecoverable errors that occur during a sync round by incorporating the appropriate logic into the consensus algorithm proved to be rather hard. It caused a large increase in the number of states a component could be in, and as a result made the code much harder to reason about. That is: not so much communicating that an error had ocurred (that needs to occur in every synchronous algorithm), but keeping track of which messages can be sent to which component during the consensus algorithm.
 

	
 
For this reason there are two systems that make sure that components stay alive as long as needed. Firstly components will have a reference counter. For simplicity the component also holds a reference to itself. The component will remove this self-reference when it terminates. Each channel causes two components to also hold references to eachother. *If* a consensus algorithm is implemented such that a central components ends up communicating to all participating parties (using channels or not using channels), then we can make sure that it can reach all participating components by incrementing their reference counts (note that this is not yet properly implemented in the runtime). 
 

	
 
Through this mechanism the consensus algorithm can be greatly simplified. If a component encounters a critical error and is already participated in a sync round, then it can notify the other participants, but remain reachable to all other components until the round ends (and the reference counts are decreased again).
 

	
 
A second system is needed to ensure that a component actually exits (because all the peers hold a reference, and we need all of those references to drop to 0 to truly remove the component from the runtime). And so when a component exits it will send `ClosePort` messages to all of its peers. These peers will `Acknowledge` those messages and close the respective ports, meaning that they will no longer allow sending messages over those ports, that will be a fatal error. However, messages that were sent to the exiting component before receiving the `ClosePort` message might still be underway. And so the terminating component will wait for all of the `Acknowledge` messages for all of its channels such that it knows that it has received all data messages. The component will respond to these intermediate messages with a `DataMessageFailed` message, meaning that the message has been received the moment the component was already terminated, hence the sender should consider this a failed message transmission.
 

	
 
Bringing these systems together: apart from data messages there might still be control messages in transit, or the exiting component might still have some control/sync work to do. And so we need to modify something we said earlier: instead of a component removing its self-reference the moment it terminates, we do this the moment we have received all the `Acknowledge` messages we were expecting. This way if a component is busy creating another component, we're certain that the appropriate protocols are observed. So:
 

	
 
Concluding everything described above, two separate mechanisms will act in conjunction:
 

	
 
- A. The exiting component `E` waiting until it has finished notifying all peers `P` of its termination:
 
  1. Component `E` sends a `ClosePort` message to each peer `P` for each of the shared channels (except when the port is already closed because `P` is also in the process of shutting down).
 
  2. The peer `P` receives the `ClosePort` message and closes the port as a result. This is a change of port state that will cause any subsequent `put` operations on that port to become a fatal error for component `P`. In response the peer `P` sends an `Acknowledge` message to component `E`, unless component `P` exited itself (that is: sending a `ClosePort` message to `E` before the `ClosePort` message from `P` arrived). After closing the port the component `P` will remove the reference to `E`.
 
  3. Component `E` waits until all of its pending control operations are finished (i.e. waiting for the `Acknowledge` messages following `ClosePort` messages, `PeerChangeBlockPort` messages, etc.). Once all of these are finished, note that we can no longer participate in any future control actions: component `E` will not create channels/components itself. Since all of its ports are closed, the peers `P` will also not send any data or control messages.
 
  4. Component `E` now checks its inbox for any remaining messages. It will respond to any data messages that arrived after `E` sending `ClosePort` and before `E` receiving `Acknowledge` with a `DataMessageFailed` message (except for ports that were closed before the `Acknowledge`). Then it removes the reference to itself, therefore decrementing the reference counter for the component by 1.
 

	
 
- B. The reference counting mechanism. Any sync round the exiting component `E` is participating in will conceptually hold a reference to `E`. The component `E` will always respond to sync messages as if it were alive (albeit trying to indicate to everyone that it is actually exiting). The component that removes the last reference to the component `E` (which may be `E` itself, but also a peer `P`) will truly remove the associated memory from the runtime. 
 

	
 
**Note**: We did not consider machine termination. That is to say: once we reach the runtime maturity where communication occurs over different machines, then we have to consider that machines encounter fatal errors. However these can only be resolved by embedding the possibility of failure inside the protocol over which these machines communicate. 
 

	
 
## Sending Data Messages
 

	
 
So far we've discussed the following properties associated with sending data messages:
 

	
 
1. Port IDs are decided locally. So a peer may have an ID that is outdated for the intended recipient.
 
2. Ports can move from owner to owner. So a peer might have a component ID that is outdated for the intended recipient.
 
3. Ports may be closed.
 
4. Message intended for specific ports may end up at an intermediate component that is passing that message along.
 

	
 
However, there are some properties that we can take advantage of:
 

	
 
1. When a component sends a message, it knows for certain what its own component ID and port ID is. So a transmitting port always sends the correct source component/port ID.
 
2. When a component receives a message, it knows for certain what its own component ID and port ID is. So once a receiving port receives a properly annotated message from a transmitted port, the receiving end can be certain about the component IDs and port IDs that make up the channel.
 

	
 
Note that although the message transmitter may not be certain about its final recipient, the components along the way *are* aware of the routing that is necessary to make the message arrive at the intended target. Combing back to the case where we have a creator `C`, new component `N` and peer `P`. Then `P` will send a message intended for `N`, but arriving at `C`. Here `C` can change the target port and component to `N` (as it is in the process of transferring that port, so knows both its original and new port ID). Once the message arrives and is accepted by the recipient then it is certain about the component and port IDs.
 

	
 
## Sending Sync Messages
 

	
 
Sync messages have the purpose of making sure that consensus is reached about the interactions that took place in all of the participating components' sync blocks. The previous runtime featured speculative execution and a branching execution model: a component could exhibit multiple behaviours, and at the end all components decide which combination of local behaviours constitute a satisfying single global behaviour (if one exists at all). Without speculative execution the model can be a lot simpler.
 

	
 
We'll only shortly discuss the mechanisms that are present in the synchronization algorithm. A component has a local counter, that is reset for each synchronous interaction, that is used when transmitting data messages. Such a message will be annotated with the counter at `N`, after which the component sends the next message with annotation `N+1`. At the same time the component will keep track of a local mapping from port ID to port annotation, we'll call this the port mapping. Likewise, when a component receives a data message it will assign the received annotation in its own port mapping. If two components assign the same annotation to the ports that constitute a channel, then there is an agreeable interaction there.
 

	
 
And so at the end of the synchronous round a component will somehow broadcast its port mapping. Note from the discussion above that a transmitting port's annotation is only associated with that transmitting port, since a transmitting port can only truly ever know its own component/port ID. While the receiving port's annotation knows about the peer's component/port ID as well. And so a component can broadcast `(component ID, port ID, mapping)` for each of its transmitting ports, while it can broadcast `(own component ID, own port ID, peer component ID, peer port ID, mapping)` for each receiving port. Then a recipient of these mappings can match them up and make sure that the mappings agree.
 

	
 
Note that this broadcasting of synchronous messages is essentially a component-to-component operation. However these messages must still be sent over ports anyway (and any port that was used to transmit messages to a particular receiving component will do). There are two reasons:
 

	
 
1. The sync message may need to be rerouted (e.g. a sender quickly fires both a data message and a subsequent sync message while the receiving port is being transferred to a new component), but needs to arrive at the same target as the data message. This is essentially restating that a transmitter never knows about the component ID of the recipient.
 
2. The sync message must not be taken into account by the recipient if it has not accepted any messages from the sender yet. Ofcourse this can be achieved in various ways but a simple way to achieve this is to send the sync message over ports.
 

	
 
## Annotating Data Messages
 

	
 
These port mappings are also sent along when sending data messages. We will not go into details but here the mapping makes sure that messages arrive in the right order, and certain kinds of deadlock or inconsistent protocol behaviour may be detected. This port mapping is checked for consistency by the recipient and, when consistent, the target port is updated with its new mapping.
 

	
 
As we'll send along this mapping we will only consider the ports that are shared between the two components. But in the most general case the transmitting ports of the component do not have knowledge about the peer component. And so the sent port mapping will have to contain the annotation for *all* transmitting ports. Receiving port mappings only have to be sent along if they received a message, and here we can indeed apply filtering. Likewise, if the recipient of a port mapping has not yet received anything on its receiving port, then it cannot be sure about the identity of the sender.
 

	
 
This leads to problems both for ensuring the correct ordering of the messages. For finding consensus it is not. Suppose that a port `a` sends a message to a port `b`. Port `b` does not accept it. Upon trying to find consensus we see that `a` will submit an entry in its port mapping, and `b` does not submit anything at all. Hence no solution can be found, as desired.
 

	
 
For the message ordering we require from the receiver that it confirms that, for all of the shared channels, it has the same mapping as the sender sends along. Suppose a component `A` has ports `a_put` and `b_put`, while a component `B` has ports `a_get` and `b_get` (where `a_put -> a_get` and `b_put -> b_get`). Suppose component `A` sends on `a_put` and `b_put` consecutively. And component `B` only receives from `b_get`. Then since `a_get` did not receive from `a_put` (hence did not learn that component/port ID pair of `a_put` is associated with `a_get`), the component `B` cannot figure out that `a_get` should precede a `b_get`. Likewise component `A` has no way to know that `a_put` and `b_put` are sending to the same component, hence it cannot indicate to component `B` that `a_get` should precede `b_get`.
 

	
 
There are some simple assumptions we can make that makes the problem a little bit easier to think about. Suppose again `a_put -> a_get` and `b_put -> b_get`. Suppose `a_put` is used first, where we send along the mapping of `a_put` and `b_put`. Then we send along `b_put`, again sending along the mapping. Then it is possible for the receiving component to accept the wrong message first (e.g. `b_get`, therefore learning about `b`), but it will be impossible to get from `a_get` later, since that one requires `b_put` (of which we learned that it matches `b_get`) to not have sent any messages.
 

	
 
Without adding any extra overhead (e.g. some kind of discovery round per synchronous interaction), we can take three approaches:
 

	
 
1. We simply don't care. It is impossible for a round where messages are received out of order to complete. Hence we temporarily allow a component to take the wrong actions, therefore wasting some CPU time, and to crash/error afterward.
 
2. We remove the entire concept of ordering of channels at a single component. Channels are always independent entities. This way we truly do not have to care. All we care about is that the messages that have been sent over a channel arrive at the other side.
 
3. We slightly modify the algorithm to detect these problems. This can be done in reasonable fashion, albeit a bit "hacky". For each channel there is a slot to receive messages. Messages wait there until the receiver performs a `get` in the PDL code. So far we've only considered learning about the component/port IDs that constitute a channel the moment they're received with a `get`. The algorithm could be changed to already learn about the peer component/port ID the moment the message arrives in the slot.
 

	
 
We'll go with the last option in the current implementation. We return to the problematic example above. Note that messages between components are sent in ordered fashion, and `a_put` happens before `b_put`. Then component `B` will first learn that `a_put` is the peer of `a_get`, then it performs the first `get` on the message from `b_put` to `b_get`. This message is annotated with a port mapping that `a_put` has been used before. We're now able to detect at component `B` that we cannot accept `b_get` before `a_get`.
 

	
 
Concluding:
 

	
 
- Every data message that is transmitted needs to contain the port mapping of all `put`ting ports (annotating them appropriately if they have not yet been used). We also need to include the port mapping of all `get`ting ports that have a pending/received message. The port mapping for `put`ting ports will only include their own ID, the port mapping for `get`ting ports will include the IDs of their peer as well.
 
- Every arriving data message will immediately be used to identify the sender as the peer of the corresponding `get`ter port. Since messages between components arrive in order this allows us to detect when the `put`s are in a different order at the sender as the `get`s at the receiver.
 
\ No newline at end of file
src/protocol/eval/executor.rs
Show inline comments
 

	
 
use std::collections::VecDeque;
 

	
 
use super::value::*;
 
use super::store::*;
 
use super::error::*;
 
use crate::protocol::*;
 
use crate::protocol::ast::*;
 
use crate::protocol::type_table::*;
 

	
 
macro_rules! debug_enabled { () => { false }; }
 
macro_rules! debug_log {
 
    ($format:literal) => {
 
        enabled_debug_print!(false, "exec", $format);
 
    };
 
    ($format:literal, $($args:expr),*) => {
 
        enabled_debug_print!(false, "exec", $format, $($args),*);
 
    };
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) enum ExprInstruction {
 
    EvalExpr(ExpressionId),
 
    PushValToFront,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) struct Frame {
 
    pub(crate) definition: ProcedureDefinitionId,
 
    pub(crate) monomorph_type_id: TypeId,
 
    pub(crate) monomorph_index: usize,
 
    pub(crate) position: StatementId,
 
    pub(crate) expr_stack: VecDeque<ExprInstruction>, // hack for expression evaluation, evaluated by popping from back
 
    pub(crate) expr_values: VecDeque<Value>, // hack for expression results, evaluated by popping from front/back
 
    pub(crate) max_stack_size: u32,
 
}
 

	
 
impl Frame {
 
    /// Creates a new execution frame. Does not modify the stack in any way.
 
    pub fn new(heap: &Heap, definition_id: ProcedureDefinitionId, monomorph_type_id: TypeId, monomorph_index: u32) -> Self {
 
    pub fn new(heap: &Heap, definition_id: ProcedureDefinitionId, _monomorph_type_id: TypeId, monomorph_index: u32) -> Self {
 
        let definition = &heap[definition_id];
 
        let outer_scope_id = definition.scope;
 
        let first_statement_id = definition.body;
 

	
 
        // Another not-so-pretty thing that has to be replaced somewhere in the
 
        // future...
 
        fn determine_max_stack_size(heap: &Heap, scope_id: ScopeId, max_size: &mut u32) {
 
            let scope = &heap[scope_id];
 

	
 
            // Check current block
 
            let cur_size = scope.next_unique_id_in_scope as u32;
 
            if cur_size > *max_size { *max_size = cur_size; }
 

	
 
            // And child blocks
 
            for child_scope in &scope.nested {
 
                determine_max_stack_size(heap, *child_scope, max_size);
 
            }
 
        }
 

	
 
        let mut max_stack_size = 0;
 
        determine_max_stack_size(heap, outer_scope_id, &mut max_stack_size);
 

	
 
        Frame{
 
            definition: definition_id,
 
            monomorph_type_id,
 
            monomorph_index: monomorph_index as usize,
 
            position: first_statement_id.upcast(),
 
            expr_stack: VecDeque::with_capacity(128),
 
            expr_values: VecDeque::with_capacity(128),
 
            max_stack_size,
 
        }
 
    }
 

	
 
    /// Prepares a single expression for execution. This involves walking the
 
    /// expression tree and putting them in the `expr_stack` such that
 
    /// continuously popping from its back will evaluate the expression. The
 
    /// results of each expression will be stored by pushing onto `expr_values`.
 
    pub fn prepare_single_expression(&mut self, heap: &Heap, expr_id: ExpressionId) {
 
        debug_assert!(self.expr_stack.is_empty());
 
        self.expr_values.clear(); // May not be empty if last expression result(s) were discarded
 

	
 
        self.serialize_expression(heap, expr_id);
 
    }
 

	
 
    /// Prepares multiple expressions for execution (i.e. evaluating all
 
    /// function arguments or all elements of an array/union literal). Per
 
    /// expression this works the same as `prepare_single_expression`. However
 
    /// after each expression is evaluated we insert a `PushValToFront`
 
    /// instruction
 
    pub fn prepare_multiple_expressions(&mut self, heap: &Heap, expr_ids: &[ExpressionId]) {
 
        debug_assert!(self.expr_stack.is_empty());
 
        self.expr_values.clear();
 

	
 
        for expr_id in expr_ids {
 
            self.expr_stack.push_back(ExprInstruction::PushValToFront);
 
            self.serialize_expression(heap, *expr_id);
 
        }
 
    }
 

	
 
    /// Performs depth-first serialization of expression tree. Let's not care
 
    /// about performance for a temporary runtime implementation
 
    fn serialize_expression(&mut self, heap: &Heap, id: ExpressionId) {
 
        self.expr_stack.push_back(ExprInstruction::EvalExpr(id));
 

	
 
        match &heap[id] {
 
            Expression::Assignment(expr) => {
 
                self.serialize_expression(heap, expr.left);
 
                self.serialize_expression(heap, expr.right);
 
            },
 
            Expression::Binding(expr) => {
 
                self.serialize_expression(heap, expr.bound_to);
 
                self.serialize_expression(heap, expr.bound_from);
 
            },
 
            Expression::Conditional(expr) => {
 
                self.serialize_expression(heap, expr.test);
 
            },
 
            Expression::Binary(expr) => {
 
                self.serialize_expression(heap, expr.left);
 
                self.serialize_expression(heap, expr.right);
 
            },
 
            Expression::Unary(expr) => {
 
                self.serialize_expression(heap, expr.expression);
 
            },
 
            Expression::Indexing(expr) => {
 
                self.serialize_expression(heap, expr.index);
 
                self.serialize_expression(heap, expr.subject);
 
            },
 
            Expression::Slicing(expr) => {
 
                self.serialize_expression(heap, expr.from_index);
 
                self.serialize_expression(heap, expr.to_index);
 
                self.serialize_expression(heap, expr.subject);
 
            },
 
            Expression::Select(expr) => {
 
                self.serialize_expression(heap, expr.subject);
 
            },
 
            Expression::Literal(expr) => {
 
                // Here we only care about literals that have subexpressions
 
                match &expr.value {
 
                    Literal::Null | Literal::True | Literal::False |
 
                    Literal::Character(_) | Literal::String(_) |
 
                    Literal::Integer(_) | Literal::Enum(_) => {
 
                        // No subexpressions
 
                    },
 
                    Literal::Struct(literal) => {
 
                        // Note: fields expressions are evaluated in programmer-
 
                        // specified order. But struct construction expects them
 
                        // in type-defined order. I might want to come back to
 
                        // this.
 
                        let mut _num_pushed = 0;
 
                        for want_field_idx in 0..literal.fields.len() {
 
                            for field in &literal.fields {
 
                                if field.field_idx == want_field_idx {
 
                                    _num_pushed += 1;
 
                                    self.expr_stack.push_back(ExprInstruction::PushValToFront);
 
                                    self.serialize_expression(heap, field.value);
 
                                }
 
                            }
 
                        }
 
                        debug_assert_eq!(_num_pushed, literal.fields.len())
 
                    },
 
                    Literal::Union(literal) => {
 
                        for value_expr_id in &literal.values {
 
                            self.expr_stack.push_back(ExprInstruction::PushValToFront);
 
                            self.serialize_expression(heap, *value_expr_id);
 
                        }
 
                    },
 
                    Literal::Array(value_expr_ids) => {
 
                        for value_expr_id in value_expr_ids {
 
                            self.expr_stack.push_back(ExprInstruction::PushValToFront);
 
                            self.serialize_expression(heap, *value_expr_id);
 
                        }
 
                    },
 
                    Literal::Tuple(value_expr_ids) => {
 
                        for value_expr_id in value_expr_ids {
 
                            self.expr_stack.push_back(ExprInstruction::PushValToFront);
 
                            self.serialize_expression(heap, *value_expr_id);
 
                        }
 
                    }
 
                }
 
            },
 
            Expression::Cast(expr) => {
 
                self.serialize_expression(heap, expr.subject);
 
            }
 
            Expression::Call(expr) => {
 
                for arg_expr_id in &expr.arguments {
 
                    self.expr_stack.push_back(ExprInstruction::PushValToFront);
 
                    self.serialize_expression(heap, *arg_expr_id);
 
                }
 
            },
 
            Expression::Variable(_expr) => {
 
                // No subexpressions
 
            }
 
        }
 
    }
 
}
 

	
 
pub type EvalResult = Result<EvalContinuation, EvalError>;
 

	
 
#[derive(Debug)]
 
pub enum EvalContinuation {
 
    // Returned in both sync and non-sync modes
 
    Stepping,
 
    // Returned only in sync mode
 
    BranchInconsistent,
 
    SyncBlockEnd,
 
    NewFork,
 
    BlockFires(PortId),
 
    BlockGet(PortId),
 
    Put(PortId, ValueGroup),
 
    SelectStart(u32, u32), // (num_cases, num_ports_total)
 
    SelectRegisterPort(u32, u32, PortId), // (case_index, port_index_in_case, port_id)
 
    SelectWait, // wait until select can continue
 
    // Returned only in non-sync mode
 
    ComponentTerminated,
 
    SyncBlockStart,
 
    NewComponent(ProcedureDefinitionId, TypeId, ValueGroup),
 
    NewChannel,
 
}
 

	
 
// Note: cloning is fine, methinks. cloning all values and the heap regions then
 
// we end up with valid "pointers" to heap regions.
 
#[derive(Debug, Clone)]
 
pub struct Prompt {
 
    pub(crate) frames: Vec<Frame>,
 
    pub(crate) store: Store,
 
}
 

	
 
impl Prompt {
 
    pub fn new(types: &TypeTable, heap: &Heap, def: ProcedureDefinitionId, type_id: TypeId, args: ValueGroup) -> Self {
 
        let mut prompt = Self{
 
            frames: Vec::new(),
 
            store: Store::new(),
 
        };
 

	
 
        // Maybe do typechecking in the future?
 
        let monomorph_index = types.get_monomorph(type_id).variant.as_procedure().monomorph_index;
 
        let new_frame = Frame::new(heap, def, type_id, monomorph_index);
 
        let max_stack_size = new_frame.max_stack_size;
 
        prompt.frames.push(new_frame);
 
        args.into_store(&mut prompt.store);
 
        prompt.store.reserve_stack(max_stack_size);
 

	
 
        prompt
 
    }
 

	
 
    /// Big 'ol function right here. Didn't want to break it up unnecessarily.
 
    /// It consists of, in sequence: executing any expressions that should be
 
    /// executed before the next statement can be evaluated, then a section that
 
    /// performs debug printing, and finally a section that takes the next
 
    /// statement and executes it. If the statement requires any expressions to
 
    /// be evaluated, then they will be added such that the next time `step` is
 
    /// called, all of these expressions are indeed evaluated.
 
    pub(crate) fn step(&mut self, types: &TypeTable, heap: &Heap, modules: &[Module], ctx: &mut impl RunContext) -> EvalResult {
 
        // Helper function to transfer multiple values from the expression value
 
        // array into a heap region (e.g. constructing arrays or structs).
 
        fn transfer_expression_values_front_into_heap(cur_frame: &mut Frame, store: &mut Store, num_values: usize) -> HeapPos {
 
            let heap_pos = store.alloc_heap();
src/protocol/parser/pass_rewriting.rs
Show inline comments
 
@@ -78,385 +78,385 @@ impl Visitor for PassRewriting {
 

	
 
    fn visit_if_stmt(&mut self, ctx: &mut Ctx, id: IfStatementId) -> VisitorResult {
 
        let if_stmt = &ctx.heap[id];
 
        let true_case = if_stmt.true_case;
 
        let false_case = if_stmt.false_case;
 

	
 
        self.current_scope = true_case.scope;
 
        self.visit_stmt(ctx, true_case.body)?;
 
        if let Some(false_case) = false_case {
 
            self.current_scope = false_case.scope;
 
            self.visit_stmt(ctx, false_case.body)?;
 
        }
 

	
 
        return Ok(())
 
    }
 

	
 
    fn visit_while_stmt(&mut self, ctx: &mut Ctx, id: WhileStatementId) -> VisitorResult {
 
        let while_stmt = &ctx.heap[id];
 
        let body_id = while_stmt.body;
 
        self.current_scope = while_stmt.scope;
 
        return self.visit_stmt(ctx, body_id);
 
    }
 

	
 
    fn visit_synchronous_stmt(&mut self, ctx: &mut Ctx, id: SynchronousStatementId) -> VisitorResult {
 
        let sync_stmt = &ctx.heap[id];
 
        let body_id = sync_stmt.body;
 
        self.current_scope = sync_stmt.scope;
 
        return self.visit_stmt(ctx, body_id);
 
    }
 

	
 
    // --- Visiting the select statement
 

	
 
    fn visit_select_stmt(&mut self, ctx: &mut Ctx, id: SelectStatementId) -> VisitorResult {
 
        // Utility for the last stage of rewriting process. Note that caller
 
        // still needs to point the end of the if-statement to the end of the
 
        // replacement statement of the select statement.
 
        fn transform_select_case_code(
 
            ctx: &mut Ctx, containing_procedure_id: ProcedureDefinitionId,
 
            select_id: SelectStatementId, case_index: usize,
 
            select_var_id: VariableId, select_var_type_id: TypeIdReference
 
        ) -> (IfStatementId, EndIfStatementId, ScopeId) {
 
            // Retrieve statement IDs associated with case
 
            let case = &ctx.heap[select_id].cases[case_index];
 
            let case_guard_id = case.guard;
 
            let case_body_id = case.body;
 
            let case_scope_id = case.scope;
 

	
 
            // Create the if-statement for the result of the select statement
 
            let compare_expr_id = create_ast_equality_comparison_expr(ctx, containing_procedure_id, select_var_id, select_var_type_id, case_index as u64);
 
            let true_case = IfStatementCase{
 
                body: case_guard_id, // which is linked up to the body
 
                scope: case_scope_id,
 
            };
 
            let (if_stmt_id, end_if_stmt_id) = create_ast_if_stmt(ctx, compare_expr_id.upcast(), true_case, None);
 

	
 
            // Link up body statement to end-if
 
            set_ast_statement_next(ctx, case_body_id, end_if_stmt_id.upcast());
 

	
 
            return (if_stmt_id, end_if_stmt_id, case_scope_id);
 
        }
 

	
 
        // Precreate the block that will end up containing all of the
 
        // transformed statements. Also precreate the scope associated with it
 
        let (outer_block_id, outer_end_block_id, outer_scope_id) =
 
            create_ast_block_stmt(ctx, Vec::new());
 

	
 
        // The "select" and the "end select" statement will act like trampolines
 
        // that jump to the replacement block. So set the child/parent
 
        // relationship already.
 
        // --- for the statements
 
        let select_stmt = &mut ctx.heap[id];
 
        select_stmt.next = outer_block_id.upcast();
 
        let end_select_stmt_id = select_stmt.end_select;
 
        let select_stmt_relative_pos = select_stmt.relative_pos_in_parent;
 

	
 
        let outer_end_block_stmt = &mut ctx.heap[outer_end_block_id];
 
        outer_end_block_stmt.next = end_select_stmt_id.upcast();
 

	
 
        // --- for the scopes
 
        link_new_child_to_existing_parent_scope(ctx, &mut self.scope_buffer, self.current_scope, outer_scope_id, select_stmt_relative_pos);
 

	
 
        // Create statements that will create temporary variables for all of the
 
        // ports passed to the "get" calls in the select case guards.
 
        let select_stmt = &ctx.heap[id];
 
        let total_num_cases = select_stmt.cases.len();
 
        let mut total_num_ports = 0;
 
        let end_select_stmt_id = select_stmt.end_select;
 
        let _end_select = &ctx.heap[end_select_stmt_id];
 

	
 
        // Put heap IDs into temporary buffers to handle borrowing rules
 
        let mut call_id_section = self.call_expr_buffer.start_section();
 
        let mut expr_id_section = self.expression_buffer.start_section();
 

	
 
        for case in select_stmt.cases.iter() {
 
            total_num_ports += case.involved_ports.len();
 
            for (call_id, expr_id) in case.involved_ports.iter().copied() {
 
                call_id_section.push(call_id);
 
                expr_id_section.push(expr_id);
 
            }
 
        }
 

	
 
        // Transform all of the call expressions by takings its argument (the
 
        // port from which we `get`) and turning it into a temporary variable.
 
        let mut transformed_stmts = Vec::with_capacity(total_num_ports); // TODO: Recompute this preallocated length, put assert at the end
 
        let mut locals = Vec::with_capacity(total_num_ports);
 

	
 
        for port_var_idx in 0..call_id_section.len() {
 
            let get_call_expr_id = call_id_section[port_var_idx];
 
            let port_expr_id = expr_id_section[port_var_idx];
 
            let port_type_index = ctx.heap[port_expr_id].type_index();
 
            let port_type_ref = TypeIdReference::IndirectSameAsExpr(port_type_index);
 

	
 
            // Move the port expression such that it gets assigned to a temporary variable
 
            let variable_id = create_ast_variable(ctx, outer_scope_id);
 
            let variable_decl_stmt_id = create_ast_variable_declaration_stmt(ctx, self.current_procedure_id, variable_id, port_type_ref, port_expr_id);
 

	
 
            // Replace the original port expression in the call with a reference
 
            // to the replacement variable
 
            let variable_expr_id = create_ast_variable_expr(ctx, self.current_procedure_id, variable_id, port_type_ref);
 
            let call_expr = &mut ctx.heap[get_call_expr_id];
 
            call_expr.arguments[0] = variable_expr_id.upcast();
 

	
 
            transformed_stmts.push(variable_decl_stmt_id.upcast().upcast());
 
            locals.push((variable_id, port_type_ref));
 
        }
 

	
 
        // Insert runtime calls that facilitate the semantics of the select
 
        // block.
 

	
 
        // Create the call that indicates the start of the select block
 
        {
 
            let num_cases_expression_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, total_num_cases as u64, ctx.arch.uint32_type_id);
 
            let num_ports_expression_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, total_num_ports as u64, ctx.arch.uint32_type_id);
 
            let arguments = vec![
 
                num_cases_expression_id.upcast(),
 
                num_ports_expression_id.upcast()
 
            ];
 

	
 
            let call_expression_id = create_ast_call_expr(ctx, self.current_procedure_id, Method::SelectStart, &mut self.expression_buffer, arguments);
 
            let call_statement_id = create_ast_expression_stmt(ctx, call_expression_id.upcast());
 

	
 
            transformed_stmts.push(call_statement_id.upcast());
 
        }
 

	
 
        // Create calls for each select case that will register the ports that
 
        // we are waiting on at the runtime.
 
        {
 
            let mut total_port_index = 0;
 
            for case_index in 0..total_num_cases {
 
                let case = &ctx.heap[id].cases[case_index];
 
                let case_num_ports = case.involved_ports.len();
 

	
 
                for case_port_index in 0..case_num_ports {
 
                    // Arguments to runtime call
 
                    let (port_variable_id, port_variable_type) = locals[total_port_index]; // so far this variable contains the temporary variables for the port expressions
 
                    let case_index_expr_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, case_index as u64, ctx.arch.uint32_type_id);
 
                    let port_index_expr_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, case_port_index as u64, ctx.arch.uint32_type_id);
 
                    let port_variable_expr_id = create_ast_variable_expr(ctx, self.current_procedure_id, port_variable_id, port_variable_type);
 
                    let runtime_call_arguments = vec![
 
                        case_index_expr_id.upcast(),
 
                        port_index_expr_id.upcast(),
 
                        port_variable_expr_id.upcast()
 
                    ];
 

	
 
                    // Create runtime call, then store it
 
                    let runtime_call_expr_id = create_ast_call_expr(ctx, self.current_procedure_id, Method::SelectRegisterCasePort, &mut self.expression_buffer, runtime_call_arguments);
 
                    let runtime_call_stmt_id = create_ast_expression_stmt(ctx, runtime_call_expr_id.upcast());
 

	
 
                    transformed_stmts.push(runtime_call_stmt_id.upcast());
 

	
 
                    total_port_index += 1;
 
                }
 
            }
 
        }
 

	
 
        // Create the variable that will hold the result of a completed select
 
        // block. Then create the runtime call that will produce this result
 
        let select_variable_id = create_ast_variable(ctx, outer_scope_id);
 
        let select_variable_type = TypeIdReference::DirectTypeId(ctx.arch.uint32_type_id);
 
        locals.push((select_variable_id, select_variable_type));
 

	
 
        {
 
            let runtime_call_expr_id = create_ast_call_expr(ctx, self.current_procedure_id, Method::SelectWait, &mut self.expression_buffer, Vec::new());
 
            let variable_stmt_id = create_ast_variable_declaration_stmt(ctx, self.current_procedure_id, select_variable_id, select_variable_type, runtime_call_expr_id.upcast());
 
            transformed_stmts.push(variable_stmt_id.upcast().upcast());
 
        }
 

	
 
        call_id_section.forget();
 
        expr_id_section.forget();
 

	
 
        // Now we transform each of the select block case's guard and code into
 
        // a chained if-else statement.
 
        let mut relative_pos = transformed_stmts.len() as i32;
 
        let relative_pos = transformed_stmts.len() as i32;
 
        if total_num_cases > 0 {
 
            let (if_stmt_id, end_if_stmt_id, scope_id) = transform_select_case_code(ctx, self.current_procedure_id, id, 0, select_variable_id, select_variable_type);
 
            link_existing_child_to_new_parent_scope(ctx, &mut self.scope_buffer, outer_scope_id, scope_id, relative_pos);
 
            let first_end_if_stmt = &mut ctx.heap[end_if_stmt_id];
 
            first_end_if_stmt.next = outer_end_block_id.upcast();
 

	
 
            let mut last_if_stmt_id = if_stmt_id;
 
            let mut last_end_if_stmt_id = end_if_stmt_id;
 
            let mut last_parent_scope_id = outer_scope_id;
 
            let mut last_relative_pos = transformed_stmts.len() as i32 + 1;
 
            transformed_stmts.push(last_if_stmt_id.upcast());
 

	
 
            for case_index in 1..total_num_cases {
 
                let (if_stmt_id, end_if_stmt_id, scope_id) = transform_select_case_code(ctx, self.current_procedure_id, id, case_index, select_variable_id, select_variable_type);
 
                let false_case_scope_id = ctx.heap.alloc_scope(|this| Scope::new(this, ScopeAssociation::If(last_if_stmt_id, false)));
 
                link_existing_child_to_new_parent_scope(ctx, &mut self.scope_buffer, false_case_scope_id, scope_id, 0);
 
                link_new_child_to_existing_parent_scope(ctx, &mut self.scope_buffer, last_parent_scope_id, false_case_scope_id, last_relative_pos);
 
                set_ast_if_statement_false_body(ctx, last_if_stmt_id, last_end_if_stmt_id, IfStatementCase{ body: if_stmt_id.upcast(), scope: false_case_scope_id });
 

	
 
                let end_if_stmt = &mut ctx.heap[end_if_stmt_id];
 
                end_if_stmt.next = last_end_if_stmt_id.upcast();
 

	
 
                last_if_stmt_id = if_stmt_id;
 
                last_end_if_stmt_id = end_if_stmt_id;
 
                last_parent_scope_id = false_case_scope_id;
 
                last_relative_pos = 0;
 
            }
 
        }
 

	
 
        // Final steps: set the statements of the replacement block statement,
 
        // link all of those statements together, and update the scopes.
 
        let first_stmt_id = transformed_stmts[0];
 
        let mut last_stmt_id = transformed_stmts[0];
 
        for stmt_id in transformed_stmts.iter().skip(1).copied() {
 
            set_ast_statement_next(ctx, last_stmt_id, stmt_id);
 
            last_stmt_id = stmt_id;
 
        }
 

	
 
        if total_num_cases == 0 {
 
            // If we don't have any cases, then we didn't connect the statements
 
            // up to the end of the outer block, so do that here
 
            set_ast_statement_next(ctx, last_stmt_id, outer_end_block_id.upcast());
 
        }
 

	
 
        let outer_block_stmt = &mut ctx.heap[outer_block_id];
 
        outer_block_stmt.next = first_stmt_id;
 
        outer_block_stmt.statements = transformed_stmts;
 

	
 
        return Ok(())
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Utilities to create compiler-generated AST nodes
 
// -----------------------------------------------------------------------------
 

	
 
#[derive(Clone, Copy)]
 
enum TypeIdReference {
 
    DirectTypeId(TypeId),
 
    IndirectSameAsExpr(i32), // by type index
 
}
 

	
 
fn create_ast_variable(ctx: &mut Ctx, scope_id: ScopeId) -> VariableId {
 
    let variable_id = ctx.heap.alloc_variable(|this| Variable{
 
        this,
 
        kind: VariableKind::Local,
 
        parser_type: ParserType{
 
            elements: Vec::new(),
 
            full_span: InputSpan::new(),
 
        },
 
        identifier: Identifier::new_empty(InputSpan::new()),
 
        relative_pos_in_parent: -1,
 
        unique_id_in_scope: -1,
 
    });
 
    let scope = &mut ctx.heap[scope_id];
 
    scope.variables.push(variable_id);
 

	
 
    return variable_id;
 
}
 

	
 
fn create_ast_variable_expr(ctx: &mut Ctx, containing_procedure_id: ProcedureDefinitionId, variable_id: VariableId, variable_type_id: TypeIdReference) -> VariableExpressionId {
 
    let variable_type_index = add_new_procedure_expression_type(ctx, containing_procedure_id, variable_type_id);
 
    return ctx.heap.alloc_variable_expression(|this| VariableExpression{
 
        this,
 
        identifier: Identifier::new_empty(InputSpan::new()),
 
        declaration: Some(variable_id),
 
        used_as_binding_target: false,
 
        parent: ExpressionParent::None,
 
        type_index: variable_type_index,
 
    });
 
}
 

	
 
fn create_ast_call_expr(ctx: &mut Ctx, containing_procedure_id: ProcedureDefinitionId, method: Method, buffer: &mut ScopedBuffer<ExpressionId>, arguments: Vec<ExpressionId>) -> CallExpressionId {
 
    let call_type_id = match method {
 
        Method::SelectStart => ctx.arch.void_type_id,
 
        Method::SelectRegisterCasePort => ctx.arch.void_type_id,
 
        Method::SelectWait => ctx.arch.uint32_type_id, // TODO: Not pretty, this. Pretty error prone
 
        _ => unreachable!(), // if this goes of, add the appropriate method here.
 
    };
 

	
 
    let expression_ids = buffer.start_section_initialized(&arguments);
 
    let call_type_index = add_new_procedure_expression_type(ctx, containing_procedure_id, TypeIdReference::DirectTypeId(call_type_id));
 
    let call_expression_id = ctx.heap.alloc_call_expression(|this| CallExpression{
 
        func_span: InputSpan::new(),
 
        this,
 
        full_span: InputSpan::new(),
 
        parser_type: ParserType{
 
            elements: Vec::new(),
 
            full_span: InputSpan::new(),
 
        },
 
        method,
 
        arguments,
 
        procedure: ProcedureDefinitionId::new_invalid(),
 
        parent: ExpressionParent::None,
 
        type_index: call_type_index,
 
    });
 

	
 
    for argument_index in 0..expression_ids.len() {
 
        let argument_id = expression_ids[argument_index];
 
        let argument_expr = &mut ctx.heap[argument_id];
 
        *argument_expr.parent_mut() = ExpressionParent::Expression(call_expression_id.upcast(), argument_index as u32);
 
    }
 

	
 
    return call_expression_id;
 
}
 

	
 
fn create_ast_literal_integer_expr(ctx: &mut Ctx, containing_procedure_id: ProcedureDefinitionId, unsigned_value: u64, type_id: TypeId) -> LiteralExpressionId {
 
    let literal_type_index = add_new_procedure_expression_type(ctx, containing_procedure_id, TypeIdReference::DirectTypeId(type_id));
 
    return ctx.heap.alloc_literal_expression(|this| LiteralExpression{
 
        this,
 
        span: InputSpan::new(),
 
        value: Literal::Integer(LiteralInteger{
 
            unsigned_value,
 
            negated: false,
 
        }),
 
        parent: ExpressionParent::None,
 
        type_index: literal_type_index,
 
    });
 
}
 

	
 
fn create_ast_equality_comparison_expr(
 
    ctx: &mut Ctx, containing_procedure_id: ProcedureDefinitionId,
 
    variable_id: VariableId, variable_type: TypeIdReference, value: u64
 
) -> BinaryExpressionId {
 
    let var_expr_id = create_ast_variable_expr(ctx, containing_procedure_id, variable_id, variable_type);
 
    let int_expr_id = create_ast_literal_integer_expr(ctx, containing_procedure_id, value, ctx.arch.uint32_type_id);
 
    let cmp_type_index = add_new_procedure_expression_type(ctx, containing_procedure_id, TypeIdReference::DirectTypeId(ctx.arch.bool_type_id));
 
    let cmp_expr_id = ctx.heap.alloc_binary_expression(|this| BinaryExpression{
 
        this,
 
        operator_span: InputSpan::new(),
 
        full_span: InputSpan::new(),
 
        left: var_expr_id.upcast(),
 
        operation: BinaryOperator::Equality,
 
        right: int_expr_id.upcast(),
 
        parent: ExpressionParent::None,
 
        type_index: cmp_type_index,
 
    });
 

	
 
    let var_expr = &mut ctx.heap[var_expr_id];
 
    var_expr.parent = ExpressionParent::Expression(cmp_expr_id.upcast(), 0);
 
    let int_expr = &mut ctx.heap[int_expr_id];
 
    int_expr.parent = ExpressionParent::Expression(cmp_expr_id.upcast(), 1);
 

	
 
    return cmp_expr_id;
 
}
 

	
 
fn create_ast_expression_stmt(ctx: &mut Ctx, expression_id: ExpressionId) -> ExpressionStatementId {
 
    let statement_id = ctx.heap.alloc_expression_statement(|this| ExpressionStatement{
 
        this,
 
        span: InputSpan::new(),
 
        expression: expression_id,
 
        next: StatementId::new_invalid(),
 
    });
 

	
 
    let expression = &mut ctx.heap[expression_id];
 
    *expression.parent_mut() = ExpressionParent::ExpressionStmt(statement_id);
 

	
 
    return statement_id;
 
}
 

	
 
fn create_ast_variable_declaration_stmt(
 
    ctx: &mut Ctx, containing_procedure_id: ProcedureDefinitionId,
 
    variable_id: VariableId, variable_type: TypeIdReference, initial_value_expr_id: ExpressionId
 
) -> MemoryStatementId {
 
    // Create the assignment expression, assigning the initial value to the variable
 
    let variable_expr_id = create_ast_variable_expr(ctx, containing_procedure_id, variable_id, variable_type);
 
    let void_type_index = add_new_procedure_expression_type(ctx, containing_procedure_id, TypeIdReference::DirectTypeId(ctx.arch.void_type_id));
 
    let assignment_expr_id = ctx.heap.alloc_assignment_expression(|this| AssignmentExpression{
 
        this,
 
        operator_span: InputSpan::new(),
 
        full_span: InputSpan::new(),
 
        left: variable_expr_id.upcast(),
src/protocol/parser/pass_validation_linking.rs
Show inline comments
 
@@ -1104,389 +1104,389 @@ impl Visitor for PassValidationLinking {
 
                &ctx.module().source, span, "cannot assign to the result from a call expression"
 
            ))
 
        }
 

	
 
        // Check whether the method is allowed to be called within the code's
 
        // context (in sync, definition type, etc.)
 
        let mut expecting_wrapping_new_stmt = false;
 
        let mut expecting_primitive_def = false;
 
        let mut expecting_wrapping_sync_stmt = false;
 
        let mut expecting_no_select_stmt = false;
 

	
 
        match call_expr.method {
 
            Method::Get => {
 
                expecting_primitive_def = true;
 
                expecting_wrapping_sync_stmt = true;
 
                if !self.in_select_guard.is_invalid() {
 
                    // In a select guard. Take the argument (i.e. the port we're
 
                    // retrieving from) and add it to the list of involved ports
 
                    // of the guard
 
                    if call_expr.arguments.len() == 1 {
 
                        // We're checking the number of arguments later, for now
 
                        // assume it is correct.
 
                        let argument = call_expr.arguments[0];
 
                        let select_stmt = &mut ctx.heap[self.in_select_guard];
 
                        let select_case = &mut select_stmt.cases[self.in_select_arm as usize];
 
                        select_case.involved_ports.push((id, argument));
 
                    }
 
                }
 
            },
 
            Method::Put => {
 
                expecting_primitive_def = true;
 
                expecting_wrapping_sync_stmt = true;
 
                expecting_no_select_stmt = true;
 
            },
 
            Method::Fires => {
 
                expecting_primitive_def = true;
 
                expecting_wrapping_sync_stmt = true;
 
            },
 
            Method::Create => {},
 
            Method::Length => {},
 
            Method::Assert => {
 
                expecting_wrapping_sync_stmt = true;
 
                expecting_no_select_stmt = true;
 
                if self.proc_kind == ProcedureKind::Function {
 
                    let call_span = call_expr.func_span;
 
                    return Err(ParseError::new_error_str_at_span(
 
                        &ctx.module().source, call_span,
 
                        "assert statement may only occur in components"
 
                    ));
 
                }
 
            },
 
            Method::Print => {},
 
            Method::SelectStart
 
            | Method::SelectRegisterCasePort
 
            | Method::SelectWait => unreachable!(), // not usable by programmer directly
 
            Method::UserFunction => {}
 
            Method::UserComponent => {
 
                expecting_wrapping_new_stmt = true;
 
            },
 
        }
 

	
 
        let call_expr = &mut ctx.heap[id];
 

	
 
        fn get_span_and_name<'a>(ctx: &'a Ctx, id: CallExpressionId) -> (InputSpan, String) {
 
            let call = &ctx.heap[id];
 
            let span = call.func_span;
 
            let name = String::from_utf8_lossy(ctx.module().source.section_at_span(span)).to_string();
 
            return (span, name);
 
        }
 
        if expecting_primitive_def {
 
            if self.proc_kind != ProcedureKind::Primitive {
 
                let (call_span, func_name) = get_span_and_name(ctx, id);
 
                return Err(ParseError::new_error_at_span(
 
                    &ctx.module().source, call_span,
 
                    format!("a call to '{}' may only occur in primitive component definitions", func_name)
 
                ));
 
            }
 
        }
 

	
 
        if expecting_wrapping_sync_stmt {
 
            if self.in_sync.is_invalid() {
 
                let (call_span, func_name) = get_span_and_name(ctx, id);
 
                return Err(ParseError::new_error_at_span(
 
                    &ctx.module().source, call_span,
 
                    format!("a call to '{}' may only occur inside synchronous blocks", func_name)
 
                ))
 
            }
 
        }
 

	
 
        if expecting_no_select_stmt {
 
            if !self.in_select_guard.is_invalid() {
 
                let (call_span, func_name) = get_span_and_name(ctx, id);
 
                return Err(ParseError::new_error_at_span(
 
                    &ctx.module().source, call_span,
 
                    format!("a call to '{}' may not occur in a select statement's guard", func_name)
 
                ));
 
            }
 
        }
 

	
 
        if expecting_wrapping_new_stmt {
 
            if !self.expr_parent.is_new() {
 
                let call_span = call_expr.func_span;
 
                return Err(ParseError::new_error_str_at_span(
 
                    &ctx.module().source, call_span,
 
                    "cannot call a component, it can only be instantiated by using 'new'"
 
                ));
 
            }
 
        } else {
 
            if self.expr_parent.is_new() {
 
                let call_span = call_expr.func_span;
 
                return Err(ParseError::new_error_str_at_span(
 
                    &ctx.module().source, call_span,
 
                    "only components can be instantiated, this is a function"
 
                ));
 
            }
 
        }
 

	
 
        // Check the number of arguments
 
        let call_definition = ctx.types.get_base_definition(&call_expr.procedure.upcast()).unwrap();
 
        let num_expected_args = match &call_definition.definition {
 
            DefinedTypeVariant::Procedure(definition) => definition.arguments.len(),
 
            _ => unreachable!(),
 
        };
 

	
 
        let num_provided_args = call_expr.arguments.len();
 
        if num_provided_args != num_expected_args {
 
            let argument_text = if num_expected_args == 1 { "argument" } else { "arguments" };
 
            let call_span = call_expr.full_span;
 
            return Err(ParseError::new_error_at_span(
 
                &ctx.module().source, call_span, format!(
 
                    "expected {} {}, but {} were provided",
 
                    num_expected_args, argument_text, num_provided_args
 
                )
 
            ));
 
        }
 

	
 
        // Recurse into all of the arguments and set the expression's parent
 
        let upcast_id = id.upcast();
 

	
 
        let section = self.expression_buffer.start_section_initialized(&call_expr.arguments);
 
        let old_expr_parent = self.expr_parent;
 
        call_expr.parent = old_expr_parent;
 

	
 
        for arg_expr_idx in 0..section.len() {
 
            let arg_expr_id = section[arg_expr_idx];
 
            self.expr_parent = ExpressionParent::Expression(upcast_id, arg_expr_idx as u32);
 
            self.visit_expr(ctx, arg_expr_id)?;
 
        }
 

	
 
        section.forget();
 
        self.expr_parent = old_expr_parent;
 

	
 
        Ok(())
 
    }
 

	
 
    fn visit_variable_expr(&mut self, ctx: &mut Ctx, id: VariableExpressionId) -> VisitorResult {
 
        let var_expr = &ctx.heap[id];
 

	
 
        // Check if declaration was already resolved (this occurs for the
 
        // variable expr that is on the LHS of the assignment expr that is
 
        // associated with a variable declaration)
 
        let mut variable_id = var_expr.declaration;
 
        let mut is_binding_target = false;
 

	
 
        // Otherwise try to find it
 
        if variable_id.is_none() {
 
            variable_id = self.find_variable(ctx, self.relative_pos_in_parent, &var_expr.identifier);
 
        }
 

	
 
        // Otherwise try to see if is a variable introduced by a binding expr
 
        let variable_id = if let Some(variable_id) = variable_id {
 
            variable_id
 
        } else {
 
            if self.in_binding_expr.is_invalid() || !self.in_binding_expr_lhs {
 
                return Err(ParseError::new_error_str_at_span(
 
                    &ctx.module().source, var_expr.identifier.span, "unresolved variable"
 
                ));
 
            }
 

	
 
            // This is a binding variable, but it may only appear in very
 
            // specific locations.
 
            let is_valid_binding = match self.expr_parent {
 
                ExpressionParent::Expression(expr_id, idx) => {
 
                    match &ctx.heap[expr_id] {
 
                        Expression::Binding(_binding_expr) => {
 
                            // Nested binding is disallowed, and because of
 
                            // the check above we know we're directly at the
 
                            // LHS of the binding expression
 
                            debug_assert_eq!(_binding_expr.this, self.in_binding_expr);
 
                            debug_assert_eq!(idx, 0);
 
                            true
 
                        }
 
                        Expression::Literal(lit_expr) => {
 
                        Expression::Literal(_lit_expr) => {
 
                            // Only struct, unions, tuples and arrays can
 
                            // have subexpressions, so we're always fine
 
                            dbg_code!({
 
                                match lit_expr.value {
 
                                match _lit_expr.value {
 
                                    Literal::Struct(_) | Literal::Union(_) | Literal::Array(_) | Literal::Tuple(_) => {},
 
                                    _ => unreachable!(),
 
                                }
 
                            });
 

	
 
                            true
 
                        },
 
                        _ => false,
 
                    }
 
                },
 
                _ => {
 
                    false
 
                }
 
            };
 

	
 
            if !is_valid_binding {
 
                let binding_expr = &ctx.heap[self.in_binding_expr];
 
                return Err(ParseError::new_error_str_at_span(
 
                    &ctx.module().source, var_expr.identifier.span,
 
                    "illegal location for binding variable: binding variables may only be nested under a binding expression, or a struct, union or array literal"
 
                ).with_info_at_span(
 
                    &ctx.module().source, binding_expr.operator_span, format!(
 
                        "'{}' was interpreted as a binding variable because the variable is not declared and it is nested under this binding expression",
 
                        var_expr.identifier.value.as_str()
 
                    )
 
                ));
 
            }
 

	
 
            // By now we know that this is a valid binding expression. Given
 
            // that a binding expression must be nested under an if/while
 
            // statement, we now add the variable to the scope associated with
 
            // that statement.
 
            let bound_identifier = var_expr.identifier.clone();
 
            let bound_variable_id = ctx.heap.alloc_variable(|this| Variable {
 
                this,
 
                kind: VariableKind::Binding,
 
                parser_type: ParserType {
 
                    elements: vec![ParserTypeElement {
 
                        element_span: bound_identifier.span,
 
                        variant: ParserTypeVariant::Inferred
 
                    }],
 
                    full_span: bound_identifier.span
 
                },
 
                identifier: bound_identifier,
 
                relative_pos_in_parent: 0,
 
                unique_id_in_scope: -1,
 
            });
 

	
 
            let scope_id = match &ctx.heap[self.in_test_expr] {
 
                Statement::If(stmt) => stmt.true_case.scope,
 
                Statement::While(stmt) => stmt.scope,
 
                _ => unreachable!(),
 
            };
 

	
 
            self.checked_at_single_scope_add_local(ctx, scope_id, -1, bound_variable_id)?; // add at -1 such that first statement can find the variable if needed
 

	
 
            is_binding_target = true;
 
            bound_variable_id
 
        };
 

	
 
        let var_expr = &mut ctx.heap[id];
 
        var_expr.declaration = Some(variable_id);
 
        var_expr.used_as_binding_target = is_binding_target;
 
        var_expr.parent = self.expr_parent;
 

	
 
        Ok(())
 
    }
 
}
 

	
 
impl PassValidationLinking {
 
    //--------------------------------------------------------------------------
 
    // Special traversal
 
    //--------------------------------------------------------------------------
 

	
 
    /// Pushes a new scope associated with a particular statement. If that
 
    /// statement already has an associated scope (i.e. scope associated with
 
    /// sync statement or select statement's arm) then we won't do anything.
 
    /// In all cases the caller must call `pop_statement_scope` with the scope
 
    /// and relative scope position returned by this function.
 
    fn push_scope(&mut self, ctx: &mut Ctx, is_top_level_scope: bool, pushed_scope_id: ScopeId) -> (ScopeId, i32) {
 
        // Set the properties of the pushed scope (it is already created during
 
        // AST construction, but most values are not yet set to their correct
 
        // values)
 
        let old_scope_id = self.cur_scope;
 

	
 
        let scope = &mut ctx.heap[pushed_scope_id];
 
        if !is_top_level_scope {
 
            scope.parent = Some(old_scope_id);
 
        }
 

	
 
        scope.relative_pos_in_parent = self.relative_pos_in_parent;
 
        let old_relative_pos = self.relative_pos_in_parent;
 
        self.relative_pos_in_parent = 0;
 

	
 
        // Link up scopes
 
        if !is_top_level_scope {
 
            let old_scope = &mut ctx.heap[old_scope_id];
 
            old_scope.nested.push(pushed_scope_id);
 
        }
 

	
 
        // Set as current traversal scope, then return old scope
 
        self.cur_scope = pushed_scope_id;
 
        return (old_scope_id, old_relative_pos)
 
    }
 

	
 
    fn pop_scope(&mut self, scope_to_restore: (ScopeId, i32)) {
 
        self.cur_scope = scope_to_restore.0;
 
        self.relative_pos_in_parent = scope_to_restore.1;
 
    }
 

	
 
    fn resolve_pending_control_flow_targets(&mut self, ctx: &mut Ctx) -> Result<(), ParseError> {
 
        for entry in &self.control_flow_stmts {
 
            let stmt = &ctx.heap[entry.statement];
 

	
 
            match stmt {
 
                Statement::Break(stmt) => {
 
                    let stmt_id = stmt.this;
 
                    let target_while_id = Self::resolve_break_or_continue_target(ctx, entry, stmt.span, &stmt.label)?;
 
                    let target_while_stmt = &ctx.heap[target_while_id];
 
                    let target_end_while_id = target_while_stmt.end_while;
 
                    debug_assert!(!target_end_while_id.is_invalid());
 

	
 
                    let break_stmt = &mut ctx.heap[stmt_id];
 
                    break_stmt.target = target_end_while_id;
 
                },
 
                Statement::Continue(stmt) => {
 
                    let stmt_id = stmt.this;
 
                    let target_while_id = Self::resolve_break_or_continue_target(ctx, entry, stmt.span, &stmt.label)?;
 

	
 
                    let continue_stmt = &mut ctx.heap[stmt_id];
 
                    continue_stmt.target = target_while_id;
 
                },
 
                Statement::Goto(stmt) => {
 
                    let stmt_id = stmt.this;
 
                    let target_id = Self::find_label(entry.in_scope, ctx, &stmt.label)?;
 
                    let target_stmt = &ctx.heap[target_id];
 
                    if entry.in_sync != target_stmt.in_sync {
 
                        // Nested sync not allowed. And goto can only go to
 
                        // outer scopes, so we must be escaping from a sync.
 
                        debug_assert!(target_stmt.in_sync.is_invalid());    // target not in sync
 
                        debug_assert!(!entry.in_sync.is_invalid()); // but the goto is in sync
 
                        let goto_stmt = &ctx.heap[stmt_id];
 
                        let sync_stmt = &ctx.heap[entry.in_sync];
 
                        return Err(
 
                            ParseError::new_error_str_at_span(&ctx.module().source, goto_stmt.span, "goto may not escape the surrounding synchronous block")
 
                            .with_info_str_at_span(&ctx.module().source, target_stmt.label.span, "this is the target of the goto statement")
 
                            .with_info_str_at_span(&ctx.module().source, sync_stmt.span, "which will jump past this statement")
 
                        );
 
                    }
 

	
 
                    let goto_stmt = &mut ctx.heap[stmt_id];
 
                    goto_stmt.target = target_id;
 
                },
 
                _ => unreachable!("cannot resolve control flow target for {:?}", stmt),
 
            }
 
        }
 

	
 
        return Ok(())
 
    }
 

	
 
    //--------------------------------------------------------------------------
 
    // Utilities
 
    //--------------------------------------------------------------------------
 

	
 
    /// Adds a local variable to the current scope. It will also annotate the
 
    /// `Local` in the AST with its relative position in the block.
 
    fn checked_add_local(&mut self, ctx: &mut Ctx, target_scope_id: ScopeId, target_relative_pos: i32, new_variable_id: VariableId) -> Result<(), ParseError> {
 
        let new_variable = &ctx.heap[new_variable_id];
 

	
 
        // We immediately go to the parent scope. We check the target scope
 
        // in the call at the end. That is also where we check for collisions
 
        // with symbols.
 
        let mut scope = &ctx.heap[target_scope_id];
 
        let mut cur_relative_pos = scope.relative_pos_in_parent;
 
        while let Some(scope_parent_id) = scope.parent {
 
            scope = &ctx.heap[scope_parent_id];
 

	
 
            // Check for collisions
 
            for variable_id in scope.variables.iter().copied() {
 
                let existing_variable = &ctx.heap[variable_id];
 
                if existing_variable.identifier == new_variable.identifier &&
 
                    existing_variable.this != new_variable_id &&
 
                    cur_relative_pos >= existing_variable.relative_pos_in_parent {
 
                    return Err(
 
                        ParseError::new_error_str_at_span(
 
                            &ctx.module().source, new_variable.identifier.span, "Local variable name conflicts with another variable"
 
                        ).with_info_str_at_span(
 
                            &ctx.module().source, existing_variable.identifier.span, "Previous variable is found here"
 
                        )
 
                    );
 
                }
 
            }
src/protocol/parser/type_table.rs
Show inline comments
 
/**
 
 * type_table.rs
 
 *
 
 * The type table is a lookup from AST definition (which contains just what the
 
 * programmer typed) to a type with additional information computed (e.g. the
 
 * byte size and offsets of struct members). The type table should be considered
 
 * the authoritative source of information on types by the compiler (not the
 
 * AST itself!).
 
 *
 
 * The type table operates in two modes: one is where we just look up the type,
 
 * check its fields for correctness and mark whether it is polymorphic or not.
 
 * The second one is where we compute byte sizes, alignment and offsets.
 
 *
 
 * The basic algorithm for type resolving and computing byte sizes is to
 
 * recursively try to lay out each member type of a particular type. This is
 
 * done in a stack-like fashion, where each embedded type pushes a breadcrumb
 
 * unto the stack. We may discover a cycle in embedded types (we call this a
 
 * "type loop"). After which the type table attempts to break the type loop by
 
 * making specific types heap-allocated. Upon doing so we know their size
 
 * because their stack-size is now based on pointers. Hence breaking the type
 
 * loop required for computing the byte size of types.
 
 *
 
 * The reason for these type shenanigans is because PDL is a value-based
 
 * language, but we would still like to be able to express recursively defined
 
 * types like trees or linked lists. Hence we need to insert pointers somewhere
 
 * to break these cycles.
 
 *
 
 * We will insert these pointers into the variants of unions. However note that
 
 * we can only compute the stack size of a union until we've looked at *all*
 
 * variants. Hence we perform an initial pass where we detect type loops, a
 
 * second pass where we compute the stack sizes of everything, and a third pass
 
 * where we actually compute the size of the heap allocations for unions.
 
 *
 
 * As a final bit of global documentation: non-polymorphic types will always
 
 * have one "monomorph" entry. This contains the non-polymorphic type's memory
 
 * layout.
 
 */
 

	
 
// Programmer note: deduplication of types is currently disabled, see the
 
// @Deduplication key. Tests might fail when it is re-enabled.
 
use std::collections::HashMap;
 
use std::hash::{Hash, Hasher};
 

	
 
use crate::protocol::ast::*;
 
use crate::protocol::parser::symbol_table::SymbolScope;
 
use crate::protocol::input_source::ParseError;
 
use crate::protocol::parser::*;
 

	
 
//------------------------------------------------------------------------------
 
// Defined Types
 
//------------------------------------------------------------------------------
 

	
 
/// Struct wrapping around a potentially polymorphic type. If the type does not
 
/// have any polymorphic arguments then it will not have any monomorphs and
 
/// `is_polymorph` will be set to `false`. A type with polymorphic arguments
 
/// only has `is_polymorph` set to `true` if the polymorphic arguments actually
 
/// appear in the types associated types (function return argument, struct
 
/// field, enum variant, etc.). Otherwise the polymorphic argument is just a
 
/// marker and does not influence the bytesize of the type.
 
#[allow(unused)]
 
pub struct DefinedType {
 
    pub(crate) ast_root: RootId,
 
    pub(crate) ast_definition: DefinitionId,
 
    pub(crate) definition: DefinedTypeVariant,
 
    pub(crate) poly_vars: Vec<PolymorphicVariable>,
 
    pub(crate) is_polymorph: bool,
 
}
 

	
 
pub enum DefinedTypeVariant {
 
    Enum(EnumType),
 
    Union(UnionType),
 
    Struct(StructType),
 
    Procedure(ProcedureType),
 
}
 

	
 
impl DefinedTypeVariant {
 
    pub(crate) fn is_data_type(&self) -> bool {
 
        use DefinedTypeVariant as DTV;
 

	
 
        match self {
 
            DTV::Struct(_) | DTV::Enum(_) | DTV::Union(_) => return true,
 
            DTV::Procedure(_) => return false,
 
        }
 
    }
 

	
 
    pub(crate) fn as_struct(&self) -> &StructType {
 
        match self {
 
            DefinedTypeVariant::Struct(v) => v,
 
            _ => unreachable!()
 
        }
 
    }
 

	
 
    pub(crate) fn as_enum(&self) -> &EnumType {
 
        match self {
 
            DefinedTypeVariant::Enum(v) => v,
 
            _ => unreachable!()
 
        }
 
    }
 

	
 
    pub(crate) fn as_union(&self) -> &UnionType {
 
        match self {
 
            DefinedTypeVariant::Union(v) => v,
 
            _ => unreachable!()
 
        }
 
    }
 
}
 

	
 
pub struct PolymorphicVariable {
 
    pub(crate) identifier: Identifier,
 
    pub(crate) is_in_use: bool, // a polymorphic argument may be defined, but not used by the type definition
 
}
 

	
 
/// `EnumType` is the classical C/C++ enum type. It has various variants with
 
/// an assigned integer value. The integer values may be user-defined,
 
/// compiler-defined, or a mix of the two. If a user assigns the same enum
 
/// value multiple times, we assume the user is an expert and we consider both
 
/// variants to be equal to one another.
 
pub struct EnumType {
 
    pub variants: Vec<EnumVariant>,
 
    pub minimum_tag_value: i64,
 
    pub maximum_tag_value: i64,
 
    pub tag_type: ConcreteType,
 
    pub size: usize,
 
    pub alignment: usize,
 
}
 

	
 
// TODO: Also support maximum u64 value
 
pub struct EnumVariant {
 
    pub identifier: Identifier,
 
    pub value: i64,
 
}
 

	
 
/// `UnionType` is the algebraic datatype (or sum type, or discriminated union).
 
/// A value is an element of the union, identified by its tag, and may contain
 
/// a single subtype.
 
/// For potentially infinite types (i.e. a tree, or a linked list) only unions
 
/// can break the infinite cycle. So when we lay out these unions in memory we
 
/// will reserve enough space on the stack for all union variants that do not
 
/// cause "type loops" (i.e. a union `A` with a variant containing a struct
 
/// `B`). And we will reserve enough space on the heap (and store a pointer in
 
/// the union) for all variants which do cause type loops (i.e. a union `A`
 
/// with a variant to a struct `B` that contains the union `A` again).
 
pub struct UnionType {
 
    pub variants: Vec<UnionVariant>,
 
    pub tag_type: ConcreteType,
 
    pub tag_size: usize,
 
}
 

	
 
pub struct UnionVariant {
 
    pub identifier: Identifier,
 
    pub embedded: Vec<ParserType>, // zero-length does not have embedded values
 
    pub tag_value: i64,
 
}
 

	
 
/// `StructType` is a generic C-like struct type (or record type, or product
 
/// type) type.
 
pub struct StructType {
 
    pub fields: Vec<StructField>,
 
}
 

	
 
pub struct StructField {
 
    pub identifier: Identifier,
 
    pub parser_type: ParserType,
 
}
 

	
 
/// `ProcedureType` is the signature of a procedure/component
 
pub struct ProcedureType {
 
    pub kind: ProcedureKind,
 
    pub return_type: Option<ParserType>,
 
    pub arguments: Vec<ProcedureArgument>,
 
}
 

	
 
pub struct ProcedureArgument {
 
    identifier: Identifier,
 
    parser_type: ParserType,
 
}
 

	
 
/// Represents the data associated with a single expression after type inference
 
/// for a monomorph (or just the normal expression types, if dealing with a
 
/// non-polymorphic function/component).
 
pub struct MonomorphExpression {
 
    // The output type of the expression. Note that for a function it is not the
 
    // function's signature but its return type
 
    pub(crate) expr_type: ConcreteType,
 
    // Has multiple meanings: the field index for select expressions, the
 
    // monomorph index for polymorphic function calls or literals. Negative
 
    // values are never used, but used to catch programming errors.
 
    pub(crate) field_or_monomorph_idx: i32,
 
    pub(crate) type_id: TypeId,
 
}
 

	
 
//------------------------------------------------------------------------------
 
// Type monomorph storage
 
//------------------------------------------------------------------------------
 

	
 
pub(crate) enum MonoTypeVariant {
 
    Builtin, // no extra data, added manually in compiler initialization code
 
    Enum, // no extra data
 
    Struct(StructMonomorph),
 
    Union(UnionMonomorph),
 
    Procedure(ProcedureMonomorph), // functions, components
 
    Tuple(TupleMonomorph),
 
}
 

	
 
impl MonoTypeVariant {
 
    fn as_struct_mut(&mut self) -> &mut StructMonomorph {
 
        match self {
 
            MonoTypeVariant::Struct(v) => v,
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    pub(crate) fn as_union(&self) -> &UnionMonomorph {
 
        match self {
 
            MonoTypeVariant::Union(v) => v,
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    fn as_union_mut(&mut self) -> &mut UnionMonomorph {
 
        match self {
 
            MonoTypeVariant::Union(v) => v,
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    fn as_tuple_mut(&mut self) -> &mut TupleMonomorph {
 
        match self {
 
            MonoTypeVariant::Tuple(v) => v,
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    pub(crate) fn as_procedure(&self) -> &ProcedureMonomorph {
 
        match self {
 
            MonoTypeVariant::Procedure(v) => v,
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    fn as_procedure_mut(&mut self) -> &mut ProcedureMonomorph {
 
        match self {
 
            MonoTypeVariant::Procedure(v) => v,
 
            _ => unreachable!(),
 
        }
 
    }
 
}
 

	
 
/// Struct monomorph
 
pub struct StructMonomorph {
 
    pub fields: Vec<StructMonomorphField>,
 
}
 
@@ -1266,386 +1267,384 @@ impl TypeTable {
 
                }
 
            };
 

	
 
            // Handle the result of attempting to resolve the current breadcrumb
 
            match resolve_result {
 
                TypeLoopResult::TypeExists => {
 
                    // We finished parsing the type
 
                    self.type_loop_breadcrumbs.pop();
 
                },
 
                TypeLoopResult::PushBreadcrumb(definition_id, concrete_type) => {
 
                    // We recurse into the member type.
 
                    self.type_loop_breadcrumbs[breadcrumb_idx] = breadcrumb;
 
                    self.handle_new_breadcrumb_for_type_loops(arch, definition_id, concrete_type);
 
                },
 
                TypeLoopResult::TypeLoop(first_idx) => {
 
                    // Because we will be modifying breadcrumbs within the
 
                    // type-loop handling code, put back the modified breadcrumb
 
                    self.type_loop_breadcrumbs[breadcrumb_idx] = breadcrumb;
 

	
 
                    // We're in a type loop. Add the type loop
 
                    let mut loop_members = Vec::with_capacity(self.type_loop_breadcrumbs.len() - first_idx);
 
                    let mut contains_union = false;
 

	
 
                    for breadcrumb_idx in first_idx..self.type_loop_breadcrumbs.len() {
 
                        let breadcrumb = &mut self.type_loop_breadcrumbs[breadcrumb_idx];
 
                        let mut is_union = false;
 

	
 
                        // Check if type loop member is a union that may be
 
                        // broken up by moving some of its members to the heap.
 
                        let mono_type = &mut self.mono_types[breadcrumb.type_id.0 as usize];
 
                        if let MonoTypeVariant::Union(union_type) = &mut mono_type.variant {
 
                            // Mark the variant that caused the loop as heap
 
                            // allocated to break the type loop.
 
                            let variant = &mut union_type.variants[breadcrumb.next_member as usize];
 
                            variant.lives_on_heap = true;
 
                            breadcrumb.next_embedded += 1;
 

	
 
                            is_union = true;
 
                            contains_union = true;
 
                        } // else: we don't care about the type for now
 

	
 
                        loop_members.push(TypeLoopEntry{
 
                            type_id: breadcrumb.type_id,
 
                            is_union
 
                        });
 
                    }
 

	
 
                    let new_type_loop = TypeLoop{ members: loop_members };
 
                    if !contains_union {
 
                        // No way to (potentially) break the union. So return a
 
                        // type loop error. This is because otherwise our
 
                        // breadcrumb resolver ends up in an infinite loop.
 
                        return Err(construct_type_loop_error(
 
                            &self.mono_types, &new_type_loop, modules, heap
 
                        ));
 
                    }
 

	
 
                    self.type_loops.push(new_type_loop);
 
                }
 
            }
 
        }
 

	
 
        // All breadcrumbs have been cleared. So now `type_loops` contains all
 
        // of the encountered type loops, and `encountered_types` contains a
 
        // list of all unique monomorphs we encountered.
 

	
 
        // The next step is to figure out if all of the type loops can be
 
        // broken. A type loop can be broken if at least one union exists in the
 
        // loop and that union ended up having variants that are not part of
 
        // a type loop.
 
        fn type_loop_source_span_and_message<'a>(
 
            modules: &'a [Module], heap: &Heap, mono_types: &MonoTypeArray,
 
            definition_id: DefinitionId, mono_type_id: TypeId, index_in_loop: usize
 
        ) -> (&'a InputSource, InputSpan, String) {
 
            // Note: because we will discover the type loop the *first* time we
 
            // instantiate a monomorph with the provided polymorphic arguments
 
            // (not all arguments are actually used in the type). We don't have
 
            // to care about a second instantiation where certain unused
 
            // polymorphic arguments are different.
 
            let mono_type = &mono_types[mono_type_id.0 as usize];
 
            let type_name = mono_type.concrete_type.display_name(heap);
 

	
 
            let message = if index_in_loop == 0 {
 
                format!(
 
                    "encountered an infinitely large type for '{}' (which can be fixed by \
 
                    introducing a union type that has a variant whose embedded types are \
 
                    not part of a type loop, or do not have embedded types)",
 
                    type_name
 
                )
 
            } else if index_in_loop == 1 {
 
                format!("because it depends on the type '{}'", type_name)
 
            } else {
 
                format!("which depends on the type '{}'", type_name)
 
            };
 

	
 
            let ast_definition = &heap[definition_id];
 
            let ast_root_id = ast_definition.defined_in();
 

	
 
            return (
 
                &modules[ast_root_id.index as usize].source,
 
                ast_definition.identifier().span,
 
                message
 
            );
 
        }
 

	
 
        fn construct_type_loop_error(mono_types: &MonoTypeArray, type_loop: &TypeLoop, modules: &[Module], heap: &Heap) -> ParseError {
 
            // Seek first entry to produce parse error. Then continue builder
 
            // pattern. This is the error case so efficiency can go home.
 
            let mut parse_error = None;
 
            let mut next_member_index = 0;
 
            while next_member_index < type_loop.members.len() {
 
                let first_entry = &type_loop.members[next_member_index];
 
                next_member_index += 1;
 

	
 
                // Retrieve definition of first type in loop
 
                let first_mono_type = &mono_types[first_entry.type_id.0 as usize];
 
                let first_definition_id = get_concrete_type_definition(&first_mono_type.concrete_type.parts);
 
                if first_definition_id.is_none() {
 
                    continue;
 
                }
 
                let first_definition_id = first_definition_id.unwrap();
 

	
 
                // Produce error message for first type in loop
 
                let (first_module, first_span, first_message) = type_loop_source_span_and_message(
 
                    modules, heap, mono_types, first_definition_id, first_entry.type_id, 0
 
                );
 
                parse_error = Some(ParseError::new_error_at_span(first_module, first_span, first_message));
 
                break;
 
            }
 

	
 
            let mut parse_error = parse_error.unwrap(); // Loop above cannot have failed, because we must have a type loop, type loops cannot contain only unnamed types
 

	
 
            let mut error_counter = 1;
 
            for member_idx in next_member_index..type_loop.members.len() {
 
                let entry = &type_loop.members[member_idx];
 
                let mono_type = &mono_types[entry.type_id.0 as usize];
 
                let definition_id = get_concrete_type_definition(&mono_type.concrete_type.parts);
 
                if definition_id.is_none() {
 
                    continue;
 
                }
 
                let definition_id = definition_id.unwrap();
 

	
 
                let (module, span, message) = type_loop_source_span_and_message(
 
                    modules, heap, mono_types, definition_id, entry.type_id, error_counter
 
                );
 
                parse_error = parse_error.with_info_at_span(module, span, message);
 
                error_counter += 1;
 
            }
 

	
 
            parse_error
 
        }
 

	
 
        for type_loop in &self.type_loops {
 
            let mut can_be_broken = false;
 
            debug_assert!(!type_loop.members.is_empty());
 

	
 
            for entry in &type_loop.members {
 
                if entry.is_union {
 
                    let mono_type = self.mono_types[entry.type_id.0 as usize].variant.as_union();
 
                    debug_assert!(!mono_type.variants.is_empty()); // otherwise it couldn't be part of the type loop
 
                    let has_stack_variant = mono_type.variants.iter().any(|variant| !variant.lives_on_heap);
 
                    if has_stack_variant {
 
                        can_be_broken = true;
 
                        break;
 
                    }
 
                }
 
            }
 

	
 
            if !can_be_broken {
 
                // Construct a type loop error
 
                return Err(construct_type_loop_error(&self.mono_types, type_loop, modules, heap));
 
            }
 
        }
 

	
 
        // If here, then all type loops have been resolved and we can lay out
 
        // all of the members
 
        self.type_loops.clear();
 

	
 
        return Ok(());
 
    }
 

	
 
    /// Checks if the specified type needs to be resolved (i.e. we need to push
 
    /// a breadcrumb), is already resolved (i.e. we can continue with the next
 
    /// member of the currently considered type) or is in the process of being
 
    /// resolved (i.e. we're in a type loop). Because of borrowing rules we
 
    /// don't do any modifications of internal types here. Hence: if we
 
    /// return `PushBreadcrumb` then call `handle_new_breadcrumb_for_type_loops`
 
    /// to take care of storing the appropriate types.
 
    fn check_member_for_type_loops(
 
        breadcrumbs: &[TypeLoopBreadcrumb], definition_map: &DefinitionMap, mono_type_map: &MonoTypeMap,
 
        mono_key: &mut MonoSearchKey, concrete_type: &ConcreteType
 
    ) -> TypeLoopResult {
 
        use ConcreteTypePart as CTP;
 

	
 
        // Depending on the type, lookup if the type has already been visited
 
        // (i.e. either already has its memory layed out, or is part of a type
 
        // loop because we've already visited the type)
 
        debug_assert!(!concrete_type.parts.is_empty());
 
        let definition_id = if let ConcreteTypePart::Instance(definition_id, _) = concrete_type.parts[0] {
 
            definition_id
 
        } else {
 
            DefinitionId::new_invalid()
 
        };
 

	
 
        Self::set_search_key_to_type(mono_key, definition_map, &concrete_type.parts);
 
        if let Some(type_id) = mono_type_map.get(mono_key).copied() {
 
            for (breadcrumb_idx, breadcrumb) in breadcrumbs.iter().enumerate() {
 
                if breadcrumb.type_id == type_id {
 
                    return TypeLoopResult::TypeLoop(breadcrumb_idx);
 
                }
 
            }
 

	
 
            return TypeLoopResult::TypeExists;
 
        }
 

	
 
        // Type is not yet known, so we need to insert it into the lookup and
 
        // push a new breadcrumb.
 
        return TypeLoopResult::PushBreadcrumb(definition_id, concrete_type.clone());
 
    }
 

	
 
    /// Handles the `PushBreadcrumb` result for a `check_member_for_type_loops`
 
    /// call. Will preallocate entries in the monomorphed type storage (with
 
    /// all memory properties zeroed).
 
    fn handle_new_breadcrumb_for_type_loops(&mut self, arch: &TargetArch, definition_id: DefinitionId, concrete_type: ConcreteType) {
 
        use DefinedTypeVariant as DTV;
 
        use ConcreteTypePart as CTP;
 

	
 
        let mut is_union = false;
 

	
 
        let type_id = match &concrete_type.parts[0] {
 
            // Builtin types
 
            CTP::Void | CTP::Message | CTP::Bool |
 
            CTP::UInt8 | CTP::UInt16 | CTP::UInt32 | CTP::UInt64 |
 
            CTP::SInt8 | CTP::SInt16 | CTP::SInt32 | CTP::SInt64 |
 
            CTP::Character | CTP::String |
 
            CTP::Array | CTP::Slice | CTP::Input | CTP::Output | CTP::Pointer => {
 
                // Insert the entry for the builtin type, we should be able to
 
                // immediately "steal" the size from the preinserted builtins.
 
                let base_type_id = match &concrete_type.parts[0] {
 
                    CTP::Void => arch.void_type_id,
 
                    CTP::Message => arch.message_type_id,
 
                    CTP::Bool => arch.bool_type_id,
 
                    CTP::UInt8 => arch.uint8_type_id,
 
                    CTP::UInt16 => arch.uint16_type_id,
 
                    CTP::UInt32 => arch.uint32_type_id,
 
                    CTP::UInt64 => arch.uint64_type_id,
 
                    CTP::SInt8 => arch.sint8_type_id,
 
                    CTP::SInt16 => arch.sint16_type_id,
 
                    CTP::SInt32 => arch.sint32_type_id,
 
                    CTP::SInt64 => arch.sint64_type_id,
 
                    CTP::Character => arch.char_type_id,
 
                    CTP::String => arch.string_type_id,
 
                    CTP::Array => arch.array_type_id,
 
                    CTP::Slice => arch.slice_type_id,
 
                    CTP::Input => arch.input_type_id,
 
                    CTP::Output => arch.output_type_id,
 
                    CTP::Pointer => arch.pointer_type_id,
 
                    _ => unreachable!(),
 
                };
 
                let base_type = &self.mono_types[base_type_id.0 as usize];
 
                let base_type_size = base_type.size;
 
                let base_type_alignment = base_type.alignment;
 

	
 
                let type_id = TypeId(self.mono_types.len() as i64);
 
                Self::set_search_key_to_type(&mut self.mono_search_key, &self.definition_lookup, &concrete_type.parts);
 
                self.mono_type_lookup.insert(self.mono_search_key.clone(), type_id);
 
                self.mono_types.push(MonoType{
 
                    type_id,
 
                    concrete_type,
 
                    size: base_type_size,
 
                    alignment: base_type_alignment,
 
                    variant: MonoTypeVariant::Builtin
 
                });
 

	
 
                type_id
 
            },
 
            // User-defined types
 
            CTP::Tuple(num_embedded) => {
 
                debug_assert!(definition_id.is_invalid()); // because tuples do not have an associated `DefinitionId`
 
                let mut members = Vec::with_capacity(*num_embedded as usize);
 
                for section in ConcreteTypeIter::new(&concrete_type.parts, 0) {
 
                    members.push(TupleMonomorphMember{
 
                        type_id: TypeId::new_invalid(),
 
                        concrete_type: ConcreteType{ parts: Vec::from(section) },
 
                        size: 0,
 
                        alignment: 0,
 
                        offset: 0
 
                    });
 
                }
 

	
 
                let type_id = TypeId(self.mono_types.len() as i64);
 
                Self::set_search_key_to_tuple(&mut self.mono_search_key, &self.definition_lookup, &concrete_type.parts);
 
                self.mono_type_lookup.insert(self.mono_search_key.clone(), type_id);
 
                self.mono_types.push(MonoType::new_empty(type_id, concrete_type, MonoTypeVariant::Tuple(TupleMonomorph{ members })));
 

	
 
                type_id
 
            },
 
            CTP::Instance(_check_definition_id, _) => {
 
                debug_assert_eq!(definition_id, *_check_definition_id); // because this is how `definition_id` was determined
 

	
 
                Self::set_search_key_to_type(&mut self.mono_search_key, &self.definition_lookup, &concrete_type.parts);
 
                let base_type = self.definition_lookup.get(&definition_id).unwrap();
 
                let type_id = match &base_type.definition {
 
                    DTV::Enum(definition) => {
 
                        // The enum is a bit exceptional in that when we insert
 
                        // it we we will immediately set its size/alignment:
 
                        // there is nothing to compute here.
 
                        debug_assert!(definition.size != 0 && definition.alignment != 0);
 
                        let type_id = TypeId(self.mono_types.len() as i64);
 
                        self.mono_type_lookup.insert(self.mono_search_key.clone(), type_id);
 
                        self.mono_types.push(MonoType::new_empty(type_id, concrete_type, MonoTypeVariant::Enum));
 

	
 
                        let mono_type = &mut self.mono_types[type_id.0 as usize];
 
                        mono_type.size = definition.size;
 
                        mono_type.alignment = definition.alignment;
 

	
 
                        type_id
 
                    },
 
                    DTV::Union(definition) => {
 
                        // Create all the variants with their concrete types
 
                        let mut mono_variants = Vec::with_capacity(definition.variants.len());
 
                        for poly_variant in &definition.variants {
 
                            let mut mono_embedded = Vec::with_capacity(poly_variant.embedded.len());
 
                            for poly_embedded in &poly_variant.embedded {
 
                                let mono_concrete = Self::construct_concrete_type(poly_embedded, &concrete_type);
 
                                mono_embedded.push(UnionMonomorphEmbedded{
 
                                    type_id: TypeId::new_invalid(),
 
                                    concrete_type: mono_concrete,
 
                                    size: 0,
 
                                    alignment: 0,
 
                                    offset: 0
 
                                });
 
                            }
 

	
 
                            mono_variants.push(UnionMonomorphVariant{
 
                                lives_on_heap: false,
 
                                embedded: mono_embedded,
 
                            })
 
                        }
 

	
 
                        let type_id = TypeId(self.mono_types.len() as i64);
 
                        let tag_size = definition.tag_size;
 
                        Self::set_search_key_to_type(&mut self.mono_search_key, &self.definition_lookup, &concrete_type.parts);
 
                        self.mono_type_lookup.insert(self.mono_search_key.clone(), type_id);
 
                        self.mono_types.push(MonoType::new_empty(type_id, concrete_type, MonoTypeVariant::Union(UnionMonomorph{
 
                            variants: mono_variants,
 
                            tag_size,
 
                            heap_size: 0,
 
                            heap_alignment: 0,
 
                        })));
 

	
 
                        is_union = true;
 
                        type_id
 
                    },
 
                    DTV::Struct(definition) => {
 
                        // Create fields
 
                        let mut mono_fields = Vec::with_capacity(definition.fields.len());
 
                        for poly_field in &definition.fields {
 
                            let mono_concrete = Self::construct_concrete_type(&poly_field.parser_type, &concrete_type);
 
                            mono_fields.push(StructMonomorphField{
 
                                type_id: TypeId::new_invalid(),
 
                                concrete_type: mono_concrete,
 
                                size: 0,
 
                                alignment: 0,
 
                                offset: 0
 
                            })
 
                        }
 

	
 
                        let type_id = TypeId(self.mono_types.len() as i64);
 
                        Self::set_search_key_to_type(&mut self.mono_search_key, &self.definition_lookup, &concrete_type.parts);
 
                        self.mono_type_lookup.insert(self.mono_search_key.clone(), type_id);
 
                        self.mono_types.push(MonoType::new_empty(type_id, concrete_type, MonoTypeVariant::Struct(StructMonomorph{
 
                            fields: mono_fields,
 
                        })));
 

	
 
                        type_id
 
                    },
 
                    DTV::Procedure(_) => {
 
                        unreachable!("pushing type resolving breadcrumb for procedure type")
 
                    },
 
                };
 

	
 
                type_id
 
            },
 
            CTP::Function(_, _) | CTP::Component(_, _) => todo!("function pointers"),
 
        };
src/runtime/scheduler.rs
Show inline comments
 
@@ -221,389 +221,389 @@ impl Scheduler {
 
                    },
 
                    ControlContent::Ping => {},
 
                }
 
            } else {
 
                // Not a control message
 
                if scheduled.shutting_down {
 
                    // Since we're shutting down, we just want to respond with a
 
                    // message saying the message did not arrive.
 
                    debug_assert!(scheduled.ctx.inbox.get_next_message_ticket().is_none()); // public inbox should be completely cleared
 
                    self.handle_message_while_shutting_down(message, scheduled);
 
                } else {
 
                    scheduled.ctx.inbox.insert_new(message);
 
                }
 
            }
 
        }
 
    }
 

	
 
    fn handle_message_while_shutting_down(&mut self, message: Message, scheduled: &mut ScheduledConnector) {
 
        let target_port_and_round_number = match message {
 
            Message::Data(msg) => Some((msg.data_header.target_port, msg.sync_header.sync_round)),
 
            Message::SyncComp(_) => None,
 
            Message::SyncPort(msg) => Some((msg.target_port, msg.sync_header.sync_round)),
 
            Message::SyncControl(_) => None,
 
            Message::Control(_) => None,
 
        };
 

	
 
        if let Some((target_port, sync_round)) = target_port_and_round_number {
 
            // This message is aimed at a port, but we're shutting down, so
 
            // notify the peer that its was not received properly.
 
            // (also: since we're shutting down, we're not in sync mode and
 
            // the context contains the definitive set of owned ports)
 
            let port = scheduled.ctx.get_port_by_id(target_port).unwrap();
 
            if port.state == PortState::Open {
 
                let message = SyncControlMessage {
 
                    in_response_to_sync_round: sync_round,
 
                    target_component_id: port.peer_connector,
 
                    content: SyncControlContent::ChannelIsClosed(port.peer_id),
 
                };
 
                self.debug_conn(scheduled.ctx.id, &format!("Sending message to {:?} [shutdown]\n --- {:?}", port.peer_connector, message));
 
                self.runtime.send_message_assumed_alive(port.peer_connector, Message::SyncControl(message));
 
            }
 
        }
 
    }
 

	
 
    /// Handles changes to the context that were made by the component. This is
 
    /// the way (due to Rust's borrowing rules) that we bubble up changes in the
 
    /// component's state that the scheduler needs to know about (e.g. a message
 
    /// that the component wants to send, a port that has been added).
 
    fn handle_changes_in_context(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx.id;
 

	
 
        // Handling any messages that were sent
 
        while let Some(message) = scheduled.ctx.outbox.pop_front() {
 
            let (target_component_id, over_port) = match &message {
 
                Message::Data(content) => {
 
                    // Data messages are always sent to a particular port, and
 
                    // may end up being rerouted.
 
                    let port_desc = scheduled.ctx.get_port_by_id(content.data_header.sending_port).unwrap();
 
                    debug_assert_eq!(port_desc.peer_id, content.data_header.target_port);
 
                    debug_assert_eq!(port_desc.state, PortState::Open); // checked when adding to context
 

	
 
                    (port_desc.peer_connector, true)
 
                },
 
                Message::SyncComp(content) => {
 
                    // Sync messages are always sent to a particular component,
 
                    // the sender must make sure it actually wants to send to
 
                    // the specified component (and is not using an inconsistent
 
                    // component ID associated with a port).
 
                    (content.target_component_id, false)
 
                },
 
                Message::SyncPort(content) => {
 
                    let port_desc = scheduled.ctx.get_port_by_id(content.source_port).unwrap();
 
                    debug_assert_eq!(port_desc.peer_id, content.target_port);
 
                    debug_assert_eq!(port_desc.state, PortState::Open); // checked when adding to context
 

	
 
                    (port_desc.peer_connector, true)
 
                },
 
                Message::SyncControl(_) => unreachable!("component sending 'SyncControl' messages directly"),
 
                Message::Control(_) => unreachable!("component sending 'Control' messages directly"),
 
            };
 

	
 
            self.debug_conn(connector_id, &format!("Sending message to {:?} [outbox, over port: {}] \n --- {:#?}", target_component_id, over_port, message));
 
            if over_port {
 
                self.runtime.send_message_assumed_alive(target_component_id, message);
 
            } else {
 
                self.runtime.send_message_maybe_destroyed(target_component_id, message);
 
            }
 
        }
 

	
 
        while let Some(state_change) = scheduled.ctx.state_changes.pop_front() {
 
            match state_change {
 
                ComponentStateChange::CreatedComponent(component, initial_ports) => {
 
                    // Creating a new component. Need to relinquish control of
 
                    // the ports.
 
                    let new_component_key = self.runtime.create_pdl_component(component, false);
 
                    let new_connector = self.runtime.get_component_private(&new_component_key);
 

	
 
                    // First pass: transfer ports and the associated messages,
 
                    // also count the number of ports that have peers
 
                    let mut num_peers = 0;
 
                    for port_id in initial_ports {
 
                        // Transfer messages associated with the transferred port
 
                        scheduled.ctx.inbox.transfer_messages_for_port(port_id, &mut new_connector.ctx.inbox);
 

	
 
                        // Transfer the port itself
 
                        let port_index = scheduled.ctx.ports.iter()
 
                            .position(|v| v.self_id == port_id)
 
                            .unwrap();
 
                        let port = scheduled.ctx.ports.remove(port_index);
 
                        new_connector.ctx.ports.push(port.clone());
 

	
 
                        if port.state == PortState::Open {
 
                            num_peers += 1;
 
                        }
 
                    }
 

	
 
                    if num_peers == 0 {
 
                        // No peers to notify, so just schedule the component
 
                        self.runtime.push_work(new_component_key);
 
                    } else {
 
                        // Some peers to notify
 
                        let new_component_id = new_component_key.downcast();
 
                        let control_id = scheduled.router.prepare_new_component(new_component_key);
 
                        for port in new_connector.ctx.ports.iter() {
 
                            if port.state == PortState::Closed {
 
                                continue;
 
                            }
 

	
 
                            let control_message = scheduled.router.prepare_changed_port_peer(
 
                                control_id, scheduled.ctx.id,
 
                                port.peer_connector, port.peer_id,
 
                                new_component_id, port.self_id
 
                            );
 
                            self.debug_conn(connector_id, &format!("Sending message to {:?} [newcom]\n --- {:#?}", port.peer_connector, control_message));
 
                            self.runtime.send_message_assumed_alive(port.peer_connector, Message::Control(control_message));
 
                        }
 
                    }
 
                },
 
                ComponentStateChange::CreatedPort(port) => {
 
                    scheduled.ctx.ports.push(port);
 
                },
 
                ComponentStateChange::ChangedPort(port_change) => {
 
                    if port_change.is_acquired {
 
                        scheduled.ctx.ports.push(port_change.port);
 
                    } else {
 
                        let index = scheduled.ctx.ports
 
                            .iter()
 
                            .position(|v| v.self_id == port_change.port.self_id)
 
                            .unwrap();
 
                        scheduled.ctx.ports.remove(index);
 
                    }
 
                }
 
            }
 
        }
 

	
 
        // Finally, check if we just entered or just left a sync region
 
        if scheduled.ctx.changed_in_sync {
 
            if scheduled.ctx.is_in_sync {
 
                // Just entered sync region
 
            } else {
 
                // Just left sync region. So prepare inbox for the next sync
 
                // round
 
                scheduled.ctx.inbox.clear_read_messages();
 
            }
 

	
 
            scheduled.ctx.changed_in_sync = false; // reset flag
 
        }
 
    }
 

	
 
    fn try_go_to_sleep(&self, connector_key: ConnectorKey, connector: &mut ScheduledConnector) {
 
        debug_assert_eq!(connector_key.index, connector.ctx.id.index);
 
        debug_assert_eq!(connector.public.sleeping.load(Ordering::Acquire), false);
 

	
 
        // This is the running connector, and only the running connector may
 
        // decide it wants to sleep again.
 
        connector.public.sleeping.store(true, Ordering::Release);
 

	
 
        // But due to reordering we might have received messages from peers who
 
        // did not consider us sleeping. If so, then we wake ourselves again.
 
        if !connector.public.inbox.is_empty() {
 
            // Try to wake ourselves up (needed because someone might be trying
 
            // the exact same atomic compare-and-swap at this point in time)
 
            let should_wake_up_again = connector.public.sleeping
 
                .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
                .is_ok();
 

	
 
            if should_wake_up_again {
 
                self.runtime.push_work(connector_key)
 
            }
 
        }
 
    }
 

	
 
    fn debug(&self, message: &str) {
 
    fn debug(&self, _message: &str) {
 
        // println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
 
    }
 

	
 
    fn debug_conn(&self, conn: ConnectorId, message: &str) {
 
    fn debug_conn(&self, _conn: ConnectorId, _message: &str) {
 
        // println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.index, message);
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ComponentCtx
 
// -----------------------------------------------------------------------------
 

	
 
enum ComponentStateChange {
 
    CreatedComponent(ConnectorPDL, Vec<PortIdLocal>),
 
    CreatedPort(Port),
 
    ChangedPort(ComponentPortChange),
 
}
 

	
 
#[derive(Clone)]
 
pub(crate) struct ComponentPortChange {
 
    pub is_acquired: bool, // otherwise: released
 
    pub port: Port,
 
}
 

	
 
/// The component context (better name may be invented). This was created
 
/// because part of the component's state is managed by the scheduler, and part
 
/// of it by the component itself. When the component starts a sync block or
 
/// exits a sync block the partially managed state by both component and
 
/// scheduler need to be exchanged.
 
pub(crate) struct ComponentCtx {
 
    // Mostly managed by the scheduler
 
    pub(crate) id: ConnectorId,
 
    ports: Vec<Port>,
 
    inbox: Inbox,
 
    // Submitted by the component
 
    is_in_sync: bool,
 
    changed_in_sync: bool,
 
    outbox: VecDeque<Message>,
 
    state_changes: VecDeque<ComponentStateChange>,
 

	
 
    // Workspaces that may be used by components to (generally) prevent
 
    // allocations. Be a good scout and leave it empty after you've used it.
 
    // TODO: Move to scheduler ctx, this is the wrong place
 
    pub workspace_ports: Vec<PortIdLocal>,
 
    pub workspace_branches: Vec<BranchId>,
 
}
 

	
 
impl ComponentCtx {
 
    pub(crate) fn new_empty() -> Self {
 
        return Self{
 
            id: ConnectorId::new_invalid(),
 
            ports: Vec::new(),
 
            inbox: Inbox::new(),
 
            is_in_sync: false,
 
            changed_in_sync: false,
 
            outbox: VecDeque::new(),
 
            state_changes: VecDeque::new(),
 
            workspace_ports: Vec::new(),
 
            workspace_branches: Vec::new(),
 
        };
 
    }
 

	
 
    /// Notify the runtime that the component has created a new component. May
 
    /// only be called outside of a sync block.
 
    pub(crate) fn push_component(&mut self, component: ConnectorPDL, initial_ports: Vec<PortIdLocal>) {
 
        debug_assert!(!self.is_in_sync);
 
        self.state_changes.push_back(ComponentStateChange::CreatedComponent(component, initial_ports));
 
    }
 

	
 
    /// Notify the runtime that the component has created a new port. May only
 
    /// be called outside of a sync block (for ports received during a sync
 
    /// block, pass them when calling `notify_sync_end`).
 
    pub(crate) fn push_port(&mut self, port: Port) {
 
        debug_assert!(!self.is_in_sync);
 
        self.state_changes.push_back(ComponentStateChange::CreatedPort(port))
 
    }
 

	
 
    /// Notify the runtime of an error. Note that this will not perform any
 
    /// special action beyond printing the error. The component is responsible
 
    /// for waiting until it is appropriate to shut down (i.e. being outside
 
    /// of a sync region) and returning the `Exit` scheduling code.
 
    pub(crate) fn push_error(&mut self, error: EvalError) {
 
        println!("ERROR: Component ({}) encountered a critical error:\n{}", self.id.index, error);
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn get_ports(&self) -> &[Port] {
 
        return self.ports.as_slice();
 
    }
 

	
 
    pub(crate) fn get_port_by_id(&self, id: PortIdLocal) -> Option<&Port> {
 
        return self.ports.iter().find(|v| v.self_id == id);
 
    }
 

	
 
    pub(crate) fn get_port_by_channel_id(&self, id: ChannelId) -> Option<&Port> {
 
        return self.ports.iter().find(|v| v.channel_id == id);
 
    }
 

	
 
    fn get_port_mut_by_id(&mut self, id: PortIdLocal) -> Option<&mut Port> {
 
        return self.ports.iter_mut().find(|v| v.self_id == id);
 
    }
 

	
 
    /// Notify that component will enter a sync block. Note that after calling
 
    /// this function you must allow the scheduler to pick up the changes in the
 
    /// context by exiting your code-executing loop, and to continue executing
 
    /// code the next time the scheduler picks up the component.
 
    pub(crate) fn notify_sync_start(&mut self) {
 
        debug_assert!(!self.is_in_sync);
 

	
 
        self.is_in_sync = true;
 
        self.changed_in_sync = true;
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn is_in_sync(&self) -> bool {
 
        return self.is_in_sync;
 
    }
 

	
 
    /// Submit a message for the scheduler to send to the appropriate receiver.
 
    /// May only be called inside of a sync block.
 
    pub(crate) fn submit_message(&mut self, contents: Message) -> Result<(), ()> {
 
        debug_assert!(self.is_in_sync);
 
        if let Some(port_id) = contents.source_port() {
 
            let port_info = self.get_port_by_id(port_id);
 
            let is_valid = match port_info {
 
                Some(port_info) => {
 
                    port_info.state == PortState::Open
 
                },
 
                None => false,
 
            };
 
            if !is_valid {
 
                // We don't own the port
 
                return Err(());
 
            }
 
        }
 

	
 
        self.outbox.push_back(contents);
 
        return Ok(());
 
    }
 

	
 
    /// Notify that component just finished a sync block. Like
 
    /// `notify_sync_start`: drop out of the `Component::Run` function.
 
    pub(crate) fn notify_sync_end(&mut self, changed_ports: &[ComponentPortChange]) {
 
        debug_assert!(self.is_in_sync);
 

	
 
        self.is_in_sync = false;
 
        self.changed_in_sync = true;
 

	
 
        self.state_changes.reserve(changed_ports.len());
 
        for changed_port in changed_ports {
 
            self.state_changes.push_back(ComponentStateChange::ChangedPort(changed_port.clone()));
 
        }
 
    }
 

	
 
    /// Retrieves messages matching a particular port and branch id. But only
 
    /// those messages that have been previously received with
 
    /// `read_next_message`.
 
    pub(crate) fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter {
 
        return self.inbox.get_read_data_messages(match_port_id);
 
    }
 

	
 
    pub(crate) fn get_next_message_ticket(&mut self) -> Option<MessageTicket> {
 
        if !self.is_in_sync { return None; }
 
        return self.inbox.get_next_message_ticket();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn get_next_message_ticket_even_if_not_in_sync(&mut self) -> Option<MessageTicket> {
 
        return self.inbox.get_next_message_ticket();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn read_message_using_ticket(&self, ticket: MessageTicket) -> &Message {
 
        return self.inbox.read_message_using_ticket(ticket);
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn take_message_using_ticket(&mut self, ticket: MessageTicket) -> Message {
 
        return self.inbox.take_message_using_ticket(ticket)
 
    }
 

	
 
    /// Puts back a message back into the inbox. The reason being that the
 
    /// message is actually part of the next sync round. This will
 
    pub(crate) fn put_back_message(&mut self, message: Message) {
 
        self.inbox.put_back_message(message);
 
    }
 
}
 

	
 
pub(crate) struct MessagesIter<'a> {
 
    messages: &'a [Message],
 
    next_index: usize,
 
    max_index: usize,
 
    match_port_id: PortIdLocal,
 
}
 

	
 
impl<'a> Iterator for MessagesIter<'a> {
src/runtime2/communication.rs
Show inline comments
 
use crate::protocol::eval::*;
 
use super::runtime::*;
 
use super::component::*;
 

	
 
// -----------------------------------------------------------------------------
 
// Generic types
 
// -----------------------------------------------------------------------------
 

	
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub struct PortId(pub u32);
 

	
 
impl PortId {
 
    /// This value is not significant, it is chosen to make debugging easier: a
 
    /// very large port number is more likely to shine a light on bugs.
 
    pub fn new_invalid() -> Self {
 
        return Self(u32::MAX);
 
    }
 
}
 

	
 
pub struct CompPortIds {
 
    pub comp: CompId,
 
    pub port: PortId,
 
}
 

	
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub enum PortState {
 
    Open,
 
    BlockedDueToPeerChange,
 
    BlockedDueToFullBuffers,
 
    Closed,
 
}
 

	
 
impl PortState {
 
    pub fn is_blocked(&self) -> bool {
 
        match self {
 
            PortState::BlockedDueToPeerChange | PortState::BlockedDueToFullBuffers => true,
 
            PortState::Open | PortState::Closed => false,
 
        }
 
    }
 
}
 

	
 
pub struct Channel {
 
    pub putter_id: PortId,
 
    pub getter_id: PortId,
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Data messages
 
// -----------------------------------------------------------------------------
 

	
 
#[derive(Debug)]
 
pub struct DataMessage {
 
    pub data_header: MessageDataHeader,
 
    pub sync_header: MessageSyncHeader,
 
    pub content: ValueGroup,
 
}
 

	
 
#[derive(Debug)]
 
pub enum PortAnnotationKind {
 
    Getter(PortAnnotationGetter),
 
    Putter(PortAnnotationPutter),
 
}
 

	
 
#[derive(Debug)]
 
pub struct PortAnnotationGetter {
 
    pub self_comp_id: CompId,
 
    pub self_port_id: PortId,
 
    pub peer_comp_id: CompId,
 
    pub peer_port_id: PortId,
 
}
 

	
 
#[derive(Debug)]
 
pub struct PortAnnotationPutter {
 
    pub self_comp_id: CompId,
 
    pub self_port_id: PortId,
 
}
 

	
 
#[derive(Debug)]
 
pub struct MessageDataHeader {
 
    pub expected_mapping: Vec<(PortId, Option<u32>)>,
 
    pub expected_mapping: Vec<(PortAnnotationKind, Option<u32>)>,
 
    pub new_mapping: u32,
 
    pub source_port: PortId,
 
    pub target_port: PortId,
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Sync messages
 
// -----------------------------------------------------------------------------
 

	
 
#[derive(Debug)]
 
pub struct SyncMessage {
 
    pub sync_header: MessageSyncHeader,
 
    pub content: SyncMessageContent,
 
}
 

	
 
#[derive(Debug)]
 
pub enum SyncLocalSolutionEntry {
 
    Putter(SyncSolutionPutterPort),
 
    Getter(SyncSolutionGetterPort),
 
}
 

	
 
pub type SyncLocalSolution = Vec<SyncLocalSolutionEntry>;
 

	
 
/// Getter port in a solution. Upon receiving a message it is certain about who
 
/// its peer is.
 
#[derive(Debug)]
 
pub struct SyncSolutionGetterPort {
 
    pub self_comp_id: CompId,
 
    pub self_port_id: PortId,
 
    pub peer_comp_id: CompId,
 
    pub peer_port_id: PortId,
 
    pub mapping: u32,
 
}
 

	
 
/// Putter port in a solution. A putter may not be certain about who its peer
 
/// component/port is.
 
#[derive(Debug)]
 
pub struct SyncSolutionPutterPort {
 
    pub self_comp_id: CompId,
 
    pub self_port_id: PortId,
 
    pub mapping: u32,
 
}
 

	
 
#[derive(Debug)]
 
pub struct SyncSolutionChannel {
 
    pub putter: Option<SyncSolutionPutterPort>,
 
    pub getter: Option<SyncSolutionGetterPort>,
 
}
 

	
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum SyncRoundDecision {
 
    None,
 
    Solution,
 
    Failure,
 
}
 

	
 
#[derive(Debug)]
 
pub struct SyncPartialSolution {
 
    pub channel_mapping: Vec<SyncSolutionChannel>,
 
    pub decision: SyncRoundDecision,
 
}
 

	
 
impl Default for SyncPartialSolution {
 
    fn default() -> Self {
 
        return Self{
 
            channel_mapping: Vec::new(),
 
            decision: SyncRoundDecision::None,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug)]
 
pub enum SyncMessageContent {
 
    NotificationOfLeader,
 
    LocalSolution(CompId, SyncLocalSolution), // local solution of the specified component
 
    PartialSolution(SyncPartialSolution), // partial solution of multiple components
 
    GlobalSolution,
 
    GlobalFailure,
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Control messages
 
// -----------------------------------------------------------------------------
 

	
 
#[derive(Debug)]
 
pub struct ControlMessage {
 
    pub(crate) id: ControlId,
 
    pub sender_comp_id: CompId,
 
    pub target_port_id: Option<PortId>,
 
    pub content: ControlMessageContent,
 
}
 

	
 
#[derive(Copy, Clone, Debug)]
 
pub enum ControlMessageContent {
 
    Ack,
 
    BlockPort(PortId),
 
    UnblockPort(PortId),
 
    ClosePort(PortId),
 
    PortPeerChangedBlock(PortId),
 
    PortPeerChangedUnblock(PortId, CompId),
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Messages (generic)
 
// -----------------------------------------------------------------------------
 

	
 
#[derive(Debug)]
 
pub struct MessageSyncHeader {
 
    pub sync_round: u32,
 
    pub sending_id: CompId,
 
    pub highest_id: CompId,
 
}
 

	
 
#[derive(Debug)]
 
pub enum Message {
 
    Data(DataMessage),
 
    Sync(SyncMessage),
 
    Control(ControlMessage),
 
}
 

	
 
impl Message {
 
    pub(crate) fn target_port(&self) -> Option<PortId> {
 
        match self {
 
            Message::Data(v) =>
 
                return Some(v.data_header.target_port),
 
            Message::Control(v) =>
 
                return v.target_port_id,
 
            Message::Sync(_) =>
 
                return None,
 
        }
 
    }
 

	
 
    pub(crate) fn modify_target_port(&mut self, port_id: PortId) {
 
        match self {
 
            Message::Data(v) =>
 
                v.data_header.target_port = port_id,
 
            Message::Control(v) =>
 
                v.target_port_id = Some(port_id),
 
            Message::Sync(_) => unreachable!(), // should never be called for this message type
 
        }
 
    }
 
}
 

	
 

	
src/runtime2/component/component_context.rs
Show inline comments
 
use crate::runtime2::scheduler::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 

	
 
#[derive(Debug)]
 
pub struct Port {
 
    pub self_id: PortId,
 
    pub peer_comp_id: CompId, // eventually consistent
 
    pub peer_port_id: PortId, // eventually consistent
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    #[cfg(debug_assertions)] pub(crate) associated_with_peer: bool,
 
}
 

	
 
pub struct Peer {
 
    pub id: CompId,
 
    pub num_associated_ports: u32,
 
    pub(crate) handle: CompHandle,
 
}
 

	
 
/// Port and peer management structure. Will keep a local reference counter to
 
/// the ports associate with peers, additionally manages the atomic reference
 
/// counter associated with the peers' component handles.
 
pub struct CompCtx {
 
    pub id: CompId,
 
    ports: Vec<Port>,
 
    peers: Vec<Peer>,
 
    port_id_counter: u32,
 
}
 

	
 
#[derive(Copy, Clone, PartialEq, Eq)]
 
pub struct LocalPortHandle(PortId);
 

	
 
#[derive(Copy, Clone)]
 
pub struct LocalPeerHandle(CompId);
 

	
 
impl CompCtx {
 
    /// Creates a new component context based on a reserved entry in the
 
    /// component store. This reservation is used such that we already know our
 
    /// assigned ID.
 
    pub(crate) fn new(reservation: &CompReserved) -> Self {
 
        return Self{
 
            id: reservation.id(),
 
            ports: Vec::new(),
 
            peers: Vec::new(),
 
            port_id_counter: 0,
 
        }
 
    }
 

	
 
    /// Creates a new channel that is fully owned by the component associated
 
    /// with this context.
 
    pub(crate) fn create_channel(&mut self) -> Channel {
 
        let putter_id = PortId(self.take_port_id());
 
        let getter_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id: putter_id,
 
            peer_port_id: getter_id,
 
            kind: PortKind::Putter,
 
            state: PortState::Open,
 
            peer_comp_id: self.id,
 
            associated_with_peer: false,
 
            #[cfg(debug_assertions)] associated_with_peer: false,
 
        });
 
        self.ports.push(Port{
 
            self_id: getter_id,
 
            peer_port_id: putter_id,
 
            kind: PortKind::Getter,
 
            state: PortState::Open,
 
            peer_comp_id: self.id,
 
            associated_with_peer: false,
 
            #[cfg(debug_assertions)] associated_with_peer: false,
 
        });
 

	
 
        return Channel{ putter_id, getter_id };
 
    }
 

	
 
    /// Adds a new port. Make sure to call `add_peer` afterwards.
 
    pub(crate) fn add_port(&mut self, peer_comp_id: CompId, peer_port_id: PortId, kind: PortKind, state: PortState) -> LocalPortHandle {
 
        let self_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id, peer_comp_id, peer_port_id, kind, state,
 
            #[cfg(debug_assertions)] associated_with_peer: false,
 
        });
 
        return LocalPortHandle(self_id);
 
    }
 

	
 
    /// Removes a port. Make sure you called `remove_peer` first.
 
    pub(crate) fn remove_port(&mut self, port_handle: LocalPortHandle) -> Port {
 
        let port_index = self.must_get_port_index(port_handle);
 
        let port = self.ports.remove(port_index);
 
        debug_assert!(!port.associated_with_peer);
 
        dbg_code!(assert!(!port.associated_with_peer));
 
        return port;
 
    }
 

	
 
    /// Adds a new peer. This must be called for every port, no matter the
 
    /// component the channel is connected to. If a `CompHandle` is supplied,
 
    /// then it will be used to add the peer. Otherwise it will be retrieved
 
    /// from the runtime using its ID.
 
    pub(crate) fn add_peer(&mut self, port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, peer_comp_id: CompId, handle: Option<&CompHandle>) {
 
        let self_id = self.id;
 
        let port = self.get_port_mut(port_handle);
 
        debug_assert_eq!(port.peer_comp_id, peer_comp_id);
 
        debug_assert!(!port.associated_with_peer);
 
        dbg_code!(assert!(!port.associated_with_peer));
 
        if !Self::requires_peer_reference(port, self_id, false) {
 
            return;
 
        }
 

	
 
        dbg_code!(port.associated_with_peer = true);
 
        match self.get_peer_index_by_id(peer_comp_id) {
 
            Some(peer_index) => {
 
                let peer = &mut self.peers[peer_index];
 
                peer.num_associated_ports += 1;
 
            },
 
            None => {
 
                let handle = match handle {
 
                    Some(handle) => handle.clone(),
 
                    None => sched_ctx.runtime.get_component_public(peer_comp_id)
 
                };
 
                self.peers.push(Peer{
 
                    id: peer_comp_id,
 
                    num_associated_ports: 1,
 
                    handle,
 
                });
 
            }
 
        }
 
    }
 

	
 
    /// Removes a peer associated with a port.
 
    pub(crate) fn remove_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, peer_id: CompId, also_remove_if_closed: bool) {
 
        let self_id = self.id;
 
        let port = self.get_port_mut(port_handle);
 
        debug_assert_eq!(port.peer_comp_id, peer_id);
 
        if !Self::requires_peer_reference(port, self_id, also_remove_if_closed) {
 
            return;
 
        }
 

	
 
        debug_assert!(port.associated_with_peer);
 
        dbg_code!(assert!(port.associated_with_peer));
 
        dbg_code!(port.associated_with_peer = false);
 
        let peer_index = self.get_peer_index_by_id(peer_id).unwrap();
 
        let peer = &mut self.peers[peer_index];
 
        peer.num_associated_ports -= 1;
 
        if peer.num_associated_ports == 0 {
 
            let mut peer = self.peers.remove(peer_index);
 
            if let Some(key) = peer.handle.decrement_users() {
 
                debug_assert_ne!(key.downgrade(), self.id); // should be upheld by the code that shuts down a component
 
                sched_ctx.runtime.destroy_component(key);
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn set_port_state(&mut self, port_handle: LocalPortHandle, new_state: PortState) {
 
        let port_info = self.get_port_mut(port_handle);
 
        debug_assert_ne!(port_info.state, PortState::Closed); // because then we do not expect to change the state
 
        port_info.state = new_state;
 
    }
 

	
 
    pub(crate) fn get_port_handle(&self, port_id: PortId) -> LocalPortHandle {
 
        return LocalPortHandle(port_id);
 
    }
 

	
 
    // should perhaps be revised, used in main inbox
 
    pub(crate) fn get_port_index(&self, port_handle: LocalPortHandle) -> usize {
 
        return self.must_get_port_index(port_handle);
 
    }
 

	
 
    pub(crate) fn get_peer_handle(&self, peer_id: CompId) -> LocalPeerHandle {
 
        return LocalPeerHandle(peer_id);
 
    }
 

	
 
    pub(crate) fn get_port(&self, port_handle: LocalPortHandle) -> &Port {
 
        let index = self.must_get_port_index(port_handle);
 
        return &self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_mut(&mut self, port_handle: LocalPortHandle) -> &mut Port {
 
        let index = self.must_get_port_index(port_handle);
 
        return &mut self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_by_index_mut(&mut self, index: usize) -> &mut Port {
 
        return &mut self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_peer(&self, peer_handle: LocalPeerHandle) -> &Peer {
 
        let index = self.must_get_peer_index(peer_handle);
 
        return &self.peers[index];
 
    }
 

	
 
    pub(crate) fn get_peer_mut(&mut self, peer_handle: LocalPeerHandle) -> &mut Peer {
 
        let index = self.must_get_peer_index(peer_handle);
 
        return &mut self.peers[index];
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn iter_ports(&self) -> impl Iterator<Item=&Port> {
 
        return self.ports.iter();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn iter_ports_mut(&mut self) -> impl Iterator<Item=&mut Port> {
 
        return self.ports.iter_mut();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn iter_peers(&self) -> impl Iterator<Item=&Peer> {
 
        return self.peers.iter();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn num_ports(&self) -> usize {
 
        return self.ports.len();
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Local utilities
 
    // -------------------------------------------------------------------------
 

	
 
    #[inline]
 
    fn requires_peer_reference(port: &Port, self_id: CompId, required_if_closed: bool) -> bool {
 
        return (port.state != PortState::Closed || required_if_closed) && port.peer_comp_id != self_id;
 
    }
 

	
 
    fn must_get_port_index(&self, handle: LocalPortHandle) -> usize {
 
        for (index, port) in self.ports.iter().enumerate() {
 
            if port.self_id == handle.0 {
 
                return index;
 
            }
 
        }
 

	
 
        unreachable!()
 
    }
 

	
 
    fn must_get_peer_index(&self, handle: LocalPeerHandle) -> usize {
 
        for (index, peer) in self.peers.iter().enumerate() {
 
            if peer.id == handle.0 {
 
                return index;
 
            }
 
        }
 

	
 
        unreachable!()
 
    }
 

	
 
    fn get_peer_index_by_id(&self, comp_id: CompId) -> Option<usize> {
 
        for (index, peer) in self.peers.iter().enumerate() {
 
            if peer.id == comp_id {
 
                return Some(index);
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn take_port_id(&mut self) -> u32 {
 
        let port_id = self.port_id_counter;
 
        self.port_id_counter = self.port_id_counter.wrapping_add(1);
 
        return port_id;
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/component_pdl.rs
Show inline comments
 
use crate::random::Random;
 
use crate::protocol::*;
 
use crate::protocol::ast::ProcedureDefinitionId;
 
use crate::protocol::eval::{
 
    PortId as EvalPortId, Prompt,
 
    ValueGroup, Value,
 
    EvalContinuation, EvalResult, EvalError
 
};
 

	
 
use crate::runtime2::scheduler::SchedulerCtx;
 
use crate::runtime2::communication::*;
 

	
 
use super::component_context::*;
 
use super::control_layer::*;
 
use super::consensus::Consensus;
 

	
 
pub enum CompScheduling {
 
    Immediate,
 
    Requeue,
 
    Sleep,
 
    Exit,
 
}
 

	
 
pub enum ExecStmt {
 
    CreatedChannel((Value, Value)),
 
    PerformedPut,
 
    PerformedGet(ValueGroup),
 
    PerformedSelectWait(u32),
 
    None,
 
}
 

	
 
impl ExecStmt {
 
    fn take(&mut self) -> ExecStmt {
 
        let mut value = ExecStmt::None;
 
        std::mem::swap(self, &mut value);
 
        return value;
 
    }
 

	
 
    fn is_none(&self) -> bool {
 
        match self {
 
            ExecStmt::None => return true,
 
            _ => return false,
 
        }
 
    }
 
}
 

	
 
pub struct ExecCtx {
 
    stmt: ExecStmt,
 
}
 

	
 
impl RunContext for ExecCtx {
 
    fn performed_put(&mut self, _port: EvalPortId) -> bool {
 
        match self.stmt.take() {
 
            ExecStmt::None => return false,
 
            ExecStmt::PerformedPut => return true,
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    fn performed_get(&mut self, _port: EvalPortId) -> Option<ValueGroup> {
 
        match self.stmt.take() {
 
            ExecStmt::None => return None,
 
            ExecStmt::PerformedGet(value) => return Some(value),
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    fn fires(&mut self, _port: EvalPortId) -> Option<Value> {
 
        todo!("remove fires")
 
    }
 

	
 
    fn performed_fork(&mut self) -> Option<bool> {
 
        todo!("remove fork")
 
    }
 

	
 
    fn created_channel(&mut self) -> Option<(Value, Value)> {
 
        match self.stmt.take() {
 
            ExecStmt::None => return None,
 
            ExecStmt::CreatedChannel(ports) => return Some(ports),
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    fn performed_select_wait(&mut self) -> Option<u32> {
 
        match self.stmt.take() {
 
            ExecStmt::None => return None,
 
            ExecStmt::PerformedSelectWait(selected_case) => Some(selected_case),
 
            _v => unreachable!(),
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) enum Mode {
 
    NonSync, // not in sync mode
 
    Sync, // in sync mode, can interact with other components
 
    SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block
 
    BlockedGet, // blocked because we need to receive a message on a particular port
 
    BlockedPut, // component is blocked because the port is blocked
 
    BlockedSelect, // waiting on message to complete the select statement
 
    StartExit, // temporary state: if encountered then we start the shutdown process
 
    BusyExit, // temporary state: waiting for Acks for all the closed ports
 
    Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0
 
}
 

	
 
impl Mode {
 
    fn is_in_sync_block(&self) -> bool {
 
        use Mode::*;
 

	
 
        match self {
 
            Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect => true,
 
            NonSync | StartExit | BusyExit | Exit => false,
 
        }
 
    }
 
}
 

	
 
struct SelectCase {
 
    involved_ports: Vec<LocalPortHandle>,
 
}
 

	
 
// TODO: @Optimize, flatten cases into single array, have index-pointers to next case
 
struct SelectState {
 
    cases: Vec<SelectCase>,
 
    next_case: u32,
 
    num_cases: u32,
 
    random: Random,
 
    candidates_workspace: Vec<usize>,
 
}
 

	
 
enum SelectDecision {
 
    None,
 
    Case(u32), // contains case index, should be passed along to PDL code
 
}
 

	
 
type InboxMain = Vec<Option<DataMessage>>;
 

	
 
impl SelectState {
 
    fn new() -> Self {
 
        return Self{
 
            cases: Vec::new(),
 
            next_case: 0,
 
            num_cases: 0,
 
            random: Random::new(),
 
            candidates_workspace: Vec::new(),
 
        }
 
    }
 

	
 
    fn handle_select_start(&mut self, num_cases: u32) {
 
        self.cases.clear();
 
        self.next_case = 0;
 
        self.num_cases = num_cases;
 
    }
 

	
 
    /// Register a port as belonging to a particular case. As for correctness of
 
    /// PDL code one cannot register the same port twice, this function might
 
    /// return an error
 
    fn register_select_case_port(&mut self, comp_ctx: &CompCtx, case_index: u32, _port_index: u32, port_id: PortId) -> Result<(), PortId> {
 
        // Retrieve case and port handle
 
        self.ensure_at_case(case_index);
 
        let cur_case = &mut self.cases[case_index as usize];
 
        let port_handle = comp_ctx.get_port_handle(port_id);
 
        debug_assert_eq!(cur_case.involved_ports.len(), _port_index as usize);
 

	
 
        // Make sure port wasn't added before, we disallow having the same port
 
        // in the same select guard twice.
 
        if cur_case.involved_ports.contains(&port_handle) {
 
            return Err(port_id);
 
        }
 

	
 
        cur_case.involved_ports.push(port_handle);
 
        return Ok(());
 
    }
 

	
 
    /// Notification that all ports have been registered and we should now wait
 
    /// until the appropriate messages have come in.
 
    fn handle_select_waiting_point(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision {
 
        if self.num_cases != self.next_case {
 
            // This happens when there are >=1 select cases written at the end
 
            // of the select block.
 
            self.ensure_at_case(self.num_cases - 1);
 
        }
 

	
 
        return self.has_decision(inbox, comp_ctx);
 
    }
 

	
 
    fn handle_updated_inbox(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision {
 
        return self.has_decision(inbox, comp_ctx);
 
    }
 

	
 
    /// Internal helper, pushes empty cases inbetween last case and provided new
 
    /// case index.
 
    fn ensure_at_case(&mut self, new_case_index: u32) {
 
        // Push an empty case for all intermediate cases that were not
 
        // registered with a port.
 
        debug_assert!(new_case_index >= self.next_case && new_case_index < self.num_cases);
 
        for _ in self.next_case..new_case_index + 1 {
 
            self.cases.push(SelectCase{ involved_ports: Vec::new() });
 
        }
 
        self.next_case = new_case_index + 1;
 
    }
 

	
 
    /// Checks if a decision can be reached
 
    fn has_decision(&mut self, inbox: &InboxMain, comp_ctx: &CompCtx) -> SelectDecision {
 
        self.candidates_workspace.clear();
 
        if self.cases.is_empty() {
 
            // If there are no cases then we can immediately reach a "bogus
 
            // decision".
 
            return SelectDecision::Case(0);
 
        }
 

	
 
        // Need to check for valid case
 
        'case_loop: for (case_index, case) in self.cases.iter().enumerate() {
 
            for port_handle in case.involved_ports.iter().copied() {
 
                let port_index = comp_ctx.get_port_index(port_handle);
 
                if inbox[port_index].is_none() {
 
                    // Condition not satisfied
 
                    continue 'case_loop;
 
                }
 
            }
 

	
 
            // If here then the case guard is satisfied
 
            self.candidates_workspace.push(case_index);
 
        }
 

	
 
        if self.candidates_workspace.is_empty() {
 
            return SelectDecision::None;
 
        } else {
 
            let candidate_index = self.random.get_u64() as usize % self.candidates_workspace.len();
 
            return SelectDecision::Case(self.candidates_workspace[candidate_index] as u32);
 
        }
 
    }
 
}
 

	
 
pub(crate) struct CompPDL {
 
    pub mode: Mode,
 
    pub mode_port: PortId, // when blocked on a port
 
    pub mode_value: ValueGroup, // when blocked on a put
 
    select: SelectState,
 
    pub prompt: Prompt,
 
    pub control: ControlLayer,
 
    pub consensus: Consensus,
 
    pub sync_counter: u32,
 
    pub exec_ctx: ExecCtx,
 
    // TODO: Temporary field, simulates future plans of having one storage place
 
    //  reserved per port.
 
    // Should be same length as the number of ports. Corresponding indices imply
 
    // message is intended for that port.
 
    pub inbox_main: InboxMain,
 
    pub inbox_backup: Vec<DataMessage>,
 
}
 

	
 
impl CompPDL {
 
    pub(crate) fn new(initial_state: Prompt, num_ports: usize) -> Self {
 
        let mut inbox_main = Vec::new();
 
        inbox_main.reserve(num_ports);
 
        for _ in 0..num_ports {
 
            inbox_main.push(None);
 
        }
 

	
 
        return Self{
 
            mode: Mode::NonSync,
 
            mode_port: PortId::new_invalid(),
 
            mode_value: ValueGroup::default(),
 
            select: SelectState::new(),
 
            prompt: initial_state,
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
            sync_counter: 0,
 
            exec_ctx: ExecCtx{
 
                stmt: ExecStmt::None,
 
            },
 
            inbox_main,
 
            inbox_backup: Vec::new(),
 
        }
 
    }
 

	
 
    pub(crate) fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) {
 
        sched_ctx.log(&format!("handling message: {:#?}", message));
 
        if let Some(new_target) = self.control.should_reroute(&mut message) {
 
            let mut target = sched_ctx.runtime.get_component_public(new_target);
 
            target.send_message(sched_ctx, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
 
            let _should_remove = target.decrement_users();
 
            debug_assert!(_should_remove.is_none());
 
            return;
 
        }
 

	
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Control(message) => {
 
                self.handle_incoming_control_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Sync(message) => {
 
                self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
 
            }
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Running component and handling changes in global component state
 
    // -------------------------------------------------------------------------
 

	
 
    pub(crate) fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
        use EvalContinuation as EC;
 

	
 
        sched_ctx.log(&format!("Running component (mode: {:?})", self.mode));
 

	
 
        // Depending on the mode don't do anything at all, take some special
 
        // actions, or fall through and run the PDL code.
 
        match self.mode {
 
            Mode::NonSync | Mode::Sync | Mode::BlockedSelect => {
 
                // continue and run PDL code
 
            },
 
            Mode::SyncEnd | Mode::BlockedGet | Mode::BlockedPut => {
 
                return Ok(CompScheduling::Sleep);
 
            }
 
            Mode::StartExit => {
 
                self.handle_component_exit(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            Mode::BusyExit => {
 
                if self.control.has_acks_remaining() {
 
                    return Ok(CompScheduling::Sleep);
 
                } else {
 
                    self.mode = Mode::Exit;
 
                    return Ok(CompScheduling::Exit);
 
                }
 
            },
 
            Mode::Exit => {
 
                return Ok(CompScheduling::Exit);
 
            }
 
        }
 

	
 
        let run_result = self.execute_prompt(&sched_ctx)?;
 

	
 
        match run_result {
 
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
 
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
 
            // Results that can be returned in sync mode
 
            EC::SyncBlockEnd => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                self.handle_sync_end(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::BlockGet(port_id) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 

	
 
                let port_id = port_id_from_eval(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let port_index = comp_ctx.get_port_index(port_handle);
 
                if let Some(message) = &self.inbox_main[port_index] {
 
                    // Check if we can actually receive the message
 
                    if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
                        // Message was received. Make sure any blocked peers and
 
                        // pending messages are handled.
 
                        let message = self.inbox_main[port_index].take().unwrap();
 
                        self.handle_received_data_message(sched_ctx, comp_ctx, port_handle);
 

	
 
                        self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
 
                        return Ok(CompScheduling::Immediate);
 
                    } else {
 
                        todo!("handle sync failure due to message deadlock");
 
                        return Ok(CompScheduling::Sleep);
 
                    }
 
                } else {
 
                    // We need to wait
 
                    self.mode = Mode::BlockedGet;
 
                    self.mode_port = port_id;
 
                    return Ok(CompScheduling::Sleep);
 
                }
 
            },
 
            EC::Put(port_id, value) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                sched_ctx.log(&format!("Putting value {:?}", value));
 
                let port_id = port_id_from_eval(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                if port_info.state.is_blocked() {
 
                    self.mode = Mode::BlockedPut;
 
                    self.mode_port = port_id;
 
                    self.mode_value = value;
 
                    self.exec_ctx.stmt = ExecStmt::PerformedPut; // prepare for when we become unblocked
 
                    return Ok(CompScheduling::Sleep);
 
                } else {
 
                    self.send_data_message_and_wake_up(sched_ctx, comp_ctx, port_handle, value);
 
                    self.exec_ctx.stmt = ExecStmt::PerformedPut;
 
                    return Ok(CompScheduling::Immediate);
 
                }
 
            },
 
            EC::SelectStart(num_cases, _num_ports) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                self.select.handle_select_start(num_cases);
 
                return Ok(CompScheduling::Requeue);
 
            },
 
            EC::SelectRegisterPort(case_index, port_index, port_id) => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                let port_id = port_id_from_eval(port_id);
 
                if let Err(_err) = self.select.register_select_case_port(comp_ctx, case_index, port_index, port_id) {
 
                    todo!("handle registering a port multiple times");
 
                }
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::SelectWait => {
 
                debug_assert_eq!(self.mode, Mode::Sync);
 
                let select_decision = self.select.handle_select_waiting_point(&self.inbox_main, comp_ctx);
 
                if let SelectDecision::Case(case_index) = select_decision {
 
                    // Reached a conclusion, so we can continue immediately
 
                    self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
 
                    self.mode = Mode::Sync;
 
                    return Ok(CompScheduling::Immediate);
 
                } else {
 
                    // No decision yet
 
                    self.mode = Mode::BlockedSelect;
 
                    return Ok(CompScheduling::Sleep);
 
                }
 
            },
 
            // Results that can be returned outside of sync mode
 
            EC::ComponentTerminated => {
 
                self.mode = Mode::StartExit; // next call we'll take care of the exit
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::SyncBlockStart => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                self.handle_sync_start(sched_ctx, comp_ctx);
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            EC::NewComponent(definition_id, type_id, arguments) => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                self.create_component_and_transfer_ports(
 
                    sched_ctx, comp_ctx,
 
                    definition_id, type_id, arguments
 
                );
 
                return Ok(CompScheduling::Requeue);
 
            },
 
            EC::NewChannel => {
 
                debug_assert_eq!(self.mode, Mode::NonSync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 
                let channel = comp_ctx.create_channel();
 
                self.exec_ctx.stmt = ExecStmt::CreatedChannel((
 
                    Value::Output(port_id_to_eval(channel.putter_id)),
 
                    Value::Input(port_id_to_eval(channel.getter_id))
 
                ));
 
                self.inbox_main.push(None);
 
                self.inbox_main.push(None);
 
                return Ok(CompScheduling::Immediate);
 
            }
 
        }
 
    }
 

	
 
    fn execute_prompt(&mut self, sched_ctx: &SchedulerCtx) -> EvalResult {
 
        let mut step_result = EvalContinuation::Stepping;
 
        while let EvalContinuation::Stepping = step_result {
 
            step_result = self.prompt.step(
 
                &sched_ctx.runtime.protocol.types, &sched_ctx.runtime.protocol.heap,
 
                &sched_ctx.runtime.protocol.modules, &mut self.exec_ctx,
 
            )?;
 
        }
 

	
 
        return Ok(step_result)
 
    }
 

	
 
    fn handle_sync_start(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component starting sync mode");
 
        self.consensus.notify_sync_start(comp_ctx);
 
        for message in self.inbox_main.iter() {
 
            if let Some(message) = message {
 
                self.consensus.handle_new_data_message(comp_ctx, message);
 
            }
 
        }
 
        debug_assert_eq!(self.mode, Mode::NonSync);
 
        self.mode = Mode::Sync;
 
    }
 

	
 
    /// Handles end of sync. The conclusion to the sync round might arise
 
    /// immediately (and be handled immediately), or might come later through
 
    /// messaging. In any case the component should be scheduled again
 
    /// immediately
 
    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component ending sync mode (now waiting for solution)");
 
        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
 
        self.mode = Mode::SyncEnd;
 
        self.handle_sync_decision(sched_ctx, comp_ctx, decision);
 
    }
 

	
 
    /// Handles decision from the consensus round. This will cause a change in
 
    /// the internal `Mode`, such that the next call to `run` can take the
 
    /// appropriate next steps.
 
    fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) {
 
        sched_ctx.log(&format!("Handling sync decision: {:?} (in mode {:?})", decision, self.mode));
 
        let is_success = match decision {
 
            SyncRoundDecision::None => {
 
                // No decision yet
 
                return;
 
            },
 
            SyncRoundDecision::Solution => true,
 
            SyncRoundDecision::Failure => false,
 
        };
 

	
 
        // If here then we've reached a decision
 
        debug_assert_eq!(self.mode, Mode::SyncEnd);
 
        if is_success {
 
            self.mode = Mode::NonSync;
 
            self.consensus.notify_sync_decision(decision);
 
        } else {
 
            self.mode = Mode::StartExit;
 
        }
 
    }
 

	
 
    fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component exiting");
 
        debug_assert_eq!(self.mode, Mode::StartExit);
 
        self.mode = Mode::BusyExit;
 

	
 
        // Doing this by index, then retrieving the handle is a bit rediculous,
 
        // but Rust is being Rust with its borrowing rules.
 
        for port_index in 0..comp_ctx.num_ports() {
 
            let port = comp_ctx.get_port_by_index_mut(port_index);
 
            if port.state == PortState::Closed {
 
                // Already closed, or in the process of being closed
 
                continue;
 
            }
 

	
 
            // Mark as closed
 
            let port_id = port.self_id;
 
            port.state = PortState::Closed;
 

	
 
            // Notify peer of closing
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
            let (peer, message) = self.control.initiate_port_closing(port_handle, comp_ctx);
 
            let peer_info = comp_ctx.get_peer(peer);
 
            peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling messages
 
    // -------------------------------------------------------------------------
 

	
 
    fn send_data_message_and_wake_up(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, source_port_handle: LocalPortHandle, value: ValueGroup) {
 
        let port_info = comp_ctx.get_port(source_port_handle);
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        let annotated_message = self.consensus.annotate_data_message(comp_ctx, port_info, value);
 
        peer_info.handle.send_message(sched_ctx, Message::Data(annotated_message), true);
 
    }
 

	
 
    /// Handles a message that came in through the public inbox. This function
 
    /// will handle putting it in the correct place, and potentially blocking
 
    /// the port in case too many messages are being received.
 
    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
 
        // Whatever we do, glean information from headers in message
 
        if self.mode.is_in_sync_block() {
 
            self.consensus.handle_new_data_message(comp_ctx, &message);
 
        }
 

	
 
        // Check if we can insert it directly into the storage associated with
 
        // the port
 
        let target_port_id = message.data_header.target_port;
 
        let port_handle = comp_ctx.get_port_handle(target_port_id);
 
        let port_index = comp_ctx.get_port_index(port_handle);
 
        if self.inbox_main[port_index].is_none() {
 
            self.inbox_main[port_index] = Some(message);
 

	
 
            // After direct insertion, check if this component's execution is 
 
            // blocked on receiving a message on that port
 
            debug_assert!(!comp_ctx.get_port(port_handle).state.is_blocked()); // because we could insert directly
 
            if self.mode == Mode::BlockedGet && self.mode_port == target_port_id {
 
                // We were indeed blocked
 
                self.mode = Mode::Sync;
 
                self.mode_port = PortId::new_invalid();
 
            } else if self.mode == Mode::BlockedSelect {
 
                let select_decision = self.select.handle_updated_inbox(&self.inbox_main, comp_ctx);
 
                if let SelectDecision::Case(case_index) = select_decision {
 
                    self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
 
                    self.mode = Mode::Sync;
 
                }
 
            }
 
            
 
            return;
 
        }
 

	
 
        // The direct inbox is full, so the port will become (or was already) blocked
 
        let port_info = comp_ctx.get_port_mut(port_handle);
 
        debug_assert!(port_info.state == PortState::Open || port_info.state.is_blocked());
 

	
 
        if port_info.state == PortState::Open {
 
            comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
 
            let (peer_handle, message) =
 
                self.control.initiate_port_blocking(comp_ctx, port_handle);
 

	
 
            let peer = comp_ctx.get_peer(peer_handle);
 
            peer.handle.send_message(sched_ctx, Message::Control(message), true);
 
        }
 

	
 
        // But we still need to remember the message, so:
 
        self.inbox_backup.push(message);
 
    }
 

	
 
    /// Handles when a message has been handed off from the inbox to the PDL
 
    /// code. We check to see if there are more messages waiting and, if not,
 
    /// then we handle the case where the port might have been blocked
 
    /// previously.
 
    fn handle_received_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, port_handle: LocalPortHandle) {
 
        let port_index = comp_ctx.get_port_index(port_handle);
 
        debug_assert!(self.inbox_main[port_index].is_none()); // this function should be called after the message is taken out
 

	
 
        // Check for any more messages
 
        let port_info = comp_ctx.get_port(port_handle);
 
        for message_index in 0..self.inbox_backup.len() {
 
            let message = &self.inbox_backup[message_index];
 
            if message.data_header.target_port == port_info.self_id {
 
                // One more message for this port
 
                let message = self.inbox_backup.remove(message_index);
 
                debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we had >1 message on the port
 
                self.inbox_main[port_index] = Some(message);
 

	
 
                return;
 
            }
 
        }
 

	
 
        // Did not have any more messages. So if we were blocked, then we need
 
        // to send the "unblock" message.
 
        if port_info.state == PortState::BlockedDueToFullBuffers {
 
            comp_ctx.set_port_state(port_handle, PortState::Open);
 
            let (peer_handle, message) = self.control.cancel_port_blocking(comp_ctx, port_handle);
 
            let peer_info = comp_ctx.get_peer(peer_handle);
 
            peer_info.handle.send_message(sched_ctx, Message::Control(message), true);
 
        }
 
    }
 

	
 
    fn handle_incoming_control_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: ControlMessage) {
 
        // Little local utility to send an Ack
 
        fn send_control_ack_message(sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, causer_id: ControlId, peer_handle: LocalPeerHandle) {
 
            let peer_info = comp_ctx.get_peer(peer_handle);
 
            peer_info.handle.send_message(sched_ctx, Message::Control(ControlMessage{
 
                id: causer_id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: None,
 
                content: ControlMessageContent::Ack,
 
            }), true);
 
        }
 

	
 
        // Handle the content of the control message, and optionally Ack it
 
        match message.content {
 
            ControlMessageContent::Ack => {
 
                self.handle_ack(sched_ctx, comp_ctx, message.id);
 
            },
 
            ControlMessageContent::BlockPort(port_id) => {
 
                // On of our messages was accepted, but the port should be
 
                // blocked.
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                debug_assert_eq!(port_info.kind, PortKind::Putter);
 
                if port_info.state == PortState::Open {
 
                    // only when open: we don't do this when closed, and we we don't do this if we're blocked due to peer changes
 
                    comp_ctx.set_port_state(port_handle, PortState::BlockedDueToFullBuffers);
 
                }
 
            },
 
            ControlMessageContent::ClosePort(port_id) => {
 
                // Request to close the port. We immediately comply and remove
 
                // the component handle as well
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let peer_comp_id = comp_ctx.get_port(port_handle).peer_comp_id;
 
                let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
                // One exception to sending an `Ack` is if we just closed the
 
                // port ourselves, meaning that the `ClosePort` messages got
 
                // sent to one another.
 
                if let Some(control_id) = self.control.has_close_port_entry(port_handle, comp_ctx) {
 
                    self.handle_ack(sched_ctx, comp_ctx, control_id);
 
                } else {
 
                    send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle);
 
                    comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed
 
                    comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed
 
                }
 
            },
 
            ControlMessageContent::UnblockPort(port_id) => {
 
                // We were previously blocked (or already closed)
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                debug_assert_eq!(port_info.kind, PortKind::Putter);
 
                if port_info.state == PortState::BlockedDueToFullBuffers {
 
                    self.handle_unblock_port_instruction(sched_ctx, comp_ctx, port_handle);
 
                }
 
            },
 
            ControlMessageContent::PortPeerChangedBlock(port_id) => {
 
                // The peer of our port has just changed. So we are asked to
 
                // temporarily block the port (while our original recipient is
 
                // potentially rerouting some of the in-flight messages) and
 
                // Ack. Then we wait for the `unblock` call.
 
                debug_assert_eq!(message.target_port_id, Some(port_id));
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                comp_ctx.set_port_state(port_handle, PortState::BlockedDueToPeerChange);
 

	
 
                let port_info = comp_ctx.get_port(port_handle);
 
                let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 

	
 
                send_control_ack_message(sched_ctx, comp_ctx, message.id, peer_handle);
 
            },
 
            ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => {
 
                let port_handle = comp_ctx.get_port_handle(message.target_port_id.unwrap());
 
                let port_info = comp_ctx.get_port(port_handle);
 
                debug_assert!(port_info.state == PortState::BlockedDueToPeerChange);
 
                let old_peer_id = port_info.peer_comp_id;
 

	
 
                comp_ctx.remove_peer(sched_ctx, port_handle, old_peer_id, false);
 

	
 
                let port_info = comp_ctx.get_port_mut(port_handle);
 
                port_info.peer_comp_id = new_comp_id;
 
                port_info.peer_port_id = new_port_id;
 
                comp_ctx.add_peer(port_handle, sched_ctx, new_comp_id, None);
 
                self.handle_unblock_port_instruction(sched_ctx, comp_ctx, port_handle);
 
            }
 
        }
 
    }
 

	
 
    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) {
 
        let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
        self.handle_sync_decision(sched_ctx, comp_ctx, decision);
 
    }
 

	
 
    /// Little helper that notifies the control layer of an `Ack`, and takes the
 
    /// appropriate subsequent action
 
    fn handle_ack(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, control_id: ControlId) {
 
        let mut to_ack = control_id;
 
        loop {
 
            let (action, new_to_ack) = self.control.handle_ack(to_ack, sched_ctx, comp_ctx);
 
            match action {
 
                AckAction::SendMessage(target_comp, message) => {
 
                    // FIX @NoDirectHandle
 
                    let mut handle = sched_ctx.runtime.get_component_public(target_comp);
 
                    handle.send_message(sched_ctx, Message::Control(message), true);
 
                    let _should_remove = handle.decrement_users();
 
                    debug_assert!(_should_remove.is_none());
 
                },
 
                AckAction::ScheduleComponent(to_schedule) => {
 
                    // FIX @NoDirectHandle
 
                    let mut handle = sched_ctx.runtime.get_component_public(to_schedule);
 

	
 
                    // Note that the component is intentionally not
 
                    // sleeping, so we just wake it up
 
                    debug_assert!(!handle.sleeping.load(std::sync::atomic::Ordering::Acquire));
 
                    let key = unsafe{ to_schedule.upgrade() };
 
                    sched_ctx.runtime.enqueue_work(key);
 
                    let _should_remove = handle.decrement_users();
 
                    debug_assert!(_should_remove.is_none());
 
                },
src/runtime2/component/consensus.rs
Show inline comments
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::scheduler::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 

	
 
use super::component_context::*;
 

	
 
pub struct PortAnnotation {
 
    self_comp_id: CompId,
 
    self_port_id: PortId,
 
    peer_comp_id: CompId, // only valid for getter ports
 
    peer_port_id: PortId, // only valid for getter ports
 
    peer_discovered: bool, // only valid for getter ports
 
    mapping: Option<u32>,
 
    kind: PortKind,
 
}
 

	
 
impl PortAnnotation {
 
    fn new(comp_id: CompId, port_id: PortId) -> Self {
 
    fn new(comp_id: CompId, port_id: PortId, kind: PortKind) -> Self {
 
        return Self{
 
            self_comp_id: comp_id,
 
            self_port_id: port_id,
 
            peer_comp_id: CompId::new_invalid(),
 
            peer_port_id: PortId::new_invalid(),
 
            mapping: None
 
            peer_discovered: false,
 
            mapping: None,
 
            kind,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Eq, PartialEq)]
 
enum Mode {
 
    NonSync,
 
    SyncBusy,
 
    SyncAwaitingSolution,
 
    SelectBusy,
 
    SelectWait,
 
}
 

	
 
struct SolutionCombiner {
 
    solution: SyncPartialSolution,
 
    matched_channels: usize,
 
}
 

	
 
impl SolutionCombiner {
 
    fn new() -> Self {
 
        return Self {
 
            solution: SyncPartialSolution::default(),
 
            matched_channels: 0,
 
        }
 
    }
 

	
 
    #[inline]
 
    fn has_contributions(&self) -> bool {
 
        return !self.solution.channel_mapping.is_empty();
 
    }
 

	
 
    /// Returns a decision for the current round. If there is no decision (yet)
 
    /// then `RoundDecision::None` is returned.
 
    fn get_decision(&self) -> SyncRoundDecision {
 
        if self.matched_channels == self.solution.channel_mapping.len() {
 
            debug_assert_ne!(self.solution.decision, SyncRoundDecision::None);
 
            return self.solution.decision;
 
        }
 

	
 
        return SyncRoundDecision::None; // even in case of failure: wait for everyone.
 
    }
 

	
 
    fn combine_with_partial_solution(&mut self, partial: SyncPartialSolution) {
 
        debug_assert_ne!(self.solution.decision, SyncRoundDecision::Solution);
 
        debug_assert_ne!(partial.decision, SyncRoundDecision::Solution);
 

	
 
        if partial.decision == SyncRoundDecision::Failure {
 
            self.solution.decision = SyncRoundDecision::Failure;
 
        }
 

	
 
        for entry in partial.channel_mapping {
 
            let channel_index = if entry.getter.is_some() && entry.putter.is_some() {
 
                let channel_index = self.solution.channel_mapping.len();
 
                self.solution.channel_mapping.push(entry);
 
                self.matched_channels += 1;
 

	
 
                channel_index
 
            } else if let Some(putter) = entry.putter {
 
                self.combine_with_putter_port(putter)
 
            } else if let Some(getter) = entry.getter {
 
                self.combine_with_getter_port(getter)
 
            } else {
 
                unreachable!(); // both putter and getter are None
 
            };
 

	
 
            let channel = &self.solution.channel_mapping[channel_index];
 
            if let Some(consistent) = Self::channel_is_consistent(channel) {
 
                if !consistent {
 
                    self.solution.decision = SyncRoundDecision::Failure;
 
                }
 
                self.matched_channels += 1;
 
            }
 
        }
 

	
 
        self.update_solution();
 
    }
 

	
 
    /// Combines the currently stored global solution (if any) with the newly
 
    /// provided local solution. Make sure to check the `has_decision` return
 
    /// value afterwards.
 
    fn combine_with_local_solution(&mut self, _comp_id: CompId, solution: SyncLocalSolution) {
 
        debug_assert_ne!(self.solution.decision, SyncRoundDecision::Solution);
 

	
 
        // Combine partial solution with the local solution entries
 
        for entry in solution {
 
            // Match the current entry up with its peer endpoint, or add a new
 
            // entry.
 
            let channel_index = match entry {
 
                SyncLocalSolutionEntry::Putter(putter) => {
 
                    self.combine_with_putter_port(putter)
 
                },
 
                SyncLocalSolutionEntry::Getter(getter) => {
 
                    self.combine_with_getter_port(getter)
 
                }
 
            };
 

	
 
            // Check if channel is now consistent
 
            let channel = &self.solution.channel_mapping[channel_index];
 
            if let Some(consistent) = Self::channel_is_consistent(channel) {
 
                if !consistent {
 
                    self.solution.decision = SyncRoundDecision::Failure;
 
                }
 
                self.matched_channels += 1;
 
            }
 
        }
 

	
 
        self.update_solution();
 
    }
 

	
 
    /// Takes whatever partial solution is present in the solution combiner and
 
    /// returns it. The solution combiner's solution will end up being empty.
 
    /// This is used when a new leader is found and we need to pass along our
 
    /// partial results.
 
    fn take_partial_solution(&mut self) -> SyncPartialSolution {
 
        let mut partial_solution = SyncPartialSolution::default();
 
        std::mem::swap(&mut partial_solution, &mut self.solution);
 
        self.clear();
 

	
 
        return partial_solution;
 
    }
 

	
 
    fn clear(&mut self) {
 
        self.solution.channel_mapping.clear();
 
        self.solution.decision = SyncRoundDecision::None;
 
        self.matched_channels = 0;
 
    }
 

	
 
    // --- Small utilities for combining solutions
 

	
 
    fn combine_with_putter_port(&mut self, putter: SyncSolutionPutterPort) -> usize {
 
        let channel_index = self.get_channel_index_for_putter(putter.self_comp_id, putter.self_port_id);
 
        if let Some(channel_index) = channel_index {
 
            let channel = &mut self.solution.channel_mapping[channel_index];
 
            debug_assert!(channel.putter.is_none());
 
            channel.putter = Some(putter);
 

	
 
            return channel_index;
 
        } else {
 
            let channel_index = self.solution.channel_mapping.len();
 
            self.solution.channel_mapping.push(SyncSolutionChannel{
 
                putter: Some(putter),
 
                getter: None,
 
            });
 

	
 
            return channel_index;
 
        }
 
    }
 

	
 
    fn combine_with_getter_port(&mut self, getter: SyncSolutionGetterPort) -> usize {
 
        let channel_index = self.get_channel_index_for_getter(getter.peer_comp_id, getter.peer_port_id);
 
        if let Some(channel_index) = channel_index {
 
            let channel = &mut self.solution.channel_mapping[channel_index];
 
            debug_assert!(channel.getter.is_none());
 
            channel.getter = Some(getter);
 

	
 
            return channel_index;
 
        } else {
 
            let channel_index = self.solution.channel_mapping.len();
 
            self.solution.channel_mapping.push(SyncSolutionChannel{
 
                putter: None,
 
                getter: Some(getter)
 
            });
 

	
 
            return channel_index;
 
        }
 
    }
 

	
 
    /// Retrieve index of the channel containing a getter port that has received
 
    /// from the specified putter port.
 
    fn get_channel_index_for_putter(&self, putter_comp_id: CompId, putter_port_id: PortId) -> Option<usize> {
 
        for (channel_index, channel) in self.solution.channel_mapping.iter().enumerate() {
 
            if let Some(getter) = &channel.getter {
 
                if getter.peer_comp_id == putter_comp_id && getter.peer_port_id == putter_port_id {
 
                    return Some(channel_index);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    /// Retrieve index of the channel for a getter port. To find this channel
 
    /// the **peer** component/port IDs of the getter port are used.
 
    fn get_channel_index_for_getter(&self, peer_comp_id: CompId, peer_port_id: PortId) -> Option<usize> {
 
        for (channel_index, channel) in self.solution.channel_mapping.iter().enumerate() {
 
            if let Some(putter) = &channel.putter {
 
                if putter.self_comp_id == peer_comp_id && putter.self_port_id == peer_port_id {
 
                    return Some(channel_index);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn channel_is_consistent(channel: &SyncSolutionChannel) -> Option<bool> {
 
        if channel.putter.is_none() || channel.getter.is_none() {
 
            return None;
 
        }
 

	
 
        let putter = channel.putter.as_ref().unwrap();
 
        let getter = channel.getter.as_ref().unwrap();
 
        return Some(putter.mapping == getter.mapping);
 
    }
 

	
 
    /// Determines the global solution if all components have contributed their
 
    /// local solutions.
 
    fn update_solution(&mut self) {
 
        if self.matched_channels == self.solution.channel_mapping.len() {
 
            if self.solution.decision != SyncRoundDecision::Failure {
 
                self.solution.decision = SyncRoundDecision::Solution;
 
            }
 
        }
 
    }
 
}
 

	
 
/// Tracking consensus state
 
pub struct Consensus {
 
    // General state of consensus manager
 
    mapping_counter: u32,
 
    mode: Mode,
 
    // State associated with sync round
 
    round_index: u32,
 
    highest_id: CompId,
 
    ports: Vec<PortAnnotation>,
 
    // State associated with arriving at a solution and being a (temporary)
 
    // leader in the consensus round
 
    solution: SolutionCombiner,
 
}
 

	
 
impl Consensus {
 
    pub(crate) fn new() -> Self {
 
        return Self{
 
            round_index: 0,
 
            highest_id: CompId::new_invalid(),
 
            ports: Vec::new(),
 
            mapping_counter: 0,
 
            mode: Mode::NonSync,
 
            solution: SolutionCombiner::new(),
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Managing sync state
 
    // -------------------------------------------------------------------------
 

	
 
    /// Notifies the consensus management that the PDL code has reached the
 
    /// start of a sync block.
 
    pub(crate) fn notify_sync_start(&mut self, comp_ctx: &CompCtx) {
 
        debug_assert_eq!(self.mode, Mode::NonSync);
 
        self.highest_id = comp_ctx.id;
 
        self.mapping_counter = 0;
 
        self.mode = Mode::SyncBusy;
 
        self.make_ports_consistent_with_ctx(comp_ctx);
 

	
 
        // Make the internally stored port annotation array consistent with the
 
        // ports that the component currently owns. They should match by index
 
        // (i.e. annotation at index `i` corresponds to port `i` in `comp_ctx`).
 
        let mut needs_setting_ports = false;
 
        if comp_ctx.num_ports() != self.ports.len() {
 
            needs_setting_ports = true;
 
        } else {
 
            for (idx, port) in comp_ctx.iter_ports().enumerate() {
 
                let comp_port_id = port.self_id;
 
                let cons_port_id = self.ports[idx].self_port_id;
 
                if comp_port_id != cons_port_id {
 
                    needs_setting_ports = true;
 
                    break;
 
                }
 
            }
 
        }
 

	
 
        if needs_setting_ports {
 
            // Reset all ports
 
            self.ports.clear();
 
            self.ports.reserve(comp_ctx.num_ports());
 
            for port in comp_ctx.iter_ports() {
 
                self.ports.push(PortAnnotation::new(comp_ctx.id, port.self_id, port.kind));
 
            }
 
        } else {
 
            // Make sure that we consider all peers as undiscovered again
 
            for annotation in self.ports.iter_mut() {
 
                annotation.peer_discovered = false;
 
            }
 
        }
 
    }
 

	
 
    /// Notifies the consensus management that the PDL code has reached the end
 
    /// of a sync block. A local solution will be submitted, after which we wait
 
    /// until the participants in the round (hopefully) reach a conclusion.
 
    pub(crate) fn notify_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy);
 
        self.mode = Mode::SyncAwaitingSolution;
 

	
 
        // Submit our port mapping as a solution
 
        let mut local_solution = Vec::with_capacity(self.ports.len());
 
        for port in &self.ports {
 
            if let Some(mapping) = port.mapping {
 
                let port_handle = comp_ctx.get_port_handle(port.self_port_id);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                let new_entry = match port_info.kind {
 
                    PortKind::Putter => SyncLocalSolutionEntry::Putter(SyncSolutionPutterPort{
 
                        self_comp_id: comp_ctx.id,
 
                        self_port_id: port_info.self_id,
 
                        mapping
 
                    }),
 
                    PortKind::Getter => SyncLocalSolutionEntry::Getter(SyncSolutionGetterPort{
 
                        self_comp_id: comp_ctx.id,
 
                        self_port_id: port_info.self_id,
 
                        peer_comp_id: port.peer_comp_id,
 
                        peer_port_id: port.peer_port_id,
 
                        mapping
 
                    })
 
                };
 
                local_solution.push(new_entry);
 
            }
 
        }
 

	
 
        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution);
 
        return decision;
 
    }
 

	
 
    /// Notifies that a decision has been reached. Note that the caller should
 
    /// still take the appropriate actions based on the decision it is supplying
 
    /// to the consensus layer.
 
    pub(crate) fn notify_sync_decision(&mut self, _decision: SyncRoundDecision) {
 
        // Reset everything for the next round
 
        debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution);
 
        self.mode = Mode::NonSync;
 
        self.round_index = self.round_index.wrapping_add(1);
 

	
 
        for port in self.ports.iter_mut() {
 
            port.mapping = None;
 
        }
 

	
 
        self.solution.clear();
 
    }
 

	
 
    fn make_ports_consistent_with_ctx(&mut self, comp_ctx: &CompCtx) {
 
        let mut needs_setting_ports = false;
 
        if comp_ctx.num_ports() != self.ports.len() {
 
            needs_setting_ports = true;
 
        } else {
 
            for (idx, port) in comp_ctx.iter_ports().enumerate() {
 
                let comp_port_id = port.self_id;
 
                let cons_port_id = self.ports[idx].self_port_id;
 
                if comp_port_id != cons_port_id {
 
                    needs_setting_ports = true;
 
                    break;
 
                }
 
            }
 
        }
 

	
 
        if needs_setting_ports {
 
            self.ports.clear();
 
            self.ports.reserve(comp_ctx.num_ports());
 
            for port in comp_ctx.iter_ports() {
 
                self.ports.push(PortAnnotation::new(comp_ctx.id, port.self_id))
 
            }
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling inbound and outbound messages
 
    // -------------------------------------------------------------------------
 

	
 
    /// Prepares a set of values to be sent of a channel.
 
    pub(crate) fn annotate_data_message(&mut self, comp_ctx: &CompCtx, port_info: &Port, content: ValueGroup) -> DataMessage {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy); // can only send between sync start and sync end
 
        debug_assert!(self.ports.iter().any(|v| v.self_port_id == port_info.self_id));
 
        let data_header = self.create_data_header_and_update_mapping(port_info);
 
        let sync_header = self.create_sync_header(comp_ctx);
 

	
 
        return DataMessage{ data_header, sync_header, content };
 
    }
 

	
 
    /// Handles the arrival of a new data message (needs to be called for every
 
    /// new data message, even though it might not end up being received). This
 
    /// is used to determine peers of `get`ter ports.
 
    pub(crate) fn handle_new_data_message(&mut self, comp_ctx: &CompCtx, message: &DataMessage) {
 
        let target_handle = comp_ctx.get_port_handle(message.data_header.target_port);
 
        let target_index = comp_ctx.get_port_index(target_handle);
 
        let annotation = &mut self.ports[target_index];
 
        debug_assert!(
 
            !annotation.peer_discovered || (
 
                annotation.peer_comp_id == message.sync_header.sending_id &&
 
                annotation.peer_port_id == message.data_header.source_port
 
            )
 
        );
 
        annotation.peer_comp_id = message.sync_header.sending_id;
 
        annotation.peer_port_id = message.data_header.source_port;
 
        annotation.peer_discovered = true;
 
    }
 

	
 
    /// Checks if the data message can be received (due to port annotations), if
 
    /// it can then `true` is returned and the caller is responsible for handing
 
    /// the message of to the PDL code. Otherwise the message cannot be
 
    /// received.
 
    pub(crate) fn try_receive_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: &DataMessage) -> bool {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy);
 
        debug_assert!(self.ports.iter().any(|v| v.self_port_id == message.data_header.target_port));
 

	
 
        // Make sure the expected mapping matches the currently stored mapping
 
        for (expected_id, expected_annotation) in &message.data_header.expected_mapping {
 
            let got_annotation = self.get_annotation(*expected_id);
 
            if got_annotation != *expected_annotation {
 
        for (peer_port_kind, expected_annotation) in &message.data_header.expected_mapping {
 
            // Determine our annotation, in order to do so we need to find the
 
            // port matching the peer ports
 
            let mut self_annotation = None;
 
            let mut self_annotation_found = false;
 
            match peer_port_kind {
 
                PortAnnotationKind::Putter(peer_port) => {
 
                    for self_port in &self.ports {
 
                        if self_port.kind == PortKind::Getter &&
 
                            self_port.peer_discovered &&
 
                            self_port.peer_comp_id == peer_port.self_comp_id &&
 
                            self_port.peer_port_id == peer_port.self_port_id
 
                        {
 
                            self_annotation = self_port.mapping;
 
                            self_annotation_found = true;
 
                            break;
 
                        }
 
                    }
 
                },
 
                PortAnnotationKind::Getter(peer_port) => {
 
                    if peer_port.peer_comp_id == comp_ctx.id {
 
                        // Peer indicates that we talked to it
 
                        let self_port_handle = comp_ctx.get_port_handle(peer_port.peer_port_id);
 
                        let self_port_index = comp_ctx.get_port_index(self_port_handle);
 
                        self_annotation = self.ports[self_port_index].mapping;
 
                        self_annotation_found = true;
 
                    }
 
                }
 
            }
 

	
 
            if !self_annotation_found {
 
                continue
 
            }
 

	
 
            if self_annotation != *expected_annotation {
 
                return false;
 
            }
 
        }
 

	
 
        // Expected mapping matches current mapping, so we will receive the message
 
        self.set_annotation(message.sync_header.sending_id, &message.data_header);
 

	
 
        // Handle the sync header embedded within the data message
 
        self.handle_sync_header(sched_ctx, comp_ctx, &message.sync_header);
 

	
 
        return true;
 
    }
 

	
 
    /// Receives the sync message and updates the consensus state appropriately.
 
    pub(crate) fn receive_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) -> SyncRoundDecision {
 
        // Whatever happens: handle the sync header (possibly changing the
 
        // currently registered leader)
 
        self.handle_sync_header(sched_ctx, comp_ctx, &message.sync_header);
 

	
 
        match message.content {
 
            SyncMessageContent::NotificationOfLeader => {
 
                return SyncRoundDecision::None;
 
            },
 
            SyncMessageContent::LocalSolution(solution_generator_id, local_solution) => {
 
                return self.handle_local_solution(sched_ctx, comp_ctx, solution_generator_id, local_solution);
 
            },
 
            SyncMessageContent::PartialSolution(partial_solution) => {
 
                return self.handle_partial_solution(sched_ctx, comp_ctx, partial_solution);
 
            },
 
            SyncMessageContent::GlobalSolution => {
 
                debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution); // leader can only find global- if we submitted local solution
 
                return SyncRoundDecision::Solution;
 
            },
 
            SyncMessageContent::GlobalFailure => {
 
                debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution);
 
                return SyncRoundDecision::Failure;
 
            }
 
        }
 
    }
 

	
 
    fn handle_sync_header(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, header: &MessageSyncHeader) {
 
        if header.highest_id.0 > self.highest_id.0 {
 
            // Sender knows of someone with a higher ID. So store highest ID,
 
            // notify all peers, and forward local solutions
 
            self.highest_id = header.highest_id;
 
            for peer in comp_ctx.iter_peers() {
 
                if peer.id == header.sending_id {
 
                    continue; // do not send to sender: it has the higher ID
 
                }
 

	
 
                // also: only send if we received a message in this round
 
                let mut performed_communication = false; // TODO: Revise, temporary fix
 
                for port in self.ports.iter() {
 
                    if port.peer_comp_id == peer.id && port.mapping.is_some() {
 
                        performed_communication = true;
 
                        break;
 
                    }
 
                }
 

	
 
                if !performed_communication {
 
                    continue;
 
                }
 

	
 
                let message = SyncMessage{
 
                    sync_header: self.create_sync_header(comp_ctx),
 
                    content: SyncMessageContent::NotificationOfLeader,
 
                };
 
                peer.handle.send_message(sched_ctx, Message::Sync(message), true);
 
            }
 

	
 
            self.forward_partial_solution(sched_ctx, comp_ctx);
 
        } else if header.highest_id.0 < self.highest_id.0 {
 
            // Sender has a lower ID, so notify it of our higher one
 
            let message = SyncMessage{
 
                sync_header: self.create_sync_header(comp_ctx),
 
                content: SyncMessageContent::NotificationOfLeader,
 
            };
 
            let peer_handle = comp_ctx.get_peer_handle(header.sending_id);
 
            let peer_info = comp_ctx.get_peer(peer_handle);
 
            peer_info.handle.send_message(sched_ctx, Message::Sync(message), true);
 
        } // else: exactly equal
 
    }
 

	
 
    fn get_annotation(&self, port_id: PortId) -> Option<u32> {
 
        for annotation in self.ports.iter() {
 
            if annotation.self_port_id == port_id {
 
                return annotation.mapping;
 
            }
 
        }
 

	
 
        debug_assert!(false);
 
        return None;
 
    }
 

	
 
    fn set_annotation(&mut self, source_comp_id: CompId, data_header: &MessageDataHeader) {
 
        for annotation in self.ports.iter_mut() {
 
            if annotation.self_port_id == data_header.target_port {
 
                annotation.peer_comp_id = source_comp_id;
 
                annotation.peer_port_id = data_header.source_port;
 
                // Message should have already passed the `handle_new_data_message` function, so we
 
                // should have already annotated the peer of the port.
 
                debug_assert!(
 
                    annotation.peer_discovered &&
 
                    annotation.peer_comp_id == source_comp_id &&
 
                    annotation.peer_port_id == data_header.source_port
 
                );
 
                annotation.mapping = Some(data_header.new_mapping);
 
            }
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Leader-related methods
 
    // -------------------------------------------------------------------------
 

	
 
    fn forward_partial_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        debug_assert_ne!(self.highest_id, comp_ctx.id); // not leader
 

	
 
        // Make sure that we have something to send
 
        if !self.solution.has_contributions() {
 
            return;
 
        }
 

	
 
        // Swap the container with the partial solution and then send it along
 
        let partial_solution = self.solution.take_partial_solution();
 
        self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(SyncMessage{
 
            sync_header: self.create_sync_header(comp_ctx),
 
            content: SyncMessageContent::PartialSolution(partial_solution),
 
        }));
 
    }
 

	
 
    fn handle_local_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, solution_sender_id: CompId, solution: SyncLocalSolution) -> SyncRoundDecision {
 
        if self.highest_id == comp_ctx.id {
 
            // We are the leader
 
            self.solution.combine_with_local_solution(solution_sender_id, solution);
 
            let round_decision = self.solution.get_decision();
 
            if round_decision != SyncRoundDecision::None {
 
                self.broadcast_decision(sched_ctx, comp_ctx, round_decision);
 
            }
 
            return round_decision;
 
        } else {
 
            // Forward the solution
 
            let message = SyncMessage{
 
                sync_header: self.create_sync_header(comp_ctx),
 
                content: SyncMessageContent::LocalSolution(solution_sender_id, solution),
 
            };
 
            self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(message));
 
            return SyncRoundDecision::None;
 
        }
 
    }
 

	
 
    fn handle_partial_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, solution: SyncPartialSolution) -> SyncRoundDecision {
 
        if self.highest_id == comp_ctx.id {
 
            // We are the leader, combine existing and new solution
 
            self.solution.combine_with_partial_solution(solution);
 
            let round_decision = self.solution.get_decision();
 
            if round_decision != SyncRoundDecision::None {
 
                self.broadcast_decision(sched_ctx, comp_ctx, round_decision);
 
            }
 
            return round_decision;
 
        } else {
 
            // Forward the partial solution
 
            let message = SyncMessage{
 
                sync_header: self.create_sync_header(comp_ctx),
 
                content: SyncMessageContent::PartialSolution(solution),
 
            };
 
            self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(message));
 
            return SyncRoundDecision::None;
 
        }
 
    }
 

	
 
    fn broadcast_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, decision: SyncRoundDecision) {
 
        debug_assert_eq!(self.highest_id, comp_ctx.id);
 

	
 
        let is_success = match decision {
 
            SyncRoundDecision::None => unreachable!(),
 
            SyncRoundDecision::Solution => true,
 
            SyncRoundDecision::Failure => false,
 
        };
 

	
 
        let mut peers = Vec::with_capacity(self.solution.solution.channel_mapping.len()); // TODO: @Performance
 

	
 
        for channel in self.solution.solution.channel_mapping.iter() {
 
            let getter = channel.getter.as_ref().unwrap();
 
            if getter.self_comp_id != comp_ctx.id && !peers.contains(&getter.self_comp_id) {
 
                peers.push(getter.self_comp_id);
 
            }
 
            if getter.peer_comp_id != comp_ctx.id && !peers.contains(&getter.peer_comp_id) {
 
                peers.push(getter.peer_comp_id);
 
            }
 
        }
 

	
 
        for peer in peers {
 
            let mut handle = sched_ctx.runtime.get_component_public(peer);
 
            let message = Message::Sync(SyncMessage{
 
                sync_header: self.create_sync_header(comp_ctx),
 
                content: if is_success { SyncMessageContent::GlobalSolution } else { SyncMessageContent::GlobalFailure },
 
            });
 
            handle.send_message(sched_ctx, message, true);
 
            let _should_remove = handle.decrement_users();
 
            debug_assert!(_should_remove.is_none());
 
        }
 
    }
 

	
 
    fn send_to_leader(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, message: Message) {
 
        debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader
 
        let mut leader_info = sched_ctx.runtime.get_component_public(self.highest_id);
 
        leader_info.send_message(sched_ctx, message, true);
 
        let should_remove = leader_info.decrement_users();
 
        if let Some(key) = should_remove {
 
            sched_ctx.runtime.destroy_component(key);
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Creating message headers
 
    // -------------------------------------------------------------------------
 

	
 
    fn create_data_header_and_update_mapping(&mut self, port_info: &Port) -> MessageDataHeader {
 
        let mut expected_mapping = Vec::with_capacity(self.ports.len());
 
        let mut port_index = usize::MAX;
 
        for (index, port) in self.ports.iter().enumerate() {
 
            if port.self_port_id == port_info.self_id {
 
                port_index = index;
 
                port_index = index; // remember for later updating
 
            }
 
            expected_mapping.push((port.self_port_id, port.mapping));
 

	
 
            // Add all of the
 
            let annotation_kind = match port.kind {
 
                PortKind::Putter => {
 
                    PortAnnotationKind::Putter(PortAnnotationPutter{
 
                        self_comp_id: port.self_comp_id,
 
                        self_port_id: port.self_port_id
 
                    })
 
                },
 
                PortKind::Getter => {
 
                    if !port.peer_discovered {
 
                        continue;
 
                    }
 

	
 
                    PortAnnotationKind::Getter(PortAnnotationGetter{
 
                        self_comp_id: port.self_comp_id,
 
                        self_port_id: port.self_port_id,
 
                        peer_comp_id: port.peer_comp_id,
 
                        peer_port_id: port.peer_port_id,
 
                    })
 
                }
 
            };
 
            expected_mapping.push((annotation_kind, port.mapping));
 
        }
 

	
 
        let new_mapping = self.take_mapping();
 
        self.ports[port_index].mapping = Some(new_mapping);
 
        debug_assert_eq!(port_info.kind, PortKind::Putter);
 
        return MessageDataHeader{
 
            expected_mapping,
 
            new_mapping,
 
            source_port: port_info.self_id,
 
            target_port: port_info.peer_port_id,
 
        };
 
    }
 

	
 
    #[inline]
 
    fn create_sync_header(&self, comp_ctx: &CompCtx) -> MessageSyncHeader {
 
        return MessageSyncHeader{
 
            sync_round: self.round_index,
 
            sending_id: comp_ctx.id,
 
            highest_id: self.highest_id,
 
        };
 
    }
 

	
 
    #[inline]
 
    fn take_mapping(&mut self) -> u32 {
 
        let mapping = self.mapping_counter;
 
        self.mapping_counter = self.mapping_counter.wrapping_add(1);
 
        return mapping;
 
    }
 
}
 
\ No newline at end of file
src/runtime2/runtime.rs
Show inline comments
 
use std::sync::{Arc, Mutex, Condvar};
 
use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
 
use std::collections::VecDeque;
 

	
 
use crate::protocol::*;
 

	
 
use super::communication::Message;
 
use super::component::{wake_up_if_sleeping, CompPDL, CompCtx};
 
use super::store::{ComponentStore, ComponentReservation, QueueDynMpsc, QueueDynProducer};
 
use super::scheduler::*;
 

	
 
// -----------------------------------------------------------------------------
 
// Component
 
// -----------------------------------------------------------------------------
 

	
 
/// Key to a component. Type system somewhat ensures that there can only be one
 
/// of these. Only with a key one may retrieve privately-accessible memory for
 
/// a component. Practically just a generational index, like `CompId` is.
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) struct CompKey(pub u32);
 

	
 
impl CompKey {
 
    pub(crate) fn downgrade(&self) -> CompId {
 
        return CompId(self.0);
 
    }
 
}
 

	
 
/// Generational ID of a component
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub struct CompId(pub u32);
 

	
 
impl CompId {
 
    pub(crate) fn new_invalid() -> CompId {
 
        return CompId(u32::MAX);
 
    }
 

	
 
    /// Upgrade component ID to component key. Unsafe because the caller needs
 
    /// to make sure that only one component key can exist at a time (to ensure
 
    /// a component can only be scheduled/executed by one thread).
 
    pub(crate) unsafe fn upgrade(&self) -> CompKey {
 
        return CompKey(self.0);
 
    }
 
}
 

	
 
/// Handle to a component that is being created.
 
pub(crate) struct CompReserved {
 
    reservation: ComponentReservation,
 
}
 

	
 
impl CompReserved {
 
    pub(crate) fn id(&self) -> CompId {
 
        return CompId(self.reservation.index)
 
    }
 
}
 

	
 
/// Private fields of a component, may only be modified by a single thread at
 
/// a time.
 
pub(crate) struct RuntimeComp {
 
    pub public: CompPublic,
 
    pub code: CompPDL,
 
    pub ctx: CompCtx,
 
    pub inbox: QueueDynMpsc<Message>,
 
    pub exiting: bool,
 
}
 

	
 
/// Should contain everything that is accessible in a thread-safe manner
 
// TODO: Do something about the `num_handles` thing. This needs to be a bit more
 
//  "foolproof" to lighten the mental burden of using the `num_handles`
 
//  variable.
 
pub(crate) struct CompPublic {
 
    pub sleeping: AtomicBool,
 
    pub num_handles: AtomicU32, // manually modified (!)
 
    inbox: QueueDynProducer<Message>,
 
}
 

	
 
/// Handle to public part of a component. Would be nice if we could
 
/// automagically manage the `num_handles` counter. But when it reaches zero we
 
/// need to manually remove the handle from the runtime. So we just have debug
 
/// code to make sure this actually happens.
 
pub(crate) struct CompHandle {
 
    target: *const CompPublic,
 
    id: CompId, // TODO: @Remove after debugging
 
    #[cfg(debug_assertions)] decremented: bool,
 
}
 

	
 
impl CompHandle {
 
    fn new(id: CompId, public: &CompPublic) -> CompHandle {
 
        let handle = CompHandle{
 
            target: public,
 
            id,
 
            #[cfg(debug_assertions)] decremented: false,
 
        };
 
        handle.increment_users();
 
        return handle;
 
    }
 

	
 
    pub(crate) fn send_message(&self, sched_ctx: &SchedulerCtx, message: Message, try_wake_up: bool) {
 
        sched_ctx.log(&format!("Sending message to [c:{:03}, wakeup:{}]: {:?}", self.id.0, try_wake_up, message));
 
        self.inbox.push(message);
 
        if try_wake_up {
 
            wake_up_if_sleeping(sched_ctx, self.id, self);
 
        }
 
    }
 

	
 
    fn increment_users(&self) {
 
        let old_count = self.num_handles.fetch_add(1, Ordering::AcqRel);
 
        debug_assert!(old_count > 0); // because we should never be able to retrieve a handle when the component is (being) destroyed
 
    }
 

	
 
    /// Returns the `CompKey` to the component if it should be destroyed
 
    pub(crate) fn decrement_users(&mut self) -> Option<CompKey> {
 
        debug_assert!(!self.decremented, "illegal to 'decrement_users' twice");
 
        dbg_code!(assert!(self.decremented, "illegal to 'decrement_users' twice"));
 
        let old_count = self.num_handles.fetch_sub(1, Ordering::AcqRel);
 
        let new_count = old_count - 1;
 
        dbg_code!(self.decremented = true);
 
        if new_count == 0 {
 
            return Some(unsafe{ self.id.upgrade() });
 
        }
 

	
 
        return None;
 
    }
 
}
 

	
 
impl Clone for CompHandle {
 
    fn clone(&self) -> Self {
 
        debug_assert!(!self.decremented, "illegal to clone after 'decrement_users'");
 
        dbg_code!(assert!(!self.decremented, "illegal to clone after 'decrement_users'"));
 
        self.increment_users();
 
        return CompHandle{
 
            target: self.target,
 
            id: self.id,
 
            #[cfg(debug_assertions)] decremented: false,
 
        };
 
    }
 
}
 

	
 
impl std::ops::Deref for CompHandle {
 
    type Target = CompPublic;
 

	
 
    fn deref(&self) -> &Self::Target {
 
        debug_assert!(!self.decremented); // cannot access if control is relinquished
 
        dbg_code!(assert!(!self.decremented)); // cannot access if control is relinquished
 
        return unsafe{ &*self.target };
 
    }
 
}
 

	
 
impl Drop for CompHandle {
 
    fn drop(&mut self) {
 
        debug_assert!(self.decremented, "need call to 'decrement_users' before dropping");
 
        dbg_code!(assert!(self.decremented, "need call to 'decrement_users' before dropping"));
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Runtime
 
// -----------------------------------------------------------------------------
 

	
 
pub struct Runtime {
 
    pub(crate) inner: Arc<RuntimeInner>,
 
    threads: Vec<std::thread::JoinHandle<()>>,
 
}
 

	
 
impl Runtime {
 
    // TODO: debug_logging should be removed at some point
 
    pub fn new(num_threads: u32, debug_logging: bool, protocol_description: ProtocolDescription) -> Runtime {
 
        assert!(num_threads > 0, "need a thread to perform work");
 
        let runtime_inner = Arc::new(RuntimeInner {
 
            protocol: protocol_description,
 
            components: ComponentStore::new(128),
 
            work_queue: Mutex::new(VecDeque::with_capacity(128)),
 
            work_condvar: Condvar::new(),
 
            active_elements: AtomicU32::new(1),
 
        });
 
        let mut runtime = Runtime {
 
            inner: runtime_inner,
 
            threads: Vec::with_capacity(num_threads as usize),
 
        };
 

	
 
        for thread_index in 0..num_threads {
 
            let mut scheduler = Scheduler::new(runtime.inner.clone(), thread_index, debug_logging);
 
            let thread_handle = std::thread::spawn(move || {
 
                scheduler.run();
 
            });
 

	
 
            runtime.threads.push(thread_handle);
 
        }
 

	
 
        return runtime;
 
    }
 

	
 
    pub fn create_component(&self, module_name: &[u8], routine_name: &[u8]) -> Result<(), ComponentCreationError> {
 
        use crate::protocol::eval::ValueGroup;
 
        let prompt = self.inner.protocol.new_component(
 
            module_name, routine_name,
 
            ValueGroup::new_stack(Vec::new())
 
        )?;
 
        let reserved = self.inner.start_create_pdl_component();
 
        let ctx = CompCtx::new(&reserved);
 
        let (key, _) = self.inner.finish_create_pdl_component(reserved, CompPDL::new(prompt, 0), ctx, false);
 
        self.inner.enqueue_work(key);
 

	
 
        return Ok(())
 
    }
 
}
 

	
 
impl Drop for Runtime {
 
    fn drop(&mut self) {
 
        self.inner.decrement_active_components();
 
        for handle in self.threads.drain(..) {
 
            handle.join().expect("join scheduler thread");
 
        }
 
    }
 
}
 

	
 
/// Memory that is maintained by "the runtime". In practice it is maintained by
 
/// multiple schedulers, and this serves as the common interface to that memory.
 
pub(crate) struct RuntimeInner {
 
    pub protocol: ProtocolDescription,
 
    components: ComponentStore<RuntimeComp>,
 
    work_queue: Mutex<VecDeque<CompKey>>,
 
    work_condvar: Condvar,
 
    active_elements: AtomicU32, // active components and APIs (i.e. component creators)
 
}
 

	
 
impl RuntimeInner {
 
    // Scheduling and retrieving work
 

	
 
    pub(crate) fn take_work(&self) -> Option<CompKey> {
 
        let mut lock = self.work_queue.lock().unwrap();
 
        while lock.is_empty() && self.active_elements.load(Ordering::Acquire) != 0 {
 
            lock = self.work_condvar.wait(lock).unwrap();
 
        }
 

	
 
        // We have work, or the schedulers should exit.
 
        return lock.pop_front();
 
    }
 

	
 
    pub(crate) fn enqueue_work(&self, key: CompKey) {
 
        let mut lock = self.work_queue.lock().unwrap();
 
        lock.push_back(key);
 
        self.work_condvar.notify_one();
 
    }
 

	
 
    // Creating/destroying components
 

	
 
    pub(crate) fn start_create_pdl_component(&self) -> CompReserved {
 
        self.increment_active_components();
 
        let reservation = self.components.reserve();
 
        return CompReserved{ reservation };
 
    }
 

	
 
    pub(crate) fn finish_create_pdl_component(
 
        &self, reserved: CompReserved,
 
        component: CompPDL, mut context: CompCtx, initially_sleeping: bool,
 
    ) -> (CompKey, &mut RuntimeComp) {
 
        let inbox_queue = QueueDynMpsc::new(16);
 
        let inbox_producer = inbox_queue.producer();
 

	
 
        let _id = reserved.id();
 
        context.id = reserved.id();
 
        let component = RuntimeComp {
 
            public: CompPublic{
 
                sleeping: AtomicBool::new(initially_sleeping),
 
                num_handles: AtomicU32::new(1), // the component itself acts like a handle
 
                inbox: inbox_producer,
 
            },
 
            code: component,
 
            ctx: context,
 
            inbox: inbox_queue,
 
            exiting: false,
 
        };
 

	
 
        let index = self.components.submit(reserved.reservation, component);
 
        debug_assert_eq!(index, _id.0);
 
        let component = self.components.get_mut(index);
 

	
 
        return (CompKey(index), component);
 
    }
 

	
 
    pub(crate) fn get_component(&self, key: CompKey) -> &mut RuntimeComp {
 
        let component = self.components.get_mut(key.0);
 
        return component;
 
    }
 

	
 
    pub(crate) fn get_component_public(&self, id: CompId) -> CompHandle {
 
        let component = self.components.get(id.0);
 
        return CompHandle::new(id, &component.public);
 
    }
 

	
 
    pub(crate) fn destroy_component(&self, key: CompKey) {
 
        dbg_code!({
 
            let component = self.get_component(key);
 
            debug_assert!(component.exiting);
 
            debug_assert_eq!(component.public.num_handles.load(Ordering::Acquire), 0);
 
        });
 
        self.decrement_active_components();
 
        self.components.destroy(key.0);
 
    }
 

	
 
    // Tracking number of active interfaces and the active components
 

	
 
    #[inline]
 
    fn increment_active_components(&self) {
 
        let _old_val = self.active_elements.fetch_add(1, Ordering::AcqRel);
 
        debug_assert!(_old_val > 0); // can only create a component from a API/component, so can never be 0.
 
    }
 

	
 
    fn decrement_active_components(&self) {
 
        let old_val = self.active_elements.fetch_sub(1, Ordering::AcqRel);
 
        debug_assert!(old_val > 0); // something wrong with incr/decr logic
 
        let new_val = old_val - 1;
 
        if new_val == 0 {
 
            // Just to be sure, in case the last thing that gets destroyed is an
 
            // API instead of a thread.
 
            let _lock = self.work_queue.lock();
 
            self.work_condvar.notify_all();
 
        }
 
    }
 
}
src/runtime2/scheduler.rs
Show inline comments
 
use std::sync::Arc;
 
use std::sync::atomic::Ordering;
 

	
 
use super::component::*;
 
use super::runtime::*;
 

	
 
/// Data associated with a scheduler thread
 
pub(crate) struct Scheduler {
 
    runtime: Arc<RuntimeInner>,
 
    scheduler_id: u32,
 
    debug_logging: bool,
 
}
 

	
 
pub(crate) struct SchedulerCtx<'a> {
 
    pub runtime: &'a RuntimeInner,
 
    pub id: u32,
 
    pub comp: u32,
 
    pub logging_enabled: bool,
 
}
 

	
 
impl<'a> SchedulerCtx<'a> {
 
    pub fn new(runtime: &'a RuntimeInner, id: u32, logging_enabled: bool) -> Self {
 
        return Self {
 
            runtime,
 
            id,
 
            comp: 0,
 
            logging_enabled,
 
        }
 
    }
 

	
 
    pub(crate) fn log(&self, text: &str) {
 
        if self.logging_enabled {
 
            println!("[s:{:02}, c:{:03}] {}", self.id, self.comp, text);
 
        }
 
    }
 

	
 
    // TODO: Obviously remove, but useful for testing
 
    pub(crate) fn log_special(&self, text: &str) {
 
        if self.logging_enabled {
 
            println!("[s:{:02}, c:{:03}] *** *** {}", self.id, self.comp, text);
 
        }
 
    }
 
}
 

	
 
impl Scheduler {
 
    // public interface to thread
 

	
 
    pub fn new(runtime: Arc<RuntimeInner>, scheduler_id: u32, debug_logging: bool) -> Self {
 
        return Scheduler{ runtime, scheduler_id, debug_logging }
 
    }
 

	
 
    pub fn run(&mut self) {
 
        let mut scheduler_ctx = SchedulerCtx::new(&*self.runtime, self.scheduler_id, self.debug_logging);
 

	
 
        'run_loop: loop {
 
            // Wait until we have something to do (or need to quit)
 
            let comp_key = self.runtime.take_work();
 
            if comp_key.is_none() {
 
                break 'run_loop;
 
            }
 

	
 
            let comp_key = comp_key.unwrap();
 
            let component = self.runtime.get_component(comp_key);
 
            scheduler_ctx.comp = comp_key.0;
 

	
 
            // Run the component until it no longer indicates that it needs to
 
            // be re-executed immediately.
 
            let mut new_scheduling = CompScheduling::Immediate;
 
            while let CompScheduling::Immediate = new_scheduling {
 
                while let Some(message) = component.inbox.pop() {
 
                    component.code.handle_message(&mut scheduler_ctx, &mut component.ctx, message);
 
                }
 
                new_scheduling = component.code.run(&mut scheduler_ctx, &mut component.ctx).expect("TODO: Handle error");
 
            }
 

	
 
            // Handle the new scheduling
 
            match new_scheduling {
 
                CompScheduling::Immediate => unreachable!(),
 
                CompScheduling::Requeue => { self.runtime.enqueue_work(comp_key); },
 
                CompScheduling::Sleep => { self.mark_component_as_sleeping(comp_key, component); },
 
                CompScheduling::Exit => { self.mark_component_as_exiting(&scheduler_ctx, component); }
 
            }
 
        }
 
    }
 

	
 
    // local utilities
 

	
 
    /// Marks component as sleeping, if after marking itself as sleeping the
 
    /// inbox contains messages then the component will be immediately
 
    /// rescheduled. After calling this function the component should not be
 
    /// executed anymore.
 
    fn mark_component_as_sleeping(&self, key: CompKey, component: &mut RuntimeComp) {
 
        debug_assert_eq!(key.downgrade(), component.ctx.id); // make sure component matches key
 
        debug_assert_eq!(component.public.sleeping.load(Ordering::Acquire), false); // we're executing it, so it cannot be sleeping
 

	
 
        component.public.sleeping.store(true, Ordering::Release);
 
        if component.inbox.can_pop() {
 
            let should_reschedule = component.public.sleeping
 
                .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
 
                .is_ok();
 

	
 
            if should_reschedule {
 
                self.runtime.enqueue_work(key);
 
            }
 
        }
 
    }
 

	
 
    /// Marks the component as exiting by removing the reference it holds to
 
    /// itself. Afterward the component will enter "normal" sleeping mode (if it
 
    /// has not yet been destroyed)
 
    fn mark_component_as_exiting(&self, sched_ctx: &SchedulerCtx, component: &mut RuntimeComp) {
 
        // If we didn't yet decrement our reference count, do so now
 
        let comp_key = unsafe{ component.ctx.id.upgrade() };
 

	
 
        if !component.exiting {
 
            component.exiting = true;
 

	
 
            let old_count = component.public.num_handles.fetch_sub(1, Ordering::AcqRel);
 
            let new_count = old_count - 1;
 
            if new_count == 0 {
 
                sched_ctx.runtime.destroy_component(comp_key);
 
                return;
 
            }
 
        }
 

	
 
        // Enter "regular" sleeping mode
 
        self.mark_component_as_sleeping(comp_key, component);
 
    }
 
}
 
\ No newline at end of file
src/runtime2/store/component.rs
Show inline comments
 
/*
 
 * Component Store
 
 *
 
 * Concurrent datastructure for creating/destroying/retrieving components using
 
 * their ID. It is essentially a variation on a concurrent freelist. We store an
 
 * array of (potentially null) pointers to data. Indices into this array that
 
 * are unused (but may be left allocated) are in a freelist. So creating a new
 
 * bit of data involves taking an index from this freelist. Destruction involves
 
 * putting the index back.
 
 *
 
 * This datastructure takes care of the threadsafe implementation of the
 
 * freelist and calling the data's destructor when needed. Note that it is not
 
 * completely safe (in Rust's sense of the word) because it is possible to
 
 * get more than one mutable reference to a piece of data. Likewise it is
 
 * possible to put back bogus indices into the freelist, which will destroy the
 
 * integrity of the datastructure.
 
 *
 
 * Some underlying assumptions that led to this design (note that I haven't
 
 * actually checked these conditions or performed any real profiling, yet):
 
 *  - Resizing the freelist should be very rare. The datastructure should grow
 
 *    to some kind of maximum size and stay at that size.
 
 *  - Creation should (preferably) be faster than deletion of data. Reason being
 
 *    that creation implies we're creating a component that has code to be
 
 *    executed. Better to quickly be able to execute code than being able to
 
 *    quickly tear down finished components.
 
 *  - Retrieval is much more likely than creation/destruction.
 
 *
 
 * Some obvious flaws with this implementation:
 
 *  - Because of the freelist implementation we will generally allocate all of
 
 *    the data pointers that are available (i.e. if we have a buffer of size
 
 *    64, but we generally use 33 elements, than we'll have 64 elements
 
 *    allocated), which might be wasteful at larger array sizes (which are
 
 *    always powers of two).
 
 *  - A lot of concurrent operations are not necessary: we may move some of the
 
 *    access to the global concurrent datastructure by an initial access to some
 
 *    kind of thread-local datastructure.
 
 */
 

	
 
use std::mem::transmute;
 
use std::alloc::{dealloc, Layout};
 
use std::ptr;
 
use std::sync::atomic::{AtomicUsize, Ordering};
 

	
 
use super::unfair_se_lock::{UnfairSeLock, UnfairSeLockSharedGuard};
 

	
 
/// Generic store of components. Essentially a resizable freelist (implemented
 
/// as a ringbuffer) combined with an array of actual elements.
 
pub struct ComponentStore<T: Sized> {
 
    inner: UnfairSeLock<Inner<T>>,
 
    read_head: AtomicUsize,
 
    write_head: AtomicUsize,
 
    limit_head: AtomicUsize,
 
}
 

	
 
unsafe impl<T: Sized> Send for ComponentStore<T>{}
 
unsafe impl<T: Sized> Sync for ComponentStore<T>{}
 

	
 
/// Contents of the `ComponentStore` that require a shared/exclusive locking
 
/// mechanism for consistency.
 
struct Inner<T: Sized> {
 
    freelist: Vec<u32>,
 
    data: Vec<*mut T>,
 
    size: usize,
 
    compare_mask: usize,
 
    index_mask: usize,
 
}
 

	
 
type InnerShared<'a, T> = UnfairSeLockSharedGuard<'a, Inner<T>>;
 

	
 
/// Reservation of a slot in the component store. Corresponds to the case where
 
/// an index has been taken from the freelist, but the element has not yet been
 
/// initialized
 
pub struct ComponentReservation {
 
    pub(crate) index: u32,
 
    #[cfg(debug_assertions)] submitted: bool,
 
}
 

	
 
impl ComponentReservation {
 
    fn new(index: u32) -> Self {
 
        return Self{
 
            index,
 
            #[cfg(debug_assertions)] submitted: false,
 
        }
 
    }
 
}
 

	
 
impl Drop for ComponentReservation {
 
    fn drop(&mut self) {
 
        debug_assert!(self.submitted);
 
        dbg_code!( assert!(self.submitted) );
 
    }
 
}
 

	
 
impl<T: Sized> ComponentStore<T> {
 
    pub fn new(initial_size: usize) -> Self {
 
        Self::assert_valid_size(initial_size);
 

	
 
        // Fill initial freelist and preallocate data array
 
        let mut initial_freelist = Vec::with_capacity(initial_size);
 
        for idx in 0..initial_size {
 
            initial_freelist.push(idx as u32)
 
        }
 

	
 
        let mut initial_data = Vec::new();
 
        initial_data.resize(initial_size, ptr::null_mut());
 

	
 
        // Return initial store
 
        return Self{
 
            inner: UnfairSeLock::new(Inner{
 
                freelist: initial_freelist,
 
                data: initial_data,
 
                size: initial_size,
 
                compare_mask: 2*initial_size - 1,
 
                index_mask: initial_size - 1,
 
            }),
 
            read_head: AtomicUsize::new(0),
 
            write_head: AtomicUsize::new(initial_size),
 
            limit_head: AtomicUsize::new(initial_size),
 
        };
 
    }
 

	
 
    /// Creates a new element initialized to the provided `value`. This returns
 
    /// the index at which the element can be retrieved.
 
    pub fn create(&self, value: T) -> u32 {
 
        let lock = self.inner.lock_shared();
 
        let (lock, index) = self.pop_freelist_index(lock);
 
        Self::initialize_at_index(lock, index, value);
 
        return index;
 
    }
 

	
 
    pub fn reserve(&self) -> ComponentReservation {
 
        let lock = self.inner.lock_shared();
 
        let (_lock, index) = self.pop_freelist_index(lock);
 
        return ComponentReservation::new(index);
 
    }
 

	
 
    pub fn submit(&self, mut reservation: ComponentReservation, value: T) -> u32 {
 
        dbg_code!({ reservation.submitted = true; });
 
        let lock = self.inner.lock_shared();
 
        Self::initialize_at_index(lock, reservation.index, value);
 
        return reservation.index;
 
    }
 

	
 
    /// Destroys an element at the provided `index`. The caller must make sure
 
    /// that it does not use any previously received references to the data at
 
    /// this index, and that no more calls to `get` are performed using this
 
    /// index. This is allowed again if the index has been reacquired using
 
    /// `create`.
 
    pub fn destroy(&self, index: u32) {
 
        let lock = self.inner.lock_shared();
 
        self.destruct_at_index(&lock, index);
 
        self.push_freelist_index(&lock, index);
 
    }
 

	
 
    /// Retrieves an element by reference
 
    pub fn get(&self, index: u32) -> &T {
 
        let lock = self.inner.lock_shared();
 
        let value = lock.data[index as usize];
 
        unsafe {
 
            debug_assert!(!value.is_null());
 
            return &*value;
 
        }
 
    }
 

	
 
    /// Retrieves an element by mutable reference. The caller should ensure that
 
    /// use of that mutability is thread-safe
 
    pub fn get_mut(&self, index: u32) -> &mut T {
 
        let lock = self.inner.lock_shared();
 
        let value = lock.data[index as usize];
 
        unsafe {
 
            debug_assert!(!value.is_null());
 
            return &mut *value;
 
        }
 
    }
 

	
 
    #[inline]
 
    fn pop_freelist_index<'a>(&'a self, mut shared_lock: InnerShared<'a, T>) -> (InnerShared<'a, T>, u32) {
 
        'attempt_read: loop {
 
            // Load indices and check for reallocation condition
 
            let current_size = shared_lock.size;
 
            let mut read_index = self.read_head.load(Ordering::Relaxed);
 
            let limit_index = self.limit_head.load(Ordering::Acquire);
 

	
 
            if read_index == limit_index {
 
                shared_lock = self.reallocate(current_size, shared_lock);
 
                continue 'attempt_read;
 
            }
 

	
 
            loop {
 
                let preemptive_read = shared_lock.freelist[read_index & shared_lock.index_mask];
 
                if let Err(actual_read_index) = self.read_head.compare_exchange(
 
                if let Err(_actual_read_index) = self.read_head.compare_exchange(
 
                    read_index, (read_index + 1) & shared_lock.compare_mask,
 
                    Ordering::AcqRel, Ordering::Acquire
 
                ) {
 
                    // We need to try again
 
                    read_index = actual_read_index;
 
                    // TODO: Fix this update loop at some point. When update
 
                    //  loop is disabled, popping the freelist index is not
 
                    //  reliable.
 
                    // read_index = actual_read_index;
 
                    continue 'attempt_read;
 
                }
 

	
 
                // If here then we performed the read
 
                return (shared_lock, preemptive_read);
 
            }
 
        }
 
    }
 

	
 
    #[inline]
 
    fn initialize_at_index(read_lock: InnerShared<T>, index: u32, value: T) {
 
        let mut target_ptr = read_lock.data[index as usize];
 

	
 
        unsafe {
 
            if target_ptr.is_null() {
 
                let layout = Layout::for_value(&value);
 
                target_ptr = std::alloc::alloc(layout).cast();
 
                let rewrite: *mut *mut T = transmute(read_lock.data.as_ptr());
 
                *rewrite.add(index as usize) = target_ptr;
 
            }
 

	
 
            std::ptr::write(target_ptr, value);
 
        }
 
    }
 

	
 
    #[inline]
 
    fn push_freelist_index(&self, read_lock: &InnerShared<T>, index_to_put_back: u32) {
 
        // Acquire an index in the freelist to which we can write
 
        let mut cur_write_index = self.write_head.load(Ordering::Relaxed);
 
        let mut new_write_index = (cur_write_index + 1) & read_lock.compare_mask;
 
        while let Err(actual_write_index) = self.write_head.compare_exchange(
 
            cur_write_index, new_write_index,
 
            Ordering::AcqRel, Ordering::Acquire
 
        ) {
 
            cur_write_index = actual_write_index;
 
            new_write_index = (cur_write_index + 1) & read_lock.compare_mask;
 
        }
 

	
 
        // We own the data at the index, write to it and notify reader through
 
        // limit_head that it can be read from. Note that we cheat around the
 
        // rust mutability system here :)
 
        unsafe {
 
            let target: *mut u32 = transmute(read_lock.freelist.as_ptr());
 
            *(target.add(cur_write_index & read_lock.index_mask)) = index_to_put_back;
 
        }
 

	
 
        // Essentially spinlocking, relaxed failure ordering because the logic
 
        // is that a write first moves the `write_head`, then the `limit_head`.
 
        while let Err(_) = self.limit_head.compare_exchange(
 
            cur_write_index, new_write_index,
 
            Ordering::AcqRel, Ordering::Relaxed
 
        ) {};
 
    }
 

	
 
    #[inline]
 
    fn destruct_at_index(&self, read_lock: &InnerShared<T>, index: u32) {
 
        let target_ptr = read_lock.data[index as usize];
 
        unsafe{ ptr::drop_in_place(target_ptr); }
 
    }
 

	
 
    // NOTE: Bit of a mess, and could have a cleanup with better logic for the
 
    // resizing. Maybe even a different indexing scheme...
 
    fn reallocate(&self, old_size: usize, inner: InnerShared<T>) -> InnerShared<T> {
 
        drop(inner);
 
        {
 
            // After dropping read lock, acquire write lock
 
            let mut lock = self.inner.lock_exclusive();
 

	
 
            if old_size == lock.size {
 
                // We are the thread that is supposed to reallocate
 
                let new_size = old_size * 2;
 
                Self::assert_valid_size(new_size);
 

	
 
                // Note that the atomic indices are in the range [0, new_size)
 
                // already, so we need to be careful
 
                let new_index_mask = new_size - 1;
 
                let new_compare_mask = (2 * new_size) - 1;
 
                lock.data.resize(new_size, ptr::null_mut());
 
                lock.freelist.resize(new_size, 0);
 
                for idx in 0..old_size {
 
                    lock.freelist[old_size + idx] = lock.freelist[idx];
 
                }
 

	
 
                // We need to fill the freelist with the indices of all of the
 
                // new elements that we have just created.
 
                debug_assert_eq!(self.limit_head.load(Ordering::SeqCst), self.write_head.load(Ordering::SeqCst));
 
                let old_read_index = self.read_head.load(Ordering::SeqCst);
 
                let old_write_index = self.write_head.load(Ordering::SeqCst);
 

	
 
                if old_read_index > old_write_index {
 
                    // Read index wraps, so keep it as-is and fill
 
                    let new_read_index = old_read_index + old_size;
 
                    for index in 0..old_size {
 
                        let target_idx = (new_read_index + index) & new_index_mask;
 
                        lock.freelist[target_idx] = (old_size + index) as u32;
 
                    }
 

	
 
                    self.read_head.store(new_read_index, Ordering::SeqCst);
 
                    debug_assert!(new_read_index < 2*new_size);
 
                    debug_assert!(old_write_index.wrapping_sub(new_read_index) & new_compare_mask <= new_size);
 
                } else {
 
                    // No wrapping, so increment write index
 
                    let new_write_index = old_write_index + old_size;
 
                    for index in 0..old_size {
 
                        let target_idx = (old_write_index + index) & new_index_mask;
 
                        lock.freelist[target_idx] = (old_size + index) as u32;
 
                    }
 

	
 
                    // Update write/limit heads
 
                    self.write_head.store(new_write_index, Ordering::SeqCst);
 
                    self.limit_head.store(new_write_index, Ordering::SeqCst);
 
                    debug_assert!(new_write_index < 2*new_size);
 
                    debug_assert!(new_write_index.wrapping_sub(old_read_index) & new_compare_mask <= new_size);
 
                }
 

	
 
                // Update sizes and masks
 
                lock.size = new_size;
 
                lock.compare_mask = new_compare_mask;
 
                lock.index_mask = new_index_mask;
 
            } // else: someone else allocated, so we don't have to
 
        }
 

	
 
        // We've dropped the write lock, acquire the read lock again
 
        return self.inner.lock_shared();
 
    }
 

	
 
    #[inline]
 
    fn assert_valid_size(size: usize) {
 
        // Condition the size needs to adhere to. Some are a bit excessive, but
 
        // we don't hit this check very often
 
        assert!(
 
            size.is_power_of_two() &&
 
                size >= 4 &&
 
                size <= usize::MAX / 2 &&
 
                size <= u32::MAX as usize
 
        );
 
    }
 
}
 

	
 
impl<T: Sized> Drop for ComponentStore<T> {
 
    fn drop(&mut self) {
 
        let value_layout = Layout::from_size_align(
 
            std::mem::size_of::<T>(), std::mem::align_of::<T>()
 
        ).unwrap();
 

	
 
        // Note that if the indices exist in the freelist then the destructor
 
        // has already been called. So handle them first
 
        let mut lock = self.inner.lock_exclusive();
 

	
 
        let read_index = self.read_head.load(Ordering::Acquire);
 
        let write_index = self.write_head.load(Ordering::Acquire);
 
        debug_assert_eq!(write_index, self.limit_head.load(Ordering::Acquire));
 

	
 
        let mut index = read_index;
 
        while index != write_index {
 
            let dealloc_index = lock.freelist[index & lock.index_mask] as usize;
 
            let target_ptr = lock.data[dealloc_index];
 

	
 
            unsafe {
 
                dealloc(target_ptr.cast(), value_layout);
 
                lock.data[dealloc_index] = ptr::null_mut();
 
            }
 

	
 
            index += 1;
 
            index &= lock.compare_mask;
 
        }
 

	
 
        // With all of those set to null, we'll just iterate through all
 
        // pointers and destruct+deallocate the ones not set to null yet
 
        for target_ptr in lock.data.iter().copied() {
 
            if !target_ptr.is_null() {
 
                unsafe {
 
                    ptr::drop_in_place(target_ptr);
 
                    dealloc(target_ptr.cast(), value_layout);
 
                }
 
            }
 
        }
 
    }
 
}
 

	
 
#[cfg(test)]
 
mod tests {
 
    use super::*;
 
    use super::super::tests::*;
 

	
 
    use rand::prelude::*;
 
    use rand_pcg::Pcg32;
 

	
 
    fn seeds() -> Vec<[u8;16]> {
 
        return vec![
 
            [241, 47, 70, 87, 240, 246, 20, 173, 219, 143, 74, 23, 158, 58, 205, 172],
 
            [178, 112, 230, 205, 230, 178, 2, 90, 162, 218, 49, 196, 224, 222, 208, 43],
src/runtime2/tests/mod.rs
Show inline comments
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::component::{CompCtx, CompPDL};
 

	
 
fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: ValueGroup) {
 
    let prompt = rt.inner.protocol.new_component(
 
        module_name.as_bytes(), routine_name.as_bytes(), args
 
    ).expect("create prompt");
 
    let reserved = rt.inner.start_create_pdl_component();
 
    let ctx = CompCtx::new(&reserved);
 
    let (key, _) = rt.inner.finish_create_pdl_component(reserved, CompPDL::new(prompt, 0), ctx, false);
 
    rt.inner.enqueue_work(key);
 
}
 

	
 
fn no_args() -> ValueGroup { ValueGroup::new_stack(Vec::new()) }
 

	
 
#[test]
 
fn test_component_creation() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive nothing_at_all() {
 
        s32 a = 5;
 
        auto b = 5 + a;
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, true, pd);
 

	
 
    for _i in 0..20 {
 
        create_component(&rt, "", "nothing_at_all", no_args());
 
    }
 
}
 

	
 
#[test]
 
fn test_component_communication() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive sender(out<u32> o, u32 outside_loops, u32 inside_loops) {
 
        u32 outside_index = 0;
 
        while (outside_index < outside_loops) {
 
            u32 inside_index = 0;
 
            sync while (inside_index < inside_loops) {
 
                put(o, inside_index);
 
                inside_index += 1;
 
            }
 
            outside_index += 1;
 
        }
 
    }
 

	
 
    primitive receiver(in<u32> i, u32 outside_loops, u32 inside_loops) {
 
        u32 outside_index = 0;
 
        while (outside_index < outside_loops) {
 
            u32 inside_index = 0;
 
            sync while (inside_index < inside_loops) {
 
                auto val = get(i);
 
                while (val != inside_index) {} // infinite loop if incorrect value is received
 
                inside_index += 1;
 
            }
 
            outside_index += 1;
 
        }
 
    }
 

	
 
    composite constructor() {
 
        channel o_orom -> i_orom;
 
        channel o_mrom -> i_mrom;
 
        channel o_ormm -> i_ormm;
 
        channel o_mrmm -> i_mrmm;
 

	
 
        // one round, one message per round
 
        new sender(o_orom, 1, 1);
 
        new receiver(i_orom, 1, 1);
 

	
 
        // multiple rounds, one message per round
 
        new sender(o_mrom, 5, 1);
 
        new receiver(i_mrom, 5, 1);
 

	
 
        // one round, multiple messages per round
 
        new sender(o_ormm, 1, 5);
 
        new receiver(i_ormm, 1, 5);
 

	
 
        // multiple rounds, multiple messages per round
 
        new sender(o_mrmm, 5, 5);
 
        new receiver(i_mrmm, 5, 5);
 
    }").expect("compilation");
 
    let rt = Runtime::new(3, true, pd);
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_intermediate_messenger() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive receiver<T>(in<T> rx, u32 num) {
 
        auto index = 0;
 
        while (index < num) {
 
            sync { auto v = get(rx); }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive middleman<T>(in<T> rx, out<T> tx, u32 num) {
 
        auto index = 0;
 
        while (index < num) {
 
            sync { put(tx, get(rx)); }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive sender<T>(out<T> tx, u32 num) {
 
        auto index = 0;
 
        while (index < num) {
 
            sync put(tx, 1337);
 
            index += 1;
 
        }
 
    }
 

	
 
    composite constructor_template<T>() {
 
        auto num = 0;
 
        channel<T> tx_a -> rx_a;
 
        channel tx_b -> rx_b;
 
        new sender(tx_a, 3);
 
        new middleman(rx_a, tx_b, 3);
 
        new receiver(rx_b, 3);
 
    }
 

	
 
    composite constructor() {
 
        new constructor_template<u16>();
 
        new constructor_template<u32>();
 
        new constructor_template<u64>();
 
        new constructor_template<s16>();
 
        new constructor_template<s32>();
 
        new constructor_template<s64>();
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, true, pd);
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_simple_select() {
 
    let pd = ProtocolDescription::parse(b"
 
    func infinite_assert<T>(T val, T expected) -> () {
 
        while (val != expected) { print(\"nope!\"); }
 
        return ();
 
    }
 

	
 
    primitive receiver(in<u32> in_a, in<u32> in_b, u32 num_sends) {
 
        auto num_from_a = 0;
 
        auto num_from_b = 0;
 
        while (num_from_a + num_from_b < 2 * num_sends) {
 
            sync select {
 
                auto v = get(in_a) -> {
 
                    print(\"got something from A\");
 
                    auto _ = infinite_assert(v, num_from_a);
 
                    num_from_a += 1;
 
                }
 
                auto v = get(in_b) -> {
 
                    print(\"got something from B\");
 
                    auto _ = infinite_assert(v, num_from_b);
 
                    num_from_b += 1;
 
                }
 
            }
 
        }
 
    }
 

	
 
    primitive sender(out<u32> tx, u32 num_sends) {
 
        auto index = 0;
 
        while (index < num_sends) {
 
            sync {
 
                put(tx, index);
 
                index += 1;
 
            }
 
        }
 
    }
 

	
 
    composite constructor() {
 
        auto num_sends = 15;
 
        channel tx_a -> rx_a;
 
        channel tx_b -> rx_b;
 
        new sender(tx_a, num_sends);
 
        new receiver(rx_a, rx_b, num_sends);
 
        new sender(tx_b, num_sends);
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, false, pd);
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_unguarded_select() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive constructor_outside_select() {
 
        u32 index = 0;
 
        while (index < 5) {
 
            sync select { auto v = () -> print(\"hello\"); }
 
            index += 1;
 
        }
 
    }
 

	
 
    primitive constructor_inside_select() {
 
        u32 index = 0;
 
        while (index < 5) {
 
            sync select { auto v = () -> index += 1; }
 
        }
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, false, pd);
 
    create_component(&rt, "", "constructor_outside_select", no_args());
 
    create_component(&rt, "", "constructor_inside_select", no_args());
 
}
 

	
 
#[test]
 
fn test_empty_select() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive constructor() {
 
        u32 index = 0;
 
        while (index < 5) {
 
            sync select {}
 
            index += 1;
 
        }
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, false, pd);
 
    create_component(&rt, "", "constructor", no_args());
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)