Changeset - 7f75e1f23836
[Not reviewed]
0 5 2
Christopher Esterhuyse - 5 years ago 2020-07-14 13:28:54
christopher.esterhuyse@gmail.com
bugfix: regression from previous vers: proto components correctly enforce that each of their ports' firing variables are assigned FIRING iff that port put or get during the round
7 files changed with 219 insertions and 26 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
[package]
 
name = "reowolf_rs"
 
version = "0.1.4"
 
authors = [
 
	"Christopher Esterhuyse <esterhuy@cwi.nl, christopher.esterhuyse@gmail.com>",
 
	"Hans-Dieter Hiep <hdh@cwi.nl>"
 
]
 
edition = "2018"
 

	
 
[dependencies]
 
# convenience macros
 
maplit = "1.0.2"
 
derive_more = "0.99.2"
 

	
 
# runtime
 
bincode = "1.3.1"
 
serde = { version = "1.0.114", features = ["derive"] }
 
getrandom = "0.1.14" # tiny crate. used to guess controller-id
 

	
 
# network
 
mio = { version = "0.7.0", package = "mio", features = ["udp", "tcp", "os-poll"] }
 

	
 
# protocol
 
backtrace = "0.3"
 

	
 
# socket ffi
 
lazy_static = { version = "1.4.0", optional = true}
 
atomic_refcell = { version = "0.1.6", optional = true }
 

	
 
[dev-dependencies]
 
# test-generator = "0.3.0"
 
crossbeam-utils = "0.7.2"
 
lazy_static = "1.4.0"
 

	
 
[lib]
 
# compile target: dynamically linked library using C ABI
 
crate-type = ["cdylib"]
 

	
 
[features]
 
default = ["ffi", "session_optimization", "ffi_socket_api"]
 
default = ["ffi", "ffi_socket_api"] # // "session_optimization", 
 
ffi = [] # see src/ffi.rs
 
ffi_socket_api = ["ffi", "lazy_static", "atomic_refcell"]
 
endpoint_logging = [] # see src/macros.rs
 
session_optimization = [] # see src/runtime/setup.rs
 
\ No newline at end of file
examples/eg_protocols.pdl
Show inline comments
 
primitive pres_2(in i, out o) {
 
  synchronous {
 
    //put(o, get(i));
 
    put(o, get(i));
 
  }
 
}
 
primitive together(in ia, in ib, out oa, out ob){
 
  while(true) synchronous {
 
    if(fires(ia)) {
 
      put(oa, get(ia));
 
      put(ob, get(ib));
 
    }
 
  }	
 
}
 

	
 
primitive alt_round_merger(in a, in b, out c){
 
  while(true) {
 
    synchronous{ put(c, get(a)); }
 
    synchronous{ put(c, get(b)); }
 
  }	
 
}
examples/pres_5/amy.c
Show inline comments
 
new file 100644
 

	
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 

	
 
int main(int argc, char** argv) {
 
	// Create a connector, configured with our (trivial) protocol.
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./pres_3_amy.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	rw_err_peek(c);
 
	
 
	// ... with 2 outgoing network connections
 
	PortId ports[2];
 
	char * addr = "127.0.0.1:8000";
 
	connector_add_net_port(c, &ports[0], addr, strlen(addr),
 
			Polarity_Putter, EndpointPolarity_Passive);
 
	rw_err_peek(c);
 
	addr = "127.0.0.1:8001";
 
	connector_add_net_port(c, &ports[1], addr, strlen(addr),
 
			Polarity_Putter, EndpointPolarity_Passive);
 
	rw_err_peek(c);
 
	
 
	// Connect with peers (5000ms timeout).
 
	connector_connect(c, 5000);
 
	rw_err_peek(c);
 

	
 
	printf("Round 0. Putting {ports[0]=\"r0p0\", ports[1]=\"r0p1\"}\n");
 
	connector_put_bytes(c, ports[0], "r0p0", 4);
 
	connector_put_bytes(c, ports[1], "r0p1", 4);
 
	connector_sync(c, 1000);
 
	rw_err_peek(c);
 

	
 
	printf("Round 1. Putting {ports[1]=\"r1p1\"}\n");
 
	connector_put_bytes(c, ports[1], "r1p1", 4);
 
	connector_sync(c, 1000);
 
	rw_err_peek(c);
 

	
 
	printf("Round 2. Putting {ports[0]=\"r2p0\"}\n");
 
	connector_put_bytes(c, ports[0], "r2p0", 4);
 
	connector_sync(c, 1000);
 
	rw_err_peek(c);
 

	
 
	printf("\nExiting\n");
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	sleep(1.0);
 
	return 0;
 
}
 
\ No newline at end of file
examples/pres_5/bob.c
Show inline comments
 
new file 100644
 

	
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 

	
 
int main(int argc, char** argv) {
 
	// Create a connector, configured with a protocol defined in a file
 
	char * pdl = buffer_pdl("./eg_protocols.pdl");
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, strlen(pdl));
 
	char logpath[] = "./pres_3_bob.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	rw_err_peek(c);
 

	
 
	// ... with 2 outgoing network connections
 
	PortId ports[4];
 
	char * addr = "127.0.0.1:8000";
 
	connector_add_net_port(c, &ports[0], addr, strlen(addr),
 
			Polarity_Getter, EndpointPolarity_Active);
 
	rw_err_peek(c);
 
	addr = "127.0.0.1:8001";
 
	connector_add_net_port(c, &ports[1], addr, strlen(addr),
 
			Polarity_Getter, EndpointPolarity_Active);
 
	connector_add_port_pair(c, &ports[2], &ports[3]);
 
	connector_add_component(c, "alt_round_merger", 16, ports, 3);
 
	rw_err_peek(c);
 
	
 
	// Connect with peers (5000ms timeout).
 
	connector_connect(c, 5000);
 
	rw_err_peek(c);
 

	
 
	for(int round=0; round<3; round++) {
 
		printf("\nRound %d\n", round);
 
		connector_get(c, ports[3]);
 
		rw_err_peek(c);
 
		connector_sync(c, 1000);
 
		rw_err_peek(c);
 

	
 
		size_t msg_len = 0;
 
		const char * msg_ptr = connector_gotten_bytes(c, ports[3], &msg_len);
 
		printf("Got msg `%.*s`\n", msg_len, msg_ptr);
 
	}
 
	
 
	printf("Exiting\n");
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	free(pdl);
 
	sleep(1.0);
 
	return 0;
 
}
 
\ No newline at end of file
src/protocol/inputsource.rs
Show inline comments
 
use std::fmt;
 
use std::fs::File;
 
use std::io;
 
use std::path::Path;
 

	
 
use backtrace::Backtrace;
 

	
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
pub struct InputSource {
 
    filename: String,
 
    input: Vec<u8>,
 
    line: usize,
 
    column: usize,
 
