Changeset - 63f3efc9c886
[Not reviewed]
0 8 1
Christopher Esterhuyse - 5 years ago 2020-06-26 09:26:02
christopher.esterhuyse@gmail.com
more logging, tests and comments. re-implemented checking for consistency for natives (eagerly) and proto components (after run_to_blocker). deduplicated each connector's failure requests toward the sink
9 files changed with 275 insertions and 91 deletions:
0 comments (0 inline, 0 general)
examples/6_amy_log.txt
Show inline comments
 
new file 100644
 
CID(3382080479): Created with connector_id 3382080479
 
CID(3382080479): Added port pair (out->in) ptID(3382080479'0) -> ptID(3382080479'1)
 
CID(3382080479): ~~~ CONNECT called timeout None
 
CID(3382080479): Successfully connected 0 endpoints
 
CID(3382080479): beginning neighborhood construction
 
CID(3382080479): Edge case of no neighbors! No parent an no children!
 
CID(3382080479): Successfully created neighborhood Neighborhood { parent: None, children: {} }
 
CID(3382080479): Beginning session optimization
 
CID(3382080479): Gathered all children's maps. ConnectorId set is... []
 
CID(3382080479): Inserting my own info. Unoptimized subtree map is {3382080479: SessionInfo { serde_proto_description: SerdeProtocolDescription((A big honkin' protocol description)), port_info: PortInfo { polarities: {ptID(3382080479'1): Getter, ptID(3382080479'0): Putter}, peers: {ptID(3382080479'0): ptID(3382080479'1), ptID(3382080479'1): ptID(3382080479'0)}, routes: {ptID(3382080479'0): LocalComponent(Native), ptID(3382080479'1): LocalComponent(Native)} }, proto_components: {} }}
 
CID(3382080479): I am the leader! I will optimize this session
 
CID(3382080479): Session map optimize START
 
CID(3382080479): Session map optimize END
 
CID(3382080479): Optimized info map is {3382080479: SessionInfo { serde_proto_description: SerdeProtocolDescription((A big honkin' protocol description)), port_info: PortInfo { polarities: {ptID(3382080479'1): Getter, ptID(3382080479'0): Putter}, peers: {ptID(3382080479'0): ptID(3382080479'1), ptID(3382080479'1): ptID(3382080479'0)}, routes: {ptID(3382080479'0): LocalComponent(Native), ptID(3382080479'1): LocalComponent(Native)} }, proto_components: {} }}. Sending to children Iter([])
 
CID(3382080479): All session info dumped!: {
 
    3382080479: SessionInfo {
 
        serde_proto_description: SerdeProtocolDescription(
 
            (A big honkin' protocol description),
 
        ),
 
        port_info: PortInfo {
 
            polarities: {
 
                ptID(3382080479'1): Getter,
 
                ptID(3382080479'0): Putter,
 
            },
 
            peers: {
 
                ptID(3382080479'0): ptID(3382080479'1),
 
                ptID(3382080479'1): ptID(3382080479'0),
 
            },
 
            routes: {
 
                ptID(3382080479'0): LocalComponent(
 
                    Native,
 
                ),
 
                ptID(3382080479'1): LocalComponent(
 
                    Native,
 
                ),
 
            },
 
        },
 
        proto_components: {},
 
    },
 
}
 
CID(3382080479): Session optimizations applied
 
CID(3382080479): connect() finished. setup phase complete
 
CID(3382080479): ~~~ SYNC called with timeout Some(5s); starting round 0
 
CID(3382080479): Nonsync running 0 proto components...
 
CID(3382080479): All 0 proto components are now done with Nonsync phase
 
CID(3382080479): Solution storage initialized
 
CID(3382080479): Translating 1 native batches into branches...
 
CID(3382080479): Native branch index=0 contains internal inconsistency wrt. fvID(3382080479'1). Skipping
 
CID(3382080479): Native starts with no branches! Failure!
 
CID(3382080479): No parent. Deciding on failure
 
CID(3382080479): Committing to decision Failure!
 
CID(3382080479): Announcing decision CommMsg(CommMsg { round_index: 0, contents: Announce { decision: Failure } }) through child endpoints {}
 
CID(3382080479): Connector dropping. Goodbye!
examples/6_atomic/amy.c
Show inline comments
 
#include <stdio.h>
 
#include <string.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main(int argc, char** argv) {
 
	char * pdl_ptr = buffer_pdl("eg_protocols.pdl");
 
	size_t pdl_len = strlen(pdl_ptr);
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl_ptr, pdl_len);
 
	Connector * c = connector_new(pd);
 
	char logpath[] = "./6_amy_log.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	printf("Err %s\n", reowolf_error_peek(NULL));
 
	
 
	PortId putter, getter;
 
	connector_add_port_pair(c, &putter, &getter);
 
	connector_connect(c, -1);
 
	connector_print_debug(c);
 
	
 
	printf("Let's try to put without get\n");
 
	connector_put_bytes(c, putter, "hello", 5);
 
	// connector_get(c, getter);
 
	
 
	int err = connector_sync(c, 5000);
 
	printf("Error code %d with string `%s`\n", err, reowolf_error_peek(NULL));
 
	
 
	/*
 
	printf("Let's try again, doing both\n");
 
	connector_put_bytes(c, putter, "hello", 5);
 
	connector_get(c, getter);
 
	err = connector_sync(c, 5000);
 
	printf("Error code %d with string `%s`\n", err, reowolf_error_peek(NULL));
 
	size_t msg_len;
 
	const char * msg_ptr = connector_gotten_bytes(c, getter, &msg_len);
 
	printf("Got msg `%.*s`\n", msg_len, msg_ptr);
 
	*/
 
	
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	free(pdl_ptr);
 
	return 0;
 
}
 
\ No newline at end of file
examples/reowolf_rs.dll
Show inline comments
 
binary diff not shown
reowolf.h
Show inline comments
 
/* CBindgen generated */
 

	
 
#ifndef REOWOLF_HEADER_DEFINED
 
#define REOWOLF_HEADER_DEFINED
 

	
 
#include <stdarg.h>
 
#include <stdbool.h>
 
#include <stdint.h>
 
#include <stdlib.h>
 

	
 
typedef enum {
 
  Active,
 
  Passive,
 
} EndpointPolarity;
 

	
 
typedef enum {
 
  Putter,
 
  Getter,
 
} Polarity;
 

	
 
typedef struct Arc_ProtocolDescription Arc_ProtocolDescription;
 

	
 
typedef struct Connector Connector;
 

	
 
typedef int32_t ErrorCode;
 

	
 
typedef uint32_t ConnectorId;
 

	
 
typedef uint32_t PortSuffix;
 

	
 
typedef struct {
 
  ConnectorId connector_id;
 
  PortSuffix u32_suffix;
 
} Id;
 

	
 
typedef Id PortId;
 

	
 
/**
 
 * Given
 
 * - an initialized connector in setup or connecting state,
 
 * - a string slice for the component's identifier in the connector's configured protocol description,
 
 * - a set of ports (represented as a slice; duplicates are ignored) in the native component's interface,
 
 * the connector creates a new (internal) protocol component C, such that the set of native ports are moved to C.
 
 * Usable in {setup, communication} states.
 
 */
 
ErrorCode connector_add_component(Connector *connector,
 
                                  const uint8_t *ident_ptr,
 
                                  uintptr_t ident_len,
 
                                  const PortId *ports_ptr,
 
                                  uintptr_t ports_len);
 

	
 
/**
 
 * Given
 
 * - an initialized connector in setup or connecting state,
 
 * - a utf-8 encoded socket address,
 
 * - the logical polarity of P,
 
 * - the "physical" polarity in {Active, Passive} of the endpoint through which P's peer will be discovered,
 
 * returns P, a port newly added to the native interface.
 
 */
 
ErrorCode connector_add_net_port(Connector *connector,
 
                                 const uint8_t *addr_str_ptr,
 
                                 uintptr_t addr_str_len,
 
                                 Polarity port_polarity,
 
                                 EndpointPolarity endpoint_polarity,
 
                                 PortId *port);
 

	
 
/**
 
 * Given an initialized connector in setup or connecting state,
 
 * - Creates a new directed port pair with logical channel putter->getter,
 
 * - adds the ports to the native component's interface,
 
 * - and returns them using the given out pointers.
 
 * Usable in {setup, communication} states.
 
 */
 
void connector_add_port_pair(Connector *connector, PortId *out_putter, PortId *out_getter);
 

	
 
/**
 
 * Connects this connector to the distributed system of connectors reachable through endpoints,
 
 * Usable in setup state, and changes the state to communication.
 
 */
 
ErrorCode connector_connect(Connector *connector, int64_t timeout_millis);
 

	
 
/**
 
 * Destroys the given a pointer to the connector on the heap, freeing its resources.
 
 * Usable in {setup, communication} states.
 
 */
 
void connector_destroy(Connector *connector);
 

	
 
ErrorCode connector_get(Connector *connector, PortId port);
 

	
 
const uint8_t *connector_gotten_bytes(Connector *connector, PortId port, uintptr_t *len);
 

	
 
/**
 
 * Allocates a new connector on the heap and returning a pointer,
 
 * given an initialized protocol description.
 
 */
 
Connector *connector_new(const Arc_ProtocolDescription *pd);
 

	
 
/**
 
 * Initializes `out` with a new connector using the given protocol description as its configuration.
 
 * The connector uses the given (internal) connector ID.
 
 */
 
Connector *connector_new_with_id(const Arc_ProtocolDescription *pd, ConnectorId cid);
 
Connector *connector_new(const Arc_ProtocolDescription *pd);
 

	
 
Connector *connector_new_logging(const Arc_ProtocolDescription *pd,
 
                                 const uint8_t *path_ptr,
 
                                 uintptr_t path_len);
 

	
 
intptr_t connector_next_batch(Connector *connector);
 

	
 
void connector_print_debug(Connector *connector);
 

	
 
/**
 
 * Convenience function combining the functionalities of
 
 * "payload_new" with "connector_put_payload".
 
 */
 
ErrorCode connector_put_bytes(Connector *connector,
 
                              PortId port,
 
                              const uint8_t *bytes_ptr,
 
                              uintptr_t bytes_len);
 

	
 
intptr_t connector_sync(Connector *connector, int64_t timeout_millis);
 

	
 
/**
 
 * Given an initialized protocol description, initializes `out` with a clone which can be independently created or destroyed.
 
 */
 
Arc_ProtocolDescription *protocol_description_clone(const Arc_ProtocolDescription *pd);
 

	
 
/**
 
 * Destroys the given initialized protocol description and frees its resources.
 
 */
 
void protocol_description_destroy(Arc_ProtocolDescription *pd);
 

	
 
/**
 
 * Parses the utf8-encoded string slice to initialize a new protocol description object.
 
 * - On success, initializes `out` and returns 0
 
 * - On failure, stores an error string (see `reowolf_error_peek`) and returns -1
 
 */
 
Arc_ProtocolDescription *protocol_description_parse(const uint8_t *pdl, uintptr_t pdl_len);
 

	
 
/**
 
 * Returns length (via out pointer) and pointer (via return value) of the last Reowolf error.
 
 * - pointer is NULL iff there was no last error
 
 * - data at pointer is null-delimited
 
 * - len does NOT include the length of the null-delimiter
 
 * If len is NULL, it will not written to.
 
 */
 
const uint8_t *reowolf_error_peek(uintptr_t *len);
 

	
 
#endif /* REOWOLF_HEADER_DEFINED */
src/common.rs
Show inline comments
 
///////////////////// PRELUDE /////////////////////
 

	
 
pub use crate::protocol::{ComponentState, ProtocolDescription};
 
pub use crate::runtime::{NonsyncProtoContext, SyncProtoContext};
 

	
 
pub use core::{
 
    cmp::Ordering,
 
    fmt::{Debug, Formatter},
 
    hash::{Hash, Hasher},
 
    ops::{Range, RangeFrom},
 
    time::Duration,
 
};
 
pub use indexmap::{IndexMap, IndexSet};
 
pub use maplit::{hashmap, hashset};
 
pub use mio::{
 
    net::{TcpListener, TcpStream},
 
    Events, Interest, Poll, Token,
 
};
 
pub use std::{
 
    collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
 
    convert::TryInto,
 
    io::{Read, Write},
 
    net::SocketAddr,
 
    sync::Arc,
 
    time::Instant,
 
};
 
pub use Polarity::*;
 

	
 
///////////////////// DEFS /////////////////////
 

	
 
pub type ConnectorId = u32;
 
pub type PortSuffix = u32;
 

	
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(C)]
 
pub struct Id {
 
    pub(crate) connector_id: ConnectorId,
 
    pub(crate) u32_suffix: PortSuffix,
 
}
 

	
 
#[derive(Debug, Default)]
 
pub struct U32Stream {
 
    next: u32,
 
}
 

	
 
// globally unique
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(transparent)]
 
pub struct PortId(Id);
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
pub struct FiringVar(pub(crate) PortId);
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
pub struct ProtoComponentId(Id);
 

	
 
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
 
// #[repr(C)]
 
pub struct Payload(Arc<Vec<u8>>);
 

	
 
#[derive(
 
    Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(C)]
 
pub enum Polarity {
 
    Putter, // output port (from the perspective of the component)
 
    Getter, // input port (from the perspective of the component)
 
}
 
#[derive(
 
    Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(C)]
 
pub enum EndpointPolarity {
 
    Active,  // calls connect()
 
    Passive, // calls bind() listen() accept()
 
}
 

	
 
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
 
pub enum AddComponentError {
 
    NoSuchComponent,
 
    NonPortTypeParameters,
 
    CannotMovePort(PortId),
 
    WrongNumberOfParamaters { expected: usize },
 
    UnknownPort(PortId),
 
    WrongPortPolarity { port: PortId, expected_polarity: Polarity },
 
    DuplicateMovedPort(PortId),
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub enum NonsyncBlocker {
 
    Inconsistent,
 
    ComponentExit,
 
    SyncBlockStart,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub enum SyncBlocker {
 
    Inconsistent,
 
    SyncBlockEnd,
 
    CouldntReadMsg(PortId),
 
    CouldntCheckFiring(PortId),
 
    PutMsg(PortId, Payload),
 
}
 

	
 
///////////////////// IMPL /////////////////////
 
impl U32Stream {
 
    pub fn next(&mut self) -> u32 {
 
        if self.next == u32::MAX {
 
            panic!("NO NEXT!")
 
        }
 
        self.next += 1;
 
        self.next - 1
 
    }
 
}
 
impl From<Id> for PortId {
 
    fn from(id: Id) -> PortId {
 
        Self(id)
 
    }
 
}
 
impl From<Id> for ProtoComponentId {
 
    fn from(id: Id) -> ProtoComponentId {
 
        Self(id)
 
    }
 
}
 
impl From<&[u8]> for Payload {
 
    fn from(s: &[u8]) -> Payload {
 
        Payload(Arc::new(s.to_vec()))
 
    }
 
}
 
impl Payload {
 
    pub fn new(len: usize) -> Payload {
 
        let mut v = Vec::with_capacity(len);
 
        unsafe {
 
            v.set_len(len);
 
        }
 
        Payload(Arc::new(v))
 
    }
 
    pub fn len(&self) -> usize {
 
        self.0.len()
 
    }
 
    pub fn as_slice(&self) -> &[u8] {
 
        &self.0
 
    }
 
    pub fn as_mut_slice(&mut self) -> &mut [u8] {
 
        Arc::make_mut(&mut self.0) as _
 
    }
 
    pub fn concat_with(&mut self, other: &Self) {
 
        let bytes = other.as_slice().iter().copied();
 
        let me = Arc::make_mut(&mut self.0);
 
        me.extend(bytes);
 
    }
 
}
 
impl serde::Serialize for Payload {
 
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
 
    where
 
        S: serde::Serializer,
 
    {
 
        let inner: &Vec<u8> = &self.0;
 
        inner.serialize(serializer)
 
    }
 
}
 
impl<'de> serde::Deserialize<'de> for Payload {
 
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
 
    where
 
        D: serde::Deserializer<'de>,
 
    {
 
        let inner: Vec<u8> = Vec::deserialize(deserializer)?;
 
        Ok(Self(Arc::new(inner)))
 
    }
 
}
 
impl std::iter::FromIterator<u8> for Payload {
 
    fn from_iter<I: IntoIterator<Item = u8>>(it: I) -> Self {
 
        Self(Arc::new(it.into_iter().collect()))
 
    }
 
}
 
impl From<Vec<u8>> for Payload {
 
    fn from(s: Vec<u8>) -> Self {
 
        Self(s.into())
 
    }
 
}
 
impl Debug for PortId {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "PID<{},{}>", self.0.connector_id, self.0.u32_suffix)
 
        write!(f, "ptID({}'{})", self.0.connector_id, self.0.u32_suffix)
 
    }
 
}
 
impl Debug for FiringVar {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "VID<{},{}>", (self.0).0.connector_id, (self.0).0.u32_suffix)
 
        write!(f, "fvID({}'{})", (self.0).0.connector_id, (self.0).0.u32_suffix)
 
    }
 
}
 
impl Debug for ProtoComponentId {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "ProtoComponentId({},{})", self.0.connector_id, self.0.u32_suffix)
 
        write!(f, "pcID({}'{})", self.0.connector_id, self.0.u32_suffix)
 
    }
 
}
 
impl std::ops::Not for Polarity {
 
    type Output = Self;
 
    fn not(self) -> Self::Output {
 
        use Polarity::*;
 
        match self {
 
            Putter => Getter,
 
            Getter => Putter,
 
        }
 
    }
 
}
src/runtime/communication.rs
Show inline comments
 
use super::*;
 
use crate::common::*;
 

	
 
////////////////
 
struct BranchingNative {
 
    branches: HashMap<Predicate, NativeBranch>,
 
}
 
#[derive(Clone, Debug)]
 
struct NativeBranch {
 
    index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
#[derive(Debug)]
 
struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
    // this pair acts as Route -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<Route, usize>,
 
}
 
#[derive(Debug)]
 
struct BranchingProtoComponent {
 
    ports: HashSet<PortId>,
 
    branches: HashMap<Predicate, ProtoComponentBranch>,
 
}
 
#[derive(Debug, Clone)]
 
struct ProtoComponentBranch {
 
    inbox: HashMap<PortId, Payload>,
 
    state: ComponentState,
 
}
 
struct CyclicDrainer<'a, K: Eq + Hash, V> {
 
    input: &'a mut HashMap<K, V>,
 
    inner: CyclicDrainInner<'a, K, V>,
 
}
 
struct CyclicDrainInner<'a, K: Eq + Hash, V> {
 
    swap: &'a mut HashMap<K, V>,
 
    output: &'a mut HashMap<K, V>,
 
}
 
