Changeset - 97217a7b2d18
[Not reviewed]
bin-compiler/src/main.rs
Show inline comments
 
use std::fs::File;
 
use std::io::Read;
 

	
 
use clap::{App, Arg};
 
use reowolf_rs as rw;
 

	
 
fn main() {
 
    let app = App::new("rwc")
 
        .author("Henger, M.")
 
        .version(env!("CARGO_PKG_VERSION"))
 
        .about("Reowolf compiler")
 
        .arg(
 
            Arg::new("input")
 
                .long("input")
 
                .short('i')
 
                .help("input files")
 
                .required(true)
 
                .takes_value(true)
 
                .multiple_occurrences(true)
 
        )
 
        .arg(
 
            Arg::new("threads")
 
                .long("threads")
 
                .short('t')
 
                .help("number of runtime threads")
 
                .default_value("1")
 
                .takes_value(true)
 
        )
 
        .arg(
 
            Arg::new("loglevel")
 
                .long("log_level")
 
                .short('l')
 
                .help("set log level ('none', 'info', 'debug' or 'all')")
 
                .default_value("all")
 
                .takes_value(true)
 
        )
 
        .arg(
 
            Arg::new("stdlib")
 
                .long("stdlib")
 
                .short('s')
 
                .help("standard library directory (overrides default)")
 
                .takes_value(true)
 
        );
 

	
 
    // Retrieve arguments and convert
 
    let app = app.get_matches();
 
    let input_files = app.values_of("input");
 
    if input_files.is_none() {
 
        println!("ERROR: Expected at least one input file");
 
        return;
 
    }
 

	
 
    let num_threads = app.value_of("threads").unwrap();
 
    let num_threads = match num_threads.parse::<i32>() {
 
        Ok(num_threads) => {
 
            if num_threads < 0 || num_threads > 255 {
 
                println!("ERROR: Number of threads must be a number between 0 and 256");
 
                return;
 
            }
 

	
 
            num_threads as u32
 
        },
 
        Err(err) => {
 
            println!("ERROR: Failed to parse number of threads\nbecause: {}", err);
 
            return;
 
        }
 
    };
 

	
 
    let log_level = app.value_of("loglevel").unwrap();
 
    let log_level = match log_level {
 
        "none" | "None" => rw::runtime2::LogLevel::None,
 
        "debug" | "Debug" => rw::runtime2::LogLevel::Debug,
 
        "info" | "Info" | "all" | "All" => rw::runtime2::LogLevel::Info,
 
        _ => {
 
            println!("ERROR: Unexpected log level");
 
            return;
 
        }
 
    };
 

	
 
    let standard_library_dir = app.value_of("stdlib")
 
        .map(|v| v.to_string());
 

	
 
    // Add input files to file buffer
 
    let input_files = input_files.unwrap();
 
    assert!(input_files.len() > 0); // because arg is required
 

	
 
    let mut builder = rw::ProtocolDescriptionBuilder::new(standard_library_dir)
 
        .expect("create protocol description builder");
 
    let mut file_buffer = Vec::with_capacity(4096);
 

	
 
    for input_file in input_files {
 
        print!("Adding file: {} ... ", input_file);
 
        let mut file = match File::open(input_file) {
 
            Ok(file) => file,
 
            Err(err) => {
 
                println!("FAILED (to open file)\nbecause:\n{}", err);
 
                return;
 
            }
 
        };
 

	
 
        file_buffer.clear();
 
        if let Err(err) = file.read_to_end(&mut file_buffer) {
 
            println!("FAILED (to read file)\nbecause:\n{}", err);
 
            return;
 
        }
 

	
 
        if let Err(err) = builder.add(input_file.to_string(), file_buffer.clone()) {
 
            println!("FAILED (to tokenize file)\nbecause:\n{}", err);
 
            return;
 
        }
 

	
 
        println!("Success");
 
    }
 

	
 
    // Compile the program
 
    print!("Compiling program ... ");
 
    let protocol_description = match builder.compile() {
 
        Ok(pd) => pd,
 
        Err(err) => {
 
            println!("FAILED\nbecause:\n{}", err);
 
            return;
 
        }
 
    };
 

	
 
    println!("Success");
 

	
 
    // Start runtime
 
    print!("Startup of runtime ... ");
 
    let runtime = rw::runtime2::Runtime::new(num_threads, log_level, protocol_description);
 
    if let Err(err) = &runtime {
 
        println!("FAILED\nbecause:\n{}", err);
 
    }
 
    println!("Success");
 

	
 
    // Make sure there is a nameless module with a main component
 
    print!("Creating main component ... ");
 
    let runtime = runtime.unwrap();
 
    if let Err(err) = runtime.create_component(b"", b"main") {
 
        use rw::ComponentCreationError as CCE;
 
        let reason = match err {
 
            CCE::ModuleDoesntExist => "Input files did not contain a nameless module (that should contain the 'main' component)",
 
            CCE::DefinitionDoesntExist => "Input files did not contain a component called 'main'",
 
            CCE::DefinitionNotComponent => "Input file contained a 'main' function, but not a 'main' component",
 
            _ => "Unexpected error"
 
        };
 
        println!("FAILED\nbecause:\n{} (raw error: {:?})", reason, err);
 
        return;
 
    }
 

	
 
    println!("Success");
 
    println!("Now running until all components have exited");
 
    println!("--------------------------------------------\n\n");
 
}
 
\ No newline at end of file
docs/runtime/01_runtime.md
Show inline comments
 
# Runtime Design
 

	
 
## General Architecture
 

	
 
Roughly speaking the project consists of the following parts:
 

	
 
1. 
 
   The compiler itself. This transforms the PDL source code into an executable format.
 
2. 
 
   The interpreter. This takes the executable format and executes it. It is a very unoptimized AST walker based on stack frames. Generally speaking the bottommost frame in the stack contains the code and memory associated with a component.
 

	
 
    Once the interpreter hits a point where it needs to interact with the runtime (generally in order to communicate with another component) it will halt and emit a signal to the runtime.
 
3. 
 
   The runtime. This is the code that keeps track of the components, decides when they can run, where their messages should end up, and bring various control algorithms (running behind the scene) to completion.
 

	
 
We'll not go into points 1 and 2 in this document. One may simply assume that at the language level there is support for all of the things that are implemented in the runtime: sync blocks, channels, ports, component creation, sending and receiving messages, etc.
 

	
 
Once such builtin features are encountered in the interpreter (e.g. the creation of a channel), a signal will be emitted to the runtime. A rough outline of the architecture, and handling these signals, is discussed in this document.
 

	
 
## Runtime Architecture
 

	
 
The runtime's code essentially consists of:
 

	
 
- The machinery that keeps track of the components. That is to say: there is some memory reserved for each of the components. And using some kind of component ID we can look up this memory in a registry. If the runtime finds that there are no more user-controlled components running (i.e. the ones that a programmer uses to create more components) and there are no more regular components running, then the runtime shuts down.
 
- A bit of shared memory for all of the OS threads that will be managed by the runtime. This mainly consists of a work queue. This work queue contains the identities of the components that are scheduled for execution.
 
- A set of scheduler threads. They attempt to retrieve work from the work queue. This work is in the form of component IDs, which they use to retrieve the associated component and run its PDL code. They do this by invoking the interpreter on the component's current execution and memory state. Once a runtime signal (as mentioned above) is emitted by the interpreter, the scheduler thread will deal with it appropriately.
 
- An auxilliary polling thread. Not of great importance, so a short description suffices: although most components react to one another, some components (e.g. a TCP socket) might have nothing to do until the OS instructs it to do something. This polling thread ensures that once there is something to do, the component is put back onto the work queue.
 

	
 
As long as the runtime doesn't shut down there will be `T` threads executing `N` components. A component can either be running (by being executed by a scheduler thread), scheduled for execution (its ID is in the work queue), or sleeping. All of these states are exclusive. Maintaining the exclusivity of these states is of great importance! We never want to end up in a place where two threads are both modifying the code/memory state of the same component!
 

	
 
A component will start its lifecycle as being put into the work queue (not exactly true, but this will be dealt with later) by the component that created it. At some point a scheduler will pick up the newly created component's ID from the work queue and start executing its code. Once running (and we'll exclude fatal errors for now) the component may at some point reach the end of its code and terminate, or it may encounter a place in its code where it blocks and needs to wait for external input (e.g. it performed a `get`, but a message has not arrived yet).
 

	
 
Once the execution of a component is blocked, it will attempt to go to sleep. Meaning: it will be in a state where it is not running, but also not scheduled for execution. A component may then be woken up by a different component, or the polling thread, by sending the sleeping a component a message. To prevent components from scheduling a sleeping component multiple times, the memory of a component contains an atomic `sleeping` boolean.
 

	
 
It is instructive at this point to roughly explain how components are stored in memory. The components memory is roughly divided into two regions. There is the publicly accessible memory of a component and a private memory region. The public part is accessible by all scheduler threads. So all of the memory in the public memory region is somehow behind a lock, or some kind of locking mechanism (we will include the concept of atomics into "some kind of locking mechanism" for now). Hence the aforementioned `sleeping` boolean lives in this region. Conversely, the private memory region of a component is only accessed by the scheduler thread that is running the component. So here we store things like the memory state and execution state of the component.
 

	
 
Returning to the idea of a component wishing to enter the "sleeping" state. The procedure in pseudocode is written as:
 

	
 
```
 
func go_to_sleep(this_component) {
 
   // We are currently executing, so our sleeping flag MUST be `false`
 
   assert(atomic::load(this_component.sleeping) == false);
 
   atomic::store(this_component.sleeping, true); // we force the flag to true
 
   // Note that while we were trying to go to sleep, someone has sent us a new
 
   // message, but it did not see yet that we stored `false` in the sleeping
 
   // flag, so we need to check ourselves.
 
   if messages_in_inbox() {
 
      // We try to set the flag back to false, but another scheduler thread may
 
      // have already done this
 
      let swap_success = atomic::cas(this_component.sleeping, true, false);
 
      if swap_success {
 
         put_in_work_queue(this_component.id);
 
      }
 
   }
 
}
 
```
 

	
 
Similarly, each time we try to send a component a message, we must do the following:
 

	
 
```
 
func send_message_to(target_component, message_data) {
 
   put_message_in_inbox_locked(target_component.inbox, message_data);
 
   let swap_success = atomic::cas(target_component.sleeping, true, false);
 
   if swap_success {
 
      put_in_work_queue(target_component.id);
 
   }
 
}
 
```
 

	
 
Note that, because we cannot predict how the OS threads are scheduled, we can also not predict the way in which our own schedulers (which are running on OS threads) will schedule the execution of the components. Hence as a mental model, one may assume that each component is running in its own thread. The system above ensures that if a component has something to do (because it has received a message), it will eventually end up being executed by a scheduler. With the code for the "sleeping" state we'll ensure that a component can only be executed by one scheduler at a time.
 

	
 
## General Messaging
 

	
 
With this rough architecture, components can send each other messages. One will find three kinds of messages in the runtime (okay, four, but the last one is just to make the OS polling thread work):
 

	
 
1. Data messages: these are the messages that are sent by `put` calls and received with `get` calls. As will be explained later in this document, more information is attached to data messages than the values given as argument to the `put` call. These messages will arrive in the target component's inbox. When the target component performs a call to `get` they're pulled out of the inbox and transferred to the component's memory state, such that the PDL code can read the transferred values.
 
2. Control messages: to facilitate certain operations permitted within PDL, the scheduler thread may decide to send messages to other components. These messages are called control messages. We'll encounter them later in this document when describing component creation, transmitting ports, and closing channels. Different from data messages, which may linger in the component's inbox for a while, control messages are handled by the receiving component immediately. This is important for various control algorithms.
 
3. Sync messages: to facilitate the consensus algorithm, there will be messages initiated by the scheduler thread as well. That is to say: when the component sends data messages there will be information attached to the data message that facilitates a successful sync round. But apart from that when the components are all done executing their code they must somehow reach consensus. This is done through these sync messages.
 

	
 
Note that already the concept of a channel, each having its own little slot (or limited buffer), becomes a superfluous design decision for the runtime. This is because the scheduler threads themselves also need to be able to send messages to other components. Hence we'll need something more generic than a per-channel buffer, namely a generic message inbox.
 

	
 
As we'll later also see, the concept of a directional channel *may* be a useful tool within PDL code (and I'm not arguing against such a concept), but as we'll see throughout this document the control messages can (conceptually) flow both ways over the channel.
 

	
 
## Runtime Design Drivers
 

	
 
Most design choices in the runtime were based on the fact that the Reowolf language should facilitate easier programming over the internet. So although the entire system currently only considers components running on a single machine, those components were conceptually regarded as living on different machines. In other words: components were conceptually considered not to have shared memory they can both access.
 

	
 
This has some implications for channels. Certainly a component that sends a value must have it stored somewhere temporarily, and the component receiving it needs to keep it around as well. But the channel is not an entity that you'll find in memory. Rather there is one component owning one port, and when a message is `put` through it, it will arrive at another component owning the peer port; there is no memory sahred between components that will store a message flowing through a channel.
 

	
 
A multi-machine runtime also requires the runtime to embrace the asynchronous nature of components. `put`s are non-blocking and can be performed one after the other before the peer has performed a corresponding `get`. The language does not contain the concept of a "lock" such that two components can agree on who owns a shared bit of memory. Rather each component is executed in its own thread of execution, and for multiple components to coordinate their actions they must use the messaging facilities. In order to make this coordination-through-messages somewhat simple to reason about one of the design drivers of the runtime was to ensure that each message sent in a specific order from one component to another will arrive in that same order at the target component.
 

	
 
And so we have a multi-machine runtime where components running in their own thread can only coordinate through messages. As a result an ever-important consideration in designing internal (control) algorithms is something called "message crossing". Two components may decide to initiate a protocol at the same time, hence send each other the exact same protocol-initiating message (e.g. we have components `A` and `B`, and a protocol that requires an initiator to send a `Request` message, and then wait for a response in terms of a `Response` message, then we may have `A` and `B` both sending each other `Request` at the same time).
 

	
 
Yet another result is that we decided to design the runtime without any globally unique component and/or port IDs. Certainly: on a single machine a component IDs allows one to retrieve a component's memory. But when sending a message to a component living on another machine, it may well be that we're sending it to a through a port that has the same port ID as ours, and targeting a component that has the same ID as ours.
 

	
 
## Control Algorithms
 

	
 
We'll now discuss several of the control algorithms. These control algorithms may be initiated by the scheduler threads when certain runtime signals are emitted by the interpreter. The control algorithms are brought to completion by sending messages. We'll talk about these messages as if they're sent from component to another component (this is for the sake of clarity: in reality they're sent by one scheduler thread to the memory location reserved for the target component's inbox). Because messages may be relayed one or more times before they arrive at the intended receiver (we'll introduce this concept soon), most messages include their intended target port in some kind of message header. This is true for all data messages, and most control messages. Only when a component is certain about the identity of the receiving component can it send messages without a target port in a header.
 

	
 
### Changing Port Peers due to Component Creation
 

	
 
Components, when they're not in sync mode, may decide to create new components. Ports may be used as the arguments to these newly created components. The rule we place on such a kind of port transfer is that the component that is creating the new component fully relinquishes ownership of the transferred port, and after the new component is created, the new component owns that port. As an annotated example:
 

	
 
```
 
comp creator(in<u32> one_port) {
 
   channel another_port -> and_a_final_one;
 
   sync {
 
      auto value = get(one_port); // legal, just receiving an integer
 
      put(another_port, 1337); // legal, sending a value over an owned
 
   }
 
   // perform component creation
 
   new some_other_component(one_port, and_a_final_one); // transferring two ports
 
   
 
   sync get(one_port); // illegal! Port was transferred
 
   sync put(another_port, 1338); // still legal, we still own this port
 
   sync get(and_a_final_one); // also illegal, port was transferred.
 
}
 
```
 

	
 
We have several runtime properties to uphold when we're transferring ports:
 

	
 
- No globally unique port IDs, so the new component is allowed to choose new port IDs for the ports it is adopting ownership of.
 
- The peers of the transferred ports may be unaware that a new component is created. In fact those peers may have already transferred messages to the instantiating component! As a design decision (the one that we find makes sense) any incoming, but unread, messages for a port are transferred along to the new component.
 
- Similarly to the above: a peer of a transferred port needs to be aware at some point that its peer port has changed ownership.
 
- Together with the requirement that peers need to be aware of the transferred ports, we also need to maintain ordering in the sent messages that intend to arrive at that transferred port at some point in time.
 

	
 
Here we see the asynchronous nature of the runtime rear its head. Because the transferring of ports does not just happen to the receiving end of a port (in which case we transfer already received messages, hence messages only arrive at their correct destination eventually). It may happen to the transmitting end of a port as well. What this means for the receiver is that it is never sure which component is its peer until it has recevied a data message that is annotated with the origin of the message. At that moment in time the peer of the port is known, but only until the end of the synchronous round. Because after the synchronous round it is perfectly possible for the port to be passed around again.
 

	
 
For all of the requirements above, the internal control algorithm to transfer a port to a new component is as following:
 

	
 
1. The component that is creating the new component (we'll call the creator the instantiator component, and the created one the new component) temporarily has access to the private memory of the new component. Reason being is that a component is always created on the same machine as the instantiator component. And so the first step it takes is to create new port IDs (that make sense for the newly created component, instead of for the instantiator component) and map the old port IDs to the new ones.
 
2. The component transfers all of the metadata associated with the port, and transfers all of the messages that are targeted at those transferred ports to the new component.
 
3. For each transferred port the instantiator sends a `PortPeerChanged_Block` control message to the peers. This message instructs the peer that the port should be temporarily blocked. Any component that tries to send a message through that port enters a blocked state that can only be lifted if the corresponding `PortPeerChanged_Unblock` control message is sent. At the same time the instantiator sets up a special bit of code that will relay all incoming messages from that peer to the new component. We've mentioned earlier that all messages will have a target port. So when messages arrive at the instantiator component that need to be relayed, the instantiator component will modify the target port to the new component's chosen port ID.
 
4. Once a peer has received a `PortPeerChanged_Block`, it will, as stated above, stop sending messages over that channel. Not only data messages, but control messages as well. This also means that if the other component cannot start transferring ports itself. In any case, it will respond with an `Ack`nowledgement back to the instantiator component.
 
5. The instantiator component waits until it has received an `Ack` for all of the `PortPeerChanged_Block` message it has sent. This is done such that we're sure that we've received all of the messages that are actually intended for the new component (because while the new component is being created the peer may still be sending messages intended for the new component, but sent to the instantiator component). As a result, the new component will have all of the data messages in the inbox in the order in which they were sent, therefore maintaining the runtime property of message ordering.
 
6. When all of the `Ack`s are received, the instantiator component will remove the bit of code that relays all of the messages and will schedule the new component for execution. At this point the instantiator component will no longer access the private memory of the new component. Since the instantiator component is aware of the new component's ID and the new port IDs for all of the transferred ports, it will send `PortPeerChanged_Unblock` messages to all of the peer components. This message will also contain the new component's ID and its port ID.
 
7. The peers, upon receiving the `PortPeerChanged_Unblock` message, will update the metadata of their ports such that they point to the new component's ports. They will also unblock the port such that messages can be sent again.
 

	
 
With this control algorithm, all peers are now aware of the new port's position. We've also maintained message ordering for the message sent to the new component. Although it was mentioned in point (4), we'll mention it here to be extra clear: creating a new component will be blocked until all of the transferred ports are unblocked. If we don't do this a data/control message may end up at the wrong component.
 

	
 
Likewise we see the asynchronous nature of ports: the peers are eventually consistent. This is why we stressed earlier that almost all messages have their targeted port in their message header. This is needed such that a component like the instantiator discussed above knows when to relay messages. In this process the relaying component will also update the target port ID in the header to the new port ID.
 

	
 
### Shutting Down Components
 

	
 
A component will require a bit of memory to run. So when we're done executing a component (either because it has crashes, or because its program has terminated) we would like to release this memory again. Earlier we mentioned that components send messages by accessing an inbox in the public memory region of a component. This memory will, ofcourse, be freed as well. So we need to make sure that when a component shuts down, all of its peers will somehow be notified that they can never send messages to that terminated component again.
 

	
 
In order to do so we have another control protocol. We'll extend this protocol when we discuss encountering crashing components, but we'll introduce a simpler variant here. The protocol is relatively simple. For each of the ports that are not yet closed and are owned by the terminating component we will:
 

	
 
1. Make sure that the port is not blocked. If the port is blocked then the component blocks until the associated port is becomes unblocked. If the port is already closed then we do not execute the other steps in this control algorithm.
 
2. The port will send a `ClosePort` message to the peer of the port that is closing. Note that this `ClosePort` message will have a target port. If it happens to be that the terminating component will receive a `PortPeerChanged_Block` message for that port in the near future, we're certain that the `ClosePort` message will at least arrive at the correct peer (since the target port will be used to relay that message to the correct receiver).
 
3. The peer of the port, upon receiving a `ClosePort` message, will mark the port as being closed in its metadata. From that point onwards, any attempt to `put` or `get` on that port will result in the peer component crashing. In response to the `ClosePort` message, the peer component will send an `Ack`.  There is one exception, and that is when the peer component itself already initiated a `ClosePort` control algorithm for that port. In that case the incoming `ClosePort` message is treated like an `Ack`.
 
4. The terminating component will wait until all of the `Ack`s have arrived (or crossing `ClosePort` messages, as stated in point (3)). Once they do, they will instruct the runtime to remove the component from memory.
 

	
 
To reiterate: we have to be careful and annotate the `ClosePort` message with the target port. The terminating component will delay sending a `ClosePort` message if the port is blocked, but it may be that we have the `ClosePort` message crossing with a `PortPeerChanged_Block` message. Which implies that our `ClosePort` message will be relayed by the peer component.
 

	
 
### Transferring Ports through Data Messages
 

	
 
The PDL code allows for ports to be transferred through ports. As a simple example, consider the following code:
 

	
 
```
 
struct Pair {
 
   in<bool> command,
 
   out<u32> response,
 
}
 

	
 
comp some_component(
 
   in<u32> to_transmit,
 
   out<in<u32>> tx_one_port,
 
   out<Pair> tx_two_ports
 
) {
 
   // Transmitting a port directly
 
   sync put(tx_one_port, to_transmit);
 
   
 
   // Transmitting multiple ports at a time using a data structure
 
   channel command_tx -> command_rx;
 
   channel response_tx -> response_rx;
 
   
 
   sync {
 
      let message = Pair{ 
 
         command: command_rx,
 
         response: response_tx,
 
      };
 
      put(tx_two_ports, message);
 
   }
 
   
 
   // Sending a command and receiving a response
 
   sync {
 
      put(command_tx, true);
 
      auto response = get(response_rx);
 
   }
 
}
 
```
 

	
 
To facilitate this, we'll follow roughly the same procedure as when we're transferring ports to a newly created component. But we have one complication: we do not have direct access to the private memory of the component we're sending the ports to (we'll call this component the "adopting component", and the sending component the "relinquishing component"). And so we'll have to follow a control protocol that is slightly different.
 

	
 
Note that it is perfectly okay to send closed ports. The adopting component will receive this component together with the information that the port is closed. In this way, if the adopting component attempts a `put` or `get` on that received component, it will crash.
 

	
 
We'll enforce a second rule upon transferring ports. Namely that ports transferred in a synchronous round may not have been used in `get` or `put` operations. I'm certain that it is possible to come up with a set of rules that will make this possible. But the protocol for transferring components over channels is a lot easier if we disallow this. For this reason we'll introduce a field in the metadata for each port that registers when the port was last used. If the relinquishing component attempts to transfer a port that has been used within the same sync round, then it will crash.
 

	
 
Like before we want to ensure that all messages intended for the transferred port arrive in the correct order at the adopting component.
 

	
 
And so the control protocol for transmitting ports proceeds as following:
 

	
 
1. The relinquishing component will first make sure that none of the ports are blocked. If the ports are blocked then it will sleep until the ports become unblocked. As stated above the relinquishing component will also make sure that the ports were not previously used within the synchronous round.
 
2. The relinquishing component will send `PortPeerChanged_Block` message to all of the peers of the ports that will be transferred. However, in this case it will not relay any messages to the new component, they will still pile up in the relinquishing component's inbox.
 
3. The peers, upon receiving the `PortPeerChanged_Block` message, will proceed as they would in the case where ports were transferred to a new component: they'll block the port and send an `Ack`.
 
4. The relinquishing component will wait until all of the expected `Ack` message are received. Once they are received the component will wait until the port the message will travel through becomes unblocked (that is: the port that is used to transfer the ports to the adopting component).
 
5. The relinquishing component will send the data message containing the transferred ports to the adopting component. It will annotate this message with a list containing `(tranferred port ID, peer component ID, peer port ID)` triples. Note that since those peer ports are blocked, they will not be transferred in the meantime. This is essential for the next step.
 
6. The adopting component will receive the annotated data message containing the transferred ports. For each transferred port it will decide upon a new port ID.
 
7. The adopting component will, for each adopted port, send out a `PortPeerChanged_Unblock` message to the blocked peer ports. This message will be annotated with the `(adopting component ID, new port ID)` pairs. Such that the peers all know where the peers can be found.
 

	
 
## Dealing with Crashing Components
 

	
 
### The cases in which peers crash in response
 

	
 
A component may at any point during its execution be triggered to crash. This may be because of something simple like an out-of-bounds array access. But as described above using closed ports may lead to such an event as well. In such a case we not only need to go through the `ClosePort` control protocol, to make sure that we can remove the crashing component's memory from the runtime, but we'll also have to make sure that all of the peers are aware that *their* peer has crashed. Here we'll make a design decision: if a peer component crashes during a synchronous round and there were interactions with that component, then that interacting component should crash as well. The exact reasons will be introduced later, but it comes down to the fact that we need to do something about the fact that the synchronous round will never be able to complete.
 

	
 
We'll talk ourselves through the case of a component crashing before coming up with the control algorithm to deal with components crashing.
 

	
 
We'll first consider that a component may crash inside our outside of a synchronous block. From the point of view of the peer component, we'll have four cases to consider:
 
We'll first consider that a component may crash inside or outside of a synchronous block. From the point of view of the peer component, we'll have four cases to consider:
 

	
 
1. The peer component is not in a synchronous block. 
 
2. The crashing component died before the peer component entered the synchronous block.
 
3. The crashing component died during the same synchronous block as the peer component.
 
4. The crashing component died after reaching consensus on the synchronous block that the peer component is currently still in.
 

	
 
Before discussing these cases, it is important to remember that the entire runtime has components running in their own thread of execution. We may have that the crashing component is unaware of its peers (due to the fact that peer ports might change ownership at any point in time). We'll discuss the consensus algorithm in more detail later within the documentation. For now it is important to note that the components will discover the synchronous region they are part of while the PDL code is executing. So if a component crashes within a synchronous region before the end of the sync block is reached, it may be possible that it will not discover the full synchronous region it would be part of.
 

	
 
Because the crashing component is potentially unaware of the component IDs it will end up notifying that is has failed, we can not design the crash-handling algorithm in such a way such that the crashing component notifies the peers of when they have to crash. We'll do the opposite: the crashing component simply crashes and somehow attempts to notify the peers. Those peers themselves decide whether they have to crash in response to such a notification.
 
Because the crashing component is potentially unaware of the component IDs it will end up notifying that it has failed, we can not design the crash-handling algorithm in such a way such that the crashing component notifies the peers of when they have to crash. We'll do the opposite: the crashing component simply crashes and somehow attempts to notify the peers. Those peers themselves decide whether they have to crash in response to such a notification.
 

	
 
For this reason, it does not make a lot of sense to deal with component failure through the consensus algorithm. Dealing with the failure through the consensus algorithm only makes sense if we can find the synchronous region that we would have discovered if we were able to fully execute the sync block of each participating component. As explained above: we can't, and so we'll opt to deal with failure on a peer-by-peer basis.
 

	
 
We'll go back to the four cases we've discusses above. We'll change our point of view: we're now considering a component (the "handling component") that has to deal with the failure of a peer (the "crashing component"). We'll introduce a small part of our solution a-priori: like a component shutting down, a failing component will simply end its life by broadcasting `ClosePort` message over all of its owned ports that are not closed (and the failing component will also wait until all of those ports are not blocked).
 
We'll go back to the four cases we've discusses above. We'll change our point of view: we're now considering a component (the "handling component") that has to deal with the failure of a peer (the "crashing component"). We'll introduce a small part of our solution a-priori: like a component shutting down, a failing component will simply end its life by broadcasting `ClosePort` message over all of its owned ports that are not closed (and, like the other control algorithms. the failing component will wait for the port that is shutting down to become unblocked before it will send the `ClosePort` message).
 

	
 
In the first case, we're dealing with a failing component while the handling component is not in a synchronous block. This means that if there was a previous synchronous block, that it has succeeded. We might still have data messages in our inbox that were sent by the failing component. But in this case it is rather easy to deal with this: we mark the ports as closed, and if we end up using them in the next synchronous block, then we will crash ourselves.
 

	
 