    offset: usize,
 
}
 

	
 
static STD_LIB_PDL: &'static [u8] = b"
 
primitive forward(in i, out o) {
 
    while(true) synchronous() put(o, get(i));
 
}
 
primitive sync(in i, out o) {
 
    while(true) synchronous() if(fires(i)) put(o, get(i));
 
}
 
primitive alternator_2(in i, out l, out r) {
 
    while(true) {
 
        synchronous() put(l, get(i));
 
        synchronous() put(r, get(i));
 
        synchronous() if(fires(i)) put(l, get(i));
 
        synchronous() if(fires(i)) put(r, get(i));
 
    }
 
}
 
primitive replicator_2(in i, out l, out r) {
 
    while(true) synchronous() if(fires(i)) {
 
        msg m = get(i);
 
        put(l, m);
 
        put(r, m);
 
    }
 
}
 
primitive merger_2(in l, in r, out o) {
 
    while(true) synchronous {
 
        if(fires(l))      put(o, get(l));
 
        else         put(o, get(r));
 
        else if(fires(r)) put(o, get(r));
 
    }
 
}";
 

	
 
impl InputSource {
 
    // Constructors
 
    pub fn new<R: io::Read, S: ToString>(filename: S, reader: &mut R) -> io::Result<InputSource> {
 
        let mut vec = STD_LIB_PDL.to_vec();
 
        reader.read_to_end(&mut vec)?;
 
        Ok(InputSource {
 
            filename: filename.to_string(),
 
            input: vec,
 
            line: 1,
 
            column: 1,
 
            offset: 0,
 
        })
 
    }
 
    // Constructor helpers
 
    pub fn from_file(path: &Path) -> io::Result<InputSource> {
 
        let filename = path.file_name();
 
        match filename {
 
            Some(filename) => {
 
                let mut f = File::open(path)?;
 
                InputSource::new(filename.to_string_lossy(), &mut f)
 
            }
 
            None => Err(io::Error::new(io::ErrorKind::NotFound, "Invalid path")),
 
        }
 
    }
 
    pub fn from_string(string: &str) -> io::Result<InputSource> {
 
        let buffer = Box::new(string);
 
        let mut bytes = buffer.as_bytes();
 
        InputSource::new(String::new(), &mut bytes)
 
    }
 
    pub fn from_buffer(buffer: &[u8]) -> io::Result<InputSource> {
 
        InputSource::new(String::new(), &mut Box::new(buffer))
 
    }
 
    // Internal methods
 
    pub fn pos(&self) -> InputPosition {
 
        InputPosition { line: self.line, column: self.column, offset: self.offset }
 
    }
 
    pub fn error<S: ToString>(&self, message: S) -> ParseError {
 
        self.pos().parse_error(message)
 
    }
 
    pub fn is_eof(&self) -> bool {
 
        self.next() == None
 
    }
 
    pub fn next(&self) -> Option<u8> {
 
        if self.offset < self.input.len() {
 
            Some((*self.input)[self.offset])
 
        } else {
 
            None
 
        }
 
    }
 
    pub fn lookahead(&self, pos: usize) -> Option<u8> {
 
        if let Some(x) = usize::checked_add(self.offset, pos) {
 
            if x < self.input.len() {
 
                return Some((*self.input)[x]);
 
            }
 
        }
 
        None
 
    }
 
    pub fn consume(&mut self) {
 
        match self.next() {
 
            Some(x) if x == b'\r' && self.lookahead(1) != Some(b'\n') || x == b'\n' => {
 
                self.line += 1;
 
                self.offset += 1;
 
                self.column = 1;
 
            }
 
            Some(_) => {
 
                self.offset += 1;
 
                self.column += 1;
 
            }
 
            None => {}
 
        }
 
    }
 
}
 

	
 
impl fmt::Display for InputSource {
 
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 
        self.pos().fmt(f)
 
    }
 
}
 

	
 
#[derive(Debug, Clone, Copy, Default, serde::Serialize, serde::Deserialize)]
 
pub struct InputPosition {
 
    line: usize,
 
    column: usize,
 
    offset: usize,
 
}
 

	
 
impl InputPosition {
 
    fn context<'a>(&self, source: &'a InputSource) -> &'a [u8] {
 
        let start = self.offset - (self.column - 1);
 
        let mut end = self.offset;
 
        while end < source.input.len() {
 
            let cur = (*source.input)[end];
 
            if cur == b'\n' || cur == b'\r' {
src/runtime/communication.rs
Show inline comments
 
use super::*;
 
use crate::common::*;
 

	
 
////////////////
 
#[derive(Default)]
 
struct GetterBuffer {
 
    getters_and_sends: Vec<(PortId, SendPayloadMsg)>,
 
}
 
struct RoundCtx {
 
    solution_storage: SolutionStorage,
 
    spec_var_stream: SpecVarStream,
 
    getter_buffer: GetterBuffer,
 
    deadline: Option<Instant>,
 
}
 
struct BranchingNative {
 
    branches: HashMap<Predicate, NativeBranch>,
 
}
 
#[derive(Clone, Debug)]
 
struct NativeBranch {
 
    index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
#[derive(Debug)]
 
struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
    // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<SubtreeId, usize>,
 
}
 
#[derive(Debug)]
 
struct BranchingProtoComponent {
 
    ports: HashSet<PortId>,
 
    branches: HashMap<Predicate, ProtoComponentBranch>,
 
}
 
#[derive(Debug, Clone)]
 
struct ProtoComponentBranch {
 
    did_put: HashSet<PortId>,
 
    inbox: HashMap<PortId, Payload>,
 
    state: ComponentState,
 
    untaken_choice: Option<u16>,
 
    ended: bool,
 
}
 
struct CyclicDrainer<'a, K: Eq + Hash, V> {
 
    input: &'a mut HashMap<K, V>,
 
    inner: CyclicDrainInner<'a, K, V>,
 
}
 
struct CyclicDrainInner<'a, K: Eq + Hash, V> {
 
    swap: &'a mut HashMap<K, V>,
 
    output: &'a mut HashMap<K, V>,
 
}
 
trait ReplaceBoolTrue {
 
    fn replace_with_true(&mut self) -> bool;
 
}
 
impl ReplaceBoolTrue for bool {
 
    fn replace_with_true(&mut self) -> bool {
 
        let was = *self;
 
        *self = true;
 
        !was
 
    }
 
}
 

	
 
////////////////
 
impl RoundCtxTrait for RoundCtx {
 
    fn get_deadline(&self) -> &Option<Instant> {
 
        &self.deadline
 
    }
 
    fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) {
 
        self.getter_buffer.getter_add(getter, msg)
 
    }
 
}
 
impl Connector {
 
    fn get_comm_mut(&mut self) -> Option<&mut ConnectorCommunication> {
 
        if let ConnectorPhased::Communication(comm) = &mut self.phased {
 
            Some(comm)
 
        } else {
 
            None
 
        }
 
    }
 