trait PayloadMsgSender {
 
    fn putter_send(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        putter: PortId,
 
        msg: SendPayloadMsg,
 
    ) -> Result<(), SyncError>;
 
}
 
trait ReplaceBoolTrue {
 
    fn replace_with_true(&mut self) -> bool;
 
}
 
impl ReplaceBoolTrue for bool {
 
    fn replace_with_true(&mut self) -> bool {
 
        let was = *self;
 
        *self = true;
 
        !was
 
    }
 
}
 

	
 
////////////////
 
impl Connector {
 
    pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> {
 
        use GottenError::*;
 
        let Self { phased, .. } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NoPreviousRound),
 
            ConnectorPhased::Communication(comm) => match &comm.round_result {
 
                Err(_) => Err(PreviousSyncFailed),
 
                Ok(None) => Err(NoPreviousRound),
 
                Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(PortDidntGet),
 
            },
 
        }
 
    }
 
    pub fn next_batch(&mut self) -> Result<usize, NextBatchError> {
 
        // returns index of new batch
 
        use NextBatchError::*;
 
        let Self { phased, .. } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                comm.native_batches.push(Default::default());
 
                Ok(comm.native_batches.len() - 1)
 
            }
 
        }
 
    }
 
    fn port_op_access(
 
        &mut self,
 
        port: PortId,
 
        expect_polarity: Polarity,
 
    ) -> Result<&mut NativeBatch, PortOpError> {
 
        use PortOpError::*;
 
        let Self { unphased, phased } = self;
 
        if !unphased.native_ports.contains(&port) {
 
            return Err(PortUnavailable);
 
        }
 
        match unphased.port_info.polarities.get(&port) {
 
            Some(p) if *p == expect_polarity => {}
 
            Some(_) => return Err(WrongPolarity),
 
            None => return Err(UnknownPolarity),
 
        }
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                let batch = comm.native_batches.last_mut().unwrap(); // length >= 1 is invariant
 
                Ok(batch)
 
            }
 
        }
 
    }
 
    pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> {
 
        use PortOpError::*;
 
        let batch = self.port_op_access(port, Putter)?;
 
        if batch.to_put.contains_key(&port) {
 
            Err(MultipleOpsOnPort)
 
        } else {
 
            batch.to_put.insert(port, payload);
 
            Ok(())
 
        }
 
    }
 
    pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> {
 
        use PortOpError::*;
 
        let batch = self.port_op_access(port, Getter)?;
 
        if batch.to_get.insert(port) {
 
            Ok(())
 
        } else {
 
            Err(MultipleOpsOnPort)
 
        }
 
    }
 
    // entrypoint for caller. overwrites round result enum, and returns what happened
 
    pub fn sync(&mut self, timeout: Option<Duration>) -> Result<usize, SyncError> {
 
        let Self { unphased, phased } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(SyncError::NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                comm.round_result = Self::connected_sync(unphased, comm, timeout);
 
                match &comm.round_result {
 
                    Ok(None) => unreachable!(),
 
                    Ok(Some(ok_result)) => Ok(ok_result.batch_index),
 
                    Err(sync_error) => Err(sync_error.clone()),
 
                }
 
            }
 
        }
 
    }
 
    // private function. mutates state but returns with round
 
    // result ASAP (allows for convenient error return with ?)
 
    fn connected_sync(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        timeout: Option<Duration>,
 
    ) -> Result<Option<RoundOk>, SyncError> {
 
        use SyncError as Se;
 
        let mut deadline = timeout.map(|to| Instant::now() + to);
 
        let deadline = timeout.map(|to| Instant::now() + to);
 
        log!(
 
            cu.logger,
 
            "~~~ SYNC called with timeout {:?}; starting round {}",
 
            &timeout,
 
            comm.round_index
 
        );
 

	
 
        // 1. run all proto components to Nonsync blockers
 
        let mut branching_proto_components =
 
            HashMap::<ProtoComponentId, BranchingProtoComponent>::default();
 
        let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> =
 
            cu.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect();
 
        log!(cu.logger, "Nonsync running {} proto components...", unrun_components.len());
 
        while let Some((proto_component_id, mut component)) = unrun_components.pop() {
 
            // TODO coalesce fields
 
            log!(
 
                cu.logger,
 
                "Nonsync running proto component with ID {:?}. {} to go after this",
 
                proto_component_id,
 
                unrun_components.len()
 
            );
 
            let mut ctx = NonsyncProtoContext {
 
                logger: &mut *cu.logger,
 
                port_info: &mut cu.port_info,
 
                id_manager: &mut cu.id_manager,
 
                proto_component_id,
 
                unrun_components: &mut unrun_components,
 
                proto_component_ports: &mut cu
 
                    .proto_components
 
                    .get_mut(&proto_component_id)
 
                    .unwrap() // unrun_components' keys originate from proto_components
 
                    .ports,
 
            };
 
            let blocker = component.state.nonsync_run(&mut ctx, &cu.proto_description);
 
            log!(
 
                cu.logger,
 
                "proto component {:?} ran to nonsync blocker {:?}",
 
                proto_component_id,
 
                &blocker
 
            );
 
            use NonsyncBlocker as B;
 
            match blocker {
 
                B::ComponentExit => drop(component),
 
                B::Inconsistent => return Err(Se::InconsistentProtoComponent(proto_component_id)),
 
                B::SyncBlockStart => {
 
                    branching_proto_components
 
                        .insert(proto_component_id, BranchingProtoComponent::initial(component));
 
                }
 
            }
 
        }
 
        log!(
 
            cu.logger,
 
            "All {} proto components are now done with Nonsync phase",
 
            branching_proto_components.len(),
 
        );
 

	
 
        // NOTE: all msgs in outbox are of form (Getter, Payload)
 
        let mut payloads_to_get: Vec<(PortId, SendPayloadMsg)> = vec![];
 

	
 
        // create the solution storage
 
        let mut solution_storage = {
 
            let n = std::iter::once(Route::LocalComponent(ComponentId::Native));
 
            let c =
 
                cu.proto_components.keys().map(|&id| Route::LocalComponent(ComponentId::Proto(id)));
 
            let e = comm.neighborhood.children.iter().map(|&index| Route::Endpoint { index });
 
            SolutionStorage::new(n.chain(c).chain(e))
 
        };
 
        log!(cu.logger, "Solution storage initialized");
 

	
 
        // 2. kick off the native
 
        log!(
 
            cu.logger,
 
            "Translating {} native batches into branches...",
 
            comm.native_batches.len()
 
        );
 
        let mut branching_native = BranchingNative { branches: Default::default() };
 
        for (index, NativeBatch { to_get, to_put }) in comm.native_batches.drain(..).enumerate() {
 
        'native_branches: for (index, NativeBatch { to_get, to_put }) in
 
            comm.native_batches.drain(..).enumerate()
 
        {
 
            let predicate = {
 
                let mut predicate = Predicate::default();
 
                // assign trues
 
                // assign trues for ports that fire
 
                let firing_ports: HashSet<PortId> =
 
                    to_get.iter().chain(to_put.keys()).copied().collect();
 
                for &port in to_get.iter().chain(to_put.keys()) {
 
                    let var = cu.port_info.firing_var_for(port);
 
                    predicate.assigned.insert(var, true);
 
                }
 
                // assign falses
 
                for &port in cu.native_ports.iter() {
 
                // assign falses for silent ports
 
                for &port in cu.native_ports.difference(&firing_ports) {
 
                    let var = cu.port_info.firing_var_for(port);
 
                    predicate.assigned.entry(var).or_insert(false);
 
                    if let Some(true) = predicate.assigned.insert(var, false) {
 
                        log!(cu.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var);
 
                        continue 'native_branches;
 
                    }
 
                }
 
                predicate
 
            };
 
            log!(cu.logger, "Native branch {} has pred {:?}", index, &predicate);
 
            log!(cu.logger, "Native branch index={:?} has consistent {:?}", index, &predicate);
 

	
 
            // put all messages
 
            for (putter, payload) in to_put {
 
                let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                log!(cu.logger, "Native branch {} sending msg {:?}", index, &msg);
 
                payloads_to_get.putter_send(cu, putter, msg)?;
 
            }
 
            if to_get.is_empty() {
 
                log!(
 
                    cu.logger,
 
                    "Native submitting solution for batch {} with {:?}",
 
                    index,
 
                    &predicate
 
                );
 
                solution_storage.submit_and_digest_subtree_solution(
 
                    &mut *cu.logger,
 
                    Route::LocalComponent(ComponentId::Native),
 
                    predicate.clone(),
 
                );
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            if let Some(existing) = branching_native.branches.insert(predicate, branch) {
 
                // TODO
 
                return Err(Se::IndistinguishableBatches([index, existing.index]));
 
            }
 
        }
 
        let decision = Self::sync_reach_decision(
 
            cu,
 
            comm,
 
            &mut branching_native,
 
            &mut branching_proto_components,
 
            solution_storage,
 
            payloads_to_get,
 
            deadline,
 
        )?;
 
        log!(cu.logger, "Committing to decision {:?}!", &decision);
 

	
 
        // propagate the decision to children
 
        let msg = Msg::CommMsg(CommMsg {
 
            round_index: comm.round_index,
 
            contents: CommMsgContents::Announce { decision: decision.clone() },
 
        });
 
        log!(
 
            cu.logger,
 
            "Announcing decision {:?} through child endpoints {:?}",
 
            &msg,
 
            &comm.neighborhood.children
 
        );
 
        for &child in comm.neighborhood.children.iter() {
 
            comm.endpoint_manager.send_to_comms(child, &msg)?;
 
        }
 
        let ret = match decision {
 
            Decision::Failure => {
 
                // dropping {branching_proto_components, branching_native}
 
                Err(Se::RoundFailure)
 
            }
 
            Decision::Success(predicate) => {
 
                // commit changes to component states
 
                cu.proto_components.clear();
 
                cu.proto_components.extend(
 
                    // consume branching proto components
 
                    branching_proto_components
 
                        .into_iter()
 
                        .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))),
 
                );
 
                log!(
 
                    cu.logger,
 
                    "End round with (updated) component states {:?}",
 
                    cu.proto_components.keys()
 
                );
 
                // consume native
 
                Ok(Some(branching_native.collapse_with(&predicate)))
 
            }
 
        };
 
        log!(cu.logger, "Sync round ending! Cleaning up");
 
        // dropping {solution_storage, payloads_to_get}
 
        ret
 
    }
 

	
 
    fn sync_reach_decision(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        branching_native: &mut BranchingNative,
 
        branching_proto_components: &mut HashMap<ProtoComponentId, BranchingProtoComponent>,
 
        mut solution_storage: SolutionStorage,
 
        mut payloads_to_get: Vec<(PortId, SendPayloadMsg)>,
 
        mut deadline: Option<Instant>,
 
    ) -> Result<Decision, SyncError> {
 
        let mut already_requested_failure = false;
 
        if branching_native.branches.is_empty() {
 
            log!(cu.logger, "Native starts with no branches! Failure!");
 
            match comm.neighborhood.parent {
 
                Some(parent) => {
 
                    if already_requested_failure.replace_with_true() {
 
                        Self::request_failure(cu, comm, parent)?
 
                    } else {
 
                        log!(cu.logger, "Already requested failure");
 
                    }
 
                }
 
                None => {
 
                    log!(cu.logger, "No parent. Deciding on failure");
 
                    return Ok(Decision::Failure);
 
                }
 
            }
 
        }
 
        log!(cu.logger, "Done translating native batches into branches");
 
        comm.native_batches.push(Default::default());
 

	
 
        // run all proto components to their sync blocker
 
        log!(
 
            cu.logger,
 
            "Running all {} proto components to their sync blocker...",
 
            branching_proto_components.len()
 
        );
 
        for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() {
 
            let BranchingProtoComponent { ports, branches } = proto_component;
 
            let mut swap = HashMap::default();
 
            let mut blocked = HashMap::default();
 
            // drain from branches --> blocked
 
            let cd = CyclicDrainer::new(branches, &mut swap, &mut blocked);
 
            BranchingProtoComponent::drain_branches_to_blocked(
 
                cd,
 
                cu,
 
                &mut solution_storage,
 
                &mut payloads_to_get,
 
                proto_component_id,
 
                ports,
 
            )?;
 
            // swap the blocked branches back
 
            std::mem::swap(&mut blocked, branches);
 
            if branches.is_empty() {
 
                log!(cu.logger, "{:?} has become inconsistent!", proto_component_id);
 
                if let Some(parent) = comm.neighborhood.parent {
 
                    if already_requested_failure.replace_with_true() {
 
                        Self::request_failure(cu, comm, parent)?
 
                    } else {
 
                        log!(cu.logger, "Already requested failure");
 
                    }
 
                } else {
 
                    log!(cu.logger, "As the leader, deciding on timeout");
 
                    return Ok(Decision::Failure);
 
                }
 
            }
 
        }
 
        log!(cu.logger, "All proto components are blocked");
 

	
 
        log!(cu.logger, "Entering decision loop...");
 
        comm.endpoint_manager.undelay_all();
 
        let decision = 'undecided: loop {
 
        'undecided: loop {
 
            // drain payloads_to_get, sending them through endpoints / feeding them to components
 
            while let Some((getter, send_payload_msg)) = payloads_to_get.pop() {
 
                assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
 
                match cu.port_info.routes.get(&getter) {
 
                    None => {
 
                        log!(
 
                            cu.logger,
 
                            "Delivery to getter {:?} msg {:?} failed. Physical route unmapped!",
 
                            getter,
 
                            &send_payload_msg
 
                        );
 
                    }
 
                    Some(Route::Endpoint { index }) => {
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::SendPayload(send_payload_msg),
 
                        });
 
                        comm.endpoint_manager.send_to_comms(*index, &msg)?;
 
                    }
 
                    Some(Route::LocalComponent(ComponentId::Native)) => branching_native.feed_msg(
 
                        cu,
 
                        &mut solution_storage,
 
                        // &mut Pay
 
                        getter,
 
                        &send_payload_msg,
 
                    ),
 
                    Some(Route::LocalComponent(ComponentId::Proto(proto_component_id))) => {
 
                        if let Some(branching_component) =
 
                            branching_proto_components.get_mut(proto_component_id)
 
                        {
 
                            let proto_component_id = *proto_component_id;
 
                            // let ConnectorUnphased { port_info, proto_description, .. } = cu;
 
                            branching_component.feed_msg(
 
                                cu,
 
                                &mut solution_storage,
 
                                proto_component_id,
 
                                &mut payloads_to_get,
 
                                getter,
 
                                &send_payload_msg,
 
                            )?;
 
                            if branching_component.branches.is_empty() {
 
                                log!(
 
                                    cu.logger,
 
                                    "{:?} has become inconsistent!",
 
                                    proto_component_id
 
                                );
 
                                if let Some(parent) = comm.neighborhood.parent {
 
                                    if already_requested_failure.replace_with_true() {
 
                                        Self::request_failure(cu, comm, parent)?
 
                                    } else {
 
                                        log!(cu.logger, "Already requested failure");
 
                                    }
 
                                } else {
 
                                    log!(cu.logger, "As the leader, deciding on timeout");
 
                                    return Ok(Decision::Failure);
 
                                }
 
                            }
 
                        } else {
 
                            log!(
 
                                cu.logger,
 
                                "Delivery to getter {:?} msg {:?} failed because {:?} isn't here",
 
                                getter,
 
                                &send_payload_msg,
 
                                proto_component_id
 
                            );
 
                        }
 
                    }
 
                }
 
            }
 

	
 
            // check if we have a solution yet
 
            log!(cu.logger, "Check if we have any local decisions...");
 
            for solution in solution_storage.iter_new_local_make_old() {
 
                log!(cu.logger, "New local decision with solution {:?}...", &solution);
 
                match comm.neighborhood.parent {
 
                    Some(parent) => {
 
                        log!(cu.logger, "Forwarding to my parent {:?}", parent);
 
                        let suggestion = Decision::Success(solution);
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::Suggest { suggestion },
 
                        });
 
                        comm.endpoint_manager.send_to_comms(parent, &msg)?;
 
                    }
 
                    None => {
 
                        log!(cu.logger, "No parent. Deciding on solution {:?}", &solution);
 
                        break 'undecided Decision::Success(solution);
 
                        return Ok(Decision::Success(solution));
 
                    }
 
                }
 
            }
 

	
 
            // stuck! make progress by receiving a msg
 
            // try recv messages arriving through endpoints
 
            log!(cu.logger, "No decision yet. Let's recv an endpoint msg...");
 
            {
 
                let (endpoint_index, msg) = loop {
 
                    match comm.endpoint_manager.try_recv_any_comms(&mut *cu.logger, deadline)? {
 
                        None => {
 
                            log!(cu.logger, "Reached user-defined deadling without decision...");
 
                            if let Some(parent) = comm.neighborhood.parent {
 
                                log!(
 
                                    cu.logger,
 
                                    "Sending failure request to parent index {}",
 
                                    parent
 
                                );
 
                                let msg = Msg::CommMsg(CommMsg {
 
                                    round_index: comm.round_index,
 
                                    contents: CommMsgContents::Suggest {
 
                                        suggestion: Decision::Failure,
 
                                    },
 
                                });
 
                                comm.endpoint_manager.send_to_comms(parent, &msg)?;
 
                                if already_requested_failure.replace_with_true() {
 
                                    Self::request_failure(cu, comm, parent)?
 
                                } else {
 
                                    log!(cu.logger, "Already requested failure");
 
                                }
 
                            } else {
 
                                log!(cu.logger, "As the leader, deciding on timeout");
 
                                break 'undecided Decision::Failure;
 
                                return Ok(Decision::Failure);
 
                            }
 
                            deadline = None;
 
                        }
 
                        Some((endpoint_index, msg)) => break (endpoint_index, msg),
 
                    }
 
                };
 
                log!(cu.logger, "Received from endpoint {} msg {:?}", endpoint_index, &msg);
 
                let comm_msg_contents = match msg {
 
                    Msg::SetupMsg(..) => {
 
                        log!(cu.logger, "Discarding setup message; that phase is over");
 
                        continue 'undecided;
 
                    }
 
                    Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(&comm.round_index) {
 
                        Ordering::Equal => comm_msg.contents,
 
                        Ordering::Less => {
 
                            log!(
 
                                cu.logger,
 
                                "We are in round {}, but msg is for round {}. Discard",
 
                                comm_msg.round_index,
 
                                comm.round_index,
 
                            );
 
                            drop(comm_msg);
 
                            continue 'undecided;
 
                        }
 
                        Ordering::Greater => {
 
                            log!(
 
                                cu.logger,
 
                                "We are in round {}, but msg is for round {}. Buffer",
 
                                comm_msg.round_index,
 
                                comm.round_index,
 
                            );
 
                            comm.endpoint_manager
 
                                .delayed_messages
 
                                .push((endpoint_index, Msg::CommMsg(comm_msg)));
 
                            continue 'undecided;
 
                        }
 
                    },
 
                };
 
                match comm_msg_contents {
 
                    CommMsgContents::SendPayload(send_payload_msg) => {
 
                        let getter =
 
                            comm.endpoint_manager.endpoint_exts[endpoint_index].getter_for_incoming;
 
                        assert!(cu.port_info.polarities.get(&getter) == Some(&Getter));
 
                        log!(
 
                            cu.logger,
 
                            "Msg routed to getter port {:?}. Buffer for recv loop",
 
                            getter,
 
                        );
 
                        payloads_to_get.push((getter, send_payload_msg));
 
                    }
 
                    CommMsgContents::Suggest { suggestion } => {
 
                        // only accept this control msg through a child endpoint
 
                        if comm.neighborhood.children.contains(&endpoint_index) {
 
                            match suggestion {
 
                                Decision::Success(predicate) => {
 
                                    // child solution contributes to local solution
 
                                    log!(cu.logger, "Child provided solution {:?}", &predicate);
 
                                    let route = Route::Endpoint { index: endpoint_index };
 
                                    solution_storage.submit_and_digest_subtree_solution(
 
                                        &mut *cu.logger,
 
                                        route,
 
                                        predicate,
 
                                    );
 
                                }
 
                                Decision::Failure => {
 
                                    match comm.neighborhood.parent {
 
                                        None => {
 
                                            log!(
 
                                                cu.logger,
 
                                                "As sink, I decide on my child's failure"
 
                                            );
 
                                            // I am the sink. Decide on failed
 
                                            break 'undecided Decision::Failure;
 
                                            log!(cu.logger, "I decide on my child's failure");
 
                                            break 'undecided Ok(Decision::Failure);
 
                                        }
 
                                        Some(parent) => {
 
                                            log!(cu.logger, "Forwarding failure through my parent endpoint {:?}", parent);
 
                                            // I've got a parent. Forward the failure suggestion.
 
                                            let msg = Msg::CommMsg(CommMsg {
 
                                                round_index: comm.round_index,
 
                                                contents: CommMsgContents::Suggest { suggestion },
 
                                            });
 
                                            comm.endpoint_manager.send_to_comms(parent, &msg)?;
 
                                            if already_requested_failure.replace_with_true() {
 
                                                Self::request_failure(cu, comm, parent)?
 
                                            } else {
 
                                                log!(cu.logger, "Already requested failure");
 
                                            }
 
                                        }
 
                                    }
 
                                }
 
                            }
 
                        } else {
 
                            log!(
 
                                cu.logger,
 
                                "Discarding suggestion {:?} from non-child endpoint idx {:?}",
 
                                &suggestion,
 
                                endpoint_index
 
                            );
 
                        }
 
                    }
 
                    CommMsgContents::Announce { decision } => {
 
                        if Some(endpoint_index) == comm.neighborhood.parent {
 
                            // adopt this decision
 
                            break 'undecided decision;
 
                            return Ok(decision);
 
                        } else {
 
                            log!(
 
                                cu.logger,
 
                                "Discarding announcement {:?} from non-parent endpoint idx {:?}",
 
                                &decision,
 
                                endpoint_index
 
                            );
 
                        }
 
                    }
 
                }
 
            }
 
            log!(cu.logger, "Endpoint msg recv done");
 
        };
 
        log!(cu.logger, "Committing to decision {:?}!", &decision);
 

	
 
        // propagate the decision to children
 
        }
 
    }
 
    fn request_failure(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        parent: usize,
 
    ) -> Result<(), SyncError> {
 
        log!(cu.logger, "Forwarding to my parent {:?}", parent);
 
        let suggestion = Decision::Failure;
 
        let msg = Msg::CommMsg(CommMsg {
 
            round_index: comm.round_index,
 
            contents: CommMsgContents::Announce { decision: decision.clone() },
 
            contents: CommMsgContents::Suggest { suggestion },
 
        });
 
        log!(
 
            cu.logger,
 
            "Announcing decision {:?} through child endpoints {:?}",
 
            &msg,
 
            &comm.neighborhood.children
 
        );
 
        for &child in comm.neighborhood.children.iter() {
 
            comm.endpoint_manager.send_to_comms(child, &msg)?;
 
        }
 
        match decision {
 
            Decision::Failure => Err(Se::RoundFailure),
 
            Decision::Success(predicate) => {
 
                // commit changes to component states
 
                cu.proto_components.clear();
 
                cu.proto_components.extend(
 
                    branching_proto_components
 
                        .into_iter()
 
                        .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))),
 
                );
 
                Ok(Some(branching_native.collapse_with(&predicate)))
 
            }
 
        }
 
        comm.endpoint_manager.send_to_comms(parent, &msg)
 
    }
 
}
 
