Changeset - 88611f9fd179
bin-compiler/src/main.rs
 
@@ -106,6 +106,7 @@ fn main() {
 

	
 
        if let Err(err) = builder.add(input_file.to_string(), file_buffer.clone()) {
 
            println!("FAILED (to tokenize file)\nbecause:\n{}", err);
 
            return;
 
        }
 

	
 
        println!("Success");
docs/runtime/01_runtime.md
 
@@ -211,11 +211,13 @@ And so the control protocol for transmitting ports proceeds as following:
 

	
 
## Dealing with Crashing Components
 

	
 
### The cases in which peers crash in response
 

	
 
A component may at any point during its execution be triggered to crash. This may be because of something simple like an out-of-bounds array access. But, as described above, using closed ports may lead to such an event as well. In such a case we not only need to go through the `ClosePort` control protocol, to make sure that we can remove the crashing component's memory from the runtime, but we'll also have to make sure that all of the peers are aware that *their* peer has crashed. Here we'll make a design decision: if a peer component crashes during a synchronous round and there were interactions with that component, then that interacting component should crash as well. The exact reasons will be introduced later, but it comes down to this: we need to do something about the fact that the synchronous round will never be able to complete.
 

	
 
We'll talk ourselves through the case of a component crashing before coming up with the control algorithm that deals with it.
 

	
 
We'll first consider that a component may crash inside our outside of a synchronous block. From the point of view of the peer component, we'll have four cases to consider:
 
We'll first consider that a component may crash inside or outside of a synchronous block. From the point of view of the peer component, we'll have four cases to consider:
 

	
 
1. The peer component is not in a synchronous block. 
 
2. The crashing component died before the peer component entered the synchronous block.
 
@@ -224,25 +226,35 @@ We'll first consider that a component may crash inside our outside of a synchron
 

	
 
Before discussing these cases, it is important to remember that the runtime has all components running in their own threads of execution. The crashing component may be unaware of its peers (because peer ports might change ownership at any point in time). We'll discuss the consensus algorithm in more detail later in the documentation. For now it is important to note that components discover the synchronous region they are part of while the PDL code is executing. So if a component crashes within a synchronous region before the end of the sync block is reached, it may not have discovered the full synchronous region it would have been part of.
 

	
 
Because the crashing component is potentially unaware of the component IDs it will end up notifying that is has failed, we can not design the crash-handling algorithm in such a way such that the crashing component notifies the peers of when they have to crash. We'll do the opposite: the crashing component simply crashes and somehow attempts to notify the peers. Those peers themselves decide whether they have to crash in response to such a notification.
 
Because the crashing component is potentially unaware of the IDs of the components it should notify of its failure, we cannot design the crash-handling algorithm in such a way that the crashing component notifies the peers of when they have to crash. We'll do the opposite: the crashing component simply crashes and somehow attempts to notify the peers. Those peers themselves decide whether they have to crash in response to such a notification.
 

	
 
For this reason, it does not make a lot of sense to deal with component failure through the consensus algorithm. Dealing with the failure through the consensus algorithm only makes sense if we can find the synchronous region that we would have discovered if we were able to fully execute the sync block of each participating component. As explained above: we can't, and so we'll opt to deal with failure on a peer-by-peer basis.
 

	
 
We'll go back to the four cases we've discusses above. We'll change our point of view: we're now considering a component (the "handling component") that has to deal with the failure of a peer (the "crashing component"). We'll introduce a small part of our solution a-priori: like a component shutting down, a failing component will simply end its life by broadcasting `ClosePort` message over all of its owned ports that are not closed (and the failing component will also wait until all of those ports are not blocked).
 
We'll go back to the four cases we've discussed above. We'll change our point of view: we're now considering a component (the "handling component") that has to deal with the failure of a peer (the "crashing component"). We'll introduce a small part of our solution a-priori: like a component shutting down, a failing component will simply end its life by broadcasting a `ClosePort` message over each of its owned ports that is not closed (and, as in the other control algorithms, the failing component will wait for the port that is shutting down to become unblocked before it sends the `ClosePort` message).
 

	
 
In the first case, we're dealing with a failing component while the handling component is not in a synchronous block. This means that if there was a previous synchronous block, it has succeeded. We might still have data messages in our inbox that were sent by the failing component. But this case is rather easy to deal with: we mark the ports as closed, and if we end up using them in the next synchronous block, then we will crash ourselves.
 

	
 
In the second case we have that the peer component died before we ourselves have entered the synchronous block. This case is somewhat equivalent to the case we described above. The crashing component cannot have sent the handling component any messages. So we mark the port as closed, potentially failing in the future if they end up being used.
 
In the second case we have that the peer component died before we ourselves have entered the synchronous block. This case is somewhat equivalent to the one described above: the crashing component cannot have sent the handling component any messages, so we mark the port as closed, potentially failing in the future if it ends up being used. However, the handling component itself might already have performed `put` operations. So when the handling component receives a `ClosePort` message, it realizes that those earlier `put` operations can never be acknowledged. For this reason a component stores, in the metadata associated with each port, when it last used that port. When, in this second case, a `ClosePort` message comes in while the port has already been used, the handling component should crash as well.
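
As a minimal sketch of this bookkeeping (with simplified types; the field names follow the `Port` struct this changeset touches in `component_context.rs`, while `SourceCode` is a stand-in for the runtime's richer `PortInstruction` variants):

```rust
// A sketch, not the runtime's actual types: per-port metadata for detecting
// that a closed port was already used in the current interaction.
#[derive(Clone, Copy, PartialEq, Eq)]
enum PortInstruction {
    None,       // the port has not been used
    SourceCode, // simplification: some `put`/`get` was performed on the port
}

struct PortMeta {
    last_instruction: PortInstruction,  // where the port was last used
    last_registered_round: Option<u32>, // sync round of the last interaction
}

// Second case, as stated above (refined later with round numbers): the peer
// died before we entered the sync block, so if we already used the port
// (e.g. an unacknowledged `put`), we have to crash as well.
fn must_crash_before_sync(port: &PortMeta) -> bool {
    return port.last_instruction != PortInstruction::None;
}
```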
 

	
 
Next up is the third case, where the crashing component and the handling component were in the same synchronous round. Like before, we mark the port as closed and future use will cause a crash. And like in the second case, if the handling component has already used the port (which in this case may also mean having received a message from the crashing component), then it should crash as well.
 

	
 
The fourth case is where the failing component crashes *after* the handling component finished its sync round. This is an edge case arising from the following situation: both the handling and the crashing component have submitted their local solution to the consensus algorithm (assumed to be running somewhere in a thread of execution different from the two components). The crashing component receives a global solution, finishes the sync round, and then crashes, thereby sending the `ClosePort` message to the handling component. The handling component, due to the asynchronous nature of the runtime, receives the `ClosePort` message before the global solution has a chance to reach it. In this case, however, the handling component should be able to finish the synchronous round, and it shouldn't crash.
 

	
 
### Distinguishing the crashing cases
 

	
 
So far we've pretended that we could already determine the relation between the crashing component's synchronous round and the handling component's synchronous round. But in order to do this we need to add a bit of extra information to the `ClosePort` message.
 

	
 
Next up is the third case, where both the crashing component and the handling component were both in the same synchronous round. Like before we mark the port as closed and future use will cause a crash. The difference is that the handling component may be blocked on attempting to `get` from a port which the crashing component now indicates is closed, perhaps we might have already performed successful `get`/`put` operations. In that case the handling component should crash: the crashing component can never submit its local solution, so the synchronous round can never succeed! And so we need to have metadata stored for the component that tracks if the port was used in the synchronous round. If the component is used in the synchronous round and a `ClosePort` message comes in, then the component should crash as well.
 
The simplest case to determine is whether the two components are in the same synchronous round (case three, as described above). The crashing component annotates the `ClosePort` message with whether it was in a synchronous round or not. Then if both components are in a synchronous round (as checked by the handling component), and the about-to-be-closed port at the handling component was used in that round, or will be used in that round, then the handling component should crash.
 

	
 
The fourth case is where the failing component crashes *after* the handling component finished its sync round. This is an edge cases dealing with the following situation: both the handling as the crashing component have submitted their local solution to the consensus algorithm. The crashing component receives a global solution, finishes the sync round, and then crashes, therefore sending the `ClosePort` message to the handling component. The handling component, due to the asynchronous nature of the runtime, receives the `ClosePort` message before the global solution has a chance to reach the handling component. In this case, however, the handling component should be able to finish the synchronous round, and it shouldn't crash.
 
Equally simple: the handling component can figure out by itself whether it is in a synchronous round (case one, as described above). If not, the port is marked closed and future use causes crashes.
 

	
 
Here is where we arrive at the protocol for dealing with component crashes. To let the handling component deal with this last case, we'll let the crashing component send the `ClosePort` messages together with a boolean indicating if it crashed inside, or outside of a synchronous block.
 
The last two cases require a bit more work: how do we distinguish the edge case where the handling component's round will complete in the future from the case where it should crash? To distinguish these we need the handling component to know whether the last interaction the crashing component handled was the one in the handling component's current synchronous round.
 

	
 
If the crashing component sends `ClosePort` together with the indication that it crashed inside of the synchronous block, then we're dealing with case 3 *if* there are messages in the inbox, or if the handling component uses the closed port after receiving the `ClosePort` message. But if the crashing component sends `ClosePort` together with the indication that it crashed outside of a synchronous block, then we check if we have performed any operations on the port in the synchronous round. If we have performed operations *and* have received messages from that component, then apparently the synchronous round will succeed. So we will not immediately crash. Otherwise we will.
 
For this reason we keep track of the synchronous round number. That is to say: there is a counter that increments each time a synchronous round completes for a component. We have a field in the metadata for a port that registers this round number. If a component performs a `put` operation, then it stores its own round number in that port's metadata, and sends this round number along with the message. If a component performs a `get` operation, then it stores the *received* round number in the port's metadata.
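
As a sketch of that registration (the two assignments mirror the ones this changeset adds to `default_send_data_message` and `default_handle_received_data_message`; the surrounding types are reduced to the bare minimum):

```rust
// Simplified port metadata: only the round-number registration is shown.
struct PortMeta {
    last_registered_round: Option<u32>,
}

// On `put`: register our *own* round number, and send it along with the
// message (in the changeset it travels in the message's sync header).
fn register_put(port: &mut PortMeta, own_round: u32) -> u32 {
    port.last_registered_round = Some(own_round);
    return own_round;
}

// On `get`: register the *received* round number, i.e. the sender's round.
fn register_get(port: &mut PortMeta, received_round: u32) {
    port.last_registered_round = Some(received_round);
}
```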
 

	
 
In this way we modify the control algorithm for terminating components. Now we're able to deal correctly with crashing components.
 
When a component closes a port, it will also send along the last registered round number in the `ClosePort` message. If the handling component receives a `ClosePort` message, and the last registered round number in the port's metadata matches the round number in the `ClosePort` message, and the crashing component was not in a synchronous round, then the crashing component crashed after the handling component's sync round. Hence: the handling component can complete its sync round.
 

	
 
To conclude: if we receive a `ClosePort` message, then we always mark the port as closed. If the handling and the crashing component were in the same synchronous round, and the closed port was used in that synchronous round, then the handling component crashes as well. If the handling component *is* in a synchronous round but the crashing component *was not*, the handling component's port was used in that synchronous round, and the port's last registered round number does not match the round number in the `ClosePort` message, then the handling component crashes as well.
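
Putting these rules together, a sketch of the resulting decision (it mirrors the handling logic in this changeset's `default_handle_control_message`; the handling component's own state is passed in as plain arguments):

```rust
// The two fields carried by a `ClosePort` message in this changeset (see
// `ControlMessageClosePort` in src/runtime2/communication.rs).
struct ControlMessageClosePort {
    closed_in_sync_round: bool,    // was the crashing component in a sync round?
    registered_round: Option<u32>, // round last registered on the closed port
}

// Decides whether the handling component must crash in response. The port is
// always marked as closed, regardless of the outcome of this check.
fn must_crash_on_close_port(
    content: &ControlMessageClosePort,
    in_sync_block: bool,                // is the handling component in a sync block?
    port_was_used: bool,                // last_instruction != PortInstruction::None
    last_registered_round: Option<u32>, // from the closed port's metadata
) -> bool {
    if !in_sync_block {
        return false; // case 1: future use of the closed port will crash
    }
    let closed_during_sync_round = content.closed_in_sync_round;
    let round_has_succeeded = !content.closed_in_sync_round
        && last_registered_round == content.registered_round;
    let closed_before_sync_round = !closed_during_sync_round && !round_has_succeeded;
    return (closed_during_sync_round || closed_before_sync_round) && port_was_used;
}
```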
 

	
 
## Sync Algorithm
 

	
 
@@ -252,4 +264,10 @@ A port is identified by a `(component ID, port ID)` pair, and channel is a pair
 

	
 
This is the trick we will apply in the consensus algorithm. If a channel did not see any messages passing through it, then the components that own those ports will not have to reach consensus because they will not be part of the same synchronous region. However if a message did go through the channel then the components join the same synchronous region, and they'll have to form some sort of consensus on what interaction took place on that channel.
 

	
 
And so the `put`ting component will only submit its own `(component ID, port ID, metadata_for_sync_round)` triplet. The `get`ting port will submit information containing `(self component ID, self port ID, peer component ID, peer port ID, metadata_for_sync_round)`. The consensus algorithm can now figure out which two ports belong to the same channel.
 
\ No newline at end of file
 
And so the `put`ting component will only submit its own `(component ID, port ID, metadata_for_sync_round)` triplet. The `get`ting component will submit information containing `(self component ID, self port ID, peer component ID, peer port ID, metadata_for_sync_round)`. The consensus algorithm can now figure out which two ports belong to the same channel.
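
As an illustrative encoding of those two submissions (the tuple layouts come from the text above; all names in the sketch are made up):

```rust
// Illustrative only: the two shapes of per-port local-solution entries.
struct ComponentId(u32);
struct PortId(u32);
struct SyncMeta; // placeholder for "metadata_for_sync_round"

enum PortSubmission {
    // The `put`ting component only knows its own half of the channel.
    Putter {
        component: ComponentId,
        port: PortId,
        meta: SyncMeta,
    },
    // The `get`ting component also names its peer, which lets the consensus
    // algorithm pair up the two halves of each channel.
    Getter {
        component: ComponentId,
        port: PortId,
        peer_component: ComponentId,
        peer_port: PortId,
        meta: SyncMeta,
    },
}
```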
 

	
 
## Component Nomenclature
 

	
 
Earlier versions of the Reowolf runtime featured a distinction between primitive and composite components. This was put into the language from a design perspective. Primitive components could do the nitty-gritty protocol execution: performing `put`/`get` operations and entering sync blocks. Conversely, composite components were tasked with setting up a network of interconnected components: creating channels and handing off the appropriate ports to the instantiated components.
 

	
 
Once the runtime was capable of sending ports over channels, it became apparent that this distinction no longer made sense: if only primitive components can send/receive ports, but cannot create new components, then the programmer is limited to using those received ports directly in the primitive's code! And so the split between primitive and composite components was removed: only the concept of a "component" is left.
 
\ No newline at end of file
docs/runtime/04_known_issues.md
 
@@ -30,6 +30,8 @@ The current implementation of Reowolf has the following known issues:
 

	
 
- The TCP listener component should probably do a `shutdown` before a `close` on the socket handle. It should also set the `SO_REUSEADDR` option.
 

	
 
- The TCP listener and TCP sender components have not been tested extensively in a multi-threaded setup.
 

	
 
- The way in which putting ports are ordered to block when the corresponding getter port's main inbox is full is rather silly. This led to the introduction of the "backup inbox" as it is found in the runtime's code. There is a design decision to make here, with two options: (a) have an atomic boolean indicating whether the message slot for an inbox is full, or (b) do away with the "main inbox" altogether and have an unbounded message queue.
 

	
 
- For practical use in components whose code supports an arbitrary number of peers (i.e. their code contains an array of ports that is used for communication and changes in size during the component's lifetime), the `select` statement somehow needs to support waiting on any one of those ports.
 
@@ -42,4 +44,6 @@ The current implementation of Reowolf has the following known issues:
 

	
 
- The `Ack` messages that are sent in response to `PeerPortChanged_Block` messages should contain the sending component's `(component ID, port ID)` pair in case the `PeerPortChanged_Block` message is relayed. When such an `Ack` message is received, the peer of the port must be updated before transferring the port to the new owner.
 

	
 
- The compiler currently accepts a select arm's guard that is formulated as `auto a = get(get(rx))`. This should be disallowed.
 
\ No newline at end of file
 
- The compiler currently accepts a select arm's guard that is formulated as `auto a = get(get(rx))`. This should be disallowed.
 

	
 
- The work queue in the runtime is still a mutex-locked queue. The `QueueMpsc` type should be extended to be a multiple-producer multiple-consumer queue. This type should then replace the mutex-locked work queue.
 
\ No newline at end of file
src/protocol/ast.rs
 
@@ -1170,30 +1170,35 @@ impl Statement {
 
        }
 
    }
 

	
 
    pub fn span(&self) -> InputSpan {
 
    pub fn maybe_span(&self) -> Option<InputSpan> {
 
        match self {
 
            Statement::Block(v) => v.span,
 
            Statement::Local(v) => v.span(),
 
            Statement::Labeled(v) => v.label.span,
 
            Statement::If(v) => v.span,
 
            Statement::While(v) => v.span,
 
            Statement::Break(v) => v.span,
 
            Statement::Continue(v) => v.span,
 
            Statement::Synchronous(v) => v.span,
 
            Statement::Fork(v) => v.span,
 
            Statement::Select(v) => v.span,
 
            Statement::Return(v) => v.span,
 
            Statement::Goto(v) => v.span,
 
            Statement::New(v) => v.span,
 
            Statement::Expression(v) => v.span,
 
            Statement::Block(v) => Some(v.span),
 
            Statement::Local(v) => Some(v.span()),
 
            Statement::Labeled(v) => Some(v.label.span),
 
            Statement::If(v) => Some(v.span),
 
            Statement::While(v) => Some(v.span),
 
            Statement::Break(v) => Some(v.span),
 
            Statement::Continue(v) => Some(v.span),
 
            Statement::Synchronous(v) => Some(v.span),
 
            Statement::Fork(v) => Some(v.span),
 
            Statement::Select(v) => Some(v.span),
 
            Statement::Return(v) => Some(v.span),
 
            Statement::Goto(v) => Some(v.span),
 
            Statement::New(v) => Some(v.span),
 
            Statement::Expression(v) => Some(v.span),
 
            Statement::EndBlock(_)
 
            | Statement::EndIf(_)
 
            | Statement::EndWhile(_)
 
            | Statement::EndSynchronous(_)
 
            | Statement::EndFork(_)
 
            | Statement::EndSelect(_) => unreachable!(),
 
            | Statement::EndSelect(_) => None,
 
        }
 
    }
 

	
 
    pub fn span(&self) -> InputSpan {
 
        return self.maybe_span().unwrap();
 
    }
 

	
 
    pub fn link_next(&mut self, next: StatementId) {
 
        match self {
 
            Statement::Block(stmt) => stmt.next = next,
src/protocol/eval/error.rs
 
@@ -10,7 +10,7 @@ use super::executor::*;
 
/// Represents a stack frame recorded in an error
 
#[derive(Debug)]
 
pub struct EvalFrame {
 
    pub line: u32,
 
    pub line: Option<u32>,
 
    pub module_name: String,
 
    pub procedure: String, // function or component
 
    pub is_func: bool,
 
@@ -24,10 +24,15 @@ impl fmt::Display for EvalFrame {
 
            "component"
 
        };
 

	
 
        let line_str = match self.line {
 
            Some(line_number) => line_number.to_string(),
 
            None => String::from("??"),
 
        };
 

	
 
        if self.module_name.is_empty() {
 
            write!(f, "{} {}:{}", func_or_comp, &self.procedure, self.line)
 
            write!(f, "{} {}:{}", func_or_comp, &self.procedure, line_str)
 
        } else {
 
            write!(f, "{} {}:{}:{}", func_or_comp, &self.module_name, &self.procedure, self.line)
 
            write!(f, "{} {}:{}:{}", func_or_comp, &self.module_name, &self.procedure, line_str)
 
        }
 
    }
 
}
 
@@ -50,7 +55,7 @@ impl EvalError {
 
        for frame in prompt.frames.iter() {
 
            let definition = &heap[frame.definition];
 
            let statement = &heap[frame.position];
 
            let statement_span = statement.span();
 
            let statement_span = statement.maybe_span();
 

	
 
            // Lookup module name, if it has one
 
            let module = modules.iter().find(|m| m.root_id == definition.defined_in).unwrap();
 
@@ -62,7 +67,7 @@ impl EvalError {
 

	
 
            last_module_source = &module.source;
 
            frames.push(EvalFrame{
 
                line: statement_span.begin.line,
 
                line: statement_span.map(|v| v.begin.line),
 
                module_name,
 
                procedure: definition.identifier.value.as_str().to_string(),
 
                is_func: definition.kind == ProcedureKind::Function,
src/protocol/parser/pass_validation_linking.rs
 
@@ -50,6 +50,7 @@ use super::visitor::{
 
};
 
use crate::protocol::parser::ModuleCompilationPhase;
 

	
 
#[derive(Debug)]
 
struct ControlFlowStatement {
 
    in_sync: SynchronousStatementId,
 
    in_while: WhileStatementId,
 
@@ -324,11 +325,12 @@ impl Visitor for PassValidationLinking {
 
    }
 

	
 
    fn visit_while_stmt(&mut self, ctx: &mut Ctx, id: WhileStatementId) -> VisitorResult {
 
        let stmt = &ctx.heap[id];
 
        let stmt = &mut ctx.heap[id];
 
        let end_while_id = stmt.end_while;
 
        let test_expr_id = stmt.test;
 
        let body_stmt_id = stmt.body;
 
        let scope_id = stmt.scope;
 
        stmt.in_sync = self.in_sync;
 

	
 
        let old_while = self.in_while;
 
        self.in_while = id;
src/runtime2/communication.rs
 
@@ -177,6 +177,7 @@ pub enum ControlMessageContent {
 
#[derive(Copy, Clone, Debug)]
 
pub struct ControlMessageClosePort {
 
    pub closed_in_sync_round: bool, // needed to ensure correct handling of errors
 
    pub registered_round: Option<u32>,
 
}
 

	
 
// -----------------------------------------------------------------------------
src/runtime2/component/component.rs
 
@@ -278,6 +278,7 @@ pub(crate) fn default_send_data_message(
 
    let port_handle = comp_ctx.get_port_handle(transmitting_port_id);
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    port_info.last_instruction = port_instruction;
 
    port_info.last_registered_round = Some(consensus.round_number());
 

	
 
    let port_info = comp_ctx.get_port(port_handle);
 
    debug_assert_eq!(port_info.kind, PortKind::Putter);
 
@@ -334,7 +335,6 @@ pub(crate) fn default_handle_incoming_data_message(
 
) -> IncomingData {
 
    let port_handle = comp_ctx.get_port_handle(incoming_message.data_header.target_port);
 
    let port_index = comp_ctx.get_port_index(port_handle);
 
    comp_ctx.get_port_mut(port_handle).received_message_for_sync = true;
 
    let port_value_slot = &mut inbox_main[port_index];
 
    let target_port_id = incoming_message.data_header.target_port;
 

	
 
@@ -452,15 +452,6 @@ pub(crate) fn default_handle_received_data_message(
 
    // Then notify the peers that they can continue sending to this port, but
 
    // now at a new address.
 
    for received_port in &mut message.ports {
 
        // Transfer messages to main/backup inbox
 
        let _new_inbox_index = inbox_main.len();
 
        if !received_port.messages.is_empty() {
 
            inbox_main.push(Some(received_port.messages.remove(0)));
 
            inbox_backup.extend(received_port.messages.drain(..));
 
        } else {
 
            inbox_main.push(None);
 
        }
 

	
 
        // Create a new port locally
 
        let mut new_port_state = received_port.state;
 
        new_port_state.set(PortStateFlag::Received);
 
@@ -468,10 +459,25 @@ pub(crate) fn default_handle_received_data_message(
 
            received_port.peer_comp, received_port.peer_port,
 
            received_port.kind, new_port_state
 
        );
 
        debug_assert_eq!(_new_inbox_index, comp_ctx.get_port_index(new_port_handle));
 
        comp_ctx.change_port_peer(sched_ctx, new_port_handle, Some(received_port.peer_comp));
 
        let new_port = comp_ctx.get_port(new_port_handle);
 

	
 
        // Transfer messages to main/backup inbox. Make sure to modify the
 
        // target port to our own
 
        for message in received_port.messages.iter_mut() {
 
            message.data_header.target_port = new_port.self_id;
 
        }
 

	
 
        let _new_inbox_index = inbox_main.len();
 
        if !received_port.messages.is_empty() {
 
            inbox_main.push(Some(received_port.messages.remove(0)));
 
            inbox_backup.extend(received_port.messages.drain(..));
 
        } else {
 
            inbox_main.push(None);
 
        }
 

	
 
        debug_assert_eq!(_new_inbox_index, comp_ctx.get_port_index(new_port_handle));
 

	
 
        // Add the port tho the consensus
 
        consensus.notify_of_new_port(_new_inbox_index, new_port_handle, comp_ctx);
 

	
 
@@ -505,9 +511,10 @@ pub(crate) fn default_handle_received_data_message(
 
    }
 

	
 
    // Modify last-known location where port instruction was retrieved
 
    let port_info = comp_ctx.get_port(port_handle);
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    debug_assert_ne!(port_info.last_instruction, PortInstruction::None); // set by caller
 
    debug_assert!(port_info.state.is_open()); // checked by caller
 
    port_info.last_registered_round = Some(message.sync_header.sync_round);
 

	
 
    // Check if there are any more messages in the backup buffer
 
    for message_index in 0..inbox_backup.len() {
 
@@ -585,8 +592,8 @@ pub(crate) fn default_handle_control_message(
 
            } else {
 
                // Respond to the message
 
                let port_info = comp_ctx.get_port(port_handle);
 
                let last_registered_round = port_info.last_registered_round;
 
                let last_instruction = port_info.last_instruction;
 
                let port_has_had_message = port_info.received_message_for_sync;
 
                default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
 
                comp_ctx.change_port_peer(sched_ctx, port_handle, None);
 

	
 
@@ -601,10 +608,11 @@ pub(crate) fn default_handle_control_message(
 
                let port_was_used = last_instruction != PortInstruction::None;
 

	
 
                if exec_state.mode.is_in_sync_block() {
 
                    let closed_during_sync_round = content.closed_in_sync_round && port_was_used;
 
                    let closed_before_sync_round = !content.closed_in_sync_round && !port_has_had_message && port_was_used;
 
                    let round_has_succeeded = !content.closed_in_sync_round && last_registered_round == content.registered_round;
 
                    let closed_during_sync_round = content.closed_in_sync_round;
 
                    let closed_before_sync_round = !closed_during_sync_round && !round_has_succeeded;
 

	
 
                    if closed_during_sync_round || closed_before_sync_round {
 
                    if (closed_during_sync_round || closed_before_sync_round) && port_was_used {
 
                        return Err((
 
                            last_instruction,
 
                            format!("Peer component (id:{}) shut down, so communication cannot (have) succeed(ed)", peer_comp_id.0)
 
@@ -682,7 +690,6 @@ pub(crate) fn default_handle_sync_start(
 
        if let Some(message) = message {
 
            consensus.handle_incoming_data_message(comp_ctx, message);
 
            let port_info = comp_ctx.get_port_by_index_mut(port_index);
 
            port_info.received_message_for_sync = true;
 
        }
 
    }
 

	
src/runtime2/component/component_context.rs
 
@@ -129,8 +129,8 @@ pub struct Port {
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    // State tracking for error detection and error handling
 
    pub last_registered_round: Option<u32>,
 
    pub last_instruction: PortInstruction, // used during sync round to detect port-closed-during-sync errors
 
    pub received_message_for_sync: bool, // used during sync round to detect port-closed-before-sync errors
 
    pub close_at_sync_end: bool, // set during sync round when receiving a port-closed-after-sync message
 
    pub(crate) associated_with_peer: bool,
 
}
 
@@ -181,9 +181,9 @@ impl CompCtx {
 
            kind: PortKind::Putter,
 
            state: PortState::new(),
 
            peer_comp_id: self.id,
 
            last_registered_round: None,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            associated_with_peer: false,
 
        });
 
        self.ports.push(Port{
 
@@ -192,9 +192,9 @@ impl CompCtx {
 
            kind: PortKind::Getter,
 
            state: PortState::new(),
 
            peer_comp_id: self.id,
 
            last_registered_round: None,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            associated_with_peer: false,
 
        });
 

	
 
@@ -206,9 +206,9 @@ impl CompCtx {
 
        let self_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id, peer_comp_id, peer_port_id, kind, state,
 
            last_registered_round: None,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            associated_with_peer: false,
 
        });
 
        return LocalPortHandle(self_id);
src/runtime2/component/consensus.rs
 
@@ -270,6 +270,11 @@ impl Consensus {
 
        }
 
    }
 

	
 
    #[inline]
 
    pub(crate) fn round_number(&self) -> u32 {
 
        return self.round_index;
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Managing sync state
 
    // -------------------------------------------------------------------------
 
@@ -331,7 +336,7 @@ impl Consensus {
 
    /// we've failed and wait until all the participants have been notified of
 
    /// the error.
 
    pub(crate) fn notify_sync_end_failure(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &CompCtx) -> SyncRoundDecision {
 
        debug_assert_eq!(self.mode, Mode::SyncBusy);
 
        // debug_assert_eq!(self.mode, Mode::SyncBusy);
 
        self.mode = Mode::SyncAwaitingSolution;
 

	
 
        let local_solution = self.generate_local_solution(comp_ctx, true);
src/runtime2/component/control_layer.rs
 
@@ -264,6 +264,7 @@ impl ControlLayer {
 
                target_port_id: Some(peer_port_id),
 
                content: ControlMessageContent::ClosePort(ControlMessageClosePort{
 
                    closed_in_sync_round: exit_inside_sync,
 
                    registered_round: port.last_registered_round,
 
                }),
 
            }
 
        );
src/runtime2/stdlib/internet.rs
 
@@ -119,6 +119,10 @@ impl SocketTcpListener {
 
            unsafe{ libc::close(socket_handle); }
 
            return Err(SocketError::Modifying);
 
        }
 
        if !set_socket_reuse_address(socket_handle) {
 
            unsafe{ libc::close(socket_handle); }
 
            return Err(SocketError::Modifying);
 
        }
 

	
 
        // Listen
 
        unsafe {
 
@@ -388,6 +392,35 @@ fn set_socket_blocking(handle: libc::c_int, blocking: bool) -> bool {
 
    return true;
 
}
 

	
 
#[inline]
 
fn set_socket_reuse_address(handle: libc::c_int) -> bool {
 
    if handle < 0 {
 
        return false;
 
    }
 

	
 
    unsafe {
 
        let enable: libc::c_int = 1;
 
        let enable_ptr: *const _ = &enable;
 
        let result = libc::setsockopt(
 
            handle, libc::SOL_SOCKET, libc::SO_REUSEADDR,
 
            enable_ptr.cast(), size_of::<libc::c_int>() as libc::socklen_t
 
        );
 
        if result < 0 {
 
            return false;
 
        }
 

	
 
        let result = libc::setsockopt(
 
            handle, libc::SOL_SOCKET, libc::SO_REUSEPORT,
 
            enable_ptr.cast(), size_of::<libc::c_int>() as libc::socklen_t
 
        );
 
        if result < 0 {
 
            return false;
 
        }
 
    }
 

	
 
    return true;
 
}
 

	
 
#[inline]
 
fn socket_family_from_ip(ip: IpAddr) -> libc::c_int {
 
    return match ip {
src/runtime2/tests/mod.rs
 
@@ -14,7 +14,7 @@ const NUM_THREADS: u32 = 1;
 
pub(crate) fn compile_and_create_component(source: &str, routine_name: &str, args: ValueGroup) {
 
    let protocol = ProtocolDescription::parse(source.as_bytes())
 
        .expect("successful compilation");
 
    let runtime = Runtime::new(NUM_THREADS, LogLevel::None, protocol)
 
    let runtime = Runtime::new(NUM_THREADS, LOG_LEVEL, protocol)
 
        .expect("successful runtime startup");
 
    create_component(&runtime, "", routine_name, args);
 
}
testdata/examples/01_reworked_consensus_select_01.pdl
 
new file 100644
 
// Previously the following example caused the inadvertent synchronization of
 
// all participating components.
 

	
 
func lots_of_unnecessary_work(u64 min_val, u64 max_val) -> u64 {
 
    u64 sum = 0;
 
    u64 index = min_val;
 
    while (index <= max_val) {
 
        sum += index;
 
        index += 1;
 
    }
 

	
 
    return sum;
 
}
 

	
 
comp data_producer(out<u64> tx, u64 min_val, u64 max_val, string send_text) {
 
    while (true) {
 
        sync {
 
            auto value = lots_of_unnecessary_work(min_val, max_val);
 
            print(send_text);
 
            put(tx, value);
 
        }
 
    }
 
}
 

	
 
comp data_receiver_v1(in<u64> rx_a, in<u64> rx_b, in<u64> rx_c, u32 num_rounds) {
 
    u32 counter = 0;
 
    auto rxs = { rx_a, rx_b, rx_c };
 

	
 
    while (counter < num_rounds) {
 
        auto num_peers = length(rxs);
 
        auto peer_index = 0;
 
        while (peer_index < num_peers) {
 
            sync {
 
                auto result = get(rxs[peer_index]);
 
                print("received message (V1)");
 
                peer_index += 1;
 
            }
 
        }
 
        counter += 1;
 
    }
 
}
 

	
 
// The reason was that a synchronous interaction checked *all* ports for a valid
 
// interaction. So for the round-robin receiver (`data_receiver_v1`) we have that it communicates
 
// with one peer per round, but it still requires the other peers to agree that
 
// they didn't send anything at all! Note that this already implies that all
 
// running components need to synchronize. We could fix this by writing:
 

	
 
comp data_receiver_v2(in<u64> rx_a, in<u64> rx_b, in<u64> rx_c, u32 num_rounds) {
 
    u32 counter = 0;
 
    auto rxs = { rx_a, rx_b, rx_c };
 

	
 
    while (counter < num_rounds) {
 
        auto num_peers = length(rxs);
 
        auto peer_index = 0;
 
        sync {
 
            while (peer_index < num_peers) {
 
                auto result = get(rxs[peer_index]);
 
                print("received message (V2)");
 
                peer_index += 1;
 
            }
 
        }
 
        counter += 1;
 
    }
 
}
 

	
 
// But this is not the intended behaviour. We want the producer components to
 
// be able to run independently of one another. This requires a change in the
 
// semantics of the language! We no longer have that each peer is automatically
 
// dragged into the synchronous round. Instead, once the first message of the
 
// peer is received through a `get` call, we join each other's synchronous
 
// rounds.
 
//
 
// With such a change to the runtime, we now have that the first version (
 
// written above) produces the intended behaviour: the consumer accepts one
 
// value and synchronizes with its sender. Then it goes to the next round and

// synchronizes with the next sender.
 
//
 
// But what we would really like to do is to synchronize with any of the peers
 
// that happens to have its work ready for consumption. And so the 'select'
 
// statement is introduced into the language. This statement can be used to
 
// describe a set of possible behaviours we could execute. Each behaviour will
 
// have an associated set of ports. When those associated set of ports have a
 
// message ready to be read, then the corresponding behaviour will execute. So
 
// to complete the example above, we have:
 

	
 
comp data_receiver_v3(in<u64> rx_a, in<u64> rx_b, in<u64> rx_c, u32 num_rounds) {
 
    u32 counter = 0;
 
    auto rxs = { rx_a, rx_b, rx_c };
 

	
 
    u32 received_from_a = 0;
 
    u32 received_from_b_or_c = 0;
 
    u32 received_from_a_or_c = 0;
 
    u64 sum_received_from_c = 0;
 

	
 
    while (counter < num_rounds*3) {
 
        sync {
 
            select {
 
                auto value = get(rx_a) -> {
 
                    received_from_a += 1;
 
                    received_from_a_or_c += 1;
 
                }
 
                auto value = get(rx_b) -> {
 
                    received_from_b_or_c += 1;
 
                }
 
                auto value = get(rx_c) -> {
 
                    received_from_a_or_c += 1;
 
                    received_from_b_or_c += 1;
 
                    sum_received_from_c += value;
 
                }
 
            }
 
            print("received message (V3)");
 
        }
 
        counter += 1;
 
    }
 
}
 

	
 
comp main() {
 
    u32 version = 3;
 
    u32 num_rounds = 3;
 

	
 
    channel tx_a -> rx_a;
 
    channel tx_b -> rx_b;
 
    channel tx_c -> rx_c;
 
    new data_producer(tx_a, 0xBEEF, 0xCAFE, "sent from A");
 
    new data_producer(tx_b, 0xBEEF, 0xCAFE, "sent from B");
 
    new data_producer(tx_c, 0xBEEF, 0xCAFE, "sent from C");
 

	
 
    if (version == 1) {
 
        new data_receiver_v1(rx_a, rx_b, rx_c, num_rounds);
 
    } else if (version == 2) {
 
        new data_receiver_v2(rx_a, rx_b, rx_c, num_rounds);
 
    } else if (version == 3) {
 
        new data_receiver_v3(rx_a, rx_b, rx_c, num_rounds);
 
    } else {
 
        print("ERROR: invalid version in source");
 
    }
 
}
 
\ No newline at end of file
testdata/examples/02_error_handling.pdl
 
new file 100644
 
// Although in an unstable state, there is an initial implementation for error
 
// handling. Roughly speaking: if a component has failed then it cannot complete
 
// any current or future synchronous rounds anymore. Hence, apart from some edge
 
// cases, any received message by a peer should cause a failure at that peer as
 
// well. We may have a look at the various places where a component can crash

// with respect to a peer that is receiving its messages.
 

	
 
enum ErrorLocation {
 
    BeforeSync,
 
    DuringSyncBeforeFirstInteraction,
 
    DuringSyncBeforeSecondInteraction,
 
    DuringSyncAfterInteractions,
 
    AfterSync,
 
}
 

	
 
func error_location_to_string(ErrorLocation loc) -> string {
 
    if (let ErrorLocation::BeforeSync = loc) {
 
        return "before sync";
 
    } else if (let ErrorLocation::DuringSyncBeforeFirstInteraction = loc) {
 
        return "during sync before first interaction";
 
    } else if (let ErrorLocation::DuringSyncBeforeSecondInteraction = loc) {
 
        return "during sync before second interaction";
 
    } else if (let ErrorLocation::DuringSyncAfterInteractions = loc) {
 
        return "during sync after interactions";
 
    } else { return "after sync"; }
 
}
 

	
 
func crash() -> u8 {
 
    return {}[0]; // access index 0 of an empty array
 
}
 

	
 
comp sender_and_crasher(out<u32> value, ErrorLocation loc) {
 
    print("sender: will crash " @ error_location_to_string(loc));
 
    if (loc == ErrorLocation::BeforeSync) { crash(); }
 
    sync {
 
        if (loc == ErrorLocation::DuringSyncBeforeFirstInteraction) { crash(); }
 
        print("sender: sending first value");
 
        put(value, 0);
 
        if (loc == ErrorLocation::DuringSyncBeforeSecondInteraction) { crash(); }
 
        print("sender: sending second value");
 
        put(value, 1);
 
        if (loc == ErrorLocation::DuringSyncAfterInteractions) { crash(); }
 
    }
 
    if (loc == ErrorLocation::AfterSync) { crash(); }
 
}
 

	
 
comp receiver(in<u32> value) {
 
    sync {
 
        auto a = get(value);
 
        auto b = get(value);
 
    }
 
}
 

	
 
// Note that when we run the example with the error location before sync, or
 
// during sync, the receiver always crashes. However, the location where it
 
// will crash is somewhat random! Due to the asynchronous nature of the runtime
 
// a sender of messages will always just `put` the value onto the port and
 
// continue execution. So even though the sender component might already be done
 
// with its sync round, the receiver officially still has to receive its first
 
// message. In any case, a neat error message should be displayed in the
 
// console.
 
//
 
// Note especially, given the asynchronous nature of the runtime, that the

// receiver may figure out that the peer component has crashed but still be

// able to finish the current synchronous round. This might happen if the peer

// component crashes *just* after the synchronous round: there may be a case

// where the receiver learns that its peer crashed *before* it receives the

// information that the synchronous round has succeeded.
 

	
 
comp main() {
 
    channel tx -> rx;
 

	
 
    new sender_and_crasher(tx, ErrorLocation::AfterSync);
 
    new receiver(rx);
 
}
 
\ No newline at end of file
testdata/examples/03_transmitting_ports_01.pdl
 
new file 100644
 
// Since this release transmitting ports is possible. This means that we can
 
// send ports through ports. In fact, we can send ports that may send ports that
 
// may send ports. But don't be fooled by the apparent complexity. The inner
 
// type `T` of a port like `in<T>` simply states that that is the message type.
 
// Should the type `T` contain one or more ports, then we kick off a bit of code
 
// that takes care of the transfer of the port. Should the port inside of `T`
 
// itself, after being received, send a port, then we simply kick off that same
 
// procedure again.
 
//
 
// In the simplest case, we have someone transmitting the receiving end of a
 
// channel to another component, which then uses that receiving end to receive a
 
// value.
 

	
 
comp port_sender(out<in<u32>> tx, in<u32> to_transmit) {
 
    sync put(tx, to_transmit);
 
}
 

	
 
comp port_receiver_and_value_getter(in<in<u32>> rx, u32 expected_value) {
 
    u32 got_value = 0;
 
    sync {
 
        auto port = get(rx);
 
        got_value = get(port);
 
    }
 
    if (expected_value == got_value) {
 
        print("got the expected value :)");
 
    } else {
 
        print("got a different value :(");
 
    }
 
}
 

	
 
comp value_sender(out<u32> tx, u32 to_send) {
 
    sync put(tx, to_send);
 
}
 

	
 
comp main() {
 
    u32 value = 1337_2392;
 

	
 
    channel port_tx -> port_rx;
 
    channel value_tx -> value_rx;
 
    new port_sender(port_tx, value_rx);
 
    new port_receiver_and_value_getter(port_rx, value);
 
    new value_sender(value_tx, value);
 
}
 
\ No newline at end of file
testdata/examples/03_transmitting_ports_02.pdl
 
new file 100644
 
// Of course we may do something a little more complicated than this. Suppose
 
// that we don't just send one port, but send a series of ports. i.e. we use
 
// an `Option` union type, to ...
 

	
 
union Option<T> {
 
    Some(T),
 
    None,
 
}
 

	
 
// ... turn an array of ports that we're going to transmit into a series of
 
// messages containing ports, each sent to a specific component.
 

	
 
comp port_sender(out<Option<in<u32>>>[] txs, in<u32>[] to_transmit) {
 
    auto num_peers = length(txs);
 
    auto num_ports = length(to_transmit);
 

	
 
    auto num_per_peer = num_ports / num_peers;
 
    auto num_remaining = num_ports - (num_per_peer * num_peers);
 

	
 
    auto peer_index = 0;
 
    auto port_index = 0;
 
    while (peer_index < num_peers) {
 
        auto peer_port = txs[peer_index];
 
        auto counter = 0;
 

	
 
        // Distribute part of the ports to one of the peers.
 
        sync {
 
            // Sending the main batch of ports for the peer
 
            while (counter < num_per_peer) {
 
                put(peer_port, Option::Some(to_transmit[port_index]));
 
                port_index += 1;
 
                counter += 1;
 
            }
 

	
 
            // Sending the remainder of ports, one per peer until they're gone
 
            if (num_remaining > 0) {
 
                put(peer_port, Option::Some(to_transmit[port_index]));
 
                port_index += 1;
 
                num_remaining -= 1;
 
            }
 

	
 
            // Finish the custom protocol by sending nothing, which indicates to
 
            // the peer that it has received all the ports we have to hand out.
 
            put(peer_port, Option::None);
 
        }
 

	
 
        peer_index += 1;
 
    }
 
}
 

	
 
// And here we have the component which will receive on that port. We can design
 
// the synchronous regions any we want. In this case when we receive ports we
 
// just synchronize `port_sender`, but the moment we receive messages we
 
// synchronize with everyone.
 

	
 
comp port_receiver(in<Option<in<u32>>> port_rxs, out<u32> sum_tx) {
 
    // Receive all ports
 
    auto value_rxs = {};
 

	
 
    sync {
 
        while (true) {
 
            auto maybe_port = get(port_rxs);
 
            if (let Option::Some(certainly_a_port) = maybe_port) {
 
                value_rxs @= { certainly_a_port };
 
            } else {
 
                break;
 
            }
 
        }
 
    }
 

	
 
    // Receive all values
 
    auto received_sum = 0;
 

	
 
    sync {
 
        auto port_index = 0;
 
        auto num_ports = length(value_rxs);
 
        while (port_index < num_ports) {
 
            auto value = get(value_rxs[port_index]);
 
            received_sum += value;
 
            port_index += 1;
 
        }
 
    }
 

	
 
    // And send the sum
 
    sync put(sum_tx, received_sum);
 
}
 

	
 
// Now we need something to send the values, we'll make something incredibly
 
// simple. Namely:
 

	
 
comp value_sender(out<u32> tx, u32 value_to_send) {
 
    sync put(tx, value_to_send);
 
}
 

	
 
comp sum_collector(in<u32>[] partial_sum_rx, out<u32> total_sum_tx) {
 
    auto sum = 0;
 
    auto index = 0;
 
    while (index < length(partial_sum_rx)) {
 
        sync sum += get(partial_sum_rx[index]);
 
        index += 1;
 
    }
 

	
 
    sync put(total_sum_tx, sum);
 
}
 

	
 
// And we need the component to set this entire system of components up. So we
 
// write the following entry point.
 

	
 
comp main() {
 
    auto num_value_ports = 32;
 
    auto num_receivers = 3;
 

	
 
    // Construct the senders of values
 
    auto value_port_index = 1;
 
    auto value_rx_ports = {};
 
    while (value_port_index <= num_value_ports) {
 
        channel value_tx -> value_rx;
 
        new value_sender(value_tx, value_port_index);
 
        value_rx_ports @= { value_rx };
 
        value_port_index += 1;
 
    }
 

	
 
    // Construct the components that will receive groups of value-receiving
 
    // ports
 
    auto receiver_index = 0;
 
    auto sum_combine_rx_ports = {};
 
    auto port_tx_ports = {};
 

	
 
    while (receiver_index < num_receivers) {
 
        channel sum_tx -> sum_rx;
 
        channel port_tx -> port_rx;
 
        new port_receiver(port_rx, sum_tx);
 

	
 
        sum_combine_rx_ports @= { sum_rx };
 
        port_tx_ports @= { port_tx };
 
        receiver_index += 1;
 
    }
 

	
 
    // Construct the component that redistributes the total number of input
 
    // ports.
 
    new port_sender(port_tx_ports, value_rx_ports);
 

	
 
    // Construct the component that computes the sum of all sent values
 
    channel total_value_tx -> total_value_rx;
 
    new sum_collector(sum_combine_rx_ports, total_value_tx);
 

	
 
    auto expected = num_value_ports * (num_value_ports + 1) / 2;
 
    auto received = 0;
 

	
 
    sync received = get(total_value_rx);
 

	
 
    if (expected == received) {
 
        print("got the expected value!");
 
    } else {
 
        print("got something entirely different");
 
    }
 
}
 
\ No newline at end of file
testdata/examples/04_native_components.pdl
 
new file 100644
 
// We'll start by importing the standard library that defines the builtin
 
// components that support a TCP listener and a TCP client.
 

	
 
import std.internet::*;
 

	
 
// We'll define a little utility used throughout this document that is called to
 
// retrieve the port we're going to listen on.
 

	
 
func listen_port() -> u16 {
 
    return 2392;
 
}
 

	
 
// Next we define our server. The server accepts (for the case of this example)
 
// a number of connections, after which it stops listening. At that point it will
 
// wait until it receives a signal that allows it to shut down.
 

	
 
comp server(u32 num_connections, in<()> shutdown) {
 
    // Here we set up the channels for commands, going to the listener
 
    // component, and the channel that sends new connections back to us.
 
    channel listen_cmd_tx -> listen_cmd_rx;
 
    channel listen_conn_tx -> listen_conn_rx;
 

	
 
    // And we create the tcp_listener, imported from the standard library, here.
 
    new tcp_listener({}, listen_port(), listen_cmd_rx, listen_conn_tx);
 

	
 
    // Here we set up a variable that will hold our received connections
 
    channel client_cmd_tx -> unused_client_cmd_rx;
 
    channel unused_client_data_tx -> client_data_rx;
 
    auto new_connection = TcpConnection{
 
        tx: client_cmd_tx,
 
        rx: client_data_rx,
 
    };
 

	
 
    auto connection_counter = 0;
 
    while (connection_counter < num_connections) {
 
        // We wait until we receive a new connection
 
        print("server: waiting for an accepted connection");
 
        sync {
 
            // The way the standard library is currently written, we need to
 
            // send the `tcp_listener` component the command that it should
 
            // listen for the next connection. This is only one way in which

            // the standard library could be written. We could also write it in

            // such a way that a separate component buffers new incoming
 
            // connections, such that we only have to `get` from that separate
 
            // component.
 
            //
 
            // Note that when we get such a new connection (see the
 
            // TcpConnection struct in the standard library), the peers of the
 
            // two ports are already hooked up to a `tcp_client` component, also
 
            // defined in the standard library.
 
            put(listen_cmd_tx, ListenerCmd::Accept);
 
            new_connection = get(listen_conn_rx);
 
        }
 

	
 
        // In any case, now that the code is here, the synchronous round that
 
        // governed receiving the new connection has completed. And so we send
 
        // that connection off to a handler component. In this case we have the
 
        // `echo_machine` component, defined in this file as well.
 
        print("server: spawning an echo'ing component");
 
        new echo_machine(new_connection);
 
        connection_counter += 1;
 
    }
 

	
 
    // When all of the desired connections have been handled, we first await a
 
    // shutdown signal from another component.
 
    print("server: awaiting shutdown signal");
 
    sync auto v = get(shutdown);
 

	
 
    // And once we have received that signal, we'll instruct the listener
 
    // component to shut down.
 
    print("server: shutting down listener");
 
    sync put(listen_cmd_tx, ListenerCmd::Shutdown);
 
}
 

	
 
// This is the component that is spawned by the server component to handle new
 
// connections. All it does is wait for a single incoming TCP packet, where it
 
// expects a single byte of data, and then echo that back to the peer.
 

	
 
comp echo_machine(TcpConnection conn) {
 
    auto data_to_echo = {};
 

	
 
    // Here is where we receive a message from a peer ...
 
    sync {
 
        print("echo: receiving data");
 
        put(conn.tx, ClientCmd::Receive);
 
        data_to_echo = get(conn.rx);
 
        put(conn.tx, ClientCmd::Finish);
 
    }
 

	
 
    // ... and send it right back to our peer.
 
    print("echo: sending back data");
 
    sync put(conn.tx, ClientCmd::Send(data_to_echo));
 

	
 
    // And we ask the `tcp_client` to shut down neatly.
 
    print("echo: shutting down");
 
    sync put(conn.tx, ClientCmd::Shutdown);
 
}
 

	
 
// Here is the component that we will instantiate to connect to the `server`
 
// component above (more specifically, to the `tcp_listener` component
 
// instantiated by the `server`). This is the component that will ask the
 
// `echo_machine` component to echo a byte of data.
 

	
 
comp echo_requester(u8 byte_to_send, out<()> done) {
 
    // We instantiate the `tcp_client` from the standard library. This will
 
    // perform the "connect" call to the `tcp_listener`.
 
    channel cmd_tx -> cmd_rx;
 
    channel data_tx -> data_rx;
 
    new tcp_client({127, 0, 0, 1}, listen_port(), cmd_rx, data_tx);
 

	
 
    // And once we are connected, we send the single byte to the other side.
 
    print("requester: sending bytes");
 
    sync put(cmd_tx, ClientCmd::Send({ byte_to_send }));
 

	
 
    // This sent byte will arrive at the `echo_machine`, which will send it
 
    // right back to us. So here is where we wait for that byte to arrive.
 
    auto received_byte = byte_to_send + 1;
 
    sync {
 
        print("requester: receiving echo response");
 
        put(cmd_tx, ClientCmd::Receive);
 
        received_byte = get(data_rx)[0];
 
        put(cmd_tx, ClientCmd::Finish);
 
    }
 

	
 
    // We make sure that we got back what we sent
 
    while (byte_to_send != received_byte) {
 
        print("requester: Oh no! we got back a byte different from the one we sent");
 
    }
 

	
 
    // And we shut down the TCP connection
 
    print("requester: shutting down TCP component");
 
    sync put(cmd_tx, ClientCmd::Shutdown);
 

	
 
    // And finally we send a signal to another component (the `main` component)
 
    // to let it know we have finished our little protocol.
 
    sync put(done, ());
 
}
 

	
 
// And here the entry point for our program
 

	
 
comp main() {
 
    // Some settings for the example
 
    auto num_connections = 12;
 

	
 
    // We create a new channel that allows us to shut down our server component.
 
    // That channel being created, we can instantiate the server component.
 
    channel shutdown_listener_tx -> shutdown_listener_rx;
 
    new server(num_connections, shutdown_listener_rx);
 

	
 
    // Here we create all the requesters that will ask their peer to echo back
 
    // a particular byte.
 
    auto connection_index = 0;
 
    auto all_done = {};
 
    while (connection_index < num_connections) {
 
        channel done_tx -> done_rx;
 
        new echo_requester(cast(connection_index), done_tx);
 
        connection_index += 1;
 
        all_done @= {done_rx};
 
    }
 

	
 
    // Here our program starts to shut down. First we'll wait until all of our
 
    // requesting components have gotten back the byte they're expecting.
 
    auto counter = 0;
 
    while (counter < length(all_done)) {
 
        print("constructor: waiting for requester to exit");
 
        sync auto v = get(all_done[counter]);
 
        counter += 1;
 
    }
 

	
 
    // And we shut down our server.
 
    print("constructor: instructing listener to exit");
 
    sync put(shutdown_listener_tx, ());
 
}
 
\ No newline at end of file