    // pub(crate) fn get_mut_udp_sock(&mut self, index: usize) -> Option<&mut UdpSocket> {
 
    //     let sock = &mut self
 
    //         .get_comm_mut()?
 
    //         .endpoint_manager
 
    //         .udp_endpoint_store
 
    //         .endpoint_exts
 
    //         .get_mut(index)?
 
    //         .sock;
 
    //     Some(sock)
 
    // }
 
    pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> {
 
        use GottenError as Ge;
 
        let comm = self.get_comm_mut().ok_or(Ge::NoPreviousRound)?;
 
        match &comm.round_result {
 
            Err(_) => Err(Ge::PreviousSyncFailed),
 
            Ok(None) => Err(Ge::NoPreviousRound),
 
            Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(Ge::PortDidntGet),
 
        }
 
    }
 
    pub fn next_batch(&mut self) -> Result<usize, WrongStateError> {
 
        // returns index of new batch
 
        let comm = self.get_comm_mut().ok_or(WrongStateError)?;
 
        comm.native_batches.push(Default::default());
 
        Ok(comm.native_batches.len() - 1)
 
    }
 
    fn port_op_access(
 
        &mut self,
 
        port: PortId,
 
        expect_polarity: Polarity,
 
    ) -> Result<&mut NativeBatch, PortOpError> {
 
        use PortOpError as Poe;
 
        let Self { unphased, phased } = self;
 
        if !unphased.native_ports.contains(&port) {
 
            return Err(Poe::PortUnavailable);
 
        }
 
        match unphased.port_info.polarities.get(&port) {
 
            Some(p) if *p == expect_polarity => {}
 
            Some(_) => return Err(Poe::WrongPolarity),
 
            None => return Err(Poe::UnknownPolarity),
 
        }
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(Poe::NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                let batch = comm.native_batches.last_mut().unwrap(); // length >= 1 is invariant
 
                Ok(batch)
 
            }
 
        }
 
    }
 
    pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> {
 
        use PortOpError as Poe;
 
        let batch = self.port_op_access(port, Putter)?;
 
        if batch.to_put.contains_key(&port) {
 
            Err(Poe::MultipleOpsOnPort)
 
        } else {
 
            batch.to_put.insert(port, payload);
 
            Ok(())
 
        }
 
    }
 
    pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> {
 
        use PortOpError as Poe;
 
        let batch = self.port_op_access(port, Getter)?;
 
        if batch.to_get.insert(port) {
 
            Ok(())
 
        } else {
 
            Err(Poe::MultipleOpsOnPort)
 
        }
 
    }
 
    // entrypoint for caller. overwrites round result enum, and returns what happened
 
    pub fn sync(&mut self, timeout: Option<Duration>) -> Result<usize, SyncError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(SyncError::NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                match &comm.round_result {
 
                    Err(SyncError::Unrecoverable(e)) => {
 
                        log!(cu.logger, "Attempted to start sync round, but previous error {:?} was unrecoverable!", e);
 
                        return Err(SyncError::Unrecoverable(e.clone()));
 
                    }
 
                    _ => {}
 
                }
 
                comm.round_result = Self::connected_sync(cu, comm, timeout);
 
                comm.round_index += 1;
 
                match &comm.round_result {
 
                    Ok(None) => unreachable!(),
 
                    Ok(Some(ok_result)) => Ok(ok_result.batch_index),
 
                    Err(sync_error) => Err(sync_error.clone()),
 
                }
 
            }
 
        }
 
    }
 
    // private function. mutates state but returns with round
 
    // result ASAP (allows for convenient error return with ?)
 
    fn connected_sync(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        timeout: Option<Duration>,
 
    ) -> Result<Option<RoundOk>, SyncError> {
 
        //////////////////////////////////
 
        use SyncError as Se;
 
        //////////////////////////////////
 
        log!(
 
            cu.logger,
 
            "~~~ SYNC called with timeout {:?}; starting round {}",
 
            &timeout,
 
            comm.round_index
 
        );
 

	
 
        // 1. run all proto components to Nonsync blockers
 
        // NOTE: original components are immutable until Decision::Success
 
        let mut branching_proto_components =
 
            HashMap::<ProtoComponentId, BranchingProtoComponent>::default();
 
        let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> =
 
            cu.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect();
 
        log!(cu.logger, "Nonsync running {} proto components...", unrun_components.len());
 
        // drains unrun_components, and populates branching_proto_components.
 
        while let Some((proto_component_id, mut component)) = unrun_components.pop() {
 
            // TODO coalesce fields
 
            log!(
 
                cu.logger,
 
                "Nonsync running proto component with ID {:?}. {} to go after this",
 
                proto_component_id,
 
                unrun_components.len()
 
            );
 
            let mut ctx = NonsyncProtoContext {
 
                logger: &mut *cu.logger,
 
                port_info: &mut cu.port_info,
 
                id_manager: &mut cu.id_manager,
 
                proto_component_id,
 
                unrun_components: &mut unrun_components,
 
                proto_component_ports: &mut cu
 
                    .proto_components
 
                    .get_mut(&proto_component_id)
 
                    .unwrap() // unrun_components' keys originate from proto_components
 
                    .ports,
 
            };
 
            let blocker = component.state.nonsync_run(&mut ctx, &cu.proto_description);
 
            log!(
 
                cu.logger,
 
                "proto component {:?} ran to nonsync blocker {:?}",
 
                proto_component_id,
 
                &blocker
 
            );
 
            use NonsyncBlocker as B;
 
            match blocker {
 
                B::ComponentExit => drop(component),
 
                B::Inconsistent => return Err(Se::InconsistentProtoComponent(proto_component_id)),
 
                B::SyncBlockStart => {
 
                    branching_proto_components
 
                        .insert(proto_component_id, BranchingProtoComponent::initial(component));
 
                }
 
            }
 
        }
 
        log!(
 
            cu.logger,
 
            "All {} proto components are now done with Nonsync phase",
 
            branching_proto_components.len(),
 
        );
 

	
 
        // Create temp structures needed for the synchronous phase of the round
 
        let mut rctx = RoundCtx {
 
            solution_storage: {
 
                let n = std::iter::once(SubtreeId::LocalComponent(ComponentId::Native));
 
                let c = cu
 
                    .proto_components
 
                    .keys()
 
                    .map(|&id| SubtreeId::LocalComponent(ComponentId::Proto(id)));
 
                let e = comm
 
                    .neighborhood
 
                    .children
 
                    .iter()
 
                    .map(|&index| SubtreeId::NetEndpoint { index });
 
                let subtree_id_iter = n.chain(c).chain(e);
 
                log!(
 
                    cu.logger,
 
                    "Children in subtree are: {:?}",
 
                    subtree_id_iter.clone().collect::<Vec<_>>()
 
                );
 
                SolutionStorage::new(subtree_id_iter)
 
            },
 
            spec_var_stream: cu.id_manager.new_spec_var_stream(),
 
            getter_buffer: Default::default(),
 
            deadline: timeout.map(|to| Instant::now() + to),
 
        };
 
        log!(cu.logger, "Round context structure initialized");
 

	
 
        // Explore all native branches eagerly. Find solutions, buffer messages, etc.
 
        log!(
 
            cu.logger,
 
            "Translating {} native batches into branches...",
 
            comm.native_batches.len()
 
        );
 
        let native_branch_spec_var = rctx.spec_var_stream.next();
 
        log!(cu.logger, "Native branch spec var is {:?}", native_branch_spec_var);
 
        let mut branching_native = BranchingNative { branches: Default::default() };
 
        'native_branches: for ((native_branch, index), branch_spec_val) in
 
            comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain())
 
        {
 
            let NativeBatch { to_get, to_put } = native_branch;
 
            let predicate = {
 
                let mut predicate = Predicate::default();
 
                // assign trues for ports that fire
 
                let firing_ports: HashSet<PortId> =
 
                    to_get.iter().chain(to_put.keys()).copied().collect();
 
                for &port in to_get.iter().chain(to_put.keys()) {
 
                    let var = cu.port_info.spec_var_for(port);
 
@@ -615,588 +617,628 @@ impl Connector {
 
                        } else {
 
                            log!(
 
                                cu.logger,
 
                                "Discarding suggestion {:?} from non-child endpoint idx {:?}",
 
                                &suggestion,
 
                                net_index
 
                            );
 
                        }
 
                    }
 
                    CommCtrlMsg::Announce { decision } => {
 
                        if Some(net_index) == comm.neighborhood.parent {
 
                            // adopt this decision
 
                            return Ok(decision);
 
                        } else {
 
                            log!(
 
                                cu.logger,
 
                                "Discarding announcement {:?} from non-parent endpoint idx {:?}",
 
                                &decision,
 
                                net_index
 
                            );
 
                        }
 
                    }
 
                }
 
            }
 
            log!(cu.logger, "Endpoint msg recv done");
 
        }
 
    }
 
    fn request_failure(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        parent: usize,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        log!(cu.logger, "Forwarding to my parent {:?}", parent);
 
        let suggestion = Decision::Failure;
 
        let msg = Msg::CommMsg(CommMsg {
 
            round_index: comm.round_index,
 
            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Suggest { suggestion }),
 
        });
 
        comm.endpoint_manager.send_to_comms(parent, &msg)
 
    }
 
}
 
