Changeset - 6a7d3acfcb5e
[Not reviewed]
0 6 0
Christopher Esterhuyse - 5 years ago 2020-10-23 13:57:30
christopher.esterhuyse@gmail.com
logging feature exposed for ease of use
6 files changed with 103 insertions and 27 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -2,49 +2,50 @@
 
name = "reowolf_rs"
 
version = "0.1.4"
 
authors = [
 
	"Christopher Esterhuyse <esterhuy@cwi.nl, christopher.esterhuyse@gmail.com>",
 
	"Hans-Dieter Hiep <hdh@cwi.nl>"
 
]
 
edition = "2018"
 

	
 
[dependencies]
 
# convenience macros
 
maplit = "1.0.2"
 
derive_more = "0.99.2"
 

	
 
# runtime
 
bincode = "1.3.1"
 
serde = { version = "1.0.114", features = ["derive"] }
 
getrandom = "0.1.14" # tiny crate. used to guess controller-id
 

	
 
# network
 
mio = { version = "0.7.0", package = "mio", features = ["udp", "tcp", "os-poll"] }
 
socket2 = { version = "0.3.12", optional = true }
 

	
 
# protocol
 
backtrace = "0.3"
 
lazy_static = "1.4.0"
 

	
 
# ffi
 

	
 
# socket ffi
 
libc = { version = "^0.2", optional = true }
 
os_socketaddr = { version = "0.1.0", optional = true }
 

	
 
[dev-dependencies]
 
# test-generator = "0.3.0"
 
crossbeam-utils = "0.7.2"
 
lazy_static = "1.4.0"
 

	
 
[lib]
 
crate-type = [
 
	"rlib", # for use as a Rust dependency. 
 
	"cdylib" # for FFI use, typically C.
 
]
 

	
 
[features]
 
default = ["ffi"]
 
ffi = [] # see src/ffi/mod.rs
 
ffi_pseudo_socket_api = ["ffi", "libc", "os_socketaddr"]# see src/ffi/pseudo_socket_api.rs.
 
endpoint_logging = [] # see src/macros.rs
 
session_optimization = [] # see src/runtime/setup.rs
 
\ No newline at end of file
 
session_optimization = [] # see src/runtime/setup.rs
 
no_logging = [] # see src/macros.rs
 
\ No newline at end of file
README.md
Show inline comments
 
# Reowolf 1.0 Implementation
 

	
 