impl BranchingNative {
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        solution_storage: &mut SolutionStorage,
 
        getter: PortId,
 
        send_payload_msg: &SendPayloadMsg,
 
    ) {
 
        log!(cu.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg);
 
        assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
 
        let mut draining = HashMap::default();
 
        let finished = &mut self.branches;
 
        std::mem::swap(&mut draining, finished);
 
        for (predicate, mut branch) in draining.drain() {
 
            log!(cu.logger, "visiting native branch {:?} with {:?}", &branch, &predicate);
 
            // check if this branch expects to receive it
 
            let var = cu.port_info.firing_var_for(getter);
 
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
 
                let was = branch.gotten.insert(getter, send_payload_msg.payload.clone());
 
                assert!(was.is_none());
 
                branch.to_get.remove(&getter);
 
                if branch.to_get.is_empty() {
 
                    let route = Route::LocalComponent(ComponentId::Native);
 
                    solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.logger,
 
                        route,
 
                        predicate.clone(),
 
                    );
 
                }
 
            };
 
            if predicate.query(var) != Some(true) {
 
                // optimization. Don't bother trying this branch
 
                log!(
 
                    cu.logger,
 
                    "skipping branch with {:?} that doesn't want the message (fastpath)",
 
                    &predicate
 
                );
 
                finished.insert(predicate, branch);
 
                continue;
 
            }
 
            use CommonSatResult as Csr;
 
            match predicate.common_satisfier(&send_payload_msg.predicate) {
 
                Csr::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(
 
                        cu.logger,
 
                        "skipping branch with {:?} that doesn't want the message (slowpath)",
 
                        &predicate
 
                    );
 
                    finished.insert(predicate, branch);
 
                }
 
                Csr::Equivalent | Csr::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    feed_branch(&mut branch, &predicate);
 
                    log!(cu.logger, "branch pred covers it! Accept the msg");
 
                    finished.insert(predicate, branch);
 
                }
 
                Csr::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate. original branch untouched
 
                    let mut branch2 = branch.clone();
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
                    log!(
 
                        cu.logger,
 
                        "payload pred {:?} covers branch pred {:?}",
 
                        &predicate2,
 
                        &predicate
 
                    );
 
                    finished.insert(predicate, branch);
 
                    finished.insert(predicate2, branch2);
 
                }
 
                Csr::New(predicate2) => {
 
                    // fork branch, give fork the message and the new predicate. original branch untouched
 
                    let mut branch2 = branch.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
                    log!(
 
                        cu.logger,
 
                        "new subsuming pred created {:?}. forking and feeding",
 
                        &predicate2
 
                    );
 
                    finished.insert(predicate, branch);
 
                    finished.insert(predicate2, branch2);
 
                }
 
            }
 
        }
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> RoundOk {
 
        for (branch_predicate, branch) in self.branches {
 
            if solution_predicate.satisfies(&branch_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                return RoundOk { batch_index: index, gotten };
 
            }
 
        }
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 

	
 
// |putter, m| {
 
//     let getter = *cu.port_info.peers.get(&putter).unwrap();
 
//     payloads_to_get.push((getter, m));
 
// },
 
impl BranchingProtoComponent {
 
    fn drain_branches_to_blocked(
 
        cd: CyclicDrainer<Predicate, ProtoComponentBranch>,
 
        cu: &mut ConnectorUnphased,
 
        solution_storage: &mut SolutionStorage,
 
        payload_msg_sender: &mut impl PayloadMsgSender,
 
        proto_component_id: ProtoComponentId,
 
        ports: &HashSet<PortId>,
 
    ) -> Result<(), SyncError> {
 
        cd.cylic_drain(|mut predicate, mut branch, mut drainer| {
 
            let mut ctx = SyncProtoContext {
 
                logger: &mut *cu.logger,
 
                predicate: &predicate,
 
                port_info: &cu.port_info,
 
                inbox: &branch.inbox,
 
            };
 
            let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description);
 
            log!(
 
                cu.logger,
 
                "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
 
                proto_component_id,
 
                &predicate,
 
                &blocker,
 
            );
 
            use SyncBlocker as B;
 
            match blocker {
 
                B::Inconsistent => {
 
                    // branch is inconsistent. throw it away
 
                    drop((predicate, branch));
 
                }
 
                B::SyncBlockEnd => {
 
                    // make concrete all variables
 
                    for &port in ports.iter() {
 
                        let var = cu.port_info.firing_var_for(port);
 
                        predicate.assigned.entry(var).or_insert(false);
 
                    }
 
                    // submit solution for this component
 
                    solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.logger,
 
                        Route::LocalComponent(ComponentId::Proto(proto_component_id)),
 
                        predicate.clone(),
 
                    );
 
                    // move to "blocked"
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::CouldntReadMsg(port) => {
 
                    // move to "blocked"
 
                    assert!(!branch.inbox.contains_key(&port));
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::CouldntCheckFiring(port) => {
 
                    // sanity check
 
                    let var = cu.port_info.firing_var_for(port);
 
                    assert!(predicate.query(var).is_none());
 
                    // keep forks in "unblocked"
 
                    drainer.add_input(predicate.clone().inserted(var, false), branch.clone());
 
                    drainer.add_input(predicate.inserted(var, true), branch);
 
                }
 
                B::PutMsg(putter, payload) => {
 
                    // sanity check
 
                    assert_eq!(Some(&Putter), cu.port_info.polarities.get(&putter));
 
                    // overwrite assignment
 
                    let var = cu.port_info.firing_var_for(putter);
 
                    let was = predicate.assigned.insert(var, true);
 
                    if was == Some(false) {
 
                        log!(cu.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var);
 
                        // discard forever
 
                        drop((predicate, branch));
 
                    } else {
 
                        // keep in "unblocked"
 
                        log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
 
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                        payload_msg_sender.putter_send(cu, putter, msg)?;
 
                        drainer.add_input(predicate, branch);
 
                    }
 
                }
 
            }
 
            Ok(())
 
        })
 
    }
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        solution_storage: &mut SolutionStorage,
 
        proto_component_id: ProtoComponentId,
 
        payload_msg_sender: &mut impl PayloadMsgSender,
 
        getter: PortId,
 
        send_payload_msg: &SendPayloadMsg,
 
    ) -> Result<(), SyncError> {
 
        let logger = &mut *cu.logger;
 
        log!(
 
            logger,
 
            "feeding proto component {:?} getter {:?} {:?}",
 
            proto_component_id,
 
            getter,
 
            &send_payload_msg
 
        );
 
        let BranchingProtoComponent { branches, ports } = self;
 
        let mut unblocked = HashMap::default();
 
        let mut blocked = HashMap::default();
 
        // partition drain from branches -> {unblocked, blocked}
 
        log!(logger, "visiting {} blocked branches...", branches.len());
 
        for (predicate, mut branch) in branches.drain() {
 
            use CommonSatResult as Csr;
 
            log!(logger, "visiting branch with pred {:?}", &predicate);
 
            match predicate.common_satisfier(&send_payload_msg.predicate) {
 
                Csr::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(logger, "skipping branch");
 
                    blocked.insert(predicate, branch);
 
                }
 
                Csr::Equivalent | Csr::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    log!(logger, "feeding this branch without altering its predicate");
 
                    branch.feed_msg(getter, send_payload_msg.payload.clone());
 
                    unblocked.insert(predicate, branch);
 
                }
 
                Csr::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate. original branch untouched
 
                    log!(logger, "Forking this branch, giving it the predicate of the msg");
 
                    let mut branch2 = branch.clone();
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
                    branch2.feed_msg(getter, send_payload_msg.payload.clone());
 
                    blocked.insert(predicate, branch);
 
                    unblocked.insert(predicate2, branch2);
 
                }
 
                Csr::New(predicate2) => {
 
                    // fork branch, give fork the message and the new predicate. original branch untouched
 
                    log!(logger, "Forking this branch with new predicate {:?}", &predicate2);
 
                    let mut branch2 = branch.clone();
 
                    branch2.feed_msg(getter, send_payload_msg.payload.clone());
 
                    blocked.insert(predicate, branch);
 
                    unblocked.insert(predicate2, branch2);
 
                }
 
            }
 
        }
 
        log!(logger, "blocked {:?} unblocked {:?}", blocked.len(), unblocked.len());
 
        // drain from unblocked --> blocked
 
        let mut swap = HashMap::default();
 
        let cd = CyclicDrainer::new(&mut unblocked, &mut swap, &mut blocked);
 
        BranchingProtoComponent::drain_branches_to_blocked(
 
            cd,
 
            cu,
 
            solution_storage,
 
            payload_msg_sender,
 
            proto_component_id,
 
            ports,
 
        )?;
 
        // swap the blocked branches back
 
        std::mem::swap(&mut blocked, branches);
 
        log!(cu.logger, "component settles down with branches: {:?}", branches.keys());
 
        Ok(())
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
 
        let BranchingProtoComponent { ports, branches } = self;
 
        for (branch_predicate, branch) in branches {
 
            if branch_predicate.satisfies(solution_predicate) {
 
                let ProtoComponentBranch { state, .. } = branch;
 
                return ProtoComponent { state, ports };
 
            }
 
        }
 
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
    }
 
    fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self {
 
        let branch = ProtoComponentBranch { inbox: Default::default(), state };
 
        Self { ports, branches: hashmap! { Predicate::default() => branch  } }
 
    }
 
}
 