impl NativeBranch {
 
    fn is_ended(&self) -> bool {
 
        self.to_get.is_empty()
 
    }
 
}
 
impl BranchingNative {
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        solution_storage: &mut SolutionStorage,
 
        getter: PortId,
 
        send_payload_msg: &SendPayloadMsg,
 
    ) {
 
        log!(cu.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg);
 
        assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
 
        let mut draining = HashMap::default();
 
        let finished = &mut self.branches;
 
        std::mem::swap(&mut draining, finished);
 
        for (predicate, mut branch) in draining.drain() {
 
            log!(cu.logger, "visiting native branch {:?} with {:?}", &branch, &predicate);
 
            // check if this branch expects to receive it
 
            let var = cu.port_info.spec_var_for(getter);
 
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
 
                branch.to_get.remove(&getter);
 
                let was = branch.gotten.insert(getter, send_payload_msg.payload.clone());
 
                assert!(was.is_none());
 
                if branch.is_ended() {
 
                    log!(
 
                        cu.logger,
 
                        "new native solution with {:?} is_ended() with gotten {:?}",
 
                        &predicate,
 
                        &branch.gotten
 
                    );
 
                    let subtree_id = SubtreeId::LocalComponent(ComponentId::Native);
 
                    solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.logger,
 
                        subtree_id,
 
                        predicate.clone(),
 
                    );
 
                } else {
 
                    log!(
 
                        cu.logger,
 
                        "Fed native {:?} still has to_get {:?}",
 
                        &predicate,
 
                        &branch.to_get
 
                    );
 
                }
 
            };
 
            if predicate.query(var) != Some(SpecVal::FIRING) {
 
                // optimization. Don't bother trying this branch
 
                log!(
 
                    cu.logger,
 
                    "skipping branch with {:?} that doesn't want the message (fastpath)",
 
                    &predicate
 
                );
 
                Self::fold_into(finished, predicate, branch);
 
                Self::insert_branch_merging(finished, predicate, branch);
 
                continue;
 
            }
 
            use AssignmentUnionResult as Aur;
 
            match predicate.assignment_union(&send_payload_msg.predicate) {
 
                Aur::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(
 
                        cu.logger,
 
                        "skipping branch with {:?} that doesn't want the message (slowpath)",
 
                        &predicate
 
                    );
 
                    Self::fold_into(finished, predicate, branch);
 
                    Self::insert_branch_merging(finished, predicate, branch);
 
                }
 
                Aur::Equivalent | Aur::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    feed_branch(&mut branch, &predicate);
 
                    log!(cu.logger, "branch pred covers it! Accept the msg");
 
                    Self::fold_into(finished, predicate, branch);
 
                    Self::insert_branch_merging(finished, predicate, branch);
 
                }
 
                Aur::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate. original branch untouched
 
                    let mut branch2 = branch.clone();
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
                    log!(
 
                        cu.logger,
 
                        "payload pred {:?} covers branch pred {:?}",
 
                        &predicate2,
 
                        &predicate
 
                    );
 
                    Self::fold_into(finished, predicate, branch);
 
                    Self::fold_into(finished, predicate2, branch2);
 
                    Self::insert_branch_merging(finished, predicate, branch);
 
                    Self::insert_branch_merging(finished, predicate2, branch2);
 
                }
 
                Aur::New(predicate2) => {
 
                    // fork branch, give fork the message and the new predicate. original branch untouched
 
                    let mut branch2 = branch.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
                    log!(
 
                        cu.logger,
 
                        "new subsuming pred created {:?}. forking and feeding",
 
                        &predicate2
 
                    );
 
                    Self::fold_into(finished, predicate, branch);
 
                    Self::fold_into(finished, predicate2, branch2);
 
                    Self::insert_branch_merging(finished, predicate, branch);
 
                    Self::insert_branch_merging(finished, predicate2, branch2);
 
                }
 
            }
 
        }
 
    }
 
    fn fold_into(
 
    fn insert_branch_merging(
 
        branches: &mut HashMap<Predicate, NativeBranch>,
 
        predicate: Predicate,
 
        mut branch: NativeBranch,
 
    ) {
 
        let e = branches.entry(predicate);
 
        use std::collections::hash_map::Entry;
 
        match e {
 
            Entry::Vacant(ev) => {
 
                // no existing branch present. We insert it no problem. (The most common case)
 
                ev.insert(branch);
 
            }
 
            Entry::Occupied(mut eo) => {
 
                let b = eo.get_mut();
 
                // Oh dear, there is already a branch with this predicate.
 
                // Rather than choosing either branch, we MERGE them.
 
                // This means taking the UNION of their .gotten and the INTERSECTION of their .to_get
 
                let old = eo.get_mut();
 
                for (k, v) in branch.gotten.drain() {
 
                    if b.gotten.insert(k, v).is_none() {
 
                        b.to_get.remove(&k);
 
                    if old.gotten.insert(k, v).is_none() {
 
                        // added a gotten element in `branch` not already in `old`
 
                        old.to_get.remove(&k);
 
                    }
 
                }
 
            }
 
        }
 
        // if let Some(prev) = branches.insert(predicate, branch)
 
    }
 
    fn collapse_with(self, logger: &mut dyn Logger, solution_predicate: &Predicate) -> RoundOk {
 
        log!(
 
            logger,
 
            "Collapsing native with {} branch preds {:?}",
 
            self.branches.len(),
 
            self.branches.keys()
 
        );
 
        for (branch_predicate, branch) in self.branches {
 
            log!(
 
                logger,
 
                "Considering native branch {:?} with to_get {:?} gotten {:?}",
 
                &branch_predicate,
 
                &branch.to_get,
 
                &branch.gotten
 
            );
 
            if branch.is_ended() && branch_predicate.assigns_subset(solution_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                log!(logger, "Collapsed native has gotten {:?}", &gotten);
 
                return RoundOk { batch_index: index, gotten };
 
            }
 
        }
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 
impl BranchingProtoComponent {
 
    fn drain_branches_to_blocked(
 
        cd: CyclicDrainer<Predicate, ProtoComponentBranch>,
 
        cu: &mut ConnectorUnphased,
 
        rctx: &mut RoundCtx,
 
        proto_component_id: ProtoComponentId,
 
        ports: &HashSet<PortId>,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        cd.cylic_drain(|mut predicate, mut branch, mut drainer| {
 
        cd.cyclic_drain(|mut predicate, mut branch, mut drainer| {
 
            let mut ctx = SyncProtoContext {
 
                untaken_choice: &mut branch.untaken_choice,
 
                logger: &mut *cu.logger,
 
                predicate: &predicate,
 
                port_info: &cu.port_info,
 
                inbox: &branch.inbox,
 
            };
 
            let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description);
 
            log!(
 
                cu.logger,
 
                "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
 
                proto_component_id,
 
                &predicate,
 
                &blocker,
 
            );
 
            use SyncBlocker as B;
 
            match blocker {
 
                B::NondetChoice { n } => {
 
                    let var = rctx.spec_var_stream.next();
 
                    for val in SpecVal::iter_domain().take(n as usize) {
 
                        let pred = predicate.clone().inserted(var, val);
 
                        let mut branch_n = branch.clone();
 
                        branch_n.untaken_choice = Some(val.0);
 
                        drainer.add_input(pred, branch_n);
 
                    }
 
                }
 
                B::Inconsistent => {
 
                    // branch is inconsistent. throw it away
 
                    // EXPLICIT inconsistency
 
                    drop((predicate, branch));
 
                }
 
                B::SyncBlockEnd => {
 
                    // make concrete all variables
 
                    for &port in ports.iter() {
 
                        let var = cu.port_info.spec_var_for(port);
 
                        predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
 
                    for port in ports.iter() {
 
                        let var = cu.port_info.spec_var_for(*port);
 
                        let should_have_fired = match cu.port_info.polarities.get(port).unwrap() {
 
                            Polarity::Getter => branch.inbox.contains_key(port),
 
                            Polarity::Putter => branch.did_put.contains(port),
 
                        };
 
                        let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
 
                        let did_fire = val == SpecVal::FIRING;
 
                        if did_fire != should_have_fired {
 
                            log!(cu.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", port, var, val, did_fire, should_have_fired);
 
                            // IMPLICIT inconsistency
 
                            drop((predicate, branch));
 
                            return Ok(());
 
                        }
 
                    }
 
                    // submit solution for this component
 
                    let subtree_id = SubtreeId::LocalComponent(ComponentId::Proto(proto_component_id));
 
                    rctx.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.logger,
 
                        subtree_id,
 
                        predicate.clone(),
 
                    );
 
                    branch.ended = true;
 
                    // move to "blocked"
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::CouldntReadMsg(port) => {
 
                    // move to "blocked"
 
                    assert!(!branch.inbox.contains_key(&port));
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::CouldntCheckFiring(port) => {
 
                    // sanity check
 
                    let var = cu.port_info.spec_var_for(port);
 
                    assert!(predicate.query(var).is_none());
 
                    // keep forks in "unblocked"
 
                    drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone());
 
                    drainer.add_input(predicate.inserted(var, SpecVal::FIRING), branch);
 
                }
 
                B::PutMsg(putter, payload) => {
 
                    // sanity check
 
                    assert_eq!(Some(&Putter), cu.port_info.polarities.get(&putter));
 
                    // overwrite assignment
 
                    let var = cu.port_info.spec_var_for(putter);
 
                    let was = predicate.assigned.insert(var, SpecVal::FIRING);
 
                    if was == Some(SpecVal::SILENT) {
 
                        log!(cu.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var);
 
                        // discard forever
 
                        drop((predicate, branch));
 
                    } else {
 
                        // keep in "unblocked"
 
                        branch.did_put.insert(putter);
 
                        log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
 
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                        rctx.getter_buffer.putter_add(cu, putter, msg);
 
                        drainer.add_input(predicate, branch);
 
                    }
 
                }
 
            }
 
            Ok(())
 
        })
 
    }
 
    fn branch_merge_func(
 
        mut a: ProtoComponentBranch,
 
        b: &mut ProtoComponentBranch,
 
    ) -> ProtoComponentBranch {
 
        if b.ended && !a.ended {
 
            a.ended = true;
 
            std::mem::swap(&mut a, b);
 
        }
 
        a
 
    }
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        rctx: &mut RoundCtx,
 
        proto_component_id: ProtoComponentId,
 
        getter: PortId,
 
        send_payload_msg: &SendPayloadMsg,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        let logger = &mut *cu.logger;
 
        log!(
 
            logger,
 
            "feeding proto component {:?} getter {:?} {:?}",
 
            proto_component_id,
 
            getter,
 
            &send_payload_msg
 
        );
 
        let BranchingProtoComponent { branches, ports } = self;
 
        let mut unblocked = HashMap::default();
 
        let mut blocked = HashMap::default();
 
        // partition drain from branches -> {unblocked, blocked}
 
        log!(logger, "visiting {} blocked branches...", branches.len());
 
        for (predicate, mut branch) in branches.drain() {
 
            if branch.ended {
 
                log!(logger, "Skipping ended branch with {:?}", &predicate);
 
                blocked.insert(predicate, branch);
 
                continue;
 
            }
 
            use AssignmentUnionResult as Aur;
 
            log!(logger, "visiting branch with pred {:?}", &predicate);
 
            match predicate.assignment_union(&send_payload_msg.predicate) {
 
                Aur::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(logger, "skipping branch");
 
                    blocked.insert(predicate, branch);
 
                }
 
                Aur::Equivalent | Aur::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    log!(logger, "feeding this branch without altering its predicate");
 
                    branch.feed_msg(getter, send_payload_msg.payload.clone());
 
                    unblocked.insert(predicate, branch);
 
                }
 
                Aur::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate. original branch untouched
 
                    log!(logger, "Forking this branch, giving it the predicate of the msg");
 
                    let mut branch2 = branch.clone();
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
                    branch2.feed_msg(getter, send_payload_msg.payload.clone());
 
                    blocked.insert(predicate, branch);
 
                    unblocked.insert(predicate2, branch2);
 
                }
 
                Aur::New(predicate2) => {
 
                    // fork branch, give fork the message and the new predicate. original branch untouched
 
                    log!(logger, "Forking this branch with new predicate {:?}", &predicate2);
 
                    let mut branch2 = branch.clone();
 
                    branch2.feed_msg(getter, send_payload_msg.payload.clone());
 
                    blocked.insert(predicate, branch);
 
                    unblocked.insert(predicate2, branch2);
 
                }
 
            }
 
        }
 
        log!(logger, "blocked {:?} unblocked {:?}", blocked.len(), unblocked.len());
 
        // drain from unblocked --> blocked
 
        let mut swap = HashMap::default();
 
        let cd = CyclicDrainer::new(&mut unblocked, &mut swap, &mut blocked);
 
        BranchingProtoComponent::drain_branches_to_blocked(
 
            cd,
 
            cu,
 
            rctx,
 
            proto_component_id,
 
            ports,
 
        )?;
 
        // swap the blocked branches back
 
        std::mem::swap(&mut blocked, branches);
 
        log!(cu.logger, "component settles down with branches: {:?}", branches.keys());
 
        Ok(())
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
 
        let BranchingProtoComponent { ports, branches } = self;
 
        for (branch_predicate, branch) in branches {
 
            if branch.ended && branch_predicate.assigns_subset(solution_predicate) {
 
                let ProtoComponentBranch { state, .. } = branch;
 
                return ProtoComponent { state, ports };
 
            }
 
        }
 
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
    }
 
    fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self {
 
        let branch = ProtoComponentBranch {
 
            inbox: Default::default(),
 
            did_put: Default::default(),
 
            state,
 
            ended: false,
 
            untaken_choice: None,
 
        };
 
        Self { ports, branches: hashmap! { Predicate::default() => branch  } }
 
    }
 
}
 
impl SolutionStorage {
 
    fn new(subtree_ids: impl Iterator<Item = SubtreeId>) -> Self {
 
        let mut subtree_id_to_index: HashMap<SubtreeId, usize> = Default::default();
 
        let mut subtree_solutions = vec![];
 
        for id in subtree_ids {
 
            subtree_id_to_index.insert(id, subtree_solutions.len());
 
            subtree_solutions.push(Default::default())
 
        }
 
        Self {
 
            subtree_solutions,
 
            subtree_id_to_index,
 
            old_local: Default::default(),
 
            new_local: Default::default(),
 
        }
 
    }
 
    fn is_clear(&self) -> bool {
 
        self.subtree_id_to_index.is_empty()
 
            && self.subtree_solutions.is_empty()
 
            && self.old_local.is_empty()
 
            && self.new_local.is_empty()
 
    }
 
    fn clear(&mut self) {
 
        self.subtree_id_to_index.clear();
 
        self.subtree_solutions.clear();
 
        self.old_local.clear();
 
        self.new_local.clear();
 
    }
 
    fn reset(&mut self, subtree_ids: impl Iterator<Item = SubtreeId>) {
 
        self.subtree_id_to_index.clear();
 
        self.subtree_solutions.clear();
 
        self.old_local.clear();
 
        self.new_local.clear();
 
        for key in subtree_ids {
 
            self.subtree_id_to_index.insert(key, self.subtree_solutions.len());
 
            self.subtree_solutions.push(Default::default())
 
        }
 
    }
 
    pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator<Item = Predicate> + '_ {
 
        let Self { old_local, new_local, .. } = self;
 
        new_local.drain().map(move |local| {
 
            old_local.insert(local.clone());
 
            local
 
        })
 
    }
 
    pub(crate) fn submit_and_digest_subtree_solution(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        subtree_id: SubtreeId,
 
        predicate: Predicate,
 
    ) {
 
        log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate);
 
        let index = self.subtree_id_to_index[&subtree_id];
 
        let left = 0..index;
 
        let right = (index + 1)..self.subtree_solutions.len();
 

	
 
        let Self { subtree_solutions, new_local, old_local, .. } = self;
 
        let was_new = subtree_solutions[index].insert(predicate.clone());
 
        if was_new {
 
            let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]);
 
            Self::elaborate_into_new_local_rec(
 
                logger,
 
                predicate,
 
                set_visitor,
 
                old_local,
 
                new_local,
 
            );
 
        }
 
    }
 
    fn elaborate_into_new_local_rec<'a, 'b>(
 
        logger: &mut dyn Logger,
 
        partial: Predicate,
 
        mut set_visitor: impl Iterator<Item = &'b HashSet<Predicate>> + Clone,
 
        old_local: &'b HashSet<Predicate>,
 
        new_local: &'a mut HashSet<Predicate>,
 
    ) {
 
        if let Some(set) = set_visitor.next() {
 
            // incomplete solution. keep traversing
 
            for pred in set.iter() {
 
                if let Some(elaborated) = pred.union_with(&partial) {
 
                    Self::elaborate_into_new_local_rec(
 
                        logger,
 
                        elaborated,
 
                        set_visitor.clone(),
 
                        old_local,
 
                        new_local,
 
                    )
 
                }
 
            }
 
        } else {
 
            // recursive stop condition. `partial` is a local subtree solution
 
            if !old_local.contains(&partial) {
 
                // ... and it hasn't been found before
 
                log!(logger, "storing NEW LOCAL SOLUTION {:?}", &partial);
 
                new_local.insert(partial);
 
            }
 
        }
 
    }
 
}
 