In the second case we have that the peer component died before we ourselves have entered the synchronous block. This case is somewhat equivalent to the case we described above. The crashing component cannot have sent the handling component any messages. So we mark the port as closed, potentially failing in the future if they end up being used.
 
In the second case we have that the peer component died before we ourselves have entered the synchronous block. This case is somewhat equivalent to the case we described above. The crashing component cannot have sent the handling component any messages. So we mark the port as closed, potentially failing in the future if they end up being used. However, the handling component itself might've performed `put` operations already. So now that the handling component receives a `ClosePort` message, it realizes that those earlier `put` operations can never be acknowledged. For this reason a component stores when it last used a port in the metadata associated with a port. When, in this second case, a `ClosePort` message comes in while the port has been used already, the handling component should crash as well.
 

	
 
Next up is the third case, where both the crashing component and the handling component were both in the same synchronous round. Like before we mark the port as closed and future use will cause a crash. Like the second case, if the handling component has already used a port (which in this case may also be having received a message from the crashing component), then it should crash as well.
 

	
 
The fourth case is where the failing component crashes *after* the handling component finished its sync round. This is an edge cases dealing with the following situation: both the handling as the crashing component have submitted their local solution to the consensus algorithm (assumed to be running somewhere in a thread of execution different from the two components). The crashing component receives a global solution, finishes the sync round, and then crashes, therefore sending the `ClosePort` message to the handling component. The handling component, due to the asynchronous nature of the runtime, receives the `ClosePort` message before the global solution has a chance to reach the handling component. In this case, however, the handling component should be able to finish the synchronous round, and it shouldn't crash.
 

	
 
### Distinguishing the crashing cases
 

	
 
So far we've pretended like we could already determine the relation between the crashing component's synchronous round and the handling component's synchronous round. But in order to do this we need to add a bit of extra information to the `ClosePort` message.
 

	
 
Next up is the third case, where both the crashing component and the handling component were both in the same synchronous round. Like before we mark the port as closed and future use will cause a crash. The difference is that the handling component may be blocked on attempting to `get` from a port which the crashing component now indicates is closed, perhaps we might have already performed successful `get`/`put` operations. In that case the handling component should crash: the crashing component can never submit its local solution, so the synchronous round can never succeed! And so we need to have metadata stored for the component that tracks if the port was used in the synchronous round. If the component is used in the synchronous round and a `ClosePort` message comes in, then the component should crash as well.
 
The simplest case is to determine if the two components are both in the same synchronous round (case three, as described above). The crashing component annotates the `ClosePort` message with whether it was in a synchronous round or not. Then if both components are in a synchronous round (as checking by the handling component), and the about-to-be-closed port at the handling component was used in that round, or will be used in that round, then the handling component should crash.
 

	
 
The fourth case is where the failing component crashes *after* the handling component finished its sync round. This is an edge cases dealing with the following situation: both the handling as the crashing component have submitted their local solution to the consensus algorithm. The crashing component receives a global solution, finishes the sync round, and then crashes, therefore sending the `ClosePort` message to the handling component. The handling component, due to the asynchronous nature of the runtime, receives the `ClosePort` message before the global solution has a chance to reach the handling component. In this case, however, the handling component should be able to finish the synchronous round, and it shouldn't crash.
 
Equally simple: the handling component can figure out itself if it is in a synchronous round (case one, as described above). If not: then the port is marked closed and future use causes crashes.
 

	
 
Here is where we arrive at the protocol for dealing with component crashes. To let the handling component deal with this last case, we'll let the crashing component send the `ClosePort` messages together with a boolean indicating if it crashed inside, or outside of a synchronous block.
 
The last two cases require a bit more work: how do we distinguish the edge case where the handling component's round will complete in the future, from the case where it should crash. To distinguish the edge case we need the handling component to know if the last interaction the crashing component handled was the one in the handling component's current synchronous round.
 

	
 
If the crashing component sends `ClosePort` together with the indication that it crashed inside of the synchronous block, then we're dealing with case 3 *if* there are messages in the inbox, or if the handling component uses the closed port after receiving the `ClosePort` message. But if the crashing component sends `ClosePort` together with the indication that it crashed outside of a synchronous block, then we check if we have performed any operations on the port in the synchronous round. If we have performed operations *and* have received messages from that component, then apparently the synchronous round will succeed. So we will not immediately crash. Otherwise we will.
 
For this reason we keep track of the synchronous round number. That is to say: there is a counter that increments each time a synchronous round completes for a component. We have a field in the metadata for a port that registers this round number. If a component performs a `put` operation, then it stores its own round number in that port's metadata, and sends this round number along with the message. If a component performs a `get` operation, then it stores the *received* round number in the port's metadata.
 

	
 
In this way we modify the control algorithm for terminating components. Now we're able to deal correctly with crashing components.
 
When a component closes a port, it will also send along the last registered round number in the `ClosePort` message. If the handling component receives a `ClosePort` message, and the last registered round number in the port's metadata matches the round number in the `ClosePort` message, and the crashing component was not in a synchronous round, then the crashing component crashed after the handling component's sync round. Hence: the handling component can complete its sync round.
 

	
 
To conclude: if we receive a `ClosePort` message, then we always mark the port as closed. If the handling and the crashing component were in a synchronous round, and the closed port was used in that synchronous round, then the handling component crashes as well. If the handling component *is* in a synchronous round but the crashing component *is not* in a synchronous round, the port of the handling component is used in the synchronous round and the port's last registered round number does not match the round number in the `ClosePort` message, then the handling component crashes as well.
 

	
 
## Sync Algorithm
 

	
 
A description of the synchronous algorithm is present in different documents. We will mention here that central to the consensus algorithm is that two components agree on the interactions that took place over a specific channel. In order for this to happen we'll send along a lot of metadata when trying to reach consensus, but here we're just concerned with attempting to match up the two ends of a channel. 
 

	
 
A port is identified by a `(component ID, port ID)` pair, and channel is a pair of those identifying pairs. So to match up the two ends of a channel we would have to find a consistent pair of ports that agree on who their peers are. However, we're dealing with the problem of eventual consistency: `put`ting ports never know who their peer is, because the sent message might be relayed. However, `get`ting ports *will* know who their peer is for the duration of a single synchronous round once they've received a single message.
 

	
 
This is the trick we will apply in the consensus algorithm. If a channel did not see any messages passing through it, then the components that own those ports will not have to reach consensus because they will not be part of the same synchronous region. However if a message did go through the channel then the components join the same synchronous region, and they'll have to form some sort of consensus on what interaction took place on that channel.
 

	
 
And so the `put`ting component will only submit its own `(component ID, port ID, metadata_for_sync_round)` triplet. The `get`ting port will submit information containing `(self component ID, self port ID, peer component ID, peer port ID, metadata_for_sync_round)`. The consensus algorithm can now figure out which two ports belong to the same channel.
 
\ No newline at end of file
 
And so the `put`ting component will only submit its own `(component ID, port ID, metadata_for_sync_round)` triplet. The `get`ting port will submit information containing `(self component ID, self port ID, peer component ID, peer port ID, metadata_for_sync_round)`. The consensus algorithm can now figure out which two ports belong to the same channel.
 

	
 
## Component Nomenclature
 

	
 
Earlier versions of the Reowolf runtime featured the distinction between primitive and composite components. This was put into the language from a design perspective. Primitive components could do nitty-gritty protocol execution: perform `put`/`get` operations, and entering into sync blocks. Conversely, composite components were tasked with setting up a network of interconnected components: creating channels and handing off the appropriate ports to the instantiated components.
 

	
 
Once the runtime was capable of sending ports over channels, it became apparent that this distinction no longer made sense. Because if only primitive components can send/receive ports, and cannot create new components, then the programmer is limited to using those received ports directly in the primitive's code! And so the split between primitive and composite components was removed: only the concept of a "component" is left.
 
\ No newline at end of file
docs/runtime/04_known_issues.md
Show inline comments
 
# Known Issues
 

	
 
The current implementation of Reowolf has the following known issues:
 

	
 
- Cannot create uninitialized variables that are later known to be initialized. This is not a problem for the regular types (perhaps a bit tedious), but is a problem for channels/ports. That is to say: if a component needs a temporary variable for a port, then it must create a complete channel. e.g.
 

	
 
  ```
 
  comp send(out<u32> tx1, out<u32> tx2, in<bool> which) {
 
    channel unused -> temporary;
 
    while (true) sync {
 
      if (get(which)) {
 
        temporary = tx1;
 
      } else {
 
        temporary = tx2;
 
      }
 
      put(temporary, 1);
 
    }
 
  }
 
  ```
 

	
 
  Another solution would be to use an empty array and to put a port inside of that. Hacks galore!
 

	
 
- Reserved memory for ports will grow without bounds: Ports can be given away from one component to another by creating a component, or by sending a message containing them. The component sending those ports cannot remove them from its own memory if there are still other references to the transferred port in its memory. This is because we want to throw a reasonable error if that transferred port is used by the original owner. Hence we need to keep some information about that transferred port in the sending component's memory. The solution is to have reference counting for the ports, but this is not implemented.
 

	
 
- An extra to the above statements: when transferring ports to a new component, the memory that remembers the state of that port is removed from the component that is creating the new one. Hence using old references to that port within the creating component's PDL code results in a crash.
 

	
 
- Some control algorithms are not robust under multithreading. Mainly error handling when in sync mode (because there needs to be a revision where we keep track of which components are still reachable by another component). And complicated scenarios where ports are transferred.
 

	
 
- There is an assertion in the interpreter that makes sure that there are no values left on the expression stack when a statement has completed. This is not true when you have an expression statement! If you want to remove this assertion make sure to clear the stack (using the method on the `Store`).
 

	
 
- The TCP listener component should probably do a `shutdown` before a `close` on the socket handle. It should also set the `SO_REUSEADDR` option.
 

	
 
- The TCP listener and TCP sender components have not been tested extensively in a multi-threaded setup.
 

	
 
- The way in which putting ports are ordered to block if the corresponding getter port's main inbox is full is rather silly. This led to the introduction of the "backup inbox" as it is found in the runtime's code. There is a design decision to make here, but the current implementation is a bit silly. There are two options: (a) have an atomic boolean indicating if the message slot for an inbox is full, or (b) do away with the "main inbox" alltogether, and have an unbounded message queue.
 

	
 