This library provides connectors as a generalization of sockets for use in communication over the internet. This repository is one of the deliverables of the [Reowolf project](https://nlnet.nl/project/Reowolf/) funded by the NLNet foundation.
 

	
 
## Compilation instructions
 
1. Install the latest stable Rust toolchain (`rustup install stable`) using the [rustup](https://rustup.rs/) CLI tool, available for most platforms.
 
1. Have `cargo` (the Rust language package manager) download source dependencies, and compile the library with release-level optimizations with `cargo build --release`: 
 
	- The resulting dylib can be found in `./target/release/`, to be used with the header file: `./reowolf.h`.
 
	- *Note*: A list of immediate ancestor dependencies is visible in `./Cargo.toml`.
 
	- *Note*: Run `cargo test --release` to run unit tests with release-level optimizations.
 
1. By default, will compile with logging turned ON and session optimization turned OFF. Control this by passing the desired set of feature flags in {"no_logging", "session_optimization"} to the compiler. For example, `cargo build --release --features no_logging`.
 

	
 
## Using the library
 
- The library may be used as a Rust dependency by adding it as a git dependency, i.e., by adding line `reowolf_rs = { git = "https://scm.cwi.nl/FM/reowolf" }` to downstream crate's manifest file, `Cargo.toml`.
 
- The library may be used as a dynamically-linked library using its C ABI as the cdylib written to `./target/release` when compiled with release optimizations, in combination to the header file `./reowolf.h`.
 
- When compiled on Linux, the compiled library will include definitions of pseudo-socket procedures declared in `./pseudo-socket.h` when compiled with `cargo build --release --features ffi_pseudo_socket_api`. The added functionality is only needed when requiring that connectors expose a socket-like API.
 

	
 
## Examples
 
The `examples` directory contains example usages of connectors for message passing over the internet. The programs within require that the library is compiled as a dylib (see above).
 

	
 
## Notes
 
3. Running `cbindgen > reowolf.h` from the root will overwrite the header file. This is only necessary to update it.  
 

	
 
## Short User Overview
 
The bulk of the library's functionality is exposed to the user in two types: 
 
1. `protocol::ProtocolDescription` 
 
1. `runtime::Connector` 
 

	
 
The former is created using `parse`. For the most part, the user is not expected to interact much with the structure, only passing it to the connector as a communication session is being set up.
 

	
 
The latter is created with `new`, configured with methods such as `new_net_port` and `add_component`, and connected via `connect`, whereafter it can be used for multi-party communication through methods `put`, `get`, `next_batch`, and `sync`.
 

	
 
## Contributor Overview
 
The details of the implementation are best understood by reading the doc comments, starting from the procedures listed in the section above. It is suggested to first/also refer to the Reowolf project's companion documentation (link TODO) for a higher level overview of the goals and design of the implementation.
 
\ No newline at end of file
src/macros.rs
Show inline comments
 
/*
 
Change the definition of these macros to control the logging level statically
 
*/
 

	
 
macro_rules! log {
 
    (@ENDPT, $logger:expr, $($arg:tt)*) => {{
 
        // if let Some(w) = $logger.line_writer() {
 
        //     let _ = writeln!(w, $($arg)*);
 
        // }
 
    }};
 
    ($logger:expr, $($arg:tt)*) => {{
 
        // if let Some(w) = $logger.line_writer() {
 
        //     let _ = writeln!(w, $($arg)*);
 
        // }
 
        #[cfg(not(feature = "no_logging"))]
 
        if let Some(w) = $logger.line_writer() {
 
            let _ = writeln!(w, $($arg)*);
 
        }
 
    }};
 
}
src/runtime/communication.rs
Show inline comments
 
@@ -27,96 +27,101 @@ struct NativeBranch {
 
}
 

	
 
// Manages a protocol component's speculative branches for the duration
 
// of the synchronous round.
 
#[derive(Debug)]
 
struct BranchingProtoComponent {
 
    branches: HashMap<Predicate, ProtoComponentBranch>,
 
}
 

	
 
// One specualtive branch of a protocol component.
 
// `ended` IFF this branch has reached SyncBlocker::SyncBlockEnd before.
 
#[derive(Debug, Clone)]
 
struct ProtoComponentBranch {
 
    state: ComponentState,
 
    inner: ProtoComponentBranchInner,
 
    ended: bool,
 
}
 

	
 
// A structure wrapping a set of three pointers, making it impossible
 
// to miss that they are being setup for `cyclic_drain`.
 
struct CyclicDrainer<'a, K: Eq + Hash, V> {
 
    input: &'a mut HashMap<K, V>,
 
    swap: &'a mut HashMap<K, V>,
 
    output: &'a mut HashMap<K, V>,
 
}
 

	
 
// Small convenience trait for extending the stdlib's bool type with
 
// an optionlike replace method for increasing brevity.
 
trait ReplaceBoolTrue {
 
    fn replace_with_true(&mut self) -> bool;
 
}
 

	
 
//////////////// IMPL ////////////////////////////
 

	
 
impl ReplaceBoolTrue for bool {
 
    fn replace_with_true(&mut self) -> bool {
 
        let was = *self;
 
        *self = true;
 
        !was
 
    }
 
}
 

	
 
// CuUndecided provides a mostly immutable view into the ConnectorUnphased structure,
 
// making it harder to accidentally mutate its contents in a way that cannot be rolled back.
 
impl CuUndecided for ConnectorUnphased {
 
    fn logger_and_protocol_description(&mut self) -> (&mut dyn Logger, &ProtocolDescription) {
 
        (&mut *self.logger, &self.proto_description)
 
    }
 
    fn logger_and_protocol_components(
 
        &mut self,
 
    ) -> (&mut dyn Logger, &mut HashMap<ComponentId, ComponentState>) {
 
        (&mut *self.logger, &mut self.proto_components)
 
    }
 
    fn logger(&mut self) -> &mut dyn Logger {
 
        &mut *self.logger
 
    }
 
    fn proto_description(&self) -> &ProtocolDescription {
 
        &self.proto_description
 
    }
 
    fn native_component_id(&self) -> ComponentId {
 
        self.native_component_id
 
    }
 
}
 
impl<'a, K, V> MapTempsGuard<'a, K, V> {
 
    fn reborrow(&mut self) -> MapTempsGuard<'_, K, V> {
 
        MapTempsGuard(self.0)
 
    }
 
    fn split_first_mut(self) -> (MapTempGuard<'a, K, V>, MapTempsGuard<'a, K, V>) {
 
        let (head, tail) = self.0.split_first_mut().expect("Cache exhausted");
 
        (MapTempGuard::new(head), MapTempsGuard(tail))
 
    }
 
}
 
impl<'a, K, V> MapTempGuard<'a, K, V> {
 
    fn new(map: &'a mut HashMap<K, V>) -> Self {
 
        assert!(map.is_empty()); // sanity check
 
        Self(map)
 
    }
 
}
 
impl<'a, K, V> Drop for MapTempGuard<'a, K, V> {
 
    fn drop(&mut self) {
 
        assert!(self.0.is_empty()); // sanity check
 
    }
 
}
 