impl GetterBuffer {
 
    fn len(&self) -> usize {
 
        self.getters_and_sends.len()
 
    }
 
    fn pop(&mut self) -> Option<(PortId, SendPayloadMsg)> {
 
        self.getters_and_sends.pop()
 
    }
 
    fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) {
 
        self.getters_and_sends.push((getter, msg));
 
    }
 
    fn putter_add(&mut self, cu: &mut ConnectorUnphased, putter: PortId, msg: SendPayloadMsg) {
 
        if let Some(&getter) = cu.port_info.peers.get(&putter) {
 
            self.getter_add(getter, msg);
 
        } else {
 
            log!(cu.logger, "Putter {:?} has no known peer!", putter);
 
            panic!("Putter {:?} has no known peer!");
 
        }
 
    }
 
}
 
impl SyncProtoContext<'_> {
 
    pub(crate) fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        let var = self.port_info.spec_var_for(port);
 
        self.predicate.query(var).map(SpecVal::is_firing)
 
    }
 
    pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        self.inbox.get(&port)
 
    }
 
    pub(crate) fn take_choice(&mut self) -> Option<u16> {
 
        self.untaken_choice.take()
 
    }
 
}
 
impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> {
 
    fn add_input(&mut self, k: K, v: V) {
 
        self.swap.insert(k, v);
 
    }
 
    fn merge_input_with<F: FnMut(V, &mut V) -> V>(&mut self, k: K, v: V, mut func: F) {
 
        use std::collections::hash_map::Entry;
 
        let e = self.swap.entry(k);
 
        match e {
 
            Entry::Vacant(ev) => {
 
                ev.insert(v);
 
            }
 
            Entry::Occupied(mut eo) => {
 
                let old = eo.get_mut();
 
                *old = func(v, old);
 
            }
 
        }
 
    }
 
    fn add_output(&mut self, k: K, v: V) {
 
        self.output.insert(k, v);
 
    }
 
}
 
