# Previous Consensus Algorithm
## Introduction
The previous consensus algorithm (the one within Reowolf 1.0 and 1.1) had support for speculative execution. This means that the user may (directly or indirectly) fork the execution of a component. That particular execution (which may itself already be a forked execution) then splits into more than one execution. At some point the component will have to choose which particular execution is committed to memory. This is one reason for the existence of a `sync` block: a block of code wherein one may perform forking, and at the end of which a component has to choose the execution that is committed to memory.
With speculative execution we may have multiple components that are all forking their execution and sending/receiving messages. So we do not end up with one component that has to choose its final execution, but all components choosing their final execution. Note that one component's execution may apply restrictions on the validity of another component's execution. As an example, suppose the following components and their executions:
- Component A: Has two executions:
    - Execution A1: Component A has sent a message to component B.
    - Execution A2: Component A has received a message from component B.
- Component B: Has three executions:
    - Execution B1: Component B has received a message from component A, then sends a message back to component A.
    - Execution B2: Component B has received a message from component A.
    - Execution B3: Component B has sent two messages to component A.
Without delving into too much detail, one may see that the only valid solution to this problem is the combination of `A1` and `B2`. So the components cannot just pick any execution, but must pick an execution that is agreeable with the chosen executions of the components it has interacted with.
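As a minimal sketch of why only `A1` and `B2` fit together, each execution can be summarised by how many messages it sent to, and received from, its peer. The tuple encoding and the `compatible` helper are assumptions made for this illustration; they are not part of the Reowolf runtime.

```python
from itertools import product

# Each execution is summarised as (messages sent to peer, messages received from peer).
# The counts are read off the bullet list above.
executions_a = {"A1": (1, 0), "A2": (0, 1)}
executions_b = {"B1": (1, 1), "B2": (0, 1), "B3": (2, 0)}

def compatible(exec_a, exec_b):
    """Two executions agree if everything one side sent was received by the other."""
    sent_a, received_a = exec_a
    sent_b, received_b = exec_b
    return sent_a == received_b and sent_b == received_a

valid_pairs = [
    (name_a, name_b)
    for (name_a, exec_a), (name_b, exec_b)
    in product(executions_a.items(), executions_b.items())
    if compatible(exec_a, exec_b)
]
print(valid_pairs)
```

Counting messages is enough for this toy case only; the rest of this document explains why the real algorithm needs more information than that.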
## Component Execution Tree, and Execution Traces
Components execute PDL code, which may contain calls to `fork`, `put`, and `get`. A `fork` explicitly forks the execution of the code. A `put` sends a message to a particular component, and a `get` receives a message from a component and forks (as explained later).
As the component enters a `sync` block, it has only one possible execution. But as stated above there are reasons for the execution to split up. These individual executions may themselves split up later, thereby forming a so-called "execution tree":
```
                             +-----+       +------+
                             | put |------>| sync |
+-------+      +------+----->+-----+       | end  |
| sync  |      | fork |                    +------+
| start |----->+------+----->+-----+
+-------+                    | get |------>+------+
                             +-----+       | sync |
                                |          | end  |
                                |          +------+
                                |
                                +--------->+------+
                                |          | sync |
                                |          | end  |
                                |          +------+
                                |
                                +--> ...
```
This corresponds to the following PDL code:
```
primitive some_component(out<u32> tx, in<u32> rx) {
    sync {
        fork {
            put(tx, 5);
        } or fork {
            get(rx);
        }
    }
}
```
We can see the reason for calling the execution tree a "tree". There are several things to note about the execution tree: Firstly that some executions have been completed and form a complete trace, that is: starting from the "sync start" a complete trace may be represented by the line running to the "sync end". Conversely, there is one trace that is incomplete: there is a trace waiting at the `get` for a message. We'll call a place where the execution splits into multiple branches/executions a "branching point".
Note that the branching points can in the *general case* only be discovered at runtime. Any code may have control flow points like `if` statements, or `while` loops. Consider the following code:
```
primitive some_component(out<u32> tx, bool which_way) {
    sync {
        if (which_way) {
            fork {
                put(tx, 1);
            } or fork {
                put(tx, 2);
            }
        } else {
            put(tx, 3);
        }
    }
}
```
Depending on the value of `which_way` we produce two different execution trees (of which we can determine all traces). The compiler cannot decide at compile time which execution tree will be generated.
Note that the `get` branching points have an arbitrary number of forked executions arising from them. We'll call them "waiting points". In the *general case* we cannot figure out how many forked executions arise from a `get` branching point. The reason is illustrated by the following simple example:
```
primitive sender(out<u32> tx, u32 num_forks) {
    sync {
        auto fork_counter = 1;
        while (fork_counter < num_forks) {
            fork {
                put(tx, fork_counter);
            } or fork { } // empty case
            fork_counter += 1; // without this the loop would never terminate
        }
        put(tx, num_forks);
    }
}
primitive receiver(in<u32> rx) {
    u32[] values = {};
    sync {
        bool keep_going = true;
        while (keep_going) {
            auto new_value = get(rx);
            values @= { new_value }; // append
            fork {
                keep_going = false;
            } or fork { }
        }
    }
}
```
If the sender is connected to the receiver, then the sender will send anywhere between `1` and `num_forks` messages (distributed over `num_forks` forks), depending on a user-supplied parameter (which we cannot figure out at compile-time). The isolated receiver can generate an infinite number of forked executions. We can analyze that the receiver will at most have `num_forks + 1` forked executions arising from its `get` branching point (the `num_forks` branches that do receive, and one final fork that is infinitely waiting on another message), but the compiler cannot.
For this reason a `get` branching point needs to be kept around for the entire duration of the sync block. The runtime will always need to have a copy of the component's memory and execution state the moment it encountered a `get` instruction, because it might just be that another component (in perhaps a new fork, which we cannot predict) will send it another message, such that it needs to produce a new forked execution.
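The bookkeeping this implies can be sketched as follows. `WaitingPoint`, its fields and the `deliver` method are hypothetical names chosen for this illustration, and a plain dictionary stands in for the component's memory; the actual runtime stores its state differently.

```python
import copy
from dataclasses import dataclass, field

@dataclass
class WaitingPoint:
    """Hypothetical record kept for one `get` reached inside a sync block."""
    memory_snapshot: dict   # the component's variables at the moment of the `get`
    program_counter: int    # where execution resumes once a value is available
    spawned: list = field(default_factory=list)

    def deliver(self, variable, value):
        # Every delivered message forks a fresh execution from the stored snapshot.
        # The snapshot itself is left untouched, so a later message can fork again.
        memory = copy.deepcopy(self.memory_snapshot)
        memory[variable] = value
        branch = {"memory": memory, "program_counter": self.program_counter + 1}
        self.spawned.append(branch)
        return branch

waiting = WaitingPoint(memory_snapshot={"values": [], "new_value": None}, program_counter=3)
first = waiting.deliver("new_value", 10)
second = waiting.deliver("new_value", 20)
print(len(waiting.spawned), waiting.memory_snapshot)
```

The deep copy per delivery is what makes waiting points costly: the snapshot has to outlive every fork that is created from it.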
A `get` operation is also a "blocking operation": in the *general case* the component needs to know the value produced by the `get` operation in order to continue its execution (perhaps more specifically: the first time a `read` operation is performed on the variable that will store the transmitted message). Consider the simple case where the received message contains a boolean that is used in the test expression of an `if` statement: we'll need to have actually received that boolean before we can decide which control flow path to take. Speculating on the contents of messages is too computationally expensive to be taken seriously. A `put` operation is not a blocking operation: the message is sent and the component continues executing its code.
We've touched upon control flow points multiple times. We'll touch upon some aspects of control flow here, to more easily introduce the algorithm for finding consensus later. A component is fully described by its memory (i.e. all of the memory locations it has access to through its variables) and execution state (i.e. its current position in its code). So once a component encounters a control flow point, it can only take one control flow path. The calling of certain impure functions (e.g. retrieving a cryptographically secure random number) does not change this fact. Note that receiving values from other components might change a component's memory state, hence influence the control flow path it takes in the subsequent forked execution. Conversely, a component sending a value might influence another component's memory state.
So before treading into more detail, here we've found that in the general case:
- The interior of a sync block demarcates the place where speculative execution may occur.
- Speculative execution implies that we end up with an execution tree.
- A path through the execution tree that reaches the end of the sync block is called a trace, and represents a valid execution of the sync block for the component (but perhaps not for a peer it interacted with).
- The set of traces produced by a component in its sync block can practically only be discovered at runtime.
- A `get` operation is necessarily a blocking operation that always incurs a branching point. A `put` operation is a nonblocking operation that does not incur a branching point.
- The trace of a component is influenced by the messages it has received.
## Towards a Consensus Algorithm
The key to the consensus problem is somehow discovering the ways in which the components have influenced the memory state of their peers. If we have a complete trace for each component, for which all peer components agree on the way they have influenced that complete trace, then we've found a solution to the consensus problem. Hence we can subdivide the consensus problem into four parts:
1. Keeping track of the messages that influence the memory state of components.
2. Keeping track of the peers that influence the memory state of components.
3. Finding a set of interactions between components on which all involved components agree, i.e. each `put` should have a corresponding `get` at the peer.
4. Somehow having a communication protocol that finds these agreeable interactions.
We'll not consider the last point, as this is essentially a gossip protocol, and the appropriate gossip protocol varies with the requirements of the user (e.g. robust to failure, memory efficient, runtime efficiency, message complexity). We define some terms to make the following discussion easier:
- "component graph": A graph where each node is a component, and each channel between components forms the edges to the graph.
- "sync region": The group of components that have interacted with one another and should agree on the global consensus solution.
- "local solution": A complete trace of a component. For the component this is a valid local solution, but might not be part of a global solution.
- "global solution": A set of traces, one for each of the components in the sync region, that all agree on the interactions that took place between the components in the sync region.
We'll now work incrementally to the final consensus algorithm, making it a bit of a story in order to explain the reasoning and intuition behind the consensus algorithm.
Suppose a component can somehow predict exactly which messages it is going to receive during the execution of its code, and assume that each received message has the appropriate `get` call associated with it. In this case we're able to produce the set of complete traces that a component produces by symbolically executing its code: we start out with the initial memory state, might perhaps do some explicit `fork`ing, know exactly which messages we receive and how they influence the control flow, and arrive at the end of the sync block. Hence each component can figure out independently which complete trace is the solution to its consensus problem.
However, as we've outlined above, we cannot know exactly which messages we're going to receive. We'll have to discover these messages while executing a component. The next best thing is to keep track of the values of the messages that we've received in a complete trace. Once we have complete traces for all of the interacting components, we can check that each received value corresponds to a sent value. For example:
```
primitive sender(out<u32> tx) {
    sync {
        fork {
            put(tx, 1);
        } or fork {
            put(tx, 2);
        }
    }
}
primitive receiver(in<u32> rx) {
    u32 value = 0;
    sync {
        value = get(rx);
    }
}
```
Where `tx` is part of the same channel as `rx`. In this case we'll have two traces for each of the components, resulting in two valid global consensus solutions. In one solution the message `1` was transferred, in another the message `2` was transferred. There are two problems with this solution: firstly it doesn't take the identity of the channel into account. And secondly it doesn't take the effects of previous messages into account.
To illustrate the first problem, consider:
```
primitive sender(out<u32> tx_a, out<u32> tx_b) {
    u32 some_variable_in_memory = 0;
    sync {
        fork {
            put(tx_a, 1);
            some_variable_in_memory = 1;
        } or fork {
            put(tx_b, 1);
            some_variable_in_memory = 2;
        }
    }
}
primitive receiver(in<u32> rx_a, in<u32> rx_b) {
    u32 value = 0;
    sync {
        value = get(rx_a);
    }
}
```
Here the fact that the sender has the solutions `1` and `1` does not help the receiver figure out which of those corresponds to its own solution of `1`.
To illustrate the second problem, consider:
```
primitive sender(out<u32> tx) {
    sync {
        fork {
            put(tx, 1);
            put(tx, 2);
        } or fork {
            put(tx, 2);
        }
    }
}
primitive receiver(in<u32> rx) {
    u32 value = 0;
    sync {
        value = get(rx);
    }
}
```
Now we'll have `sender` contributing the solutions `1, 2` and `2`, while the receiver will generate the solutions `1`, `2` and `2`. The reason there are three solutions for the receiver is that it cannot figure out that the message `2` from the sender depended on the first message `1` from the sender having arrived.
So we need to change the algorithm. Instead of just tracking which messages were sent, each component needs to have a mapping from port identities to sent messages (internally the runtime will generate port/channel IDs, but for the sake of this discussion we'll use the postfixes to the port names in the PDL code to indicate to which channel they belong, e.g. the `tx_a` out-port is part of the same channel `a` as the `rx_a` in-port). Secondly, if we send a message, we need to transmit in which way it depends on previously received messages by sending along the sender's port mapping. The receiver, upon `get`ting a message, checks the port mapping to see whether any of its own executions can accept that message.
We're already calling this information the "port mapping" (because we'll truly turn it into a mapping later), but for now the sent mapping is a list of pairs containing `(port ID, sent value)`.
This changes the way we can interpret the execution tree: each node is not only associated with the performed operation (`fork`, `put` or `get`), but also associated with a particular port mapping that indicates the influence of other components that allowed it to reach that execution node. We modify the port mapping per node in the following way:
- For a `fork`: we fork the execution as many times as needed, and for those forks we copy the port mapping of the parent node.
- For a `put`: we transmit the current port mapping, and the transmitted value in the message. We then update the mapping from the sending port: that particular port ID now maps to the recently transmitted value.
- For a `get`: we receive the transmitted port mapping and value. Note that this `get` might be a particular statement executed by multiple different forked executions (each with their own port mapping). And so for a `get` to succeed, we need the shared channels between the sender and the receiver to agree on their port mapping. If such a `get` succeeds, then it forks into a new execution where the receiving port now maps to the received value.
So, for a slightly more complicated example, combining the two previous examples:
```
primitive initiator(out<u32> tx_a, out<u32> tx_b, in<u32> rx_c) {
    u32 value = 0;
    sync {
        put(tx_a, 1); // operation i1
        fork {
            put(tx_a, 1); // operation i2
            value = get(rx_c); // operation i3
        } or fork {
            put(tx_b, 2); // operation i4
            value = get(rx_c); // operation i5
        }
    }
}
primitive responder(in<u32> rx_a, in<u32> rx_b, out<u32> tx_c) {
    sync {
        auto value_1 = get(rx_a); // operation r1
        auto value_2 = 0;
        fork {
            value_2 = get(rx_a); // operation r2
        } or fork {
            value_2 = get(rx_b); // operation r3
        }
        put(tx_c, value_1 + value_2); // operation r4
    }
}
```
Here, once the components have completed as many forked executions as possible, we'll have the following execution trees (and mappings). The square bracketed terms denote port mappings. The parenthesized terms correspond to the operations in the code, and the curly bracketed terms are the names for the traces (so we can refer to them in this document).
```
For the initiator:
sync  --> put (i1) --> fork +--> put (i2) -----> get (i3) -+-----> sync end {A}
start     [(a,1)]           |    [(a,1),(a,1)]   [(a,1),(a,1),(c,2)]
                            |                              |
                            |                              |
                            |                              +-> ...
                            |
                            +--> put (i4) -----> get (i5) -+-----> sync end {B}
                                 [(a,1),(b,2)]   [(a,1),(b,2),(c,3)]
                                                           |
                                                           |
                                                           +-> ...

For the responder:
sync  --> get (r1) -+--> fork +--> get (r2) -----> put (r4) ----> sync end {C}
start     [(a,1)]   |         |    [(a,1),(a,1)]   [(a,1),(a,1),(c,2)]
                    |         |
                    +-> ...   +--> get (r3) -----> put (r4) ----> sync end {D}
                                   [(a,1),(b,2)]   [(a,1),(b,2),(c,3)]
```
We can see that the `put` operation at `i2` does not end up being received at `r1`, because at `r1` the responder expects to not have received anything on `rx_a` yet. The message that the initiator sends contains the annotation `[(a,1),(a,1)]`, meaning: I have previously sent `[(a,1)]`, and am now sending `[(a,1)]`. The only operation that can receive this message is `r2`, because that expects the mapping `[(a,1)]`!
Similarly: when the responder `put`s at `r4`, this happens in two branches. The branch that ends up being trace `C` expects the initiator to be in the state `[(a,1),(a,1)]` (hence can only be received at operation `i3`, resulting in trace `A` of the initiator). The branch that ends up being trace `D` expects the initiator to be in the state `[(a,1),(b,2)]` (hence can only be received at operation `i5`, resulting in trace `B` of the initiator).
And of course, when both components are finished, they can compare the mappings in both of them and conclude that the traces `A` and `C` are compatible, since their port mappings are compatible. Similarly traces `B` and `D` are compatible. So there are two global solutions to the consensus problem.
For the sake of simplicity, we've only considered two components. But there may be many more components involved in a single synchronous region. In this case we'll have to clarify when receiving a message is valid. When a message is sent to another component, the receiving component first filters the port mapping (both for the mapping stored in the execution tree and the mapping sent over the channel) such that only the channels shared between the two components are left. If those two mappings are equal, then the message can be received in that branch.
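A minimal sketch of that receive check, using the list-of-pairs mapping from this section. The helper `can_receive`, the two-argument signature of `filter_on_shared_channels`, and the extra channel `d` are assumptions made for illustration.

```python
def filter_on_shared_channels(mapping, shared_channels):
    """Keep only the (channel, value) pairs of channels both components are attached to."""
    return [(channel, value) for channel, value in mapping if channel in shared_channels]

def can_receive(branch_mapping, message_mapping, shared_channels):
    """A waiting `get` accepts a message if both sides agree on the shared history."""
    return (filter_on_shared_channels(branch_mapping, shared_channels)
            == filter_on_shared_channels(message_mapping, shared_channels))

# Channels shared by the initiator and the responder in the example above.
shared = {"a", "b", "c"}

# History the initiator attaches to the message sent at `i2`: it already sent (a,1).
message_from_i2 = [("a", 1)]

# Responder branches waiting at `r1` (nothing received yet) and at `r2` (after `r1`).
branch_at_r1 = []
branch_at_r2 = [("a", 1)]

accepted_at_r1 = can_receive(branch_at_r1, message_from_i2, shared)
accepted_at_r2 = can_receive(branch_at_r2, message_from_i2, shared)

# An entry for a hypothetical channel `d` to some third component is filtered out,
# so it does not prevent the responder from accepting the message.
message_with_foreign_entry = [("a", 1), ("d", 7)]
accepted_with_foreign = can_receive(branch_at_r2, message_with_foreign_entry, shared)
print(accepted_at_r1, accepted_at_r2, accepted_with_foreign)
```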
The same process needs to be applied when we seek a global solution. The simplest way to explain this process is the following, rather silly, pseudocode for finding the global solutions:
```
all_components = [ component_1, component_2, ..., component_N ]
global_solutions = []
// Nested loop through all components
for all complete traces in component_1:
    for all complete traces in component_2:
        ...
            ...
                for all complete traces in component_N:
                    let success = true;
                    let all_traces = [trace_of_1, trace_of_2, ..., trace_of_N];
                    // Looping through every pair of traces
                    for index_a in 0..N:
                        for index_b in index_a + 1..N:
                            let component_a = all_components[index_a]
                            let trace_a = all_traces[index_a]
                            let component_b = all_components[index_b]
                            let trace_b = all_traces[index_b]
                            trace_a = filter_on_shared_channels(trace_a, component_a, component_b)
                            trace_b = filter_on_shared_channels(trace_b, component_a, component_b)
                            if trace_a != trace_b:
                                success = false;
                                break
                        if !success:
                            break
                    if success:
                        global_solutions = append(global_solutions, all_traces)
```
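For readers who prefer something executable, the same brute-force search can be sketched in Python. The data layout (a dictionary from component name to its channel set and named traces) and the function `find_global_solutions` are assumptions made for this sketch; the traces are the ones named `A` to `D` in the execution trees above.

```python
from itertools import combinations, product

def filter_on_shared_channels(trace, channels_a, channels_b):
    """Keep only the entries of `trace` that concern channels shared by both components."""
    shared = channels_a & channels_b
    return [(channel, value) for channel, value in trace if channel in shared]

def find_global_solutions(components):
    """components: {name: (set of channels, {trace name: [(channel, value), ...]})}"""
    names = list(components)
    solutions = []
    # One candidate is one complete trace per component (the nested loops above).
    for choice in product(*(components[name][1].items() for name in names)):
        picked = dict(zip(names, choice))  # name -> (trace name, trace)
        agrees = all(
            filter_on_shared_channels(picked[a][1], components[a][0], components[b][0])
            == filter_on_shared_channels(picked[b][1], components[a][0], components[b][0])
            for a, b in combinations(names, 2)
        )
        if agrees:
            solutions.append(tuple(picked[name][0] for name in names))
    return solutions

components = {
    "initiator": ({"a", "b", "c"}, {
        "A": [("a", 1), ("a", 1), ("c", 2)],
        "B": [("a", 1), ("b", 2), ("c", 3)],
    }),
    "responder": ({"a", "b", "c"}, {
        "C": [("a", 1), ("a", 1), ("c", 2)],
        "D": [("a", 1), ("b", 2), ("c", 3)],
    }),
}
solutions = find_global_solutions(components)
print(solutions)
```

`itertools.product` plays the role of the `N` nested loops and `itertools.combinations` enumerates every pair of components once.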
We'll now apply the last bit of trickery to this algorithm. Firstly, keeping track of the sent message values may be prohibitively expensive. Consider some kind of streaming data processor that receives gigabytes of data in a single synchronous round: it would be an unwise design decision to store all of that data in the port mapping. So what we'll do instead is assign each node in the execution tree a unique number (unique only within that execution tree; different execution trees might contain the same number). With that unique number we'll redefine the port mapping to consist of a list of `(port ID, branch number)` pairs.
The reason this is a valid trick is because of the arguments made earlier regarding control flow being influenced by received messages. If we know how a component was influenced by external influences, then the control flow path it takes is deterministic, hence the content of the sent messages will be deterministic. Locally a component `A` may only describe the way it was influenced by its peer `B`, but `B` also records how it was influenced by its peers `C` and `D`. So transitively `A` will also know the indirect mutual influences between it and `C` and `D`.
Lastly, we can turn the list of `(port ID, branch number)` pairs into a true mapping `{port ID -> branch number}`: we do not actually need to keep the entire history around. The reason behind this is the fact that the `get` operation is blocking and requires the sent port mapping to be compatible with its execution node's port mapping. Consider the following cases:
- A `get` operation never receives a message: in that case we keep blocking indefinitely, we'll never get a complete trace, hence are prevented from finding a local solution.
- A `get` operation receives a message containing an incompatible port mapping: in that case we have the same case as above. The interpretation is different: the sender and the receiver did not agree on the control flow paths they took.
- A `get` operation receives a message containing a compatible port mapping: in this case both components agree on the order of operations that took place for the message to be transferred.
- A `put` operation is never received: again, we're fine. The `put`ting component might submit a local solution for a complete trace. But the mapping for that never-received message will also never appear in the supposed receiver's port mapping. Hence the two components will never agree on the control flow paths they took.
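What collapsing the history into a mapping means in practice can be sketched in a few lines. The function name `to_port_mapping` and the branch numbers are made up for illustration.

```python
def to_port_mapping(history):
    """Collapse a [(port ID, branch number), ...] history into {port ID: branch number}."""
    mapping = {}
    for port_id, branch_number in history:
        mapping[port_id] = branch_number  # a later entry overwrites an earlier one
    return mapping

# Hypothetical history: two operations on channel `a`, then one on channel `c`.
history = [("a", 1), ("a", 3), ("c", 7)]
mapping = to_port_mapping(history)
print(mapping)
```

Only the most recent branch number per port survives, which is sufficient because, following the cases above, a branch can only have reached that number if every earlier message on the port was agreed upon.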
## The Consensus Algorithm
With the story above, we may describe the complete consensus finding algorithm as follows.
Each component will locally construct an execution tree. Branches appear whenever it encounters a `fork` (producing a set of new branches), a `put` operation (this will create a single new branch) or a `get` operation (which will create new branches if the sent message's port mapping is compatible with the port mapping stored at that `get`'s branch). Whenever a new branch is created it is assigned a locally unique identifier.
The port mapping at each branch consists of a mapping `{port ID -> branch number}`. This port mapping starts out empty at the start of a sync block. When a component `put`s a value through a channel, it will:
- Generate the `put` branch and assign it a unique ID.
- Send the message, annotated with the ancestor branch's port mapping together with the `(sending port's ID, newly assigned branch ID)` pair.
- Update the port mapping of the `put` branch: it will copy the ancestor branch's port mapping, and update it with the `(sending port's ID, newly assigned branch ID)` pair. In case a previous entry is present for that specific `port ID`, then it is overwritten.
When a component encounters a `fork`, it will simply copy the ancestor branch's port mapping for each new branch. Each new branch will receive a new unique ID. When a component performs a `get`, it will block until it receives a message annotated with a port mapping. If that happens, then it will:
- Compare the port mapping in the message with the branch's port mapping. If any of the shared channels does not agree on the port mapping, then the message is not valid for that `get` operation. Note that the message *must* be kept around, because in the future there may be a `get` operation that *is* valid for that port mapping.
- If the port mapping for the shared channels does agree, then we:
    - Generate a new fork originating from the `get` branch and assign it a unique ID.
    - Update the port mapping of the new fork: copy the `get` branch's port mapping, and update it with the `(sending port ID, peer's branch number)` pair.
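The three rules can be put together in a small sketch. Everything here (`Branch`, `do_fork`, `do_put`, `try_get`, the message dictionary layout, and passing the shared channels in explicitly) is an assumption for illustration, not the interface of the Reowolf runtime. Channel names double as port IDs, as in the rest of this document.

```python
from dataclasses import dataclass
from itertools import count

@dataclass
class Branch:
    branch_id: int   # locally unique within one component's execution tree
    mapping: dict    # {port ID: branch number}

def new_branch(ids, parent):
    """Create a child branch that starts with a copy of the parent's port mapping."""
    return Branch(next(ids), dict(parent.mapping))

def do_fork(ids, parent, n):
    return [new_branch(ids, parent) for _ in range(n)]

def do_put(ids, parent, port, value):
    branch = new_branch(ids, parent)
    message = {"port": port, "value": value,
               "prior": dict(parent.mapping),        # ancestor branch's port mapping
               "sender_branch": branch.branch_id}
    branch.mapping[port] = branch.branch_id          # overwrites any previous entry
    return branch, message

def try_get(ids, waiting, message, shared):
    """Return a new branch if `message` is acceptable at `waiting`, else None."""
    mine = {p: b for p, b in waiting.mapping.items() if p in shared}
    theirs = {p: b for p, b in message["prior"].items() if p in shared}
    if mine != theirs:
        return None  # the caller keeps the message: a later `get` may accept it
    branch = new_branch(ids, waiting)
    branch.mapping[message["port"]] = message["sender_branch"]
    return branch

# Two components connected by a single channel "a"; each has its own ID counter.
sender_ids, receiver_ids = count(1), count(1)
s0 = Branch(next(sender_ids), {})
s1, msg1 = do_put(sender_ids, s0, "a", 10)
s2, msg2 = do_put(sender_ids, s1, "a", 20)

r0 = Branch(next(receiver_ids), {})
rejected = try_get(receiver_ids, r0, msg2, {"a"})  # out of order: not accepted here
r1 = try_get(receiver_ids, r0, msg1, {"a"})
r2 = try_get(receiver_ids, r1, msg2, {"a"})
print(s2.mapping, r2.mapping)
```

After both messages are accepted in order, the sender's and receiver's final branches carry the same mapping for channel `a`, which is the agreement a global solution requires.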
## Reasons for not implementing this Consensus Algorithm
There are a lot of practical issues with this consensus algorithm:
1. The fact that a `get` operation never knows when it will receive a new message requires us to keep a complete copy of the component's memory and execution state at that point. Hence for each `get` operation we're incurring a rather large memory overhead.
2. The fact that we never know if a received message can be discarded because it cannot be received by any of the `get` operations in the component's code. There may be another message coming in that causes a fork with a `get` operation that *can* receive this message. Hence we need to keep around *all* of the messages received in a synchronous round.
3. The incredible computational complexity of finding a global solution to the consensus algorithm. We need to check for each component all of its completed traces. For all of those `N` components, each supplying `T` traces (to simplify the math), we need to check each pair of traces that belong to different components. So the number of pairwise checks alone is `(N*T)*((N-1)*T)/2`.
4. Considering the previous points: the ease (especially when running over the internet) with which a nefarious actor can impose computational overhead on the receiver. All it has to do is to keep sending messages to the receiver with an acceptable port mapping, but to never offer the consensus algorithm a valid local solution. Each accepted message will spawn a new fork at the receiver.
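To get a feel for the third point, the sketch below counts the work done if the nested-loop pseudocode from the earlier section is executed literally: one candidate per combination of traces, and one comparison per pair of components within each candidate. This describes that naive search only; it is an assumption of this sketch that nothing is pruned or cached. The function name `naive_search_cost` is made up for illustration.

```python
from math import comb

def naive_search_cost(n_components, traces_per_component):
    """Pair comparisons performed by the literal nested-loop search (no early exit)."""
    candidates = traces_per_component ** n_components   # one trace chosen per component
    pair_checks_per_candidate = comb(n_components, 2)   # every pair of components
    return candidates * pair_checks_per_candidate

costs = {n: naive_search_cost(n, traces_per_component=4) for n in (2, 4, 8)}
for n, cost in costs.items():
    print(n, cost)
```

Even with only four traces per component, the count grows exponentially in the number of components.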