impl<'a, K, V> Deref for MapTempGuard<'a, K, V> {
 
    type Target = HashMap<K, V>;
 
    fn deref(&self) -> &<Self as Deref>::Target {
 
        self.0
 
    }
 
}
 
impl<'a, K, V> DerefMut for MapTempGuard<'a, K, V> {
 
    fn deref_mut(&mut self) -> &mut <Self as Deref>::Target {
 
        self.0
 
    }
 
}
 
impl Connector {
 
    /// Read the message received by the given port in the previous synchronous round.
 
    pub fn gotten(&self, port: PortId) -> Result<&Payload, GottenError> {
 
        use GottenError as Ge;
 
        if let ConnectorPhased::Communication(comm) = &self.phased {
 
            match &comm.round_result {
 
                Err(_) => Err(Ge::PreviousSyncFailed),
 
@@ -229,97 +234,97 @@ impl Connector {
 
        }
 
    }
 

	
 
    // Attempts to complete the synchronous round for the given
 
    // communication-phased connector structure.
 
    // Modifies components and ports in `cu` IFF the round succeeds.
 
    #[inline]
 
    fn connected_sync(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        timeout: Option<Duration>,
 
    ) -> Result<Option<RoundEndedNative>, SyncError> {
 
        //////////////////////////////////
 
        use SyncError as Se;
 
        //////////////////////////////////
 

	
 
        // Create separate storages for ports and components stored in `cu`,
 
        // while kicking off the branching of components until the set of
 
        // components entering their synchronous block is finalized in `branching_proto_components`.
 
        // This is the last time cu's components and ports are accessed until the round is decided.
 
        let mut ips = cu.ips.clone();
 
        let mut branching_proto_components =
 
            HashMap::<ComponentId, BranchingProtoComponent>::default();
 
        let mut unrun_components: Vec<(ComponentId, ComponentState)> = cu
 
            .proto_components
 
            .iter()
 
            .map(|(&proto_id, proto)| (proto_id, proto.clone()))
 
            .collect();
 
        log!(cu.logger(), "Nonsync running {} proto components...", unrun_components.len());
 
        // initially, the set of components to run is the set of components stored by `cu`,
 
        // but they are eventually drained into `branching_proto_components`.
 
        // Some components exit first, and others are created and put into `unrun_components`.
 
        while let Some((proto_component_id, mut component)) = unrun_components.pop() {
 
            log!(
 
                cu.logger(),
 
                "Nonsync running proto component with ID {:?}. {} to go after this",
 
                proto_component_id,
 
                unrun_components.len()
 
            );
 
            let (logger, proto_description) = cu.logger_and_protocol_description();
 
            let mut ctx = NonsyncProtoContext {
 
                ips: &mut ips,
 
                logger,
 
                proto_component_id,
 
                unrun_components: &mut unrun_components,
 
            };
 
            let blocker = component.nonsync_run(&mut ctx, proto_description);
 
            log!(
 
                cu.logger(),
 
                logger,
 
                "proto component {:?} ran to nonsync blocker {:?}",
 
                proto_component_id,
 
                &blocker
 
            );
 
            use NonsyncBlocker as B;
 
            match blocker {
 
                B::ComponentExit => drop(component),
 
                B::Inconsistent => return Err(Se::InconsistentProtoComponent(proto_component_id)),
 
                B::SyncBlockStart => assert!(branching_proto_components
 
                    .insert(proto_component_id, BranchingProtoComponent::initial(component))
 
                    .is_none()), // Some(_) returned IFF some component identifier key is overwritten (BAD!)
 
            }
 
        }
 
        log!(
 
            cu.logger(),
 
            "All {} proto components are now done with Nonsync phase",
 
            branching_proto_components.len(),
 
        );
 

	
 
        // Create temporary structures needed for the synchronous phase of the round
 
        let mut rctx = RoundCtx {
 
            ips, // already used previously, now moved into RoundCtx
 
            solution_storage: {
 
                let subtree_id_iter = {
 
                    // Create an iterator over the identifiers of this
 
                    // connector's childen in the _solution tree_.
 
                    // Namely, the native, all locally-managed components,
 
                    // and all this connector's children in the _consensus tree_ (other connectors).
 
                    let n = std::iter::once(SubtreeId::LocalComponent(cu.native_component_id));
 
                    let c = branching_proto_components
 
                        .keys()
 
                        .map(|&cid| SubtreeId::LocalComponent(cid));
 
                    let e = comm
 
                        .neighborhood
 
                        .children
 
                        .iter()
 
                        .map(|&index| SubtreeId::NetEndpoint { index });
 
                    n.chain(c).chain(e)
 
                };
 
                log!(
 
                    cu.logger,
 
                    "Children in subtree are: {:?}",
 
                    DebuggableIter(subtree_id_iter.clone())
 
                );
 
                SolutionStorage::new(subtree_id_iter)
 
            },
 
            spec_var_stream: cu.ips.id_manager.new_spec_var_stream(),
 
            payload_inbox: Default::default(), // buffer for in-memory payloads to be handled
 
@@ -407,112 +412,113 @@ impl Connector {
 
                    predicate.clone(),
 
                );
 
            }
 
            if let Some(_) = branching_native.branches.insert(predicate, branch) {
 
                // thanks to the native_spec_var, each batch has a distinct predicate
 
                unreachable!()
 
            }
 
        }
 
        // restore the invariant: !native_batches.is_empty()
 
        comm.native_batches.push(Default::default());
 
        // Call to another big method; keep running this round
 
        // until a distributed decision is reached!
 
        log!(cu.logger(), "Searching for decision...");
 
        let decision = Self::sync_reach_decision(
 
            cu,
 
            comm,
 
            &mut branching_native,
 
            &mut branching_proto_components,
 
            &mut rctx,
 
        )?;
 
        log!(cu.logger(), "Committing to decision {:?}!", &decision);
 
        comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.logger(), &decision)?;
 

	
 
        // propagate the decision to children
 
        let msg = Msg::CommMsg(CommMsg {
 
            round_index: comm.round_index,
 
            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Announce {
 
                decision: decision.clone(),
 
            }),
 
        });
 