impl NonsyncProtoContext<'_> {
 
    pub fn new_component(&mut self, moved_ports: HashSet<PortId>, state: ComponentState) {
 
        // called by a PROTO COMPONENT. moves its own ports.
 
        // 1. sanity check: this component owns these ports
 
        log!(
 
            self.logger,
 
            "Component {:?} added new component with state {:?}, moving ports {:?}",
 
            self.proto_component_id,
 
            &state,
 
            &moved_ports
 
        );
 
        assert!(self.proto_component_ports.is_subset(&moved_ports));
 
        // 2. remove ports from old component & update port->route
 
        let new_id = self.id_manager.new_proto_component_id();
 
        for port in moved_ports.iter() {
 
            self.proto_component_ports.remove(port);
 
            self.port_info.routes.insert(*port, Route::LocalComponent(ComponentId::Proto(new_id)));
 
        }
 
        // 3. create a new component
 
        self.unrun_components.push((new_id, ProtoComponent { state, ports: moved_ports }));
 
    }
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        // adds two new associated ports, related to each other, and exposed to the proto component
 
        let [o, i] = [self.id_manager.new_port_id(), self.id_manager.new_port_id()];
 
        self.proto_component_ports.insert(o);
 
        self.proto_component_ports.insert(i);
 
        // {polarity, peer, route} known. {} unknown.
 
        self.port_info.polarities.insert(o, Putter);
 
        self.port_info.polarities.insert(i, Getter);
 
        self.port_info.peers.insert(o, i);
 
        self.port_info.peers.insert(i, o);
 
        let route = Route::LocalComponent(ComponentId::Proto(self.proto_component_id));
 
        self.port_info.routes.insert(o, route);
 
        self.port_info.routes.insert(i, route);
 
        log!(
 
            self.logger,
 
            "Component {:?} port pair (out->in) {:?} -> {:?}",
 
            self.proto_component_id,
 
            o,
 
            i
 
        );
 
        [o, i]
 
    }
 
}
 