impl SolutionStorage {
 
    fn new(routes: impl Iterator<Item = Route>) -> Self {
 
        let mut subtree_id_to_index: HashMap<Route, usize> = Default::default();
 
        let mut subtree_solutions = vec![];
 
        for key in routes {
 
            subtree_id_to_index.insert(key, subtree_solutions.len());
 
            subtree_solutions.push(Default::default())
 
        }
 
        Self {
 
            subtree_solutions,
 
            subtree_id_to_index,
 
            old_local: Default::default(),
 
            new_local: Default::default(),
 
        }
 
    }
 
    fn is_clear(&self) -> bool {
 
        self.subtree_id_to_index.is_empty()
 
            && self.subtree_solutions.is_empty()
 
            && self.old_local.is_empty()
 
            && self.new_local.is_empty()
src/runtime/ffi.rs
Show inline comments
 
use super::*;
 

	
 
use core::cell::RefCell;
 
use core::convert::TryFrom;
 
use std::slice::from_raw_parts as slice_from_parts;
 
// use std::os::raw::{c_char, c_int, c_uchar, c_uint};
 

	
 
#[derive(Default)]
 
struct StoredError {
 
    // invariant: len is zero IFF its occupied
 
    // contents are 1+ bytes because we also store the NULL TERMINATOR
 
    buf: Vec<u8>,
 
}
 
impl StoredError {
 
    const NULL_TERMINATOR: u8 = 0;
 
    fn clear(&mut self) {
 
        // no null terminator either!
 
        self.buf.clear();
 
    }
 
    fn debug_store<E: Debug>(&mut self, error: &E) {
 
        let _ = write!(&mut self.buf, "{:?}", error);
 
        self.buf.push(Self::NULL_TERMINATOR);
 
    }
 
    fn tl_debug_store<E: Debug>(error: &E) {
 
        STORED_ERROR.with(|stored_error| {
 
            let mut stored_error = stored_error.borrow_mut();
 
            stored_error.clear();
 
            stored_error.debug_store(error);
 
        })
 
    }
 
    fn bytes_store(&mut self, bytes: &[u8]) {
 
        let _ = self.buf.write_all(bytes);
 
        self.buf.push(Self::NULL_TERMINATOR);
 
    }
 
    fn tl_bytes_store(bytes: &[u8]) {
 
        STORED_ERROR.with(|stored_error| {
 
            let mut stored_error = stored_error.borrow_mut();
 
            stored_error.clear();
 
            stored_error.bytes_store(bytes);
 
        })
 
    }
 
    fn tl_clear() {
 
        STORED_ERROR.with(|stored_error| {
 
            let mut stored_error = stored_error.borrow_mut();
 
            stored_error.clear();
 
        })
 
    }
 
    fn tl_bytes_peek() -> (*const u8, usize) {
 
        STORED_ERROR.with(|stored_error| {
 
            let stored_error = stored_error.borrow();
 
            match stored_error.buf.len() {
 
                0 => (core::ptr::null(), 0), // no error!
 
                n => {
 
                    // stores an error of length n-1 AND a NULL TERMINATOR
 
                    (stored_error.buf.as_ptr(), n - 1)
 
                }
 
            }
 
        })
 
    }
 
}
 
thread_local! {
 
    static STORED_ERROR: RefCell<StoredError> = RefCell::new(StoredError::default());
 
}
 

	
 
type ErrorCode = i32;
 

	
 
///////////////////// REOWOLF //////////////////////////
 

	
 
/// Returns length (via out pointer) and pointer (via return value) of the last Reowolf error.
 
/// - pointer is NULL iff there was no last error
 
/// - data at pointer is null-delimited
 
/// - len does NOT include the length of the null-delimiter
 
/// If len is NULL, it will not written to.
 
#[no_mangle]
 
pub unsafe extern "C" fn reowolf_error_peek(len: *mut usize) -> *const u8 {
 
    let (err_ptr, err_len) = StoredError::tl_bytes_peek();
 
    if !len.is_null() {
 
        len.write(err_len);
 
    }
 
    err_ptr
 
}
 

	
 
///////////////////// PROTOCOL DESCRIPTION //////////////////////////
 

	
 
/// Parses the utf8-encoded string slice to initialize a new protocol description object.
 
/// - On success, initializes `out` and returns 0
 
/// - On failure, stores an error string (see `reowolf_error_peek`) and returns -1
 
#[no_mangle]
 
pub unsafe extern "C" fn protocol_description_parse(
 
    pdl: *const u8,
 
    pdl_len: usize,
 
) -> *mut Arc<ProtocolDescription> {
 
    StoredError::tl_clear();
 
    match ProtocolDescription::parse(&*slice_from_parts(pdl, pdl_len)) {
 
        Ok(new) => Box::into_raw(Box::new(Arc::new(new))),
 
        Err(err) => {
 
            StoredError::tl_bytes_store(err.as_bytes());
 
            std::ptr::null_mut()
 
        }
 
    }
 
}
 

	
 
/// Destroys the given initialized protocol description and frees its resources.
 
#[no_mangle]
 
pub unsafe extern "C" fn protocol_description_destroy(pd: *mut Arc<ProtocolDescription>) {
 
    drop(Box::from_raw(pd))
 
}
 

	
 
/// Given an initialized protocol description, initializes `out` with a clone which can be independently created or destroyed.
 
#[no_mangle]
 
pub unsafe extern "C" fn protocol_description_clone(
 
    pd: &Arc<ProtocolDescription>,
 
) -> *mut Arc<ProtocolDescription> {
 
    Box::into_raw(Box::new(pd.clone()))
 
}
 

	
 
///////////////////// CONNECTOR //////////////////////////
 

	
 
/// Allocates a new connector on the heap and returning a pointer,
 
/// given an initialized protocol description.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new(pd: &Arc<ProtocolDescription>) -> *mut Connector {
 
    connector_new_with_id(pd, random_connector_id())
 
pub unsafe extern "C" fn connector_new_logging(
 
    pd: &Arc<ProtocolDescription>,
 
    path_ptr: *const u8,
 
    path_len: usize,
 
) -> *mut Connector {
 
    StoredError::tl_clear();
 
    let path_bytes = &*slice_from_parts(path_ptr, path_len);
 
    let path_str = match std::str::from_utf8(path_bytes) {
 
        Ok(path_str) => path_str,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            return std::ptr::null_mut();
 
        }
 
    };
 
    match std::fs::File::create(path_str) {
 
        Ok(file) => {
 
            let connector_id = Connector::random_id();
 
            let file_logger = Box::new(FileLogger::new(connector_id, file));
 
            let c = Connector::new(file_logger, pd.clone(), connector_id, 8);
 
            Box::into_raw(Box::new(c))
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            std::ptr::null_mut()
 
        }
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_print_debug(connector: &mut Connector) {
 
    println!("Debug print dump {:#?}", connector);
 
}
 

	
 
/// Initializes `out` with a new connector using the given protocol description as its configuration.
 
/// The connector uses the given (internal) connector ID.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new_with_id(
 
    pd: &Arc<ProtocolDescription>,
 
    cid: ConnectorId,
 
) -> *mut Connector {
 
    let c = Connector::new(Box::new(DummyLogger), pd.clone(), cid, 8);
 
pub unsafe extern "C" fn connector_new(pd: &Arc<ProtocolDescription>) -> *mut Connector {
 
    let c = Connector::new(Box::new(DummyLogger), pd.clone(), Connector::random_id(), 8);
 
    Box::into_raw(Box::new(c))
 
}
 

	
 
/// Destroys the given a pointer to the connector on the heap, freeing its resources.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_destroy(connector: *mut Connector) {
 
    drop(Box::from_raw(connector))
 
}
 

	
 
/// Given an initialized connector in setup or connecting state,
 
/// - Creates a new directed port pair with logical channel putter->getter,
 
/// - adds the ports to the native component's interface,
 
/// - and returns them using the given out pointers.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_port_pair(
 
    connector: &mut Connector,
 
    out_putter: *mut PortId,
 
    out_getter: *mut PortId,
 
) {
 
    let [o, i] = connector.new_port_pair();
 
    out_putter.write(o);
 
    out_getter.write(i);
 
}
 

	
 
/// Given
 
/// - an initialized connector in setup or connecting state,
 
/// - a string slice for the component's identifier in the connector's configured protocol description,
 
/// - a set of ports (represented as a slice; duplicates are ignored) in the native component's interface,
 
/// the connector creates a new (internal) protocol component C, such that the set of native ports are moved to C.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_component(
 
