Changeset - cc687c3f75c5
MH - 2021-09-01 22:44:50
contact@maxhenger.nl
WIP on documentation and initial runtime
8 files changed with 459 insertions and 1 deletion:
docs/runtime/consensus.md
 
new file 100644
 
# Consensus Algorithm

## Introduction
 
An essential concept within the Reowolf language is the `sync` block. Behaviour that is specified within such a block (as imperative code containing the instructions to send or receive information, and conditions on the values in memory) must be agreed upon by all other parties that participate in the interaction. An additional concept within such a `sync` block is speculative execution: code that uses it temporarily forks and is allowed to perform multiple behaviours at the same time. At the end of the `sync` block only one particular execution (i.e. one local behaviour) is allowed to complete. This adds complexity to finding a satisfactory global behaviour.
 
This document attempts to explain the chosen implementation of the initial consensus protocol. At some point one should be able to write consensus protocols associated with `sync` blocks within PDL itself. As an initial experiment (mainly to see which information should be available to a programmer using PDL) the consensus protocol will be written in the language in which the runtime is written.
 
The Reowolf 1.2 consensus protocol aims to fix several issues that were present in the Reowolf 1.0 consensus protocol, among them:
 
- The newer protocol should not synchronize among all known connectors on a machine. Rather, it should only aim to achieve consensus among the connectors that are actually communicating with one another in the same interaction. Any connector that does not send or receive messages within this "synchronous region" does not belong to that synchronous region.
- The newer protocol should aim to be leaderless. The old protocol featured a leader per interaction. Both the leader election itself and the subsequent picking of the global behaviour caused a large message overhead. Additionally, the leader was burdened with a large computational overhead. This is especially troublesome for relatively large synchronous regions in which some participants run on low-powered devices.
- With regards to performance, the new consensus protocol should aim to reduce the message complexity and the number of transmitted bytes as much as possible. Additionally, computational complexity should be reduced by pruning the number of valid local connector behaviours, thereby shrinking the search space for the global connector behaviour.
 
In the following discussion there is a lot of room for optimization. We'll describe the general algorithm first, and leave the specific optimizations to a separate document in the future.
 
## Data Structures

### Speculative Execution
 
First we create a data structure for the speculative execution itself. Speculative execution is tracked in memory as an execution tree. At the root we find the very first bit of code that is executed without any speculative execution. Each node contains the executed code associated with a particular branch. The edges in this tree might imply speculative execution (if there is more than one edge leaving a particular node), or might simply imply a "dependency" (explained later) if there is only one edge.

At the leaves of the tree we find successful executions, where the very last instruction is the "end of the sync block" instruction. Reaching such a leaf implies that we found a local behaviour that satisfies the local constraints placed upon the behaviour. If we trace the path from the root to a particular leaf then we find the execution path. If one imagines that all of the code in all of the nodes along the execution path is concatenated, then one finds all executed instructions in order of execution.
 
Each time a connector reaches a sync block, it will associate a number with that sync block. We'll call this the `round ID`. Each executed sync block will have a unique `round ID` (up to some reasonable limit, in case of integer overflow). Likewise, each of the nodes in the execution tree will have a unique number called the `branch ID`. The branch ID is unique among all branches in the execution tree, but branch IDs may be reused across different rounds.
 
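As a rough illustration, the execution tree and its identifiers could be laid out as sketched below. The type and field names are made up for this document and do not reflect the runtime's actual definitions.

```rust
// Hypothetical sketch of the execution tree bookkeeping described above.
// All names are illustrative only.

struct RoundId(u32);   // unique per executed sync block (up to wrapping)
struct BranchId(u32);  // unique within a single round, reusable across rounds

struct Branch {
    id: BranchId,
    parent: Option<BranchId>, // `None` only for the non-speculative root
    finished: bool,           // reached the "end of sync block" instruction
    // ... executed code / memory state associated with this branch ...
}

struct ExecutionTree {
    round: RoundId,
    branches: Vec<Branch>,    // the index of a branch corresponds to its `BranchId`
}
```

Concatenating the code of all `Branch` nodes on the path from the root to a `finished` leaf yields one complete local behaviour for the round.
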
### Tracking Dependencies
 
One of the implications of combining message passing with speculative execution is that branches will also be created upon receiving messages. One may imagine connectors `S` and `R`. `R` simply has the behaviour of receiving a message and handing it off to some native application. But `S` branches and sends, in each branch, a message over the same port. This implies that `R` will also end up with two branches: one per received message. In order to track dependencies between these two parties it is sufficient to annotate each message with its sender's branch number. Afterwards we can pick the branch numbers that are consistent between the two parties.
 
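For this two-party case the annotation amounts to tagging each message with the branch that produced it, roughly as below (a minimal sketch with made-up names, not the runtime's actual message format):

```rust
// Hypothetical sketch: a message carries the sender's branch number, so the
// receiver can fork one branch per received message and later determine which
// sender/receiver branch pairs are mutually consistent.
struct SyncMessage {
    sending_connector: u32, // connector ID of the sender
    sending_branch: u32,    // branch ID within the sender's execution tree
    payload: Vec<u8>,
}
```
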
When more than two parties are communicating, the behaviour becomes more complicated. A series of connectors `A`, `B`, `C`, etc. may have behaviours that depend on one another in convoluted fashion. A particular valid execution trace in `A` may have sent messages to multiple different connectors `B`, `C` and `D`, influencing their speculative behaviour. In turn `B`, `C` and `D` may have done some branching of their own, and each of them sends messages to a final connector `E`. We now have that the branches in `B`, `C` and `D` depend on `A`, and that `E` depends on the former three. A consensus protocol needs to be able to reason about these dependencies and, when a solution is possible, pick a single execution path in each of the connectors.
 
In order to achieve this, we'll simplify the subsequent discussion by assuming that there is some later algorithm that kicks in once a connector has found a local solution. This algorithm will somehow check whether all connectors agree with a particular solution. For now we'll just consider the information that needs to be provided to this algorithm in order for it to find a solution.
 
To start the train of thought, suppose that each connector that sends a message appends its execution path's branch numbers, as well as any of the branch numbers it has received through messages. This implies that each branch in the execution tree is associated with a mapping from `connector ID` to a set of branch numbers. If a connector receives a message, then it can deposit the message in a branch if, for every `connector ID` present in both mappings, the message's branch number set is a superset of the branch's own set. There are no restrictions on the `connector ID` sets themselves, only on the branch number sets associated with the intersection of the two `connector ID` sets.
 
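A sketch of this bookkeeping and of the compatibility check (again with made-up names; the actual runtime may organize this differently):

```rust
use std::collections::{HashMap, HashSet};

type ConnectorId = u32;
type BranchNumber = u32;

/// Per-branch mapping from connector ID to the branch numbers that this
/// branch (transitively) depends on.
type BranchMapping = HashMap<ConnectorId, HashSet<BranchNumber>>;

/// A received message can be deposited in a branch if, for every connector ID
/// appearing in both mappings, the message's branch number set is a superset
/// of the branch's set. Connector IDs appearing in only one of the two
/// mappings impose no restriction.
fn is_compatible(branch: &BranchMapping, message: &BranchMapping) -> bool {
    branch.iter().all(|(connector_id, branch_set)| {
        match message.get(connector_id) {
            Some(message_set) => message_set.is_superset(branch_set),
            None => true,
        }
    })
}
```
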
The upside of this scheme is that each connector has a complete view of the dependencies that exist within the synchronous region that resulted in the branch. The downside is that the amount of data quickly balloons. Each branch that encountered a `get` call needs to wait for more messages, and needs to keep the complete branch number mapping around.
 
The subsequent algorithm, the one that makes sure that everyone agrees to a particular solution, then amounts to sending this mapping around, with each connector adding its own compatible branch number mapping to it (or, if there is no compatible mapping, discarding the solution). If this message reaches all connectors, and all connectors agree to the chosen mapping, then we have found a solution.
 
A different approach is to look at the global behaviour from the perspective of the channels themselves. Two connectors can only have a dependency on one another if they communicate through a channel. Furthermore, suppose connector `A` sends to `B` and `B` sends to `C`. In the scheme described above `C` would know about its dependency on `A`. However, this is redundant information: if `C` knows about its dependency on `B`, and `B` knows about its dependency on `A`, then globally we have a full view of the dependencies as well. If `A` sends to `C` as well, then `C` does not know about the interdependency between the message traversing `A -> B -> C` and the message traversing `A -> C`. But again: if we take a global view and join the branch number mappings of `A`, `B` and `C`, then we're able to determine the global behaviour.
 
So instead of forwarding all received branch number information, we can append only the sending connector's branch numbers to a message. A receiving connector will now associate these branch numbers with the port through which the message was received. Hence a connector's branch will have a branch number, but also a mapping from `port ID` to the branch number set of the sending party.
 
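In this channel-centric scheme a branch's bookkeeping could, roughly, look as follows (hypothetical names once more):

```rust
use std::collections::{HashMap, HashSet};

type PortId = u32;
type BranchNumber = u32;

/// Sketch of the per-branch state in the channel-centric scheme: a branch
/// remembers its own execution path and, per port it received on, only the
/// branch numbers that the sending party ("putter") attached to the message.
struct BranchState {
    own_branch_numbers: HashSet<BranchNumber>,
    received_per_port: HashMap<PortId, HashSet<BranchNumber>>,
}
```
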
If we send a solution around to all connectors (again, the procedure for which will be detailed later), it can be reconciled in the following manner. Of the two connectors sharing a channel, the "putter" always chooses the branch number mapping for that channel's ports. The "putter" may have advanced its execution since sending, and thereby increased the number of elements in its branch number set. So if the "putter" receives a solution, it needs to check that the port's branch number set is a subset of its own branch number set. If a "getter" receives a solution, it needs to check that the port's branch number set is a superset of its own branch number set.
 
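This validation step could be sketched as follows (hypothetical helper functions, assuming the proposed solution carries one branch number set per port):

```rust
use std::collections::HashSet;

type BranchNumber = u32;

/// The "putter" may have kept executing after sending, so the set proposed in
/// the solution must be contained in its own, possibly larger, local set.
fn putter_accepts(own: &HashSet<BranchNumber>, proposed: &HashSet<BranchNumber>) -> bool {
    proposed.is_subset(own)
}

/// The "getter" recorded the set exactly as it arrived, so the proposed set
/// must contain everything the getter has seen on that port.
fn getter_accepts(own: &HashSet<BranchNumber>, proposed: &HashSet<BranchNumber>) -> bool {
    own.is_subset(proposed)
}
```
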
Taking a step back: if a global solution exists, then it is composed out of the local solutions per connector, of which there is at least one per connector. The fact that all connectors are part of the same synchronous region implies that each channel will have seen at least one interaction between the connectors that own its ports. Hence each channel will have had at least one set of branch IDs mapped to it, and by collecting the branch ID sets associated with each channel we are able to find the global solution.
 
\ No newline at end of file
src/collections/freelist.rs
 
new file 100644
 
use std::marker::PhantomData;
use std::alloc::{alloc, dealloc, Layout};

/// Entry in a freelist. Contains a generation number to catch silly mistakes
/// where an item's key is used after the item has been freed.
struct Entry<T> {
    generation: usize,
    item: T,
}

/// Key of an item in the freelist. Contains a generation number to prevent
/// use-after-free during development.
// TODO: Two usizes are probably overkill
pub struct Key<T> {
    generation: usize,
    index: usize,
    _type: PhantomData<T>,
}

// Implemented manually (instead of derived) so that `Key<T>` is `Copy` even
// when `T` itself is not.
impl<T> Clone for Key<T> {
    fn clone(&self) -> Self { *self }
}

impl<T> Copy for Key<T> {}

/// Generic freelist structure. Item insertion/retrieval/deletion works like a
/// HashMap through keys.
/// TODO: Use alloc::raw_vec::RawVec once stable and accessible
pub struct FreeList<T> {
    items: *mut Entry<T>,
    capacity: usize,
    length: usize,
    free: Vec<usize>,
}
 
impl<T> FreeList<T> {
    pub fn new() -> Self {
        Self{
            items: std::ptr::null_mut(),
            capacity: 0,
            length: 0,
            free: Vec::new(),
        }
    }

    pub fn with_capacity(capacity: usize) -> Self {
        let mut list = Self{
            items: std::ptr::null_mut(),
            capacity: 0,
            length: 0,
            free: Vec::with_capacity(capacity),
        };
        if capacity > 0 {
            list.grow(capacity);
        }
        return list;
    }

    /// Grows the backing allocation to hold `new_capacity` entries. Stand-in
    /// for the RawVec-based growth mentioned in the TODO above.
    fn grow(&mut self, new_capacity: usize) {
        debug_assert!(new_capacity > self.capacity);
        unsafe {
            let new_layout = Layout::array::<Entry<T>>(new_capacity).unwrap();
            let new_items = alloc(new_layout) as *mut Entry<T>;
            assert!(!new_items.is_null(), "freelist allocation failed");

            if self.capacity != 0 {
                // Move the existing entries over and release the old allocation
                std::ptr::copy_nonoverlapping(self.items, new_items, self.length);
                let old_layout = Layout::array::<Entry<T>>(self.capacity).unwrap();
                dealloc(self.items as *mut u8, old_layout);
            }

            self.items = new_items;
            self.capacity = new_capacity;
        }
    }

    /// Inserts a new item into the freelist. Will return a key that can be used
    /// to retrieve the item and delete it.
    pub fn insert(&mut self, item: T) -> Key<T> {
        let generation;
        let index;

        if self.free.is_empty() {
            // No free elements, make sure we have enough capacity
            if self.length == self.capacity {
                self.grow((self.capacity * 2).max(4));
            }

            // Now we do
            generation = 0;
            index = self.length;

            unsafe {
                let target = self.items.add(index);
                std::ptr::write(target, Entry{ generation, item });
                self.length += 1;
            }
        } else {
            // We have a free spot. Note that the generation is incremented upon
            // freeing an item. So we can just take the current generation value
            // here.
            index = self.free.pop().unwrap();

            unsafe {
                let target = self.items.add(index);
                generation = (*target).generation;
                std::ptr::write(std::ptr::addr_of_mut!((*target).item), item);
            }
        }

        Key { generation, index, _type: PhantomData }
    }

    /// Removes the entry using the provided key. Will panic if the element was
    /// removed already.
    pub fn erase(&mut self, index: Key<T>) {
        // This should always be the case
        debug_assert!(index.index < self.length);

        // Retrieve the element and make sure that the generation matches
        unsafe {
            let entry = self.items.add(index.index);
            assert_eq!((*entry).generation, index.generation);
            (*entry).generation += 1;
            std::ptr::drop_in_place(std::ptr::addr_of_mut!((*entry).item));
        }

        // Add the entry to the freelist.
        self.free.push(index.index);
    }
}
 
impl<T> std::ops::Index<Key<T>> for FreeList<T> {
    type Output = T;

    fn index(&self, index: Key<T>) -> &Self::Output {
        debug_assert!(index.index < self.length);
        unsafe {
            let entry = self.items.add(index.index);
            assert_eq!((*entry).generation, index.generation);
            return &(*entry).item;
        }
    }
}

impl<T> std::ops::IndexMut<Key<T>> for FreeList<T> {
    fn index_mut(&mut self, index: Key<T>) -> &mut Self::Output {
        debug_assert!(index.index < self.length);
        unsafe {
            let entry = self.items.add(index.index);
            assert_eq!((*entry).generation, index.generation);
            return &mut (*entry).item;
        }
    }
}
 
impl<T> Drop for FreeList<T> {
    fn drop(&mut self) {
        // Sort free indices to use them while traversing the allocated items
        self.free.sort_unstable();
        let free_length = self.free.len();
        let mut next_free_idx = 1;
        let mut next_free_item = usize::MAX;
        if free_length != 0 {
            next_free_item = self.free[0];
        }

        // Go through all items. If we didn't yet drop the item, then we do
        // so here.
        for item_idx in 0..self.length {
            if item_idx == next_free_item {
                if next_free_idx < free_length {
                    next_free_item = self.free[next_free_idx];
                    next_free_idx += 1;
                } else {
                    next_free_item = usize::MAX;
                }

                // Skipped the current item (it was already dropped by `erase`),
                // go to the next one
                continue
            }

            // Need to drop the current item
            unsafe {
                let entry = self.items.add(item_idx);
                std::ptr::drop_in_place(std::ptr::addr_of_mut!((*entry).item));
            }
        }

        // Finally, release the backing allocation itself
        if self.capacity != 0 {
            unsafe {
                let layout = Layout::array::<Entry<T>>(self.capacity).unwrap();
                dealloc(self.items as *mut u8, layout);
            }
        }
    }
}
 
#[cfg(test)]
 
mod tests {
 
    use super::*;
 
    use std::sync::Arc;
 
    use std::sync::atomic::{AtomicU32, Ordering};
 

	
 
    struct Counters {
 
        constructed: Arc<AtomicU32>,
 
        destructed: Arc<AtomicU32>,
 
    }
 

	
 
    impl Counters {
 
        pub fn new() -> Self {
 
            Self{
 
                constructed: Arc::new(AtomicU32::new(0)),
 
                destructed: Arc::new(AtomicU32::new(0)),
 
            }
 
        }
 
    }
 

	
 
    struct TestEntity {
 
        counters: Counters,
 
        pub value: u32,
 
    }
 

	
 
    impl TestEntity {
 
        pub fn new(counters: &Counters, value: u32) -> Self {
 
            counters.constructed.fetch_add(1, Ordering::SeqCst);
 
            return TestEntity{
 
                counters: Counters{
 
                    constructed: counters.constructed.clone(),
 
                    destructed: counters.destructed.clone(),
 
                },
 
                value,
 
            }
 
        }
 
    }
 

	
 
    impl Drop for TestEntity {
 
        fn drop(&mut self) {
 
            self.counters.destructed.fetch_add(1, Ordering::SeqCst);
 
        }
 
    }
 

	
 
    #[test]
 
    fn only_constructing() {
 
        const NUM_CREATED: u32 = 3;
 
        let counters = Counters::new();
 
        let mut list = FreeList::new();
 

	
 
        for i in 0..NUM_CREATED {
            list.insert(TestEntity::new(&counters, i));
            assert_eq!(counters.constructed.load(Ordering::SeqCst), i + 1);
        }

        // Everything is constructed, check freelist
        assert_eq!(counters.constructed.load(Ordering::SeqCst), NUM_CREATED);
        assert_eq!(list.length, 3);
        assert!(list.capacity >= 3);
        assert!(list.free.is_empty());

        // Drop and check everything is properly dropped
        drop(list);
        assert_eq!(counters.destructed.load(Ordering::SeqCst), NUM_CREATED);
 
    }
 

	
 
    #[test]
 
    fn reusing_slots() {
 
        const NUM_ROUNDS: u32 = 10;
 
        const NUM_IN_USE: u32 = 10;
 

	
 
        let counters = Counters::new();
 
        let mut list = FreeList::new();
 
        let mut indices = Vec::with_capacity(NUM_IN_USE as usize);
 

	
 
        for round_idx in 0..NUM_ROUNDS {
 
            indices.clear();
 

	
 
            // Adding entries
 
            for i in 0..NUM_IN_USE {
 
                let new_index = list.insert(TestEntity::new(&counters, i));
 
                indices.push(new_index);
 
            }
 

	
 
            // Length should always remain the same as the total number of
 
            // entries in use
 
            assert_eq!(list.length, NUM_IN_USE as usize);
 

	
 
            // Removing entries, and making sure that everything is still
 
            // accessible
 
            for idx in 0..NUM_IN_USE {
 
                // Make sure we can still retrieve the item we're going to delete
 
                let idx_to_remove = NUM_IN_USE - 1 - idx;
 
                let pop_index = indices.pop().unwrap();
 
                let entry = &list[pop_index];
 
                assert_eq!(entry.value, idx_to_remove);
 

	
 
                // Remove the entry and make sure the other ones are still
 
                // accessible
 
                list.erase(pop_index);
 

	
 
                for remaining_idx in 0..idx_to_remove {
 
                    let remaining_key = &indices[remaining_idx as usize];
 
                    let remaining_entry = &list[*remaining_key];
 
                    assert_eq!(remaining_entry.value, remaining_idx);
 
                }
 
            }
 

	
 
            // Now that we're empty, our constructed and destructed counts
 
            // should match
 
            let expected_count = (round_idx + 1) * NUM_IN_USE;
 
            assert_eq!(counters.constructed.load(Ordering::SeqCst), expected_count);
            assert_eq!(counters.destructed.load(Ordering::SeqCst), expected_count);
 
        }
 

	
 
        // Make sure the capacity didn't grow out of bounds, maximum growth rate
 
        // I've ever encountered on `Vec`s is 2. So:
 
        assert!(list.capacity >= NUM_IN_USE as usize && list.capacity < 2 * NUM_IN_USE as usize);
 

	
 
        // Finally, when we drop the list we shouldn't be destructing anything
 
        // anymore.
 
        drop(list);
 
        let final_count = NUM_ROUNDS * NUM_IN_USE;
 
        assert_eq!(counters.constructed.load(Ordering::SeqCst), final_count);
        assert_eq!(counters.destructed.load(Ordering::SeqCst), final_count);
 
    }
 

	
 
    #[test]
 
    #[should_panic]
 
    fn panic_on_reused_key_of_empty_slot() {
 
        let counters = Counters::new();
 
        let mut list = FreeList::new();
 
        let key = list.insert(TestEntity::new(&counters, 0));
 
        list.erase(key);
 
        let entry = &list[key];
 
    }
 

	
 
    #[test]
 
    #[should_panic]
 
    fn panic_on_reused_key_of_used_slot() {
 
        let counters = Counters::new();
 
        let mut list = FreeList::new();
 
        let key1 = list.insert(TestEntity::new(&counters, 0));
 
        list.erase(key1);
 
        let key2 = list.insert(TestEntity::new(&counters, 0));
 
        assert_eq!(key1.index, key2.index);
 
        assert_ne!(key1.generation, key2.generation);
 
        let entry = &list[key1];
 
    }
 
}
 
\ No newline at end of file
src/collections/mod.rs
 
@@ -2,6 +2,8 @@ mod string_pool;
 
mod scoped_buffer;
 
mod sets;
 

	
 
// TODO: Finish this later, use alloc::alloc and alloc::Layout
 
// mod freelist;
 

	
 
pub(crate) use string_pool::{StringPool, StringRef};
 
pub(crate) use scoped_buffer::{ScopedBuffer, ScopedSection};
src/lib.rs
 
@@ -4,6 +4,7 @@ mod macros;
 
mod common;
 
mod protocol;
 
mod runtime;
 
mod runtime2;
 
mod collections;
 

	
 
pub use common::{ConnectorId, EndpointPolarity, Payload, Polarity, PortId};
src/protocol/mod.rs
 
mod arena;
 
mod eval;
 
pub(crate) mod eval;
 
pub(crate) mod input_source;
 
mod parser;
 
#[cfg(test)] mod tests;
src/runtime2/mod.rs
 
new file 100644
 
mod runtime;
 
mod registry;
 
\ No newline at end of file
src/runtime2/registry.rs
 
new file 100644
 
use std::collections::HashMap;
 

	
 
use crate::PortId;
 

	
 
struct PortDesc {
 
    port_id: u32,
 
    owning_connector_id: u32,
 
}
 

	
 
pub(crate) struct Registry {
 
    connector_counter: u32,
 
    port_counter: u32,
 
    ports: HashMap<PortId, PortDesc>
 
}
 
\ No newline at end of file
src/runtime2/runtime.rs
 
new file 100644
 
use std::sync::Arc;
 

	
 
use crate::runtime::error as old_error;
 

	
 
use crate::Polarity;
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 

	
 
use super::registry::Registry;
 

	
 
enum AddComponentError {
 
    ModuleDoesNotExist,
 
    ConnectorDoesNotExist,
 
    InvalidArgumentType(usize), // value is index of (first) invalid argument
 
}
 

	
 
struct Runtime {
 
    protocol: Arc<ProtocolDescription>,
 

	
 
}
 

	
 
impl Runtime {
 
    pub fn new(pd: Arc<ProtocolDescription>) -> Self {
 
        Self{ protocol: pd }
 
    }
 

	
 
    pub fn add_component(&mut self, module: &str, procedure: &str, values: ValueGroup) -> Result<(), AddComponentError> {
 
        use AddComponentError as ACE;
 
        use old_error::AddComponentError as OldACE;
 

	
 
        // TODO: Allow the ValueGroup to contain any kind of value
 
        // TODO: Remove the responsibility of adding a component from the PD
 

	
 
        // Lookup module and the component
 
        // TODO: Remove this error enum translation
 
        let port_polarities = match self.protocol.component_polarities(module.as_bytes(), procedure.as_bytes()) {
 
            Ok(polarities) => polarities,
 
            Err(reason) => match reason {
 
                OldACE::NonPortTypeParameters => return Err(ACE::InvalidArgumentType(0)),
 
                OldACE::NoSuchModule => return Err(ACE::ModuleDoesNotExist),
 
                OldACE::NoSuchComponent => return Err(ACE::ConnectorDoesNotExist),
 
                _ => unreachable!(),
 
            }
 
        };
 

	
 
        // Make sure supplied values (and types) are correct
 
        let mut ports = Vec::with_capacity(values.values.len());
 
        for (value_idx, value) in values.values.iter().enumerate() {
 
            let polarity = &port_polarities[value_idx];
 

	
 
            match value {
 
                Value::Input(port_id) => {
 
                    if *polarity != Polarity::Getter {
 
                        return Err(ACE::InvalidArgumentType(value_idx))
 
                    }
 

	
 
                    ports.push(*port_id);
 
                },
 
                Value::Output(port_id) => {
 
                    if *polarity != Polarity::Putter {
 
                        return Err(ACE::InvalidArgumentType(value_idx))
 
                    }
 

	
 
                    ports.push(*port_id);
 
                },
 
                _ => return Err(ACE::InvalidArgumentType(value_idx))
 
            }
 
        }
 

	
 
        // Instantiate the component
        // TODO: Hand the new component off to the scheduler/registry once that
        //       part of the runtime exists.
        let _component_state = self.protocol.new_component(module.as_bytes(), procedure.as_bytes(), &ports);

        return Ok(());
    }
 
}
 
\ No newline at end of file