impl ProtoComponentBranch {
 
    fn feed_msg(&mut self, getter: PortId, payload: Payload) {
 
        let was = self.inbox.insert(getter, payload);
 
        assert!(was.is_none())
 
    }
 
}
 
impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> {
 
    fn new(
 
        input: &'a mut HashMap<K, V>,
 
        swap: &'a mut HashMap<K, V>,
 
        output: &'a mut HashMap<K, V>,
 
    ) -> Self {
 
        Self { input, inner: CyclicDrainInner { swap, output } }
 
    }
 
    fn cylic_drain<E>(
 
    fn cyclic_drain<E>(
 
        self,
 
        mut func: impl FnMut(K, V, CyclicDrainInner<'_, K, V>) -> Result<(), E>,
 
    ) -> Result<(), E> {
 
        let Self { input, inner: CyclicDrainInner { swap, output } } = self;
 
        // assert!(swap.is_empty());
 
        while !input.is_empty() {
 
            for (k, v) in input.drain() {
 
                func(k, v, CyclicDrainInner { swap, output })?
 
            }
 
            std::mem::swap(input, swap);
 
        }
 
        Ok(())
 
    }
 
}
src/runtime/tests.rs
Show inline comments
 
use crate as reowolf;
 
use crossbeam_utils::thread::scope;
 
use reowolf::{
 
    error::*,
 
    EndpointPolarity::{Active, Passive},
 
    Polarity::{Getter, Putter},
 
    *,
 
};
 
use std::{fs::File, net::SocketAddr, path::Path, sync::Arc, time::Duration};
 
//////////////////////////////////////////
 
const MS100: Option<Duration> = Some(Duration::from_millis(100));
 
const MS300: Option<Duration> = Some(Duration::from_millis(300));
 
const SEC1: Option<Duration> = Some(Duration::from_secs(1));
 
const SEC5: Option<Duration> = Some(Duration::from_secs(5));
 
const SEC15: Option<Duration> = Some(Duration::from_secs(15));
 
fn next_test_addr() -> SocketAddr {
 
    use std::{
 
        net::{Ipv4Addr, SocketAddrV4},
 
        sync::atomic::{AtomicU16, Ordering::SeqCst},
 
    };
 
    static TEST_PORT: AtomicU16 = AtomicU16::new(5_000);
 
    let port = TEST_PORT.fetch_add(1, SeqCst);
 
    SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()
 
}
 
fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connector {
 
    file_logged_configured_connector(connector_id, dir_path, MINIMAL_PROTO.clone())
 
}
 
fn file_logged_configured_connector(
 
    connector_id: ConnectorId,
 
    dir_path: &Path,
 
    pd: Arc<ProtocolDescription>,
 
) -> Connector {
 
    let _ = std::fs::create_dir(dir_path); // we will check failure soon
 
    let path = dir_path.join(format!("cid_{:?}.txt", connector_id));
 
    let file = File::create(path).unwrap();
 
    let file_logger = Box::new(FileLogger::new(connector_id, file));
 
    Connector::new(file_logger, MINIMAL_PROTO.clone(), connector_id, 8)
 
    Connector::new(file_logger, pd, connector_id, 8)
 
}
 
static MINIMAL_PDL: &'static [u8] = b"
 
primitive together(in ia, in ib, out oa, out ob){
 
  while(true) synchronous() {
 
    if(fires(ia)) {
 
      put(oa, get(ia));
 
      put(ob, get(ib));
 
    }
 
  } 
 
}
 
";
 
lazy_static::lazy_static! {
 
    static ref MINIMAL_PROTO: Arc<ProtocolDescription> = {
 
        Arc::new(reowolf::ProtocolDescription::parse(MINIMAL_PDL).unwrap())
 
    };
 
}
 
static TEST_MSG_BYTES: &'static [u8] = b"hello";
 
lazy_static::lazy_static! {
 
    static ref TEST_MSG: Payload = {
 
        Payload::from(TEST_MSG_BYTES)
 
    };
 
}
 
fn new_u8_buffer(cap: usize) -> Vec<u8> {
 
    let mut v = Vec::with_capacity(cap);
 
    // Safe! len will cover owned bytes in valid state
 
    unsafe { v.set_len(cap) }
 
    v
 
}
 
//////////////////////////////////////////
 

	
 
#[test]
 
fn basic_connector() {
 
    Connector::new(Box::new(DummyLogger), MINIMAL_PROTO.clone(), 0, 0);
 
}
 

	
 
#[test]
 
fn basic_logged_connector() {
 
    let test_log_path = Path::new("./logs/basic_logged_connector");
 
    file_logged_connector(0, test_log_path);
 
}
 

	
 
#[test]
 
fn new_port_pair() {
 
    let test_log_path = Path::new("./logs/new_port_pair");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, _] = c.new_port_pair();
 
    let [_, _] = c.new_port_pair();
 
}
 

	
 
#[test]
 
fn new_sync() {
 
    let test_log_path = Path::new("./logs/new_sync");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [o, i] = c.new_port_pair();
 
    c.add_component(b"sync", &[i, o]).unwrap();
 
}
 

	
 
#[test]
 
fn new_net_port() {
 
    let test_log_path = Path::new("./logs/new_net_port");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let sock_addrs = [next_test_addr()];
 
    let _ = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
    let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
}
 

	
 
#[test]
 
fn trivial_connect() {
 
    let test_log_path = Path::new("./logs/trivial_connect");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.connect(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn single_node_connect() {
 
    let test_log_path = Path::new("./logs/single_node_connect");
 
    let sock_addrs = [next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let _ = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
    let _ = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
    c.connect(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn minimal_net_connect() {
 
    let test_log_path = Path::new("./logs/minimal_net_connect");
 
    let sock_addrs = [next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let _ = c.new_net_port(Getter, sock_addrs[0], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let _ = c.new_net_port(Putter, sock_addrs[0], Passive).unwrap();
 
@@ -700,137 +707,176 @@ fn solo_udp_get_fail() {
 
#[test]
 
fn reowolf_to_udp() {
 
    let test_log_path = Path::new("./logs/reowolf_to_udp");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let barrier = std::sync::Barrier::new(2);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // reowolf thread
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p0, _] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.sync(MS300).unwrap();
 
            barrier.wait();
 
        });
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // udp thread
 
            let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap();
 
            udp.connect(sock_addrs[0]).unwrap();
 
            let mut buf = new_u8_buffer(256);
 
            let len = udp.recv(&mut buf).unwrap();
 
            assert_eq!(TEST_MSG_BYTES, &buf[0..len]);
 
            barrier.wait();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn udp_to_reowolf() {
 
    let test_log_path = Path::new("./logs/udp_to_reowolf");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let barrier = std::sync::Barrier::new(2);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // reowolf thread
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [_, p0] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.get(p0).unwrap();
 
            c.sync(SEC5).unwrap();
 
            assert_eq!(c.gotten(p0).unwrap().as_slice(), TEST_MSG_BYTES);
 
            barrier.wait();
 
        });
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // udp thread
 
            let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap();
 
            udp.connect(sock_addrs[0]).unwrap();
 
            for _ in 0..15 {
 
                udp.send(TEST_MSG_BYTES).unwrap();
 
                std::thread::sleep(MS100.unwrap());
 
            }
 
            barrier.wait();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn udp_reowolf_swap() {
 
    let test_log_path = Path::new("./logs/udp_reowolf_swap");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let barrier = std::sync::Barrier::new(2);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // reowolf thread
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p0, p1] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.get(p1).unwrap();
 
            c.sync(SEC5).unwrap();
 
            assert_eq!(c.gotten(p1).unwrap().as_slice(), TEST_MSG_BYTES);
 
            barrier.wait();
 
        });
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // udp thread
 
            let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap();
 
            udp.connect(sock_addrs[0]).unwrap();
 
            let mut buf = new_u8_buffer(256);
 
            udp.send(TEST_MSG_BYTES).unwrap();
 
            let len = udp.recv(&mut buf).unwrap();
 
            assert_eq!(TEST_MSG_BYTES, &buf[0..len]);
 
            barrier.wait();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn pres_3() {
 
    let test_log_path = Path::new("./logs/pres_3");
 
fn example_pres_3() {
 
    let test_log_path = Path::new("./logs/example_pres_3");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            // "amy"
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            // put {A} and FAIL
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap_err();
 
            // put {B} and FAIL
 
            c.put(p1, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap_err();
 
            // put {A, B} and SUCCEED
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.put(p1, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap();
 
        });
 
        s.spawn(|_| {
 
            // "bob"
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            for _ in 0..2 {
 
                // get {A, B} and FAIL
 
                c.get(p0).unwrap();
 
                c.get(p1).unwrap();
 
                c.sync(SEC1).unwrap_err();
 
            }
 
            // get {A, B} and SUCCEED
 
            c.get(p0).unwrap();
 
            c.get(p1).unwrap();
 
            c.sync(SEC1).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn ac_not_b() {
 
    let test_log_path = Path::new("./logs/ac_not_b");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            // "amy"
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 

	
 
            // put both A and B
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.put(p1, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap_err();
 
        });
 
        s.spawn(|_| {
 
            // "bob"
 
            let pdl = b"
 
            primitive ac_not_b(in a, in b, out c){
 
                // forward A to C but keep B silent
 
                synchronous{ put(c, get(a)); }
 
            }";
 
            let pd = Arc::new(reowolf::ProtocolDescription::parse(pdl).unwrap());
 
            let mut c = file_logged_configured_connector(1, test_log_path, pd);
 
            let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
 
            let [a, b] = c.new_port_pair();
 
            c.add_component(b"ac_not_b", &[p0, p1, a]).unwrap();
 
            c.connect(SEC1).unwrap();
 

	
 
            c.get(b).unwrap();
 
            c.sync(SEC1).unwrap_err();
 
        });
 
    })
 
    .unwrap();
 
}
0 comments (0 inline, 0 general)