    connector: &mut Connector,
 
    ident_ptr: *const u8,
 
    ident_len: usize,
 
    ports_ptr: *const PortId,
 
    ports_len: usize,
 
) -> ErrorCode {
 
    StoredError::tl_clear();
 
    match connector.add_component(
 
        &*slice_from_parts(ident_ptr, ident_len),
 
        &*slice_from_parts(ports_ptr, ports_len),
 
    ) {
 
        Ok(()) => 0,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
/// Given
 
/// - an initialized connector in setup or connecting state,
 
/// - a utf-8 encoded socket address,
 
/// - the logical polarity of P,
 
/// - the "physical" polarity in {Active, Passive} of the endpoint through which P's peer will be discovered,
 
/// returns P, a port newly added to the native interface.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_net_port(
 
    connector: &mut Connector,
 
    addr_str_ptr: *const u8,
 
    addr_str_len: usize,
 
    port_polarity: Polarity,
 
    endpoint_polarity: EndpointPolarity,
 
    port: *mut PortId,
 
) -> ErrorCode {
 
    StoredError::tl_clear();
 
    let addr_bytes = &*slice_from_parts(addr_str_ptr, addr_str_len);
 
    let addr_str = match std::str::from_utf8(addr_bytes) {
 
        Ok(addr_str) => addr_str,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            return -1;
 
        }
 
    };
 
    let sock_address: SocketAddr = match addr_str.parse() {
 
        Ok(addr) => addr,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            return -2;
 
        }
 
    };
 
    match connector.new_net_port(port_polarity, sock_address, endpoint_polarity) {
 
        Ok(p) => {
 
            port.write(p);
 
            0
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -3
 
        }
 
    }
 
}
 

	
 
