diff --git a/docs/runtime/consensus.md b/docs/runtime/consensus.md index f15b30cf115e4f693f1a8c052b2fb403d7527b04..6e24c401cfc97730c7ef1091170d6c4fe2845523 100644 --- a/docs/runtime/consensus.md +++ b/docs/runtime/consensus.md @@ -1,51 +1,213 @@ -# Consensus Algorithm - -## Introduction. - -An essential concept within the Reowolf language is the `sync` block. Behaviours that are specified within such a block (as imperative code, containing the instructions to send or receive information, and conditions on the values in memory) must agree with all other parties that participate in the interaction. An additional concept within such a `sync` block is speculative execution. Code that uses this execution temporarily forks and is allowed to perform multiple behaviours at the same time. At the end of the `sync` block only one particular execution (i.e. local behaviour) is allowed to complete. This results in additional complexity in finding satisfactory global behaviour. - -This document attempts to explain the chosen implementation of the initial consensus protocol. At some point one should be able to write consensus protocols associated with `sync` blocks within PDL. As initial experimentation (mainly to see which information should be available to a programmer using PDL) the consensus protocol will be written in the language in which the runtime is written. - -The Reowolf 1.2 consensus protocol aims to fix several issues that were present in the Reowolf 1.0 consensus protocol, among which: - -- The newer protocol should not synchronize among all known connectors on a machine. Rather, it should only aim to achieve consensus among the connectors that are actually communicating to one-another in the same interaction. Any connector that does not send or receive messages to this "synchronous region" does not belong to that synchronous region. -- The newer protocol should aim to be leaderless. The old protocol featured a leader per interaction. 
Both the leader election itself and the subsequent picking of the global behaviour caused a large message overhead. Additionally the leader is tasked with a large computational overhead. Especially in the context of relatively large synchronous regions where some participants are running code on low-powered devices this is troublesome. -- With regards to performance, the new consensus protocol should aim to reduce the message complexity and amount of transmitted bytes as much as possible. Additionally computational complexity should be reduced by attempting to perform a reduction in the number of valid local connector behaviours, thereby reducing the search space of the global connector behaviour. - -In the following discussion, there is a lot of room for optimization. But we'll describe the general algorithm first, and the specific optimizations in a different document in the future. - -## Data Structures - -### Speculative Execution - -First we create a data structure for the speculative execution itself. Speculative execution is tracked in memory as an execution tree. At the root we find the very first bit of code that is executed without any speculative execution. Each node contains the executed code associated with a particular branch. The edges in this tree might imply speculative execution (if there is more than one edge leaving a particular node), or might simply imply a "dependency" (explained later) if there is only one edge. - -At the leaves of the tree we find successful executions, where the very last instruction is the "end of the sync block" instruction. Reaching such a leaf implies that we found a local behaviour that satisfies the local constraints placed upon the behaviour. If we trace the path from the root to the particular leaf then we find the execution path. If one imagines that all of the code in all of the nodes in the execution path are concatenated, then one finds all executed instructions in order of execution. 
- -Each time a connector reaches a sync block, it will associate a number with that sync block. We'll call this the `round ID`. Each executed sync block will have a unique `round ID` (up to some reasonable limit in case of integer overflow). Likewise each of the nodes in the execution tree will have a unique number called the `branch ID`. The branch ID is unique among all branches in the execution tree, but numbers may be reused in different `round ID`s. - -### Tracking Dependencies - -One of the implications of being able to send messages and perform speculative execution is that branches will also be created upon receiving messages. One may imagine connectors `S` and `R`. `R` simply has the behaviour of receiving a message and handing it off to some native application. But `S` branches and sends, in each branch, a message over the same port. This implies that `R` will also end up with two branches: one per received message. In order to track dependencies between these two parties it is sufficient to annotate each message with its sender's branch number. Afterwards we can pick the branch numbers that are consistent between the two parties. - -When more than two parties are communicating, the behaviour becomes more complicated. A series of connectors `A`, `B`, `C`, etc. may have behaviours that depend on one-another in convoluted fashion. A particular valid execution trace in `A` may have send message to multiple different connectors `B`, `C` and `D`, influencing their speculative behaviour. In turn `B`, `C` and `D` may have done some branching on their own, and each of them sends messages to a final connector `E`. We now have that the branches in `B`, `C` and `D` depend on `A`, and `E` depending on the former three. A consensus protocol needs to be able to reason about these dependencies and, when a solution is possible, pick a single execution path in each of the connectors. 
- -In order to achieve this, we'll simplify the subsequent discussion for now by assuming that there is some later algorithm that will kick in once a connector has found a local solution. This algorithm will somehow seek among all connectors if they agree with a particular solution. For now we'll just consider the necessary information that needs to be provided to this algorithm in order for it to find a solution. -\ -\ -\ -To start the train of thought, suppose that each connector that sends a message will append its execution path's branch numbers, and any of the branch numbers it has received through messages. This implies that each branch in the execution tree is associated with a mapping from `connector ID` to a set of branch numbers. If a connector receives a message then it can deposit the message in a branch if the received message's mapping contains `connector ID`s that map to a branch number set that is a superset of branch numbers in the branch's mapping itself. There are no restrictions on the set of `connector ID`s itself. Only on the branch number sets that are associated with the intersection of the `connector ID` sets. - -The upside of this scheme is that each connector has a complete view of the dependencies that exist within the synchonous region that resulted in the branch. The downside is that the amount of data quickly balloons. Each branch that encountered a `get` call needs to wait for more messages, and needs to keep the complete branch number mapping around. - -The subsequent algorithm, the one that makes sure that everyone agrees to a particular solution, then results in sending around this mapping, each connector adding its own compatible branch number mapping to it (or, if there is no compatible mapping, deleting the solution). If this messages reaches all connectors, and all connectors agree to the chosen mapping, then we have found a solution. 
-\ -\ -\ -A different approach would be to take a different look at the global behaviour centered around the channels themselves. Two connectors can only have a dependency on one another if they communicate through a channel. Furthermore, suppose connector `A` sends to `B` and `B` sends to `C`. In the scheme described above `C` would know about its dependency on `A`. However, this is redundant information. If `C` knows about its dependency on `B`, and `B` knows about its dependency on `A`, then globally we have a full view on the dependencies as well. If `A` sends to `C` as well, then `C` does not know about the interdependency between the message traversing `A -> B -> C` and the message traversing `A -> C`. But again: if we take a global view and join the branch number mapping of `A`, `B` and `C`, then we're able to determine the global behaviour. - -So instead of sending all branch number information received. We can append only the sending connector's branch numbers along with a message. A receiving connector will now associate these branch numbers with the port through which the message was received. Hence a connector's branch will have a branch number, but also a mapping from `port ID` to the branch number set of the sending party. - -If we send around a solution to all connectors (again, the procedure for which will be detailed later) they can be reconciled in the following manner. The connectors sharing a port will always have the "putter" choosing the port number mapping. And the "putter" may have advanced its execution and increased the number of elements in the branch number set. So if the "putter" receives a solution, then it needs to check if the port's branch number set is a subset of its own branch number set. If a "getter" receives a solution then it needs to check if the port's branch number set is a superset of its own branch number set. 
- -Taking a step back: if a global solution exists, then it is composed out of the local solutions per connector, of which there is at least one per connector. The fact that all connectors are part of the same synchronous region implies that each channel will have seen at least one interaction between the connector(s) that own the ports. Hence each channel will have had one set of branch IDs mapped to it. Hence if we were to take the branch ID sets associated with each channel, then we're able to find the global solution. \ No newline at end of file +# Previous Consensus Algorithm + +## Introduction + +The previous consensus algorithm (the one within Reowolf 1.0 and 1.1) had support for speculative execution. This means that the user may (directly or indirectly) fork the execution of a component. That particular execution then becomes two executions. At some point a component will have to choose which particular execution will be committed to memory. This is one reason for the existence of a `sync` block: a block of code wherein one may perform forking, and at the end a component will have to choose the execution that is committed to memory. + +With speculative execution we may have multiple components that are all forking their execution and sending/receiving messages. So we do not end up with one component that has to choose its final execution, but all components choosing their final execution. Note that one component's execution may apply restrictions on the validity of another component's execution. As an example, suppose the following components and their executions: + +- Component A: Has two executions: + - Execution A1: Component A has sent a message to component B. + - Execution A2: Component A has received a message from component B. +- Component B: Has three executions: + - Execution B1: Component B has received a message from component A, then sends a message back to component A. + - Execution B2: Component B has received a message from component A. 
+ - Execution B3: Component B has sent two messages to component A. + +Without delving into too much detail, one may see that the only valid solution to this problem is the combination of `A1` and `B2`. + +## Component Execution Tree, and Execution Traces + +Components execute PDL code, which may contain calls to `fork`, `put`, and `get`. A `fork` explicitly forks the execution of the code. A `put` sends a message to a particular component, and a `get` receives a message from a component and forks (as explained later). + +As the component enters a `sync` block, it has only one possible execution. But as stated above there are reasons for the execution to split up. These individual executions may themselves split up later, thereby forming a so-called "execution tree": + +``` + +-----+ +------+ + | put |------>| sync | ++-------+ +------+----->+-----+ | end | +| sync | | fork | +------+ +| start |----->+------+----->+-----+ ++-------+ | get |------>+------+ + +-----+ | sync | + | | end | + | +------+ + | + +------>+------+ + | | sync | + | | end | + | +------+ + | + +--> ... +``` + +This corresponds to the following PDL code: + +``` +primitive some_component(out tx, in rx) { + sync { + fork { + put(tx, 5); + } or fork { + get(rx, 1); + } +} +``` + +We can see the reason for calling the execution tree a "tree". There are several things to note about the execution tree: Firstly that some executions have been completed and form a complete trace, that is: starting from the "sync start" a complete trace may be represented by the line running to the "sync end". Conversely, there is one trace that is incomplete: there is a trace waiting at the `get` for a message. We'll call a place where the execution splits into multiple branches/executions a "branching point". + +Note that the branching points can in the *general case* only be discovered at runtime. Any code may have control flow points like `if` statements, or `while` loops. 
Consider the following code: + +``` +primitive some_component(out tx, bool which_way) { + sync { + if (which_way) { + fork { + put(tx, 1); + } or fork { + put(tx, 2); + } + } else { + put(tx, 3); + } + } +} +``` + +Depending on the value of `which_way` we produce one of two different execution trees (of which we can determine all traces). The compiler cannot decide at compile time which execution tree will be generated. + +Note that the `get` branching points have an arbitrary number of forked executions arising from them. We'll call them "waiting points". In the *general case* we cannot figure out how many forked executions arise from a `get` branching point. The reason may be illustrated by the following simple example: + +``` +primitive sender(out tx, u32 num_forks) { + sync { + auto fork_counter = 1; + while (fork_counter < num_forks) { + fork { + put(tx, fork_counter); + } or fork { } // empty case + } + put(tx, num_forks); + } +} + +primitive receiver(in rx) { + u32[] values = {}; + sync { + bool keep_going = true; + while (keep_going) { + auto new_value = get(rx); + values @= { new_value }; // append + fork { + keep_going = false; + } or fork { } + } + } +} +``` + +If the sender is connected to the receiver, then the sender will send anywhere between `1` and `num_forks` messages (distributed over `num_forks` forks), depending on a user-supplied parameter (which we cannot figure out at compile-time). The isolated receiver can generate an infinite number of forked executions. We can analyze that the receiver will at most have `num_forks + 1` forked executions arising from its `get` branching point (the `num_forks` branches that do receive, and one final fork that waits indefinitely for another message), but the compiler cannot. + +For this reason a `get` branching point needs to be kept around for the entire duration of the sync block. 
The runtime will always need to have a copy of the component's memory and execution state the moment it encountered a `get` instruction, because it might just be that another component (in perhaps a new fork, which we cannot predict) will send it another message, such that it needs to produce a new forked execution. + +A `get` operation is also a "blocking operation": in the *general case* the component needs to know the value produced by the `get` operation in order to continue its execution (perhaps more specifically: the first time a `read` operation is performed on the variable that will store the transmitted message). Consider the simple case where the received message contains a boolean that is used in the test expression of an `if` statement: we'll need to have actually received that boolean before we can decide which control flow path to take. Speculating on the contents of messages is too computationally expensive to be taken seriously. A put operation is not a blocking operation: the message is sent and the component continues executing its code. + +We've touched upon control flow points multiple times. We'll touch upon some aspects of control flow here, to more easily introduce the algorithm for finding consensus later. A component is fully described by its memory (i.e. all of the memory locations it has access to through its variables) and execution state (i.e. its current position in its code). So once a component encounters a control flow point, it can only take one control flow path. The calling of certain impure functions (e.g. retrieving a cryptographically secure random number) does not change this fact. Note that receiving values from other components might change a component's memory state, hence influence the control flow path it takes in the subsequent forked execution. Conversely, a component sending a value might influence another component's memory state. 
+ +So before going into more detail, we've found that in the general case: + +- The interior of a sync block demarcates the place where speculative execution may occur. +- Speculative execution implies that we end up with an execution tree. +- A path through the execution tree that reaches the end of the sync block is called a trace, and represents a valid execution of the sync block for the component (but perhaps not for a peer it interacted with). +- The set of traces produced by a component in its sync block can practically only be discovered at runtime. +- A `get` operation is necessarily a blocking operation that always incurs a branching point. A `put` operation is a nonblocking operation that does not incur a branching point. +- The trace of a component is influenced by the messages it has received. + +## Towards a Consensus Algorithm + +Solving the consensus problem comes down to discovering the ways in which the components have influenced the memory state of their peers. If we have a complete trace for each component, for which all peer components agree on the way they have influenced that complete trace, then we've found a solution to the consensus problem. Hence we can subdivide the consensus problem into four parts: + +1. Keeping track of the messages that influence the memory state of components. +2. Keeping track of the peers that influence the memory state of components. +3. Finding a set of interactions between components on which all involved components agree, i.e. each `put` should have a corresponding `get` at the peer. +4. Having a communication protocol that finds these agreeable interactions. + +We'll incrementally work towards a solution that satisfies the first three points. We'll not consider the last point, as this is essentially a gossip protocol. 
We define some terms to make the following discussion easier: + +- "sync region": The group of components that have interacted with one another and should agree on the global consensus solution. +- "local solution": A complete trace of a component. For the component this is a valid local solution, but might not be part of a global solution. +- "global solution": A set of traces, one for each of the components in the sync region, that all agree on the interactions that took place between the components in the sync region. + +Suppose a component can somehow predict exactly which messages it is going to receive during the execution of its code, and assume that each received message has the appropriate `get` call associated with it. In this case we're able to produce the set of complete traces that a component produces by symbolically executing its code: we start out with the initial memory state, might perhaps do some explicit `fork`ing, know exactly which messages we receive and how they influence the control flow, and arrive at the end of the sync block. + +However, as we've outlined above, we cannot know exactly which messages we're going to receive. We'll have to discover these messages while executing a component. The next best thing is to keep track of the values of the messages that we've received in a complete trace. Once we have complete traces for all of the interacting components, we can check that each received value corresponds to a sent value. For example: + +``` +primitive sender(out tx) { + sync { + fork { + put(tx, 1); + } or fork { + put(tx, 2); + } + } +} + +primitive receiver(in rx) { + u32 value = 0; + sync { + value = get(rx); + } +} +``` + +Where `tx` is part of the same channel as `rx`. In this case we'll have two traces for each of the components, resulting in two valid global consensus solutions. In one solution the message `1` was transferred, in another the message `2` was transferred. 
There are two problems with this solution: firstly it doesn't take the identity of the channel into account. And secondly it doesn't take the effects of previous messages into account. + +To illustrate the first problem, consider: + +``` +primitive sender(out tx_a, out tx_b) { + sync { + fork { + put(tx_a, 1); + } or fork { + put(tx_b, 1); + } + } +} + +primitive receiver(in rx_a, in rx_b) { + u32 value = 0; + sync { + value = get(rx_a); + } +} +``` + +Here the fact that the sender has the solutions `1` and `1` does not help the receiver figure out which of those corresponds to its own solution of `1`. + +To illustrate the second problem, consider: + +``` +primitive sender(out tx) { + sync { + fork { + put(tx, 1); + put(tx, 2); + } or fork { + put(tx, 2); + } + } +} + +primitive receiver(in rx) { + u32 value = 0; + sync { + value = get(rx); + } +} +``` + +Now we'll have `sender` contributing the solutions `1, 2` and `2`. While the receiver will generate the solutions `1`, `2` and `2`. The reason there are three solutions for the receiver is because it cannot figure out that the message `2` from the sender depended on the first message `1` from the sender having arrived. + +We can solve this by somehow embedding the identity of the channel associated with the message, and by describing all of the previous interactions \ No newline at end of file diff --git a/docs/runtime/known_issues.md b/docs/runtime/known_issues.md new file mode 100644 index 0000000000000000000000000000000000000000..1e6b3c7b18bf2d314979c2fec31fd7e343e67397 --- /dev/null +++ b/docs/runtime/known_issues.md @@ -0,0 +1,25 @@ +# Known Issues + +The current implementation of Reowolf has the following known issues: + +- Cannot create uninitialized variables that are later known to be initialized. This is not a problem for the regular types (perhaps a bit tedious), but is a problem for channels/ports. 
That is to say: if a component needs a temporary variable for a port, then it must create a complete channel. e.g. + + ``` + comp send(out tx1, out tx2, in which) { + channel unused -> temporary; + while (true) sync { + if (get(which)) { + temporary = tx1; + } else { + temporary = tx2; + } + put(temporary, 1); + } + } + ``` + +- Reserved memory for ports will grow without bounds: Ports can be given away from one component to another by creating a component, or by sending a message containing them. The component sending those ports cannot remove them from its own memory if there are still other references to the transferred port in its memory. This is because we want to throw a reasonable error if that transferred port is used by the original owner. Hence we need to keep some information about that transferred port in the sending component's memory. The solution is to have reference counting for the ports, but this is not implemented. + +- An extra to the above statements: when transferring ports to a new component, the memory that remembers the state of that port is removed from the component that is creating the new one. Hence using old references to that port within the creating component's PDL code results in a crash. + +- Some control algorithms are not robust under multithreading. Mainly error handling when in sync mode (because there needs to be a revision where we keep track of which components are still reachable by another component). And complicated scenarios where ports are transferred. 
\ No newline at end of file diff --git a/src/protocol/ast.rs b/src/protocol/ast.rs index 3bcc0b525893c980586cd57246693feb65892865..c119e5ed087671302601906a25c5e0eff565789d 100644 --- a/src/protocol/ast.rs +++ b/src/protocol/ast.rs @@ -1009,8 +1009,7 @@ impl UnionDefinition { #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ProcedureKind { Function, // with return type - Primitive, // without return type - Composite, + Component, } /// Monomorphed instantiation of a procedure (or the sole instantiation of a @@ -1087,6 +1086,7 @@ pub enum ProcedureSource { // Builtin components, available to user CompRandomU32, // TODO: Remove, temporary thing CompTcpClient, + CompTcpListener, } impl ProcedureSource { @@ -1099,8 +1099,7 @@ impl ProcedureSource { } -/// Generic storage for functions, primitive components and composite -/// components. +/// Generic storage for functions and components. // Note that we will have function definitions for builtin functions as well. In // that case the span, the identifier span and the body are all invalid. 
#[derive(Debug)] @@ -1854,6 +1853,7 @@ pub enum Method { // Builtin component, ComponentRandomU32, ComponentTcpClient, + ComponentTcpListener, // User-defined UserFunction, UserComponent, @@ -1864,7 +1864,7 @@ impl Method { use Method::*; match self { Get | Put | Fires | Create | Length | Assert | Print => true, - ComponentRandomU32 | ComponentTcpClient => true, + ComponentRandomU32 | ComponentTcpClient | ComponentTcpListener => true, _ => false, } } diff --git a/src/protocol/eval/executor.rs b/src/protocol/eval/executor.rs index 3507bdbbb6d77b6868015cda70c68f49b98b634f..982ee2dd826b4b13a850774b7ce1b20b302ce158 100644 --- a/src/protocol/eval/executor.rs +++ b/src/protocol/eval/executor.rs @@ -770,7 +770,7 @@ impl Prompt { }, } }, - Method::ComponentRandomU32 | Method::ComponentTcpClient => { + Method::ComponentRandomU32 | Method::ComponentTcpClient | Method::ComponentTcpListener => { debug_assert_eq!(heap[expr.procedure].parameters.len(), cur_frame.expr_values.len()); debug_assert_eq!(heap[cur_frame.position].as_new().expression, expr.this); }, diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index d0dd2453c06c9f49d1cce7f935c3c0f092dba0ce..f628e7c21f64a040f7dea4b0fe7c027f60cb40bc 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -160,6 +160,32 @@ impl ProtocolDescription { }); } + /// Again a somewhat temporary method. Can be used by components to look up + /// the definition of a particular procedure. Intended use is to find the + /// DefinitionId/TypeId of builtin components. 
+ pub(crate) fn find_procedure(&self, module_name: &[u8], proc_name: &[u8]) -> Option<(ProcedureDefinitionId, TypeId)> { + // Lookup type definition in module + let root_id = self.lookup_module_root(module_name)?; + let module = &self.heap[root_id]; + let definition_id = module.get_definition_by_ident(&self.heap, proc_name)?; + let definition = &self.heap[definition_id]; + + // Make sure the procedure is not polymorphic + if !definition.poly_vars().is_empty() { + return None; + } + if !definition.is_procedure() { + return None; + } + + // Lookup in type table + let definition = definition.as_procedure(); + let type_parts = [ConcreteTypePart::Component(definition.this, 0)]; + let type_id = self.types.get_monomorph_type_id(&definition.this.upcast(), &type_parts) + .expect("type ID for non-polymorphic procedure"); + return Some((definition.this, type_id)); + } + fn lookup_module_root(&self, module_name: &[u8]) -> Option { for module in self.modules.iter() { match &module.name { @@ -296,6 +322,12 @@ impl<'a> TypeInspector<'a> { let type_table = self.type_table.variant.as_union(); return UnionTypeInspector{ heap, type_table }; } + + pub fn as_struct(&'a self) -> StructTypeInspector<'a> { + let heap = self.heap.as_struct(); + let type_table = self.type_table.variant.as_struct(); + return StructTypeInspector{ heap, type_table }; + } } pub struct UnionTypeInspector<'a> { @@ -310,4 +342,23 @@ impl UnionTypeInspector<'_> { .position(|v| v.identifier.value.as_bytes() == variant_name)?; return Some(variant_index as i64); } +} + +pub struct StructTypeInspector<'a> { + heap: &'a StructDefinition, + type_table: &'a StructMonomorph, +} + +impl StructTypeInspector<'_> { + /// Retrieves number of struct fields + pub fn get_num_struct_fields(&self) -> usize { + return self.heap.fields.len(); + } + + /// Retrieves struct field index + pub fn get_struct_field_index(&self, field_name: &[u8]) -> Option { + let field_index = self.heap.fields.iter() + .position(|v| v.field.value.as_bytes() 
== field_name)?; + return Some(field_index); + } } \ No newline at end of file diff --git a/src/protocol/parser/pass_definitions.rs b/src/protocol/parser/pass_definitions.rs index f1daca6c3081615e0dee6cdde1a16065d2eaf752..23c7b2ab8be45b30fdeb1ad82d654cbc0f803963 100644 --- a/src/protocol/parser/pass_definitions.rs +++ b/src/protocol/parser/pass_definitions.rs @@ -98,7 +98,7 @@ impl PassDefinitions { Some(KW_ENUM) => self.visit_enum_definition(module, &mut iter, ctx)?, Some(KW_UNION) => self.visit_union_definition(module, &mut iter, ctx)?, Some(KW_FUNCTION) => self.visit_function_definition(module, &mut iter, ctx)?, - Some(KW_PRIMITIVE) | Some(KW_COMPOSITE) => self.visit_component_definition(module, &mut iter, ctx)?, + Some(KW_COMPONENT) => self.visit_component_definition(module, &mut iter, ctx)?, _ => return Err(ParseError::new_error_str_at_pos( &module.source, iter.last_valid_pos(), "unexpected symbol, expected a keyword marking the start of a definition" @@ -297,8 +297,8 @@ impl PassDefinitions { &mut self, module: &Module, iter: &mut TokenIter, ctx: &mut PassCtx ) -> Result<(), ParseError> { // Consume component variant and name - let (_variant_text, _) = consume_any_ident(&module.source, iter)?; - debug_assert!(_variant_text == KW_PRIMITIVE || _variant_text == KW_COMPOSITE); + let (_component_text, _) = consume_any_ident(&module.source, iter)?; + debug_assert!(_component_text == KW_COMPONENT); let (ident_text, _) = consume_ident(&module.source, iter)?; // Retrieve preallocated definition @@ -378,6 +378,7 @@ impl PassDefinitions { ("std.global", "print") => ProcedureSource::FuncPrint, ("std.random", "random_u32") => ProcedureSource::CompRandomU32, ("std.internet", "tcp_client") => ProcedureSource::CompTcpClient, + ("std.internet", "tcp_listener") => ProcedureSource::CompTcpListener, _ => panic!( "compiler error: unknown builtin procedure '{}' in module '{}'", procedure_name, module_name @@ -390,7 +391,7 @@ impl PassDefinitions { let source = match kind { 
ProcedureKind::Function => ProcedureSource::FuncUserDefined, - ProcedureKind::Primitive | ProcedureKind::Composite => + ProcedureKind::Component => ProcedureSource::CompUserDefined, }; @@ -1663,8 +1664,14 @@ impl PassDefinitions { // TODO: Once we start generating bytecode this is unnecessary let procedure_id = proc_def.this; let method = match proc_def.source { - ProcedureSource::FuncUserDefined => Method::UserFunction, - ProcedureSource::CompUserDefined => Method::UserComponent, + // Bit of a hack, at this point the source is not yet known, except if it is a + // builtin. So we check for the "kind" + ProcedureSource::FuncUserDefined | ProcedureSource::CompUserDefined => { + match proc_def.kind { + ProcedureKind::Function => Method::UserFunction, + ProcedureKind::Component => Method::UserComponent, + } + }, ProcedureSource::FuncGet => Method::Get, ProcedureSource::FuncPut => Method::Put, ProcedureSource::FuncFires => Method::Fires, @@ -1674,6 +1681,7 @@ impl PassDefinitions { ProcedureSource::FuncPrint => Method::Print, ProcedureSource::CompRandomU32 => Method::ComponentRandomU32, ProcedureSource::CompTcpClient => Method::ComponentTcpClient, + ProcedureSource::CompTcpListener => Method::ComponentTcpListener, _ => todo!("other procedure sources"), }; diff --git a/src/protocol/parser/pass_symbols.rs b/src/protocol/parser/pass_symbols.rs index 2688658feb17f4d0e48c04846443c44f55b7a46c..53570356c980c4f4b580aa57ee713a5123c28ab8 100644 --- a/src/protocol/parser/pass_symbols.rs +++ b/src/protocol/parser/pass_symbols.rs @@ -237,14 +237,9 @@ impl PassSymbols { definition_class = DefinitionClass::Function; ast_definition_id = proc_def_id.upcast(); }, - KW_PRIMITIVE | KW_COMPOSITE => { - let procedure_kind = if kw_text == KW_PRIMITIVE { - ProcedureKind::Primitive - } else { - ProcedureKind::Composite - }; + KW_COMPONENT => { let proc_def_id = ctx.heap.alloc_procedure_definition(|this| { - ProcedureDefinition::new_empty(this, module.root_id, procedure_kind, identifier, 
poly_vars) + ProcedureDefinition::new_empty(this, module.root_id, ProcedureKind::Component, identifier, poly_vars) }); definition_class = DefinitionClass::Component; ast_definition_id = proc_def_id.upcast(); diff --git a/src/protocol/parser/pass_tokenizer.rs b/src/protocol/parser/pass_tokenizer.rs index c611c9c4dc6b79e5d39c2f23c19a8742c7f8db3c..4b79f6b518aa0dbcccea1094491c81cd3da53a25 100644 --- a/src/protocol/parser/pass_tokenizer.rs +++ b/src/protocol/parser/pass_tokenizer.rs @@ -623,8 +623,7 @@ fn demarks_symbol(ident: &[u8]) -> bool { ident == KW_ENUM || ident == KW_UNION || ident == KW_FUNCTION || - ident == KW_PRIMITIVE || - ident == KW_COMPOSITE + ident == KW_COMPONENT } #[inline] diff --git a/src/protocol/parser/pass_validation_linking.rs b/src/protocol/parser/pass_validation_linking.rs index ae2f403e976a52ca8b56971f1b892e21f74264a8..724ed5f0f244e47449660d821740d0d23faa2c78 100644 --- a/src/protocol/parser/pass_validation_linking.rs +++ b/src/protocol/parser/pass_validation_linking.rs @@ -400,10 +400,10 @@ impl Visitor for PassValidationLinking { )); } - if self.proc_kind != ProcedureKind::Primitive { + if self.proc_kind != ProcedureKind::Component { return Err(ParseError::new_error_str_at_span( &ctx.module().source, cur_sync_span, - "synchronous statements may only be used in primitive components" + "synchronous statements may only be used in components" )); } @@ -472,10 +472,10 @@ impl Visitor for PassValidationLinking { )); } - if self.proc_kind != ProcedureKind::Primitive { + if self.proc_kind != ProcedureKind::Component { return Err(ParseError::new_error_str_at_span( &ctx.module().source, select_stmt.span, - "select statements may only be used in primitive components" + "select statements may only be used in components" )); } @@ -558,12 +558,12 @@ impl Visitor for PassValidationLinking { } fn visit_new_stmt(&mut self, ctx: &mut Ctx, id: NewStatementId) -> VisitorResult { - // Make sure the new statement occurs inside a composite component - if 
self.proc_kind != ProcedureKind::Composite { + // Make sure the new statement occurs inside a component + if self.proc_kind != ProcedureKind::Component { let new_stmt = &ctx.heap[id]; return Err(ParseError::new_error_str_at_span( &ctx.module().source, new_stmt.span, - "instantiating components may only be done in composite components" + "instantiating components may only be done in components" )); } @@ -1113,13 +1113,11 @@ impl Visitor for PassValidationLinking { // Check whether the method is allowed to be called within the code's // context (in sync, definition type, etc.) let mut expecting_wrapping_new_stmt = false; - let mut expecting_primitive_def = false; let mut expecting_wrapping_sync_stmt = false; let mut expecting_no_select_stmt = false; match call_expr.method { Method::Get => { - expecting_primitive_def = true; expecting_wrapping_sync_stmt = true; if !self.in_select_guard.is_invalid() { // In a select guard. Take the argument (i.e. the port we're @@ -1136,12 +1134,10 @@ impl Visitor for PassValidationLinking { } }, Method::Put => { - expecting_primitive_def = true; expecting_wrapping_sync_stmt = true; expecting_no_select_stmt = true; }, Method::Fires => { - expecting_primitive_def = true; expecting_wrapping_sync_stmt = true; }, Method::Create => {}, @@ -1162,7 +1158,8 @@ impl Visitor for PassValidationLinking { | Method::SelectRegisterCasePort | Method::SelectWait => unreachable!(), // not usable by programmer directly Method::ComponentRandomU32 - | Method::ComponentTcpClient => { + | Method::ComponentTcpClient + | Method::ComponentTcpListener => { expecting_wrapping_new_stmt = true; }, Method::UserFunction => {} @@ -1179,15 +1176,6 @@ impl Visitor for PassValidationLinking { let name = String::from_utf8_lossy(ctx.module().source.section_at_span(span)).to_string(); return (span, name); } - if expecting_primitive_def { - if self.proc_kind != ProcedureKind::Primitive { - let (call_span, func_name) = get_span_and_name(ctx, id); - return 
Err(ParseError::new_error_at_span( - &ctx.module().source, call_span, - format!("a call to '{}' may only occur in primitive component definitions", func_name) - )); - } - } if expecting_wrapping_sync_stmt { if self.in_sync.is_invalid() { diff --git a/src/protocol/parser/token_parsing.rs b/src/protocol/parser/token_parsing.rs index 28663793ff335f24a9db786c9514ece91f92536b..08abe9b338ae2931062d32064ec6016e5d6fcf70 100644 --- a/src/protocol/parser/token_parsing.rs +++ b/src/protocol/parser/token_parsing.rs @@ -17,8 +17,7 @@ pub(crate) const KW_STRUCT: &'static [u8] = b"struct"; pub(crate) const KW_ENUM: &'static [u8] = b"enum"; pub(crate) const KW_UNION: &'static [u8] = b"union"; pub(crate) const KW_FUNCTION: &'static [u8] = b"func"; -pub(crate) const KW_PRIMITIVE: &'static [u8] = b"primitive"; -pub(crate) const KW_COMPOSITE: &'static [u8] = b"composite"; +pub(crate) const KW_COMPONENT: &'static [u8] = b"comp"; pub(crate) const KW_IMPORT: &'static [u8] = b"import"; // Keywords - literals @@ -560,7 +559,7 @@ pub(crate) fn consume_ident_interned( fn is_reserved_definition_keyword(text: &[u8]) -> bool { match text { - KW_STRUCT | KW_ENUM | KW_UNION | KW_FUNCTION | KW_PRIMITIVE | KW_COMPOSITE => true, + KW_STRUCT | KW_ENUM | KW_UNION | KW_FUNCTION | KW_COMPONENT => true, _ => false, } } diff --git a/src/protocol/parser/type_table.rs b/src/protocol/parser/type_table.rs index 2084a2474138f966eba4120d4ecf030ee1b506b1..6704ab82d2db754d64ff0e8256dd727d34d33d9b 100644 --- a/src/protocol/parser/type_table.rs +++ b/src/protocol/parser/type_table.rs @@ -224,6 +224,13 @@ impl MonoTypeVariant { } } + pub(crate) fn as_struct(&self) -> &StructMonomorph { + match self { + MonoTypeVariant::Struct(v) => v, + _ => unreachable!(), + } + } + fn as_tuple_mut(&mut self) -> &mut TupleMonomorph { match self { MonoTypeVariant::Tuple(v) => v, diff --git a/src/protocol/tests/parser_validation.rs b/src/protocol/tests/parser_validation.rs index 
a3fc6900e3388a8fff3c4f2dc2fa6d2c91b93097..b1d30a38873182dc12dc8d480f57ff36828efd4a 100644 --- a/src/protocol/tests/parser_validation.rs +++ b/src/protocol/tests/parser_validation.rs @@ -596,7 +596,7 @@ fn test_correct_select_statement() { Tester::new_single_source_expect_ok( "guard variable decl", " - primitive f() { + comp f() { channel unused -> input; u32 outer_value = 0; @@ -612,12 +612,12 @@ fn test_correct_select_statement() { Tester::new_single_source_expect_ok( "empty select", - "primitive f() { sync select {} }" + "comp f() { sync select {} }" ); Tester::new_single_source_expect_ok( "mixed uses", " - primitive f() { + comp f() { channel unused_output -> input; u32 outer_value = 0; sync select { @@ -644,7 +644,7 @@ fn test_correct_select_statement() { fn test_incorrect_select_statement() { Tester::new_single_source_expect_err( "outside sync", - "primitive f() { select {} }" + "comp f() { select {} }" ).error(|e| { e .assert_num(1) .assert_occurs_at(0, "select") @@ -653,7 +653,7 @@ fn test_incorrect_select_statement() { Tester::new_single_source_expect_err( "variable in previous block", - "primitive f() { + "comp f() { channel tx -> rx; u32 a = 0; // this one will be shadowed sync select { auto a = get(rx) -> {} } @@ -666,7 +666,7 @@ fn test_incorrect_select_statement() { Tester::new_single_source_expect_err( "put inside arm", - "primitive f() { + "comp f() { channel a -> b; sync select { put(a) -> {} } }" @@ -725,7 +725,7 @@ fn test_incorrect_goto_statement() { Tester::new_single_source_expect_err( "goto jumping outside sync", - "primitive f() { + "comp f() { sync { goto exit; } exit: u32 v = 0; }" @@ -738,7 +738,7 @@ fn test_incorrect_goto_statement() { Tester::new_single_source_expect_err( "goto jumping to select case", - "primitive f(in i) { + "comp f(in i) { sync select { hello: auto a = get(i) -> i += 1 } @@ -750,7 +750,7 @@ fn test_incorrect_goto_statement() { Tester::new_single_source_expect_err( "goto jumping into select case skipping variable", - 
"primitive f(in i) { + "comp f(in i) { goto waza; sync select { auto a = get(i) -> { @@ -797,7 +797,7 @@ fn test_incorrect_while_statement() { Tester::new_single_source_expect_err( "break outside of sync", - "primitive f() { + "comp f() { outer: while (true) { //mark sync while(true) { break outer; } } diff --git a/src/runtime/tests/api_component.rs b/src/runtime/tests/api_component.rs index 67271d2986bb610f91a18425edff20de76b0b0f2..f4a9d851bff955e1a025343399ae0a8483670adb 100644 --- a/src/runtime/tests/api_component.rs +++ b/src/runtime/tests/api_component.rs @@ -8,7 +8,7 @@ use super::*; #[test] fn test_put_and_get() { const CODE: &'static str = " - primitive handler(in request, out response, u32 loops) { + comp handler(in request, out response, u32 loops) { u32 index = 0; while (index < loops) { sync { @@ -52,7 +52,7 @@ fn test_put_and_get() { #[test] fn test_getting_from_component() { const CODE: &'static str =" - primitive loop_sender(out numbers, u32 cur, u32 last) { + comp loop_sender(out numbers, u32 cur, u32 last) { while (cur < last) { sync { put(numbers, cur); @@ -91,7 +91,7 @@ fn test_getting_from_component() { #[test] fn test_putting_to_component() { const CODE: &'static str = " - primitive loop_receiver(in numbers, u32 cur, u32 last) { + comp loop_receiver(in numbers, u32 cur, u32 last) { while (cur < last) { sync { auto number = get(numbers); @@ -126,7 +126,7 @@ fn test_putting_to_component() { #[test] fn test_doing_nothing() { const CODE: &'static str = " - primitive getter(in input, u32 num_loops) { + comp getter(in input, u32 num_loops) { u32 index = 0; while (index < num_loops) { sync {} diff --git a/src/runtime/tests/data_transmission.rs b/src/runtime/tests/data_transmission.rs index d86dee7f3c8ff915d9a56d49c9281cc5ca30f768..505ee96500575b6100fe24621335602514ac00fc 100644 --- a/src/runtime/tests/data_transmission.rs +++ b/src/runtime/tests/data_transmission.rs @@ -9,7 +9,7 @@ fn test_doing_nothing() { // If this thing does not get into an 
infinite loop, (hence: the runtime // exits), then the test works const CODE: &'static str =" - primitive silent_willy(u32 loops) { + comp silent_willy(u32 loops) { u32 index = 0; while (index < loops) { sync { index += 1; } @@ -28,7 +28,7 @@ fn test_doing_nothing() { #[test] fn test_single_put_and_get() { const CODE: &'static str = " - primitive putter(out sender, u32 loops) { + comp putter(out sender, u32 loops) { u32 index = 0; while (index < loops) { sync { @@ -38,7 +38,7 @@ fn test_single_put_and_get() { } } - primitive getter(in receiver, u32 loops) { + comp getter(in receiver, u32 loops) { u32 index = 0; while (index < loops) { sync { @@ -69,7 +69,7 @@ fn test_single_put_and_get() { #[test] fn test_combined_put_and_get() { const CODE: &'static str = " - primitive put_then_get(out output, in input, u32 num_loops) { + comp put_then_get(out output, in input, u32 num_loops) { u32 index = 0; while (index < num_loops) { sync { @@ -81,7 +81,7 @@ fn test_combined_put_and_get() { } } - composite constructor(u32 num_loops) { + comp constructor(u32 num_loops) { channel output_a -> input_a; channel output_b -> input_b; new put_then_get(output_a, input_b, num_loops); @@ -99,7 +99,7 @@ fn test_combined_put_and_get() { #[test] fn test_multi_put_and_get() { const CODE: &'static str = " - primitive putter_static(out vals, u32 num_loops) { + comp putter_static(out vals, u32 num_loops) { u32 index = 0; while (index < num_loops) { sync { @@ -112,7 +112,7 @@ fn test_multi_put_and_get() { } } - primitive getter_dynamic(in vals, u32 num_loops) { + comp getter_dynamic(in vals, u32 num_loops) { u32 loop_index = 0; while (loop_index < num_loops) { sync { diff --git a/src/runtime/tests/network_shapes.rs b/src/runtime/tests/network_shapes.rs index a15dcd0ebd7ec07beb24e75c668455d63ad7cfe8..507f09acebdd9ab76eccd0c94ee530a73b57cd70 100644 --- a/src/runtime/tests/network_shapes.rs +++ b/src/runtime/tests/network_shapes.rs @@ -5,7 +5,7 @@ use super::*; #[test] fn test_star_shaped_request() 
{ const CODE: &'static str = " - primitive edge(in input, out output, u32 loops) { + comp edge(in input, out output, u32 loops) { u32 index = 0; while (index < loops) { sync { @@ -16,7 +16,7 @@ fn test_star_shaped_request() { } } - primitive center(out[] requests, in[] responses, u32 loops) { + comp center(out[] requests, in[] responses, u32 loops) { u32 loop_index = 0; auto num_edges = length(requests); @@ -39,7 +39,7 @@ fn test_star_shaped_request() { } } - composite constructor(u32 num_edges, u32 num_loops) { + comp constructor(u32 num_edges, u32 num_loops) { auto requests = {}; auto responses = {}; @@ -70,7 +70,7 @@ fn test_star_shaped_request() { #[test] fn test_conga_line_request() { const CODE: &'static str = " - primitive start(out req, in resp, u32 num_nodes, u32 num_loops) { + comp start(out req, in resp, u32 num_nodes, u32 num_loops) { u32 loop_index = 0; u32 initial_value = 1337; while (loop_index < num_loops) { @@ -83,7 +83,7 @@ fn test_conga_line_request() { } } - primitive middle( + comp middle( in req_in, out req_forward, in resp_in, out resp_forward, u32 num_loops @@ -100,7 +100,7 @@ fn test_conga_line_request() { } } - primitive end(in req_in, out resp_out, u32 num_loops) { + comp end(in req_in, out resp_out, u32 num_loops) { u32 loop_index = 0; while (loop_index < num_loops) { sync { @@ -111,7 +111,7 @@ fn test_conga_line_request() { } } - composite constructor(u32 num_nodes, u32 num_loops) { + comp constructor(u32 num_nodes, u32 num_loops) { channel initial_req -> req_in; channel resp_out -> final_resp; new start(initial_req, final_resp, num_nodes, num_loops); diff --git a/src/runtime/tests/speculation.rs b/src/runtime/tests/speculation.rs index ff1d8d0b1111ff2e7764063380bc3921b0ff2b65..d5bf5c0d021c40371bf6d85a0c7692287101218c 100644 --- a/src/runtime/tests/speculation.rs +++ b/src/runtime/tests/speculation.rs @@ -8,7 +8,7 @@ fn test_maybe_do_nothing() { // somehow not allowed. Note that we "check" by seeing if the test finishes. 
// Only the branches in which ports fire increment the loop index const CODE: &'static str = " - primitive only_puts(out output, u32 num_loops) { + comp only_puts(out output, u32 num_loops) { u32 index = 0; while (index < num_loops) { sync { put(output, true); } @@ -16,7 +16,7 @@ fn test_maybe_do_nothing() { } } - primitive might_put(out output, u32 num_loops) { + comp might_put(out output, u32 num_loops) { u32 index = 0; while (index < num_loops) { sync { @@ -26,7 +26,7 @@ fn test_maybe_do_nothing() { } } - primitive only_gets(in input, u32 num_loops) { + comp only_gets(in input, u32 num_loops) { u32 index = 0; while (index < num_loops) { sync { auto res = get(input); assert(res); } @@ -34,7 +34,7 @@ fn test_maybe_do_nothing() { } } - primitive might_get(in input, u32 num_loops) { + comp might_get(in input, u32 num_loops) { u32 index = 0; while (index < num_loops) { sync fork { auto res = get(input); assert(res); index += 1; } or {} diff --git a/src/runtime/tests/sync_failure.rs b/src/runtime/tests/sync_failure.rs index 2960280107bae342e742ad3988d1f629f30e6a54..e4f3205d2081570dd4b0b314d18b3cc30aef7a1d 100644 --- a/src/runtime/tests/sync_failure.rs +++ b/src/runtime/tests/sync_failure.rs @@ -9,14 +9,14 @@ fn test_local_sync_failure() { // If the component exits cleanly, then the runtime exits cleanly, and the // test will finish const CODE: &'static str = " - primitive immediate_failure_inside_sync() { + comp immediate_failure_inside_sync() { u32[] only_allows_index_0 = { 1 }; while (true) sync { // note the infinite loop auto value = only_allows_index_0[1]; } } - primitive immediate_failure_outside_sync() { + comp immediate_failure_outside_sync() { u32[] only_allows_index_0 = { 1 }; auto never_gonna_get = only_allows_index_0[1]; while (true) sync {} @@ -35,7 +35,7 @@ fn test_local_sync_failure() { const SHARED_SYNC_CODE: &'static str = " enum Location { BeforeSync, AfterPut, AfterGet, AfterSync, Never } -primitive failing_at_location(in input, out output, Location 
loc) { +comp failing_at_location(in input, out output, Location loc) { u32[] failure_array = {}; while (true) { if (loc == Location::BeforeSync) failure_array[0]; @@ -50,21 +50,21 @@ primitive failing_at_location(in input, out output, Location loc) { } } -composite constructor_pair_a(Location loc) { +comp constructor_pair_a(Location loc) { channel output_a -> input_a; channel output_b -> input_b; new failing_at_location(input_b, output_a, loc); new failing_at_location(input_a, output_b, Location::Never); } -composite constructor_pair_b(Location loc) { +comp constructor_pair_b(Location loc) { channel output_a -> input_a; channel output_b -> input_b; new failing_at_location(input_b, output_a, Location::Never); new failing_at_location(input_a, output_b, loc); } -composite constructor_ring(u32 ring_size, u32 fail_a, Location loc_a, u32 fail_b, Location loc_b) { +comp constructor_ring(u32 ring_size, u32 fail_a, Location loc_a, u32 fail_b, Location loc_b) { channel output_first -> input_old; channel output_cur -> input_new; diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index d5712961e4599ebea7ed0b73bbdf3aa03a2d2c81..f5e74559dca3ec8e1c6e26f10e21079df43d5b7c 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -234,13 +234,14 @@ pub(crate) fn create_component( arguments: ValueGroup, num_ports: usize ) -> Box { let definition = &protocol.heap[definition_id]; - debug_assert!(definition.kind == ProcedureKind::Primitive || definition.kind == ProcedureKind::Composite); + debug_assert_eq!(definition.kind, ProcedureKind::Component); if definition.source.is_builtin() { // Builtin component let component: Box = match definition.source { ProcedureSource::CompRandomU32 => Box::new(ComponentRandomU32::new(arguments)), ProcedureSource::CompTcpClient => Box::new(ComponentTcpClient::new(arguments)), + ProcedureSource::CompTcpListener => Box::new(ComponentTcpListener::new(arguments)), _ => unreachable!(), }; 
@@ -472,7 +473,7 @@ pub(crate) fn default_handle_received_data_message( let new_port = comp_ctx.get_port(new_port_handle); // Add the port tho the consensus - consensus.notify_received_port(_new_inbox_index, new_port_handle, comp_ctx); + consensus.notify_of_new_port(_new_inbox_index, new_port_handle, comp_ctx); // Replace all references to the port in the received message for message_location in received_port.locations.iter().copied() { @@ -735,7 +736,6 @@ pub(crate) fn default_handle_start_exit( // Iterating over ports by index to work around borrowing rules for port_index in 0..comp_ctx.num_ports() { let port = comp_ctx.get_port_by_index_mut(port_index); - println!("DEBUG: Considering port:\n{:?}", port); if port.state.is_closed() || port.state.is_set(PortStateFlag::Transmitted) || port.close_at_sync_end { // Already closed, or in the process of being closed continue; @@ -826,7 +826,113 @@ pub(crate) fn default_handle_sync_decision( } } +/// Special component creation function. This function assumes that the +/// transferred ports are NOT blocked, and that the channels to whom the ports +/// belong are fully owned by the creating component. This will be checked in +/// debug mode. 
+pub(crate) fn special_create_component( + exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, instantiator_ctx: &mut CompCtx, + control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, + component_instance: Box, component_ports: Vec, +) { + debug_assert_eq!(exec_state.mode, CompMode::NonSync); + let reservation = sched_ctx.runtime.start_create_component(); + let mut created_ctx = CompCtx::new(&reservation); + let mut port_pairs = Vec::new(); + + // Retrieve ports + for instantiator_port_id in component_ports.iter() { + let instantiator_port_id = *instantiator_port_id; + let instantiator_port_handle = instantiator_ctx.get_port_handle(instantiator_port_id); + let instantiator_port = instantiator_ctx.get_port(instantiator_port_handle); + + // Check if conditions for calling this function are valid + debug_assert!(!instantiator_port.state.is_blocked_due_to_port_change()); + debug_assert_eq!(instantiator_port.peer_comp_id, instantiator_ctx.id); + + // Create port at new component + let created_port_handle = created_ctx.add_port( + instantiator_port.peer_comp_id, instantiator_port.peer_port_id, + instantiator_port.kind, instantiator_port.state + ); + let created_port = created_ctx.get_port(created_port_handle); + let created_port_id = created_port.self_id; + + // Store in port pairs + let is_open = instantiator_port.state.is_open(); + port_pairs.push(PortPair{ + instantiator_id: instantiator_port_id, + instantiator_handle: instantiator_port_handle, + created_id: created_port_id, + created_handle: created_port_handle, + is_open, + }); + } + + // Set peer of the port for the new component + for pair in port_pairs.iter() { + let instantiator_port_info = instantiator_ctx.get_port(pair.instantiator_handle); + let created_port_info = created_ctx.get_port_mut(pair.created_handle); + + // Note: we checked above (in debug mode) that the peer of the port is + // owned by the creator as well, now check if the peer is transferred + // as well. 
+ let created_port_peer_index = port_pairs.iter() + .position(|v| v.instantiator_id == instantiator_port_info.peer_port_id); + + match created_port_peer_index { + Some(created_port_peer_index) => { + // Both ends of the channel are moving to the new component + let peer_pair = &port_pairs[created_port_peer_index]; + created_port_info.peer_port_id = peer_pair.created_id; + created_port_info.peer_comp_id = reservation.id(); + }, + None => { + created_port_info.peer_comp_id = instantiator_ctx.id; + if pair.is_open { + created_ctx.change_port_peer(sched_ctx, pair.created_handle, Some(instantiator_ctx.id)); + } + } + } + } + + // Store component in runtime storage and retrieve component fields in their + // name memory location + let (created_key, created_runtime_component) = sched_ctx.runtime.finish_create_component( + reservation, component_instance, created_ctx, false + ); + + let created_ctx = &mut created_runtime_component.ctx; + let created_component = &mut created_runtime_component.component; + created_component.on_creation(created_key.downgrade(), sched_ctx); + + // Transfer messages and link instantiator to created component + for pair in port_pairs.iter() { + instantiator_ctx.change_port_peer(sched_ctx, pair.instantiator_handle, None); + transfer_messages(inbox_main, inbox_backup, pair, instantiator_ctx, created_ctx, created_component.as_mut()); + instantiator_ctx.remove_port(pair.instantiator_handle); + + let created_port_info = created_ctx.get_port(pair.created_handle); + if pair.is_open && created_port_info.peer_comp_id == instantiator_ctx.id { + // Set up channel between instantiator component port, and its peer, + // which is owned by the new component + let instantiator_port_handle = instantiator_ctx.get_port_handle(created_port_info.peer_port_id); + let instantiator_port_info = instantiator_ctx.get_port_mut(instantiator_port_handle); + instantiator_port_info.peer_port_id = created_port_info.self_id; + instantiator_ctx.change_port_peer(sched_ctx, 
instantiator_port_handle, Some(created_ctx.id)); + } + } + // By definition we did not have any remote peers for the transferred ports, + // so we can schedule the new component immediately + sched_ctx.runtime.enqueue_work(created_key); +} + +/// Puts the component in an execution state where the specified component will +/// end up being created. The component goes through state changes (driven by +/// incoming control messages) to make sure that all of the ports that are going +/// to be transferred are not in a blocked state. Once finished the component +/// returns to the `NonSync` mode. pub(crate) fn default_start_create_component( exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, @@ -851,15 +957,6 @@ pub(crate) fn perform_create_component( exec_state: &mut CompExecState, sched_ctx: &SchedulerCtx, instantiator_ctx: &mut CompCtx, control: &mut ControlLayer, inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup ) { - // Small internal utilities - struct PortPair { - instantiator_id: PortId, - instantiator_handle: LocalPortHandle, - created_id: PortId, - created_handle: LocalPortHandle, - is_open: bool, - } - // Retrieve ports from the arguments debug_assert_eq!(exec_state.mode, CompMode::NewComponentBlocked); @@ -978,30 +1075,10 @@ pub(crate) fn perform_create_component( for pair in port_pairs.iter() { // Transferring the messages and removing the port from the // instantiator component - let instantiator_port_index = instantiator_ctx.get_port_index(pair.instantiator_handle); instantiator_ctx.change_port_peer(sched_ctx, pair.instantiator_handle, None); + transfer_messages(inbox_main, inbox_backup, pair, instantiator_ctx, created_ctx, created_component.as_mut()); instantiator_ctx.remove_port(pair.instantiator_handle); - if let Some(mut message) = inbox_main[instantiator_port_index].take() { - message.data_header.target_port = 
pair.created_id; - created_component.adopt_message(created_ctx, message); - } - - let mut message_index = 0; - while message_index < inbox_backup.len() { - let message = &inbox_backup[message_index]; - if message.data_header.target_port == pair.instantiator_id { - // Transfer the message - let mut message = inbox_backup.remove(message_index); - message.data_header.target_port = pair.created_id; - created_component.adopt_message(created_ctx, message); - } else { - // Message does not belong to the port pair that we're - // transferring to the new component. - message_index += 1; - } - } - // Here we take care of the case where the instantiator previously owned // both ends of the channel, but has transferred one port to the new // component (hence creating a channel between the instantiator @@ -1047,6 +1124,24 @@ pub(crate) fn perform_create_component( exec_state.mode_component = (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid()); } +#[inline] +pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling { + debug_assert_eq!(_exec_state.mode, CompMode::Exit); + return CompScheduling::Exit; +} + +// ----------------------------------------------------------------------------- +// Internal messaging/state utilities +// ----------------------------------------------------------------------------- + +struct PortPair { + instantiator_id: PortId, + instantiator_handle: LocalPortHandle, + created_id: PortId, + created_handle: LocalPortHandle, + is_open: bool, +} + pub(crate) fn ports_not_blocked(comp_ctx: &CompCtx, ports: &EncounteredPorts) -> bool { for (_port_locations, port_id) in ports { let port_handle = comp_ctx.get_port_handle(*port_id); @@ -1060,6 +1155,32 @@ pub(crate) fn ports_not_blocked(comp_ctx: &CompCtx, ports: &EncounteredPorts) -> return true; } +fn transfer_messages( + inbox_main: &mut InboxMain, inbox_backup: &mut InboxBackup, port_pair: &PortPair, + instantiator_ctx: &mut CompCtx, created_ctx: &mut CompCtx, 
created_component: &mut dyn Component +) { + let instantiator_port_index = instantiator_ctx.get_port_index(port_pair.instantiator_handle); + if let Some(mut message) = inbox_main.remove(instantiator_port_index) { + message.data_header.target_port = port_pair.created_id; + created_component.adopt_message(created_ctx, message); + } + + let mut message_index = 0; + while message_index < inbox_backup.len() { + let message = &inbox_backup[message_index]; + if message.data_header.target_port == port_pair.instantiator_id { + // Transfer the message + let mut message = inbox_backup.remove(message_index); + message.data_header.target_port = port_pair.created_id; + created_component.adopt_message(created_ctx, message); + } else { + // Message does not belong to the port pair that we're + // transferring to the new component. + message_index += 1; + } + } +} + /// Performs the default action of printing the provided error, and then putting /// the component in the state where it will shut down. Only to be used for /// builtin components: their error message construction is simpler (and more @@ -1080,16 +1201,6 @@ pub(crate) fn default_handle_error_for_builtin( exec_state.set_as_start_exit(exit_reason); } -#[inline] -pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling { - debug_assert_eq!(_exec_state.mode, CompMode::Exit); - return CompScheduling::Exit; -} - -// ----------------------------------------------------------------------------- -// Internal messaging/state utilities -// ----------------------------------------------------------------------------- - /// Sends a message without any transmitted ports. Does not check if sending /// is actually valid. 
fn send_message_without_ports( @@ -1180,7 +1291,6 @@ fn perform_send_message_with_ports_notify_peers( // Block the peer of the port let message = control.create_port_transfer_message(unblock_put_control_id, comp_ctx.id, peer_port_id); - println!("DEBUG: Port transfer message\nControl ID: {:?}\nMessage: {:?}", unblock_put_control_id, message); let peer_handle = comp_ctx.get_peer_handle(peer_comp_id); let peer_info = comp_ctx.get_peer(peer_handle); @@ -1294,7 +1404,6 @@ fn default_handle_ack( AckAction::UnblockPutWithPorts => { // Send the message (containing ports) stored in the component // execution state to the recipient - println!("DEBUG: Unblocking put with ports"); debug_assert_eq!(exec_state.mode, CompMode::PutPortsBlockedAwaitingAcks); exec_state.mode = CompMode::PutPortsBlockedSendingPort; let port_handle = comp_ctx.get_port_handle(exec_state.mode_port); diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index 4d508f2545e458c5747fd9fdd2eed7fe9d269b92..d40466dfd2406d124b09c13d4c5f91e6f5743b8a 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -8,18 +8,26 @@ use super::component::{self, *}; use super::control_layer::*; use super::consensus::*; + use std::io::ErrorKind as IoErrorKind; +use std::net::{IpAddr, Ipv4Addr}; +use crate::protocol::{ProcedureDefinitionId, TypeId}; + +// ----------------------------------------------------------------------------- +// ComponentTcpClient +// ----------------------------------------------------------------------------- -enum SocketState { +enum ClientSocketState { Connected(SocketTcpClient), + ErrorReported(String), Error, } -impl SocketState { +impl ClientSocketState { fn get_socket(&self) -> &SocketTcpClient { match self { - SocketState::Connected(v) => v, - SocketState::Error => unreachable!(), + ClientSocketState::Connected(v) => v, + ClientSocketState::ErrorReported(_) | ClientSocketState::Error => 
unreachable!(), } } } @@ -28,7 +36,7 @@ impl SocketState { /// TCP component (i.e. from the point of view of attempting to interface with /// a socket). #[derive(PartialEq, Debug)] -enum SyncState { +enum ClientSyncState { AwaitingCmd, Getting, Putting, @@ -38,13 +46,14 @@ enum SyncState { pub struct ComponentTcpClient { // Properties for the tcp socket - socket_state: SocketState, - sync_state: SyncState, + socket_state: ClientSocketState, + sync_state: ClientSyncState, poll_ticket: Option, inbox_main: InboxMain, inbox_backup: InboxBackup, pdl_input_port_id: PortId, // input from PDL, so transmitted over socket pdl_output_port_id: PortId, // output towards PDL, so received over socket + // Information about union tags, extracted from PDL input_union_send_tag_value: i64, input_union_receive_tag_value: i64, input_union_finish_tag_value: i64, @@ -61,8 +70,8 @@ impl Component for ComponentTcpClient { fn on_creation(&mut self, id: CompId, sched_ctx: &SchedulerCtx) { // Retrieve type information for messages we're going to receive let pd = &sched_ctx.runtime.protocol; - let cmd_type = pd.find_type(b"std.internet", b"Cmd") - .expect("'Cmd' type in the 'std.internet' module"); + let cmd_type = pd.find_type(b"std.internet", b"ClientCmd") + .expect("'ClientCmd' type in the 'std.internet' module"); let cmd_type = cmd_type .as_union(); @@ -72,10 +81,10 @@ impl Component for ComponentTcpClient { self.input_union_shutdown_tag_value = cmd_type.get_variant_tag_value(b"Shutdown").unwrap(); // Register socket for async events - if let SocketState::Connected(socket) = &self.socket_state { + if let ClientSocketState::Connected(socket) = &self.socket_state { let self_handle = sched_ctx.runtime.get_component_public(id); let poll_ticket = sched_ctx.polling.register(socket, self_handle, true, true) - .expect("registering tcp component"); + .expect("registering tcp client"); debug_assert!(self.poll_ticket.is_none()); self.poll_ticket = Some(poll_ticket); @@ -116,7 +125,7 @@ impl Component 
for ComponentTcpClient { } }, Message::Poll => { - sched_ctx.info("Received polling event"); + sched_ctx.debug("Received polling event"); }, } } @@ -135,32 +144,37 @@ impl Component for ComponentTcpClient { }, CompMode::NonSync => { // When in non-sync mode - match &mut self.socket_state { - SocketState::Connected(_socket) => { - if self.sync_state == SyncState::FinishSyncThenQuit { + match &self.socket_state { + ClientSocketState::Connected(_socket) => { + if self.sync_state == ClientSyncState::FinishSyncThenQuit { // Previous request was to let the component shut down self.exec_state.set_as_start_exit(ExitReason::Termination); } else { // Reset for a new request - self.sync_state = SyncState::AwaitingCmd; + self.sync_state = ClientSyncState::AwaitingCmd; component::default_handle_sync_start( &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus ); } return CompScheduling::Immediate; }, - SocketState::Error => { - // Could potentially send an error message to the - // connected component. 
- self.exec_state.set_as_start_exit(ExitReason::ErrorNonSync); + ClientSocketState::ErrorReported(message) => { + component::default_handle_error_for_builtin( + &mut self.exec_state, sched_ctx, + (PortInstruction::NoSource, format!("failed socket creation, reason: {}", message)) + ); + self.socket_state = ClientSocketState::Error; return CompScheduling::Immediate; } + ClientSocketState::Error => { + return CompScheduling::Sleep; + } } }, CompMode::Sync => { // When in sync mode: wait for a command to come in match self.sync_state { - SyncState::AwaitingCmd => { + ClientSyncState::AwaitingCmd => { match component::default_attempt_get( &mut self.exec_state, self.pdl_input_port_id, PortInstruction::NoSource, &mut self.inbox_main, &mut self.inbox_backup, sched_ctx, comp_ctx, @@ -180,16 +194,16 @@ impl Component for ComponentTcpClient { self.byte_buffer.push(value.as_uint8()); } - self.sync_state = SyncState::Putting; + self.sync_state = ClientSyncState::Putting; } else if tag_value == self.input_union_receive_tag_value { // Component requires a `recv` - self.sync_state = SyncState::Getting; + self.sync_state = ClientSyncState::Getting; } else if tag_value == self.input_union_finish_tag_value { // Component requires us to end the sync round - self.sync_state = SyncState::FinishSync; + self.sync_state = ClientSyncState::FinishSync; } else if tag_value == self.input_union_shutdown_tag_value { // Component wants to close the connection - self.sync_state = SyncState::FinishSyncThenQuit; + self.sync_state = ClientSyncState::FinishSyncThenQuit; } else { unreachable!("got tag_value {}", tag_value) } @@ -205,7 +219,7 @@ impl Component for ComponentTcpClient { } } }, - SyncState::Putting => { + ClientSyncState::Putting => { // We're supposed to send a user-supplied message fully // over the socket. But we might end up blocking. 
In // that case the component goes to sleep until it is @@ -220,7 +234,11 @@ impl Component for ComponentTcpClient { if err.kind() == IoErrorKind::WouldBlock { return CompScheduling::Sleep; // wait until notified } else { - todo!("handle socket.send error {:?}", err) + component::default_handle_error_for_builtin( + &mut self.exec_state, sched_ctx, + (PortInstruction::NoSource, format!("failed sending on socket, reason: {}", err)) + ); + return CompScheduling::Immediate; } } } @@ -231,7 +249,7 @@ impl Component for ComponentTcpClient { component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); return CompScheduling::Requeue; }, - SyncState::Getting => { + ClientSyncState::Getting => { // We're going to try and receive a single message. If // this causes us to end up blocking the component // goes to sleep until it is polled. @@ -253,7 +271,7 @@ impl Component for ComponentTcpClient { return CompScheduling::Immediate; } else { let scheduling = send_result.unwrap(); - self.sync_state = SyncState::AwaitingCmd; + self.sync_state = ClientSyncState::AwaitingCmd; return scheduling; } }, @@ -261,12 +279,16 @@ impl Component for ComponentTcpClient { if err.kind() == IoErrorKind::WouldBlock { return CompScheduling::Sleep; // wait until polled } else { - todo!("handle socket.receive error {:?}", err) + component::default_handle_error_for_builtin( + &mut self.exec_state, sched_ctx, + (PortInstruction::NoSource, format!("failed receiving from socket, reason: {}", err)) + ); + return CompScheduling::Immediate; } } } }, - SyncState::FinishSync | SyncState::FinishSyncThenQuit => { + ClientSyncState::FinishSync | ClientSyncState::FinishSyncThenQuit => { component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); return CompScheduling::Requeue; }, @@ -274,7 +296,7 @@ impl Component for ComponentTcpClient { }, CompMode::BlockedGet => { // Entered when awaiting a new command - 
debug_assert_eq!(self.sync_state, SyncState::AwaitingCmd); + debug_assert_eq!(self.sync_state, ClientSyncState::AwaitingCmd); return CompScheduling::Sleep; }, CompMode::SyncEnd | CompMode::BlockedPut => @@ -291,35 +313,66 @@ impl Component for ComponentTcpClient { impl ComponentTcpClient { pub(crate) fn new(arguments: ValueGroup) -> Self { - use std::net::{IpAddr, Ipv4Addr}; + fn client_socket_state_from_result(result: Result) -> ClientSocketState { + match result { + Ok(socket) => ClientSocketState::Connected(socket), + Err(error) => ClientSocketState::ErrorReported(format!("Failed to create socket, reason: {:?}", error)), + } + } - debug_assert_eq!(arguments.values.len(), 4); + // Two possible cases here: if the number of arguments is 3, then we + // get: (socket_handle, input_port, output_port). If the number of + // arguments is 4, then we get: (ip, port, input_port, output_port). + assert!(arguments.values.len() == 3 || arguments.values.len() == 4); // Parsing arguments - let ip_heap_pos = arguments.values[0].as_array(); - let ip_elements = &arguments.regions[ip_heap_pos as usize]; - if ip_elements.len() != 4 { - todo!("friendly error reporting: ip contains 4 octects"); - } - let ip_address = IpAddr::V4(Ipv4Addr::new( - ip_elements[0].as_uint8(), ip_elements[1].as_uint8(), - ip_elements[2].as_uint8(), ip_elements[3].as_uint8() - )); + let (socket_state, input_port, output_port) = if arguments.values.len() == 3 { + let socket_handle = arguments.values[0].as_sint32(); + let socket = SocketTcpClient::new_from_handle(socket_handle); + let socket_state = client_socket_state_from_result(socket); - let port = arguments.values[1].as_uint16(); - let input_port = component::port_id_from_eval(arguments.values[2].as_input()); - let output_port = component::port_id_from_eval(arguments.values[3].as_output()); + let input_port = component::port_id_from_eval(arguments.values[1].as_input()); + let output_port = component::port_id_from_eval(arguments.values[2].as_output()); + + 
(socket_state, input_port, output_port) + } else { + let input_port = component::port_id_from_eval(arguments.values[2].as_input()); + let output_port = component::port_id_from_eval(arguments.values[3].as_output()); - let socket = SocketTcpClient::new(ip_address, port); - if let Err(socket) = socket { - todo!("friendly error reporting: failed to open socket (reason: {:?})", socket); + let ip_and_port = ip_addr_and_port_from_args(&arguments, 0, 1); + let socket_state = match ip_and_port { + Ok((ip_address, port)) => client_socket_state_from_result(SocketTcpClient::new(ip_address, port)), + Err(message) => ClientSocketState::ErrorReported(message), + }; + + (socket_state, input_port, output_port) + }; + + return Self{ + socket_state, + sync_state: ClientSyncState::AwaitingCmd, + poll_ticket: None, + inbox_main: vec![None, None], + inbox_backup: Vec::new(), + input_union_send_tag_value: -1, + input_union_receive_tag_value: -1, + input_union_finish_tag_value: -1, + input_union_shutdown_tag_value: -1, + pdl_input_port_id: input_port, + pdl_output_port_id: output_port, + exec_state: CompExecState::new(), + control: ControlLayer::default(), + consensus: Consensus::new(), + byte_buffer: Vec::new(), } + } + pub(crate) fn new_with_existing_connection(socket: SocketTcpClient, input_port: PortId, output_port: PortId) -> Self { return Self{ - socket_state: SocketState::Connected(socket.unwrap()), - sync_state: SyncState::AwaitingCmd, + socket_state: ClientSocketState::Connected(socket), + sync_state: ClientSyncState::AwaitingCmd, poll_ticket: None, - inbox_main: vec![None], + inbox_main: vec![None, None], inbox_backup: Vec::new(), input_union_send_tag_value: -1, input_union_receive_tag_value: -1, @@ -379,4 +432,383 @@ impl ComponentTcpClient { return value_group; } -} \ No newline at end of file +} + +// ----------------------------------------------------------------------------- +// ComponentTcpListener +// 
----------------------------------------------------------------------------- + +enum ListenerSocketState { + Connected(SocketTcpListener), + ErrorReported(String), + Error, +} + +impl ListenerSocketState { + fn get_socket(&self) -> &SocketTcpListener { + match self { + ListenerSocketState::Connected(v) => return v, + ListenerSocketState::ErrorReported(_) | ListenerSocketState::Error => unreachable!(), + } + } +} + +struct PendingComponent { + client: i32, // OS socket handle + cmd_rx: PortId, + data_tx: PortId, +} + +enum ListenerSyncState { + AwaitingCmd, + AcceptCommandReceived, // just received `Accept` command + AcceptChannelGenerated, // created channel, waiting to end the sync round + AcceptGenerateComponent, // sync ended, back in non-sync, now generate component + FinishSyncThenQuit, +} + +pub struct ComponentTcpListener { + // Properties for the tcp socket + socket_state: ListenerSocketState, + sync_state: ListenerSyncState, + pending_component: Option, + poll_ticket: Option, + inbox_main: InboxMain, + inbox_backup: InboxBackup, + pdl_input_port_id: PortId, // input port, receives commands + pdl_output_port_id: PortId, // output port, sends connections + // Type information extracted from protocol + tcp_client_definition: (ProcedureDefinitionId, TypeId), + input_union_accept_tag: i64, + input_union_shutdown_tag: i64, + output_struct_rx_index: usize, + output_struct_tx_index: usize, + // Generic component state + exec_state: CompExecState, + control: ControlLayer, + consensus: Consensus, +} + +impl Component for ComponentTcpListener { + fn on_creation(&mut self, id: CompId, sched_ctx: &SchedulerCtx) { + // Retrieve type information for the message with ports we're going to send + let pd = &sched_ctx.runtime.protocol; + + self.tcp_client_definition = sched_ctx.runtime.protocol.find_procedure(b"std.internet", b"tcp_client") + .expect("'tcp_client' component in the 'std.internet' module"); + + let cmd_type = pd.find_type(b"std.internet", b"ListenerCmd") + 
.expect("'ListenerCmd' type in the 'std.internet' module"); + let cmd_type = cmd_type.as_union(); + + self.input_union_accept_tag = cmd_type.get_variant_tag_value(b"Accept").unwrap(); + self.input_union_shutdown_tag = cmd_type.get_variant_tag_value(b"Shutdown").unwrap(); + + let conn_type = pd.find_type(b"std.internet", b"TcpConnection") + .expect("'TcpConnection' type in the 'std.internet' module"); + let conn_type = conn_type.as_struct(); + + assert_eq!(conn_type.get_num_struct_fields(), 2); + self.output_struct_rx_index = conn_type.get_struct_field_index(b"rx").unwrap(); + self.output_struct_tx_index = conn_type.get_struct_field_index(b"tx").unwrap(); + + // Register socket for async events + if let ListenerSocketState::Connected(socket) = &self.socket_state { + let self_handle = sched_ctx.runtime.get_component_public(id); + let poll_ticket = sched_ctx.polling.register(socket, self_handle, true, false) + .expect("registering tcp listener"); + + debug_assert!(self.poll_ticket.is_none()); + self.poll_ticket = Some(poll_ticket); + } + } + + fn on_shutdown(&mut self, sched_ctx: &SchedulerCtx) { + if let Some(poll_ticket) = self.poll_ticket.take() { + sched_ctx.polling.unregister(poll_ticket); + } + } + + fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) { + unreachable!(); + } + + fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) { + match message { + Message::Data(message) => { + self.handle_incoming_data_message(sched_ctx, comp_ctx, message); + }, + Message::Sync(message) => { + let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); + component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus); + }, + Message::Control(message) => { + if let Err(location_and_message) = component::default_handle_control_message( + &mut self.exec_state, &mut self.control, &mut self.consensus, + message, sched_ctx, comp_ctx, &mut 
self.inbox_main, &mut self.inbox_backup + ) { + component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); + } + }, + Message::Poll => { + sched_ctx.debug("Received polling event"); + }, + } + } + + fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling { + sched_ctx.info(&format!("Running component ComponentTcpListener (mode: {:?})", self.exec_state.mode)); + + match self.exec_state.mode { + CompMode::BlockedSelect + => unreachable!(), + CompMode::PutPortsBlockedTransferredPorts | + CompMode::PutPortsBlockedAwaitingAcks | + CompMode::PutPortsBlockedSendingPort | + CompMode::NewComponentBlocked + => return CompScheduling::Sleep, + CompMode::NonSync => { + match &self.socket_state { + ListenerSocketState::Connected(_socket) => { + match self.sync_state { + ListenerSyncState::AwaitingCmd => { + component::default_handle_sync_start( + &mut self.exec_state, &mut self.inbox_main, sched_ctx, comp_ctx, &mut self.consensus + ); + }, + ListenerSyncState::AcceptCommandReceived | + ListenerSyncState::AcceptChannelGenerated => unreachable!(), + ListenerSyncState::AcceptGenerateComponent => { + // Now that we're outside the sync round, create the tcp client + // component + let pending = self.pending_component.take().unwrap(); + + let arguments = ValueGroup::new_stack(vec![ + Value::SInt32(pending.client), + Value::Input(port_id_to_eval(pending.cmd_rx)), + Value::Output(port_id_to_eval(pending.data_tx)), + ]); + component::default_start_create_component( + &mut self.exec_state, sched_ctx, comp_ctx, &mut self.control, + &mut self.inbox_main, &mut self.inbox_backup, + self.tcp_client_definition.0, self.tcp_client_definition.1, + arguments + ); + self.sync_state = ListenerSyncState::AwaitingCmd; + }, + ListenerSyncState::FinishSyncThenQuit => { + self.exec_state.set_as_start_exit(ExitReason::Termination); + }, + } + + return CompScheduling::Immediate; + }, + ListenerSocketState::ErrorReported(message) => { 
+ component::default_handle_error_for_builtin( + &mut self.exec_state, sched_ctx, + (PortInstruction::NoSource, message.clone()) + ); + self.socket_state = ListenerSocketState::Error; + return CompScheduling::Immediate; + } + ListenerSocketState::Error => { + return CompScheduling::Sleep; + } + } + }, + CompMode::Sync => { + match self.sync_state { + ListenerSyncState::AwaitingCmd => { + match component::default_attempt_get( + &mut self.exec_state, self.pdl_input_port_id, PortInstruction::NoSource, + &mut self.inbox_main, &mut self.inbox_backup, sched_ctx, comp_ctx, + &mut self.control, &mut self.consensus + ) { + GetResult::Received(message) => { + let (tag_value, _) = message.content.values[0].as_union(); + if tag_value == self.input_union_accept_tag { + self.sync_state = ListenerSyncState::AcceptCommandReceived; + } else if tag_value == self.input_union_shutdown_tag { + self.sync_state = ListenerSyncState::FinishSyncThenQuit; + } else { + unreachable!("got tag_value {}", tag_value); + } + + return CompScheduling::Immediate; + }, + GetResult::NoMessage => { + return CompScheduling::Sleep; + }, + GetResult::Error(location_and_message) => { + component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); + return CompScheduling::Immediate; + } + } + }, + ListenerSyncState::AcceptCommandReceived => { + let socket = self.socket_state.get_socket(); + match socket.accept() { + Ok(client_handle) => { + // Create the channels (and the inbox entries, to stay consistent + // with the expectations from the `component` module's functions) + let cmd_channel = comp_ctx.create_channel(); + let data_channel = comp_ctx.create_channel(); + + let port_ids = [ + cmd_channel.putter_id, cmd_channel.getter_id, + data_channel.putter_id, data_channel.getter_id, + ]; + for port_id in port_ids { + let expected_port_index = self.inbox_main.len(); + let port_handle = comp_ctx.get_port_handle(port_id); + self.inbox_main.push(None); + 
self.consensus.notify_of_new_port(expected_port_index, port_handle, comp_ctx); + } + + // Construct the message containing the appropriate ports that will + // be sent to the component commanding this listener. + let mut values = ValueGroup::new_stack(Vec::with_capacity(1)); + values.values.push(Value::Struct(0)); + values.regions.push(vec![Value::Unassigned, Value::Unassigned]); + values.regions[0][self.output_struct_tx_index] = Value::Output(port_id_to_eval(cmd_channel.putter_id)); + values.regions[0][self.output_struct_rx_index] = Value::Input(port_id_to_eval(data_channel.getter_id)); + if let Err(location_and_message) = component::default_send_data_message( + &mut self.exec_state, self.pdl_output_port_id, PortInstruction::NoSource, values, + sched_ctx, &mut self.consensus, &mut self.control, comp_ctx + ) { + component::default_handle_error_for_builtin( + &mut self.exec_state, sched_ctx, location_and_message + ); + } + + // Prepare for finishing the consensus round, and once finished, + // create the tcp client component + self.sync_state = ListenerSyncState::AcceptChannelGenerated; + debug_assert!(self.pending_component.is_none()); + self.pending_component = Some(PendingComponent{ + client: client_handle, + cmd_rx: cmd_channel.getter_id, + data_tx: data_channel.putter_id + }); + + return CompScheduling::Requeue; + }, + Err(err) => { + if err.kind() == IoErrorKind::WouldBlock { + return CompScheduling::Sleep; + } else { + component::default_handle_error_for_builtin( + &mut self.exec_state, sched_ctx, + (PortInstruction::NoSource, format!("failed to accept connection on socket, reason: {}", err)) + ); + return CompScheduling::Immediate; + } + } + } + }, + ListenerSyncState::AcceptChannelGenerated => { + component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); + self.sync_state = ListenerSyncState::AcceptGenerateComponent; + return CompScheduling::Requeue; + } + ListenerSyncState::FinishSyncThenQuit => { +
component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); + return CompScheduling::Requeue; + }, + ListenerSyncState::AcceptGenerateComponent => unreachable!(), + } + }, + CompMode::BlockedGet => { + return CompScheduling::Sleep; + }, + CompMode::SyncEnd | CompMode::BlockedPut + => return CompScheduling::Sleep, + CompMode::StartExit => + return component::default_handle_start_exit(&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus), + CompMode::BusyExit => + return component::default_handle_busy_exit(&mut self.exec_state, &mut self.control, sched_ctx), + CompMode::Exit => + return component::default_handle_exit(&self.exec_state), + } + } +} + +impl ComponentTcpListener { + pub(crate) fn new(arguments: ValueGroup) -> Self { + debug_assert_eq!(arguments.values.len(), 4); + + // Parsing arguments + let input_port = component::port_id_from_eval(arguments.values[2].as_input()); + let output_port = component::port_id_from_eval(arguments.values[3].as_output()); + + let socket_state = match ip_addr_and_port_from_args(&arguments, 0, 1) { + Ok((ip_address, port)) => { + let socket = SocketTcpListener::new(ip_address, port); + match socket { + Ok(socket) => ListenerSocketState::Connected(socket), + Err(err) => ListenerSocketState::ErrorReported(format!("failed to create listener socket, reason: {:?}", err), ) + } + }, + Err(message) => ListenerSocketState::ErrorReported(message), + }; + + return Self { + socket_state, + sync_state: ListenerSyncState::AwaitingCmd, + pending_component: None, + poll_ticket: None, + inbox_main: vec![None, None], + inbox_backup: InboxBackup::new(), + pdl_input_port_id: input_port, + pdl_output_port_id: output_port, + tcp_client_definition: (ProcedureDefinitionId::new_invalid(), TypeId::new_invalid()), + input_union_accept_tag: -1, + input_union_shutdown_tag: -1, + output_struct_tx_index: 0, + output_struct_rx_index: 0, + exec_state: CompExecState::new(), + control: 
ControlLayer::default(), + consensus: Consensus::new(), + } + } + + fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) { + if self.exec_state.mode.is_in_sync_block() { + self.consensus.handle_incoming_data_message(comp_ctx, &message); + } + + match component::default_handle_incoming_data_message( + &mut self.exec_state, &mut self.inbox_main, comp_ctx, message, sched_ctx, &mut self.control + ) { + IncomingData::PlacedInSlot => {}, + IncomingData::SlotFull(message) => { + self.inbox_backup.push(message); + } + } + } +} + +fn ip_addr_and_port_from_args( + arguments: &ValueGroup, ip_index: usize, port_index: usize +) -> Result<(IpAddr, u16), String> { + debug_assert!(ip_index < arguments.values.len()); + debug_assert!(port_index < arguments.values.len()); + + // Parsing IP address + let ip_heap_pos = arguments.values[ip_index].as_array(); + let ip_elements = &arguments.regions[ip_heap_pos as usize]; + + let ip_address = match ip_elements.len() { + 0 => IpAddr::V4(Ipv4Addr::UNSPECIFIED), + 4 => IpAddr::V4(Ipv4Addr::new( + ip_elements[0].as_uint8(), ip_elements[1].as_uint8(), + ip_elements[2].as_uint8(), ip_elements[3].as_uint8() + )), + _ => return Err(format!("Expected 0 or 4 elements in the IP address, got {}", ip_elements.len())), + }; + + let port = arguments.values[port_index].as_uint16(); + + return Ok((ip_address, port)); +} + diff --git a/src/runtime2/component/consensus.rs b/src/runtime2/component/consensus.rs index dc11dd2c27b065ab112be629ac1ce3d31144a575..05856ffc3820837f251057c98e963c98eb51ddf4 100644 --- a/src/runtime2/component/consensus.rs +++ b/src/runtime2/component/consensus.rs @@ -355,7 +355,7 @@ impl Consensus { self.solution.clear(); } - pub(crate) fn notify_received_port(&mut self, _expected_index: usize, port_handle: LocalPortHandle, comp_ctx: &CompCtx) { + pub(crate) fn notify_of_new_port(&mut self, _expected_index: usize, port_handle: LocalPortHandle, comp_ctx: &CompCtx) {
debug_assert_eq!(_expected_index, self.ports.len()); let port_info = comp_ctx.get_port(port_handle); self.ports.push(PortAnnotation{ diff --git a/src/runtime2/poll/mod.rs b/src/runtime2/poll/mod.rs index 86225ca65fe06aead14e60095c4592e216bf3638..097bfb83d707e17cf517c655598a7c25c546a86d 100644 --- a/src/runtime2/poll/mod.rs +++ b/src/runtime2/poll/mod.rs @@ -260,7 +260,7 @@ impl PollingThread { } fn log(&self, message: &str) { - if self.log_level >= LogLevel::Info { + if LogLevel::Info >= self.log_level { println!("[polling] {}", message); } } diff --git a/src/runtime2/stdlib/internet.rs b/src/runtime2/stdlib/internet.rs index 19235bcc6ba9280a75aa6770ad44135da2f89485..d32e849a1b76464ae208cb9e11c18789b56e5ea9 100644 --- a/src/runtime2/stdlib/internet.rs +++ b/src/runtime2/stdlib/internet.rs @@ -26,7 +26,9 @@ enum SocketState { Listening, } -/// TCP connection +const SOCKET_BLOCKING: bool = false; + +/// TCP (client) connection pub struct SocketTcpClient { socket_handle: libc::c_int, is_blocking: bool, @@ -34,19 +36,31 @@ pub struct SocketTcpClient { impl SocketTcpClient { pub fn new(ip: IpAddr, port: u16) -> Result { - const BLOCKING: bool = false; let socket_handle = create_and_connect_socket( libc::SOCK_STREAM, libc::IPPROTO_TCP, ip, port )?; - if !set_socket_blocking(socket_handle, BLOCKING) { + if !set_socket_blocking(socket_handle, SOCKET_BLOCKING) { + unsafe{ libc::close(socket_handle); } + return Err(SocketError::Modifying); + } + + println!(" CREATE [{:04}] client", socket_handle); + return Ok(SocketTcpClient{ + socket_handle, + is_blocking: SOCKET_BLOCKING, + }) + } + + pub(crate) fn new_from_handle(socket_handle: libc::c_int) -> Result { + if !set_socket_blocking(socket_handle, SOCKET_BLOCKING) { unsafe{ libc::close(socket_handle); } return Err(SocketError::Modifying); } return Ok(SocketTcpClient{ socket_handle, - is_blocking: BLOCKING, + is_blocking: SOCKET_BLOCKING, }) } @@ -79,6 +93,7 @@ impl SocketTcpClient { impl Drop for SocketTcpClient { fn drop(&mut 
self) { + println!("DESTRUCT [{:04}] client", self.socket_handle); debug_assert!(self.socket_handle >= 0); unsafe{ close(self.socket_handle) }; } @@ -90,6 +105,67 @@ impl AsFileDescriptor for SocketTcpClient { } } +/// TCP listener. Yielding new connections +pub struct SocketTcpListener { + socket_handle: libc::c_int, + is_blocking: bool, +} + +impl SocketTcpListener { + pub fn new(ip: IpAddr, port: u16) -> Result { + // Create and bind + let socket_handle = create_and_bind_socket( + libc::SOCK_STREAM, libc::IPPROTO_TCP, ip, port + )?; + if !set_socket_blocking(socket_handle, SOCKET_BLOCKING) { + unsafe{ libc::close(socket_handle); } + return Err(SocketError::Modifying); + } + + // Listen + unsafe { + let result = listen(socket_handle, libc::SOMAXCONN); + if result < 0 { + unsafe{ libc::close(socket_handle); } + return Err(SocketError::Listening); + } + } + + + println!(" CREATE [{:04}] listener", socket_handle); + return Ok(SocketTcpListener{ + socket_handle, + is_blocking: SOCKET_BLOCKING, + }); + } + + pub fn accept(&self) -> Result { + let (mut address, mut address_size) = create_sockaddr_in_empty(); + let address_pointer = &mut address as *mut sockaddr_in; + let socket_handle = unsafe { accept(self.socket_handle, address_pointer.cast(), &mut address_size) }; + if socket_handle < 0 { + return Err(IoError::last_os_error()); + } + + println!(" CREATE [{:04}] client (from listener)", socket_handle); + return Ok(socket_handle); + } +} + +impl Drop for SocketTcpListener { + fn drop(&mut self) { + println!("DESTRUCT [{:04}] listener", self.socket_handle); + debug_assert!(self.socket_handle >= 0); + unsafe{ close(self.socket_handle) }; + } +} + +impl AsFileDescriptor for SocketTcpListener { + fn as_file_descriptor(&self) -> FileDescriptor { + return self.socket_handle; + } +} + /// Raw socket receiver. 
Essentially a listener that accepts a single connection struct SocketRawRx { listen_handle: c_int, @@ -235,6 +311,18 @@ fn create_and_connect_socket(socket_type: libc::c_int, protocol: libc::c_int, ip } } +#[inline] +fn create_sockaddr_in_empty() -> (sockaddr_in, libc::socklen_t) { + let socket_address = sockaddr_in{ + sin_family: 0, + sin_port: 0, + sin_addr: in_addr { s_addr: 0 }, + sin_zero: [0; 8], + }; + let address_size = size_of::<sockaddr_in>(); + + return (socket_address, address_size as _); +} #[inline] fn create_sockaddr_in_v4(ip: Ipv4Addr, port: u16) -> (sockaddr_in, libc::socklen_t) { let address = unsafe{ @@ -316,4 +404,4 @@ fn socket_family_from_ip(ip: IpAddr) -> libc::c_int { #[inline] fn htons(port: u16) -> u16 { return port.to_be(); -} \ No newline at end of file +} diff --git a/src/runtime2/tests/error_handling.rs b/src/runtime2/tests/error_handling.rs index ed2632d8cda214f382c891410a3843ada855c689..202b90a85ca830aaf150dddfb2b26f655019cc11 100644 --- a/src/runtime2/tests/error_handling.rs +++ b/src/runtime2/tests/error_handling.rs @@ -3,7 +3,7 @@ use super::*; #[test] fn test_unconnected_component_error() { compile_and_create_component(" - primitive interact_with_noone() { + comp interact_with_noone() { u8[] array = { 5 }; auto value = array[1]; }", "interact_with_noone", no_args()); @@ -12,14 +12,14 @@ fn test_unconnected_component_error() { #[test] fn test_connected_uncommunicating_component_error() { compile_and_create_component(" - primitive crashing_and_burning(out unused) { + comp crashing_and_burning(out unused) { u8[] array = { 1337 }; auto value = array[1337]; } - primitive sitting_idly_waiting(in never_providing) { + comp sitting_idly_waiting(in never_providing) { sync auto a = get(never_providing); } - composite constructor() { + comp constructor() { // Test one way // channel a -> b; // new sitting_idly_waiting(b); @@ -35,17 +35,17 @@ fn test_connected_uncommunicating_component_error() { #[test] fn test_connected_communicating_component_error() {
compile_and_create_component(" - primitive send_and_fail(out tx) { + comp send_and_fail(out tx) { u8[] array = {}; sync { put(tx, 0); array[0] = 5; } } - primitive receive_once(in rx) { + comp receive_once(in rx) { sync auto a = get(rx); } - composite constructor() { + comp constructor() { channel a -> b; new send_and_fail(a); new receive_once(b); @@ -60,12 +60,12 @@ fn test_connected_communicating_component_error() { #[test] fn test_failing_after_successful_sync() { compile_and_create_component(" - primitive put_and_fail(out tx) { sync put(tx, 1); u8 a = {}[0]; } - primitive get_and_fail(in rx) { sync auto a = get(rx); u8 a = {}[0]; } - primitive put_and_exit(out tx) { sync put(tx, 2); } - primitive get_and_exit(in rx) { sync auto a = get(rx); } + comp put_and_fail(out tx) { sync put(tx, 1); u8 a = {}[0]; } + comp get_and_fail(in rx) { sync auto a = get(rx); u8 a = {}[0]; } + comp put_and_exit(out tx) { sync put(tx, 2); } + comp get_and_exit(in rx) { sync auto a = get(rx); } - composite constructor() { + comp constructor() { { channel a -> b; new put_and_fail(a); diff --git a/src/runtime2/tests/internet.rs b/src/runtime2/tests/internet.rs new file mode 100644 index 0000000000000000000000000000000000000000..6cb03b954005b551198ecc3add8df9f94a6e9786 --- /dev/null +++ b/src/runtime2/tests/internet.rs @@ -0,0 +1,196 @@ +use super::*; + +// silly test to make sure that the PDL will never be an issue when doing TCP +// stuff with the actual components +#[test] +fn test_stdlib_file() { + compile_and_create_component(" + import std.internet as inet; + + comp fake_listener_once(out tx) { + channel cmd_tx -> cmd_rx; + channel data_tx -> data_rx; + new fake_socket(cmd_rx, data_tx); + sync put(tx, inet::TcpConnection{ + tx: cmd_tx, + rx: data_rx, + }); + } + + comp fake_socket(in cmds, out tx) { + auto to_send = {}; + + auto shutdown = false; + while (!shutdown) { + auto keep_going = true; + sync { + while (keep_going) { + auto cmd = get(cmds); + if (let 
inet::ClientCmd::Send(data) = cmd) { + to_send = data; + keep_going = false; + } else if (let inet::ClientCmd::Receive = cmd) { + put(tx, to_send); + } else if (let inet::ClientCmd::Finish = cmd) { + keep_going = false; + } else if (let inet::ClientCmd::Shutdown = cmd) { + keep_going = false; + shutdown = true; + } + } + } + } + } + + comp fake_client(inet::TcpConnection conn) { + sync put(conn.tx, inet::ClientCmd::Send({1, 3, 3, 7})); + sync { + put(conn.tx, inet::ClientCmd::Receive); + auto val = get(conn.rx); + while (val[0] != 1 || val[1] != 3 || val[2] != 3 || val[3] != 7) { + print(\"this is going very wrong\"); + } + put(conn.tx, inet::ClientCmd::Finish); + } + sync put(conn.tx, inet::ClientCmd::Shutdown); + } + + comp constructor() { + channel conn_tx -> conn_rx; + new fake_listener_once(conn_tx); + + // Same crap as before: + channel cmd_tx -> unused_cmd_rx; + channel unused_data_tx -> data_rx; + auto connection = inet::TcpConnection{ tx: cmd_tx, rx: data_rx }; + + sync { + connection = get(conn_rx); + } + + new fake_client(connection); + } + ", "constructor", no_args()); +} + +#[test] +fn test_tcp_listener_and_client() { + compile_and_create_component(" + import std.internet::*; + + func listen_port() -> u16 { + return 2392; + } + + comp server(u32 num_connections, in<()> shutdown) { + // Start tcp listener + channel listen_cmd_tx -> listen_cmd_rx; + channel listen_conn_tx -> listen_conn_rx; + new tcp_listener({}, listen_port(), listen_cmd_rx, listen_conn_tx); + + // Fake channels such that we can create a dummy connection variable + channel client_cmd_tx -> unused_client_cmd_rx; + channel unused_client_data_tx -> client_data_rx; + auto new_connection = TcpConnection{ + tx: client_cmd_tx, + rx: client_data_rx, + }; + + auto connection_counter = 0; + while (connection_counter < num_connections) { + // Wait until we get a connection + print(\"server: waiting for an accepted connection\"); + sync { + put(listen_cmd_tx, ListenerCmd::Accept); + new_connection 
= get(listen_conn_rx); + } + + // We have a new connection, spawn an 'echoer' for it + print(\"server: spawning an echo'ing component\"); + new echo_machine(new_connection); + connection_counter += 1; + } + + // Shut down the listener + print(\"server: shutting down listener\"); + sync auto v = get(shutdown); + sync put(listen_cmd_tx, ListenerCmd::Shutdown); + } + + // Waits for a single TCP byte (to simplify potentially having to + // concatenate requests) and echos it + comp echo_machine(TcpConnection conn) { + auto data_to_echo = {}; + + // Wait for a message + sync { + print(\"echo: receiving data\"); + put(conn.tx, ClientCmd::Receive); + data_to_echo = get(conn.rx); + put(conn.tx, ClientCmd::Finish); + } + + // Echo the message + print(\"echo: sending back data\"); + sync put(conn.tx, ClientCmd::Send(data_to_echo)); + + // Ask the tcp connection to shut down + print(\"echo: shutting down\"); + sync put(conn.tx, ClientCmd::Shutdown); + } + + comp echo_requester(u8 byte_to_send, out<()> done) { + channel cmd_tx -> cmd_rx; + channel data_tx -> data_rx; + new tcp_client({127, 0, 0, 1}, listen_port(), cmd_rx, data_tx); + + // Send the message + print(\"requester: sending bytes\"); + sync put(cmd_tx, ClientCmd::Send({ byte_to_send })); + + // Receive the echo'd byte + auto received_byte = byte_to_send + 1; + sync { + print(\"requester: receiving echo response\"); + put(cmd_tx, ClientCmd::Receive); + received_byte = get(data_rx)[0]; + put(cmd_tx, ClientCmd::Finish); + } + + // Silly check, as always + while (byte_to_send != received_byte) { + print(\"requester: Oh no! 
The echo is an otherworldly distorter\"); + } + + // Shut down the TCP connection + print(\"requester: shutting down TCP component\"); + sync put(cmd_tx, ClientCmd::Shutdown); + sync put(done, ()); + } + + comp constructor() { + auto num_connections = 1; + channel shutdown_listener_tx -> shutdown_listener_rx; + new server(num_connections, shutdown_listener_rx); + + auto connection_index = 0; + auto all_done = {}; + while (connection_index < num_connections) { + channel done_tx -> done_rx; + new echo_requester(cast(connection_index), done_tx); + connection_index += 1; + all_done @= {done_rx}; + } + + auto counter = 0; + while (counter < length(all_done)) { + print(\"constructor: waiting for requester to exit\"); + sync auto v = get(all_done[counter]); + counter += 1; + } + + print(\"constructor: instructing listener to exit\"); + sync put(shutdown_listener_tx, ()); + } + ", "constructor", no_args()); +} \ No newline at end of file diff --git a/src/runtime2/tests/messaging.rs b/src/runtime2/tests/messaging.rs index fc4a0dd65f241980bcc60e866ee705620648a501..07e2464760f1ad941dc69079be7bfa61e688bddf 100644 --- a/src/runtime2/tests/messaging.rs +++ b/src/runtime2/tests/messaging.rs @@ -4,7 +4,7 @@ use super::*; #[test] fn test_component_communication() { let pd = ProtocolDescription::parse(b" - primitive sender(out o, u32 outside_loops, u32 inside_loops) { + comp sender(out o, u32 outside_loops, u32 inside_loops) { u32 outside_index = 0; while (outside_index < outside_loops) { u32 inside_index = 0; @@ -16,7 +16,7 @@ fn test_component_communication() { } } - primitive receiver(in i, u32 outside_loops, u32 inside_loops) { + comp receiver(in i, u32 outside_loops, u32 inside_loops) { u32 outside_index = 0; while (outside_index < outside_loops) { u32 inside_index = 0; @@ -29,7 +29,7 @@ fn test_component_communication() { } } - composite constructor() { + comp constructor() { channel o_orom -> i_orom; channel o_mrom -> i_mrom; channel o_ormm -> i_ormm; @@ -58,7 +58,7 @@ fn 
test_component_communication() { #[test] fn test_send_to_self() { compile_and_create_component(" - primitive insane_in_the_membrane() { + comp insane_in_the_membrane() { channel a -> b; sync { put(a, 1); @@ -72,7 +72,7 @@ fn test_send_to_self() { #[test] fn test_intermediate_messenger() { let pd = ProtocolDescription::parse(b" - primitive receiver(in rx, u32 num) { + comp receiver(in rx, u32 num) { auto index = 0; while (index < num) { sync { auto v = get(rx); } @@ -80,7 +80,7 @@ fn test_intermediate_messenger() { } } - primitive middleman(in rx, out tx, u32 num) { + comp middleman(in rx, out tx, u32 num) { auto index = 0; while (index < num) { sync { put(tx, get(rx)); } @@ -88,7 +88,7 @@ fn test_intermediate_messenger() { } } - primitive sender(out tx, u32 num) { + comp sender(out tx, u32 num) { auto index = 0; while (index < num) { sync put(tx, 1337); @@ -96,7 +96,7 @@ fn test_intermediate_messenger() { } } - composite constructor_template() { + comp constructor_template() { auto num = 0; channel tx_a -> rx_a; channel tx_b -> rx_b; @@ -105,7 +105,7 @@ fn test_intermediate_messenger() { new receiver(rx_b, 3); } - composite constructor() { + comp constructor() { new constructor_template(); new constructor_template(); new constructor_template(); diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index 85572c586c717aafef2405f6d35172c9dfe4b791..b3988d25db7fc32e3077a29ad6b0d4589e2ade98 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -6,6 +6,7 @@ use crate::runtime2::component::{CompCtx, CompPDL}; mod messaging; mod error_handling; mod transfer_ports; +mod internet; const LOG_LEVEL: LogLevel = LogLevel::Debug; const NUM_THREADS: u32 = 1; @@ -13,7 +14,7 @@ const NUM_THREADS: u32 = 1; pub(crate) fn compile_and_create_component(source: &str, routine_name: &str, args: ValueGroup) { let protocol = ProtocolDescription::parse(source.as_bytes()) .expect("successful compilation"); - let runtime = Runtime::new(NUM_THREADS, LOG_LEVEL, 
protocol) + let runtime = Runtime::new(NUM_THREADS, LogLevel::None, protocol) .expect("successful runtime startup"); create_component(&runtime, "", routine_name, args); } @@ -34,7 +35,7 @@ pub(crate) fn no_args() -> ValueGroup { ValueGroup::new_stack(Vec::new()) } #[test] fn test_component_creation() { let pd = ProtocolDescription::parse(b" - primitive nothing_at_all() { + comp nothing_at_all() { s32 a = 5; auto b = 5 + a; } @@ -54,7 +55,7 @@ fn test_simple_select() { return (); } - primitive receiver(in in_a, in in_b, u32 num_sends) { + comp receiver(in in_a, in in_b, u32 num_sends) { auto num_from_a = 0; auto num_from_b = 0; while (num_from_a + num_from_b < 2 * num_sends) { @@ -73,7 +74,7 @@ fn test_simple_select() { } } - primitive sender(out tx, u32 num_sends) { + comp sender(out tx, u32 num_sends) { auto index = 0; while (index < num_sends) { sync { @@ -83,7 +84,7 @@ fn test_simple_select() { } } - composite constructor() { + comp constructor() { auto num_sends = 1; channel tx_a -> rx_a; channel tx_b -> rx_b; @@ -99,7 +100,7 @@ fn test_simple_select() { #[test] fn test_unguarded_select() { let pd = ProtocolDescription::parse(b" - primitive constructor_outside_select() { + comp constructor_outside_select() { u32 index = 0; while (index < 5) { sync select { auto v = () -> print(\"hello\"); } @@ -107,7 +108,7 @@ fn test_unguarded_select() { } } - primitive constructor_inside_select() { + comp constructor_inside_select() { u32 index = 0; while (index < 5) { sync select { auto v = () -> index += 1; } @@ -122,7 +123,7 @@ fn test_unguarded_select() { #[test] fn test_empty_select() { let pd = ProtocolDescription::parse(b" - primitive constructor() { + comp constructor() { u32 index = 0; while (index < 5) { sync select {} @@ -139,7 +140,7 @@ fn test_random_u32_temporary_thingo() { let pd = ProtocolDescription::parse(b" import std.random::random_u32; - primitive random_taker(in generator, u32 num_values) { + comp random_taker(in generator, u32 num_values) { auto i = 0; 
while (i < num_values) { sync { @@ -149,7 +150,7 @@ fn test_random_u32_temporary_thingo() { } } - composite constructor() { + comp constructor() { channel tx -> rx; auto num_values = 25; new random_u32(tx, 1, 100, num_values); @@ -165,17 +166,17 @@ fn test_tcp_socket_http_request() { let _pd = ProtocolDescription::parse(b" import std.internet::*; - primitive requester(out cmd_tx, in data_rx) { + comp requester(out cmd_tx, in data_rx) { print(\"*** TCPSocket: Sending request\"); sync { - put(cmd_tx, Cmd::Send(b\"GET / HTTP/1.1\\r\\n\\r\\n\")); + put(cmd_tx, ClientCmd::Send(b\"GET / HTTP/1.1\\r\\n\\r\\n\")); } print(\"*** TCPSocket: Receiving response\"); auto buffer = {}; auto done_receiving = false; sync while (!done_receiving) { - put(cmd_tx, Cmd::Receive); + put(cmd_tx, ClientCmd::Receive); auto data = get(data_rx); buffer @= data; @@ -200,7 +201,7 @@ fn test_tcp_socket_http_request() { c5 == cast('m') && c6 == cast('l') && c7 == cast('>') ) { print(\"*** TCPSocket: Detected \"); - put(cmd_tx, Cmd::Finish); + put(cmd_tx, ClientCmd::Finish); done_receiving = true; } } @@ -210,11 +211,11 @@ fn test_tcp_socket_http_request() { print(\"*** TCPSocket: Requesting shutdown\"); sync { - put(cmd_tx, Cmd::Shutdown); + put(cmd_tx, ClientCmd::Shutdown); } } - composite main() { + comp main() { channel cmd_tx -> cmd_rx; channel data_tx -> data_rx; new tcp_client({142, 250, 179, 163}, 80, cmd_rx, data_tx); // port 80 of google @@ -223,7 +224,7 @@ fn test_tcp_socket_http_request() { ").expect("compilation"); // This test is disabled because it performs a HTTP request to google. 
- // let rt = Runtime::new(1, true, pd).unwrap(); + // let rt = Runtime::new(1, LOG_LEVEL, _pd).unwrap(); // create_component(&rt, "", "main", no_args()); } @@ -236,7 +237,7 @@ fn test_sending_receiving_union() { Shutdown, } - primitive database(in rx, out tx) { + comp database(in rx, out tx) { auto stored = {}; auto done = false; while (!done) { @@ -256,7 +257,7 @@ fn test_sending_receiving_union() { } } - primitive client(out tx, in rx, u32 num_rounds) { + comp client(out tx, in rx, u32 num_rounds) { auto round = 0; while (round < num_rounds) { auto set_value = b\"hello there\"; @@ -278,7 +279,7 @@ fn test_sending_receiving_union() { sync put(tx, Cmd::Shutdown); } - composite main() { + comp main() { auto num_rounds = 5; channel cmd_tx -> cmd_rx; channel data_tx -> data_rx; diff --git a/src/runtime2/tests/transfer_ports.rs b/src/runtime2/tests/transfer_ports.rs index e84a5b2bb5880230bc0048c5bee8ccd04270a2c5..d8d8f27a394c5c18b1bf73eb33d625b3a5093379 100644 --- a/src/runtime2/tests/transfer_ports.rs +++ b/src/runtime2/tests/transfer_ports.rs @@ -3,16 +3,16 @@ use super::*; #[test] fn test_transfer_precreated_port_with_owned_peer() { compile_and_create_component(" - primitive port_sender(out> tx) { + comp port_sender(out> tx) { channel a -> b; sync put(tx, b); } - primitive port_receiver(in> rx) { + comp port_receiver(in> rx) { sync auto a = get(rx); } - composite constructor() { + comp constructor() { channel a -> b; new port_sender(a); new port_receiver(b); @@ -20,18 +20,55 @@ fn test_transfer_precreated_port_with_owned_peer() { ", "constructor", no_args()); } +#[test] +fn test_transfer_precreated_in_struct_with_owned_peer() { + compile_and_create_component(" + struct PortPair { + out tx, + in rx, + } + + comp port_sender(out> tx) { + channel created_tx_a -> created_rx_a; + channel created_tx_b -> created_rx_b; + sync put(tx, PortPair{ tx: created_tx_a, rx: created_rx_b }); + sync { + auto val = get(created_rx_a); + put(created_tx_b, val); + } + } + + comp 
port_receiver(in> rx) { + channel fake_tx -> fake_rx; + auto conn = PortPair{ tx: fake_tx, rx: fake_rx }; + sync conn = get(rx); + sync { + put(conn.tx, 5); + auto val = get(conn.rx); + while (val != 5) {} + } + } + + comp constructor() { + channel tx -> rx; + new port_sender(tx); + new port_receiver(rx); + } + ", "constructor", no_args()); +} + #[test] fn test_transfer_precreated_port_with_foreign_peer() { compile_and_create_component(" - primitive port_sender(out> tx, in to_send) { + comp port_sender(out> tx, in to_send) { sync put(tx, to_send); } - primitive port_receiver(in> rx) { + comp port_receiver(in> rx) { sync auto a = get(rx); } - composite constructor() { + comp constructor() { channel tx -> rx; channel forgotten -> to_send; new port_sender(tx, to_send); @@ -43,18 +80,18 @@ fn test_transfer_precreated_port_with_foreign_peer() { #[test] fn test_transfer_synccreated_port() { compile_and_create_component(" - primitive port_sender(out> tx) { + comp port_sender(out> tx) { sync { channel a -> b; put(tx, b); } } - primitive port_receiver(in> rx) { + comp port_receiver(in> rx) { sync auto a = get(rx); } - composite constructor() { + comp constructor() { channel a -> b; new port_sender(a); new port_receiver(b); @@ -65,20 +102,20 @@ fn test_transfer_synccreated_port() { #[test] fn test_transfer_precreated_port_with_owned_peer_and_communication() { compile_and_create_component(" - primitive port_sender(out> tx) { + comp port_sender(out> tx) { channel a -> b; sync put(tx, b); sync put(a, 1337); } - primitive port_receiver(in> rx) { + comp port_receiver(in> rx) { channel a -> b; // this is stupid, but we need to have a variable to use sync b = get(rx); u32 value = 0; sync value = get(b); while (value != 1337) {} } - composite constructor() { + comp constructor() { channel a -> b; new port_sender(a); new port_receiver(b); @@ -89,15 +126,15 @@ fn test_transfer_precreated_port_with_owned_peer_and_communication() { #[test] fn 
test_transfer_precreated_port_with_foreign_peer_and_communication() { compile_and_create_component(" - primitive port_sender(out> tx, in to_send) { + comp port_sender(out> tx, in to_send) { sync put(tx, to_send); } - primitive message_transmitter(out tx) { + comp message_transmitter(out tx) { sync put(tx, 1337); } - primitive port_receiver(in> rx) { + comp port_receiver(in> rx) { channel unused -> b; sync b = get(rx); u32 value = 0; @@ -105,7 +142,7 @@ fn test_transfer_precreated_port_with_foreign_peer_and_communication() { while (value != 1337) {} } - composite constructor() { + comp constructor() { channel port_tx -> port_rx; channel value_tx -> value_rx; new port_sender(port_tx, value_rx); @@ -118,7 +155,7 @@ fn test_transfer_precreated_port_with_foreign_peer_and_communication() { #[test] fn test_transfer_precreated_port_with_owned_peer_back_and_forth() { compile_and_create_component(" - primitive port_send_and_receive(out> tx, in> rx) { + comp port_send_and_receive(out> tx, in> rx) { channel a -> b; sync { put(tx, b); @@ -126,7 +163,7 @@ fn test_transfer_precreated_port_with_owned_peer_back_and_forth() { } } - primitive port_receive_and_send(in> rx, out> tx) { + comp port_receive_and_send(in> rx, out> tx) { channel unused -> transferred; // same problem as in different tests sync { transferred = get(rx); @@ -134,7 +171,7 @@ fn test_transfer_precreated_port_with_owned_peer_back_and_forth() { } } - composite constructor() { + comp constructor() { channel port_tx_forward -> port_rx_forward; channel port_tx_backward -> port_rx_backward; @@ -146,7 +183,7 @@ fn test_transfer_precreated_port_with_owned_peer_back_and_forth() { #[test] fn test_transfer_precreated_port_with_foreign_peer_back_and_forth_and_communication() { compile_and_create_component(" - primitive port_send_and_receive(out> tx, in> rx, in to_transfer) { + comp port_send_and_receive(out> tx, in> rx, in to_transfer) { sync { put(tx, to_transfer); to_transfer = get(rx); @@ -157,7 +194,7 @@ fn 
test_transfer_precreated_port_with_foreign_peer_back_and_forth_and_communicat } } - primitive port_receive_and_send(in> rx, out> tx) { + comp port_receive_and_send(in> rx, out> tx) { channel unused -> transferred; sync { transferred = get(rx); @@ -165,11 +202,11 @@ fn test_transfer_precreated_port_with_foreign_peer_back_and_forth_and_communicat } } - primitive value_sender(out tx) { + comp value_sender(out tx) { sync put(tx, 1337); } - composite constructor() { + comp constructor() { channel port_tx_forward -> port_rx_forward; channel port_tx_backward -> port_rx_backward; channel message_tx -> message_rx; diff --git a/std/std.internet.pdl b/std/std.internet.pdl index 3e016622c4f8586d89ec371eb4e778dcc28f719e..6d5d355696eba8591b4f7fb5d8ce21971eccc4a9 100644 --- a/std/std.internet.pdl +++ b/std/std.internet.pdl @@ -1,12 +1,26 @@ #module std.internet -union Cmd { +union ClientCmd { Send(u8[]), Receive, Finish, Shutdown, } -primitive tcp_client(u8[] ip, u16 port, in cmds, out rx) { +comp tcp_client(u8[] ip, u16 port, in cmds, out rx) { #builtin } + +union ListenerCmd { + Accept, + Shutdown, +} + +struct TcpConnection { + out tx, + in rx, +} + +comp tcp_listener(u8[] ip, u16 port, in cmds, out rx) { + #builtin +} \ No newline at end of file diff --git a/std/std.random.pdl b/std/std.random.pdl index 840e195232766dd8441b9ecb3ae89210e339739d..dda37f9a4394f97bcca45e4b9e5923bd28447146 100644 --- a/std/std.random.pdl +++ b/std/std.random.pdl @@ -1,3 +1,3 @@ #module std.random -primitive random_u32(out generator, u32 min, u32 max, u32 num_sends) { #builtin } +comp random_u32(out generator, u32 min, u32 max, u32 num_sends) { #builtin } diff --git a/testdata/basic-modules/consumer.pdl b/testdata/basic-modules/consumer.pdl index ccaf12cf83958ecccc7eed5abdd616f83793f013..0a33aa253d87be472883c815398ce02eb9748164 100644 --- a/testdata/basic-modules/consumer.pdl +++ b/testdata/basic-modules/consumer.pdl @@ -1,6 +1,6 @@ #module consumer -primitive consumer(in input) { +comp consumer(in 
input) {
     sync {
         print("C: going to receive a value");
         auto v = get(input);
diff --git a/testdata/basic-modules/main.pdl b/testdata/basic-modules/main.pdl
index 0c2b7f8c50c7d568047f1681b1f81c4aa5a6449b..0134a0096bd406726fae8885e715d10993d5eb33 100644
--- a/testdata/basic-modules/main.pdl
+++ b/testdata/basic-modules/main.pdl
@@ -1,7 +1,7 @@
 import consumer as c;
 import producer::producer;
 
-composite main() {
+comp main() {
     channel output -> input;
     new c::consumer(input);
     new producer(output);
diff --git a/testdata/basic-modules/producer.pdl b/testdata/basic-modules/producer.pdl
index dc4747d7687a4ed6bb25a62b97d769223a01b657..668fd138e0916045474d7238865f78c104c6f040 100644
--- a/testdata/basic-modules/producer.pdl
+++ b/testdata/basic-modules/producer.pdl
@@ -1,6 +1,6 @@
 #module producer
 
-primitive producer(out output) {
+comp producer(out output) {
     sync {
         print("P: Going to send a value!");
         put(output, 1337);
diff --git a/testdata/basic/testing.pdl b/testdata/basic/testing.pdl
index af02505f5fbca31617368760f4b7727e70ec8e49..751533f88e36890462d541c804f3b9a4fd2d923d 100644
--- a/testdata/basic/testing.pdl
+++ b/testdata/basic/testing.pdl
@@ -1,4 +1,4 @@
-primitive consumer(in input) {
+comp consumer(in input) {
     sync {
         print("C: going to receive a value");
         auto v = get(input);
@@ -7,7 +7,7 @@ primitive consumer(in input) {
     print("C: I am now exiting");
 }
 
-primitive producer(out output) {
+comp producer(out output) {
     sync {
         print("P: Going to send a value!");
         put(output, 1337);
@@ -16,7 +16,7 @@ primitive producer(out output) {
     print("P: I am exiting");
 }
 
-composite main() {
+comp main() {
     channel output -> input;
     new consumer(input);
     new producer(output);