        log!(
 
            cu.logger(),
 
            "Announcing decision {:?} through child endpoints {:?}",
 
            &msg,
 
            &comm.neighborhood.children
 
        );
 
        for &child in comm.neighborhood.children.iter() {
 
            comm.endpoint_manager.send_to_comms(child, &msg)?;
 
        }
 
        let ret = match decision {
 
            Decision::Failure => {
 
                // untouched port/component fields of `cu` are NOT overwritten.
 
                // the result is a rollback.
 
                Err(Se::RoundFailure)
 
            }
 
            Decision::Success(predicate) => {
 
                // commit changes to component states
 
                cu.proto_components.clear();
 
                cu.proto_components.extend(
 
                let (logger, proto_components) = cu.logger_and_protocol_components();
 
                proto_components.extend(
 
                    // "flatten" branching components, committing the speculation
 
                    // consistent with the predicate decided upon.
 
                    branching_proto_components
 
                        .into_iter()
 
                        .map(|(cid, bpc)| (cid, bpc.collapse_with(&predicate))),
 
                        .map(|(cid, bpc)| (cid, bpc.collapse_with(logger, &predicate))),
 
                );
 
                // commit changes to ports and id_manager
 
                cu.ips = rctx.ips;
 
                log!(
 
                    cu.logger,
 
                    logger,
 
                    "End round with (updated) component states {:?}",
 
                    cu.proto_components.keys()
 
                    proto_components.keys()
 
                );
 
                cu.ips = rctx.ips;
 
                // consume native
 
                let round_ok = branching_native.collapse_with(&mut *cu.logger(), &predicate);
 
                let round_ok = branching_native.collapse_with(cu.logger(), &predicate);
 
                Ok(Some(round_ok))
 
            }
 
        };
 
        log!(cu.logger(), "Sync round ending! Cleaning up");
 
        ret
 
    }
 

	
 
    // Once the synchronous round has been started, this procedure
 
    // routs and handles payloads, receives control messages from neighboring connectors,
 
    // checks for timeout, and aggregates solutions until a distributed decision is reached.
 
    // The decision is either a solution (success case), or a distributed timeout rollback (failure case)
 
    // The final possible outcome is an unrecoverable error, which results from some fundamental misbehavior,
 
    // a network channel breaking, etc.
 