/// Connects this connector to the distributed system of connectors reachable through endpoints,
 
/// Usable in setup state, and changes the state to communication.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_connect(
 
    connector: &mut Connector,
 
    timeout_millis: i64,
 
) -> ErrorCode {
 
    StoredError::tl_clear();
 
    let option_timeout_millis: Option<u64> = TryFrom::try_from(timeout_millis).ok();
 
    let timeout = option_timeout_millis.map(Duration::from_millis);
 
    match connector.connect(timeout) {
 
        Ok(()) => 0,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
// #[no_mangle]
 
// pub unsafe extern "C" fn connector_put_payload(
 
//     connector: &mut Connector,
 
//     port: PortId,
 
//     payload: *mut Payload,
 
// ) -> ErrorCode {
 
//     match connector.put(port, payload.read()) {
 
//         Ok(()) => 0,
 
//         Err(err) => {
 
//             StoredError::tl_debug_store(&err);
 
//             -1
 
//         }
 
//     }
 
// }
 

	
 
// #[no_mangle]
 
// pub unsafe extern "C" fn connector_put_payload_cloning(
 
//     connector: &mut Connector,
 
//     port: PortId,
 
//     payload: &Payload,
 
// ) -> ErrorCode {
 
//     match connector.put(port, payload.clone()) {
 
//         Ok(()) => 0,
 
//         Err(err) => {
 
//             StoredError::tl_debug_store(&err);
 
//             -1
 
//         }
 
//     }
 
// }
 

	
 
/// Convenience function combining the functionalities of
 
/// "payload_new" with "connector_put_payload".
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_put_bytes(
 
    connector: &mut Connector,
 
    port: PortId,
 
    bytes_ptr: *const u8,
 
    bytes_len: usize,
 
) -> ErrorCode {
 
    StoredError::tl_clear();
 
    let bytes = &*slice_from_parts(bytes_ptr, bytes_len);
 
    match connector.put(port, Payload::from(bytes)) {
 
        Ok(()) => 0,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_get(connector: &mut Connector, port: PortId) -> ErrorCode {
 
    match connector.get(port) {
 
        Ok(()) => 0,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_next_batch(connector: &mut Connector) -> isize {
 
    match connector.next_batch() {
 
        Ok(n) => n as isize,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_sync(connector: &mut Connector, timeout_millis: i64) -> isize {
 
    StoredError::tl_clear();
 
    let option_timeout_millis: Option<u64> = TryFrom::try_from(timeout_millis).ok();
 
    let timeout = option_timeout_millis.map(Duration::from_millis);
src/runtime/mod.rs
Show inline comments
 
mod communication;
 
mod endpoints;
 
pub mod error;
 
mod ffi;
 
mod logging;
 
mod setup;
 

	
 
#[cfg(test)]
 
mod tests;
 

	
 
use crate::common::*;
 
use error::*;
 

	
 
#[derive(Debug)]
 
pub struct RoundOk {
 
    batch_index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
}
 
#[derive(Debug)]
 
pub struct VecSet<T: std::cmp::Ord> {
 
    // invariant: ordered, deduplicated
 
    vec: Vec<T>,
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
pub enum ComponentId {
 
    Native,
 
    Proto(ProtoComponentId),
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
pub enum Route {
 
    LocalComponent(ComponentId),
 
    Endpoint { index: usize },
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct MyPortInfo {
 
    polarity: Polarity,
 
    port: PortId,
 
}
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
pub enum Decision {
 
    Failure,
 
    Success(Predicate),
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub enum Msg {
 
    SetupMsg(SetupMsg),
 
    CommMsg(CommMsg),
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub enum SetupMsg {
 
    MyPortInfo(MyPortInfo),
 
    LeaderWave { wave_leader: ConnectorId },
 
    LeaderAnnounce { tree_leader: ConnectorId },
 
    YouAreMyParent,
 
    SessionGather { unoptimized_map: HashMap<ConnectorId, SessionInfo> },
 
    SessionScatter { optimized_map: HashMap<ConnectorId, SessionInfo> },
 
}
 
#[derive(Debug, Clone)]
 
pub(crate) struct SerdeProtocolDescription(Arc<ProtocolDescription>);
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct SessionInfo {
 
    serde_proto_description: SerdeProtocolDescription,
 
    port_info: PortInfo,
 
    proto_components: HashMap<ProtoComponentId, ProtoComponent>,
 
}
 

	
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct CommMsg {
 
    pub round_index: usize,
 
    pub contents: CommMsgContents,
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub enum CommMsgContents {
 
    SendPayload(SendPayloadMsg),
 
    Suggest { suggestion: Decision }, // SINKWARD
 
    Announce { decision: Decision },  // SINKAWAYS
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub struct SendPayloadMsg {
 
    predicate: Predicate,
 
    payload: Payload,
 
}
 
#[derive(Debug, PartialEq)]
 
pub enum CommonSatResult {
 
    FormerNotLatter,
 
    LatterNotFormer,
 
    Equivalent,
 
    New(Predicate),
 
    Nonexistant,
 
}
 
pub struct Endpoint {
 
    inbox: Vec<u8>,
 
    stream: TcpStream,
 
}
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
pub struct ProtoComponent {
 
    state: ComponentState,
 
    ports: HashSet<PortId>,
 
}
 
pub trait Logger: Debug {
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write;
 
}
 
#[derive(Debug)]
 
pub struct VecLogger(ConnectorId, Vec<u8>);
 
#[derive(Debug)]
 
pub struct DummyLogger;
 
#[derive(Debug)]
 
pub struct FileLogger(ConnectorId, std::fs::File);
 
#[derive(Debug, Clone)]
 
pub struct EndpointSetup {
 
    pub sock_addr: SocketAddr,
 
    pub endpoint_polarity: EndpointPolarity,
 
}
 
#[derive(Debug)]
 
pub struct EndpointExt {
 
    endpoint: Endpoint,
 
    getter_for_incoming: PortId,
 
}
 
#[derive(Debug)]
 
pub struct Neighborhood {
 
    parent: Option<usize>,
 
    children: VecSet<usize>,
 
}
 
#[derive(Debug)]
 
pub struct MemInMsg {
 
    inp: PortId,
 
    msg: Payload,
 
}
 
#[derive(Debug)]
 
pub struct IdManager {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
    proto_component_suffix_stream: U32Stream,
 
}
 
#[derive(Debug)]
 
pub struct EndpointManager {
 
    // invariants:
 
    // 1. endpoint N is registered READ | WRITE with poller
 
    // 2. Events is empty
 
    poll: Poll,
 
    events: Events,
 
    polled_undrained: IndexSet<usize>,
 
    delayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>,
 
    endpoint_exts: Vec<EndpointExt>,
 
}
 
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
 
pub struct PortInfo {
 
    polarities: HashMap<PortId, Polarity>,
 
    peers: HashMap<PortId, PortId>,
 
    routes: HashMap<PortId, Route>,
 
}
 
#[derive(Debug)]
 
// #[repr(C)]
 
pub struct Connector {
 
    unphased: ConnectorUnphased,
 
    phased: ConnectorPhased,
 
}
 
#[derive(Debug)]
 
pub struct ConnectorCommunication {
 
    round_index: usize,
 
    endpoint_manager: EndpointManager,
 
    neighborhood: Neighborhood,
 
    mem_inbox: Vec<MemInMsg>,
 
    native_batches: Vec<NativeBatch>,
 
    round_result: Result<Option<RoundOk>, SyncError>,
 
}
 
#[derive(Debug)]
 
pub struct ConnectorUnphased {
 
    proto_description: Arc<ProtocolDescription>,
 
    proto_components: HashMap<ProtoComponentId, ProtoComponent>,
 
    logger: Box<dyn Logger>,
 
    id_manager: IdManager,
 
    native_ports: HashSet<PortId>,
 
    port_info: PortInfo,
 
}
 
#[derive(Debug)]
 
pub enum ConnectorPhased {
 
    Setup { endpoint_setups: Vec<(PortId, EndpointSetup)>, surplus_sockets: u16 },
 
    Communication(Box<ConnectorCommunication>),
 
}
 
#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
pub struct Predicate {
 
    pub assigned: BTreeMap<FiringVar, bool>,
 
}
 
#[derive(Debug, Default)]
 
pub struct NativeBatch {
 
    // invariant: putters' and getters' polarities respected
 
    to_put: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
pub struct NonsyncProtoContext<'a> {
 
    logger: &'a mut dyn Logger,
 
    proto_component_id: ProtoComponentId,
 
    port_info: &'a mut PortInfo,
 
    id_manager: &'a mut IdManager,
 
    proto_component_ports: &'a mut HashSet<PortId>,
 
    unrun_components: &'a mut Vec<(ProtoComponentId, ProtoComponent)>,
 
}
 
pub struct SyncProtoContext<'a> {
 
    logger: &'a mut dyn Logger,
 
    predicate: &'a Predicate,
 
    port_info: &'a PortInfo,
 
    inbox: &'a HashMap<PortId, Payload>,
 
}
 
////////////////
 
pub fn random_connector_id() -> ConnectorId {
 
    type Bytes8 = [u8; std::mem::size_of::<ConnectorId>()];
 
    let mut bytes = Bytes8::default();
 
    getrandom::getrandom(&mut bytes).unwrap();
 
    unsafe { std::mem::transmute::<Bytes8, ConnectorId>(bytes) }
 
}
 
pub fn would_block(err: &std::io::Error) -> bool {
 
    err.kind() == std::io::ErrorKind::WouldBlock
 
}
 
impl<T: std::cmp::Ord> VecSet<T> {
 
    fn new(mut vec: Vec<T>) -> Self {
 
        vec.sort();
 
        vec.dedup();
 
        Self { vec }
 
    }
 
    fn contains(&self, element: &T) -> bool {
 
        self.vec.binary_search(element).is_ok()
 
    }
 
    fn insert(&mut self, element: T) -> bool {
 
        match self.vec.binary_search(&element) {
 
            Ok(_) => false,
 
            Err(index) => {
 
                self.vec.insert(index, element);
 
                true
 
            }
 
        }
 
    }
 
    fn iter(&self) -> std::slice::Iter<T> {
 
        self.vec.iter()
 
    }
 
}
 
impl PortInfo {
 
    fn firing_var_for(&self, port: PortId) -> FiringVar {
 
        FiringVar(match self.polarities.get(&port).unwrap() {
 
            Getter => port,
 
            Putter => *self.peers.get(&port).unwrap(),
 
        })
 
    }
 
}
 
impl IdManager {
 
    fn new(connector_id: ConnectorId) -> Self {
 
        Self {
 
            connector_id,
 
            port_suffix_stream: Default::default(),
 
            proto_component_suffix_stream: Default::default(),
 
        }
 
    }
 
    fn new_port_id(&mut self) -> PortId {
 
        Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into()
 
    }
 
    fn new_proto_component_id(&mut self) -> ProtoComponentId {
 
        Id {
 
            connector_id: self.connector_id,
 
            u32_suffix: self.proto_component_suffix_stream.next(),
 
        }
 
        .into()
 
    }
 
}
 
impl Drop for Connector {
 
    fn drop(&mut self) {
 
        log!(&mut *self.unphased.logger, "Connector dropping. Goodbye!");
 
    }
 
}
 
impl Connector {
 
    fn random_id() -> ConnectorId {
 
        type Bytes8 = [u8; std::mem::size_of::<ConnectorId>()];
 
        let mut bytes = Bytes8::default();
 
        getrandom::getrandom(&mut bytes).unwrap();
 
        unsafe { std::mem::transmute::<Bytes8, ConnectorId>(bytes) }
 
    }
 
    pub fn swap_logger(&mut self, mut new_logger: Box<dyn Logger>) -> Box<dyn Logger> {
 
        std::mem::swap(&mut self.unphased.logger, &mut new_logger);
 
        new_logger
 
    }
 
    pub fn get_logger(&mut self) -> &mut dyn Logger {
 
        &mut *self.unphased.logger
 
    }
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        let cu = &mut self.unphased;
 
        // adds two new associated ports, related to each other, and exposed to the native
 
        let [o, i] = [cu.id_manager.new_port_id(), cu.id_manager.new_port_id()];
 
        cu.native_ports.insert(o);
 
        cu.native_ports.insert(i);
 
        // {polarity, peer, route} known. {} unknown.
 
        cu.port_info.polarities.insert(o, Putter);
 
        cu.port_info.polarities.insert(i, Getter);
 
        cu.port_info.peers.insert(o, i);
 
        cu.port_info.peers.insert(i, o);
 
        let route = Route::LocalComponent(ComponentId::Native);
 
        cu.port_info.routes.insert(o, route);
 
        cu.port_info.routes.insert(i, route);
 
        log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
        [o, i]
 
    }
 
    pub fn add_component(
 
        &mut self,
 
        identifier: &[u8],
 
        ports: &[PortId],
 
    ) -> Result<(), AddComponentError> {
 
        // called by the USER. moves ports owned by the NATIVE
 
        use AddComponentError::*;
 
        // 1. check if this is OK
 
        let cu = &mut self.unphased;
 
        let polarities = cu.proto_description.component_polarities(identifier)?;
 
        if polarities.len() != ports.len() {
 
            return Err(WrongNumberOfParamaters { expected: polarities.len() });
 
        }
 
        for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) {
 
            if !cu.native_ports.contains(port) {
 
                return Err(UnknownPort(*port));
 
            }
 
            if expected_polarity != *cu.port_info.polarities.get(port).unwrap() {
 
                return Err(WrongPortPolarity { port: *port, expected_polarity });
 
            }
 
        }
 
        // 3. remove ports from old component & update port->route
 
        let new_id = cu.id_manager.new_proto_component_id();
 
        for port in ports.iter() {
 
            cu.port_info.routes.insert(*port, Route::LocalComponent(ComponentId::Proto(new_id)));
 
        }
 
        cu.native_ports.retain(|port| !ports.contains(port));
 
        // 4. add new component
 
        cu.proto_components.insert(
 
            new_id,
 
            ProtoComponent {
 
                state: cu.proto_description.new_main_component(identifier, ports),
 
                ports: ports.iter().copied().collect(),
 
            },
 
        );
 
        Ok(())
 
    }
 
}
 
impl Predicate {
 
    #[inline]
 
    pub fn inserted(mut self, k: FiringVar, v: bool) -> Self {
 
        self.assigned.insert(k, v);
 
        self
 
    }
 
    // returns true IFF self.unify would return Equivalent OR FormerNotLatter
 
    pub fn satisfies(&self, other: &Self) -> bool {
 
        let mut s_it = self.assigned.iter();
 
        let mut s = if let Some(s) = s_it.next() {
 
            s
 
        } else {
 
            return other.assigned.is_empty();
 
        };
 
        for (oid, ob) in other.assigned.iter() {
 
            while s.0 < oid {
 
                s = if let Some(s) = s_it.next() {
 
                    s
 
                } else {
 
                    return false;
 
                };
 
            }
 
            if s.0 > oid || s.1 != ob {
 
                return false;
 
            }
 
        }
 
        true
 
    }
 

	
 
    /// Given self and other, two predicates, return the most general Predicate possible, N
 
    /// such that n.satisfies(self) && n.satisfies(other).
 
    /// If none exists Nonexistant is returned.
 
    /// If the resulting predicate is equivlanet to self, other, or both,
 
    /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively.
 
    /// otherwise New(N) is returned.
 
    pub fn common_satisfier(&self, other: &Self) -> CommonSatResult {
 
        use CommonSatResult as Csr;
 
        // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys.
 
        let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()];
 
        let [mut s, mut o] = [s_it.next(), o_it.next()];
 
        // lists of assignments in self but not other and vice versa.
 
        let [mut s_not_o, mut o_not_s] = [vec![], vec![]];
 
        loop {
 
            match [s, o] {
 
                [None, None] => break,
 
                [None, Some(x)] => {
 
                    o_not_s.push(x);
 
                    o_not_s.extend(o_it);
 
                    break;
 
                }
 
                [Some(x), None] => {
 
                    s_not_o.push(x);
 
                    s_not_o.extend(s_it);
 
                    break;
 
                }
 
                [Some((sid, sb)), Some((oid, ob))] => {
 
                    if sid < oid {
 
                        // o is missing this element
 
                        s_not_o.push((sid, sb));
 
                        s = s_it.next();
 
                    } else if sid > oid {
 
                        // s is missing this element
 
                        o_not_s.push((oid, ob));
 
                        o = o_it.next();
 
                    } else if sb != ob {
 
                        assert_eq!(sid, oid);
 
                        // both predicates assign the variable but differ on the value
 
                        return Csr::Nonexistant;
 
                    } else {
 
                        // both predicates assign the variable to the same value
 
                        s = s_it.next();
 
                        o = o_it.next();
 
                    }
 
                }
 
            }
 
        }
 
        // Observed zero inconsistencies. A unified predicate exists...
 
        match [s_not_o.is_empty(), o_not_s.is_empty()] {
 
            [true, true] => Csr::Equivalent,       // ... equivalent to both.
 
            [false, true] => Csr::FormerNotLatter, // ... equivalent to self.
 
            [true, false] => Csr::LatterNotFormer, // ... equivalent to other.
 
            [false, false] => {
 
                // ... which is the union of the predicates' assignments but
 
                //     is equivalent to neither self nor other.
 
                let mut new = self.clone();
 
                for (&id, &b) in o_not_s {
 
                    new.assigned.insert(id, b);
 
                }
 
                Csr::New(new)
 
            }
 
        }
 
    }
 
    pub fn union_with(&self, other: &Self) -> Option<Self> {
 
        let mut res = self.clone();
 
        for (&channel_id, &assignment_1) in other.assigned.iter() {
 
            match res.assigned.insert(channel_id, assignment_1) {
 
                Some(assignment_2) if assignment_1 != assignment_2 => return None,
 
                _ => {}
 
            }
 
        }
 
        Some(res)
 
    }
 
    pub fn query(&self, var: FiringVar) -> Option<bool> {
 
        self.assigned.get(&var).copied()
 
    }
 
}
 
impl<T: Debug + std::cmp::Ord> Debug for VecSet<T> {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_set().entries(self.vec.iter()).finish()
 
    }
 
}
 
impl Debug for Predicate {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        struct MySet<'a>(&'a Predicate, bool);
 
        impl Debug for MySet<'_> {
 
            fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
                let iter = self.0.assigned.iter().filter_map(|(port, &firing)| {
 
                    if firing == self.1 {
 
                        Some(port)
 
                    } else {
 
                        None
 
                    }
 
                });
 
                f.debug_set().entries(iter).finish()
 
            }
 
        }
 
        f.debug_struct("Predicate")
 
            .field("Trues", &MySet(self, true))
 
            .field("Falses", &MySet(self, false))
 
            .finish()
 
    }
 
}
 

	
 
impl serde::Serialize for SerdeProtocolDescription {
 
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
 
    where
 
        S: serde::Serializer,
 
    {
 
        let inner: &ProtocolDescription = &self.0;
 
        inner.serialize(serializer)
 
    }
 
}
 
impl<'de> serde::Deserialize<'de> for SerdeProtocolDescription {
 
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
 
    where
 
        D: serde::Deserializer<'de>,
 
    {
 
        let inner: ProtocolDescription = ProtocolDescription::deserialize(deserializer)?;
 
        Ok(Self(Arc::new(inner)))
 
    }
 
}
src/runtime/tests.rs
Show inline comments
 
@@ -87,384 +87,394 @@ fn single_node_connect() {
 

	
 
#[test]
 
fn minimal_net_connect() {
 
    let sock_addr = next_test_addr();
 
    let test_log_path = Path::new("./logs/minimal_net_connect");
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let _ = c.new_net_port(Getter, sock_addr, Active).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let _ = c.new_net_port(Putter, sock_addr, Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn put_no_sync() {
 
    let test_log_path = Path::new("./logs/put_no_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, _] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
}
 

	
 
#[test]
 
fn wrong_polarity_bad() {
 
    let test_log_path = Path::new("./logs/wrong_polarity_bad");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(i, (b"hi" as &[_]).into()).unwrap_err();
 
}
 

	
 
#[test]
 
fn dup_put_bad() {
 
    let test_log_path = Path::new("./logs/dup_put_bad");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, _] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap_err();
 
}
 

	
 
#[test]
 
fn trivial_sync() {
 
    let test_log_path = Path::new("./logs/trivial_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.sync(Some(Duration::from_secs(1))).unwrap();
 
}
 

	
 
#[test]
 
fn unconnected_gotten_err() {
 
    let test_log_path = Path::new("./logs/unconnected_gotten_err");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err());
 
}
 

	
 
#[test]
 
fn connected_gotten_err_no_round() {
 
    let test_log_path = Path::new("./logs/connected_gotten_err_no_round");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err());
 
}
 

	
 
#[test]
 
fn connected_gotten_err_ungotten() {
 
    let test_log_path = Path::new("./logs/connected_gotten_err_ungotten");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.sync(Some(Duration::from_secs(1))).unwrap();
 
    assert_eq!(reowolf::error::GottenError::PortDidntGet, c.gotten(i).unwrap_err());
 
}
 

	
 
#[test]
 
fn native_polarity_checks() {
 
    let test_log_path = Path::new("./logs/native_polarity_checks");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, i] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    // fail...
 
    c.get(o).unwrap_err();
 
    c.put(i, (b"hi" as &[_]).into()).unwrap_err();
 
    // succeed..
 
    c.get(i).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
}
 

	
 
#[test]
 
fn native_multiple_gets() {
 
    let test_log_path = Path::new("./logs/native_multiple_gets");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.get(i).unwrap();
 
    c.get(i).unwrap_err();
 
}
 

	
 
#[test]
 
fn next_batch() {
 
    let test_log_path = Path::new("./logs/next_batch");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.next_batch().unwrap_err();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.next_batch().unwrap();
 
    c.next_batch().unwrap();
 
    c.next_batch().unwrap();
 
}
 

	
 
#[test]
 
fn native_self_msg() {
 
    let test_log_path = Path::new("./logs/native_self_msg");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, i] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.get(i).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
    c.sync(Some(Duration::from_secs(1))).unwrap();
 
}
 

	
 
#[test]
 
fn two_natives_msg() {
 
    let test_log_path = Path::new("./logs/two_natives_msg");
 
    let sock_addr = next_test_addr();
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addr, Active).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.get(g).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
            c.gotten(g).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p = c.new_net_port(Putter, sock_addr, Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.put(p, (b"hello" as &[_]).into()).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn trivial_nondet() {
 
    let test_log_path = Path::new("./logs/trivial_nondet");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.get(i).unwrap();
 
    // getting 0 batch
 
    c.next_batch().unwrap();
 
    // silent 1 batch
 
    assert_eq!(1, c.sync(Some(Duration::from_secs(1))).unwrap());
 
    c.gotten(i).unwrap_err();
 
}
 

	
 
#[test]
 
fn connector_pair_nondet() {
 
    let test_log_path = Path::new("./logs/connector_pair_nondet");
 
    let sock_addr = next_test_addr();
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addr, Active).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.next_batch().unwrap();
 
            c.get(g).unwrap();
 
            assert_eq!(1, c.sync(Some(Duration::from_secs(1))).unwrap());
 
            c.gotten(g).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p = c.new_net_port(Putter, sock_addr, Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.put(p, (b"hello" as &[_]).into()).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn native_immediately_inconsistent() {
 
    let test_log_path = Path::new("./logs/native_immediately_inconsistent");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, g] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(Some(Duration::from_secs(30))).unwrap_err();
 
}
 

	
 
#[test]
 
fn cannot_use_moved_ports() {
 
    /*
 
    native p|-->|g sync
 
    */
 
    let test_log_path = Path::new("./logs/cannot_use_moved_ports");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p, g] = c.new_port_pair();
 
    c.add_component(b"sync", &[g, p]).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(p, (b"hello" as &[_]).into()).unwrap_err();
 
    c.get(g).unwrap_err();
 
}
 

	
 
#[test]
 
fn sync_sync() {
 
    /*
 
    native p0|-->|g0 sync
 
           g1|<--|p1
 
    */
 
    let test_log_path = Path::new("./logs/sync_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"sync", &[g0, p1]).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(p0, (b"hello" as &[_]).into()).unwrap();
 
    c.get(g1).unwrap();
 
    c.sync(Some(Duration::from_secs(1))).unwrap();
 
    c.gotten(g1).unwrap();
 
}
 

	
 
#[test]
 
fn double_net_connect() {
 
    let test_log_path = Path::new("./logs/double_net_connect");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [_p, _g] = [
 
                c.new_net_port(Putter, sock_addrs[0], Active).unwrap(),
 
                c.new_net_port(Getter, sock_addrs[1], Active).unwrap(),
 
            ];
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let [_g, _p] = [
 
                c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(),
 
                c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(),
 
            ];
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn distributed_msg_bounce() {
 
    /*
 
    native[0] | sync 0.p|-->|1.p native[1]
 
                     0.g|<--|1.g
 
    */
 
    let test_log_path = Path::new("./logs/distributed_msg_bounce");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            /*
 
            native | sync p|-->
 
                   |      g|<--
 
            */
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p, g] = [
 
                c.new_net_port(Putter, sock_addrs[0], Active).unwrap(),
 
                c.new_net_port(Getter, sock_addrs[1], Active).unwrap(),
 
            ];
 
            c.add_component(b"sync", &[g, p]).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            /*
 
            native p|-->
 
                   g|<--
 
            */
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let [g, p] = [
 
                c.new_net_port(Getter, sock_addrs[0], Passive).unwrap(),
 
                c.new_net_port(Putter, sock_addrs[1], Passive).unwrap(),
 
            ];
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.put(p, (b"hello" as &[_]).into()).unwrap();
 
            c.get(g).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
            c.gotten(g).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn local_timeout() {
 
    let test_log_path = Path::new("./logs/local_timeout");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, g] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.get(g).unwrap();
 
    match c.sync(Some(Duration::from_millis(200))) {
 
        Err(SyncError::RoundFailure) => {}
 
        res => panic!("expeted timeout. but got {:?}", res),
 
    }
 
}
 

	
 
#[test]
 
fn parent_timeout() {
 
    let test_log_path = Path::new("./logs/parent_timeout");
 
    let sock_addr = next_test_addr();
 
    scope(|s| {
 
        s.spawn(|_| {
 
            // parent; times out
 
            let mut c = file_logged_connector(999, test_log_path);
 
            let _ = c.new_net_port(Putter, sock_addr, Active).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.sync(Some(Duration::from_millis(300))).unwrap_err(); // timeout
 
        });
 
        s.spawn(|_| {
 
            // child
 
            let mut c = file_logged_connector(000, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.get(g).unwrap(); // not matched by put
 
            c.sync(None).unwrap_err(); // no timeout
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn child_timeout() {
 
    let test_log_path = Path::new("./logs/child_timeout");
 
    let sock_addr = next_test_addr();
 
    scope(|s| {
 
        s.spawn(|_| {
 
            // child; times out
 
            let mut c = file_logged_connector(000, test_log_path);
 
            let _ = c.new_net_port(Putter, sock_addr, Active).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.sync(Some(Duration::from_millis(300))).unwrap_err(); // timeout
 
        });
 
        s.spawn(|_| {
 
            // parent
 
            let mut c = file_logged_connector(999, test_log_path);
 
            let g = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.get(g).unwrap(); // not matched by put
 
            c.sync(None).unwrap_err(); // no timeout
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn chain_connect() {
 
    let test_log_path = Path::new("./logs/chain_connect");
 
    let sock_addrs = [next_test_addr(), next_test_addr(), next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            c.new_net_port(Putter, sock_addrs[0], Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(10, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[0], Active).unwrap();
 
            c.new_net_port(Putter, sock_addrs[1], Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            // LEADER
 
            let mut c = file_logged_connector(7, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[1], Active).unwrap();
 
            c.new_net_port(Putter, sock_addrs[2], Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(4, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[2], Active).unwrap();
 
            c.new_net_port(Putter, sock_addrs[3], Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
0 comments (0 inline, 0 general)