Changeset - 97217a7b2d18
bin-compiler/src/main.rs
 
@@ -85,48 +85,49 @@ fn main() {
 
    assert!(input_files.len() > 0); // because arg is required
 

	
 
    let mut builder = rw::ProtocolDescriptionBuilder::new(standard_library_dir)
 
        .expect("create protocol description builder");
 
    let mut file_buffer = Vec::with_capacity(4096);
 

	
 
    for input_file in input_files {
 
        print!("Adding file: {} ... ", input_file);
 
        let mut file = match File::open(input_file) {
 
            Ok(file) => file,
 
            Err(err) => {
 
                println!("FAILED (to open file)\nbecause:\n{}", err);
 
                return;
 
            }
 
        };
 

	
 
        file_buffer.clear();
 
        if let Err(err) = file.read_to_end(&mut file_buffer) {
 
            println!("FAILED (to read file)\nbecause:\n{}", err);
 
            return;
 
        }
 

	
 
        if let Err(err) = builder.add(input_file.to_string(), file_buffer.clone()) {
 
            println!("FAILED (to tokenize file)\nbecause:\n{}", err);
 
            return;
 
        }
 

	
 
        println!("Success");
 
    }
 

	
 
    // Compile the program
 
    print!("Compiling program ... ");
 
    let protocol_description = match builder.compile() {
 
        Ok(pd) => pd,
 
        Err(err) => {
 
            println!("FAILED\nbecause:\n{}", err);
 
            return;
 
        }
 
    };
 

	
 
    println!("Success");
 

	
 
    // Start runtime
 
    print!("Startup of runtime ... ");
 
    let runtime = rw::runtime2::Runtime::new(num_threads, log_level, protocol_description);
 
    if let Err(err) = &runtime {
 
        println!("FAILED\nbecause:\n{}", err);
 
    }
 
    println!("Success");
docs/runtime/01_runtime.md
 
@@ -190,66 +190,84 @@ comp some_component(
 
   }
 
}
 
```
 

	
 
To facilitate this, we'll follow roughly the same procedure as when we're transferring ports to a newly created component. But we have one complication: we do not have direct access to the private memory of the component we're sending the ports to (we'll call this component the "adopting component", and the sending component the "relinquishing component"). And so we'll have to follow a control protocol that is slightly different.
 

	
 
Note that it is perfectly okay to send closed ports. The adopting component will receive the port together with the information that the port is closed. If the adopting component then attempts a `put` or `get` on that received port, it will crash.
 

	
 
We'll enforce a second rule upon transferring ports: ports transferred in a synchronous round may not have been used in `get` or `put` operations in that round. I'm certain that it is possible to come up with a set of rules that would allow this, but the protocol for transferring ports over channels is a lot easier if we disallow it. For this reason we'll introduce a field in the metadata of each port that registers when the port was last used. If the relinquishing component attempts to transfer a port that has been used within the same sync round, then it will crash.
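A minimal sketch of this rule, assuming a simplified stand-in for the runtime's per-port metadata (the runtime tracks this through the port's `last_instruction` field):

```rust
/// Simplified stand-in for the runtime's per-port metadata.
#[derive(Clone, Copy, PartialEq, Eq)]
enum LastUse {
    None,          // port untouched in the current sync round
    UsedThisRound, // some `put`/`get` was performed this round
}

struct PortMeta {
    last_use: LastUse,
}

/// The relinquishing component crashes (here: returns an error) if it
/// attempts to transfer a port that was already used this sync round.
fn check_transferable(port: &PortMeta) -> Result<(), String> {
    if port.last_use != LastUse::None {
        return Err("port already used in this sync round".to_string());
    }
    Ok(())
}
```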
 

	
 
Like before we want to ensure that all messages intended for the transferred port arrive in the correct order at the adopting component.
 

	
 
And so the control protocol for transferring ports proceeds as follows:
 

	
 
1. The relinquishing component will first make sure that none of the ports are blocked; if they are, it will sleep until they become unblocked. As stated above, the relinquishing component will also make sure that the ports were not used earlier within the same synchronous round.
 
2. The relinquishing component will send a `PortPeerChanged_Block` message to all of the peers of the ports that will be transferred. However, unlike the new-component case, it will not relay any messages: they will simply pile up in the relinquishing component's inbox.
 
3. The peers, upon receiving the `PortPeerChanged_Block` message, will proceed as they would in the case where ports were transferred to a new component: they'll block the port and send an `Ack`.
 
4. The relinquishing component will wait until all of the expected `Ack` messages are received. Once they are, the component will wait until the port through which the data message will travel (that is: the port used to transfer the ports to the adopting component) becomes unblocked.
 
5. The relinquishing component will send the data message containing the transferred ports to the adopting component. It will annotate this message with a list containing `(transferred port ID, peer component ID, peer port ID)` triples. Note that since those peer ports are blocked, they will not be transferred in the meantime. This is essential for the next step.
 
6. The adopting component will receive the annotated data message containing the transferred ports. For each transferred port it will decide upon a new port ID.
 
7. The adopting component will, for each adopted port, send out a `PortPeerChanged_Unblock` message to the blocked peer port. This message is annotated with the `(adopting component ID, new port ID)` pair, so that each peer knows where its counterpart can now be found.
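The sketch below shows the shape of these messages with placeholder ID types; the runtime's actual `ControlMessageContent` (see `src/runtime2/communication.rs` in this changeset) is the authoritative definition:

```rust
// Placeholder ID types; the runtime defines its own `PortId`/`CompId`.
struct PortId(u32);
struct CompId(u32);

enum TransferMessage {
    // Steps 2-3: the peer blocks the port and replies with an `Ack`.
    PortPeerChangedBlock,
    Ack,
    // Step 5: the data message is annotated with one triple per
    // transferred port: (transferred port, peer component, peer port).
    TransferAnnotation(Vec<(PortId, CompId, PortId)>),
    // Step 7: unblocks the peer port and tells it the new address,
    // as the (new port ID, adopting component ID) pair.
    PortPeerChangedUnblock(PortId, CompId),
}
```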
 

	
 
## Dealing with Crashing Components
 

	
 
### The cases in which peers crash in response
 

	
 
A component may at any point during its execution be triggered to crash. This may be because of something simple like an out-of-bounds array access, but, as described above, using closed ports may lead to such an event as well. In such a case we not only need to go through the `ClosePort` control protocol to make sure that we can remove the crashing component's memory from the runtime, but we'll also have to make sure that all of the peers are aware that *their* peer has crashed. Here we'll make a design decision: if a peer component crashes during a synchronous round and there were interactions with that component, then the interacting component should crash as well. The exact reasons will be introduced later, but it comes down to the fact that we need to do something about the synchronous round never being able to complete.
 

	
 
We'll talk ourselves through the case of a component crashing before coming up with the control algorithm to deal with components crashing.
 

	
 
We'll first consider that a component may crash inside our outside of a synchronous block. From the point of view of the peer component, we'll have four cases to consider:
 
We'll first consider that a component may crash inside or outside of a synchronous block. From the point of view of the peer component, we'll have four cases to consider:
 

	
 
1. The peer component is not in a synchronous block. 
 
2. The crashing component died before the peer component entered the synchronous block.
 
3. The crashing component died during the same synchronous block as the peer component.
 
4. The crashing component died after reaching consensus on the synchronous block that the peer component is currently still in.
 

	
 
Before discussing these cases, it is important to remember that the runtime has components running in their own threads of execution. The crashing component may be unaware of its peers (because peer ports might change ownership at any point in time). We'll discuss the consensus algorithm in more detail later in the documentation; for now it is important to note that components discover the synchronous region they are part of while the PDL code is executing. So if a component crashes within a synchronous region before the end of the sync block is reached, it may not have discovered the full synchronous region it would have been part of.
 

	
 
Because the crashing component is potentially unaware of the component IDs it will end up notifying that is has failed, we can not design the crash-handling algorithm in such a way such that the crashing component notifies the peers of when they have to crash. We'll do the opposite: the crashing component simply crashes and somehow attempts to notify the peers. Those peers themselves decide whether they have to crash in response to such a notification.
 
Because the crashing component is potentially unaware of the component IDs it will end up notifying that it has failed, we cannot design the crash-handling algorithm such that the crashing component tells its peers when they have to crash. We'll do the opposite: the crashing component simply crashes and makes a best-effort attempt to notify the peers. Those peers themselves decide whether they have to crash in response to such a notification.
 

	
 
For this reason, it does not make a lot of sense to deal with component failure through the consensus algorithm. Dealing with the failure through the consensus algorithm only makes sense if we can find the synchronous region that we would have discovered if we were able to fully execute the sync block of each participating component. As explained above: we can't, and so we'll opt to deal with failure on a peer-by-peer basis.
 

	
 
We'll go back to the four cases we've discusses above. We'll change our point of view: we're now considering a component (the "handling component") that has to deal with the failure of a peer (the "crashing component"). We'll introduce a small part of our solution a-priori: like a component shutting down, a failing component will simply end its life by broadcasting `ClosePort` message over all of its owned ports that are not closed (and the failing component will also wait until all of those ports are not blocked).
 
We'll go back to the four cases we've discussed above. We'll change our point of view: we're now considering a component (the "handling component") that has to deal with the failure of a peer (the "crashing component"). We'll introduce a small part of our solution a priori: like a component shutting down, a failing component will simply end its life by broadcasting a `ClosePort` message over all of its owned ports that are not closed (and, like in the other control algorithms, the failing component will wait for the port that is shutting down to become unblocked before it sends the `ClosePort` message).
 

	
 
In the first case, we're dealing with a failing component while the handling component is not in a synchronous block. This means that if there was a previous synchronous block, it has succeeded. We might still have data messages in our inbox that were sent by the failing component. But this case is rather easy to deal with: we mark the ports as closed, and if we end up using them in the next synchronous block, then we will crash ourselves.
 

	
 
In the second case we have that the peer component died before we ourselves have entered the synchronous block. This case is somewhat equivalent to the case we described above. The crashing component cannot have sent the handling component any messages. So we mark the port as closed, potentially failing in the future if they end up being used.
 
In the second case we have that the peer component died before we ourselves have entered the synchronous block. This case is somewhat equivalent to the one described above. The crashing component cannot have sent the handling component any messages, so we mark the port as closed, potentially failing in the future if it ends up being used. However, the handling component itself might already have performed `put` operations. So when the handling component receives a `ClosePort` message, it realizes that those earlier `put` operations can never be acknowledged. For this reason a component stores, in the metadata associated with each port, when it last used that port. When, in this second case, a `ClosePort` message comes in while the port has already been used, the handling component should crash as well.
 

	
 
Next up is the third case, where the crashing component and the handling component were in the same synchronous round. Like before, we mark the port as closed, and future use will cause a crash. And as in the second case, if the handling component has already used the port (which here includes having received a message from the crashing component), then it should crash as well.
 

	
 
The fourth case is where the failing component crashes *after* the handling component finished its sync round. This is an edge case dealing with the following situation: both the handling and the crashing component have submitted their local solution to the consensus algorithm (assumed to be running somewhere in a thread of execution different from the two components). The crashing component receives a global solution, finishes the sync round, and then crashes, thereby sending the `ClosePort` message to the handling component. The handling component, due to the asynchronous nature of the runtime, receives the `ClosePort` message before the global solution has a chance to reach it. In this case, however, the handling component should be able to finish the synchronous round, and it shouldn't crash.
 

	
 
### Distinguishing the crashing cases
 

	
 
So far we've pretended that we could already determine the relation between the crashing component's synchronous round and the handling component's synchronous round. But in order to do this we need to add a bit of extra information to the `ClosePort` message.
 

	
 
Next up is the third case, where both the crashing component and the handling component were both in the same synchronous round. Like before we mark the port as closed and future use will cause a crash. The difference is that the handling component may be blocked on attempting to `get` from a port which the crashing component now indicates is closed, perhaps we might have already performed successful `get`/`put` operations. In that case the handling component should crash: the crashing component can never submit its local solution, so the synchronous round can never succeed! And so we need to have metadata stored for the component that tracks if the port was used in the synchronous round. If the component is used in the synchronous round and a `ClosePort` message comes in, then the component should crash as well.
 
The simplest case to determine is whether the two components are both in the same synchronous round (case three, as described above). The crashing component annotates the `ClosePort` message with whether it was in a synchronous round or not. Then if both components are in a synchronous round (as checked by the handling component), and the about-to-be-closed port at the handling component was used, or will be used, in that round, then the handling component should crash.
 

	
 
The fourth case is where the failing component crashes *after* the handling component finished its sync round. This is an edge cases dealing with the following situation: both the handling as the crashing component have submitted their local solution to the consensus algorithm. The crashing component receives a global solution, finishes the sync round, and then crashes, therefore sending the `ClosePort` message to the handling component. The handling component, due to the asynchronous nature of the runtime, receives the `ClosePort` message before the global solution has a chance to reach the handling component. In this case, however, the handling component should be able to finish the synchronous round, and it shouldn't crash.
 
Equally simple: the handling component can itself determine whether it is in a synchronous round (case one, as described above). If not, the port is marked closed and future use causes a crash.
 

	
 
Here is where we arrive at the protocol for dealing with component crashes. To let the handling component deal with this last case, we'll let the crashing component send the `ClosePort` messages together with a boolean indicating if it crashed inside, or outside of a synchronous block.
 
The last two cases require a bit more work: how do we distinguish the edge case, where the handling component's round will complete in the future, from the case where it should crash? To distinguish them, the handling component needs to know whether the last interaction the crashing component handled was the one in the handling component's current synchronous round.
 

	
 
If the crashing component sends `ClosePort` together with the indication that it crashed inside of the synchronous block, then we're dealing with case 3 *if* there are messages in the inbox, or if the handling component uses the closed port after receiving the `ClosePort` message. But if the crashing component sends `ClosePort` together with the indication that it crashed outside of a synchronous block, then we check if we have performed any operations on the port in the synchronous round. If we have performed operations *and* have received messages from that component, then apparently the synchronous round will succeed. So we will not immediately crash. Otherwise we will.
 
For this reason we keep track of the synchronous round number. That is to say: there is a counter that increments each time a synchronous round completes for a component. We have a field in the metadata for a port that registers this round number. If a component performs a `put` operation, then it stores its own round number in that port's metadata, and sends this round number along with the message. If a component performs a `get` operation, then it stores the *received* round number in the port's metadata.
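Condensed into code, this bookkeeping looks roughly as follows; the `last_registered_round` field is the one added to `Port` in this changeset, and the round number travels in the message's sync header:

```rust
struct PortMeta {
    last_registered_round: Option<u32>,
}

/// On `put`: register our own round number and send it with the message.
fn on_put(port: &mut PortMeta, own_round: u32) -> u32 {
    port.last_registered_round = Some(own_round);
    own_round // attached to the outgoing message's sync header
}

/// On `get`: register the round number *received* with the message.
fn on_get(port: &mut PortMeta, received_round: u32) {
    port.last_registered_round = Some(received_round);
}
```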
 

	
 
In this way we modify the control algorithm for terminating components. Now we're able to deal correctly with crashing components.
 
When a component closes a port, it will also send along the last registered round number in the `ClosePort` message. If the handling component receives a `ClosePort` message, and the last registered round number in the port's metadata matches the round number in the `ClosePort` message, and the crashing component was not in a synchronous round, then the crashing component crashed after the handling component's sync round. Hence: the handling component can complete its sync round.
 

	
 
To conclude: if we receive a `ClosePort` message, then we always mark the port as closed. If the handling and the crashing component were in the same synchronous round, and the closed port was used in that round, then the handling component crashes as well. If the handling component *is* in a synchronous round, the crashing component *was not*, the handling component's port was used in that round, and the port's last registered round number does not match the round number in the `ClosePort` message, then the handling component crashes as well.
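These rules, condensed into a single predicate mirroring the decision in `default_handle_control_message` in this changeset (names simplified):

```rust
struct ClosePortContent {
    closed_in_sync_round: bool,
    registered_round: Option<u32>,
}

fn handling_component_must_crash(
    in_sync: bool,       // is the handling component in a sync round?
    port_was_used: bool, // any `put`/`get` (attempt) on the port this round
    last_registered_round: Option<u32>, // from the port's metadata
    msg: &ClosePortContent,
) -> bool {
    if !in_sync || !port_was_used {
        return false; // only mark the port closed; future use crashes
    }
    // Crasher was in the same round: the round can never complete.
    let closed_during_round = msg.closed_in_sync_round;
    // Crasher was outside a round: safe only if its last interaction was
    // the one in our current round (matching round numbers).
    let round_has_succeeded =
        !msg.closed_in_sync_round && last_registered_round == msg.registered_round;
    closed_during_round || !round_has_succeeded
}
```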
 

	
 
## Sync Algorithm
 

	
 
A description of the synchronous algorithm is present in other documents. We will mention here that central to the consensus algorithm is having the two components agree on the interactions that took place over a specific channel. In order for this to happen we'll send along a lot of metadata when trying to reach consensus, but here we're just concerned with matching up the two ends of a channel.
 

	
 
A port is identified by a `(component ID, port ID)` pair, and a channel is a pair of those identifying pairs. So to match up the two ends of a channel we would have to find a consistent pair of ports that agree on who their peers are. However, we're dealing with the problem of eventual consistency: `put`ting ports never know who their peer is, because the sent message might be relayed. On the other hand, `get`ting ports *will* know who their peer is for the duration of a single synchronous round once they've received a single message.
 

	
 
This is the trick we will apply in the consensus algorithm. If a channel did not see any messages passing through it, then the components that own those ports will not have to reach consensus because they will not be part of the same synchronous region. However if a message did go through the channel then the components join the same synchronous region, and they'll have to form some sort of consensus on what interaction took place on that channel.
 

	
 
And so the `put`ting component will only submit its own `(component ID, port ID, metadata_for_sync_round)` triplet. The `get`ting port will submit information containing `(self component ID, self port ID, peer component ID, peer port ID, metadata_for_sync_round)`. The consensus algorithm can now figure out which two ports belong to the same channel.
 
\ No newline at end of file
 
And so the `put`ting component will only submit its own `(component ID, port ID, metadata_for_sync_round)` triplet. The `get`ting port will submit information containing `(self component ID, self port ID, peer component ID, peer port ID, metadata_for_sync_round)`. The consensus algorithm can now figure out which two ports belong to the same channel.
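A sketch of the two kinds of submissions with placeholder types; the consensus algorithm pairs a putter's entry with the getter entry whose recorded peer matches it:

```rust
#[derive(PartialEq, Eq, Clone, Copy)]
struct CompId(u32);
#[derive(PartialEq, Eq, Clone, Copy)]
struct PortId(u32);
struct SyncMeta; // placeholder for `metadata_for_sync_round`

enum ChannelSubmission {
    // `put`ting side: does not know its peer, submits only itself.
    Putter { comp: CompId, port: PortId, meta: SyncMeta },
    // `get`ting side: learned its peer from the first received message.
    Getter { comp: CompId, port: PortId, peer_comp: CompId, peer_port: PortId, meta: SyncMeta },
}

/// True if the two submissions describe the two ends of one channel.
fn same_channel(put: &ChannelSubmission, get: &ChannelSubmission) -> bool {
    match (put, get) {
        (ChannelSubmission::Putter { comp, port, .. },
         ChannelSubmission::Getter { peer_comp, peer_port, .. }) => {
            comp == peer_comp && port == peer_port
        },
        _ => false,
    }
}
```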
 

	
 
## Component Nomenclature
 

	
 
Earlier versions of the Reowolf runtime featured a distinction between primitive and composite components. This distinction was a deliberate language-design decision. Primitive components could do the nitty-gritty protocol execution: performing `put`/`get` operations and entering sync blocks. Conversely, composite components were tasked with setting up a network of interconnected components: creating channels and handing off the appropriate ports to the instantiated components.
 

	
 
Once the runtime was capable of sending ports over channels, it became apparent that this distinction no longer made sense. Because if only primitive components can send/receive ports, and cannot create new components, then the programmer is limited to using those received ports directly in the primitive's code! And so the split between primitive and composite components was removed: only the concept of a "component" is left.
 
\ No newline at end of file
docs/runtime/04_known_issues.md
 
@@ -9,37 +9,41 @@ The current implementation of Reowolf has the following known issues:
 
    channel unused -> temporary;
 
    while (true) sync {
 
      if (get(which)) {
 
        temporary = tx1;
 
      } else {
 
        temporary = tx2;
 
      }
 
      put(temporary, 1);
 
    }
 
  }
 
  ```
 

	
 
  Another solution would be to use an empty array and to put a port inside of that. Hacks galore!
 

	
 
- Reserved memory for ports will grow without bounds: Ports can be given away from one component to another by creating a component, or by sending a message containing them. The component sending those ports cannot remove them from its own memory if there are still other references to the transferred port in its memory. This is because we want to throw a reasonable error if that transferred port is used by the original owner. Hence we need to keep some information about that transferred port in the sending component's memory. The solution is to have reference counting for the ports, but this is not implemented.
 

	
 
- An addendum to the above statement: when transferring ports to a new component, the memory that tracks the state of those ports is removed from the component creating the new one. Hence using old references to such a port within the creating component's PDL code results in a crash.
 

	
 
- Some control algorithms are not robust under multithreading: mainly error handling in sync mode (this needs a revision where we keep track of which components are still reachable by another component), and complicated scenarios where ports are transferred.
 

	
 
- There is an assertion in the interpreter that makes sure that there are no values left on the expression stack when a statement has completed. This is not true when you have an expression statement! If you want to remove this assertion make sure to clear the stack (using the method on the `Store`).
 

	
 
- The TCP listener component should probably do a `shutdown` before a `close` on the socket handle. It should also set the `SO_REUSEADDR` option.
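  A sketch of the suggested fix (not the actual component code), assuming the `socket2` crate for the listener setup and the standard library for the shutdown; the helper names and backlog value are illustrative:

  ```rust
  use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};

  use socket2::{Domain, Socket, Type};

  // Create a listener with SO_REUSEADDR set before binding.
  fn make_listener(addr: SocketAddr) -> std::io::Result<TcpListener> {
      let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
      socket.set_reuse_address(true)?; // SO_REUSEADDR
      socket.bind(&addr.into())?;
      socket.listen(128)?;
      Ok(socket.into())
  }

  // Shut down both directions before the handle is dropped (closed), so
  // the peer observes an orderly FIN rather than an abrupt close.
  fn close_stream(stream: TcpStream) -> std::io::Result<()> {
      stream.shutdown(Shutdown::Both)?;
      Ok(()) // dropping `stream` closes the underlying socket handle
  }
  ```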
 

	
 
- The TCP listener and TCP sender components have not been tested extensively in a multi-threaded setup.
 

	
 
- The way in which putting ports are ordered to block when the corresponding getter port's main inbox is full is rather crude. It led to the introduction of the "backup inbox" as found in the runtime's code. There is a design decision to make here, with two options: (a) have an atomic boolean indicating whether the message slot of an inbox is full (a sketch follows below), or (b) do away with the "main inbox" altogether and use an unbounded message queue.
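  A minimal sketch of option (a), with placeholder types rather than the runtime's actual inbox: an atomic flag guards a single message slot so a putter can detect fullness without taking the lock in the common case. It glosses over how a blocked putter is woken up, which the real design must handle:

  ```rust
  use std::sync::atomic::{AtomicBool, Ordering};
  use std::sync::Mutex;

  struct InboxSlot<T> {
      full: AtomicBool,
      slot: Mutex<Option<T>>,
  }

  impl<T> InboxSlot<T> {
      /// Called on `put`: returns the message back if the slot is full,
      /// in which case the putter must block.
      fn try_put(&self, message: T) -> Result<(), T> {
          if self.full.swap(true, Ordering::AcqRel) {
              return Err(message); // slot already claimed
          }
          *self.slot.lock().unwrap() = Some(message);
          Ok(())
      }

      /// Called on `get`: clears the flag once the slot is emptied.
      fn take(&self) -> Option<T> {
          let taken = self.slot.lock().unwrap().take();
          if taken.is_some() {
              self.full.store(false, Ordering::Release);
          }
          taken
      }
  }
  ```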
 

	
 
- For practical use in components whose code supports an arbitrary number of peers (i.e. their code contains an array of ports that is used for communication and changes in size during the component's lifetime), the `select` statement somehow needs to support waiting on any one of those ports.
 

	
 
- The compiler currently prevents one from using `sync` blocks (or the corresponding `get` and/or `put` operations) in functions. They can only be used within components. When writing large programs this makes it rather hard to re-use code: all code that interacts with other components must be written directly inside a component's sync block. I would advise creating `sync func`tions, `nonsync func`tions and regular `func`tions, where:
 
  
 
  - `sync func`tions can only be called from within `sync` functions. They may not open new sync blocks, but may perform calls to `get`/`put`. These are useful to encapsulate sequences of `put`/`get` calls together with some common message-modifying code.
 
  - `nonsync func`tions (or `async func`tions) may only be called outside of sync blocks, and may open new sync blocks themselves. They are useful to encapsulate a single interaction with other components. One may also create new components here.
 
  - regular `func`tions, which are as useful as in any other language, but which we disallow from calling `nonsync func`tions or `sync func`tions.
 

	
 
- The `Ack` messages that are sent in response to `PeerPortChanged_Block` messages should contain the sending component's `(component ID, port ID)` pair in case the `PeerPortChanged_Block` message is relayed. When such an `Ack` message is received, the peer of the port must be updated before transferring the port to the new owner.
 

	
 
- The compiler currently accepts a select arm's guard that is formulated as `auto a = get(get(rx))`. This should be disallowed.
 
\ No newline at end of file
 
- The compiler currently accepts a select arm's guard that is formulated as `auto a = get(get(rx))`. This should be disallowed.
 

	
 
- The work queue in the runtime is still a mutex-locked queue. The `QueueMpsc` type should be extended to be a multiple-producer multiple-consumer queue. This type should then replace the mutex-locked work queue.
 
\ No newline at end of file
src/protocol/ast.rs
 
@@ -1149,72 +1149,77 @@ pub enum Statement {
 
    While(WhileStatement),
 
    EndWhile(EndWhileStatement),
 
    Break(BreakStatement),
 
    Continue(ContinueStatement),
 
    Synchronous(SynchronousStatement),
 
    EndSynchronous(EndSynchronousStatement),
 
    Fork(ForkStatement),
 
    EndFork(EndForkStatement),
 
    Select(SelectStatement),
 
    EndSelect(EndSelectStatement),
 
    Return(ReturnStatement),
 
    Goto(GotoStatement),
 
    New(NewStatement),
 
    Expression(ExpressionStatement),
 
}
 

	
 
impl Statement {
 
    pub fn as_new(&self) -> &NewStatement {
 
        match self {
 
            Statement::New(result) => result,
 
            _ => panic!("Unable to cast `Statement` to `NewStatement`"),
 
        }
 
    }
 

	
 
    pub fn span(&self) -> InputSpan {
 
    pub fn maybe_span(&self) -> Option<InputSpan> {
 
        match self {
 
            Statement::Block(v) => v.span,
 
            Statement::Local(v) => v.span(),
 
            Statement::Labeled(v) => v.label.span,
 
            Statement::If(v) => v.span,
 
            Statement::While(v) => v.span,
 
            Statement::Break(v) => v.span,
 
            Statement::Continue(v) => v.span,
 
            Statement::Synchronous(v) => v.span,
 
            Statement::Fork(v) => v.span,
 
            Statement::Select(v) => v.span,
 
            Statement::Return(v) => v.span,
 
            Statement::Goto(v) => v.span,
 
            Statement::New(v) => v.span,
 
            Statement::Expression(v) => v.span,
 
            Statement::Block(v) => Some(v.span),
 
            Statement::Local(v) => Some(v.span()),
 
            Statement::Labeled(v) => Some(v.label.span),
 
            Statement::If(v) => Some(v.span),
 
            Statement::While(v) => Some(v.span),
 
            Statement::Break(v) => Some(v.span),
 
            Statement::Continue(v) => Some(v.span),
 
            Statement::Synchronous(v) => Some(v.span),
 
            Statement::Fork(v) => Some(v.span),
 
            Statement::Select(v) => Some(v.span),
 
            Statement::Return(v) => Some(v.span),
 
            Statement::Goto(v) => Some(v.span),
 
            Statement::New(v) => Some(v.span),
 
            Statement::Expression(v) => Some(v.span),
 
            Statement::EndBlock(_)
 
            | Statement::EndIf(_)
 
            | Statement::EndWhile(_)
 
            | Statement::EndSynchronous(_)
 
            | Statement::EndFork(_)
 
            | Statement::EndSelect(_) => unreachable!(),
 
            | Statement::EndSelect(_) => None,
 
        }
 
    }
 

	
 
    pub fn span(&self) -> InputSpan {
 
        return self.maybe_span().unwrap();
 
    }
 

	
 
    pub fn link_next(&mut self, next: StatementId) {
 
        match self {
 
            Statement::Block(stmt) => stmt.next = next,
 
            Statement::EndBlock(stmt) => stmt.next = next,
 
            Statement::Local(stmt) => match stmt {
 
                LocalStatement::Channel(stmt) => stmt.next = next,
 
                LocalStatement::Memory(stmt) => stmt.next = next,
 
            },
 
            Statement::EndIf(stmt) => stmt.next = next,
 
            Statement::EndWhile(stmt) => stmt.next = next,
 
            Statement::EndSynchronous(stmt) => stmt.next = next,
 
            Statement::EndFork(stmt) => stmt.next = next,
 
            Statement::EndSelect(stmt) => stmt.next = next,
 
            Statement::New(stmt) => stmt.next = next,
 
            Statement::Expression(stmt) => stmt.next = next,
 
            Statement::Return(_)
 
            | Statement::Break(_)
 
            | Statement::Continue(_)
 
            | Statement::Synchronous(_)
 
            | Statement::Fork(_)
 
            | Statement::Select(_)
 
            | Statement::Goto(_)
 
            | Statement::While(_)
 
            | Statement::Labeled(_)
src/protocol/eval/error.rs
 
use std::fmt;
 

	
 
use crate::protocol::{
 
    ast::*,
 
    Module,
 
    input_source::{ErrorStatement, StatementKind}
 
};
 
use super::executor::*;
 

	
 
/// Represents a stack frame recorded in an error
 
#[derive(Debug)]
 
pub struct EvalFrame {
 
    pub line: u32,
 
    pub line: Option<u32>,
 
    pub module_name: String,
 
    pub procedure: String, // function or component
 
    pub is_func: bool,
 
}
 

	
 
impl fmt::Display for EvalFrame {
 
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 
        let func_or_comp = if self.is_func {
 
            "function "
 
        } else {
 
            "component"
 
        };
 

	
 
        let line_str = match self.line {
 
            Some(line_number) => line_number.to_string(),
 
            None => String::from("??"),
 
        };
 

	
 
        if self.module_name.is_empty() {
 
            write!(f, "{} {}:{}", func_or_comp, &self.procedure, self.line)
 
            write!(f, "{} {}:{}", func_or_comp, &self.procedure, line_str)
 
        } else {
 
            write!(f, "{} {}:{}:{}", func_or_comp, &self.module_name, &self.procedure, self.line)
 
            write!(f, "{} {}:{}:{}", func_or_comp, &self.module_name, &self.procedure, line_str)
 
        }
 
    }
 
}
 

	
 
/// Represents an error that occurred during evaluation. Contains error
 
/// statements just like in parsing errors. Additionally may display the current
 
/// execution state.
 
#[derive(Debug)]
 
pub struct EvalError {
 
    pub(crate) statements: Vec<ErrorStatement>,
 
    pub(crate) frames: Vec<EvalFrame>,
 
}
 

	
 
impl EvalError {
 
    pub(crate) fn new_error_at_expr(prompt: &Prompt, modules: &[Module], heap: &Heap, expr_id: ExpressionId, msg: String) -> EvalError {
 
        // Create frames
 
        debug_assert!(!prompt.frames.is_empty());
 
        let mut frames = Vec::with_capacity(prompt.frames.len());
 
        let mut last_module_source = &modules[0].source;
 
        for frame in prompt.frames.iter() {
 
            let definition = &heap[frame.definition];
 
            let statement = &heap[frame.position];
 
            let statement_span = statement.span();
 
            let statement_span = statement.maybe_span();
 

	
 
            // Lookup module name, if it has one
 
            let module = modules.iter().find(|m| m.root_id == definition.defined_in).unwrap();
 
            let module_name = if let Some(name) = &module.name {
 
                name.as_str().to_string()
 
            } else {
 
                String::new()
 
            };
 

	
 
            last_module_source = &module.source;
 
            frames.push(EvalFrame{
 
                line: statement_span.begin.line,
 
                line: statement_span.map(|v| v.begin.line),
 
                module_name,
 
                procedure: definition.identifier.value.as_str().to_string(),
 
                is_func: definition.kind == ProcedureKind::Function,
 
            });
 
        }
 

	
 
        let expr = &heap[expr_id];
 
        let statements = vec![
 
            ErrorStatement::from_source_at_span(StatementKind::Error, last_module_source, expr.full_span(), msg)
 
        ];
 

	
 
        EvalError{ statements, frames }
 
    }
 
}
 

	
 
impl fmt::Display for EvalError {
 
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 
        // Display error statement(s)
 
        self.statements[0].fmt(f)?;
 
        for statement in self.statements.iter().skip(1) {
 
            writeln!(f)?;
 
            statement.fmt(f)?;
 
        }
 

	
src/protocol/parser/pass_validation_linking.rs
 
@@ -29,48 +29,49 @@
 
 * statement" field. Because it is so common, this file contains two macros that
 
 * perform that operation.
 
 *
 
 * To make storing types for polymorphic procedures simpler and more efficient,
 
 * we assign to each expression in the procedure a unique ID. This is what the
 
 * "next expression index" field achieves. Each expression simply takes the
 
 * current value, and then increments this counter.
 
 */
 

	
 
use crate::collections::{ScopedBuffer};
 
use crate::protocol::ast::*;
 
use crate::protocol::input_source::*;
 
use crate::protocol::parser::symbol_table::*;
 
use crate::protocol::parser::type_table::*;
 

	
 
use super::visitor::{
 
    BUFFER_INIT_CAP_SMALL,
 
    BUFFER_INIT_CAP_LARGE,
 
    Ctx,
 
    Visitor,
 
    VisitorResult
 
};
 
use crate::protocol::parser::ModuleCompilationPhase;
 

	
 
#[derive(Debug)]
 
struct ControlFlowStatement {
 
    in_sync: SynchronousStatementId,
 
    in_while: WhileStatementId,
 
    in_scope: ScopeId,
 
    statement: StatementId, // of 'break', 'continue' or 'goto'
 
}
 

	
 
/// This particular visitor will go through the entire AST in a recursive manner
 
/// and check if all statements and expressions are legal (e.g. no "return"
 
/// statements in component definitions), and will link certain AST nodes to
 
/// their appropriate targets (e.g. goto statements, or function calls).
 
///
 
/// This visitor will not perform control-flow analysis (e.g. making sure that
 
/// each function actually returns) and will also not perform type checking. So
 
/// the linking of function calls and component instantiations will be checked
 
/// and linked to the appropriate definitions, but the return types and/or
 
/// arguments will not be checked for validity.
 
///
 
/// The main idea is, because we're visiting nodes in a tree, to do as much as
 
/// we can while we have the memory in cache.
 
pub(crate) struct PassValidationLinking {
 
    // Traversal state, all valid IDs if inside a certain AST element. Otherwise
 
    // `id.is_invalid()` returns true.
 
    in_sync: SynchronousStatementId,
 
@@ -303,53 +304,54 @@ impl Visitor for PassValidationLinking {
 

	
 
        self.expr_parent = ExpressionParent::None;
 

	
 
        // Visit true and false branch. Executor chooses next statement based on
 
        // test expression, not on if-statement itself. Hence the if statement
 
        // does not have a static subsequent statement.
 
        assign_then_erase_next_stmt!(self, ctx, id.upcast());
 
        let old_scope = self.push_scope(ctx, false, true_case.scope);
 
        self.visit_stmt(ctx, true_case.body)?;
 
        self.pop_scope(old_scope);
 
        assign_then_erase_next_stmt!(self, ctx, end_if_id.upcast());
 

	
 
        if let Some(false_case) = false_case {
 
            let old_scope = self.push_scope(ctx, false, false_case.scope);
 
            self.visit_stmt(ctx, false_case.body)?;
 
            self.pop_scope(old_scope);
 
            assign_then_erase_next_stmt!(self, ctx, end_if_id.upcast());
 
        }
 

	
 
        self.prev_stmt = end_if_id.upcast();
 
        Ok(())
 
    }
 

	
 
    fn visit_while_stmt(&mut self, ctx: &mut Ctx, id: WhileStatementId) -> VisitorResult {
 
        let stmt = &ctx.heap[id];
 
        let stmt = &mut ctx.heap[id];
 
        let end_while_id = stmt.end_while;
 
        let test_expr_id = stmt.test;
 
        let body_stmt_id = stmt.body;
 
        let scope_id = stmt.scope;
 
        stmt.in_sync = self.in_sync;
 

	
 
        let old_while = self.in_while;
 
        self.in_while = id;
 

	
 
        // Visit test expression
 
        debug_assert_eq!(self.expr_parent, ExpressionParent::None);
 
        debug_assert!(self.in_test_expr.is_invalid());
 
        self.in_test_expr = id.upcast();
 
        self.expr_parent = ExpressionParent::While(id);
 
        self.visit_expr(ctx, test_expr_id)?;
 
        self.in_test_expr = StatementId::new_invalid();
 

	
 
        // Link up to body statement
 
        assign_then_erase_next_stmt!(self, ctx, id.upcast());
 

	
 
        self.expr_parent = ExpressionParent::None;
 
        let old_scope = self.push_scope(ctx, false, scope_id);
 
        self.visit_stmt(ctx, body_stmt_id)?;
 
        self.pop_scope(old_scope);
 
        self.in_while = old_while;
 

	
 
        // Link final entry in while's block statement back to the while. The
 
        // executor will go to the end-while statement if the test expression
 
        // is false, so put that in as the new previous stmt
src/runtime2/communication.rs
 
@@ -156,48 +156,49 @@ pub enum SyncMessageContent {
 

	
 
#[derive(Debug)]
 
pub struct ControlMessage {
 
    pub(crate) id: ControlId,
 
    pub sender_comp_id: CompId,
 
    pub target_port_id: Option<PortId>,
 
    pub content: ControlMessageContent,
 
}
 

	
 
/// Content of a control message. If the content refers to a port then the
 
/// `target_port_id` field is the one that it refers to.
 
#[derive(Copy, Clone, Debug)]
 
pub enum ControlMessageContent {
 
    Ack,
 
    BlockPort,
 
    UnblockPort,
 
    ClosePort(ControlMessageClosePort),
 
    PortPeerChangedBlock,
 
    PortPeerChangedUnblock(PortId, CompId), // contains (new_port_id, new_component_id)
 
}
 

	
 
#[derive(Copy, Clone, Debug)]
 
pub struct ControlMessageClosePort {
 
    pub closed_in_sync_round: bool, // needed to ensure correct handling of errors
 
    pub registered_round: Option<u32>,
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Messages (generic)
 
// -----------------------------------------------------------------------------
 

	
 
#[derive(Debug)]
 
pub struct MessageSyncHeader {
 
    pub sync_round: u32,
 
    pub sending_id: CompId,
 
    pub highest_id: CompId,
 
}
 

	
 
#[derive(Debug)]
 
pub enum Message {
 
    Data(DataMessage),
 
    Sync(SyncMessage),
 
    Control(ControlMessage),
 
    Poll,
 
}
 

	
 
impl Message {
 
    pub(crate) fn target_port(&self) -> Option<PortId> {
 
        match self {
src/runtime2/component/component.rs
 
@@ -257,48 +257,49 @@ pub(crate) fn create_component(
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Generic component messaging utilities (for sending and receiving)
 
// -----------------------------------------------------------------------------
 

	
 
/// Default handling of sending a data message. In case the port is blocked then
 
/// the `ExecState` will become blocked as well. Note that
 
/// `default_handle_control_message` will ensure that the port becomes
 
/// unblocked if so instructed by the receiving component. The returned
 
/// scheduling value must be used.
 
#[must_use]
 
pub(crate) fn default_send_data_message(
 
    exec_state: &mut CompExecState, transmitting_port_id: PortId,
 
    port_instruction: PortInstruction, value: ValueGroup,
 
    sched_ctx: &SchedulerCtx, consensus: &mut Consensus,
 
    control: &mut ControlLayer, comp_ctx: &mut CompCtx
 
) -> Result<CompScheduling, (PortInstruction, String)> {
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync);
 

	
 
    let port_handle = comp_ctx.get_port_handle(transmitting_port_id);
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    port_info.last_instruction = port_instruction;
 
    port_info.last_registered_round = Some(consensus.round_number());
 

	
 
    let port_info = comp_ctx.get_port(port_handle);
 
    debug_assert_eq!(port_info.kind, PortKind::Putter);
 

	
 
    let mut ports = Vec::new();
 
    find_ports_in_value_group(&value, &mut ports);
 

	
 
    if port_info.state.is_closed() {
 
        // Note: normally peer is eventually consistent, but if it has shut down
 
        // then we can be sure it is consistent (I think?)
 
        return Err((
 
            port_info.last_instruction,
 
            format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0)
 
        ))
 
    } else if !ports.is_empty() {
 
        start_send_message_with_ports(
 
            transmitting_port_id, port_instruction, value, exec_state,
 
            comp_ctx, sched_ctx, control
 
        )?;
 

	
 
        return Ok(CompScheduling::Sleep);
 
    } else if port_info.state.is_blocked() {
 
        // Port is blocked, so we cannot send
 
        exec_state.set_as_blocked_put_without_ports(transmitting_port_id, value);
 
@@ -313,49 +314,48 @@ pub(crate) fn default_send_data_message(
 

	
 
        return Ok(CompScheduling::Immediate);
 
    }
 
}
 

	
 
pub(crate) enum IncomingData {
 
    PlacedInSlot,
 
    SlotFull(DataMessage),
 
}
 

	
 
/// Default handling of receiving a data message. In case there is no room for
 
/// the message it is returned from this function. Note that this function is
 
/// different from PDL code performing a `get` on a port; this is the case where
 
/// the message first arrives at the component.
 
// NOTE: This is supposed to be a somewhat temporary implementation. It would be
 
//  nicest if the sending component can figure out it cannot send any more data.
 
#[must_use]
 
pub(crate) fn default_handle_incoming_data_message(
 
    exec_state: &mut CompExecState, inbox_main: &mut InboxMain,
 
    comp_ctx: &mut CompCtx, incoming_message: DataMessage,
 
    sched_ctx: &SchedulerCtx, control: &mut ControlLayer
 
) -> IncomingData {
 
    let port_handle = comp_ctx.get_port_handle(incoming_message.data_header.target_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 
    comp_ctx.get_port_mut(port_handle).received_message_for_sync = true;
 
    let port_value_slot = &mut inbox_main[port_index];
 
    let target_port_id = incoming_message.data_header.target_port;
 

	
 
    if port_value_slot.is_none() {
 
        // We can put the value in the slot
 
        *port_value_slot = Some(incoming_message);
 

	
 
        // Check if we're blocked on receiving this message.
 
        dbg_code!({
 
            // Our port cannot have been blocked itself, because we're able to
 
            // directly insert the message into its slot.
 
            assert!(!comp_ctx.get_port(port_handle).state.is_blocked());
 
        });
 

	
 
        if exec_state.is_blocked_on_get(target_port_id) {
 
            // Return to normal operation
 
            exec_state.mode = CompMode::Sync;
 
            exec_state.mode_port = PortId::new_invalid();
 
            debug_assert!(exec_state.mode_value.values.is_empty());
 
        }
 

	
 
        return IncomingData::PlacedInSlot
 
    } else {
 
        // Slot is already full, so if the port was previously opened, it will
 
@@ -431,104 +431,111 @@ pub(crate) fn default_attempt_get(
 
        // So enter the BlockedGet state
 
        exec_state.set_as_blocked_get(target_port);
 
        return GetResult::NoMessage;
 
    }
 
}
 

	
 
/// Default handling of a message that has been received through a `get`. Will check if any
 
/// more messages are waiting, and if the corresponding port was blocked because
 
/// of full buffers (hence, will use the control layer to make sure the peer
 
/// will become unblocked).
 
pub(crate) fn default_handle_received_data_message(
 
    targeted_port: PortId, _port_instruction: PortInstruction, message: &mut DataMessage,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup,
 
    comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer,
 
    consensus: &mut Consensus
 
) -> Result<(), (PortInstruction, String)> {
 
    let port_handle = comp_ctx.get_port_handle(targeted_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 
    debug_assert!(inbox_main[port_index].is_none()); // because we've just received from it
 

	
 
    // If we received any ports, add them to the port tracking and inbox struct.
 
    // Then notify the peers that they can continue sending to this port, but
 
    // now at a new address.
 
    for received_port in &mut message.ports {
 
        // Transfer messages to main/backup inbox
 
        let _new_inbox_index = inbox_main.len();
 
        if !received_port.messages.is_empty() {
 
            inbox_main.push(Some(received_port.messages.remove(0)));
 
            inbox_backup.extend(received_port.messages.drain(..));
 
        } else {
 
            inbox_main.push(None);
 
        }
 

	
 
        // Create a new port locally
 
        let mut new_port_state = received_port.state;
 
        new_port_state.set(PortStateFlag::Received);
 
        let new_port_handle = comp_ctx.add_port(
 
            received_port.peer_comp, received_port.peer_port,
 
            received_port.kind, new_port_state
 
        );
 
        debug_assert_eq!(_new_inbox_index, comp_ctx.get_port_index(new_port_handle));
 
        comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(received_port.peer_comp));
 
        let new_port = comp_ctx.get_port(new_port_handle);
 

	
 
        // Transfer messages to main/backup inbox. Make sure to modify the
 
        // target port to our own
 
        for message in received_port.messages.iter_mut() {
 
            message.data_header.target_port = new_port.self_id;
 
        }
 

	
 
        let _new_inbox_index = inbox_main.len();
 
        if !received_port.messages.is_empty() {
 
            inbox_main.push(Some(received_port.messages.remove(0)));
 
            inbox_backup.extend(received_port.messages.drain(..));
 
        } else {
 
            inbox_main.push(None);
 
        }
 

	
 
        debug_assert_eq!(_new_inbox_index, comp_ctx.get_port_index(new_port_handle));
 

	
 
        // Add the port to the consensus
 
        consensus.notify_of_new_port(_new_inbox_index, new_port_handle, comp_ctx);
 

	
 
        // Replace all references to the port in the received message
 
        for message_location in received_port.locations.iter().copied() {
 
            let value = message.content.get_value_mut(message_location);
 

	
 
            match value {
 
                Value::Input(_) => {
 
                    debug_assert_eq!(new_port.kind, PortKind::Getter);
 
                    *value = Value::Input(port_id_to_eval(new_port.self_id));
 
                },
 
                Value::Output(_) => {
 
                    debug_assert_eq!(new_port.kind, PortKind::Putter);
 
                    *value = Value::Output(port_id_to_eval(new_port.self_id));
 
                },
 
                _ => unreachable!(),
 
            }
 
        }
 

	
 
        // Let the peer know that the port can now be used
 
        let peer_handle = comp_ctx.get_peer_handle(new_port.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 

	
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(ControlMessage{
 
            id: ControlId::new_invalid(),
 
            sender_comp_id: comp_ctx.id,
 
            target_port_id: Some(new_port.peer_port_id),
 
            content: ControlMessageContent::PortPeerChangedUnblock(new_port.self_id, comp_ctx.id)
 
        }), true);
 
    }
 

	
 
    // Modify last-known location where port instruction was retrieved
 
    let port_info = comp_ctx.get_port(port_handle);
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    debug_assert_ne!(port_info.last_instruction, PortInstruction::None); // set by caller
 
    debug_assert!(port_info.state.is_open()); // checked by caller
 
    port_info.last_registered_round = Some(message.sync_header.sync_round);
 

	
 
    // Check if there are any more messages in the backup buffer
 
    for message_index in 0..inbox_backup.len() {
 
        let message = &inbox_backup[message_index];
 
        if message.data_header.target_port == targeted_port {
 
            // One more message, place it in the slot
 
            let message = inbox_backup.remove(message_index);
 
            debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we're removing another message from the backup
 
            inbox_main[port_index] = Some(message);
 

	
 
            return Ok(());
 
        }
 
    }
 

	
 
    // Did not have any more messages, so if we were blocked, then we need to
 
    // unblock the port now (and inform the peer of this unblocking)
 
    if port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers) {
 
        let port_info = comp_ctx.get_port_mut(port_handle);
 
        port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers);
 

	
 
        let (peer_handle, message) = control.cancel_port_blocking(comp_ctx, port_handle);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
    }
 
@@ -564,68 +571,69 @@ pub(crate) fn default_handle_control_message(
 
            // the component handle as well
 
            let port_to_close = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_close);
 

	
 
            // We're closing the port, so we will always update the peer of the
 
            // port (in case of error messages)
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            port_info.peer_comp_id = message.sender_comp_id;
 
            port_info.close_at_sync_end = true; // might be redundant (we might set it closed now)
 

	
 
            let peer_comp_id = port_info.peer_comp_id;
 
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
            // One exception to sending an `Ack` is if we just closed the
 
            // port ourselves, meaning that the `ClosePort` messages got
 
            // sent to one another.
 
            if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) {
 
                // The two components (sender and this component) are closing
 
                // the channel at the same time. So we don't care about the
 
                // content of the `ClosePort` message.
 
                default_handle_ack(exec_state, control, control_id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup)?;
 
            } else {
 
                // Respond to the message
 
                let port_info = comp_ctx.get_port(port_handle);
 
                let last_registered_round = port_info.last_registered_round;
 
                let last_instruction = port_info.last_instruction;
 
                let port_has_had_message = port_info.received_message_for_sync;
 
                default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
 
                comp_ctx.change_port_peer(sched_ctx, port_handle, None);
 

	
 
                // Handle any possible error conditions (which boil down to: the
 
                // port has been used, but the peer has died). If not in sync
 
                // mode then we close the port immediately.
 

	
 
                // Note that `port_was_used` does not mean that any messages
 
                // were actually received. It might also mean that e.g. the
 
                // component attempted a `get`, but there were no messages, so
 
                // now it is in the `BlockedGet` state.
 
                let port_was_used = last_instruction != PortInstruction::None;
 

	
 
                if exec_state.mode.is_in_sync_block() {
 
                    let closed_during_sync_round = content.closed_in_sync_round && port_was_used;
 
                    let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message && port_was_used;
 
                    let round_has_succeeded = !content.closed_in_sync_round && last_registered_round == content.registered_round;
 
                    let closed_during_sync_round = content.closed_in_sync_round;
 
                    let closed_before_sync_round = !closed_during_sync_round && !round_has_succeeded;
 

	
 
                    if closed_during_sync_round || closed_before_sync_round {
 
                    if (closed_during_sync_round || closed_before_sync_round) && port_was_used {
 
                        return Err((
 
                            last_instruction,
 
                            format!("Peer component (id:{}) shut down, so communication cannot (have) succeed(ed)", peer_comp_id.0)
 
                        ));
 
                    }
 
                } else {
 
                    let port_info = comp_ctx.get_port_mut(port_handle);
 
                    port_info.state.set(PortStateFlag::Closed);
 
                }
 
            }
 
        },
 
        ControlMessageContent::UnblockPort => {
 
            // We were previously blocked (or already closed)
 
            let port_to_unblock = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_unblock);
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 

	
 
            debug_assert_eq!(port_info.kind, PortKind::Putter);
 
            debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers));
 

	
 
            port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers);
 
            default_handle_recently_unblocked_port(
 
                exec_state, control, consensus, port_handle, sched_ctx,
 
                comp_ctx, inbox_main, inbox_backup
 
@@ -661,49 +669,48 @@ pub(crate) fn default_handle_control_message(
 
                exec_state, control, consensus, port_handle, sched_ctx,
 
                comp_ctx, inbox_main, inbox_backup
 
            )?;
 
        }
 
    }
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles a component entering the synchronous block. Will ensure that the
 
/// `Consensus` and the `ComponentCtx` are initialized properly.
 
pub(crate) fn default_handle_sync_start(
 
    exec_state: &mut CompExecState, inbox_main: &mut InboxMainRef,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 
) {
 
    sched_ctx.info("Component starting sync mode");
 

	
 
    // If any messages are present for this sync round, set the appropriate flag
 
    // and notify the consensus handler of the present messages
 
    consensus.notify_sync_start(comp_ctx);
 
    for (port_index, message) in inbox_main.iter().enumerate() {
 
        if let Some(message) = message {
 
            consensus.handle_incoming_data_message(comp_ctx, message);
 
            let port_info = comp_ctx.get_port_by_index_mut(port_index);
 
            port_info.received_message_for_sync = true;
 
        }
 
    }
 

	
 
    // Modify execution state
 
    debug_assert_eq!(exec_state.mode, CompMode::NonSync);
 
    exec_state.mode = CompMode::Sync;
 
}
 

	
 
/// Handles a component that has reached the end of the sync block. This does
 
/// not necessarily mean that the component will go into the `NonSync` mode, as
 
/// it might have to wait for the leader to finish the round for everyone (see
 
/// `default_handle_sync_decision`)
 
pub(crate) fn default_handle_sync_end(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
    consensus: &mut Consensus
 
) {
 
    sched_ctx.info("Component ending sync mode (but possibly waiting for a solution)");
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync);
 
    let decision = consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
    exec_state.mode = CompMode::SyncEnd;
 
    default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus);
 
}
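
// A hedged sketch (not part of this changeset) of how a scheduler could drive
// one synchronous round through the two handlers above; `run_sync_block` is a
// hypothetical stand-in for executing the component's PDL code until it
// reaches the end of its sync block:
//
//     default_handle_sync_start(exec_state, inbox_main, sched_ctx, comp_ctx, consensus);
//     debug_assert_eq!(exec_state.mode, CompMode::Sync);
//     run_sync_block(); // hypothetical: run the component's PDL code
//     default_handle_sync_end(exec_state, sched_ctx, comp_ctx, consensus);
//     // `exec_state.mode` is now `CompMode::SyncEnd` until a decision arrives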
 

	
 
/// Handles a component initiating the exiting procedure, and closing all of its
src/runtime2/component/component_context.rs
Show inline comments
 
@@ -108,50 +108,50 @@ impl Debug for PortState {
 
        let mut s = f.debug_struct("PortState");
 
        for (flag_name, flag_value) in &[
 
            ("closed", Closed),
 
            ("blocked_peer_change", BlockedDueToPeerChange),
 
            ("blocked_full_buffers", BlockedDueToFullBuffers),
 
            ("transmitted", Transmitted),
 
        ] {
 
            s.field(flag_name, &self.is_set(*flag_value));
 
        }
 

	
 
        return s.finish();
 
    }
 
}
 

	
 
#[derive(Debug)]
 
pub struct Port {
 
    // Identifiers
 
    pub self_id: PortId,
 
    pub peer_comp_id: CompId, // eventually consistent
 
    pub peer_port_id: PortId, // eventually consistent
 
    // Generic operating state
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    // State tracking for error detection and error handling
 
    pub last_registered_round: Option<u32>,
 
    pub last_instruction: PortInstruction, // used during sync round to detect port-closed-during-sync errors
 
    pub received_message_for_sync: bool, // used during sync round to detect port-closed-before-sync errors
 
    pub close_at_sync_end: bool, // set during sync round when receiving a port-closed-after-sync message
 
    pub(crate) associated_with_peer: bool,
 
}
 

	
 
pub struct Peer {
 
    pub id: CompId,
 
    pub num_associated_ports: u32,
 
    pub(crate) handle: CompHandle,
 
}
 

	
 
/// Port and peer management structure. Keeps a local reference count of the
 
/// ports associated with each peer, and additionally manages the atomic
 
/// reference counter associated with the peers' component handles.
 
pub struct CompCtx {
 
    pub id: CompId,
 
    ports: Vec<Port>,
 
    peers: Vec<Peer>,
 
    port_id_counter: u32,
 
}
 

	
 
#[derive(Copy, Clone, PartialEq, Eq)]
 
pub struct LocalPortHandle(PortId);
 

	
 
#[derive(Copy, Clone)]
 
@@ -160,76 +160,76 @@ pub struct LocalPeerHandle(CompId);
 
impl CompCtx {
 
    /// Creates a new component context based on a reserved entry in the
 
    /// component store. This reservation is used such that we already know our
 
    /// assigned ID.
 
    pub(crate) fn new(reservation: &CompReserved) -> Self {
 
        return Self{
 
            id: reservation.id(),
 
            ports: Vec::new(),
 
            peers: Vec::new(),
 
            port_id_counter: 0,
 
        }
 
    }
 

	
 
    /// Creates a new channel that is fully owned by the component associated
 
    /// with this context.
 
    pub(crate) fn create_channel(&mut self) -> Channel {
 
        let putter_id = PortId(self.take_port_id());
 
        let getter_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id: putter_id,
 
            peer_port_id: getter_id,
 
            kind: PortKind::Putter,
 
            state: PortState::new(),
 
            peer_comp_id: self.id,
 
            last_registered_round: None,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            associated_with_peer: false,
 
        });
 
        self.ports.push(Port{
 
            self_id: getter_id,
 
            peer_port_id: putter_id,
 
            kind: PortKind::Getter,
 
            state: PortState::new(),
 
            peer_comp_id: self.id,
 
            last_registered_round: None,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            associated_with_peer: false,
 
        });
 

	
 
        return Channel{ putter_id, getter_id };
 
    }
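
    // A hedged usage sketch (not part of this changeset): both ends of a
    // fresh channel are owned by this component, and each port's
    // `peer_comp_id` refers to the component itself until a port is moved:
    //
    //     let channel = comp_ctx.create_channel();
    //     let putter = comp_ctx.get_port_handle(channel.putter_id);
    //     let getter = comp_ctx.get_port_handle(channel.getter_id);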
 

	
 
    /// Adds a new port. Make sure to call `change_peer` afterwards.
 
    pub(crate) fn add_port(&mut self, peer_comp_id: CompId, peer_port_id: PortId, kind: PortKind, state: PortState) -> LocalPortHandle {
 
        let self_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id, peer_comp_id, peer_port_id, kind, state,
 
            last_registered_round: None,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            associated_with_peer: false,
 
        });
 
        return LocalPortHandle(self_id);
 
    }
 

	
 
    /// Adds a self-reference. Called by the runtime/scheduler
 
    pub(crate) fn add_self_reference(&mut self, self_handle: CompHandle) {
 
        debug_assert_eq!(self.id, self_handle.id());
 
        debug_assert!(self.get_peer_index_by_id(self.id).is_none());
 
        self.peers.push(Peer{
 
            id: self.id,
 
            num_associated_ports: 0,
 
            handle: self_handle
 
        });
 
    }
 

	
 
    /// Removes a self-reference. Called by the runtime/scheduler
 
    pub(crate) fn remove_self_reference(&mut self) -> Option<CompKey> {
 
        let self_index = self.get_peer_index_by_id(self.id).unwrap();
 
        let peer = &mut self.peers[self_index];
 
        let maybe_comp_key = peer.handle.decrement_users();
 
        self.peers.remove(self_index);
 

	
 
        return maybe_comp_key;
src/runtime2/component/consensus.rs
Show inline comments
 
@@ -249,48 +249,53 @@ pub struct Consensus {
 
    // General state of consensus manager
 
    mapping_counter: u32,
 
    mode: Mode,
 
    // State associated with sync round
 
    round_index: u32,
 
    highest_id: CompId,
 
    ports: Vec<PortAnnotation>,
 
    // State associated with arriving at a solution and being a (temporary)
 
    // leader in the consensus round
 
    solution: SolutionCombiner,
 
}
 

	
 
impl Consensus {
 
    pub(crate) fn new() -> Self {
 
        return Self{
 
            round_index: 0,
 
            highest_id: CompId::new_invalid(),
 
            ports: Vec::new(),
 
            mapping_counter: 0,
 
            mode: Mode::NonSync,
 
            solution: SolutionCombiner::new(),
 
        }
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn round_number(&self) -> u32 {
 
        return self.round_index;
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Managing sync state
 
    // -------------------------------------------------------------------------
 

	
 
    /// Notifies the consensus management that the PDL code has reached the
 
    /// start of a sync block.
 
    pub(crate) fn notify_sync_start(&mut self, comp_ctx: &CompCtx) {
 
        debug_assert_eq!(self.mode, Mode::NonSync);
 
        self.highest_id = comp_ctx.id;
 
        self.mapping_counter = 0;
 
        self.mode = Mode::SyncBusy;
 

	
 
        // Make the internally stored port annotation array consistent with the
 
        // ports that the component currently owns. They should match by index
 
        // (i.e. annotation at index `i` corresponds to port `i` in `comp_ctx`).
 
        let mut needs_setting_ports = false;
 
        if comp_ctx.num_ports() != self.ports.len() {
 
            needs_setting_ports = true;
 
        } else {
 
            for (idx, port) in comp_ctx.iter_ports().enumerate() {
 
                let comp_port_id = port.self_id;
 
                let cons_port_id = self.ports[idx].self_port_id;
 
                if comp_port_id != cons_port_id {
 
                    needs_setting_ports = true;
 
@@ -310,49 +315,49 @@ impl Consensus {
 
            // Make sure that we consider all peers as undiscovered again
 
            for annotation in self.ports.iter_mut() {
 
                annotation.peer_discovered = false;
 
            }
 
        }
 
    }
 

	
 
    /// Notifies the consensus management that the PDL code has reached the end
 
    /// of a sync block. A local solution will be submitted, after which we wait
 
    /// until the participants in the round (hopefully) reach a conclusion.
 
    pub(crate) fn notify_sync_end_success(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy);
 
        self.mode = Mode::SyncAwaitingSolution;
 

	
 
        let local_solution = self.generate_local_solution(comp_ctx, false);
 
        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution, false);
 
        return decision;
 
    }
 

	
 
    /// Notifies the consensus management that the component has encountered a
 
    /// critical error during the synchronous round. Hence we should report that
 
    /// we've failed and wait until all the participants have been notified of
 
    /// the error.
 
    pub(crate) fn notify_sync_end_failure(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
 
-        debug_assert_eq!(self.mode, Mode::SyncBusy);
 
+        // debug_assert_eq!(self.mode, Mode::SyncBusy);
 
        self.mode = Mode::SyncAwaitingSolution;
 

	
 
        let local_solution = self.generate_local_solution(comp_ctx, true);
 
        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution, true);
 
        return decision;
 
    }
 

	
 
    /// Notifies that a decision has been reached. Note that the caller should
 
    /// still take the appropriate actions based on the decision it is supplying
 
    /// to the consensus layer.
 
    pub(crate) fn notify_sync_decision(&mut self, _decision: SyncRoundDecision) {
 
        // Reset everything for the next round
 
        debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution);
 
        self.mode = Mode::NonSync;
 
        self.round_index = self.round_index.wrapping_add(1);
 

	
 
        for port in self.ports.iter_mut() {
 
            port.mapping = None;
 
        }
 

	
 
        self.solution.clear();
 
    }
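
    // A hedged sketch (not part of this changeset) of one full round as seen
    // from the consensus manager, assuming the round succeeds locally:
    //
    //     consensus.notify_sync_start(comp_ctx);
    //     // ... the sync block runs; incoming data messages are annotated ...
    //     let decision = consensus.notify_sync_end_success(sched_ctx, comp_ctx);
    //     consensus.notify_sync_decision(decision);
    //     // the round counter has advanced, all port mappings are cleared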
 

	
 
    pub(crate) fn notify_of_new_port(&mut self, _expected_index: usize, port_handle: LocalPortHandle, comp_ctx: &CompCtx) {
src/runtime2/component/control_layer.rs
Show inline comments
 
@@ -243,48 +243,49 @@ impl ControlLayer {
 
    /// make sure that the port state has already been set to `Closed`.
 
    pub(crate) fn initiate_port_closing(&mut self, port_handle: LocalPortHandle, exit_inside_sync: bool, comp_ctx: &CompCtx) -> (LocalPeerHandle, ControlMessage) {
 
        let port = comp_ctx.get_port(port_handle);
 
        let peer_port_id = port.peer_port_id;
 
        debug_assert!(port.state.is_closed());
 

	
 
        // Construct the port-closing entry
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 1,
 
            content: ControlContent::ClosedPort(port.self_id),
 
        });
 

	
 
        // Return the messages notifying peer of the closed port
 
        let peer_handle = comp_ctx.get_peer_handle(port.peer_comp_id);
 
        return (
 
            peer_handle,
 
            ControlMessage{
 
                id: entry_id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(peer_port_id),
 
                content: ControlMessageContent::ClosePort(ControlMessageClosePort{
 
                    closed_in_sync_round: exit_inside_sync,
 
                    registered_round: port.last_registered_round,
 
                }),
 
            }
 
        );
 
    }
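
    // A hedged usage sketch (not part of this changeset); `send_control` is a
    // hypothetical stand-in for whatever mechanism delivers the message:
    //
    //     comp_ctx.get_port_mut(port_handle).state.set(PortStateFlag::Closed); // contract
    //     let (peer, message) = control.initiate_port_closing(port_handle, exit_inside_sync, comp_ctx);
    //     send_control(peer, message); // hypothetical delivery to the peer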
 

	
 
    /// Generates the control message used to indicate to a peer that a port
 
    /// should be blocked (expects the caller to have set the port's state to
 
    /// blocked).
 
    pub(crate) fn initiate_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) {
 
        let port_info = comp_ctx.get_port(port_handle);
 
        debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're telling the putter to block
 
        debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers)); // contract with caller
 

	
 
        let peer_port_id = port_info.peer_port_id;
 
        let peer_comp_id = port_info.peer_comp_id;
 
        let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
        return (
 
            peer_handle,
 
            ControlMessage{
 
                id: ControlId::new_invalid(),
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(peer_port_id),
 
                content: ControlMessageContent::BlockPort,
src/runtime2/stdlib/internet.rs
Show inline comments
 
@@ -98,48 +98,52 @@ impl Drop for SocketTcpClient {
 
}
 

	
 
impl AsFileDescriptor for SocketTcpClient {
 
    fn as_file_descriptor(&self) -> FileDescriptor {
 
        return self.socket_handle;
 
    }
 
}
 

	
 
/// TCP listener, yielding new connections.
 
pub struct SocketTcpListener {
 
    socket_handle: libc::c_int,
 
    is_blocking: bool,
 
}
 

	
 
impl SocketTcpListener {
 
    pub fn new(ip: IpAddr, port: u16) -> Result<Self, SocketError> {
 
        // Create and bind
 
        let socket_handle = create_and_bind_socket(
 
            libc::SOCK_STREAM, libc::IPPROTO_TCP, ip, port
 
        )?;
 
        if !set_socket_blocking(socket_handle, SOCKET_BLOCKING) {
 
            unsafe{ libc::close(socket_handle); }
 
            return Err(SocketError::Modifying);
 
        }
 
        if !set_socket_reuse_address(socket_handle) {
 
            unsafe{ libc::close(socket_handle); }
 
            return Err(SocketError::Modifying);
 
        }
 

	
 
        // Listen
 
        unsafe {
 
            let result = listen(socket_handle, libc::SOMAXCONN);
 
            if result < 0 {
 
                unsafe{ libc::close(socket_handle); }
 
                return Err(SocketError::Listening);
 
            }
 
        }
 

	
 

	
 
        return Ok(SocketTcpListener{
 
            socket_handle,
 
            is_blocking: SOCKET_BLOCKING,
 
        });
 
    }
 

	
 
    pub fn accept(&self) -> Result<libc::c_int, IoError> {
 
        let (mut address, mut address_size) = create_sockaddr_in_empty();
 
        let address_pointer = &mut address as *mut sockaddr_in;
 
        let socket_handle = unsafe { accept(self.socket_handle, address_pointer.cast(), &mut address_size) };
 
        if socket_handle < 0 {
 
            return Err(IoError::last_os_error());
 
        }
 
@@ -367,36 +371,65 @@ fn set_socket_blocking(handle: libc::c_int, blocking: bool) -> bool {
 
        return false;
 
    }
 

	
 
    unsafe{
 
        let mut flags = libc::fcntl(handle, libc::F_GETFL, 0);
 
        if flags < 0 {
 
            return false;
 
        }
 

	
 
        if blocking {
 
            flags &= !libc::O_NONBLOCK;
 
        } else {
 
            flags |= libc::O_NONBLOCK;
 
        }
 

	
 
        let result = libc::fcntl(handle, libc::F_SETFL, flags);
 
        if result < 0 {
 
            return false;
 
        }
 
    }
 

	
 
    return true;
 
}
 

	
 
#[inline]
 
fn set_socket_reuse_address(handle: libc::c_int) -> bool {
 
    if handle < 0 {
 
        return false;
 
    }
 

	
 
    unsafe {
 
        let enable: libc::c_int = 1;
 
        let enable_ptr: *const _ = &enable;
 
        let result = libc::setsockopt(
 
            handle, libc::SOL_SOCKET, libc::SO_REUSEADDR,
 
            enable_ptr.cast(), size_of::<libc::c_int>() as libc::socklen_t
 
        );
 
        if result < 0 {
 
            return false;
 
        }
 

	
 
        let result = libc::setsockopt(
 
            handle, libc::SOL_SOCKET, libc::SO_REUSEPORT,
 
            enable_ptr.cast(), size_of::<libc::c_int>() as libc::socklen_t
 
        );
 
        if result < 0 {
 
            return false;
 
        }
 
    }
 

	
 
    return true;
 
}
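
// A hedged usage sketch of `SocketTcpListener` (not part of this changeset);
// error handling elided:
//
//     use std::net::{IpAddr, Ipv4Addr};
//     let listener = SocketTcpListener::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 2392)?;
//     let peer_fd = listener.accept()?; // raw file descriptor of the accepted peer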
 

	
 
#[inline]
 
fn socket_family_from_ip(ip: IpAddr) -> libc::c_int {
 
    return match ip {
 
        IpAddr::V4(_) => libc::AF_INET,
 
        IpAddr::V6(_) => libc::AF_INET6,
 
    };
 
}
 

	
 
#[inline]
 
fn htons(port: u16) -> u16 {
 
    return port.to_be();
 
}
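
// A quick sanity check of the byte swap (hedged illustration, not part of
// this changeset): the native-order bytes of the converted value equal the
// big-endian bytes of the original on any host.
//
//     assert_eq!(htons(0x0958).to_ne_bytes(), 0x0958u16.to_be_bytes());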
src/runtime2/tests/mod.rs
Show inline comments
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::component::{CompCtx, CompPDL};
 

	
 
mod messaging;
 
mod error_handling;
 
mod transfer_ports;
 
mod internet;
 

	
 
const LOG_LEVEL: LogLevel = LogLevel::Debug;
 
const NUM_THREADS: u32 = 1;
 

	
 
pub(crate) fn compile_and_create_component(source: &str, routine_name: &str, args: ValueGroup) {
 
    let protocol = ProtocolDescription::parse(source.as_bytes())
 
        .expect("successful compilation");
 
-    let runtime = Runtime::new(NUM_THREADS, LogLevel::None, protocol)
 
+    let runtime = Runtime::new(NUM_THREADS, LOG_LEVEL, protocol)
 
        .expect("successful runtime startup");
 
    create_component(&runtime, "", routine_name, args);
 
}
 

	
 
pub(crate) fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: ValueGroup) {
 
    let prompt = rt.inner.protocol.new_component(
 
        module_name.as_bytes(), routine_name.as_bytes(), args
 
    ).expect("create prompt");
 
    let reserved = rt.inner.start_create_component();
 
    let ctx = CompCtx::new(&reserved);
 
    let component = Box::new(CompPDL::new(prompt, 0));
 
    let (key, _) = rt.inner.finish_create_component(reserved, component, ctx, false);
 
    rt.inner.enqueue_work(key);
 
}
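
// A hedged sketch (not part of this changeset) of how the helpers above are
// typically combined; the PDL source is a made-up minimal component:
//
//     compile_and_create_component("
//     comp noop() {
//         s32 a = 0;
//     }", "noop", no_args());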
 

	
 
pub(crate) fn no_args() -> ValueGroup { ValueGroup::new_stack(Vec::new()) }
 

	
 
#[test]
 
fn test_component_creation() {
 
    let pd = ProtocolDescription::parse(b"
 
    comp nothing_at_all() {
 
        s32 a = 5;
 
        auto b = 5 + a;
 
    }
testdata/examples/01_reworked_consensus_select_01.pdl
Show inline comments
 
new file 100644
 
// Previously the following example caused the inadvertent synchronization of
 
// all participating components.
 

	
 
func lots_of_unnecessary_work(u64 min_val, u64 max_val) -> u64 {
 
    u64 sum = 0;
 
    u64 index = min_val;
 
    while (index <= max_val) {
 
        sum += index;
 
        index += 1;
 
    }
 

	
 
    return sum;
 
}
 

	
 
comp data_producer(out<u64> tx, u64 min_val, u64 max_val, string send_text) {
 
    while (true) {
 
        sync {
 
            auto value = lots_of_unnecessary_work(min_val, max_val);
 
            print(send_text);
 
            put(tx, value);
 
        }
 
    }
 
}
 

	
 
comp data_receiver_v1(in<u64> rx_a, in<u64> rx_b, in<u64> rx_c, u32 num_rounds) {
 
    u32 counter = 0;
 
    auto rxs = { rx_a, rx_b, rx_c };
 

	
 
    while (counter < num_rounds) {
 
        auto num_peers = length(rxs);
 
        auto peer_index = 0;
 
        while (peer_index < num_peers) {
 
            sync {
 
                auto result = get(rxs[peer_index]);
 
                print("received message (V1)");
 
                peer_index += 1;
 
            }
 
        }
 
        counter += 1;
 
    }
 
}
 

	
 
// The reason was that a synchronous interaction checked *all* ports for a valid
 
// interaction. So `data_receiver_v1` communicates
 
// with one peer per round, but it still requires the other peers to agree that
 
// they didn't send anything at all! Note that this already implies that all
 
// running components need to synchronize. We could fix this by writing:
 

	
 
comp data_receiver_v2(in<u64> rx_a, in<u64> rx_b, in<u64> rx_c, u32 num_rounds) {
 
    u32 counter = 0;
 
    auto rxs = { rx_a, rx_b, rx_c };
 

	
 
    while (counter < num_rounds) {
 
        auto num_peers = length(rxs);
 
        auto peer_index = 0;
 
        sync {
 
            while (peer_index < num_peers) {
 
                auto result = get(rxs[peer_index]);
 
                print("received message (V2)");
 
                peer_index += 1;
 
            }
 
        }
 
        counter += 1;
 
    }
 
}
 

	
 
// But this is not the intended behaviour. We want the producer components to
 
// be able to run independently of one another. This requires a change in the
 
// semantics of the language! Each peer is no longer automatically dragged
 
// into the synchronous round. Instead, only once the first message of a peer
 
// is received through a `get` call do the two components join each other's
 
// synchronous rounds.
 
//
 
// With such a change to the runtime, the first version (written above)
 
// produces the intended behaviour: the consumer accepts one value and
 
// synchronizes with its sender. Then it goes to the next round and
 
// synchronizes with the next sender.
 
//
 
// But what we would really like to do is to synchronize with any of the peers
 
// that happens to have its work ready for consumption. And so the 'select'
 
// statement is introduced into the language. This statement can be used to
 
// describe a set of possible behaviours we could execute. Each behaviour will
 
// have an associated set of ports. When all ports in that set have a
 
// message ready to be read, the corresponding behaviour will execute. So
 
// to complete the example above, we have:
 

	
 
comp data_receiver_v3(in<u64> rx_a, in<u64> rx_b, in<u64> rx_c, u32 num_rounds) {
 
    u32 counter = 0;
 
    auto rxs = { rx_a, rx_b, rx_c };
 

	
 
    u32 received_from_a = 0;
 
    u32 received_from_b_or_c = 0;
 
    u32 received_from_a_or_c = 0;
 
    u64 sum_received_from_c = 0;
 

	
 
    while (counter < num_rounds*3) {
 
        sync {
 
            select {
 
                auto value = get(rx_a) -> {
 
                    received_from_a += 1;
 
                    received_from_a_or_c += 1;
 
                }
 
                auto value = get(rx_b) -> {
 
                    received_from_b_or_c += 1;
 
                }
 
                auto value = get(rx_c) -> {
 
                    received_from_a_or_c += 1;
 
                    received_from_b_or_c += 1;
 
                    sum_received_from_c += value;
 
                }
 
            }
 
            print("received message (V3)");
 
        }
 
        counter += 1;
 
    }
 
}
 

	
 
comp main() {
 
    u32 version = 3;
 
    u32 num_rounds = 3;
 

	
 
    channel tx_a -> rx_a;
 
    channel tx_b -> rx_b;
 
    channel tx_c -> rx_c;
 
    new data_producer(tx_a, 0xBEEF, 0xCAFE, "sent from A");
 
    new data_producer(tx_b, 0xBEEF, 0xCAFE, "sent from B");
 
    new data_producer(tx_c, 0xBEEF, 0xCAFE, "sent from C");
 

	
 
    if (version == 1) {
 
        new data_receiver_v1(rx_a, rx_b, rx_c, num_rounds);
 
    } else if (version == 2) {
 
        new data_receiver_v2(rx_a, rx_b, rx_c, num_rounds);
 
    } else if (version == 3) {
 
        new data_receiver_v3(rx_a, rx_b, rx_c, num_rounds);
 
    } else {
 
        print("ERROR: invalid version in source");
 
    }
 
}
 
\ No newline at end of file
testdata/examples/02_error_handling.pdl
Show inline comments
 
new file 100644
 
// Although in an unstable state, there is an initial implementation for error
 
// handling. Roughly speaking: if a component has failed then it cannot complete
 
// any current or future synchronous rounds anymore. Hence, apart from some edge
 
// cases, any message received from the failed component should cause a failure
 
// at the receiving peer as well. We may have a look at the various places where
 
// a component can crash with respect to a peer that is receiving its messages.
 

	
 
enum ErrorLocation {
 
    BeforeSync,
 
    DuringSyncBeforeFirstInteraction,
 
    DuringSyncBeforeSecondInteraction,
 
    DuringSyncAfterInteractions,
 
    AfterSync,
 
}
 

	
 
func error_location_to_string(ErrorLocation loc) -> string {
 
    if (let ErrorLocation::BeforeSync = loc) {
 
        return "before sync";
 
    } else if (let ErrorLocation::DuringSyncBeforeFirstInteraction = loc) {
 
        return "during sync before first interaction";
 
    } else if (let ErrorLocation::DuringSyncBeforeSecondInteraction = loc) {
 
        return "during sync before second interaction";
 
    } else if (let ErrorLocation::DuringSyncAfterInteractions = loc) {
 
        return "during sync after interactions";
 
    } else { return "after sync"; }
 
}
 

	
 
func crash() -> u8 {
 
    return {}[0]; // access index 0 of an empty array
 
}
 

	
 
comp sender_and_crasher(out<u32> value, ErrorLocation loc) {
 
    print("sender: will crash " @ error_location_to_string(loc));
 
    if (loc == ErrorLocation::BeforeSync) { crash(); }
 
    sync {
 
        if (loc == ErrorLocation::DuringSyncBeforeFirstInteraction) { crash(); }
 
        print("sender: sending first value");
 
        put(value, 0);
 
        if (loc == ErrorLocation::DuringSyncBeforeSecondInteraction) { crash(); }
 
        print("sender: sending second value");
 
        put(value, 1);
 
        if (loc == ErrorLocation::DuringSyncAfterInteractions) { crash(); }
 
    }
 
    if (loc == ErrorLocation::AfterSync) { crash(); }
 
}
 

	
 
comp receiver(in<u32> value) {
 
    sync {
 
        auto a = get(value);
 
        auto b = get(value);
 
    }
 
}
 

	
 
// Note that when we run the example with the error location before or during
 
// sync, the receiver always crashes. However, the location where it will
 
// crash is somewhat random! Due to the asynchronous nature of the runtime, a
 
// sender of messages will always just `put` the value onto the port and
 
// continue execution. So even though the sender component might already be
 
// done with its sync round, the receiver officially still has to receive its
 
// first message. In any case, a neat error message should be displayed in the
 
// console.
 
//
 
// Note especially, given the asynchronous nature of the runtime, that the
 
// receiver may figure out that the peer component has crashed and yet still
 
// finish the current synchronous round. This can happen if the peer component
 
// crashes *just* after the synchronous round: the receiver may learn that the
 
// peer crashed *before* it learns that the synchronous round has succeeded.
 

	
 
comp main() {
 
    channel tx -> rx;
 

	
 
    new sender_and_crasher(tx, ErrorLocation::AfterSync);
 
    new receiver(rx);
 
}
 
\ No newline at end of file
testdata/examples/03_transmitting_ports_01.pdl
Show inline comments
 
new file 100644
 
// Since this release transmitting ports is possible. This means that we can
 
// send ports through ports. In fact, we can send ports that may send ports that
 
// may send ports. But don't be fooled by the apparent complexity. The inner
 
// type `T` of a port like `in<T>` simply states the message type.
 
// Should the type `T` contain one or more ports, then we kick off a bit of code
 
// that takes care of the transfer of the port. Should the port inside of `T`
 
// itself, after being received, send a port, then we simply kick off that same
 
// procedure again.
 
//
 
// In the simplest case, we have someone transmitting the receiving end of a
 
// channel to another component, which then uses that receiving end to receive a
 
// value.
 

	
 
comp port_sender(out<in<u32>> tx, in<u32> to_transmit) {
 
    sync put(tx, to_transmit);
 
}
 

	
 
comp port_receiver_and_value_getter(in<in<u32>> rx, u32 expected_value) {
 
    u32 got_value = 0;
 
    sync {
 
        auto port = get(rx);
 
        got_value = get(port);
 
    }
 
    if (expected_value == got_value) {
 
        print("got the expected value :)");
 
    } else {
 
        print("got a different value :(");
 
    }
 
}
 

	
 
comp value_sender(out<u32> tx, u32 to_send) {
 
    sync put(tx, to_send);
 
}
 

	
 
comp main() {
 
    u32 value = 1337_2392;
 

	
 
    channel port_tx -> port_rx;
 
    channel value_tx -> value_rx;
 
    new port_sender(port_tx, value_rx);
 
    new port_receiver_and_value_getter(port_rx, value);
 
    new value_sender(value_tx, value);
 
}
 
\ No newline at end of file
testdata/examples/03_transmitting_ports_02.pdl
Show inline comments
 
new file 100644
 
// Of course we may do something a little more complicated than this. Suppose
 
// that we don't just send one port, but a series of ports. That is, we use
 
// an `Option` union type, to ...
 

	
 
union Option<T> {
 
    Some(T),
 
    None,
 
}
 

	
 
// ... turn an array of ports that we're going to transmit into a series of
 
// messages containing ports, each sent to a specific component.
 

	
 
comp port_sender(out<Option<in<u32>>>[] txs, in<u32>[] to_transmit) {
 
    auto num_peers = length(txs);
 
    auto num_ports = length(to_transmit);
 

	
 
    auto num_per_peer = num_ports / num_peers;
 
    auto num_remaining = num_ports - (num_per_peer * num_peers);
 

	
 
    auto peer_index = 0;
 
    auto port_index = 0;
 
    while (peer_index < num_peers) {
 
        auto peer_port = txs[peer_index];
 
        auto counter = 0;
 

	
 
        // Distribute part of the ports to one of the peers.
 
        sync {
 
            // Sending the main batch of ports for the peer
 
            while (counter < num_per_peer) {
 
                put(peer_port, Option::Some(to_transmit[port_index]));
 
                port_index += 1;
 
                counter += 1;
 
            }
 

	
 
            // Sending the remainder of ports, one per peer until they're gone
 
            if (num_remaining > 0) {
 
                put(peer_port, Option::Some(to_transmit[port_index]));
 
                port_index += 1;
 
                num_remaining -= 1;
 
            }
 

	
 
            // Finish the custom protocol by sending nothing, which indicates to
 
            // the peer that it has received all the ports we have to hand out.
 
            put(peer_port, Option::None);
 
        }
 

	
 
        peer_index += 1;
 
    }
 
}
 

	
 
// And here we have the component that will receive on that port. We can design
 
// the synchronous regions any way we want. In this case, when we receive ports
 
// we just synchronize with `port_sender`, but the moment we receive messages we
 
// synchronize with everyone.
 

	
 
comp port_receiver(in<Option<in<u32>>> port_rxs, out<u32> sum_tx) {
 
    // Receive all ports
 
    auto value_rxs = {};
 

	
 
    sync {
 
        while (true) {
 
            auto maybe_port = get(port_rxs);
 
            if (let Option::Some(certainly_a_port) = maybe_port) {
 
                value_rxs @= { certainly_a_port };
 
            } else {
 
                break;
 
            }
 
        }
 
    }
 

	
 
    // Receive all values
 
    auto received_sum = 0;
 

	
 
    sync {
 
        auto port_index = 0;
 
        auto num_ports = length(value_rxs);
 
        while (port_index < num_ports) {
 
            auto value = get(value_rxs[port_index]);
 
            received_sum += value;
 
            port_index += 1;
 
        }
 
    }
 

	
 
    // And send the sum
 
    sync put(sum_tx, received_sum);
 
}
 

	
 
// Now we need something to send the values; we'll make something incredibly
 
// simple. Namely:
 

	
 
comp value_sender(out<u32> tx, u32 value_to_send) {
 
    sync put(tx, value_to_send);
 
}
 

	
 
comp sum_collector(in<u32>[] partial_sum_rx, out<u32> total_sum_tx) {
 
    auto sum = 0;
 
    auto index = 0;
 
    while (index < length(partial_sum_rx)) {
 
        sync sum += get(partial_sum_rx[index]);
 
        index += 1;
 
    }
 

	
 
    sync put(total_sum_tx, sum);
 
}
 

	
 
// And we need the component to set this entire system of components up. So we
 
// write the following entry point.
 

	
 
comp main() {
 
    auto num_value_ports = 32;
 
    auto num_receivers = 3;
 

	
 
    // Construct the senders of values
 
    auto value_port_index = 1;
 
    auto value_rx_ports = {};
 
    while (value_port_index <= num_value_ports) {
 
        channel value_tx -> value_rx;
 
        new value_sender(value_tx, value_port_index);
 
        value_rx_ports @= { value_rx };
 
        value_port_index += 1;
 
    }
 

	
 
    // Construct the components that will receive groups of value-receiving
 
    // ports
 
    auto receiver_index = 0;
 
    auto sum_combine_rx_ports = {};
 
    auto port_tx_ports = {};
 

	
 
    while (receiver_index < num_receivers) {
 
        channel sum_tx -> sum_rx;
 
        channel port_tx -> port_rx;
 
        new port_receiver(port_rx, sum_tx);
 

	
 
        sum_combine_rx_ports @= { sum_rx };
 
        port_tx_ports @= { port_tx };
 
        receiver_index += 1;
 
    }
 

	
 
    // Construct the component that redistributes the total number of input
 
    // ports.
 
    new port_sender(port_tx_ports, value_rx_ports);
 

	
 
    // Construct the component that computes the sum of all sent values
 
    channel total_value_tx -> total_value_rx;
 
    new sum_collector(sum_combine_rx_ports, total_value_tx);
 

	
 
    auto expected = num_value_ports * (num_value_ports + 1) / 2;
 
    auto received = 0;
 

	
 
    sync received = get(total_value_rx);
 

	
 
    if (expected == received) {
 
        print("got the expected value!");
 
    } else {
 
        print("got something entirely different");
 
    }
 
}
 
\ No newline at end of file
testdata/examples/04_native_components.pdl
Show inline comments
 
new file 100644
 
// We'll start by importing the standard library that defines the builtin
 
// components that support a TCP listener and a TCP client.
 

	
 
import std.internet::*;
 

	
 
// We'll define a little utility, used throughout this document, that is
 
// called to retrieve the port we're going to listen on.
 

	
 
func listen_port() -> u16 {
 
    return 2392;
 
}
 

	
 
// Next we define our server. The server accepts (for the case of this example)
 
// a number of connections until it will stop listening. At that point it will
 
// wait until it receives a signal that allows it to shut down.
 

	
 
comp server(u32 num_connections, in<()> shutdown) {
 
    // Here we set up the channels for commands, going to the listener
 
    // component, and the channel that sends new connections back to us.
 
    channel listen_cmd_tx -> listen_cmd_rx;
 
    channel listen_conn_tx -> listen_conn_rx;
 

	
 
    // And we create the tcp_listener, imported from the standard library, here.
 
    new tcp_listener({}, listen_port(), listen_cmd_rx, listen_conn_tx);
 

	
 
    // Here we set up a variable that will hold our received connections
 
    channel client_cmd_tx -> unused_client_cmd_rx;
 
    channel unused_client_data_tx -> client_data_rx;
 
    auto new_connection = TcpConnection{
 
        tx: client_cmd_tx,
 
        rx: client_data_rx,
 
    };
 

	
 
    auto connection_counter = 0;
 
    while (connection_counter < num_connections) {
 
        // We wait until we receive a new connection
 
        print("server: waiting for an accepted connection");
 
        sync {
 
            // The way the standard library is currently written, we need to
 
            // send the `tcp_listener` component a command telling it to
 
            // listen for the next connection. This is only one way in which
 
            // the standard library could be written. We could also write it
 
            // in such a way that a separate component buffers new incoming
 
            // connections, so that we only have to `get` from that separate
 
            // component.
 
            //
 
            // Note that when we get such a new connection (see the
 
            // TcpConnection struct in the standard library), the peers of the
 
            // two ports are already hooked up to a `tcp_client` component, also
 
            // defined in the standard library.
 
            put(listen_cmd_tx, ListenerCmd::Accept);
 
            new_connection = get(listen_conn_rx);
 
        }
 

	
 
        // In any case, once execution reaches this point, the synchronous round that
 
        // governed receiving the new connection has completed. And so we send
 
        // that connection off to a handler component. In this case we have the
 
        // `echo_machine` component, defined in this file as well.
 
        print("server: spawning an echo'ing component");
 
        new echo_machine(new_connection);
 
        connection_counter += 1;
 
    }
 

	
 
    // When all of the desired connections have been handled, we first await a
 
    // shutdown signal from another component.
 
    print("server: awaiting shutdown signal");
 
    sync auto v = get(shutdown);
 

	
 
    // And once we have received that signal, we'll instruct the listener
 
    // component to shut down.
 
    print("server: shutting down listener");
 
    sync put(listen_cmd_tx, ListenerCmd::Shutdown);
 
}
 

	
 
// This is the component that is spawned by the server component to handle new
 
// connections. All it does is wait for a single incoming TCP packet, where it
 
// expects a single byte of data, and then echo that back to the peer.
 

	
 
comp echo_machine(TcpConnection conn) {
 
    auto data_to_echo = {};
 

	
 
    // Here is where we receive a message from a peer ...
 
    sync {
 
        print("echo: receiving data");
 
        put(conn.tx, ClientCmd::Receive);
 
        data_to_echo = get(conn.rx);
 
        put(conn.tx, ClientCmd::Finish);
 
    }
 

	
 
    // ... and send it right back to our peer.
 
    print("echo: sending back data");
 
    sync put(conn.tx, ClientCmd::Send(data_to_echo));
 

	
 
    // And we ask the `tcp_client` to shut down neatly.
 
    print("echo: shutting down");
 
    sync put(conn.tx, ClientCmd::Shutdown);
 
}
 

	
 
// Here is the component that we will instantiate to connect to the `server`
 
// component above (more specifically, to the `tcp_listener` component
 
// instantiated by the `server`). This is the component that will ask the
 
// `echo_machine` component to echo a byte of data.
 

	
 
comp echo_requester(u8 byte_to_send, out<()> done) {
 
    // We instantiate the `tcp_client` from the standard library. This will
 
    // perform the "connect" call to the `tcp_listener`.
 
    channel cmd_tx -> cmd_rx;
 
    channel data_tx -> data_rx;
 
    new tcp_client({127, 0, 0, 1}, listen_port(), cmd_rx, data_tx);
 

	
 
    // And once we are connected, we send the single byte to the other side.
 
    print("requester: sending bytes");
 
    sync put(cmd_tx, ClientCmd::Send({ byte_to_send }));
 

	
 
    // This sent byte will arrive at the `echo_machine`, which will send it
 
    // right back to us. So here is where we wait for that byte to arrive.
 
    auto received_byte = byte_to_send + 1;
 
    sync {
 
        print("requester: receiving echo response");
 
        put(cmd_tx, ClientCmd::Receive);
 
        received_byte = get(data_rx)[0];
 
        put(cmd_tx, ClientCmd::Finish);
 
    }
 

	
 
    // We make sure that we got back what we sent
 
    while (byte_to_send != received_byte) {
 
        print("requester: Oh no! we got back a byte different from the one we sent");
 
    }
 

	
 
    // And we shut down the TCP connection
 
    print("requester: shutting down TCP component");
 
    sync put(cmd_tx, ClientCmd::Shutdown);
 

	
 
    // And finally we send a signal to another component (the `main` component)
 
    // to let it know we have finished our little protocol.
 
    sync put(done, ());
 
}
 

	
 
// And here the entry point for our program
 

	
 
comp main() {
 
    // Some settings for the example
 
    auto num_connections = 12;
 

	
 
    // We create a new channel that allows us to shut down our server component.
 
    // With that channel created, we can instantiate the server component.
 
    channel shutdown_listener_tx -> shutdown_listener_rx;
 
    new server(num_connections, shutdown_listener_rx);
 

	
 
    // Here we create all the requesters that will ask their peer to echo back
 
    // a particular byte.
 
    auto connection_index = 0;
 
    auto all_done = {};
 
    while (connection_index < num_connections) {
 
        channel done_tx -> done_rx;
 
        new echo_requester(cast(connection_index), done_tx);
 
        connection_index += 1;
 
        all_done @= {done_rx};
 
    }
 

	
 
    // Here our program starts to shut down. First we'll wait until all of our
 
    // requesting components have gotten back the byte they're expecting.
 
    auto counter = 0;
 
    while (counter < length(all_done)) {
 
        print("constructor: waiting for requester to exit");
 
        sync auto v = get(all_done[counter]);
 
        counter += 1;
 
    }
 

	
 
    // And we shut down our server.
 
    print("constructor: instructing listener to exit");
 
    sync put(shutdown_listener_tx, ());
 
}
 
\ No newline at end of file