- For practical use in components whose code supports an arbitrary number of peers (i.e. their code contains an array of ports that is used for communication and changes in size during the component's lifetime), the `select` statement somehow needs to support waiting on any one of those ports.
 

	
 
- The compiler currently prevents one from using `sync` blocks (or corresponding `get` and/or `put` operations) in functions. They can only be used within components. When writing large programs this it makes it rather hard to re-use code: all code that interacts with other components can only be written within a sync block. I would advise creating `sync func`, `nonsync` func and regular `func`tions. Where:
 
  
 
  - `sync func`tions can only be called from within `sync` functions. They may not open new sync blocks, but may perform calls to `get`/`put`. These are useful to encapsulate sequences of `put`/`get` calls together with some common message-modifying code.
 
  - `nonsync func`tions (or `async func`tions) may only be called outside of sync blocks, and may open new sync blocks themselves. They are useful to encapsulate a single interaction with other components. One may also create new components here.
 
  - regular `func`tions. Are as useful as in any other language, but here we disallow calling `nonsync func`tions or `sync func`tions.
 

	
 
- The `Ack` messages that are sent in response to `PeerPortChanged_Block` messages should contain the sending components `(component ID, port ID)` pair in case the `PeerPortChanged_Block` message is relayed. When such an `Ack` message is received, the peer of the port must be updated before transferring the port to the new owner.
 

	
 
- The compiler currently accepts a select arm's guard that is formulated as `auto a = get(get(rx))`. This should be disallowed.
 
\ No newline at end of file
 
- The compiler currently accepts a select arm's guard that is formulated as `auto a = get(get(rx))`. This should be disallowed.
 

	
 
- The work queue in the runtime is still a mutex-locked queue. The `QueueMpsc` type should be extended to be a multiple-producer multiple-consumer queue. This type should then replace the mutex-locked work queue.
 
\ No newline at end of file
src/protocol/ast.rs
Show inline comments
 
@@ -789,792 +789,797 @@ impl Scope {
 
}
 

	
 
#[derive(Debug, Clone, PartialEq, Eq)]
 
pub enum VariableKind {
 
    Parameter,      // in parameter list of function/component
 
    Local,          // declared in function/component body
 
    Binding,        // may be bound to in a binding expression (determined in validator/linker)
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct Variable {
 
    pub this: VariableId,
 
    // Parsing
 
    pub kind: VariableKind,
 
    pub parser_type: ParserType,
 
    pub identifier: Identifier,
 
    // Validator/linker
 
    pub relative_pos_in_parent: i32,
 
    pub unique_id_in_scope: i32, // Temporary fix until proper bytecode/asm is generated
 
}
 

	
 
#[derive(Debug)]
 
pub enum Definition {
 
    Struct(StructDefinition),
 
    Enum(EnumDefinition),
 
    Union(UnionDefinition),
 
    Procedure(ProcedureDefinition),
 
}
 

	
 
impl Definition {
 
    pub fn is_struct(&self) -> bool {
 
        match self {
 
            Definition::Struct(_) => true,
 
            _ => false
 
        }
 
    }
 
    pub(crate) fn as_struct(&self) -> &StructDefinition {
 
        match self {
 
            Definition::Struct(result) => result,
 
            _ => panic!("Unable to cast 'Definition' to 'StructDefinition'"),
 
        }
 
    }
 
    pub(crate) fn as_struct_mut(&mut self) -> &mut StructDefinition {
 
        match self {
 
            Definition::Struct(result) => result,
 
            _ => panic!("Unable to cast 'Definition' to 'StructDefinition'"),
 
        }
 
    }
 
    pub fn is_enum(&self) -> bool {
 
        match self {
 
            Definition::Enum(_) => true,
 
            _ => false,
 
        }
 
    }
 
    pub(crate) fn as_enum(&self) -> &EnumDefinition {
 
        match self {
 
            Definition::Enum(result) => result,
 
            _ => panic!("Unable to cast 'Definition' to 'EnumDefinition'"),
 
        }
 
    }
 
    pub(crate) fn as_enum_mut(&mut self) -> &mut EnumDefinition {
 
        match self {
 
            Definition::Enum(result) => result,
 
            _ => panic!("Unable to cast 'Definition' to 'EnumDefinition'"),
 
        }
 
    }
 
    pub fn is_union(&self) -> bool {
 
        match self {
 
            Definition::Union(_) => true,
 
            _ => false,
 
        }
 
    }
 
    pub(crate) fn as_union(&self) -> &UnionDefinition {
 
        match self {
 
            Definition::Union(result) => result, 
 
            _ => panic!("Unable to cast 'Definition' to 'UnionDefinition'"),
 
        }
 
    }
 

	
 
    pub(crate) fn as_union_mut(&mut self) -> &mut UnionDefinition {
 
        match self {
 
            Definition::Union(result) => result,
 
            _ => panic!("Unable to cast 'Definition' to 'UnionDefinition'"),
 
        }
 
    }
 

	
 
    pub fn is_procedure(&self) -> bool {
 
        match self {
 
            Definition::Procedure(_) => true,
 
            _ => false,
 
        }
 
    }
 

	
 
    pub(crate) fn as_procedure(&self) -> &ProcedureDefinition {
 
        match self {
 
            Definition::Procedure(result) => result,
 
            _ => panic!("Unable to cast `Definition` to `Function`"),
 
        }
 
    }
 

	
 
    pub(crate) fn as_procedure_mut(&mut self) -> &mut ProcedureDefinition {
 
        match self {
 
            Definition::Procedure(result) => result,
 
            _ => panic!("Unable to cast `Definition` to `Function`"),
 
        }
 
    }
 

	
 
    pub fn defined_in(&self) -> RootId {
 
        match self {
 
            Definition::Struct(def) => def.defined_in,
 
            Definition::Enum(def) => def.defined_in,
 
            Definition::Union(def) => def.defined_in,
 
            Definition::Procedure(def) => def.defined_in,
 
        }
 
    }
 

	
 
    pub fn identifier(&self) -> &Identifier {
 
        match self {
 
            Definition::Struct(def) => &def.identifier,
 
            Definition::Enum(def) => &def.identifier,
 
            Definition::Union(def) => &def.identifier,
 
            Definition::Procedure(def) => &def.identifier,
 
        }
 
    }
 
    pub fn poly_vars(&self) -> &Vec<Identifier> {
 
        match self {
 
            Definition::Struct(def) => &def.poly_vars,
 
            Definition::Enum(def) => &def.poly_vars,
 
            Definition::Union(def) => &def.poly_vars,
 
            Definition::Procedure(def) => &def.poly_vars,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct StructFieldDefinition {
 
    pub span: InputSpan,
 
    pub field: Identifier,
 
    pub parser_type: ParserType,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct StructDefinition {
 
    pub this: StructDefinitionId,
 
    pub defined_in: RootId,
 
    // Symbol scanning
 
    pub identifier: Identifier,
 
    pub poly_vars: Vec<Identifier>,
 
    // Parsing
 
    pub fields: Vec<StructFieldDefinition>
 
}
 

	
 
impl StructDefinition {
 
    pub(crate) fn new_empty(
 
        this: StructDefinitionId, defined_in: RootId,
 
        identifier: Identifier, poly_vars: Vec<Identifier>
 
    ) -> Self {
 
        Self{ this, defined_in, identifier, poly_vars, fields: Vec::new() }
 
    }
 
}
 

	
 
#[derive(Debug, Clone, Copy)]
 
pub enum EnumVariantValue {
 
    None,
 
    Integer(i64),
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct EnumVariantDefinition {
 
    pub identifier: Identifier,
 
    pub value: EnumVariantValue,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct EnumDefinition {
 
    pub this: EnumDefinitionId,
 
    pub defined_in: RootId,
 
    // Symbol scanning
 
    pub identifier: Identifier,
 
    pub poly_vars: Vec<Identifier>,
 
    // Parsing
 
    pub variants: Vec<EnumVariantDefinition>,
 
}
 

	
 
impl EnumDefinition {
 
    pub(crate) fn new_empty(
 
        this: EnumDefinitionId, defined_in: RootId,
 
        identifier: Identifier, poly_vars: Vec<Identifier>
 
    ) -> Self {
 
        Self{ this, defined_in, identifier, poly_vars, variants: Vec::new() }
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct UnionVariantDefinition {
 
    pub span: InputSpan,
 
    pub identifier: Identifier,
 
    pub value: Vec<ParserType>, // if empty, then union variant does not contain any embedded types
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct UnionDefinition {
 
    pub this: UnionDefinitionId,
 
    pub defined_in: RootId,
 
    // Phase 1: symbol scanning
 
    pub identifier: Identifier,
 
    pub poly_vars: Vec<Identifier>,
 
    // Phase 2: parsing
 
    pub variants: Vec<UnionVariantDefinition>,
 
}
 

	
 
impl UnionDefinition {
 
    pub(crate) fn new_empty(
 
        this: UnionDefinitionId, defined_in: RootId,
 
        identifier: Identifier, poly_vars: Vec<Identifier>
 
    ) -> Self {
 
        Self{ this, defined_in, identifier, poly_vars, variants: Vec::new() }
 
    }
 
}
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub enum ProcedureKind {
 
    Function, // with return type
 
    Component,
 
}
 

	
 
/// Monomorphed instantiation of a procedure (or the sole instantiation of a
 
/// non-polymorphic procedure).
 
#[derive(Debug)]
 
pub struct ProcedureDefinitionMonomorph {
 
    pub argument_types: Vec<TypeId>,
 
    pub expr_info: Vec<ExpressionInfo>
 
}
 

	
 
impl ProcedureDefinitionMonomorph {
 
    pub(crate) fn new_invalid() -> Self {
 
        return Self{
 
            argument_types: Vec::new(),
 
            expr_info: Vec::new(),
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Clone, Copy)]
 
pub struct ExpressionInfo {
 
    pub type_id: TypeId,
 
    pub variant: ExpressionInfoVariant,
 
}
 

	
 
impl ExpressionInfo {
 
    pub(crate) fn new_invalid() -> Self {
 
        return Self{
 
            type_id: TypeId::new_invalid(),
 
            variant: ExpressionInfoVariant::Generic,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Clone, Copy)]
 
pub enum ExpressionInfoVariant {
 
    Generic,
 
    Procedure(TypeId, u32), // procedure TypeID and its monomorph index
 
    Select(i32), // index
 
}
 

	
 
impl ExpressionInfoVariant {
 
    pub(crate) fn as_select(&self) -> i32 {
 
        match self {
 
            ExpressionInfoVariant::Select(v) => *v,
 
            _ => unreachable!(),
 
        }
 
    }
 

	
 
    pub(crate) fn as_procedure(&self) -> (TypeId, u32) {
 
        match self {
 
            ExpressionInfoVariant::Procedure(type_id, monomorph_index) => (*type_id, *monomorph_index),
 
            _ => unreachable!(),
 
        }
 
    }
 
}
 

	
 
#[derive(Debug)]
 
pub enum ProcedureSource {
 
    FuncUserDefined,
 
    CompUserDefined,
 
    // Builtin functions, available to user
 
    FuncGet,
 
    FuncPut,
 
    FuncFires,
 
    FuncCreate,
 
    FuncLength,
 
    FuncAssert,
 
    FuncPrint,
 
    // Buitlin functions, not available to user
 
    FuncSelectStart,
 
    FuncSelectRegisterCasePort,
 
    FuncSelectWait,
 
    // Builtin components, available to user
 
    CompRandomU32, // TODO: Remove, temporary thing
 
    CompTcpClient,
 
    CompTcpListener,
 
}
 

	
 
impl ProcedureSource {
 
    pub(crate) fn is_builtin(&self) -> bool {
 
        match self {
 
            ProcedureSource::FuncUserDefined | ProcedureSource::CompUserDefined => false,
 
            _ => true,
 
        }
 
    }
 
}
 

	
 

	
 
/// Generic storage for functions and components.
 
// Note that we will have function definitions for builtin functions as well. In
 
// that case the span, the identifier span and the body are all invalid.
 
#[derive(Debug)]
 
pub struct ProcedureDefinition {
 
    pub this: ProcedureDefinitionId,
 
    pub defined_in: RootId,
 
    // Symbol scanning
 
    pub kind: ProcedureKind,
 
    pub identifier: Identifier,
 
    pub poly_vars: Vec<Identifier>,
 
    // Parser
 
    pub source: ProcedureSource,
 
    pub return_type: Option<ParserType>, // present on functions, not components
 
    pub parameters: Vec<VariableId>,
 
    pub scope: ScopeId,
 
    pub body: BlockStatementId,
 
    // Monomorphization of typed procedures
 
    pub monomorphs: Vec<ProcedureDefinitionMonomorph>,
 
}
 

	
 
impl ProcedureDefinition {
 
    pub(crate) fn new_empty(
 
        this: ProcedureDefinitionId, defined_in: RootId,
 
        kind: ProcedureKind, identifier: Identifier, poly_vars: Vec<Identifier>
 
    ) -> Self {
 
        Self {
 
            this, defined_in,
 
            kind, identifier, poly_vars,
 
            source: ProcedureSource::FuncUserDefined,
 
            return_type: None,
 
            parameters: Vec::new(),
 
            scope: ScopeId::new_invalid(),
 
            body: BlockStatementId::new_invalid(),
 
            monomorphs: Vec::new(),
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub enum Statement {
 
    Block(BlockStatement),
 
    EndBlock(EndBlockStatement),
 
    Local(LocalStatement),
 
    Labeled(LabeledStatement),
 
    If(IfStatement),
 
    EndIf(EndIfStatement),
 
    While(WhileStatement),
 
    EndWhile(EndWhileStatement),
 
    Break(BreakStatement),
 
    Continue(ContinueStatement),
 
    Synchronous(SynchronousStatement),
 
    EndSynchronous(EndSynchronousStatement),
 
    Fork(ForkStatement),
 
    EndFork(EndForkStatement),
 
    Select(SelectStatement),
 
    EndSelect(EndSelectStatement),
 
    Return(ReturnStatement),
 
    Goto(GotoStatement),
 
    New(NewStatement),
 
    Expression(ExpressionStatement),
 
}
 

	
 
impl Statement {
 
    pub fn as_new(&self) -> &NewStatement {
 
        match self {
 
            Statement::New(result) => result,
 
            _ => panic!("Unable to cast `Statement` to `NewStatement`"),
 
        }
 
    }
 

	
 
    pub fn span(&self) -> InputSpan {
 
    pub fn maybe_span(&self) -> Option<InputSpan> {
 
        match self {
 
            Statement::Block(v) => v.span,
 
            Statement::Local(v) => v.span(),
 
            Statement::Labeled(v) => v.label.span,
 
            Statement::If(v) => v.span,
 
            Statement::While(v) => v.span,
 
            Statement::Break(v) => v.span,
 
            Statement::Continue(v) => v.span,
 
            Statement::Synchronous(v) => v.span,
 
            Statement::Fork(v) => v.span,
 
            Statement::Select(v) => v.span,
 
            Statement::Return(v) => v.span,
 
            Statement::Goto(v) => v.span,
 
            Statement::New(v) => v.span,
 
            Statement::Expression(v) => v.span,
 
            Statement::Block(v) => Some(v.span),
 
            Statement::Local(v) => Some(v.span()),
 
            Statement::Labeled(v) => Some(v.label.span),
 
            Statement::If(v) => Some(v.span),
 
            Statement::While(v) => Some(v.span),
 
            Statement::Break(v) => Some(v.span),
 
            Statement::Continue(v) => Some(v.span),
 
            Statement::Synchronous(v) => Some(v.span),
 
            Statement::Fork(v) => Some(v.span),
 
            Statement::Select(v) => Some(v.span),
 
            Statement::Return(v) => Some(v.span),
 
            Statement::Goto(v) => Some(v.span),
 
            Statement::New(v) => Some(v.span),
 
            Statement::Expression(v) => Some(v.span),
 
            Statement::EndBlock(_)
 
            | Statement::EndIf(_)
 
            | Statement::EndWhile(_)
 
            | Statement::EndSynchronous(_)
 
            | Statement::EndFork(_)
 
            | Statement::EndSelect(_) => unreachable!(),
 
            | Statement::EndSelect(_) => None,
 
        }
 
    }
 

	
 
    pub fn span(&self) -> InputSpan {
 
        return self.maybe_span().unwrap();
 
    }
 

	
 
    pub fn link_next(&mut self, next: StatementId) {
 
        match self {
 
            Statement::Block(stmt) => stmt.next = next,
 
            Statement::EndBlock(stmt) => stmt.next = next,
 
            Statement::Local(stmt) => match stmt {
 
                LocalStatement::Channel(stmt) => stmt.next = next,
 
                LocalStatement::Memory(stmt) => stmt.next = next,
 
            },
 
            Statement::EndIf(stmt) => stmt.next = next,
 
            Statement::EndWhile(stmt) => stmt.next = next,
 
            Statement::EndSynchronous(stmt) => stmt.next = next,
 
            Statement::EndFork(stmt) => stmt.next = next,
 
            Statement::EndSelect(stmt) => stmt.next = next,
 
            Statement::New(stmt) => stmt.next = next,
 
            Statement::Expression(stmt) => stmt.next = next,
 
            Statement::Return(_)
 
            | Statement::Break(_)
 
            | Statement::Continue(_)
 
            | Statement::Synchronous(_)
 
            | Statement::Fork(_)
 
            | Statement::Select(_)
 
            | Statement::Goto(_)
 
            | Statement::While(_)
 
            | Statement::Labeled(_)
 
            | Statement::If(_) => unreachable!(),
 
        }
 
    }
 

	
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct BlockStatement {
 
    pub this: BlockStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan, // of the complete block
 
    pub statements: Vec<StatementId>,
 
    pub end_block: EndBlockStatementId,
 
    // Phase 2: linker
 
    pub scope: ScopeId,
 
    pub next: StatementId,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct EndBlockStatement {
 
    pub this: EndBlockStatementId,
 
    // Parser
 
    pub start_block: BlockStatementId,
 
    // Validation/Linking
 
    pub next: StatementId,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub enum LocalStatement {
 
    Memory(MemoryStatement),
 
    Channel(ChannelStatement),
 
}
 

	
 
impl LocalStatement {
 
    pub fn this(&self) -> LocalStatementId {
 
        match self {
 
            LocalStatement::Memory(stmt) => stmt.this.upcast(),
 
            LocalStatement::Channel(stmt) => stmt.this.upcast(),
 
        }
 
    }
 
    pub fn span(&self) -> InputSpan {
 
        match self {
 
            LocalStatement::Channel(v) => v.span,
 
            LocalStatement::Memory(v) => v.span,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct MemoryStatement {
 
    pub this: MemoryStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan,
 
    pub variable: VariableId,
 
    pub initial_expr: AssignmentExpressionId,
 
    // Phase 2: linker
 
    pub next: StatementId,
 
}
 

	
 
/// ChannelStatement is the declaration of an input and output port associated
 
/// with the same channel. Note that the polarity of the ports are from the
 
/// point of view of the component. So an output port is something that a
 
/// component uses to send data over (i.e. it is the "input end" of the
 
/// channel), and vice versa.
 
#[derive(Debug, Clone)]
 
pub struct ChannelStatement {
 
    pub this: ChannelStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan, // of the "channel" keyword
 
    pub from: VariableId, // output
 
    pub to: VariableId,   // input
 
    // Phase 2: linker
 
    pub relative_pos_in_parent: i32,
 
    pub next: StatementId,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct LabeledStatement {
 
    pub this: LabeledStatementId,
 
    // Phase 1: parser
 
    pub label: Identifier,
 
    pub body: StatementId,
 
    // Phase 2: linker
 
    pub relative_pos_in_parent: i32,
 
    pub in_sync: SynchronousStatementId, // may be invalid
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct IfStatement {
 
    pub this: IfStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan, // of the "if" keyword
 
    pub test: ExpressionId,
 
    pub true_case: IfStatementCase,
 
    pub false_case: Option<IfStatementCase>,
 
    pub end_if: EndIfStatementId,
 
}
 

	
 
#[derive(Debug, Clone, Copy)]
 
pub struct IfStatementCase {
 
    pub body: StatementId,
 
    pub scope: ScopeId,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct EndIfStatement {
 
    pub this: EndIfStatementId,
 
    pub start_if: IfStatementId,
 
    pub next: StatementId,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct WhileStatement {
 
    pub this: WhileStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan, // of the "while" keyword
 
    pub test: ExpressionId,
 
    pub scope: ScopeId,
 
    pub body: StatementId,
 
    pub end_while: EndWhileStatementId,
 
    pub in_sync: SynchronousStatementId, // may be invalid
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct EndWhileStatement {
 
    pub this: EndWhileStatementId,
 
    pub start_while: WhileStatementId,
 
    // Phase 2: linker
 
    pub next: StatementId,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct BreakStatement {
 
    pub this: BreakStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan, // of the "break" keyword
 
    pub label: Option<Identifier>,
 
    // Phase 2: linker
 
    pub target: EndWhileStatementId, // invalid if not yet set
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct ContinueStatement {
 
    pub this: ContinueStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan, // of the "continue" keyword
 
    pub label: Option<Identifier>,
 
    // Phase 2: linker
 
    pub target: WhileStatementId, // invalid if not yet set
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct SynchronousStatement {
 
    pub this: SynchronousStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan, // of the "sync" keyword
 
    pub scope: ScopeId,
 
    pub body: StatementId,
 
    pub end_sync: EndSynchronousStatementId,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct EndSynchronousStatement {
 
    pub this: EndSynchronousStatementId,
 
    pub start_sync: SynchronousStatementId,
 
    // Phase 2: linker
 
    pub next: StatementId,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct ForkStatement {
 
    pub this: ForkStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan, // of the "fork" keyword
 
    pub left_body: StatementId,
 
    pub right_body: Option<StatementId>,
 
    pub end_fork: EndForkStatementId,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct EndForkStatement {
 
    pub this: EndForkStatementId,
 
    pub start_fork: ForkStatementId,
 
    pub next: StatementId,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct SelectStatement {
 
    pub this: SelectStatementId,
 
    pub span: InputSpan, // of the "select" keyword
 
    pub cases: Vec<SelectCase>,
 
    pub end_select: EndSelectStatementId,
 
    pub relative_pos_in_parent: i32,
 
    pub next: StatementId, // note: the select statement will be transformed into other AST elements, this `next` jumps to those replacement statements
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct SelectCase {
 
    // The guard statement of a `select` is either a MemoryStatement or an
 
    // ExpressionStatement. Nothing else is allowed by the initial parsing
 
    pub guard: StatementId,
 
    pub body: StatementId,
 
    pub scope: ScopeId,
 
    // Phase 2: Validation and Linking
 
    pub involved_ports: Vec<(CallExpressionId, ExpressionId)>, // call to `get` and its port argument
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct EndSelectStatement {
 
    pub this: EndSelectStatementId,
 
    pub start_select: SelectStatementId,
 
    pub next: StatementId,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct ReturnStatement {
 
    pub this: ReturnStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan, // of the "return" keyword
 
    pub expressions: Vec<ExpressionId>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct GotoStatement {
 
    pub this: GotoStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan, // of the "goto" keyword
 
    pub label: Identifier,
 
    // Phase 2: linker
 
    pub target: LabeledStatementId, // invalid if not yet set
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct NewStatement {
 
    pub this: NewStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan, // of the "new" keyword
 
    pub expression: CallExpressionId,
 
    // Phase 2: linker
 
    pub next: StatementId,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct ExpressionStatement {
 
    pub this: ExpressionStatementId,
 
    // Phase 1: parser
 
    pub span: InputSpan,
 
    pub expression: ExpressionId,
 
    // Phase 2: linker
 
    pub next: StatementId,
 
}
 

	
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub enum ExpressionParent {
 
    None, // only set during initial parsing
 
    Memory(MemoryStatementId),
 
    If(IfStatementId),
 
    While(WhileStatementId),
 
    Return(ReturnStatementId),
 
    New(NewStatementId),
 
    ExpressionStmt(ExpressionStatementId),
 
    Expression(ExpressionId, u32) // index within expression (e.g LHS or RHS of expression, or index in array literal, etc.)
 
}
 

	
 
impl ExpressionParent {
 
    pub fn is_new(&self) -> bool {
 
        match self {
 
            ExpressionParent::New(_) => true,
 
            _ => false,
 
        }
 
    }
 

	
 
    pub fn as_expression(&self) -> ExpressionId {
 
        match self {
 
            ExpressionParent::Expression(id, _) => *id,
 
            _ => panic!("called as_expression() on {:?}", self),
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub enum Expression {
 
    Assignment(AssignmentExpression),
 
    Binding(BindingExpression),
 
    Conditional(ConditionalExpression),
 
    Binary(BinaryExpression),
 
    Unary(UnaryExpression),
 
    Indexing(IndexingExpression),
 
    Slicing(SlicingExpression),
 
    Select(SelectExpression),
 
    Literal(LiteralExpression),
 
    Cast(CastExpression),
 
    Call(CallExpression),
 
    Variable(VariableExpression),
 
}
 

	
 
impl Expression {
 
    pub fn as_variable(&self) -> &VariableExpression {
 
        match self {
 
            Expression::Variable(result) => result,
 
            _ => panic!("Unable to cast `Expression` to `VariableExpression`"),
 
        }
 
    }
 

	
 
    /// Returns operator span, function name, a binding's "let" span, etc. An
 
    /// indicator for the kind of expression that is being applied.
 
    pub fn operation_span(&self) -> InputSpan {
 
        match self {
 
            Expression::Assignment(expr) => expr.operator_span,
 
            Expression::Binding(expr) => expr.operator_span,
 
            Expression::Conditional(expr) => expr.operator_span,
 
            Expression::Binary(expr) => expr.operator_span,
 
            Expression::Unary(expr) => expr.operator_span,
 
            Expression::Indexing(expr) => expr.operator_span,
 
            Expression::Slicing(expr) => expr.slicing_span,
 
            Expression::Select(expr) => expr.operator_span,
 
            Expression::Literal(expr) => expr.span,
 
            Expression::Cast(expr) => expr.cast_span,
 
            Expression::Call(expr) => expr.func_span,
 
            Expression::Variable(expr) => expr.identifier.span,
 
        }
 
    }
 

	
 
    /// Returns the span covering the entire expression (i.e. including the
 
    /// spans of the arguments as well).
 
    pub fn full_span(&self) -> InputSpan {
 
        match self {
 
            Expression::Assignment(expr) => expr.full_span,
 
            Expression::Binding(expr) => expr.full_span,
 
            Expression::Conditional(expr) => expr.full_span,
 
            Expression::Binary(expr) => expr.full_span,
 
            Expression::Unary(expr) => expr.full_span,
 
            Expression::Indexing(expr) => expr.full_span,
 
            Expression::Slicing(expr) => expr.full_span,
 
            Expression::Select(expr) => expr.full_span,
 
            Expression::Literal(expr) => expr.span,
 
            Expression::Cast(expr) => expr.full_span,
 
            Expression::Call(expr) => expr.full_span,
 
            Expression::Variable(expr) => expr.identifier.span,
 
        }
 
    }
 

	
 
    pub fn parent(&self) -> &ExpressionParent {
 
        match self {
 
            Expression::Assignment(expr) => &expr.parent,
 
            Expression::Binding(expr) => &expr.parent,
 
            Expression::Conditional(expr) => &expr.parent,
 
            Expression::Binary(expr) => &expr.parent,
 
            Expression::Unary(expr) => &expr.parent,
 
            Expression::Indexing(expr) => &expr.parent,
 
            Expression::Slicing(expr) => &expr.parent,
 
            Expression::Select(expr) => &expr.parent,
 
            Expression::Literal(expr) => &expr.parent,
 
            Expression::Cast(expr) => &expr.parent,
 
            Expression::Call(expr) => &expr.parent,
 
            Expression::Variable(expr) => &expr.parent,
 
        }
 
    }
 

	
 
    pub fn parent_mut(&mut self) -> &mut ExpressionParent {
src/protocol/eval/error.rs
Show inline comments
 
use std::fmt;
 

	
 
use crate::protocol::{
 
    ast::*,
 
    Module,
 
    input_source::{ErrorStatement, StatementKind}
 
};
 
use super::executor::*;
 

	
 
/// Represents a stack frame recorded in an error
 
#[derive(Debug)]
 
pub struct EvalFrame {
 
    pub line: u32,
 
    pub line: Option<u32>,
 
    pub module_name: String,
 
    pub procedure: String, // function or component
 
    pub is_func: bool,
 
}
 

	
 
impl fmt::Display for EvalFrame {
 
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 
        let func_or_comp = if self.is_func {
 
            "function "
 
        } else {
 
            "component"
 
        };
 

	
 
        let line_str = match self.line {
 
            Some(line_number) => line_number.to_string(),
 
            None => String::from("??"),
 
        };
 

	
 
        if self.module_name.is_empty() {
 
            write!(f, "{} {}:{}", func_or_comp, &self.procedure, self.line)
 
            write!(f, "{} {}:{}", func_or_comp, &self.procedure, line_str)
 
        } else {
 
            write!(f, "{} {}:{}:{}", func_or_comp, &self.module_name, &self.procedure, self.line)
 
            write!(f, "{} {}:{}:{}", func_or_comp, &self.module_name, &self.procedure, line_str)
 
        }
 
    }
 
}
 

	
 
/// Represents an error that ocurred during evaluation. Contains error
 
/// statements just like in parsing errors. Additionally may display the current
 
/// execution state.
 
#[derive(Debug)]
 
pub struct EvalError {
 
    pub(crate) statements: Vec<ErrorStatement>,
 
    pub(crate) frames: Vec<EvalFrame>,
 
}
 

	
 
impl EvalError {
 
    pub(crate) fn new_error_at_expr(prompt: &Prompt, modules: &[Module], heap: &Heap, expr_id: ExpressionId, msg: String) -> EvalError {
 
        // Create frames
 
        debug_assert!(!prompt.frames.is_empty());
 
        let mut frames = Vec::with_capacity(prompt.frames.len());
 
        let mut last_module_source = &modules[0].source;
 
        for frame in prompt.frames.iter() {
 
            let definition = &heap[frame.definition];
 
            let statement = &heap[frame.position];
 
            let statement_span = statement.span();
 
            let statement_span = statement.maybe_span();
 

	
 
            // Lookup module name, if it has one
 
            let module = modules.iter().find(|m| m.root_id == definition.defined_in).unwrap();
 
            let module_name = if let Some(name) = &module.name {
 
                name.as_str().to_string()
 
            } else {
 
                String::new()
 
            };
 

	
 
            last_module_source = &module.source;
 
            frames.push(EvalFrame{
 
                line: statement_span.begin.line,
 
                line: statement_span.map(|v| v.begin.line),
 
                module_name,
 
                procedure: definition.identifier.value.as_str().to_string(),
 
                is_func: definition.kind == ProcedureKind::Function,
 
            });
 
        }
 

	
 
        let expr = &heap[expr_id];
 
        let statements = vec![
 
            ErrorStatement::from_source_at_span(StatementKind::Error, last_module_source, expr.full_span(), msg)
 
        ];
 

	
 
        EvalError{ statements, frames }
 
    }
 
}
 

	
 
impl fmt::Display for EvalError {
 
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 
        // Display error statement(s)
 
        self.statements[0].fmt(f)?;
 
        for statement in self.statements.iter().skip(1) {
 
            writeln!(f)?;
 
            statement.fmt(f)?;
 
        }
 

	
 
        // Display stack trace
 
        writeln!(f)?;
 
        writeln!(f, " +-  Stack trace:")?;
 
        for frame in self.frames.iter().rev() {
 
            write!(f, " | ")?;
 
            frame.fmt(f)?;
 
            writeln!(f)?;
 
        }
 

	
 
        Ok(())
 
    }
 
}
 
\ No newline at end of file
src/protocol/parser/pass_validation_linking.rs
Show inline comments
 
/*
 
 * pass_validation_linking.rs
 
 *
 
 * The pass that will validate properties of the AST statements (one is not
 
 * allowed to nest synchronous statements, instantiating components occurs in
 
 * the right places, etc.) and expressions (assignments may not occur in
 
 * arbitrary expressions).
 
 *
 
 * Furthermore, this pass will also perform "linking", in the sense of: some AST
 
 * nodes have something to do with one another, so we link them up in this pass
 
 * (e.g. setting the parents of expressions, linking the control flow statements
 
 * like `continue` and `break` up to the respective loop statement, etc.).
 
 *
 
 * There are several "confusing" parts about this pass:
 
 *
 
 * Setting expression parents: this is the simplest one. The pass struct acts
 
 * like a little state machine. When visiting an expression it will set the
 
 * "parent expression" field of the pass to itself, then visit its child. The
 
 * child will look at this "parent expression" field to determine its parent.
 
 *
 
 * Setting the `next` statement: the AST is a tree, but during execution we walk
 
 * a linear path through all statements. So where appropriate a statement may
 
 * set the "previous statement" field of the pass to itself. When visiting the
 
 * subsequent statement it will check this "previous statement", and if set, it
 
 * will link this previous statement up to itself. Not every statement has a
 
 * previous statement. Hence there are two patterns that occur: assigning the
 
 * `next` value, then clearing the "previous statement" field. And assigning the
 
 * `next` value, and then putting the current statement's ID in the "previous
 
 * statement" field. Because it is so common, this file contain two macros that
 
 * perform that operation.
 
 *
 
 * To make storing types for polymorphic procedures simpler and more efficient,
 
 * we assign to each expression in the procedure a unique ID. This is what the
 
 * "next expression index" field achieves. Each expression simply takes the
 
 * current value, and then increments this counter.
 
 */
 

	
 
use crate::collections::{ScopedBuffer};
 
use crate::protocol::ast::*;
 
use crate::protocol::input_source::*;
 
use crate::protocol::parser::symbol_table::*;
 
use crate::protocol::parser::type_table::*;
 

	
 
use super::visitor::{
 
    BUFFER_INIT_CAP_SMALL,
 
    BUFFER_INIT_CAP_LARGE,
 
    Ctx,
 
    Visitor,
 
    VisitorResult
 
};
 
use crate::protocol::parser::ModuleCompilationPhase;
 

	
 
#[derive(Debug)]
 
struct ControlFlowStatement {
 
    in_sync: SynchronousStatementId,
 
    in_while: WhileStatementId,
 
    in_scope: ScopeId,
 
    statement: StatementId, // of 'break', 'continue' or 'goto'
 
}
 

	
 
/// This particular visitor will go through the entire AST in a recursive manner
 
/// and check if all statements and expressions are legal (e.g. no "return"
 
/// statements in component definitions), and will link certain AST nodes to
 
/// their appropriate targets (e.g. goto statements, or function calls).
 
///
 
/// This visitor will not perform control-flow analysis (e.g. making sure that
 
/// each function actually returns) and will also not perform type checking. So
 
/// the linking of function calls and component instantiations will be checked
 
/// and linked to the appropriate definitions, but the return types and/or
 
/// arguments will not be checked for validity.
 
///
 
/// The main idea is, because we're visiting nodes in a tree, to do as much as
 
/// we can while we have the memory in cache.
 
pub(crate) struct PassValidationLinking {
 
    // Traversal state, all valid IDs if inside a certain AST element. Otherwise
 
    // `id.is_invalid()` returns true.
 
    in_sync: SynchronousStatementId,
 
    in_while: WhileStatementId, // to resolve labeled continue/break
 
    in_select_guard: SelectStatementId, // for detection/rejection of builtin calls
 
    in_select_arm: u32,
 
    in_test_expr: StatementId, // wrapping if/while stmt id
 
    in_binding_expr: BindingExpressionId, // to resolve variable expressions
 
    in_binding_expr_lhs: bool,
 
    // Traversal state, current scope (which can be used to find the parent
 
    // scope) and the definition variant we are considering.
 
    cur_scope: ScopeId,
 
    proc_id: ProcedureDefinitionId,
 
    proc_kind: ProcedureKind,
 
    // "Trailing" traversal state, set be child/prev stmt/expr used by next one
 
    prev_stmt: StatementId,
 
    expr_parent: ExpressionParent,
 
    // Set by parent to indicate that child expression must be assignable. The
 
    // child will throw an error if it is not assignable. The stored span is
 
    // used for the error's position
 
    must_be_assignable: Option<InputSpan>,
 
    // Keeping track of relative positions and unique IDs.
 
    relative_pos_in_parent: i32, // of statements: to determine when variables are visible
 
    // Control flow statements that require label resolving
 
    control_flow_stmts: Vec<ControlFlowStatement>,
 
    // Various temporary buffers for traversal. Essentially working around
 
    // Rust's borrowing rules since it cannot understand we're modifying AST
 
    // members but not the AST container.
 
    variable_buffer: ScopedBuffer<VariableId>,
 
    definition_buffer: ScopedBuffer<DefinitionId>,
 
    statement_buffer: ScopedBuffer<StatementId>,
 
    expression_buffer: ScopedBuffer<ExpressionId>,
 
    scope_buffer: ScopedBuffer<ScopeId>,
 
}
 

	
 
impl PassValidationLinking {
 
    pub(crate) fn new() -> Self {
 
        Self{
 
            in_sync: SynchronousStatementId::new_invalid(),
 
            in_while: WhileStatementId::new_invalid(),
 
            in_select_guard: SelectStatementId::new_invalid(),
 
            in_select_arm: 0,
 
            in_test_expr: StatementId::new_invalid(),
 
            in_binding_expr: BindingExpressionId::new_invalid(),
 
            in_binding_expr_lhs: false,
 
            cur_scope: ScopeId::new_invalid(),
 
            prev_stmt: StatementId::new_invalid(),
 
            expr_parent: ExpressionParent::None,
 
            proc_id: ProcedureDefinitionId::new_invalid(),
 
            proc_kind: ProcedureKind::Function,
 
            must_be_assignable: None,
 
            relative_pos_in_parent: 0,
 
            control_flow_stmts: Vec::with_capacity(BUFFER_INIT_CAP_SMALL),
 
            variable_buffer: ScopedBuffer::with_capacity(BUFFER_INIT_CAP_SMALL),
 
            definition_buffer: ScopedBuffer::with_capacity(BUFFER_INIT_CAP_SMALL),
 
            statement_buffer: ScopedBuffer::with_capacity(BUFFER_INIT_CAP_LARGE),
 
            expression_buffer: ScopedBuffer::with_capacity(BUFFER_INIT_CAP_LARGE),
 
            scope_buffer: ScopedBuffer::with_capacity(BUFFER_INIT_CAP_SMALL),
 
        }
 
    }
 

	
 
    fn reset_state(&mut self) {
 
        self.in_sync = SynchronousStatementId::new_invalid();
 
        self.in_while = WhileStatementId::new_invalid();
 
        self.in_select_guard = SelectStatementId::new_invalid();
 
        self.in_test_expr = StatementId::new_invalid();
 
        self.in_binding_expr = BindingExpressionId::new_invalid();
 
        self.in_binding_expr_lhs = false;
 
        self.cur_scope = ScopeId::new_invalid();
 
        self.proc_id = ProcedureDefinitionId::new_invalid();
 
        self.proc_kind = ProcedureKind::Function;
 
        self.prev_stmt = StatementId::new_invalid();
 
        self.expr_parent = ExpressionParent::None;
 
        self.must_be_assignable = None;
 
        self.relative_pos_in_parent = 0;
 
        self.control_flow_stmts.clear();
 
    }
 
}
 

	
 
macro_rules! assign_then_erase_next_stmt {
 
    ($self:ident, $ctx:ident, $stmt_id:expr) => {
 
        if !$self.prev_stmt.is_invalid() {
 
            $ctx.heap[$self.prev_stmt].link_next($stmt_id);
 
            $self.prev_stmt = StatementId::new_invalid();
 
        }
 
    }
 
}
 

	
 
macro_rules! assign_and_replace_next_stmt {
 
    ($self:ident, $ctx:ident, $stmt_id:expr) => {
 
        if !$self.prev_stmt.is_invalid() {
 
            $ctx.heap[$self.prev_stmt].link_next($stmt_id);
 
        }
 
        $self.prev_stmt = $stmt_id;
 
    }
 
}
 

	
 
impl Visitor for PassValidationLinking {
 
    fn visit_module(&mut self, ctx: &mut Ctx) -> VisitorResult {
 
        debug_assert_eq!(ctx.module().phase, ModuleCompilationPhase::TypesAddedToTable);
 

	
 
        let root = &ctx.heap[ctx.module().root_id];
 
        let section = self.definition_buffer.start_section_initialized(&root.definitions);
 
        for definition_id in section.iter_copied() {
 
            self.visit_definition(ctx, definition_id)?;
 
        }
 
        section.forget();
 

	
 
        ctx.module_mut().phase = ModuleCompilationPhase::ValidatedAndLinked;
 
        Ok(())
 
    }
 
    //--------------------------------------------------------------------------
 
    // Definition visitors
 
    //--------------------------------------------------------------------------
 

	
 
    fn visit_procedure_definition(&mut self, ctx: &mut Ctx, id: ProcedureDefinitionId) -> VisitorResult {
 
        self.reset_state();
 

	
 
        let definition = &ctx.heap[id];
 
        self.proc_id = id;
 
        self.proc_kind = definition.kind;
 
        self.expr_parent = ExpressionParent::None;
 

	
 
        // Visit parameters
 
        let scope_id = definition.scope;
 
        let old_scope = self.push_scope(ctx, true, scope_id);
 

	
 
        let definition = &ctx.heap[id];
 
        let body_id = definition.body;
 
        let definition_is_builtin = definition.source.is_builtin();
 
        let section = self.variable_buffer.start_section_initialized(&definition.parameters);
 
        for variable_idx in 0..section.len() {
 
            let variable_id = section[variable_idx];
 
            self.checked_at_single_scope_add_local(ctx, self.cur_scope, -1, variable_id)?;
 
        }
 
        section.forget();
 

	
 
        // Visit statements in function body, if present at all
 
        if !definition_is_builtin {
 
            self.visit_block_stmt(ctx, body_id)?;
 
        }
 

	
 
        self.pop_scope(old_scope);
 

	
 
        self.resolve_pending_control_flow_targets(ctx)?;
 

	
 
        Ok(())
 
    }
 

	
 
    //--------------------------------------------------------------------------
 
    // Statement visitors
 
    //--------------------------------------------------------------------------
 

	
 
    fn visit_block_stmt(&mut self, ctx: &mut Ctx, id: BlockStatementId) -> VisitorResult {
 
        // Get end of block
 
        let block_stmt = &ctx.heap[id];
 
        let end_block_id = block_stmt.end_block;
 
        let scope_id = block_stmt.scope;
 

	
 
        // Traverse statements in block
 
        let statement_section = self.statement_buffer.start_section_initialized(&block_stmt.statements);
 
        let old_scope = self.push_scope(ctx, false, scope_id);
 
        assign_and_replace_next_stmt!(self, ctx, id.upcast());
 

	
 
        for stmt_idx in 0..statement_section.len() {
 
            self.relative_pos_in_parent = stmt_idx as i32;
 
            self.visit_stmt(ctx, statement_section[stmt_idx])?;
 
        }
 

	
 
        statement_section.forget();
 
        assign_and_replace_next_stmt!(self, ctx, end_block_id.upcast());
 

	
 
        self.pop_scope(old_scope);
 
        Ok(())
 
    }
 

	
 
    fn visit_local_memory_stmt(&mut self, ctx: &mut Ctx, id: MemoryStatementId) -> VisitorResult {
 
        let stmt = &ctx.heap[id];
 
        let expr_id = stmt.initial_expr;
 
        let variable_id = stmt.variable;
 

	
 
        self.checked_add_local(ctx, self.cur_scope, self.relative_pos_in_parent, variable_id)?;
 

	
 
        assign_and_replace_next_stmt!(self, ctx, id.upcast().upcast());
 
        debug_assert_eq!(self.expr_parent, ExpressionParent::None);
 
        self.expr_parent = ExpressionParent::Memory(id);
 
        self.visit_assignment_expr(ctx, expr_id)?;
 
        self.expr_parent = ExpressionParent::None;
 

	
 
        Ok(())
 
    }
 

	
 
    fn visit_local_channel_stmt(&mut self, ctx: &mut Ctx, id: ChannelStatementId) -> VisitorResult {
 
        let stmt = &ctx.heap[id];
 
        let from_id = stmt.from;
 
        let to_id = stmt.to;
 

	
 
        self.checked_add_local(ctx, self.cur_scope, self.relative_pos_in_parent, from_id)?;
 
        self.checked_add_local(ctx, self.cur_scope, self.relative_pos_in_parent, to_id)?;
 

	
 
        assign_and_replace_next_stmt!(self, ctx, id.upcast().upcast());
 
        Ok(())
 
    }
 

	
 
    fn visit_labeled_stmt(&mut self, ctx: &mut Ctx, id: LabeledStatementId) -> VisitorResult {
 
        let stmt = &ctx.heap[id];
 
        let body_id = stmt.body;
 

	
 
        self.checked_add_label(ctx, self.relative_pos_in_parent, self.in_sync, id)?;
 

	
 
        self.visit_stmt(ctx, body_id)?;
 
        Ok(())
 
    }
 

	
 
    fn visit_if_stmt(&mut self, ctx: &mut Ctx, id: IfStatementId) -> VisitorResult {
 
        let if_stmt = &ctx.heap[id];
 
        let end_if_id = if_stmt.end_if;
 
        let test_expr_id = if_stmt.test;
 
        let true_case = if_stmt.true_case;
 
        let false_case = if_stmt.false_case;
 

	
 
        // Visit test expression
 
        debug_assert_eq!(self.expr_parent, ExpressionParent::None);
 
        debug_assert!(self.in_test_expr.is_invalid());
 

	
 
        self.in_test_expr = id.upcast();
 
        self.expr_parent = ExpressionParent::If(id);
 
        self.visit_expr(ctx, test_expr_id)?;
 
        self.in_test_expr = StatementId::new_invalid();
 

	
 
        self.expr_parent = ExpressionParent::None;
 

	
 
        // Visit true and false branch. Executor chooses next statement based on
 
        // test expression, not on if-statement itself. Hence the if statement
 
        // does not have a static subsequent statement.
 
        assign_then_erase_next_stmt!(self, ctx, id.upcast());
 
        let old_scope = self.push_scope(ctx, false, true_case.scope);
 
        self.visit_stmt(ctx, true_case.body)?;
 
        self.pop_scope(old_scope);
 
        assign_then_erase_next_stmt!(self, ctx, end_if_id.upcast());
 

	
 
        if let Some(false_case) = false_case {
 
            let old_scope = self.push_scope(ctx, false, false_case.scope);
 
            self.visit_stmt(ctx, false_case.body)?;
 
            self.pop_scope(old_scope);
 
            assign_then_erase_next_stmt!(self, ctx, end_if_id.upcast());
 
        }
 

	
 
        self.prev_stmt = end_if_id.upcast();
 
        Ok(())
 
    }
 

	
 
    fn visit_while_stmt(&mut self, ctx: &mut Ctx, id: WhileStatementId) -> VisitorResult {
 
        let stmt = &ctx.heap[id];
 
        let stmt = &mut ctx.heap[id];
 
        let end_while_id = stmt.end_while;
 
        let test_expr_id = stmt.test;
 
        let body_stmt_id = stmt.body;
 
        let scope_id = stmt.scope;
 
        stmt.in_sync = self.in_sync;
 

	
 
        let old_while = self.in_while;
 
        self.in_while = id;
 

	
 
        // Visit test expression
 
        debug_assert_eq!(self.expr_parent, ExpressionParent::None);
 
        debug_assert!(self.in_test_expr.is_invalid());
 
        self.in_test_expr = id.upcast();
 
        self.expr_parent = ExpressionParent::While(id);
 
        self.visit_expr(ctx, test_expr_id)?;
 
        self.in_test_expr = StatementId::new_invalid();
 

	
 
        // Link up to body statement
 
        assign_then_erase_next_stmt!(self, ctx, id.upcast());
 

	
 
        self.expr_parent = ExpressionParent::None;
 
        let old_scope = self.push_scope(ctx, false, scope_id);
 
        self.visit_stmt(ctx, body_stmt_id)?;
 
        self.pop_scope(old_scope);
 
        self.in_while = old_while;
 

	
 
        // Link final entry in while's block statement back to the while. The
 
        // executor will go to the end-while statement if the test expression
 
        // is false, so put that in as the new previous stmt
 
        assign_then_erase_next_stmt!(self, ctx, id.upcast());
 
        self.prev_stmt = end_while_id.upcast();
 

	
 
        Ok(())
 
    }
 

	
 
    fn visit_break_stmt(&mut self, ctx: &mut Ctx, id: BreakStatementId) -> VisitorResult {
 
        self.control_flow_stmts.push(ControlFlowStatement{
 
            in_sync: self.in_sync,
 
            in_while: self.in_while,
 
            in_scope: self.cur_scope,
 
            statement: id.upcast()
 
        });
 
        assign_then_erase_next_stmt!(self, ctx, id.upcast());
 

	
 
        Ok(())
 
    }
 

	
 
    fn visit_continue_stmt(&mut self, ctx: &mut Ctx, id: ContinueStatementId) -> VisitorResult {
 
        self.control_flow_stmts.push(ControlFlowStatement{
 
            in_sync: self.in_sync,
 
            in_while: self.in_while,
 
            in_scope: self.cur_scope,
 
            statement: id.upcast()
 
        });
 
        assign_then_erase_next_stmt!(self, ctx, id.upcast());
 

	
 
        Ok(())
 
    }
 

	
 
    fn visit_synchronous_stmt(&mut self, ctx: &mut Ctx, id: SynchronousStatementId) -> VisitorResult {
 
        // Check for validity of synchronous statement
 
        let sync_stmt = &ctx.heap[id];
 
        let end_sync_id = sync_stmt.end_sync;
 
        let cur_sync_span = sync_stmt.span;
 
        let scope_id = sync_stmt.scope;
 

	
 
        if !self.in_sync.is_invalid() {
 
            // Nested synchronous statement
 
            let old_sync_span = ctx.heap[self.in_sync].span;
 
            return Err(ParseError::new_error_str_at_span(
 
                &ctx.module().source, cur_sync_span, "Illegal nested synchronous statement"
 
            ).with_info_str_at_span(
 
                &ctx.module().source, old_sync_span, "It is nested in this synchronous statement"
 
            ));
 
        }
 

	
 
        if self.proc_kind != ProcedureKind::Component {
 
            return Err(ParseError::new_error_str_at_span(
 
                &ctx.module().source, cur_sync_span,
 
                "synchronous statements may only be used in components"
 
            ));
 
        }
 

	
 
        // Synchronous statement implicitly moves to its block
 
        assign_then_erase_next_stmt!(self, ctx, id.upcast());
 

	
 
        // Visit block statement. Note that we explicitly push the scope here
 
        // (and the `visit_block_stmt` will also push, but without effect) to
 
        // ensure the scope contains the sync ID.
 
        let sync_body = ctx.heap[id].body;
 
        debug_assert!(self.in_sync.is_invalid());
 
        self.in_sync = id;
 
        let old_scope = self.push_scope(ctx, false, scope_id);
 
        self.visit_stmt(ctx, sync_body)?;
 
        self.pop_scope(old_scope);
 
        assign_and_replace_next_stmt!(self, ctx, end_sync_id.upcast());
 

	
 
        self.in_sync = SynchronousStatementId::new_invalid();
 

	
 
        Ok(())
 
    }
 

	
 
    fn visit_fork_stmt(&mut self, ctx: &mut Ctx, id: ForkStatementId) -> VisitorResult {
 
        let fork_stmt = &ctx.heap[id];
 
        let end_fork_id = fork_stmt.end_fork;
 
        let left_body_id = fork_stmt.left_body;
 
        let right_body_id = fork_stmt.right_body;
 

	
 
        // Fork statements may only occur inside sync blocks
 
        if self.in_sync.is_invalid() {
 
            return Err(ParseError::new_error_str_at_span(
 
                &ctx.module().source, fork_stmt.span,
 
                "Forking may only occur inside sync blocks"
 
            ));
 
        }
 

	
 
        // Visit the respective bodies. Like the if statement, a fork statement
 
        // does not have a single static subsequent statement. It forks and then
 
        // each fork has a different next statement.
 
        assign_then_erase_next_stmt!(self, ctx, id.upcast());
 
        self.visit_stmt(ctx, left_body_id)?;
 
        assign_then_erase_next_stmt!(self, ctx, end_fork_id.upcast());
 

	
 
        if let Some(right_body_id) = right_body_id {
 
            self.visit_stmt(ctx, right_body_id)?;
 
            assign_then_erase_next_stmt!(self, ctx, end_fork_id.upcast());
 
        }
 

	
 
        self.prev_stmt = end_fork_id.upcast();
 
        Ok(())
 
    }
 

	
 
    fn visit_select_stmt(&mut self, ctx: &mut Ctx, id: SelectStatementId) -> VisitorResult {
 
        let select_stmt = &mut ctx.heap[id];
 
        select_stmt.relative_pos_in_parent = self.relative_pos_in_parent;
 
        self.relative_pos_in_parent += 1;
 

	
 
        let select_stmt = &ctx.heap[id];
 
        let end_select_id = select_stmt.end_select;
 

	
 
        // Select statements may only occur inside sync blocks
 
        if self.in_sync.is_invalid() {
 
            return Err(ParseError::new_error_str_at_span(
 
                &ctx.module().source, select_stmt.span,
 
                "select statements may only occur inside sync blocks"
 
            ));
 
        }
 

	
 
        if self.proc_kind != ProcedureKind::Component {
 
            return Err(ParseError::new_error_str_at_span(
 
                &ctx.module().source, select_stmt.span,
 
                "select statements may only be used in components"
 
            ));
 
        }
 

	
 
        // Visit the various arms in the select block
 
        let mut case_stmt_ids = self.statement_buffer.start_section();
 
        let mut case_scope_ids = self.scope_buffer.start_section();
 
        let num_cases = select_stmt.cases.len();
 
        for case in &select_stmt.cases {
 
            // We add them in pairs, so the subsequent for-loop retrieves in pairs
 
            case_stmt_ids.push(case.guard);
 
            case_stmt_ids.push(case.body);
 
            case_scope_ids.push(case.scope);
 
        }
 

	
 
        assign_then_erase_next_stmt!(self, ctx, id.upcast());
 

	
 
        for index in 0..num_cases {
 
            let base_index = 2 * index;
 
            let guard_id     = case_stmt_ids[base_index];
 
            let case_body_id = case_stmt_ids[base_index + 1];
 
            let case_scope_id = case_scope_ids[index];
 

	
 
            // The guard statement ends up belonging to the block statement
 
            // following the arm. The reason we parse it separately is to
 
            // extract all of the "get" calls.
 
            let old_scope = self.push_scope(ctx, false, case_scope_id);
 

	
 
            // Visit the guard of this arm
 
            debug_assert!(self.in_select_guard.is_invalid());
 
            self.in_select_guard = id;
 
            self.in_select_arm = index as u32;
 
            self.visit_stmt(ctx, guard_id)?;
 
            self.in_select_guard = SelectStatementId::new_invalid();
 

	
 
            // Visit the code associated with the guard
 
            self.relative_pos_in_parent += 1;
 
            self.visit_stmt(ctx, case_body_id)?;
 
            self.pop_scope(old_scope);
 

	
 
            // Link up last statement in block to EndSelect
 
            assign_then_erase_next_stmt!(self, ctx, end_select_id.upcast());
 
        }
 

	
 
        self.in_select_guard = SelectStatementId::new_invalid();
 
        self.prev_stmt = end_select_id.upcast();
 
        Ok(())
 
    }
 

	
 
    fn visit_return_stmt(&mut self, ctx: &mut Ctx, id: ReturnStatementId) -> VisitorResult {
 
        // Check if "return" occurs within a function
 
        let stmt = &ctx.heap[id];
 
        if self.proc_kind != ProcedureKind::Function {
 
            return Err(ParseError::new_error_str_at_span(
 
                &ctx.module().source, stmt.span,
 
                "return statements may only appear in function bodies"
 
            ));
 
        }
 

	
 
        // If here then we are within a function
 
        assign_then_erase_next_stmt!(self, ctx, id.upcast());
 
        debug_assert_eq!(self.expr_parent, ExpressionParent::None);
 
        debug_assert_eq!(ctx.heap[id].expressions.len(), 1);
 
        self.expr_parent = ExpressionParent::Return(id);
 
        self.visit_expr(ctx, ctx.heap[id].expressions[0])?;
 
        self.expr_parent = ExpressionParent::None;
 

	
 
        Ok(())
 
    }
 

	
 
    fn visit_goto_stmt(&mut self, ctx: &mut Ctx, id: GotoStatementId) -> VisitorResult {
 
        self.control_flow_stmts.push(ControlFlowStatement{
 
            in_sync: self.in_sync,
 
            in_while: self.in_while,
 
            in_scope: self.cur_scope,
 
            statement: id.upcast(),
 
        });
 
        assign_then_erase_next_stmt!(self, ctx, id.upcast());
 

	
 
        Ok(())
 
    }
 

	
 
    fn visit_new_stmt(&mut self, ctx: &mut Ctx, id: NewStatementId) -> VisitorResult {
 
        // Make sure the new statement occurs inside a component
 
        if self.proc_kind != ProcedureKind::Component {
 
            let new_stmt = &ctx.heap[id];
 
            return Err(ParseError::new_error_str_at_span(
 
                &ctx.module().source, new_stmt.span,
 
                "instantiating components may only be done in components"
 
            ));
 
        }
 

	
 
        // Recurse into call expression (which will check the expression parent
 
        // to ensure that the "new" statment instantiates a component)
 
        let call_expr_id = ctx.heap[id].expression;
 

	
 
        assign_and_replace_next_stmt!(self, ctx, id.upcast());
 
        debug_assert_eq!(self.expr_parent, ExpressionParent::None);
 
        self.expr_parent = ExpressionParent::New(id);
 
        self.visit_call_expr(ctx, call_expr_id)?;
 
        self.expr_parent = ExpressionParent::None;
 

	
 
        Ok(())
 
    }
 

	
 
    fn visit_expr_stmt(&mut self, ctx: &mut Ctx, id: ExpressionStatementId) -> VisitorResult {
 
        let expr_id = ctx.heap[id].expression;
 

	
 
        assign_and_replace_next_stmt!(self, ctx, id.upcast());
 
        debug_assert_eq!(self.expr_parent, ExpressionParent::None);
 
        self.expr_parent = ExpressionParent::ExpressionStmt(id);
 
        self.visit_expr(ctx, expr_id)?;
 
        self.expr_parent = ExpressionParent::None;
 

	
 
        Ok(())
 
    }
 

	
 

	
 
    //--------------------------------------------------------------------------
 
    // Expression visitors
 
    //--------------------------------------------------------------------------
 

	
 
    fn visit_assignment_expr(&mut self, ctx: &mut Ctx, id: AssignmentExpressionId) -> VisitorResult {
 
        let upcast_id = id.upcast();
 

	
 
        let assignment_expr = &mut ctx.heap[id];
 

	
 
        // Although we call assignment an expression to simplify the compiler's
 
        // code (mainly typechecking), we disallow nested use in expressions
 
        match self.expr_parent {
 
            // Look at us: lying through our teeth while providing error messages.
 
            ExpressionParent::Memory(_) => {},
 
            ExpressionParent::ExpressionStmt(_) => {},
 
            _ => {
 
                let assignment_span = assignment_expr.full_span;
 
                return Err(ParseError::new_error_str_at_span(
 
                    &ctx.module().source, assignment_span,
 
                    "assignments are statements, and cannot be used in expressions"
 
                ))
 
            },
 
        }
 

	
 
        let left_expr_id = assignment_expr.left;
 
        let right_expr_id = assignment_expr.right;
 
        let old_expr_parent = self.expr_parent;
 
        assignment_expr.parent = old_expr_parent;
 

	
 
        self.expr_parent = ExpressionParent::Expression(upcast_id, 0);
 
        self.must_be_assignable = Some(assignment_expr.operator_span);
 
        self.visit_expr(ctx, left_expr_id)?;
 
        self.expr_parent = ExpressionParent::Expression(upcast_id, 1);
 
        self.must_be_assignable = None;
 
        self.visit_expr(ctx, right_expr_id)?;
 
        self.expr_parent = old_expr_parent;
 
        Ok(())
 
    }
 

	
 
    fn visit_binding_expr(&mut self, ctx: &mut Ctx, id: BindingExpressionId) -> VisitorResult {
 
        let upcast_id = id.upcast();
 

	
 
        // Check for valid context of binding expression
 
        if let Some(span) = self.must_be_assignable {
 
            return Err(ParseError::new_error_str_at_span(
 
                &ctx.module().source, span, "cannot assign to the result from a binding expression"
 
            ));
 
        }
 

	
 
        if self.in_test_expr.is_invalid() {
 
            let binding_expr = &ctx.heap[id];
 
            return Err(ParseError::new_error_str_at_span(
 
                &ctx.module().source, binding_expr.full_span,
 
                "binding expressions can only be used inside the testing expression of 'if' and 'while' statements"
 
            ));
 
        }
 

	
 
        if !self.in_binding_expr.is_invalid() {
 
            let binding_expr = &ctx.heap[id];
 
            let previous_expr = &ctx.heap[self.in_binding_expr];
 
            return Err(ParseError::new_error_str_at_span(
 
                &ctx.module().source, binding_expr.full_span,
 
                "nested binding expressions are not allowed"
 
            ).with_info_str_at_span(
 
                &ctx.module().source, previous_expr.operator_span,
 
                "the outer binding expression is found here"
 
            ));
 
        }
 

	
 
        let mut seeking_parent = self.expr_parent;
 
        loop {
 
            // Perform upward search to make sure only LogicalAnd is applied to
 
            // the binding expression
 
            let valid = match seeking_parent {
 
                ExpressionParent::If(_) | ExpressionParent::While(_) => {
 
                    // Every parent expression (if any) were LogicalAnd.
 
                    break;
 
                }
 
                ExpressionParent::Expression(parent_id, _) => {
 
                    let parent_expr = &ctx.heap[parent_id];
 
                    match parent_expr {
 
                        Expression::Binary(parent_expr) => {
 
                            // Set new parent to continue the search. Otherwise
 
                            // halt and provide an error using the current
 
                            // parent.
 
                            if parent_expr.operation == BinaryOperator::LogicalAnd {
 
                                seeking_parent = parent_expr.parent;
 
                                true
 
                            } else {
 
                                false
 
                            }
 
                        },
 
                        _ => false,
 
                    }
 
                },
 
                _ => unreachable!(), // nested under if/while, so always expressions as parents
 
            };
 

	
 
            if !valid {
 
                let binding_expr = &ctx.heap[id];
 
                let parent_expr = &ctx.heap[seeking_parent.as_expression()];
 
                return Err(ParseError::new_error_str_at_span(
 
                    &ctx.module().source, binding_expr.full_span,
 
                    "only the logical-and operator (&&) may be applied to binding expressions"
 
                ).with_info_str_at_span(
 
                    &ctx.module().source, parent_expr.operation_span(),
 
                    "this was the disallowed operation applied to the result from a binding expression"
 
                ));
 
            }
 
        }
 

	
 
        // Perform all of the index/parent assignment magic
 
        let binding_expr = &mut ctx.heap[id];
 

	
 
        let old_expr_parent = self.expr_parent;
 
        binding_expr.parent = old_expr_parent;
 
        self.in_binding_expr = id;
 

	
 
        // Perform preliminary check on children: binding expressions only make
 
        // sense if the left hand side is just a variable expression, or if it
src/runtime2/communication.rs
Show inline comments
 
use crate::protocol::eval::*;
 
use crate::protocol::eval::value::ValueId;
 
use super::runtime::*;
 
use super::component::*;
 

	
 
// -----------------------------------------------------------------------------
 
// Generic types
 
// -----------------------------------------------------------------------------
 

	
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub struct PortId(pub u32);
 

	
 
impl PortId {
 
    /// This value is not significant, it is chosen to make debugging easier: a
 
    /// very large port number is more likely to shine a light on bugs.
 
    pub fn new_invalid() -> Self {
 
        return Self(u32::MAX);
 
    }
 
}
 

	
 
pub struct Channel {
 
    pub putter_id: PortId,
 
    pub getter_id: PortId,
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Data messages
 
// -----------------------------------------------------------------------------
 

	
 
#[derive(Debug)]
 
pub struct DataMessage {
 
    pub data_header: MessageDataHeader,
 
    pub sync_header: MessageSyncHeader,
 
    pub content: ValueGroup,
 
    pub ports: Vec<TransmittedPort>,
 
}
 

	
 
#[derive(Debug)]
 
pub enum PortAnnotationKind {
 
    Getter(PortAnnotationGetter),
 
    Putter(PortAnnotationPutter),
 
}
 

	
 
#[derive(Debug)]
 
pub struct PortAnnotationGetter {
 
    pub self_comp_id: CompId,
 
    pub self_port_id: PortId,
 
    pub peer_comp_id: CompId,
 
    pub peer_port_id: PortId,
 
}
 

	
 
#[derive(Debug)]
 
pub struct PortAnnotationPutter {
 
    pub self_comp_id: CompId,
 
    pub self_port_id: PortId,
 
}
 

	
 
#[derive(Debug)]
 
pub struct MessageDataHeader {
 
    pub expected_mapping: Vec<(PortAnnotationKind, Option<u32>)>,
 
    pub new_mapping: u32,
 
    pub source_port: PortId,
 
    pub target_port: PortId,
 
}
 

	
 
#[derive(Debug)]
 
pub struct TransmittedPort {
 
    pub locations: Vec<ValueId>, // within `content`
 
    pub messages: Vec<DataMessage>, // owned by previous component
 
    pub peer_comp: CompId,
 
    pub peer_port: PortId,
 
    pub kind: PortKind,
 
    pub state: PortState,
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Sync messages
 
// -----------------------------------------------------------------------------
 

	
 
#[derive(Debug)]
 
pub struct SyncMessage {
 
    pub sync_header: MessageSyncHeader,
 
    pub content: SyncMessageContent,
 
}
 

	
 
#[derive(Debug)]
 
pub enum SyncLocalSolutionEntry {
 
    Putter(SyncSolutionPutterPort),
 
    Getter(SyncSolutionGetterPort),
 
}
 

	
 
pub type SyncLocalSolution = Vec<SyncLocalSolutionEntry>;
 

	
 
/// Getter port in a solution. Upon receiving a message it is certain about who
 
/// its peer is.
 
#[derive(Debug)]
 
pub struct SyncSolutionGetterPort {
 
    pub self_comp_id: CompId,
 
    pub self_port_id: PortId,
 
    pub peer_comp_id: CompId,
 
    pub peer_port_id: PortId,
 
    pub mapping: u32,
 
    pub failed: bool,
 
}
 

	
 
/// Putter port in a solution. A putter may not be certain about who its peer
 
/// component/port is.
 
#[derive(Debug)]
 
pub struct SyncSolutionPutterPort {
 
    pub self_comp_id: CompId,
 
    pub self_port_id: PortId,
 
    pub mapping: u32,
 
    pub failed: bool,
 
}
 

	
 
#[derive(Debug)]
 
pub struct SyncSolutionChannel {
 
    pub putter: Option<SyncSolutionPutterPort>,
 
    pub getter: Option<SyncSolutionGetterPort>,
 
}
 

	
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub enum SyncRoundDecision {
 
    None,
 
    Solution,
 
    Failure,
 
}
 

	
 
#[derive(Debug)]
 
pub struct SyncPartialSolution {
 
    pub channel_mapping: Vec<SyncSolutionChannel>,
 
    pub decision: SyncRoundDecision,
 
}
 

	
 
impl Default for SyncPartialSolution {
 
    fn default() -> Self {
 
        return Self{
 
            channel_mapping: Vec::new(),
 
            decision: SyncRoundDecision::None,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug)]
 
pub enum SyncMessageContent {
 
    NotificationOfLeader,
 
    LocalSolution(CompId, SyncLocalSolution), // local solution of the specified component
 
    PartialSolution(SyncPartialSolution), // partial solution of multiple components
 
    GlobalSolution,
 
    GlobalFailure,
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Control messages
 
// -----------------------------------------------------------------------------
 

	
 
#[derive(Debug)]
 
pub struct ControlMessage {
 
    pub(crate) id: ControlId,
 
    pub sender_comp_id: CompId,
 
    pub target_port_id: Option<PortId>,
 
    pub content: ControlMessageContent,
 
}
 

	
 
/// Content of a control message. If the content refers to a port then the
 
/// `target_port_id` field is the one that it refers to.
 
#[derive(Copy, Clone, Debug)]
 
pub enum ControlMessageContent {
 
    Ack,
 
    BlockPort,
 
    UnblockPort,
 
    ClosePort(ControlMessageClosePort),
 
    PortPeerChangedBlock,
 
    PortPeerChangedUnblock(PortId, CompId), // contains (new_port_id, new_component_id)
 
}
 

	
 
#[derive(Copy, Clone, Debug)]
 
pub struct ControlMessageClosePort {
 
    pub closed_in_sync_round: bool, // needed to ensure correct handling of errors
 
    pub registered_round: Option<u32>,
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Messages (generic)
 
// -----------------------------------------------------------------------------
 

	
 
#[derive(Debug)]
 
pub struct MessageSyncHeader {
 
    pub sync_round: u32,
 
    pub sending_id: CompId,
 
    pub highest_id: CompId,
 
}
 

	
 
#[derive(Debug)]
 
pub enum Message {
 
    Data(DataMessage),
 
    Sync(SyncMessage),
 
    Control(ControlMessage),
 
    Poll,
 
}
 

	
 
impl Message {
 
    pub(crate) fn target_port(&self) -> Option<PortId> {
 
        match self {
 
            Message::Data(v) =>
 
                return Some(v.data_header.target_port),
 
            Message::Control(v) =>
 
                return v.target_port_id,
 
            Message::Sync(_) =>
 
                return None,
 
            Message::Poll =>
 
                return None,
 
        }
 
    }
 

	
 
    pub(crate) fn modify_target_port(&mut self, port_id: PortId) {
 
        match self {
 
            Message::Data(v) =>
 
                v.data_header.target_port = port_id,
 
            Message::Control(v) =>
 
                v.target_port_id = Some(port_id),
 
            Message::Sync(_) => unreachable!(), // should never be called for this message type
 
            Message::Poll => unreachable!(),
 
        }
 
    }
 
}
 

	
 

	
src/runtime2/component/component.rs
Show inline comments
 
/*
 
 * Default toolkit for creating components. Contains handlers for initiating and
 
 * responding to various events.
 
 */
 

	
 
use std::fmt::{Display as FmtDisplay, Result as FmtResult, Formatter};
 

	
 
use crate::protocol::eval::{Prompt, EvalError, ValueGroup, Value, ValueId, PortId as EvalPortId};
 
use crate::protocol::*;
 
use crate::runtime2::*;
 
use crate::runtime2::communication::*;
 

	
 
use super::{CompCtx, CompPDL, CompId};
 
use super::component_context::*;
 
use super::component_random::*;
 
use super::component_internet::*;
 
use super::control_layer::*;
 
use super::consensus::*;
 

	
 
pub enum CompScheduling {
 
    Immediate,
 
    Requeue,
 
    Sleep,
 
    Exit,
 
}
 

	
 
/// Potential error emitted by a component
 
pub enum CompError {
 
    /// Error originating from the code executor. Hence has an associated
 
    /// source location.
 
    Executor(EvalError),
 
    /// Error originating from a component, but not necessarily associated with
 
    /// a location in the source.
 
    Component(String), // TODO: Maybe a different embedded value in the future?
 
    /// Pure runtime error. Not necessarily originating from the component
 
    /// itself. Should be treated as a very severe runtime-compromising error.
 
    Runtime(RtError),
 
}
 

	
 
impl FmtDisplay for CompError {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
 
        match self {
 
            CompError::Executor(v) => v.fmt(f),
 
            CompError::Component(v) => v.fmt(f),
 
            CompError::Runtime(v) => v.fmt(f),
 
        }
 
    }
 
}
 

	
 
/// Generic representation of a component (as viewed by a scheduler).
 
pub(crate) trait Component {
 
    /// Called upon the creation of the component. Note that the scheduler
 
    /// context is officially running another component (the component that is
 
    /// creating the new component).
 
    fn on_creation(&mut self, comp_id: CompId, sched_ctx: &SchedulerCtx);
 

	
 
    /// Called when a component crashes or wishes to exit. So is not called
 
    /// right before destruction, other components may still hold a handle to
 
    /// the component and send it messages!
 
    fn on_shutdown(&mut self, sched_ctx: &SchedulerCtx);
 

	
 
    /// Called if the component is created by another component and the messages
 
    /// are being transferred between the two.
 
    fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage);
 

	
 
    /// Called if the component receives a new message. The component is
 
    /// responsible for deciding where that messages goes.
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message);
 

	
 
    /// Called if the component's routine should be executed. The return value
 
    /// can be used to indicate when the routine should be run again.
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling;
 
}
 

	
 
/// Representation of the generic operating mode of a component. Although not
 
/// every state may be used by every kind of (builtin) component, this allows
 
/// writing standard handlers for particular events in a component's lifetime.
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) enum CompMode {
 
    NonSync, // not in sync mode
 
    Sync, // in sync mode, can interact with other components
 
    SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block
 
    BlockedGet, // blocked because we need to receive a message on a particular port
 
    BlockedPut, // component is blocked because the port is blocked
 
    BlockedSelect, // waiting on message to complete the select statement
 
    PutPortsBlockedTransferredPorts, // sending a message with ports, those sent ports are (partly) blocked
 
    PutPortsBlockedAwaitingAcks, // sent out PPC message for blocking transferred ports, now awaiting Acks
 
    PutPortsBlockedSendingPort, // sending a message with ports, message sent through a still-blocked port
 
    NewComponentBlocked, // waiting until ports are in the appropriate state to create a new component
 
    StartExit, // temporary state: if encountered then we start the shutdown process.
 
    BusyExit, // temporary state: waiting for Acks for all the closed ports, potentially waiting for sync round to finish
 
    Exit, // exiting: shutdown process started, now waiting until the reference count drops to 0
 
}
 

	
 
impl CompMode {
 
    pub(crate) fn is_in_sync_block(&self) -> bool {
 
        use CompMode::*;
 

	
 
        match self {
 
            Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect |
 
                PutPortsBlockedTransferredPorts |
 
                PutPortsBlockedAwaitingAcks |
 
                PutPortsBlockedSendingPort => true,
 
            NonSync | NewComponentBlocked | StartExit | BusyExit | Exit => false,
 
        }
 
    }
 

	
 
    pub(crate) fn is_busy_exiting(&self) -> bool {
 
        use CompMode::*;
 

	
 
        match self {
 
            NonSync | Sync | SyncEnd | BlockedGet | BlockedPut | BlockedSelect |
 
            PutPortsBlockedTransferredPorts |
 
            PutPortsBlockedAwaitingAcks |
 
            PutPortsBlockedSendingPort |
 
                NewComponentBlocked => false,
 
            StartExit | BusyExit => true,
 
            Exit => false,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) enum ExitReason {
 
    Termination, // regular termination of component
 
    ErrorInSync,
 
    ErrorNonSync,
 
}
 

	
 
impl ExitReason {
 
    pub(crate) fn is_in_sync(&self) -> bool {
 
        use ExitReason::*;
 

	
 
        match self {
 
            Termination | ErrorNonSync => false,
 
            ErrorInSync => true,
 
        }
 
    }
 

	
 
    pub(crate) fn is_error(&self) -> bool {
 
        use ExitReason::*;
 

	
 
        match self {
 
            Termination => false,
 
            ErrorInSync | ErrorNonSync => true,
 
        }
 
    }
 
}
 

	
 
/// Component execution state: the execution mode along with some descriptive
 
/// fields. Fields are public for ergonomic reasons, use member functions when
 
/// appropriate.
 
pub(crate) struct CompExecState {
 
    pub mode: CompMode,
 
    pub mode_port: PortId, // valid if blocked on a port (put/get)
 
    pub mode_value: ValueGroup, // valid if blocked on a put
 
    pub mode_component: (ProcedureDefinitionId, TypeId),
 
    pub exit_reason: ExitReason, // valid if in StartExit/BusyExit/Exit mode
 
}
 

	
 
impl CompExecState {
 
    pub(crate) fn new() -> Self {
 
        return Self{
 
            mode: CompMode::NonSync,
 
            mode_port: PortId::new_invalid(),
 
            mode_value: ValueGroup::default(),
 
            mode_component: (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid()),
 
            exit_reason: ExitReason::Termination,
 
        }
 
    }
 

	
 
    pub(crate) fn set_as_start_exit(&mut self, reason: ExitReason) {
 
        self.mode = CompMode::StartExit;
 
        self.exit_reason = reason;
 
    }
 

	
 
    pub(crate) fn set_as_blocked_get(&mut self, port: PortId) {
 
        self.mode = CompMode::BlockedGet;
 
        self.mode_port = port;
 
        debug_assert!(self.mode_value.values.is_empty());
 
    }
 

	
 
    pub(crate) fn set_as_create_component_blocked(
 
        &mut self, proc_id: ProcedureDefinitionId, type_id: TypeId,
 
        arguments: ValueGroup
 
    ) {
 
        self.mode = CompMode::NewComponentBlocked;
 
        self.mode_value = arguments;
 
        self.mode_component = (proc_id, type_id);
 
    }
 

	
 
    pub(crate) fn is_blocked_on_get(&self, port: PortId) -> bool {
 
        return
 
            self.mode == CompMode::BlockedGet &&
 
            self.mode_port == port;
 
    }
 

	
 
    pub(crate) fn set_as_blocked_put_without_ports(&mut self, port: PortId, value: ValueGroup) {
 
        self.mode = CompMode::BlockedPut;
 
        self.mode_port = port;
 
        self.mode_value = value;
 
    }
 

	
 
    pub(crate) fn set_as_blocked_put_with_ports(&mut self, port: PortId, value: ValueGroup) {
 
        self.mode = CompMode::PutPortsBlockedTransferredPorts;
 
        self.mode_port = port;
 
        self.mode_value = value;
 
    }
 

	
 
    pub(crate) fn is_blocked_on_put_without_ports(&self, port: PortId) -> bool {
 
        return
 
            self.mode == CompMode::BlockedPut &&
 
            self.mode_port == port;
 
    }
 

	
 
    pub(crate) fn is_blocked_on_create_component(&self) -> bool {
 
        return self.mode == CompMode::NewComponentBlocked;
 
    }
 
}
 

	
 
// TODO: Replace when implementing port sending. Should probably be incorporated
 
//  into CompCtx (and rename CompCtx into CompComms)
 
pub(crate) type InboxMain = Vec<Option<DataMessage>>;
 
pub(crate) type InboxMainRef = [Option<DataMessage>];
 
pub(crate) type InboxBackup = Vec<DataMessage>;
 

	
 
/// Creates a new component based on its definition. Meaning that if it is a
 
/// user-defined component then we set up the PDL code state. Otherwise we
 
/// construct a custom component. This does NOT take care of port and message
 
/// management.
 
pub(crate) fn create_component(
 
    protocol: &ProtocolDescription,
 
    definition_id: ProcedureDefinitionId, type_id: TypeId,
 
    arguments: ValueGroup, num_ports: usize
 
) -> Box<dyn Component> {
 
    let definition = &protocol.heap[definition_id];
 
    debug_assert_eq!(definition.kind, ProcedureKind::Component);
 

	
 
    if definition.source.is_builtin() {
 
        // Builtin component
 
        let component: Box<dyn Component> = match definition.source {
 
            ProcedureSource::CompRandomU32 => Box::new(ComponentRandomU32::new(arguments)),
 
            ProcedureSource::CompTcpClient => Box::new(ComponentTcpClient::new(arguments)),
 
            ProcedureSource::CompTcpListener => Box::new(ComponentTcpListener::new(arguments)),
 
            _ => unreachable!(),
 
        };
 

	
 
        return component;
 
    } else {
 
        // User-defined component
 
        let prompt = Prompt::new(
 
            &protocol.types, &protocol.heap,
 
            definition_id, type_id, arguments
 
        );
 
        let component = CompPDL::new(prompt, num_ports);
 
        return Box::new(component);
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Generic component messaging utilities (for sending and receiving)
 
// -----------------------------------------------------------------------------
 

	
 
/// Default handling of sending a data message. In case the port is blocked then
 
/// the `ExecState` will become blocked as well. Note that
 
/// `default_handle_control_message` will ensure that the port becomes
 
/// unblocked if so instructed by the receiving component. The returned
 
/// scheduling value must be used.
 
#[must_use]
 
pub(crate) fn default_send_data_message(
 
    exec_state: &mut CompExecState, transmitting_port_id: PortId,
 
    port_instruction: PortInstruction, value: ValueGroup,
 
    sched_ctx: &SchedulerCtx, consensus: &mut Consensus,
 
    control: &mut ControlLayer, comp_ctx: &mut CompCtx
 
) -> Result<CompScheduling, (PortInstruction, String)> {
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync);
 

	
 
    let port_handle = comp_ctx.get_port_handle(transmitting_port_id);
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    port_info.last_instruction = port_instruction;
 
    port_info.last_registered_round = Some(consensus.round_number());
 

	
 
    let port_info = comp_ctx.get_port(port_handle);
 
    debug_assert_eq!(port_info.kind, PortKind::Putter);
 

	
 
    let mut ports = Vec::new();
 
    find_ports_in_value_group(&value, &mut ports);
 

	
 
    if port_info.state.is_closed() {
 
        // Note: normally peer is eventually consistent, but if it has shut down
 
        // then we can be sure it is consistent (I think?)
 
        return Err((
 
            port_info.last_instruction,
 
            format!("Cannot send on this port, as the peer (id:{}) has shut down", port_info.peer_comp_id.0)
 
        ))
 
    } else if !ports.is_empty() {
 
        start_send_message_with_ports(
 
            transmitting_port_id, port_instruction, value, exec_state,
 
            comp_ctx, sched_ctx, control
 
        )?;
 

	
 
        return Ok(CompScheduling::Sleep);
 
    } else if port_info.state.is_blocked() {
 
        // Port is blocked, so we cannot send
 
        exec_state.set_as_blocked_put_without_ports(transmitting_port_id, value);
 

	
 
        return Ok(CompScheduling::Sleep);
 
    } else {
 
        // Port is not blocked and no ports to transfer: send to the peer
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        let annotated_message = consensus.annotate_data_message(comp_ctx, port_info, value);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Data(annotated_message), true);
 

	
 
        return Ok(CompScheduling::Immediate);
 
    }
 
}
 

	
 
pub(crate) enum IncomingData {
 
    PlacedInSlot,
 
    SlotFull(DataMessage),
 
}
 

	
 
/// Default handling of receiving a data message. In case there is no room for
 
/// the message it is returned from this function. Note that this function is
 
/// different from PDL code performing a `get` on a port; this is the case where
 
/// the message first arrives at the component.
 
// NOTE: This is supposed to be a somewhat temporary implementation. It would be
 
//  nicest if the sending component can figure out it cannot send any more data.
 
#[must_use]
 
pub(crate) fn default_handle_incoming_data_message(
 
    exec_state: &mut CompExecState, inbox_main: &mut InboxMain,
 
    comp_ctx: &mut CompCtx, incoming_message: DataMessage,
 
    sched_ctx: &SchedulerCtx, control: &mut ControlLayer
 
) -> IncomingData {
 
    let port_handle = comp_ctx.get_port_handle(incoming_message.data_header.target_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 
    comp_ctx.get_port_mut(port_handle).received_message_for_sync = true;
 
    let port_value_slot = &mut inbox_main[port_index];
 
    let target_port_id = incoming_message.data_header.target_port;
 

	
 
    if port_value_slot.is_none() {
 
        // We can put the value in the slot
 
        *port_value_slot = Some(incoming_message);
 

	
 
        // Check if we're blocked on receiving this message.
 
        dbg_code!({
 
            // Our port cannot have been blocked itself, because we're able to
 
            // directly insert the message into its slot.
 
            assert!(!comp_ctx.get_port(port_handle).state.is_blocked());
 
        });
 

	
 
        if exec_state.is_blocked_on_get(target_port_id) {
 
            // Return to normal operation
 
            exec_state.mode = CompMode::Sync;
 
            exec_state.mode_port = PortId::new_invalid();
 
            debug_assert!(exec_state.mode_value.values.is_empty());
 
        }
 

	
 
        return IncomingData::PlacedInSlot
 
    } else {
 
        // Slot is already full, so if the port was previously opened, it will
 
        // now become closed
 
        let port_info = comp_ctx.get_port_mut(port_handle);
 
        if port_info.state.is_open() {
 
            port_info.state.set(PortStateFlag::BlockedDueToFullBuffers);
 

	
 
            let (peer_handle, message) =
 
                control.initiate_port_blocking(comp_ctx, port_handle);
 
            let peer = comp_ctx.get_peer(peer_handle);
 
            peer.handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
        }
 

	
 
        return IncomingData::SlotFull(incoming_message)
 
    }
 
}
 

	
 
pub(crate) enum GetResult {
 
    Received(DataMessage),
 
    NoMessage,
 
    Error((PortInstruction, String)),
 
}
 

	
 
/// Default attempt at trying to receive from a port (i.e. through a `get`, or
 
/// the equivalent operation for a builtin component). `target_port` is the port
 
/// we're trying to receive from, and the `target_port_instruction` is the
 
/// instruction we're attempting on this port.
 
pub(crate) fn default_attempt_get(
 
    exec_state: &mut CompExecState, target_port: PortId, target_port_instruction: PortInstruction,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, sched_ctx: &SchedulerCtx,
 
    comp_ctx: &mut CompCtx, control: &mut ControlLayer, consensus: &mut Consensus
 
) -> GetResult {
 
    let port_handle = comp_ctx.get_port_handle(target_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 

	
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    port_info.last_instruction = target_port_instruction;
 
    if port_info.state.is_closed() {
 
        let peer_id = port_info.peer_comp_id;
 
        return GetResult::Error((
 
            target_port_instruction,
 
            format!("Cannot get from this port, as the peer component (id:{}) closed the port", peer_id.0)
 
        ));
 
    }
 

	
 
    if let Some(message) = &inbox_main[port_index] {
 
        if consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
            // We're allowed to receive this message
 
            let mut message = inbox_main[port_index].take().unwrap();
 
            debug_assert_eq!(target_port, message.data_header.target_port);
 

	
 
            // Note: we can still run into an unrecoverable error when actually
 
            // receiving this message
 
            match default_handle_received_data_message(
 
                target_port, target_port_instruction,
 
                &mut message, inbox_main, inbox_backup,
 
                comp_ctx, sched_ctx, control, consensus
 
            ) {
 
                Ok(()) => return GetResult::Received(message),
 
                Err(location_and_message) => return GetResult::Error(location_and_message)
 
            }
 
        } else {
 
            // We're not allowed to receive this message. This means that the
 
            // receiver is attempting to receive something out of order with
 
            // respect to the sender.
 
            return GetResult::Error((target_port_instruction, String::from(
 
                "Cannot get from this port, as this causes a deadlock. This happens if you `get` in a different order as another component `put`s"
 
            )));
 
        }
 
    } else {
 
        // We don't have a message waiting for us and the port is not blocked.
 
        // So enter the BlockedGet state
 
        exec_state.set_as_blocked_get(target_port);
 
        return GetResult::NoMessage;
 
    }
 
}
 

	
 
/// Default handling that has been received through a `get`. Will check if any
 
/// more messages are waiting, and if the corresponding port was blocked because
 
/// of full buffers (hence, will use the control layer to make sure the peer
 
/// will become unblocked).
 
pub(crate) fn default_handle_received_data_message(
 
    targeted_port: PortId, _port_instruction: PortInstruction, message: &mut DataMessage,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup,
 
    comp_ctx: &mut CompCtx, sched_ctx: &SchedulerCtx, control: &mut ControlLayer,
 
    consensus: &mut Consensus
 
) -> Result<(), (PortInstruction, String)> {
 
    let port_handle = comp_ctx.get_port_handle(targeted_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 
    debug_assert!(inbox_main[port_index].is_none()); // because we've just received from it
 

	
 
    // If we received any ports, add them to the port tracking and inbox struct.
 
    // Then notify the peers that they can continue sending to this port, but
 
    // now at a new address.
 
    for received_port in &mut message.ports {
 
        // Transfer messages to main/backup inbox
 
        let _new_inbox_index = inbox_main.len();
 
        if !received_port.messages.is_empty() {
 
            inbox_main.push(Some(received_port.messages.remove(0)));
 
            inbox_backup.extend(received_port.messages.drain(..));
 
        } else {
 
            inbox_main.push(None);
 
        }
 

	
 
        // Create a new port locally
 
        let mut new_port_state = received_port.state;
 
        new_port_state.set(PortStateFlag::Received);
 
        let new_port_handle = comp_ctx.add_port(
 
            received_port.peer_comp, received_port.peer_port,
 
            received_port.kind, new_port_state
 
        );
 
        debug_assert_eq!(_new_inbox_index, comp_ctx.get_port_index(new_port_handle));
 
        comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(received_port.peer_comp));
 
        let new_port = comp_ctx.get_port(new_port_handle);
 

	
 
        // Transfer messages to main/backup inbox. Make sure to modify the
 
        // target port to our own
 
        for message in received_port.messages.iter_mut() {
 
            message.data_header.target_port = new_port.self_id;
 
        }
 

	
 
        let _new_inbox_index = inbox_main.len();
 
        if !received_port.messages.is_empty() {
 
            inbox_main.push(Some(received_port.messages.remove(0)));
 
            inbox_backup.extend(received_port.messages.drain(..));
 
        } else {
 
            inbox_main.push(None);
 
        }
 

	
 
        debug_assert_eq!(_new_inbox_index, comp_ctx.get_port_index(new_port_handle));
 

	
 
        // Add the port tho the consensus
 
        consensus.notify_of_new_port(_new_inbox_index, new_port_handle, comp_ctx);
 

	
 
        // Replace all references to the port in the received message
 
        for message_location in received_port.locations.iter().copied() {
 
            let value = message.content.get_value_mut(message_location);
 

	
 
            match value {
 
                Value::Input(_) => {
 
                    debug_assert_eq!(new_port.kind, PortKind::Getter);
 
                    *value = Value::Input(port_id_to_eval(new_port.self_id));
 
                },
 
                Value::Output(_) => {
 
                    debug_assert_eq!(new_port.kind, PortKind::Putter);
 
                    *value = Value::Output(port_id_to_eval(new_port.self_id));
 
                },
 
                _ => unreachable!(),
 
            }
 
        }
 

	
 
        // Let the peer know that the port can now be used
 
        let peer_handle = comp_ctx.get_peer_handle(new_port.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 

	
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(ControlMessage{
 
            id: ControlId::new_invalid(),
 
            sender_comp_id: comp_ctx.id,
 
            target_port_id: Some(new_port.peer_port_id),
 
            content: ControlMessageContent::PortPeerChangedUnblock(new_port.self_id, comp_ctx.id)
 
        }), true);
 
    }
 

	
 
    // Modify last-known location where port instruction was retrieved
 
    let port_info = comp_ctx.get_port(port_handle);
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    debug_assert_ne!(port_info.last_instruction, PortInstruction::None); // set by caller
 
    debug_assert!(port_info.state.is_open()); // checked by caller
 
    port_info.last_registered_round = Some(message.sync_header.sync_round);
 

	
 
    // Check if there are any more messages in the backup buffer
 
    for message_index in 0..inbox_backup.len() {
 
        let message = &inbox_backup[message_index];
 
        if message.data_header.target_port == targeted_port {
 
            // One more message, place it in the slot
 
            let message = inbox_backup.remove(message_index);
 
            debug_assert!(comp_ctx.get_port(port_handle).state.is_blocked()); // since we're removing another message from the backup
 
            inbox_main[port_index] = Some(message);
 

	
 
            return Ok(());
 
        }
 
    }
 

	
 
    // Did not have any more messages, so if we were blocked, then we need to
 
    // unblock the port now (and inform the peer of this unblocking)
 
    if port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers) {
 
        let port_info = comp_ctx.get_port_mut(port_handle);
 
        port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers);
 

	
 
        let (peer_handle, message) = control.cancel_port_blocking(comp_ctx, port_handle);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
    }
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles control messages in the default way. Note that this function may
 
/// take a lot of actions in the name of the caller: pending messages may be
 
/// sent, ports may become blocked/unblocked, etc. So the execution
 
/// (`CompExecState`), control (`ControlLayer`) and consensus (`Consensus`)
 
/// state may all change.
 
pub(crate) fn default_handle_control_message(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer, consensus: &mut Consensus,
 
    message: ControlMessage, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
    inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) -> Result<(), (PortInstruction, String)> {
 
    match message.content {
 
        ControlMessageContent::Ack => {
 
            default_handle_ack(exec_state, control, message.id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup)?;
 
        },
 
        ControlMessageContent::BlockPort => {
 
            // One of our messages was accepted, but the port should be
 
            // blocked.
 
            let port_to_block = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_block);
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            debug_assert_eq!(port_info.kind, PortKind::Putter);
 
            port_info.state.set(PortStateFlag::BlockedDueToFullBuffers);
 
        },
 
        ControlMessageContent::ClosePort(content) => {
 
            // Request to close the port. We immediately comply and remove
 
            // the component handle as well
 
            let port_to_close = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_close);
 

	
 
            // We're closing the port, so we will always update the peer of the
 
            // port (in case of error messages)
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            port_info.peer_comp_id = message.sender_comp_id;
 
            port_info.close_at_sync_end = true; // might be redundant (we might set it closed now)
 

	
 
            let peer_comp_id = port_info.peer_comp_id;
 
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
            // One exception to sending an `Ack` is if we just closed the
 
            // port ourselves, meaning that the `ClosePort` messages got
 
            // sent to one another.
 
            if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) {
 
                // The two components (sender and this component) are closing
 
                // the channel at the same time. So we don't care about the
 
                // content of the `ClosePort` message.
 
                default_handle_ack(exec_state, control, control_id, sched_ctx, comp_ctx, consensus, inbox_main, inbox_backup)?;
 
            } else {
 
                // Respond to the message
 
                let port_info = comp_ctx.get_port(port_handle);
 
                let last_registered_round = port_info.last_registered_round;
 
                let last_instruction = port_info.last_instruction;
 
                let port_has_had_message = port_info.received_message_for_sync;
 
                default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
 
                comp_ctx.change_port_peer(sched_ctx, port_handle, None);
 

	
 
                // Handle any possible error conditions (which boil down to: the
 
                // port has been used, but the peer has died). If not in sync
 
                // mode then we close the port immediately.
 

	
 
                // Note that `port_was_used` does not mean that any messages
 
                // were actually received. It might also mean that e.g. the
 
                // component attempted a `get`, but there were no messages, so
 
                // now it is in the `BlockedGet` state.
 
                let port_was_used = last_instruction != PortInstruction::None;
 

	
 
                if exec_state.mode.is_in_sync_block() {
 
                    let closed_during_sync_round = content.closed_in_sync_round && port_was_used;
 
                    let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message && port_was_used;
 
                    let round_has_succeeded = !content.closed_in_sync_round && last_registered_round == content.registered_round;
 
                    let closed_during_sync_round = content.closed_in_sync_round;
 
                    let closed_before_sync_round = !closed_during_sync_round && !round_has_succeeded;
 

	
 
                    if closed_during_sync_round || closed_before_sync_round {
 
                    if (closed_during_sync_round || closed_before_sync_round) && port_was_used {
 
                        return Err((
 
                            last_instruction,
 
                            format!("Peer component (id:{}) shut down, so communication cannot (have) succeed(ed)", peer_comp_id.0)
 
                        ));
 
                    }
 
                } else {
 
                    let port_info = comp_ctx.get_port_mut(port_handle);
 
                    port_info.state.set(PortStateFlag::Closed);
 
                }
 
            }
 
        },
 
        ControlMessageContent::UnblockPort => {
 
            // We were previously blocked (or already closed)
 
            let port_to_unblock = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_unblock);
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 

	
 
            debug_assert_eq!(port_info.kind, PortKind::Putter);
 
            debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers));
 

	
 
            port_info.state.clear(PortStateFlag::BlockedDueToFullBuffers);
 
            default_handle_recently_unblocked_port(
 
                exec_state, control, consensus, port_handle, sched_ctx,
 
                comp_ctx, inbox_main, inbox_backup
 
            )?;
 
        },
 
        ControlMessageContent::PortPeerChangedBlock => {
 
            // The peer of our port has just changed. So we are asked to
 
            // temporarily block the port (while our original recipient is
 
            // potentially rerouting some of the in-flight messages) and
 
            // Ack. Then we wait for the `unblock` call.
 
            let port_to_change = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_change);
 

	
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            let peer_comp_id = port_info.peer_comp_id;
 
            port_info.state.set(PortStateFlag::BlockedDueToPeerChange);
 
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
            default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
 
        },
 
        ControlMessageContent::PortPeerChangedUnblock(new_port_id, new_comp_id) => {
 
            let port_to_change = message.target_port_id.unwrap();
 
            let port_handle = comp_ctx.get_port_handle(port_to_change);
 
            let port_info = comp_ctx.get_port(port_handle);
 
            debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToPeerChange));
 

	
 
            let port_info = comp_ctx.get_port_mut(port_handle);
 
            port_info.peer_port_id = new_port_id;
 

	
 
            port_info.state.clear(PortStateFlag::BlockedDueToPeerChange);
 
            comp_ctx.change_port_peer(sched_ctx, port_handle, Some(new_comp_id));
 
            default_handle_recently_unblocked_port(
 
                exec_state, control, consensus, port_handle, sched_ctx,
 
                comp_ctx, inbox_main, inbox_backup
 
            )?;
 
        }
 
    }
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles a component entering the synchronous block. Will ensure that the
 
/// `Consensus` and the `ComponentCtx` are initialized properly.
 
pub(crate) fn default_handle_sync_start(
 
    exec_state: &mut CompExecState, inbox_main: &mut InboxMainRef,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 
) {
 
    sched_ctx.info("Component starting sync mode");
 

	
 
    // If any messages are present for this sync round, set the appropriate flag
 
    // and notify the consensus handler of the present messages
 
    consensus.notify_sync_start(comp_ctx);
 
    for (port_index, message) in inbox_main.iter().enumerate() {
 
        if let Some(message) = message {
 
            consensus.handle_incoming_data_message(comp_ctx, message);
 
            let port_info = comp_ctx.get_port_by_index_mut(port_index);
 
            port_info.received_message_for_sync = true;
 
        }
 
    }
 

	
 
    // Modify execution state
 
    debug_assert_eq!(exec_state.mode, CompMode::NonSync);
 
    exec_state.mode = CompMode::Sync;
 
}
 

	
 
/// Handles a component that has reached the end of the sync block. This does
 
/// not necessarily mean that the component will go into the `NonSync` mode, as
 
/// it might have to wait for the leader to finish the round for everyone (see
 
/// `default_handle_sync_decision`)
 
pub(crate) fn default_handle_sync_end(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
    consensus: &mut Consensus
 
) {
 
    sched_ctx.info("Component ending sync mode (but possibly waiting for a solution)");
 
    debug_assert_eq!(exec_state.mode, CompMode::Sync);
 
    let decision = consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
    exec_state.mode = CompMode::SyncEnd;
 
    default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus);
 
}
 

	
 
/// Handles a component initiating the exiting procedure, and closing all of its
 
/// ports. Should only be called once per component (which is ensured by
 
/// checking and modifying the mode in the execution state).
 
#[must_use]
 
pub(crate) fn default_handle_start_exit(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, consensus: &mut Consensus
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::StartExit);
 
    for port_index in 0..comp_ctx.num_ports() {
 
        let port_info = comp_ctx.get_port_by_index_mut(port_index);
 
        if port_info.state.is_blocked() {
 
            return CompScheduling::Sleep;
 
        }
 
    }
 

	
 
    sched_ctx.info(&format!("Component starting exit (reason: {:?})", exec_state.exit_reason));
 
    exec_state.mode = CompMode::BusyExit;
 
    let exit_inside_sync = exec_state.exit_reason.is_in_sync();
 

	
 
    // If exiting while inside sync mode, report to the leader of the current
 
    // round that we've failed.
 
    if exit_inside_sync {
 
        let decision = consensus.notify_sync_end_failure(sched_ctx, comp_ctx);
 
        default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus);
 
    }
 

	
 
    // Iterating over ports by index to work around borrowing rules
 
    for port_index in 0..comp_ctx.num_ports() {
 
        let port = comp_ctx.get_port_by_index_mut(port_index);
 
        if port.state.is_closed() || port.state.is_set(PortStateFlag::Transmitted) || port.close_at_sync_end {
 
            // Already closed, or in the process of being closed
 
            continue;
 
        }
 

	
 
        // Mark as closed
 
        let port_id = port.self_id;
 
        port.state.set(PortStateFlag::Closed);
 

	
 
        // Notify peer of closing
 
        let port_handle = comp_ctx.get_port_handle(port_id);
 
        let (peer, message) = control.initiate_port_closing(port_handle, exit_inside_sync, comp_ctx);
 
        let peer_info = comp_ctx.get_peer(peer);
 
        peer_info.handle.send_message_logged(sched_ctx, Message::Control(message), true);
 
    }
 

	
 
    return CompScheduling::Immediate; // to check if we can shut down immediately
 
}
 

	
 
/// Handles a component waiting until all peers are notified that it is quitting
 
/// (i.e. after calling `default_handle_start_exit`).
 
#[must_use]
 
pub(crate) fn default_handle_busy_exit(
 
    exec_state: &mut CompExecState, control: &ControlLayer,
 
    sched_ctx: &SchedulerCtx
 
) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::BusyExit);
 
    if control.has_acks_remaining() {
 
        sched_ctx.info("Component busy exiting, still has `Ack`s remaining");
 
        return CompScheduling::Sleep;
 
    } else {
 
        sched_ctx.info("Component busy exiting, now shutting down");
 
        exec_state.mode = CompMode::Exit;
 
        return CompScheduling::Exit;
 
    }
 
}
 

	
 
/// Handles a potential synchronous round decision. If there was a decision then
 
/// the `Some(success)` value indicates whether the round succeeded or not.
 
/// Might also end up changing the `ExecState`.
 
///
 
/// Might be called in two cases:
 
/// 1. The component is in regular execution mode, at the end of a sync round,
 
///     and is waiting for a solution to the round.
 
/// 2. The component has encountered an error during a sync round and is
 
///     exiting, hence is waiting for a "Failure" message from the leader.
 
pub(crate) fn default_handle_sync_decision(
 
    sched_ctx: &SchedulerCtx, exec_state: &mut CompExecState, comp_ctx: &mut CompCtx,
 
    decision: SyncRoundDecision, consensus: &mut Consensus
 
) -> Option<bool> {
 
    let success = match decision {
 
        SyncRoundDecision::None => return None,
 
        SyncRoundDecision::Solution => true,
 
        SyncRoundDecision::Failure => false,
 
    };
 

	
 
    debug_assert!(
 
        exec_state.mode == CompMode::SyncEnd || (
 
            exec_state.mode.is_busy_exiting() && exec_state.exit_reason.is_error()
 
        ) || (
 
            exec_state.mode.is_in_sync_block() && decision == SyncRoundDecision::Failure
 
        )
 
    );
 

	
 
    sched_ctx.info(&format!("Handling decision {:?} (in mode: {:?})", decision, exec_state.mode));
 
    consensus.notify_sync_decision(decision);
 
    if success {
 
        // We cannot get a success message if the component has encountered an
 
        // error.
 
        for port_index in 0..comp_ctx.num_ports() {
 
            let port_info = comp_ctx.get_port_by_index_mut(port_index);
 
            if port_info.close_at_sync_end {
 
                port_info.state.set(PortStateFlag::Closed);
 
            }
 
            port_info.state.clear(PortStateFlag::Received);
 
        }
 
        debug_assert_eq!(exec_state.mode, CompMode::SyncEnd);
 
        exec_state.mode = CompMode::NonSync;
 
        return Some(true);
 
    } else {
 
        // We may get failure both in all possible cases. But we should only
 
        // modify the execution state if we're not already in exit mode
 
        if !exec_state.mode.is_busy_exiting() {
 
            sched_ctx.error("failed synchronous round, initiating exit");
 
            exec_state.set_as_start_exit(ExitReason::ErrorNonSync);
 
        }
 
        return Some(false);
 
    }
 
}
 

	
 
/// Special component creation function. This function assumes that the
 
/// transferred ports are NOT blocked, and that the channels to whom the ports
 
/// belong are fully owned by the creating component. This will be checked in
 
/// debug mode.
 
pub(crate) fn special_create_component(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, instantiator_ctx: &mut CompCtx,
 
    control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup,
 
    component_instance: Box<dyn Component>, component_ports: Vec<PortId>,
 
) {
 
    debug_assert_eq!(exec_state.mode, CompMode::NonSync);
 
    let reservation = sched_ctx.runtime.start_create_component();
 
    let mut created_ctx = CompCtx::new(&reservation);
 
    let mut port_pairs = Vec::new();
 

	
 
    // Retrieve ports
 
    for instantiator_port_id in component_ports.iter() {
 
        let instantiator_port_id = *instantiator_port_id;
 
        let instantiator_port_handle = instantiator_ctx.get_port_handle(instantiator_port_id);
 
        let instantiator_port = instantiator_ctx.get_port(instantiator_port_handle);
 

	
 
        // Check if conditions for calling this function are valid
 
        debug_assert!(!instantiator_port.state.is_blocked_due_to_port_change());
 
        debug_assert_eq!(instantiator_port.peer_comp_id, instantiator_ctx.id);
 

	
 
        // Create port at new component
 
        let created_port_handle = created_ctx.add_port(
 
            instantiator_port.peer_comp_id, instantiator_port.peer_port_id,
 
            instantiator_port.kind, instantiator_port.state
 
        );
 
        let created_port = created_ctx.get_port(created_port_handle);
 
        let created_port_id = created_port.self_id;
 

	
 
        // Store in port pairs
 
        let is_open = instantiator_port.state.is_open();
 
        port_pairs.push(PortPair{
 
            instantiator_id: instantiator_port_id,
 
            instantiator_handle: instantiator_port_handle,
 
            created_id: created_port_id,
 
            created_handle: created_port_handle,
 
            is_open,
 
        });
 
    }
 

	
 
    // Set peer of the port for the new component
 
    for pair in port_pairs.iter() {
 
        let instantiator_port_info = instantiator_ctx.get_port(pair.instantiator_handle);
 
        let created_port_info = created_ctx.get_port_mut(pair.created_handle);
 

	
 
        // Note: we checked above (in debug mode) that the peer of the port is
 
        // owned by the creator as well, now check if the peer is transferred
 
        // as well.
 
        let created_port_peer_index = port_pairs.iter()
 
            .position(|v| v.instantiator_id == instantiator_port_info.peer_port_id);
 

	
 
        match created_port_peer_index {
 
            Some(created_port_peer_index) => {
 
                // Both ends of the channel are moving to the new component
 
                let peer_pair = &port_pairs[created_port_peer_index];
 
                created_port_info.peer_port_id = peer_pair.created_id;
 
                created_port_info.peer_comp_id = reservation.id();
 
            },
 
            None => {
 
                created_port_info.peer_comp_id = instantiator_ctx.id;
 
                if pair.is_open {
 
                    created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(instantiator_ctx.id));
 
                }
 
            }
 
        }
 
    }
 

	
 
    // Store component in runtime storage and retrieve component fields in their
 
    // name memory location
 
    let (created_key, created_runtime_component) = sched_ctx.runtime.finish_create_component(
 
        reservation, component_instance, created_ctx, false
 
    );
 

	
 
    let created_ctx = &mut created_runtime_component.ctx;
 
    let created_component = &mut created_runtime_component.component;
 
    created_component.on_creation(created_key.downgrade(), sched_ctx);
 

	
 
    // Transfer messages and link instantiator to created component
 
    for pair in port_pairs.iter() {
 
        instantiator_ctx.change_port_peer(sched_ctx, pair.instantiator_handle, None);
 
        transfer_messages(inbox_main, inbox_backup, pair, instantiator_ctx, created_ctx, created_component.as_mut());
 
        instantiator_ctx.remove_port(pair.instantiator_handle);
 

	
 
        let created_port_info = created_ctx.get_port(pair.created_handle);
 
        if pair.is_open && created_port_info.peer_comp_id == instantiator_ctx.id {
 
            // Set up channel between instantiator component port, and its peer,
 
            // which is owned by the new component
 
            let instantiator_port_handle = instantiator_ctx.get_port_handle(created_port_info.peer_port_id);
 
            let instantiator_port_info = instantiator_ctx.get_port_mut(instantiator_port_handle);
 
            instantiator_port_info.peer_port_id = created_port_info.self_id;
 
            instantiator_ctx.change_port_peer(sched_ctx, instantiator_port_handle, Some(created_ctx.id));
 
        }
 
    }
 

	
 
    // By definition we did not have any remote peers for the transferred ports,
 
    // so we can schedule the new component immediately
 
    sched_ctx.runtime.enqueue_work(created_key);
 
}
 

	
 
/// Puts the component in an execution state where the specified component will
 
/// end up being created. The component goes through state changes (driven by
 
/// incoming control messages) to make sure that all of the ports that are going
 
/// to be transferred are not in a blocked state. Once finished the component
 
/// returns to the `NonSync` mode.
 
pub(crate) fn default_start_create_component(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
    control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup,
 
    definition_id: ProcedureDefinitionId, type_id: TypeId, arguments: ValueGroup
 
) {
 
    debug_assert_eq!(exec_state.mode, CompMode::NonSync);
 

	
 
    let mut transferred_ports = Vec::new();
 
    find_ports_in_value_group(&arguments, &mut transferred_ports);
 

	
 
    // Set execution state as waiting until we can create the component. If we
 
    // can do so right away, then we will.
 
    exec_state.set_as_create_component_blocked(definition_id, type_id, arguments);
 
    if ports_not_blocked(comp_ctx, &transferred_ports) {
 
        perform_create_component(exec_state, sched_ctx, comp_ctx, control, inbox_main, inbox_backup);
 
    }
 
}
 

	
 
/// Actually creates a component (and assumes that the caller made sure that
 
/// none of the ports are involved in a blocking operation).
 
pub(crate) fn perform_create_component(
 
    exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, instantiator_ctx: &mut CompCtx,
 
    control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup
 
) {
 
    // Retrieve ports from the arguments
 
    debug_assert_eq!(exec_state.mode, CompMode::NewComponentBlocked);
 

	
 
    let (procedure_id, procedure_type_id) = exec_state.mode_component;
 
    let mut arguments = exec_state.mode_value.take();
 
    let mut ports = Vec::new();
 
    find_ports_in_value_group(&arguments, &mut ports);
 
    debug_assert!(ports_not_blocked(instantiator_ctx, &ports));
 

	
 
    // Reserve a location for the new component
 
    let reservation = sched_ctx.runtime.start_create_component();
 
    let mut created_ctx = CompCtx::new(&reservation);
 

	
 
    let mut port_pairs = Vec::with_capacity(ports.len());
 

	
 
    // Go over all the ports that will be transferred. Since the ports will get
 
    // a new ID in the new component, we will take care of that here.
 
    for (port_location, instantiator_port_id) in &ports {
 
        // Retrieve port information from instantiator
 
        let instantiator_port_id = *instantiator_port_id;
 
        let instantiator_port_handle = instantiator_ctx.get_port_handle(instantiator_port_id);
 
        let instantiator_port = instantiator_ctx.get_port(instantiator_port_handle);
 

	
 
        // Create port at created component
 
        let created_port_handle = created_ctx.add_port(
 
            instantiator_port.peer_comp_id, instantiator_port.peer_port_id,
 
            instantiator_port.kind, instantiator_port.state
 
        );
 
        let created_port = created_ctx.get_port(created_port_handle);
 
        let created_port_id = created_port.self_id;
 

	
 
        // Modify port ID in the arguments to the new component and store them
 
        // for later access
 
        let is_open = instantiator_port.state.is_open();
 
        port_pairs.push(PortPair{
 
            instantiator_id: instantiator_port_id,
 
            instantiator_handle: instantiator_port_handle,
 
            created_id: created_port_id,
 
            created_handle: created_port_handle,
 
            is_open,
 
        });
 

	
 
        for location in port_location.iter().copied() {
 
            let value = arguments.get_value_mut(location);
 
            match value {
 
                Value::Input(id) => *id = port_id_to_eval(created_port_id),
 
                Value::Output(id) => *id = port_id_to_eval(created_port_id),
 
                _ => unreachable!(),
 
            }
 
        }
 
    }
 

	
 
    // For each of the ports in the newly created component we set the peer to
 
    // the correct value. We will not yet change the peer on the instantiator's
 
    // ports (as we haven't yet stored the new component in the runtime's
 
    // component storage)
 
    let mut created_component_has_remote_peers = false;
 
    for pair in port_pairs.iter() {
 
        let instantiator_port_info = instantiator_ctx.get_port(pair.instantiator_handle);
 
        let created_port_info = created_ctx.get_port_mut(pair.created_handle);
 

	
 
        if created_port_info.peer_comp_id == instantiator_ctx.id {
 
            // The peer of the created component's port seems to be the
 
            // instantiator.
 
            let created_port_peer_index = port_pairs.iter()
 
                .position(|v| v.instantiator_id == instantiator_port_info.peer_port_id);
 

	
 
            match created_port_peer_index {
 
                Some(created_port_peer_index) => {
 
                    // However, the peer port is also moved to the new
 
                    // component, so the complete channel is owned by the new
 
                    // component.
 
                    let peer_pair = &port_pairs[created_port_peer_index];
 
                    created_port_info.peer_port_id = peer_pair.created_id;
 
                    created_port_info.peer_comp_id = reservation.id();
 
                },
 
                None => {
 
                    // Peer port remains with instantiator. However, we cannot
 
                    // set the peer on the instantiator yet, because the new
 
                    // component has not yet been stored in the runtime's
 
                    // component storage. So we do this later
 
                    created_port_info.peer_comp_id = instantiator_ctx.id;
 
                    if pair.is_open {
 
                        created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(instantiator_ctx.id));
 
                    }
 
                }
 
            }
 
        } else {
 
            // Peer is a different component
 
            if pair.is_open {
 
                // And the port is still open, so we need to notify the peer
 
                let peer_handle = instantiator_ctx.get_peer_handle(created_port_info.peer_comp_id);
 
                let peer_info = instantiator_ctx.get_peer(peer_handle);
 
                created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(peer_info.id));
 
                created_component_has_remote_peers = true;
 
            }
 
        }
 
    }
 

	
 
    // Now we store the new component into the runtime's component storage using
 
    // the reservation.
 
    let component = create_component(
 
        &sched_ctx.runtime.protocol, procedure_id, procedure_type_id,
 
        arguments, port_pairs.len()
 
    );
 
    let (created_key, created_runtime_component) = sched_ctx.runtime.finish_create_component(
 
        reservation, component, created_ctx, false
 
    );
 
    let created_ctx = &mut created_runtime_component.ctx;
 
    let created_component = &mut created_runtime_component.component;
src/runtime2/component/component_context.rs
Show inline comments
 
use std::fmt::{Debug, Formatter, Result as FmtResult};
 

	
 
use crate::runtime2::scheduler::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 

	
 
use crate::protocol::ExpressionId;
 

	
 
/// Helper struct to remember when the last operation on the port took place.
 
#[derive(Debug, PartialEq, Copy, Clone)]
 
pub enum PortInstruction {
 
    None,
 
    NoSource,
 
    SourceLocation(ExpressionId),
 
}
 

	
 
impl PortInstruction {
 
    pub fn is_none(&self) -> bool {
 
        match self {
 
            PortInstruction::None => return true,
 
            _ => return false,
 
        }
 
    }
 
}
 

	
 
/// Directionality of a port
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
/// Bitflags for port
 
// TODO: Incorporate remaining flags from `Port` struct
 
#[repr(u32)]
 
#[derive(Debug, Copy, Clone)]
 
pub enum PortStateFlag {
 
    Closed = 0x01, // If not closed, then the port is open
 
    BlockedDueToPeerChange = 0x02, // busy changing peers, hence use of port is temporarily blocked
 
    BlockedDueToFullBuffers = 0x04,
 
    Transmitted = 0x08, // Transmitted, so cannot be used anymore
 
    Received = 0x10, // Received, so cannot be used yet, only after the sync round
 
}
 

	
 
#[derive(Copy, Clone)]
 
pub struct PortState {
 
    flags: u32
 
}
 

	
 
impl PortState {
 
    pub(crate) fn new() -> PortState {
 
        return PortState{ flags: 0 }
 
    }
 

	
 
    // high-level
 

	
 
    #[inline]
 
    pub fn is_open(&self) -> bool {
 
        return !self.is_closed();
 
    }
 

	
 
    #[inline]
 
    pub fn can_send(&self) -> bool {
 
        return
 
            !self.is_set(PortStateFlag::Closed) &&
 
            !self.is_set(PortStateFlag::Transmitted) &&
 
            !self.is_set(PortStateFlag::Received);
 
    }
 

	
 
    #[inline]
 
    pub fn is_closed(&self) -> bool {
 
        return self.is_set(PortStateFlag::Closed);
 
    }
 

	
 
    #[inline]
 
    pub fn is_blocked(&self) -> bool {
 
        return
 
            self.is_set(PortStateFlag::BlockedDueToPeerChange) ||
 
            self.is_set(PortStateFlag::BlockedDueToFullBuffers);
 
    }
 

	
 
    #[inline]
 
    pub fn is_blocked_due_to_port_change(&self) -> bool {
 
        return self.is_set(PortStateFlag::BlockedDueToPeerChange);
 
    }
 

	
 
    // lower-level utils
 
    #[inline]
 
    pub fn set(&mut self, flag: PortStateFlag) {
 
        self.flags |= flag as u32;
 
    }
 

	
 
    #[inline]
 
    pub fn clear(&mut self, flag: PortStateFlag) {
 
        self.flags &= !(flag as u32);
 
    }
 

	
 
    #[inline]
 
    pub const fn is_set(&self, flag: PortStateFlag) -> bool {
 
        return (self.flags & (flag as u32)) != 0;
 
    }
 
}
 

	
 
impl Debug for PortState {
 
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
 
        use PortStateFlag::*;
 

	
 
        let mut s = f.debug_struct("PortState");
 
        for (flag_name, flag_value) in &[
 
            ("closed", Closed),
 
            ("blocked_peer_change", BlockedDueToPeerChange),
 
            ("blocked_full_buffers", BlockedDueToFullBuffers),
 
            ("transmitted", Transmitted),
 
        ] {
 
            s.field(flag_name, &self.is_set(*flag_value));
 
        }
 

	
 
        return s.finish();
 
    }
 
}
 

	
 
#[derive(Debug)]
 
pub struct Port {
 
    // Identifiers
 
    pub self_id: PortId,
 
    pub peer_comp_id: CompId, // eventually consistent
 
    pub peer_port_id: PortId, // eventually consistent
 
    // Generic operating state
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    // State tracking for error detection and error handling
 
    pub last_registered_round: Option<u32>,
 
    pub last_instruction: PortInstruction, // used during sync round to detect port-closed-during-sync errors
 
    pub received_message_for_sync: bool, // used during sync round to detect port-closed-before-sync errors
 
    pub close_at_sync_end: bool, // set during sync round when receiving a port-closed-after-sync message
 
    pub(crate) associated_with_peer: bool,
 
}
 

	
 
pub struct Peer {
 
    pub id: CompId,
 
    pub num_associated_ports: u32,
 
    pub(crate) handle: CompHandle,
 
}
 

	
 
/// Port and peer management structure. Will keep a local reference counter to
 
/// the ports associate with peers, additionally manages the atomic reference
 
/// counter associated with the peers' component handles.
 
pub struct CompCtx {
 
    pub id: CompId,
 
    ports: Vec<Port>,
 
    peers: Vec<Peer>,
 
    port_id_counter: u32,
 
}
 

	
 
#[derive(Copy, Clone, PartialEq, Eq)]
 
pub struct LocalPortHandle(PortId);
 

	
 
#[derive(Copy, Clone)]
 
pub struct LocalPeerHandle(CompId);
 

	
 
impl CompCtx {
 
    /// Creates a new component context based on a reserved entry in the
 
    /// component store. This reservation is used such that we already know our
 
    /// assigned ID.
 
    pub(crate) fn new(reservation: &CompReserved) -> Self {
 
        return Self{
 
            id: reservation.id(),
 
            ports: Vec::new(),
 
            peers: Vec::new(),
 
            port_id_counter: 0,
 
        }
 
    }
 

	
 
    /// Creates a new channel that is fully owned by the component associated
 
    /// with this context.
 
    pub(crate) fn create_channel(&mut self) -> Channel {
 
        let putter_id = PortId(self.take_port_id());
 
        let getter_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id: putter_id,
 
            peer_port_id: getter_id,
 
            kind: PortKind::Putter,
 
            state: PortState::new(),
 
            peer_comp_id: self.id,
 
            last_registered_round: None,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            associated_with_peer: false,
 
        });
 
        self.ports.push(Port{
 
            self_id: getter_id,
 
            peer_port_id: putter_id,
 
            kind: PortKind::Getter,
 
            state: PortState::new(),
 
            peer_comp_id: self.id,
 
            last_registered_round: None,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            associated_with_peer: false,
 
        });
 

	
 
        return Channel{ putter_id, getter_id };
 
    }
 

	
 
    /// Adds a new port. Make sure to call `change_peer` afterwards.
 
    pub(crate) fn add_port(&mut self, peer_comp_id: CompId, peer_port_id: PortId, kind: PortKind, state: PortState) -> LocalPortHandle {
 
        let self_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id, peer_comp_id, peer_port_id, kind, state,
 
            last_registered_round: None,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            associated_with_peer: false,
 
        });
 
        return LocalPortHandle(self_id);
 
    }
 

	
 
    /// Adds a self-reference. Called by the runtime/scheduler
 
    pub(crate) fn add_self_reference(&mut self, self_handle: CompHandle) {
 
        debug_assert_eq!(self.id, self_handle.id());
 
        debug_assert!(self.get_peer_index_by_id(self.id).is_none());
 
        self.peers.push(Peer{
 
            id: self.id,
 
            num_associated_ports: 0,
 
            handle: self_handle
 
        });
 
    }
 

	
 
    /// Removes a self-reference. Called by the runtime/scheduler
 
    pub(crate) fn remove_self_reference(&mut self) -> Option<CompKey> {
 
        let self_index = self.get_peer_index_by_id(self.id).unwrap();
 
        let peer = &mut self.peers[self_index];
 
        let maybe_comp_key = peer.handle.decrement_users();
 
        self.peers.remove(self_index);
 

	
 
        return maybe_comp_key;
 
    }
 

	
 
    /// Removes a port. Make sure you called `change_peer` first.
 
    pub(crate) fn remove_port(&mut self, port_handle: LocalPortHandle) -> Port {
 
        let port_index = self.must_get_port_index(port_handle);
 
        let port = self.ports.remove(port_index);
 
        dbg_code!(assert!(!port.associated_with_peer));
 
        return port;
 
    }
 

	
 
    /// Changes a peer
 
    pub(crate) fn change_port_peer(&mut self, sched_ctx: &SchedulerCtx, port_handle: LocalPortHandle, new_peer_comp_id: Option<CompId>) {
 
        // If port is currently associated with a peer, then remove that peer
 
        let port_index = self.get_port_index(port_handle);
 
        let port = &mut self.ports[port_index];
 
        let port_is_closed = port.state.is_closed();
 
        if port.associated_with_peer {
 
            // Remove old peer association
 
            port.associated_with_peer = false;
 
            let peer_comp_id = port.peer_comp_id;
 
            let peer_index = self.get_peer_index_by_id(peer_comp_id).unwrap();
 
            let peer = &mut self.peers[peer_index];
 

	
 
            peer.num_associated_ports -= 1;
 
            if peer.num_associated_ports == 0 {
 
                let mut peer = self.peers.remove(peer_index);
 
                if let Some(key) = peer.handle.decrement_users() {
 
                    sched_ctx.runtime.destroy_component(key);
 
                }
 
            }
 
        }
 

	
 
        // If there is a new peer, then set it as the peer associated with the
 
        // port
 
        if let Some(peer_id) = new_peer_comp_id {
 
            let port = &mut self.ports[port_index];
 
            port.peer_comp_id = peer_id;
 

	
 
            if peer_id != self.id && !port_is_closed {
 
                port.associated_with_peer = true;
 

	
 
                match self.get_peer_index_by_id(peer_id) {
 
                    Some(index) => {
 
                        let peer = &mut self.peers[index];
 
                        peer.num_associated_ports += 1;
 
                    },
 
                    None => {
 
                        let handle = sched_ctx.runtime.get_component_public(peer_id);
 
                        self.peers.push(Peer {
 
                            id: peer_id,
 
                            num_associated_ports: 1,
 
                            handle
 
                        })
 
                    }
 
                }
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn get_port_handle(&self, port_id: PortId) -> LocalPortHandle {
 
        return LocalPortHandle(port_id);
 
    }
 

	
 
    // should perhaps be revised, used in main inbox
 
    pub(crate) fn get_port_index(&self, port_handle: LocalPortHandle) -> usize {
 
        return self.must_get_port_index(port_handle);
 
    }
 

	
 
    pub(crate) fn get_peer_handle(&self, peer_id: CompId) -> LocalPeerHandle {
 
        return LocalPeerHandle(peer_id);
 
    }
 

	
 
    pub(crate) fn get_port(&self, port_handle: LocalPortHandle) -> &Port {
 
        let index = self.must_get_port_index(port_handle);
 
        return &self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_mut(&mut self, port_handle: LocalPortHandle) -> &mut Port {
 
        let index = self.must_get_port_index(port_handle);
 
        return &mut self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_port_by_index_mut(&mut self, index: usize) -> &mut Port {
 
        return &mut self.ports[index];
 
    }
 

	
 
    pub(crate) fn get_peer(&self, peer_handle: LocalPeerHandle) -> &Peer {
 
        let index = self.must_get_peer_index(peer_handle);
 
        return &self.peers[index];
 
    }
 

	
 
    pub(crate) fn get_peer_mut(&mut self, peer_handle: LocalPeerHandle) -> &mut Peer {
 
        let index = self.must_get_peer_index(peer_handle);
 
        return &mut self.peers[index];
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn iter_ports(&self) -> impl Iterator<Item=&Port> {
 
        return self.ports.iter();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn iter_ports_mut(&mut self) -> impl Iterator<Item=&mut Port> {
 
        return self.ports.iter_mut();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn iter_peers(&self) -> impl Iterator<Item=&Peer> {
 
        return self.peers.iter();
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn num_ports(&self) -> usize {
 
        return self.ports.len();
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Local utilities
 
    // -------------------------------------------------------------------------
 

	
 
    fn must_get_port_index(&self, handle: LocalPortHandle) -> usize {
 
        for (index, port) in self.ports.iter().enumerate() {
 
            if port.self_id == handle.0 {
 
                return index;
 
            }
 
        }
 

	
 
        unreachable!()
 
    }
 

	
 
    fn must_get_peer_index(&self, handle: LocalPeerHandle) -> usize {
 
        for (index, peer) in self.peers.iter().enumerate() {
 
            if peer.id == handle.0 {
 
                return index;
 
            }
 
        }
 

	
 
        unreachable!()
 
    }
 

	
 
    fn get_peer_index_by_id(&self, comp_id: CompId) -> Option<usize> {
 
        for (index, peer) in self.peers.iter().enumerate() {
 
            if peer.id == comp_id {
 
                return Some(index);
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn take_port_id(&mut self) -> u32 {
 
        let port_id = self.port_id_counter;
 
        self.port_id_counter = self.port_id_counter.wrapping_add(1);
 
        return port_id;
 
    }
 
}
 
\ No newline at end of file
src/runtime2/component/consensus.rs
Show inline comments
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::scheduler::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 

	
 
use super::component_context::*;
 

	
 
pub struct PortAnnotation {
 
    self_comp_id: CompId,
 
    self_port_id: PortId,
 
    peer_comp_id: CompId, // only valid for getter ports
 
    peer_port_id: PortId, // only valid for getter ports
 
    peer_discovered: bool, // only valid for getter ports
 
    mapping: Option<u32>,
 
    kind: PortKind,
 
}
 

	
 
impl PortAnnotation {
 
    fn new(comp_id: CompId, port_id: PortId, kind: PortKind) -> Self {
 
        return Self{
 
            self_comp_id: comp_id,
 
            self_port_id: port_id,
 
            peer_comp_id: CompId::new_invalid(),
 
            peer_port_id: PortId::new_invalid(),
 
            peer_discovered: false,
 
            mapping: None,
 
            kind,
 
        }
 
    }
 
}
 

	
 
#[derive(Debug, Eq, PartialEq)]
 
enum Mode {
 
    NonSync,
 
    SyncBusy,
 
    SyncAwaitingSolution,
 
    SelectBusy,
 
    SelectWait,
 
}
 

	
 
struct SolutionCombiner {
 
    solution: SyncPartialSolution,
 
    matched_channels: usize,
 
}
 

	
 
impl SolutionCombiner {
 
    fn new() -> Self {
 
        return Self {
 
            solution: SyncPartialSolution::default(),
 
            matched_channels: 0,
 
        }
 
    }
 

	
 
    #[inline]
 
    fn has_contributions(&self) -> bool {
 
        return !self.solution.channel_mapping.is_empty();
 
    }
 

	
 
    /// Returns a decision for the current round. If there is no decision (yet)
 
    /// then `RoundDecision::None` is returned.
 
    fn get_decision(&self) -> SyncRoundDecision {
 
        if self.matched_channels == self.solution.channel_mapping.len() {
 
            debug_assert_ne!(self.solution.decision, SyncRoundDecision::None);
 
            return self.solution.decision;
 
        }
 

	
 
        return SyncRoundDecision::None; // even in case of failure: wait for everyone.
 
    }
 

	
 
    fn combine_with_partial_solution(&mut self, partial: SyncPartialSolution) {
 
        debug_assert_ne!(self.solution.decision, SyncRoundDecision::Solution);
 
        debug_assert_ne!(partial.decision, SyncRoundDecision::Solution);
 

	
 
        if partial.decision == SyncRoundDecision::Failure {
 
            self.solution.decision = SyncRoundDecision::Failure;
 
        }
 

	
 
        for entry in partial.channel_mapping {
 
            let channel_index = if entry.getter.is_some() && entry.putter.is_some() {
 
                let channel_index = self.solution.channel_mapping.len();
 
                self.solution.channel_mapping.push(entry);
 

	
 
                channel_index
 
            } else if let Some(putter) = entry.putter {
 
                self.combine_with_putter_port(putter)
 
            } else if let Some(getter) = entry.getter {
 
                self.combine_with_getter_port(getter)
 
            } else {
 
                unreachable!(); // both putter and getter are None
 
            };
 

	
 
            let channel = &self.solution.channel_mapping[channel_index];
 
            if let Some(consistent) = Self::channel_is_consistent(channel) {
 
                if !consistent {
 
                    self.solution.decision = SyncRoundDecision::Failure;
 
                }
 
                self.matched_channels += 1;
 
            }
 
        }
 

	
 
        self.update_solution();
 
    }
 

	
 
    /// Combines the currently stored global solution (if any) with the newly
 
    /// provided local solution. Make sure to check the `has_decision` return
 
    /// value afterwards.
 
    fn combine_with_local_solution(&mut self, _comp_id: CompId, solution: SyncLocalSolution) {
 
        debug_assert_ne!(self.solution.decision, SyncRoundDecision::Solution);
 

	
 
        // Combine partial solution with the local solution entries
 
        for entry in solution {
 
            // Match the current entry up with its peer endpoint, or add a new
 
            // entry.
 
            let channel_index = match entry {
 
                SyncLocalSolutionEntry::Putter(putter) => {
 
                    self.combine_with_putter_port(putter)
 
                },
 
                SyncLocalSolutionEntry::Getter(getter) => {
 
                    self.combine_with_getter_port(getter)
 
                }
 
            };
 

	
 
            // Check if channel is now consistent
 
            let channel = &self.solution.channel_mapping[channel_index];
 
            if let Some(consistent) = Self::channel_is_consistent(channel) {
 
                if !consistent {
 
                    self.solution.decision = SyncRoundDecision::Failure;
 
                }
 
                self.matched_channels += 1;
 
            }
 
        }
 

	
 
        self.update_solution();
 
    }
 

	
 
    /// Takes whatever partial solution is present in the solution combiner and
 
    /// returns it. The solution combiner's solution will end up being empty.
 
    /// This is used when a new leader is found and we need to pass along our
 
    /// partial results.
 
    fn take_partial_solution(&mut self) -> SyncPartialSolution {
 
        let mut partial_solution = SyncPartialSolution::default();
 
        std::mem::swap(&mut partial_solution, &mut self.solution);
 
        self.clear();
 

	
 
        return partial_solution;
 
    }
 

	
 
    fn clear(&mut self) {
 
        self.solution.channel_mapping.clear();
 
        self.solution.decision = SyncRoundDecision::None;
 
        self.matched_channels = 0;
 
    }
 

	
 
    // --- Small utilities for combining solutions
 

	
 
    fn combine_with_putter_port(&mut self, putter: SyncSolutionPutterPort) -> usize {
 
        let channel_index = self.get_channel_index_for_putter(putter.self_comp_id, putter.self_port_id);
 
        if let Some(channel_index) = channel_index {
 
            let channel = &mut self.solution.channel_mapping[channel_index];
 
            debug_assert!(channel.putter.is_none());
 
            channel.putter = Some(putter);
 

	
 
            return channel_index;
 
        } else {
 
            let channel_index = self.solution.channel_mapping.len();
 
            self.solution.channel_mapping.push(SyncSolutionChannel{
 
                putter: Some(putter),
 
                getter: None,
 
            });
 

	
 
            return channel_index;
 
        }
 
    }
 

	
 
    fn combine_with_getter_port(&mut self, getter: SyncSolutionGetterPort) -> usize {
 
        let channel_index = self.get_channel_index_for_getter(getter.peer_comp_id, getter.peer_port_id);
 
        if let Some(channel_index) = channel_index {
 
            let channel = &mut self.solution.channel_mapping[channel_index];
 
            debug_assert!(channel.getter.is_none());
 
            channel.getter = Some(getter);
 

	
 
            return channel_index;
 
        } else {
 
            let channel_index = self.solution.channel_mapping.len();
 
            self.solution.channel_mapping.push(SyncSolutionChannel{
 
                putter: None,
 
                getter: Some(getter)
 
            });
 

	
 
            return channel_index;
 
        }
 
    }
 

	
 
    /// Retrieve index of the channel containing a getter port that has received
 
    /// from the specified putter port.
 
    fn get_channel_index_for_putter(&self, putter_comp_id: CompId, putter_port_id: PortId) -> Option<usize> {
 
        for (channel_index, channel) in self.solution.channel_mapping.iter().enumerate() {
 
            if let Some(getter) = &channel.getter {
 
                if getter.peer_comp_id == putter_comp_id && getter.peer_port_id == putter_port_id {
 
                    return Some(channel_index);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    /// Retrieve index of the channel for a getter port. To find this channel
 
    /// the **peer** component/port IDs of the getter port are used.
 
    fn get_channel_index_for_getter(&self, peer_comp_id: CompId, peer_port_id: PortId) -> Option<usize> {
 
        for (channel_index, channel) in self.solution.channel_mapping.iter().enumerate() {
 
            if let Some(putter) = &channel.putter {
 
                if putter.self_comp_id == peer_comp_id && putter.self_port_id == peer_port_id {
 
                    return Some(channel_index);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn channel_is_consistent(channel: &SyncSolutionChannel) -> Option<bool> {
 
        if channel.putter.is_none() || channel.getter.is_none() {
 
            return None;
 
        }
 

	
 
        let putter = channel.putter.as_ref().unwrap();
 
        let getter = channel.getter.as_ref().unwrap();
 
        return Some(
 
            !putter.failed &&
 
            !getter.failed &&
 
            putter.mapping == getter.mapping
 
        );
 
    }
 

	
 
    /// Determines the global solution if all components have contributed their
 
    /// local solutions.
 
    fn update_solution(&mut self) {
 
        if self.matched_channels == self.solution.channel_mapping.len() {
 
            if self.solution.decision != SyncRoundDecision::Failure {
 
                self.solution.decision = SyncRoundDecision::Solution;
 
            }
 
        }
 
    }
 
}
 

	
 
/// Tracking consensus state
 
pub struct Consensus {
 
    // General state of consensus manager
 
    mapping_counter: u32,
 
    mode: Mode,
 
    // State associated with sync round
 
    round_index: u32,
 
    highest_id: CompId,
 
    ports: Vec<PortAnnotation>,
 
    // State associated with arriving at a solution and being a (temporary)
 
    // leader in the consensus round
 
    solution: SolutionCombiner,
 
}
 

	
 
impl Consensus {
 
    pub(crate) fn new() -> Self {
 
        return Self{
 
            round_index: 0,
 
            highest_id: CompId::new_invalid(),
 
            ports: Vec::new(),
 
            mapping_counter: 0,
 
            mode: Mode::NonSync,
 
            solution: SolutionCombiner::new(),
 
        }
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn round_number(&self) -> u32 {
 
        return self.round_index;
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Managing sync state
 
    // -------------------------------------------------------------------------
 

	
 
    /// Notifies the consensus management that the PDL code has reached the
 
    /// start of a sync block.
 
    pub(crate) fn notify_sync_start(&mut self, comp_ctx: &CompCtx) {
 
        debug_assert_eq!(self.mode, Mode::NonSync);
 
        self.highest_id = comp_ctx.id;
 
        self.mapping_counter = 0;
 
        self.mode = Mode::SyncBusy;
 

	
 
        // Make the internally stored port annotation array consistent with the
 
        // ports that the component currently owns. They should match by index
 
        // (i.e. annotation at index `i` corresponds to port `i` in `comp_ctx`).
 
        let mut needs_setting_ports = false;
 
        if comp_ctx.num_ports() != self.ports.len() {
 
            needs_setting_ports = true;
 
        } else {
 
            for (idx, port) in comp_ctx.iter_ports().enumerate() {
 
                let comp_port_id = port.self_id;
 
                let cons_port_id = self.ports[idx].self_port_id;
 
                if comp_port_id != cons_port_id {
 
                    needs_setting_ports = true;
 
                    break;
 
                }
 
            }
 
        }
 

	
 
        if needs_setting_ports {
 
            // Reset all ports
 
            self.ports.clear();
 
            self.ports.reserve(comp_ctx.num_ports());
 
            for port in comp_ctx.iter_ports() {
 
                self.ports.push(PortAnnotation::new(comp_ctx.id, port.self_id, port.kind));
 
            }
 
        } else {
 
            // Make sure that we consider all peers as undiscovered again
 
            for annotation in self.ports.iter_mut() {
 
                annotation.peer_discovered = false;
 
            }
 
        }
 
    }
 

	
 
    /// Notifies the consensus management that the PDL code has reached the end
 
    /// of a sync block. A local solution will be submitted, after which we wait
 
    /// until the participants in the round (hopefully) reach a conclusion.
 
    pub(crate) fn notify_sync_end_success(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy);
 
        self.mode = Mode::SyncAwaitingSolution;
 

	
 
        let local_solution = self.generate_local_solution(comp_ctx, false);
 
        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution, false);
 
        return decision;
 
    }
 

	
 
    /// Notifies the consensus management that the component has encountered a
 
    /// critical error during the synchronous round. Hence we should report that
 
    /// we've failed and wait until all the participants have been notified of
 
    /// the error.
 
    pub(crate) fn notify_sync_end_failure(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy);
 
        // debug_assert_eq!(self.mode, Mode::SyncBusy);
 
        self.mode = Mode::SyncAwaitingSolution;
 

	
 
        let local_solution = self.generate_local_solution(comp_ctx, true);
 
        let decision = self.handle_local_solution(sched_ctx, comp_ctx, comp_ctx.id, local_solution, true);
 
        return decision;
 
    }
 

	
 
    /// Notifies that a decision has been reached. Note that the caller should
 
    /// still take the appropriate actions based on the decision it is supplying
 
    /// to the consensus layer.
 
    pub(crate) fn notify_sync_decision(&mut self, _decision: SyncRoundDecision) {
 
        // Reset everything for the next round
 
        debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution);
 
        self.mode = Mode::NonSync;
 
        self.round_index = self.round_index.wrapping_add(1);
 

	
 
        for port in self.ports.iter_mut() {
 
            port.mapping = None;
 
        }
 

	
 
        self.solution.clear();
 
    }
 

	
 
    pub(crate) fn notify_of_new_port(&mut self, _expected_index: usize, port_handle: LocalPortHandle, comp_ctx: &CompCtx) {
 
        debug_assert_eq!(_expected_index, self.ports.len());
 
        let port_info = comp_ctx.get_port(port_handle);
 
        self.ports.push(PortAnnotation{
 
            self_comp_id: comp_ctx.id,
 
            self_port_id: port_info.self_id,
 
            peer_comp_id: port_info.peer_comp_id,
 
            peer_port_id: port_info.peer_port_id,
 
            peer_discovered: false,
 
            mapping: None,
 
            kind: port_info.kind,
 
        });
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling inbound and outbound messages
 
    // -------------------------------------------------------------------------
 

	
 
    /// Prepares a set of values to be sent of a channel.
 
    pub(crate) fn annotate_data_message(&mut self, comp_ctx: &CompCtx, port_info: &Port, content: ValueGroup) -> DataMessage {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy); // can only send between sync start and sync end
 
        debug_assert!(self.ports.iter().any(|v| v.self_port_id == port_info.self_id));
 
        let data_header = self.create_data_header_and_update_mapping(port_info);
 
        let sync_header = self.create_sync_header(comp_ctx);
 

	
 
        return DataMessage{
 
            data_header, sync_header, content,
 
            ports: Vec::new()
 
        };
 
    }
 

	
 
    /// Handles the arrival of a new data message (needs to be called for every
 
    /// new data message, even though it might not end up being received). This
 
    /// is used to determine peers of `get`ter ports.
 
    // TODO: The use of this function is rather ugly. Find a more robust
 
    //  scheme about owners of `get`ter ports not knowing about their peers.
 
    pub(crate) fn handle_incoming_data_message(&mut self, comp_ctx: &CompCtx, message: &DataMessage) {
 
        let target_handle = comp_ctx.get_port_handle(message.data_header.target_port);
 
        let target_index = comp_ctx.get_port_index(target_handle);
 
        let annotation = &mut self.ports[target_index];
 
        debug_assert!(
 
            !annotation.peer_discovered || (
 
                annotation.peer_comp_id == message.sync_header.sending_id &&
 
                annotation.peer_port_id == message.data_header.source_port
 
            )
 
        );
 
        annotation.peer_comp_id = message.sync_header.sending_id;
 
        annotation.peer_port_id = message.data_header.source_port;
 
        annotation.peer_discovered = true;
 
    }
 

	
 
    /// Checks if the data message can be received (due to port annotations), if
 
    /// it can then `true` is returned and the caller is responsible for handing
 
    /// the message of to the PDL code. Otherwise the message cannot be
 
    /// received.
 
    pub(crate) fn try_receive_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: &DataMessage) -> bool {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy);
 
        debug_assert!(self.ports.iter().any(|v| v.self_port_id == message.data_header.target_port));
 

	
 
        // Make sure the expected mapping matches the currently stored mapping
 
        for (peer_port_kind, expected_annotation) in &message.data_header.expected_mapping {
 
            // Determine our annotation, in order to do so we need to find the
 
            // port matching the peer ports
 
            let mut self_annotation = None;
 
            let mut self_annotation_found = false;
 
            match peer_port_kind {
 
                PortAnnotationKind::Putter(peer_port) => {
 
                    for self_port in &self.ports {
 
                        if self_port.kind == PortKind::Getter &&
 
                            self_port.peer_discovered &&
 
                            self_port.peer_comp_id == peer_port.self_comp_id &&
 
                            self_port.peer_port_id == peer_port.self_port_id
 
                        {
 
                            self_annotation = self_port.mapping;
 
                            self_annotation_found = true;
 
                            break;
 
                        }
 
                    }
 
                },
 
                PortAnnotationKind::Getter(peer_port) => {
 
                    if peer_port.peer_comp_id == comp_ctx.id {
 
                        // Peer indicates that we talked to it
 
                        let self_port_handle = comp_ctx.get_port_handle(peer_port.peer_port_id);
 
                        let self_port_index = comp_ctx.get_port_index(self_port_handle);
 
                        self_annotation = self.ports[self_port_index].mapping;
 
                        self_annotation_found = true;
 
                    }
 
                }
 
            }
 

	
 
            if !self_annotation_found {
 
                continue
 
            }
 

	
 
            if self_annotation != *expected_annotation {
 
                return false;
 
            }
 
        }
 

	
 
        // Expected mapping matches current mapping, so we will receive the message
 
        self.set_annotation(message.sync_header.sending_id, &message.data_header);
 

	
 
        // Handle the sync header embedded within the data message
 
        self.handle_sync_header(sched_ctx, comp_ctx, &message.sync_header);
 

	
 
        return true;
 
    }
 

	
 
    /// Receives the sync message and updates the consensus state appropriately.
 
    pub(crate) fn receive_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) -> SyncRoundDecision {
 
        // Whatever happens: handle the sync header (possibly changing the
 
        // currently registered leader)
 
        self.handle_sync_header(sched_ctx, comp_ctx, &message.sync_header);
 

	
 
        match message.content {
 
            SyncMessageContent::NotificationOfLeader => {
 
                return SyncRoundDecision::None;
 
            },
 
            SyncMessageContent::LocalSolution(solution_generator_id, local_solution) => {
 
                return self.handle_local_solution(sched_ctx, comp_ctx, solution_generator_id, local_solution, false);
 
            },
 
            SyncMessageContent::PartialSolution(partial_solution) => {
 
                return self.handle_partial_solution(sched_ctx, comp_ctx, partial_solution);
 
            },
 
            SyncMessageContent::GlobalSolution => {
 
                debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution); // leader can only find global- if we submitted local solution
 
                return SyncRoundDecision::Solution;
 
            },
 
            SyncMessageContent::GlobalFailure => {
 
                debug_assert_eq!(self.mode, Mode::SyncAwaitingSolution);
 
                return SyncRoundDecision::Failure;
 
            }
 
        }
 
    }
 

	
 
    fn handle_sync_header(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, header: &MessageSyncHeader) {
 
        if header.highest_id.0 > self.highest_id.0 {
 
            // Sender knows of someone with a higher ID. So store highest ID,
 
            // notify all peers, and forward local solutions
 
            self.highest_id = header.highest_id;
 
            for peer in comp_ctx.iter_peers() {
 
                if peer.id == header.sending_id {
 
                    continue; // do not send to sender: it has the higher ID
 
                }
 

	
 
                // also: only send if we received a message in this round
 
                let mut performed_communication = false; // TODO: Revise, temporary fix
 
                for port in self.ports.iter() {
 
                    if port.peer_comp_id == peer.id && port.mapping.is_some() {
 
                        performed_communication = true;
 
                        break;
 
                    }
 
                }
 

	
 
                if !performed_communication {
 
                    continue;
 
                }
 

	
 
                let message = SyncMessage{
 
                    sync_header: self.create_sync_header(comp_ctx),
 
                    content: SyncMessageContent::NotificationOfLeader,
 
                };
 
                peer.handle.send_message_logged(sched_ctx, Message::Sync(message), true);
 
            }
 

	
 
            self.forward_partial_solution(sched_ctx, comp_ctx);
 
        } else if header.highest_id.0 < self.highest_id.0 {
 
            // Sender has a lower ID, so notify it of our higher one
 
            let message = SyncMessage{
 
                sync_header: self.create_sync_header(comp_ctx),
 
                content: SyncMessageContent::NotificationOfLeader,
 
            };
 
            let peer_handle = comp_ctx.get_peer_handle(header.sending_id);
 
            let peer_info = comp_ctx.get_peer(peer_handle);
 
            peer_info.handle.send_message_logged(sched_ctx, Message::Sync(message), true);
 
        } // else: exactly equal
 
    }
 

	
 
    fn set_annotation(&mut self, source_comp_id: CompId, data_header: &MessageDataHeader) {
 
        for annotation in self.ports.iter_mut() {
 
            if annotation.self_port_id == data_header.target_port {
 
                // Message should have already passed the `handle_new_data_message` function, so we
 
                // should have already annotated the peer of the port.
 
                debug_assert!(
 
                    annotation.peer_discovered &&
 
                    annotation.peer_comp_id == source_comp_id &&
 
                    annotation.peer_port_id == data_header.source_port
 
                );
 
                annotation.mapping = Some(data_header.new_mapping);
 
            }
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Leader-related methods
 
    // -------------------------------------------------------------------------
 

	
 
    fn forward_partial_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        debug_assert_ne!(self.highest_id, comp_ctx.id); // not leader
 

	
 
        // Make sure that we have something to send
 
        if !self.solution.has_contributions() {
 
            return;
 
        }
 

	
 
        // Swap the container with the partial solution and then send it along
 
        let partial_solution = self.solution.take_partial_solution();
 
        self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(SyncMessage{
 
            sync_header: self.create_sync_header(comp_ctx),
 
            content: SyncMessageContent::PartialSolution(partial_solution),
 
        }));
 
    }
 

	
 
    fn handle_local_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, solution_sender_id: CompId, solution: SyncLocalSolution, fail_if_empty: bool) -> SyncRoundDecision {
 
        if self.highest_id == comp_ctx.id {
 
            // We are the leader
 
            self.solution.combine_with_local_solution(solution_sender_id, solution);
 
            let mut round_decision = self.solution.get_decision();
 
            if round_decision != SyncRoundDecision::None {
 
                if fail_if_empty && self.solution.matched_channels == 0 {
 
                    // TODO: Not sure about this, bit of a hack. Situation is that a component
 
                    //  cannot interact with other components, but it is in a sync round, and has
 
                    //  failed that sync round.
 
                    round_decision = SyncRoundDecision::Failure;
 
                }
 
                self.broadcast_decision(sched_ctx, comp_ctx, round_decision);
 
            }
 
            return round_decision;
 
        } else {
 
            // Forward the solution
 
            let message = SyncMessage{
 
                sync_header: self.create_sync_header(comp_ctx),
 
                content: SyncMessageContent::LocalSolution(solution_sender_id, solution),
 
            };
 
            self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(message));
 
            return SyncRoundDecision::None;
 
        }
 
    }
 

	
 
    fn handle_partial_solution(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, solution: SyncPartialSolution) -> SyncRoundDecision {
 
        if self.highest_id == comp_ctx.id {
 
            // We are the leader, combine existing and new solution
 
            self.solution.combine_with_partial_solution(solution);
 
            let round_decision = self.solution.get_decision();
 
            if round_decision != SyncRoundDecision::None {
 
                self.broadcast_decision(sched_ctx, comp_ctx, round_decision);
 
            }
 
            return round_decision;
 
        } else {
 
            // Forward the partial solution
 
            let message = SyncMessage{
 
                sync_header: self.create_sync_header(comp_ctx),
 
                content: SyncMessageContent::PartialSolution(solution),
 
            };
 
            self.send_to_leader(sched_ctx, comp_ctx, Message::Sync(message));
 
            return SyncRoundDecision::None;
 
        }
 
    }
 

	
 
    fn broadcast_decision(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, decision: SyncRoundDecision) {
 
        debug_assert_eq!(self.highest_id, comp_ctx.id);
 

	
 
        let is_success = match decision {
 
            SyncRoundDecision::None => unreachable!(),
 
            SyncRoundDecision::Solution => true,
 
            SyncRoundDecision::Failure => false,
 
        };
 

	
 
        let mut peers = Vec::with_capacity(self.solution.solution.channel_mapping.len()); // TODO: @Performance
 

	
 
        for channel in self.solution.solution.channel_mapping.iter() {
 
            let getter = channel.getter.as_ref().unwrap();
 
            if getter.self_comp_id != comp_ctx.id && !peers.contains(&getter.self_comp_id) {
 
                peers.push(getter.self_comp_id);
 
            }
 
            if getter.peer_comp_id != comp_ctx.id && !peers.contains(&getter.peer_comp_id) {
 
                peers.push(getter.peer_comp_id);
 
            }
 
        }
 

	
 
        for peer in peers {
 
            let mut handle = sched_ctx.runtime.get_component_public(peer);
 
            let message = Message::Sync(SyncMessage{
 
                sync_header: self.create_sync_header(comp_ctx),
 
                content: if is_success { SyncMessageContent::GlobalSolution } else { SyncMessageContent::GlobalFailure },
 
            });
 
            handle.send_message_logged(sched_ctx, message, true);
 
            let _should_remove = handle.decrement_users();
 
            debug_assert!(_should_remove.is_none());
 
        }
 
    }
 

	
 
    fn send_to_leader(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx, message: Message) {
 
        debug_assert_ne!(self.highest_id, comp_ctx.id); // we're not the leader, // TODO: @NoDirectHandle
 
        let mut leader_info = sched_ctx.runtime.get_component_public(self.highest_id);
 
        leader_info.send_message_logged(sched_ctx, message, true);
 
        let should_remove = leader_info.decrement_users();
 
        if let Some(key) = should_remove {
 
            sched_ctx.runtime.destroy_component(key);
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Small utilities
 
    // -------------------------------------------------------------------------
 

	
 
    fn generate_local_solution(&self, comp_ctx: &CompCtx, failed: bool) -> SyncLocalSolution {
 
        let mut local_solution = Vec::with_capacity(self.ports.len());
 
        for port in &self.ports {
 
            if let Some(mapping) = port.mapping {
 
                let port_handle = comp_ctx.get_port_handle(port.self_port_id);
 
                let port_info = comp_ctx.get_port(port_handle);
 
                let new_entry = match port_info.kind {
 
                    PortKind::Putter => SyncLocalSolutionEntry::Putter(SyncSolutionPutterPort{
 
                        self_comp_id: comp_ctx.id,
 
                        self_port_id: port_info.self_id,
 
                        mapping,
 
                        failed
 
                    }),
 
                    PortKind::Getter => SyncLocalSolutionEntry::Getter(SyncSolutionGetterPort{
 
                        self_comp_id: comp_ctx.id,
 
                        self_port_id: port_info.self_id,
 
                        peer_comp_id: port.peer_comp_id,
 
                        peer_port_id: port.peer_port_id,
 
                        mapping,
 
                        failed
 
                    })
 
                };
 
                local_solution.push(new_entry);
 
            }
 
        }
 

	
 
        return local_solution;
 
    }
 

	
 
    fn create_data_header_and_update_mapping(&mut self, port_info: &Port) -> MessageDataHeader {
 
        let mut expected_mapping = Vec::with_capacity(self.ports.len());
 
        let mut port_index = usize::MAX;
 
        for (index, port) in self.ports.iter().enumerate() {
 
            if port.self_port_id == port_info.self_id {
 
                port_index = index; // remember for later updating
 
            }
 

	
 
            // Add all of the
 
            let annotation_kind = match port.kind {
 
                PortKind::Putter => {
 
                    PortAnnotationKind::Putter(PortAnnotationPutter{
 
                        self_comp_id: port.self_comp_id,
 
                        self_port_id: port.self_port_id
 
                    })
 
                },
 
                PortKind::Getter => {
 
                    if !port.peer_discovered {
 
                        continue;
 
                    }
 

	
 
                    PortAnnotationKind::Getter(PortAnnotationGetter{
 
                        self_comp_id: port.self_comp_id,
 
                        self_port_id: port.self_port_id,
 
                        peer_comp_id: port.peer_comp_id,
 
                        peer_port_id: port.peer_port_id,
src/runtime2/component/control_layer.rs
Show inline comments
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::communication::*;
 
use crate::runtime2::component::*;
 

	
 
use super::component_context::*;
 

	
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub(crate) struct ControlId(pub(crate) u32);
 

	
 
impl ControlId {
 
    /// Like other invalid IDs, this one doesn't care any significance, but is
 
    /// just set at u32::MAX to hopefully bring out bugs sooner.
 
    pub(crate) fn new_invalid() -> Self {
 
        return ControlId(u32::MAX);
 
    }
 
}
 

	
 
struct ControlEntry {
 
    id: ControlId,
 
    ack_countdown: u32,
 
    content: ControlContent,
 
}
 

	
 
enum ControlContent {
 
    PeerChange(ContentPeerChange),
 
    ScheduleComponent(CompId),
 
    ClosedPort(PortId),
 
    UnblockPutWithPorts
 
}
 

	
 
struct ContentPeerChange {
 
    source_port: PortId,
 
    source_comp: CompId,
 
    old_target_port: PortId,
 
    new_target_port: PortId,
 
    new_target_comp: CompId,
 
    schedule_entry_id: ControlId,
 
}
 

	
 
struct ControlClosedPort {
 
    closed_port: PortId,
 
    exit_entry_id: Option<ControlId>,
 
}
 

	
 
pub(crate) enum AckAction {
 
    None,
 
    SendMessage(CompId, ControlMessage),
 
    ScheduleComponent(CompId),
 
    UnblockPutWithPorts,
 
}
 

	
 
/// Handling/sending control messages.
 
pub(crate) struct ControlLayer {
 
    id_counter: ControlId,
 
    entries: Vec<ControlEntry>,
 
}
 

	
 
impl ControlLayer {
 
    pub(crate) fn should_reroute(&self, message: &mut Message) -> Option<CompId> {
 
        // Safety note: rerouting should occur during the time when we're
 
        // notifying a peer of a new component. During this period that
 
        // component hasn't been executed yet, so cannot have died yet.
 
        // FIX @NoDirectHandle
 
        let target_port = message.target_port();
 
        if target_port.is_none() {
 
            return None;
 
        }
 

	
 
        let target_port = target_port.unwrap();
 
        for entry in &self.entries {
 
            if let ControlContent::PeerChange(entry) = &entry.content {
 
                if entry.old_target_port == target_port {
 
                    message.modify_target_port(entry.new_target_port);
 
                    return Some(entry.new_target_comp);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    /// Handles an acknowledgement. The returned action must be performed by the
 
    /// caller. The optionally returned `ControlId` must be used and passed to
 
    /// `handle_ack` again.
 
    pub(crate) fn handle_ack(&mut self, entry_id: ControlId, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) -> (AckAction, Option<ControlId>) {
 
        let entry_index = self.get_entry_index_by_id(entry_id).unwrap();
 
        let entry = &mut self.entries[entry_index];
 
        debug_assert!(entry.ack_countdown > 0);
 

	
 
        entry.ack_countdown -= 1;
 
        if entry.ack_countdown != 0 {
 
            return (AckAction::None, None);
 
        }
 

	
 
        let entry = self.entries.remove(entry_index);
 

	
 
        // All `Ack`s received, take action based on the kind of entry
 
        match entry.content {
 
            ControlContent::PeerChange(content) => {
 
                // If change of peer is ack'd. Then we are certain we have
 
                // rerouted all of the messages, and the sender's port can now
 
                // be unblocked again.
 
                let target_comp_id = content.source_comp;
 
                let message_to_send = ControlMessage{
 
                    id: ControlId::new_invalid(),
 
                    sender_comp_id: comp_ctx.id,
 
                    target_port_id: Some(content.source_port),
 
                    content: ControlMessageContent::PortPeerChangedUnblock(
 
                        content.new_target_port,
 
                        content.new_target_comp
 
                    )
 
                };
 
                let to_ack = content.schedule_entry_id;
 

	
 
                return (
 
                    AckAction::SendMessage(target_comp_id, message_to_send),
 
                    Some(to_ack)
 
                );
 
            },
 
            ControlContent::ScheduleComponent(to_schedule) => {
 
                // If all change-of-peers are `Ack`d, then we're ready to
 
                // schedule the component!
 
                return (AckAction::ScheduleComponent(to_schedule), None);
 
            },
 
            ControlContent::ClosedPort(closed_port) => {
 
                // If a closed port is Ack'd, then we remove the reference to
 
                // that component.
 
                let port_handle = comp_ctx.get_port_handle(closed_port);
 
                debug_assert!(comp_ctx.get_port(port_handle).state.is_closed());
 
                comp_ctx.change_port_peer(sched_ctx, port_handle, None);
 

	
 
                return (AckAction::None, None);
 
            },
 
            ControlContent::UnblockPutWithPorts => {
 
                return (AckAction::UnblockPutWithPorts, None);
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn has_acks_remaining(&self) -> bool {
 
        return !self.entries.is_empty();
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Port transfer
 
    // -------------------------------------------------------------------------
 

	
 
    /// Adds an entry that, when completely ack'd, will schedule a component.
 
    pub(crate) fn add_schedule_entry(&mut self, to_schedule_id: CompId) -> ControlId {
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 0, // incremented by calls to `add_reroute_entry`
 
            content: ControlContent::ScheduleComponent(to_schedule_id),
 
        });
 

	
 
        return entry_id;
 
    }
 

	
 
    /// Adds an entry that returns the similarly named Ack action
 
    pub(crate) fn add_unblock_put_with_ports_entry(&mut self) -> ControlId {
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 0, // incremented by calls to `add_reroute_entry`
 
            content: ControlContent::UnblockPutWithPorts,
 
        });
 

	
 
        return entry_id;
 
    }
 

	
 
    /// Adds a rerouting entry (used to ensure all messages will end up at a
 
    /// newly created component). Used when creating a new component.
 
    pub(crate) fn add_reroute_entry(
 
        &mut self, creator_comp_id: CompId,
 
        source_port_id: PortId, source_comp_id: CompId,
 
        old_target_port_id: PortId, new_target_port_id: PortId, new_comp_id: CompId,
 
        schedule_entry_id: ControlId,
 
    ) -> Message {
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 1,
 
            content: ControlContent::PeerChange(ContentPeerChange{
 
                source_port: source_port_id,
 
                source_comp: source_comp_id,
 
                old_target_port: old_target_port_id,
 
                new_target_port: new_target_port_id,
 
                new_target_comp: new_comp_id,
 
                schedule_entry_id,
 
            }),
 
        });
 

	
 
        // increment counter on schedule entry
 
        let entry_index = self.get_entry_index_by_id(schedule_entry_id).unwrap();
 
        self.entries[entry_index].ack_countdown += 1;
 

	
 
        return Message::Control(ControlMessage{
 
            id: entry_id,
 
            sender_comp_id: creator_comp_id,
 
            target_port_id: Some(source_port_id),
 
            content: ControlMessageContent::PortPeerChangedBlock
 
        })
 
    }
 

	
 
    /// Creates a PortPeerChanged message (and increments ack-counter on a
 
    /// pre-created control entry) that is used as a preliminary step before
 
    /// transferring a port over a channel.
 
    pub(crate) fn create_port_transfer_message(
 
        &mut self, associated_control_id: ControlId,
 
        sender_comp_id: CompId, target_port_id: PortId
 
    ) -> Message {
 
        let entry_index = self.get_entry_index_by_id(associated_control_id).unwrap();
 
        self.entries[entry_index].ack_countdown += 1;
 

	
 
        return Message::Control(ControlMessage{
 
            id: associated_control_id,
 
            sender_comp_id,
 
            target_port_id: Some(target_port_id),
 
            content: ControlMessageContent::PortPeerChangedBlock
 
        })
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Blocking, unblocking, and closing ports
 
    // -------------------------------------------------------------------------
 

	
 
    pub(crate) fn has_close_port_entry(&self, port_handle: LocalPortHandle, comp_ctx: &CompCtx) -> Option<ControlId> {
 
        let port = comp_ctx.get_port(port_handle);
 
        let port_id = port.self_id;
 
        for entry in self.entries.iter() {
 
            if let ControlContent::ClosedPort(entry_port_id) = &entry.content {
 
                if *entry_port_id == port_id {
 
                    return Some(entry.id);
 
                }
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    /// Initiates the control message procedures for closing a port. Caller must
 
    /// make sure that the port state has already been set to `Closed`.
 
    pub(crate) fn initiate_port_closing(&mut self, port_handle: LocalPortHandle, exit_inside_sync: bool, comp_ctx: &CompCtx) -> (LocalPeerHandle, ControlMessage) {
 
        let port = comp_ctx.get_port(port_handle);
 
        let peer_port_id = port.peer_port_id;
 
        debug_assert!(port.state.is_closed());
 

	
 
        // Construct the port-closing entry
 
        let entry_id = self.take_id();
 
        self.entries.push(ControlEntry{
 
            id: entry_id,
 
            ack_countdown: 1,
 
            content: ControlContent::ClosedPort(port.self_id),
 
        });
 

	
 
        // Return the messages notifying peer of the closed port
 
        let peer_handle = comp_ctx.get_peer_handle(port.peer_comp_id);
 
        return (
 
            peer_handle,
 
            ControlMessage{
 
                id: entry_id,
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(peer_port_id),
 
                content: ControlMessageContent::ClosePort(ControlMessageClosePort{
 
                    closed_in_sync_round: exit_inside_sync,
 
                    registered_round: port.last_registered_round,
 
                }),
 
            }
 
        );
 
    }
 

	
 
    /// Generates the control message used to indicate to a peer that a port
 
    /// should be blocked (expects the caller to have set the port's state to
 
    /// blocked).
 
    pub(crate) fn initiate_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) {
 
        let port_info = comp_ctx.get_port(port_handle);
 
        debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're telling the putter to block
 
        debug_assert!(port_info.state.is_set(PortStateFlag::BlockedDueToFullBuffers)); // contract with caller
 

	
 
        let peer_port_id = port_info.peer_port_id;
 
        let peer_comp_id = port_info.peer_comp_id;
 
        let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
        return (
 
            peer_handle,
 
            ControlMessage{
 
                id: ControlId::new_invalid(),
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(peer_port_id),
 
                content: ControlMessageContent::BlockPort,
 
            }
 
        );
 
    }
 

	
 
    /// Generates a messages used to indicate to a peer that a port should be
 
    /// unblocked again.
 
    pub(crate) fn cancel_port_blocking(&mut self, comp_ctx: &CompCtx, port_handle: LocalPortHandle) -> (LocalPeerHandle, ControlMessage) {
 
        let port_info = comp_ctx.get_port(port_handle);
 
        debug_assert_eq!(port_info.kind, PortKind::Getter); // because we're initiating the unblocking
 

	
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 

	
 
        return (
 
            peer_handle,
 
            ControlMessage{
 
                id: ControlId::new_invalid(),
 
                sender_comp_id: comp_ctx.id,
 
                target_port_id: Some(port_info.peer_port_id),
 
                content: ControlMessageContent::UnblockPort,
 
            }
 
        );
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Internal utilities
 
    // -------------------------------------------------------------------------
 

	
 
    fn take_id(&mut self) -> ControlId {
 
        let id = self.id_counter;
 
        self.id_counter.0 = self.id_counter.0.wrapping_add(1);
 
        return id;
 
    }
 

	
 
    fn get_entry_index_by_id(&self, entry_id: ControlId) -> Option<usize> {
 
        for (index, entry) in self.entries.iter().enumerate() {
 
            if entry.id == entry_id {
 
                return Some(index);
 
            }
 
        }
 

	
 
        return None;
 
    }
 
}
 

	
 
impl Default for ControlLayer {
 
    fn default() -> Self {
 
        return ControlLayer{
 
            id_counter: ControlId(0),
 
            entries: Vec::new(),
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/stdlib/internet.rs
Show inline comments
 
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
 
use std::mem::size_of;
 
use std::io::Error as IoError;
 

	
 
use libc::{
 
    c_int,
 
    sockaddr_in, sockaddr_in6, in_addr, in6_addr,
 
    socket, bind, listen, accept, connect, close,
 
};
 

	
 
use crate::runtime2::poll::{AsFileDescriptor, FileDescriptor};
 

	
 
#[derive(Debug)]
 
pub enum SocketError {
 
    Opening,
 
    Modifying,
 
    Binding,
 
    Listening,
 
    Connecting,
 
    Accepted,
 
    Accepting,
 
}
 

	
 
enum SocketState {
 
    Opened,
 
    Listening,
 
}
 

	
 
const SOCKET_BLOCKING: bool = false;
 

	
 
/// TCP (client) connection
 
pub struct SocketTcpClient {
 
    socket_handle: libc::c_int,
 
    is_blocking: bool,
 
}
 

	
 
impl SocketTcpClient {
 
    pub fn new(ip: IpAddr, port: u16) -> Result<Self, SocketError> {
 

	
 
        let socket_handle = create_and_connect_socket(
 
            libc::SOCK_STREAM, libc::IPPROTO_TCP, ip, port
 
        )?;
 
        if !set_socket_blocking(socket_handle, SOCKET_BLOCKING) {
 
            unsafe{ libc::close(socket_handle); }
 
            return Err(SocketError::Modifying);
 
        }
 

	
 
        return Ok(SocketTcpClient{
 
            socket_handle,
 
            is_blocking: SOCKET_BLOCKING,
 
        })
 
    }
 

	
 
    pub(crate) fn new_from_handle(socket_handle: libc::c_int) -> Result<Self, SocketError> {
 
        if !set_socket_blocking(socket_handle, SOCKET_BLOCKING) {
 
            unsafe{ libc::close(socket_handle); }
 
            return Err(SocketError::Modifying);
 
        }
 

	
 
        return Ok(SocketTcpClient{
 
            socket_handle,
 
            is_blocking: SOCKET_BLOCKING,
 
        })
 
    }
 

	
 
    pub fn send(&self, message: &[u8]) -> Result<usize, IoError> {
 
        let result = unsafe{
 
            let message_pointer = message.as_ptr().cast();
 
            libc::send(self.socket_handle, message_pointer, message.len() as libc::size_t, 0)
 
        };
 
        if result < 0 {
 
            return Err(IoError::last_os_error());
 
        }
 

	
 
        return Ok(result as usize);
 
    }
 

	
 
    /// Receives data from the TCP socket. Returns the number of bytes received.
 
    /// More bytes may be present even thought `used < buffer.len()`.
 
    pub fn receive(&self, buffer: &mut [u8]) -> Result<usize, IoError> {
 
        let result = unsafe {
 
            let message_pointer = buffer.as_mut_ptr().cast();
 
            libc::recv(self.socket_handle, message_pointer, buffer.len(), 0)
 
        };
 
        if result < 0 {
 
            return Err(IoError::last_os_error());
 
        }
 

	
 
        return Ok(result as usize);
 
    }
 
}
 

	
 
impl Drop for SocketTcpClient {
 
    fn drop(&mut self) {
 
        debug_assert!(self.socket_handle >= 0);
 
        unsafe{ close(self.socket_handle) };
 
    }
 
}
 

	
 
impl AsFileDescriptor for SocketTcpClient {
 
    fn as_file_descriptor(&self) -> FileDescriptor {
 
        return self.socket_handle;
 
    }
 
}
 

	
 
/// TCP listener. Yielding new connections
 
pub struct SocketTcpListener {
 
    socket_handle: libc::c_int,
 
    is_blocking: bool,
 
}
 

	
 
impl SocketTcpListener {
 
    pub fn new(ip: IpAddr, port: u16) -> Result<Self, SocketError> {
 
        // Create and bind
 
        let socket_handle = create_and_bind_socket(
 
            libc::SOCK_STREAM, libc::IPPROTO_TCP, ip, port
 
        )?;
 
        if !set_socket_blocking(socket_handle, SOCKET_BLOCKING) {
 
            unsafe{ libc::close(socket_handle); }
 
            return Err(SocketError::Modifying);
 
        }
 
        if !set_socket_reuse_address(socket_handle) {
 
            unsafe{ libc::close(socket_handle); }
 
            return Err(SocketError::Modifying);
 
        }
 

	
 
        // Listen
 
        unsafe {
 
            let result = listen(socket_handle, libc::SOMAXCONN);
 
            if result < 0 {
 
                unsafe{ libc::close(socket_handle); }
 
                return Err(SocketError::Listening);
 
            }
 
        }
 

	
 

	
 
        return Ok(SocketTcpListener{
 
            socket_handle,
 
            is_blocking: SOCKET_BLOCKING,
 
        });
 
    }
 

	
 
    pub fn accept(&self) -> Result<libc::c_int, IoError> {
 
        let (mut address, mut address_size) = create_sockaddr_in_empty();
 
        let address_pointer = &mut address as *mut sockaddr_in;
 
        let socket_handle = unsafe { accept(self.socket_handle, address_pointer.cast(), &mut address_size) };
 
        if socket_handle < 0 {
 
            return Err(IoError::last_os_error());
 
        }
 

	
 
        return Ok(socket_handle);
 
    }
 
}
 

	
 
impl Drop for SocketTcpListener {
 
    fn drop(&mut self) {
 
        debug_assert!(self.socket_handle >= 0);
 
        unsafe{ close(self.socket_handle) };
 
    }
 
}
 

	
 
impl AsFileDescriptor for SocketTcpListener {
 
    fn as_file_descriptor(&self) -> FileDescriptor {
 
        return self.socket_handle;
 
    }
 
}
 

	
 
/// Raw socket receiver. Essentially a listener that accepts a single connection
 
struct SocketRawRx {
 
    listen_handle: c_int,
 
    accepted_handle: c_int,
 
}
 

	
 
impl SocketRawRx {
 
    pub fn new(ip: Option<Ipv4Addr>, port: u16) -> Result<Self, SocketError> {
 
        let ip = ip.unwrap_or(Ipv4Addr::UNSPECIFIED); // unspecified is the same as INADDR_ANY
 
        let address = unsafe{ in_addr{
 
            s_addr: std::mem::transmute(ip.octets()),
 
        }};
 
        let socket_address = sockaddr_in{
 
            sin_family: libc::AF_INET as libc::sa_family_t,
 
            sin_port: htons(port),
 
            sin_addr: address,
 
            sin_zero: [0; 8],
 
        };
 

	
 
        unsafe {
 
            let socket_handle = create_and_bind_socket(libc::SOCK_RAW, 0, IpAddr::V4(ip), port)?;
 

	
 
            let result = listen(socket_handle, 3);
 
            if result < 0 { return Err(SocketError::Listening); }
 

	
 
            return Ok(SocketRawRx{
 
                listen_handle: socket_handle,
 
                accepted_handle: -1,
 
            });
 
        }
 
    }
 

	
 
    // pub fn try_accept(&mut self, timeout_ms: u32) -> Result<(), SocketError> {
 
    //     if self.accepted_handle >= 0 {
 
    //         // Already accepted a connection
 
    //         return Err(SocketError::Accepted);
 
    //     }
 
    //
 
    //     let mut socket_address = sockaddr_in{
 
    //         sin_family: 0,
 
    //         sin_port: 0,
 
    //         sin_addr: in_addr{ s_addr: 0 },
 
    //         sin_zero: [0; 8]
 
    //     };
 
    //     let mut size = size_of::<sockaddr_in>() as u32;
 
    //     unsafe {
 
    //         let result = accept(self.listen_handle, &mut socket_address as *mut _, &mut size as *mut _);
 
    //         if result < 0 {
 
    //             return Err(SocketError::Accepting);
 
    //         }
 
    //     }
 
    //
 
    //     return Ok(());
 
    // }
 
}
 

	
 
impl Drop for SocketRawRx {
 
    fn drop(&mut self) {
 
        if self.accepted_handle >= 0 {
 
            unsafe {
 
                close(self.accepted_handle);
 
            }
 
        }
 

	
 
        if self.listen_handle >= 0 {
 
            unsafe {
 
                close(self.listen_handle);
 
            }
 
        }
 
    }
 
}
 

	
 
// The following is essentially stolen from `mio`'s io_source.rs file.
 
#[cfg(unix)]
 
trait AsRawFileDescriptor {
 
    fn as_raw_file_descriptor(&self) -> c_int;
 
}
 

	
 
impl AsRawFileDescriptor for SocketTcpClient {
 
    fn as_raw_file_descriptor(&self) -> c_int {
 
        return self.socket_handle;
 
    }
 
}
 

	
 
/// Performs the `socket` and `bind` calls.
 
fn create_and_bind_socket(socket_type: libc::c_int, protocol: libc::c_int, ip: IpAddr, port: u16) -> Result<libc::c_int, SocketError> {
 
    let family = socket_family_from_ip(ip);
 

	
 
    unsafe {
 
        let socket_handle = socket(family, socket_type, protocol);
 
        if socket_handle < 0 {
 
            return Err(SocketError::Opening);
 
        }
 

	
 
        let result = match ip {
 
            IpAddr::V4(ip) => {
 
                let (socket_address, address_size) = create_sockaddr_in_v4(ip, port);
 
                let socket_pointer = &socket_address as *const sockaddr_in;
 
                bind(socket_handle, socket_pointer.cast(), address_size)
 
            },
 
            IpAddr::V6(ip) => {
 
                let (socket_address, address_size) = create_sockaddr_in_v6(ip, port);
 
                let socket_pointer= &socket_address as *const sockaddr_in6;
 
                bind(socket_handle, socket_pointer.cast(), address_size)
 
            }
 
        };
 
        if result < 0 {
 
            close(socket_handle);
 
            return Err(SocketError::Binding);
 
        }
 

	
 
        return Ok(socket_handle);
 
    }
 
}
 

	
 
/// Performs the `socket` and `connect` calls
 
fn create_and_connect_socket(socket_type: libc::c_int, protocol: libc::c_int, ip: IpAddr, port: u16) -> Result<libc::c_int, SocketError> {
 
    let family = socket_family_from_ip(ip);
 
    unsafe {
 
        let socket_handle = socket(family, socket_type, protocol);
 
        if socket_handle < 0 {
 
            return Err(SocketError::Opening);
 
        }
 

	
 
        let result = match ip {
 
            IpAddr::V4(ip) => {
 
                let (socket_address, address_size) = create_sockaddr_in_v4(ip, port);
 
                let socket_pointer = &socket_address as *const sockaddr_in;
 
                connect(socket_handle, socket_pointer.cast(), address_size)
 
            },
 
            IpAddr::V6(ip) => {
 
                let (socket_address, address_size) = create_sockaddr_in_v6(ip, port);
 
                let socket_pointer= &socket_address as *const sockaddr_in6;
 
                connect(socket_handle, socket_pointer.cast(), address_size)
 
            }
 
        };
 
        if result < 0 {
 
            close(socket_handle);
 
            return Err(SocketError::Connecting);
 
        }
 

	
 
        return Ok(socket_handle);
 
    }
 
}
 

	
 
#[inline]
 
fn create_sockaddr_in_empty() -> (sockaddr_in, libc::socklen_t) {
 
    let socket_address = sockaddr_in{
 
        sin_family: 0,
 
        sin_port: 0,
 
        sin_addr: in_addr { s_addr: 0 },
 
        sin_zero: [0; 8],
 
    };
 
    let address_size = size_of::<sockaddr_in>();
 

	
 
    return (socket_address, address_size as _);
 
}
 
#[inline]
 
fn create_sockaddr_in_v4(ip: Ipv4Addr, port: u16) -> (sockaddr_in, libc::socklen_t) {
 
    let address = unsafe{
 
        in_addr{
 
            s_addr: std::mem::transmute(ip.octets())
 
        }
 
    };
 

	
 
    let socket_address = sockaddr_in{
 
        sin_family: libc::AF_INET as libc::sa_family_t,
 
        sin_port: htons(port),
 
        sin_addr: address,
 
        sin_zero: [0; 8]
 
    };
 
    let address_size = size_of::<sockaddr_in>();
 

	
 
    return (socket_address, address_size as _);
 
}
 

	
 
#[inline]
 
fn create_sockaddr_in_v6(ip: Ipv6Addr, port: u16) -> (sockaddr_in6, libc::socklen_t) {
 
    // flow label is advised to be, according to RFC6437 a (somewhat
 
    // secure) random number taken from a uniform distribution
 
    let flow_info = rand::random();
 

	
 
    let address = unsafe{
 
        in6_addr{
 
            s6_addr: ip.octets()
 
        }
 
    };
 

	
 
    let socket_address = sockaddr_in6{
 
        sin6_family: libc::AF_INET6 as libc::sa_family_t,
 
        sin6_port: htons(port),
 
        sin6_flowinfo: flow_info,
 
        sin6_addr: address,
 
        sin6_scope_id: 0, // incorrect in case of loopback address
 
    };
 
    let address_size = size_of::<sockaddr_in6>();
 

	
 
    return (socket_address, address_size as _);
 
}
 

	
 
#[inline]
 
fn set_socket_blocking(handle: libc::c_int, blocking: bool) -> bool {
 
    if handle < 0 {
 
        return false;
 
    }
 

	
 
    unsafe{
 
        let mut flags = libc::fcntl(handle, libc::F_GETFL, 0);
 
        if flags < 0 {
 
            return false;
 
        }
 

	
 
        if blocking {
 
            flags &= !libc::O_NONBLOCK;
 
        } else {
 
            flags |= libc::O_NONBLOCK;
 
        }
 

	
 
        let result = libc::fcntl(handle, libc::F_SETFL, flags);
 
        if result < 0 {
 
            return false;
 
        }
 
    }
 

	
 
    return true;
 
}
 

	
 
#[inline]
 
fn set_socket_reuse_address(handle: libc::c_int) -> bool {
 
    if handle < 0 {
 
        return false;
 
    }
 

	
 
    unsafe {
 
        let enable: libc::c_int = 1;
 
        let enable_ptr: *const _ = &enable;
 
        let result = libc::setsockopt(
 
            handle, libc::SOL_SOCKET, libc::SO_REUSEADDR,
 
            enable_ptr.cast(), size_of::<libc::c_int>() as libc::socklen_t
 
        );
 
        if result < 0 {
 
            return false;
 
        }
 

	
 
        let result = libc::setsockopt(
 
            handle, libc::SOL_SOCKET, libc::SO_REUSEPORT,
 
            enable_ptr.cast(), size_of::<libc::c_int>() as libc::socklen_t
 
        );
 
        if result < 0 {
 
            return false;
 
        }
 
    }
 

	
 
    return true;
 
}
 

	
 
#[inline]
 
fn socket_family_from_ip(ip: IpAddr) -> libc::c_int {
 
    return match ip {
 
        IpAddr::V4(_) => libc::AF_INET,
 
        IpAddr::V6(_) => libc::AF_INET6,
 
    };
 
}
 

	
 
#[inline]
 
fn htons(port: u16) -> u16 {
 
    return port.to_be();
 
}
src/runtime2/tests/mod.rs
Show inline comments
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::component::{CompCtx, CompPDL};
 

	
 
mod messaging;
 
mod error_handling;
 
mod transfer_ports;
 
mod internet;
 

	
 
const LOG_LEVEL: LogLevel = LogLevel::Debug;
 
const NUM_THREADS: u32 = 1;
 

	
 
pub(crate) fn compile_and_create_component(source: &str, routine_name: &str, args: ValueGroup) {
 
    let protocol = ProtocolDescription::parse(source.as_bytes())
 
        .expect("successful compilation");
 
    let runtime = Runtime::new(NUM_THREADS, LogLevel::None, protocol)
 
    let runtime = Runtime::new(NUM_THREADS, LOG_LEVEL, protocol)
 
        .expect("successful runtime startup");
 
    create_component(&runtime, "", routine_name, args);
 
}
 

	
 
pub(crate) fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: ValueGroup) {
 
    let prompt = rt.inner.protocol.new_component(
 
        module_name.as_bytes(), routine_name.as_bytes(), args
 
    ).expect("create prompt");
 
    let reserved = rt.inner.start_create_component();
 
    let ctx = CompCtx::new(&reserved);
 
    let component = Box::new(CompPDL::new(prompt, 0));
 
    let (key, _) = rt.inner.finish_create_component(reserved, component, ctx, false);
 
    rt.inner.enqueue_work(key);
 
}
 

	
 
pub(crate) fn no_args() -> ValueGroup { ValueGroup::new_stack(Vec::new()) }
 

	
 
#[test]
 
fn test_component_creation() {
 
    let pd = ProtocolDescription::parse(b"
 
    comp nothing_at_all() {
 
        s32 a = 5;
 
        auto b = 5 + a;
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, LOG_LEVEL, pd).unwrap();
 

	
 
    for _i in 0..20 {
 
        create_component(&rt, "", "nothing_at_all", no_args());
 
    }
 
}
 

	
 
#[test]
 
fn test_simple_select() {
 
    let pd = ProtocolDescription::parse(b"
 
    func infinite_assert<T>(T val, T expected) -> () {
 
        while (val != expected) { print(\"nope!\"); }
 
        return ();
 
    }
 

	
 
    comp receiver(in<u32> in_a, in<u32> in_b, u32 num_sends) {
 
        auto num_from_a = 0;
 
        auto num_from_b = 0;
 
        while (num_from_a + num_from_b < 2 * num_sends) {
 
            sync select {
 
                auto v = get(in_a) -> {
 
                    print(\"got something from A\");
 
                    auto _ = infinite_assert(v, num_from_a);
 
                    num_from_a += 1;
 
                }
 
                auto v = get(in_b) -> {
 
                    print(\"got something from B\");
 
                    auto _ = infinite_assert(v, num_from_b);
 
                    num_from_b += 1;
 
                }
 
            }
 
        }
 
    }
 

	
 
    comp sender(out<u32> tx, u32 num_sends) {
 
        auto index = 0;
 
        while (index < num_sends) {
 
            sync {
 
                put(tx, index);
 
                index += 1;
 
            }
 
        }
 
    }
 

	
 
    comp constructor() {
 
        auto num_sends = 1;
 
        channel tx_a -> rx_a;
 
        channel tx_b -> rx_b;
 
        new sender(tx_a, num_sends);
 
        new receiver(rx_a, rx_b, num_sends);
 
        new sender(tx_b, num_sends);
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_unguarded_select() {
 
    let pd = ProtocolDescription::parse(b"
 
    comp constructor_outside_select() {
 
        u32 index = 0;
 
        while (index < 5) {
 
            sync select { auto v = () -> print(\"hello\"); }
 
            index += 1;
 
        }
 
    }
 

	
 
    comp constructor_inside_select() {
 
        u32 index = 0;
 
        while (index < 5) {
 
            sync select { auto v = () -> index += 1; }
 
        }
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap();
 
    create_component(&rt, "", "constructor_outside_select", no_args());
 
    create_component(&rt, "", "constructor_inside_select", no_args());
 
}
 

	
 
#[test]
 
fn test_empty_select() {
 
    let pd = ProtocolDescription::parse(b"
 
    comp constructor() {
 
        u32 index = 0;
 
        while (index < 5) {
 
            sync select {}
 
            index += 1;
 
        }
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(3, LOG_LEVEL, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_random_u32_temporary_thingo() {
 
    let pd = ProtocolDescription::parse(b"
 
    import std.random::random_u32;
 

	
 
    comp random_taker(in<u32> generator, u32 num_values) {
 
        auto i = 0;
 
        while (i < num_values) {
 
            sync {
 
                auto a = get(generator);
 
            }
 
            i += 1;
 
        }
 
    }
 

	
 
    comp constructor() {
 
        channel tx -> rx;
 
        auto num_values = 25;
 
        new random_u32(tx, 1, 100, num_values);
 
        new random_taker(rx, num_values);
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, LOG_LEVEL, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 

	
 
#[test]
 
fn test_tcp_socket_http_request() {
 
    let _pd = ProtocolDescription::parse(b"
 
    import std.internet::*;
 

	
 
    comp requester(out<ClientCmd> cmd_tx, in<u8[]> data_rx) {
 
        print(\"*** TCPSocket: Sending request\");
 
        sync {
 
            put(cmd_tx, ClientCmd::Send(b\"GET / HTTP/1.1\\r\\n\\r\\n\"));
 
        }
 

	
 
        print(\"*** TCPSocket: Receiving response\");
 
        auto buffer = {};
 
        auto done_receiving = false;
 
        sync while (!done_receiving) {
 
            put(cmd_tx, ClientCmd::Receive);
 
            auto data = get(data_rx);
 
            buffer @= data;
 

	
 
            // Completely crap detection of end-of-document. But here we go, we
 
            // try to detect the trailing </html>. Proper way would be to parse
 
            // for 'content-length' or 'content-encoding'
 
            s32 index = 0;
 
            s32 partial_length = cast(length(data) - 7);
 
            while (index < partial_length) {
 
                // No string conversion yet, so check byte buffer one byte at
 
                // a time.
 
                auto c1 = data[index];
 
                if (c1 == cast('<')) {
 
                    auto c2 = data[index + 1];
 
                    auto c3 = data[index + 2];
 
                    auto c4 = data[index + 3];
 
                    auto c5 = data[index + 4];
 
                    auto c6 = data[index + 5];
 
                    auto c7 = data[index + 6];
 
                    if ( // i.e. if (data[index..] == '</html>'
 
                        c2 == cast('/') && c3 == cast('h') && c4 == cast('t') &&
 
                        c5 == cast('m') && c6 == cast('l') && c7 == cast('>')
 
                    ) {
 
                        print(\"*** TCPSocket: Detected </html>\");
 
                        put(cmd_tx, ClientCmd::Finish);
 
                        done_receiving = true;
 
                    }
 
                }
 
                index += 1;
 
            }
 
        }
 

	
 
        print(\"*** TCPSocket: Requesting shutdown\");
 
        sync {
 
            put(cmd_tx, ClientCmd::Shutdown);
 
        }
 
    }
 

	
 
    comp main() {
 
        channel cmd_tx -> cmd_rx;
 
        channel data_tx -> data_rx;
 
        new tcp_client({142, 250, 179, 163}, 80, cmd_rx, data_tx); // port 80 of google
 
        new requester(cmd_tx, data_rx);
 
    }
 
    ").expect("compilation");
 

	
 
    // This test is disabled because it performs a HTTP request to google.
 
    // let rt = Runtime::new(1, LOG_LEVEL, _pd).unwrap();
 
    // create_component(&rt, "", "main", no_args());
 
}
 

	
 
#[test]
 
fn test_sending_receiving_union() {
 
    let pd = ProtocolDescription::parse(b"
 
    union Cmd {
 
        Set(u8[]),
 
        Get,
 
        Shutdown,
 
    }
 

	
 
    comp database(in<Cmd> rx, out<u8[]> tx) {
 
        auto stored = {};
 
        auto done = false;
 
        while (!done) {
 
            sync {
 
                auto command = get(rx);
 
                if (let Cmd::Set(bytes) = command) {
 
                    print(\"database: storing value\");
 
                    stored = bytes;
 
                } else if (let Cmd::Get = command) {
 
                    print(\"database: returning value\");
 
                    put(tx, stored);
 
                } else if (let Cmd::Shutdown = command) {
 
                    print(\"database: shutting down\");
 
                    done = true;
 
                } else while (true) print(\"impossible\"); // no other case possible
 
            }
 
        }
 
    }
 

	
 
    comp client(out<Cmd> tx, in<u8[]> rx, u32 num_rounds) {
 
        auto round = 0;
 
        while (round < num_rounds) {
 
            auto set_value = b\"hello there\";
 
            print(\"client: putting a value\");
 
            sync put(tx, Cmd::Set(set_value));
 

	
 
            auto retrieved = {};
 
            print(\"client: retrieving what was sent\");
 
            sync {
 
                put(tx, Cmd::Get);
 
                retrieved = get(rx);
 
            }
 

	
 
            if (set_value != retrieved) while (true) print(\"wrong!\");
 

	
 
            round += 1;
 
        }
 

	
 
        sync put(tx, Cmd::Shutdown);
 
    }
 

	
 
    comp main() {
 
        auto num_rounds = 5;
 
        channel cmd_tx -> cmd_rx;
 
        channel data_tx -> data_rx;
 
        new database(cmd_rx, data_tx);
 
        new client(cmd_tx, data_rx, num_rounds);
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, LOG_LEVEL, pd).unwrap();
 
    create_component(&rt, "", "main", no_args());
 
}
 
\ No newline at end of file

Changeset was too big and was cut off... Show full diff anyway

0 comments (0 inline, 0 general)