    fn sync_reach_decision(
 
        cu: &mut impl CuUndecided,
 
        comm: &mut ConnectorCommunication,
 
        branching_native: &mut BranchingNative,
 
        branching_proto_components: &mut HashMap<ComponentId, BranchingProtoComponent>,
 
        rctx: &mut RoundCtx,
 
    ) -> Result<Decision, UnrecoverableSyncError> {
 
        // The round is in progress, and now its just a matter of arriving at a decision.
 
        let mut already_requested_failure = false;
 
        if branching_native.branches.is_empty() {
 
            // An unsatisfiable native is the easiest way to detect failure
 
            log!(cu.logger(), "Native starts with no branches! Failure!");
 
            match comm.neighborhood.parent {
 
                Some(parent) => {
 
                    if already_requested_failure.replace_with_true() {
 
                        Self::request_failure(cu, comm, parent)?
 
                    } else {
 
                        log!(cu.logger(), "Already requested failure");
 
                    }
 
                }
 
                None => {
 
                    log!(cu.logger(), "No parent. Deciding on failure");
 
                    return Ok(Decision::Failure);
 
                }
 
            }
 
        }
 

	
 
        // Create a small set of "workspace" hashmaps, to be passed by-reference into various calls.
 
        // This is an optimization, avoiding repeated allocation.
 
        let mut pcb_temps_owner = <[HashMap<Predicate, ProtoComponentBranch>; 3]>::default();
 
        let mut pcb_temps = MapTempsGuard(&mut pcb_temps_owner);
 
        let mut bn_temp_owner = <HashMap<Predicate, NativeBranch>>::default();
 

	
 
        // first, we run every protocol component to their sync blocker.
 
        // Afterwards we establish a loop invariant: no new decision can be reached
 
@@ -927,96 +933,97 @@ impl BranchingNative {
 
        match e {
 
            Entry::Vacant(ev) => {
 
                // no existing branch present. We insert it no problem. (The most common case)
 
                ev.insert(branch);
 
            }
 
            Entry::Occupied(mut eo) => {
 
                // Oh dear, there is already a branch with this predicate.
 
                // Rather than choosing either branch, we MERGE them.
 
                // This means taking the UNION of their .gotten and the INTERSECTION of their .to_get
 
                let old = eo.get_mut();
 
                for (k, v) in branch.gotten.drain() {
 
                    if old.gotten.insert(k, v).is_none() {
 
                        // added a gotten element in `branch` not already in `old`
 
                        old.to_get.remove(&k);
 
                    }
 
                }
 
            }
 
        }
 
    }
 

	
 
    // Given the predicate for the round's solution, collapse this
 
    // branching native to an ended branch whose predicate is consistent with it.
 
    // return as `RoundEndedNative` the result of a native completing successful round
 
    fn collapse_with(
 
        self,
 
        logger: &mut dyn Logger,
 
        solution_predicate: &Predicate,
 
    ) -> RoundEndedNative {
 
        log!(
 
            logger,
 
            "Collapsing native with {} branch preds {:?}",
 
            self.branches.len(),
 
            self.branches.keys()
 
        );
 
        for (branch_predicate, branch) in self.branches {
 
            log!(
 
                logger,
 
                "Considering native branch {:?} with to_get {:?} gotten {:?}",
 
                &branch_predicate,
 
                &branch.to_get,
 
                &branch.gotten
 
            );
 
            if branch.is_ended() && branch_predicate.assigns_subset(solution_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                log!(logger, "Collapsed native has gotten {:?}", &gotten);
 
                return RoundEndedNative { batch_index: index, gotten };
 
            }
 
        }
 
        log!(logger, "Native had no branches matching pred {:?}", solution_predicate);
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 
impl BranchingProtoComponent {
 
    // Create a singleton-branch branching protocol component as
 
    // speculation begins, with the given protocol state.
 
    fn initial(state: ComponentState) -> Self {
 
        let branch = ProtoComponentBranch { state, inner: Default::default(), ended: false };
 
        Self { branches: hashmap! { Predicate::default() => branch } }
 
    }
 

	
 
    // run all the given branches (cd.input) to their SyncBlocker,
 
    // populating cd.output by cyclically draining "input" -> "cd."input" / cd.output.
 
    // (to prevent concurrent r/w of one structure, we realize "input" as cd.input for reading and cd.swap for writing)
 
    // This procedure might lose branches, and it might create new branches.
 
    fn drain_branches_to_blocked(
 
        cd: CyclicDrainer<Predicate, ProtoComponentBranch>,
 
        cu: &mut impl CuUndecided,
 
        rctx: &mut RoundCtx,
 
        proto_component_id: ComponentId,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        // let CyclicDrainer { input, swap, output } = cd;
 
        while !cd.input.is_empty() {
 
            'branch_iter: for (mut predicate, mut branch) in cd.input.drain() {
 
                let mut ctx = SyncProtoContext {
 
                    rctx,
 
                    predicate: &predicate,
 
                    branch_inner: &mut branch.inner,
 
                };
 
                // Run this component's state to the next syncblocker for handling
 
                let blocker = branch.state.sync_run(&mut ctx, cu.proto_description());
 
                log!(
 
                    cu.logger(),
 
                    "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
 
                    proto_component_id,
 
                    &predicate,
 
                    &blocker,
 
                );
 
                use SyncBlocker as B;
 
                match blocker {
 
                    B::Inconsistent => drop((predicate, branch)), // EXPLICIT inconsistency
 
                    B::CouldntReadMsg(port) => {
 
                        // sanity check: `CouldntReadMsg` returned IFF the message is unavailable
 
                        assert!(!branch.inner.inbox.contains_key(&port));
 
                        // This branch hit a proper blocker: progress awaits the receipt of some message. Exit the cycle.
 
                        Self::insert_branch_merging(cd.output, predicate, branch);
 
                    }
 
                    B::CouldntCheckFiring(port) => {
 
@@ -1169,104 +1176,109 @@ impl BranchingProtoComponent {
 
                    log!(cu.logger(), "Forking this branch with new predicate {:?}", &predicate2);
 
                    let mut branch2 = branch.clone();
 
                    branch2.feed_msg(getter, send_payload_msg.payload.clone());
 
                    // the branch that receives the message is unblocked, the original one is blocked
 
                    Self::insert_branch_merging(&mut blocked, predicate, branch);
 
                    Self::insert_branch_merging(&mut unblocked, predicate2, branch2);
 
                }
 
            }
 
        }
 
        log!(cu.logger(), "blocked {:?} unblocked {:?}", blocked.len(), unblocked.len());
 
        // drain from unblocked --> blocked
 
        let (swap, _pcb_temps) = pcb_temps.split_first_mut(); // peel off ONE temp storage map
 
        let cd = CyclicDrainer { input: unblocked.0, swap: swap.0, output: blocked.0 };
 
        BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?;
 
        // swap the blocked branches back
 
        std::mem::swap(blocked.0, &mut self.branches);
 
        log!(cu.logger(), "component settles down with branches: {:?}", self.branches.keys());
 
        Ok(())
 
    }
 

	
 
    // Insert a new speculate branch into the given storage,
 
    // MERGING it with an existing branch if their predicate keys clash.
 
    fn insert_branch_merging(
 
        branches: &mut HashMap<Predicate, ProtoComponentBranch>,
 
        predicate: Predicate,
 
        mut branch: ProtoComponentBranch,
 
    ) {
 
        let e = branches.entry(predicate);
 
        use std::collections::hash_map::Entry;
 
        match e {
 
            Entry::Vacant(ev) => {
 
                // no existing branch present. We insert it no problem. (The most common case)
 
                ev.insert(branch);
 
            }
 
            Entry::Occupied(mut eo) => {
 
                // Oh dear, there is already a branch with this predicate.
 
                // Rather than choosing either branch, we MERGE them.
 
                // This means keeping the existing one in-place, and giving it the UNION of the inboxes
 
                let old = eo.get_mut();
 
                for (k, v) in branch.inner.inbox.drain() {
 
                    old.inner.inbox.insert(k, v);
 
                }
 
            }
 
        }
 
    }
 

	
 
    // Given the predicate for the round's solution, collapse this
 
    // branching native to an ended branch whose predicate is consistent with it.
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ComponentState {
 
    fn collapse_with(
 
        self,
 
        logger: &mut dyn Logger,
 
        solution_predicate: &Predicate,
 
    ) -> ComponentState {
 
        let BranchingProtoComponent { branches } = self;
 
        for (branch_predicate, branch) in branches {
 
            if branch.ended && branch_predicate.assigns_subset(solution_predicate) {
 
                let ProtoComponentBranch { state, .. } = branch;
 
                return state;
 
            }
 
        }
 
        log!(logger, "ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 
impl ProtoComponentBranch {
 
    // Feed this branch received message.
 
    // It's safe to receive the same message repeatedly,
 
    // but if we receive a message with different contents,
 
    // it's a sign something has gone wrong! keys of type (port, round, predicate)
 
    // should always map to at most one message value!
 
    fn feed_msg(&mut self, getter: PortId, payload: Payload) {
 
        let e = self.inner.inbox.entry(getter);
 
        use std::collections::hash_map::Entry;
 
        match e {
 
            Entry::Vacant(ev) => {
 
                // new message
 
                ev.insert(payload);
 
            }
 
            Entry::Occupied(eo) => {
 
                // redundant recv. can happen as a result of a
 
                // component A having two branches X and Y related by
 
                assert_eq!(eo.get(), &payload);
 
            }
 
        }
 
    }
 
}
 
impl SolutionStorage {
 
    // Create a new solution storage, to manage the local solutions for
 
    // this connector and all of it's children (subtrees) in the solution tree.
 
    fn new(subtree_ids: impl Iterator<Item = SubtreeId>) -> Self {
 
        // For easy iteration, we store this SubtreeId => {Predicate}
 
        // structure instead as a pair of structures: a vector of predicate sets,
 
        // and a subtree_id-to-index lookup map
 
        let mut subtree_id_to_index: HashMap<SubtreeId, usize> = Default::default();
 
        let mut subtree_solutions = vec![];
 
        for id in subtree_ids {
 
            subtree_id_to_index.insert(id, subtree_solutions.len());
 
            subtree_solutions.push(Default::default())
 
        }
 
        // new_local U old_local represents the solutions of this connector itself:
 
        // namely, those that can be created from the union of one element from each child's solution set.
 
        // The difference between new and old is that new stores those NOT YET sent over the network
 
        // to this connector's parent in the solution tree.
 
        // invariant: old_local and new_local have an empty intersection
 
        Self {
 
            subtree_solutions,
 
            subtree_id_to_index,
 
            old_local: Default::default(),
 
            new_local: Default::default(),
src/runtime/mod.rs
Show inline comments
 
@@ -380,96 +380,99 @@ struct Predicate {
 
// (which are leaves in the solution tree), or other connectors reachable through the given
 
// network endpoint (which are internal nodes in the solution tree).
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
enum SubtreeId {
 
    LocalComponent(ComponentId),
 
    NetEndpoint { index: usize },
 
}
 

	
 
// An accumulation of the connector's knowledge of all (a) the local solutions its children
 
// in the solution tree have found, and (b) its own solutions derivable from those of its children.
 
// This structure starts off each round with an empty set, and accumulates solutions as they are found
 
// by local components, or received over the network in control messages.
 
// IMPORTANT: solutions, once found, don't go away until the end of the round. That is to
 
// say that these sets GROW until the round is over, and all solutions are reset.
 
#[derive(Debug)]
 
struct SolutionStorage {
 
    // invariant: old_local U new_local solutions are those that can be created from
 
    // the UNION of one element from each set in `subtree_solution`.
 
    // invariant is maintained by potentially populating new_local whenever subtree_solutions is populated.
 
    old_local: HashSet<Predicate>, // already sent to this connector's parent OR decided
 
    new_local: HashSet<Predicate>, // not yet sent to this connector's parent OR decided
 
    // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<SubtreeId, usize>,
 
}
 

	
 
// Stores the transient data of a synchronous round.
 
// Some of it is for bookkeeping, and the rest is a temporary mirror of fields of
 
// `ConnectorUnphased`, such that any changes are safely contained within RoundCtx,
 
// and can be undone if the round fails.
 
struct RoundCtx {
 
    solution_storage: SolutionStorage,
 
    spec_var_stream: SpecVarStream,
 
    payload_inbox: Vec<(PortId, SendPayloadMsg)>,
 
    deadline: Option<Instant>,
 
    ips: IdAndPortState,
 
}
 

	
 
// A trait intended to limit the access of the ConnectorUnphased structure
 
// such that we don't accidentally modify any important component/port data
 
// while the results of the round are undecided. Why? Any actions during Connector::sync
 
// are _speculative_ until the round is decided, and we need a safe way of rolling
 
// back any changes.
 
trait CuUndecided {
 
    fn logger(&mut self) -> &mut dyn Logger;
 
    fn proto_description(&self) -> &ProtocolDescription;
 
    fn native_component_id(&self) -> ComponentId;
 
    fn logger_and_protocol_description(&mut self) -> (&mut dyn Logger, &ProtocolDescription);
 
    fn logger_and_protocol_components(
 
        &mut self,
 
    ) -> (&mut dyn Logger, &mut HashMap<ComponentId, ComponentState>);
 
}
 

	
 
// Represents a set of synchronous port operations that the native component
 
// has described as an "option" for completing during the synchronous rounds.
 
// Operations contained here succeed together or not at all.
 
// A native with N=2+ batches are expressing an N-way nondeterministic choice
 
#[derive(Debug, Default)]
 
struct NativeBatch {
 
    // invariant: putters' and getters' polarities respected
 
    to_put: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 

	
 
// Parallels a mio::Token type, but more clearly communicates
 
// the way it identifies the evented structre it corresponds to.
 
// See runtime/setup for methods converting between TokenTarget and mio::Token
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
 
enum TokenTarget {
 
    NetEndpoint { index: usize },
 
    UdpEndpoint { index: usize },
 
}
 

	
 
// Returned by the endpoint manager as a result of comm_recv, telling the connector what happened,
 
// such that it can know when to continue polling, and when to block.
 
enum CommRecvOk {
 
    TimeoutWithoutNew,
 
    NewPayloadMsgs,
 
    NewControlMsg { net_index: usize, msg: CommCtrlMsg },
 
}
 
////////////////
 
fn err_would_block(err: &std::io::Error) -> bool {
 
    err.kind() == std::io::ErrorKind::WouldBlock
 
}
 
impl<T: std::cmp::Ord> VecSet<T> {
 
    fn new(mut vec: Vec<T>) -> Self {
 
        // establish the invariant
 
        vec.sort();
 
        vec.dedup();
 
        Self { vec }
 
    }
 
    fn contains(&self, element: &T) -> bool {
 
        self.vec.binary_search(element).is_ok()
 
    }
 
    // Insert the given element. Returns whether it was already present.
 
    fn insert(&mut self, element: T) -> bool {
 
        match self.vec.binary_search(&element) {
 
            Ok(_) => false,
 
            Err(index) => {
src/runtime/tests.rs
Show inline comments
 
@@ -1267,107 +1267,165 @@ fn count_stream() {
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) sequencer3, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    c.add_component(b"count_stream", &[p0]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    for expecting in 0u8..16 {
 
        c.get(g0).unwrap();
 
        c.sync(None).unwrap();
 
        assert_eq!(&[expecting], c.gotten(g0).unwrap().as_slice());
 
    }
 
}
 

	
 
#[test]
 
fn for_msg_byte() {
 
    let test_log_path = Path::new("./logs/for_msg_byte");
 
    let pdl = b"
 
    primitive for_msg_byte(out o) {
 
        byte i = 0;
 
        while(i<8) {
 
            msg m = create(1);
 
            m[0] = i;
 
            synchronous() put(o, m);
 
            i++;
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) sequencer3, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    c.add_component(b"for_msg_byte", &[p0]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    for expecting in 0u8..8 {
 
        c.get(g0).unwrap();
 
        c.sync(None).unwrap();
 
        assert_eq!(&[expecting], c.gotten(g0).unwrap().as_slice());
 
    }
 
    c.sync(None).unwrap();
 
}
 

	
 
#[test]
 
fn eq_causality() {
 
    let test_log_path = Path::new("./logs/eq_no_causality");
 
    let pdl = b"
 
    primitive eq(in a, in b, out c) {
 
        msg ma = null;
 
        msg mb = null;
 
        while(true) synchronous {
 
            if(fires(a)) {
 
                // b and c also fire!
 
                // left first!
 
                ma = get(a);
 
                put(c, ma);
 
                mb = get(b);
 
                assert(ma == mb);
 
            }
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    /*
 
    [native]p0-->g0[eq]p1--.
 
                 g1        |
 
                 ^---------`
 
    */
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"eq", &[g0, g1, p1]).unwrap();
 

	
 
    /*
 
                  V--------.
 
                 g2        |
 
    [native]p2-->g3[eq]p3--`
 
    */
 
    let [p2, g2] = c.new_port_pair();
 
    let [p3, g3] = c.new_port_pair();
 
    c.add_component(b"eq", &[g3, g2, p3]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    for _ in 0..4 {
 
        // everything is fine with LEFT FIRST
 
        c.put(p0, TEST_MSG.clone()).unwrap();
 
        c.sync(MS100).unwrap();
 

	
 
        // no solution when left is NOT FIRST
 
        c.put(p2, TEST_MSG.clone()).unwrap();
 
        c.sync(MS100).unwrap_err();
 
    }
 
}
 

	
 
#[test]
 
fn eq_no_causality() {
 
    let test_log_path = Path::new("./logs/eq_no_causality");
 
    let pdl = b"
 
    composite eq(in a, in b, out c) {
 
        channel leftfirsto -> leftfirsti;
 
        new eqinner(a, b, c, leftfirsto, leftfirsti);
 
    }
 
    primitive eqinner(in a, in b, out c, out leftfirsto, in leftfirsti) {
 
        msg ma = null;
 
        msg mb = null;
 
        while(true) synchronous {
 
            if(fires(leftfirsti)) {
 
                // left first! DO USE DUMMY
 
                ma = get(a);
 
                put(c, ma);
 
                mb = get(b);
 

	
 
                // using dummy!
 
                put(leftfirsto, ma);
 
                get(leftfirsti);
 
            } else {
 
                // right first! DON'T USE DUMMY
 
                mb = get(b);
 
                put(c, mb);
 
                ma = get(a);
 
            if(fires(a)) {
 
                // b and c also fire!
 
                if(fires(leftfirsti)) {
 
                    // left first! DO USE DUMMY
 
                    ma = get(a);
 
                    put(c, ma);
 
                    mb = get(b);
 

	
 
                    // using dummy!
 
                    put(leftfirsto, ma);
 
                    get(leftfirsti);
 
                } else {
 
                    // right first! DON'T USE DUMMY
 
                    mb = get(b);
 
                    put(c, mb);
 
                    ma = get(a);
 
                }
 
                assert(ma == mb);
 
            }
 
            assert(ma == mb);
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    /*
 
    [native]p0-->g0[eq]p1--.
 
                 g1        |
 
                 ^---------`
 
    */
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"eq", &[g0, g1, p1]).unwrap();
 

	
 
    /*
 
                  V--------.
 
                 g2        |
 
    [native]p2-->g3[eq]p3--`
 
    */
 
    let [p2, g2] = c.new_port_pair();
 
    let [p3, g3] = c.new_port_pair();
 
    c.add_component(b"eq", &[g3, g2, p3]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    for _ in 0..32 {
 
        // ok when they send
 
        c.put(p0, TEST_MSG.clone()).unwrap();
 
        c.put(p2, TEST_MSG.clone()).unwrap();
 
        c.sync(SEC1).unwrap();
 
        // ok when they don't
 
        c.sync(SEC1).unwrap();
 
    }
 
}
0 comments (0 inline, 0 general)