Changeset - d9774c9084d7
[Not reviewed]
4 9 2
Christopher Esterhuyse - 5 years ago 2020-06-30 10:45:50
christopher.esterhuyse@gmail.com
more logging, testing, examples and bugfixes: (1) components remember whether they have submitted a solution; only those are considered when selecting a branch at the end of a round, (2) retrying active connections during setup phase were using the wrong index for looking up their TODO structure, (3) recently failed connections are deregistered from mio and reregistered after the retry process restarts s.t. they don't produce a storm of mio events
15 files changed with 257 insertions and 1046 deletions:
0 comments (0 inline, 0 general)
examples/a_swap/amy.c
Show inline comments
 
new file 100644
 
#include <stdio.h>
 
#include <string.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main(int argc, char** argv) {
 
    if(argc != 3) {
 
        printf("Expected arg[1] and arg[2] for use as addr str\n");
 
        exit(1);
 
    }            
 
    char * pdl_ptr = buffer_pdl("eg_protocols.pdl");
 
    size_t pdl_len = strlen(pdl_ptr);
 
    Arc_ProtocolDescription * pd = protocol_description_parse(pdl_ptr, pdl_len);
 
    char logpath[] = "./a_amy_log.txt";
 
    Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    
 
    PortId ports[6]; 
 
    connector_add_port_pair(c, &ports[0], &ports[1]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_net_port(c, &ports[2], argv[1], strlen(argv[1]), Getter, Passive);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_net_port(c, &ports[3], argv[2], strlen(argv[2]), Putter, Active);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_port_pair(c, &ports[4], &ports[5]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    // native {0,1,2,3,4,5}
 
    
 
    connector_add_component(c, "together", 8, &ports[1], 4);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    // native {0,5} together {1,2,3,4}
 
    
 
    connector_connect(c, 4000);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	connector_put_bytes(c, ports[0], "hi", 2);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	connector_get(c, ports[5]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
    connector_sync(c, 1000);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	size_t msg_len;
 
	const char * msg_ptr = connector_gotten_bytes(c, ports[5], &msg_len);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	printf("Got msg `%.*s`\n", msg_len, msg_ptr);
 
    
 
    protocol_description_destroy(pd);
 
    connector_destroy(c);
 
    return 0;
 
}
 
\ No newline at end of file
examples/a_swap/bob.c
Show inline comments
 
new file 100644
 
#include <stdio.h>
 
#include <string.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main(int argc, char** argv) {
 
    if(argc != 3) {
 
        printf("Expected arg[1] and arg[2] for use as addr str\n");
 
        exit(1);
 
    }            
 
    char * pdl_ptr = buffer_pdl("eg_protocols.pdl");
 
    size_t pdl_len = strlen(pdl_ptr);
 
    Arc_ProtocolDescription * pd = protocol_description_parse(pdl_ptr, pdl_len);
 
    char logpath[] = "./a_bob_log.txt";
 
    Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    
 
    PortId ports[6]; 
 
    connector_add_port_pair(c, &ports[0], &ports[1]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_net_port(c, &ports[2], argv[1], strlen(argv[1]), Getter, Passive);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_net_port(c, &ports[3], argv[2], strlen(argv[2]), Putter, Active);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_port_pair(c, &ports[4], &ports[5]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    // native {0,1,2,3,4,5}
 
    
 
    connector_add_component(c, "together", 8, &ports[1], 4);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    // native {0,5} together {1,2,3,4}
 
    
 
    connector_connect(c, 4000);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	connector_put_bytes(c, ports[0], "hi", 2);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	connector_get(c, ports[5]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
    connector_sync(c, 1000);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	size_t msg_len;
 
	const char * msg_ptr = connector_gotten_bytes(c, ports[5], &msg_len);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	printf("Got msg `%.*s`\n", msg_len, msg_ptr);
 
    
 
    protocol_description_destroy(pd);
 
    connector_destroy(c);
 
    return 0;
 
}
 
\ No newline at end of file
examples/eg_protocols.pdl
Show inline comments
 
primitive foo(){}
 
\ No newline at end of file
 
primitive together(in ia, in ib, out oa, out ob){
 
  while(true) synchronous() {
 
    if(fires(ia)) {
 
      put(oa, get(ia));
 
      put(ob, get(ib));
 
    }
 
  }	
 
}
 
\ No newline at end of file
examples/make.py
Show inline comments
 
import os, glob, subprocess
 
import os, glob, subprocess, time
 
script_path = os.path.dirname(os.path.realpath(__file__));
 
for c_file in glob.glob(script_path + "/*/*.c", recursive=False):
 
  print("compiling", c_file)
 
  args = [
 
    "gcc",          # compiler
 
    "-L",           # lib path flag
 
    "./",           # where to look for libs
 
    "-lreowolf_rs", # add lib called "reowolf_rs"
 
    "-Wl,-R./",     # pass -R flag to linker: produce relocatable object
 
    c_file,         # input source file
 
    "-o",           # output flag
 
    c_file[:-2]     # output filename
 
  ];
 
  subprocess.run(args);
 
  subprocess.run(args)
 
input("Blocking until newline...");
examples/reowolf_rs.dll
Show inline comments
 
deleted file
 
binary diff not shown
src/runtime/communication.rs
Show inline comments
 
@@ -5,48 +5,49 @@ use crate::common::*;
 
struct BranchingNative {
 
    branches: HashMap<Predicate, NativeBranch>,
 
}
 
#[derive(Clone, Debug)]
 
struct NativeBranch {
 
    index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
#[derive(Debug)]
 
struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
    // this pair acts as Route -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<Route, usize>,
 
}
 
#[derive(Debug)]
 
struct BranchingProtoComponent {
 
    ports: HashSet<PortId>,
 
    branches: HashMap<Predicate, ProtoComponentBranch>,
 
}
 
#[derive(Debug, Clone)]
 
struct ProtoComponentBranch {
 
    ended: bool,
 
    inbox: HashMap<PortId, Payload>,
 
    state: ComponentState,
 
}
 
struct CyclicDrainer<'a, K: Eq + Hash, V> {
 
    input: &'a mut HashMap<K, V>,
 
    inner: CyclicDrainInner<'a, K, V>,
 
}
 
struct CyclicDrainInner<'a, K: Eq + Hash, V> {
 
    swap: &'a mut HashMap<K, V>,
 
    output: &'a mut HashMap<K, V>,
 
}
 
trait PayloadMsgSender {
 
    fn putter_send(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        putter: PortId,
 
        msg: SendPayloadMsg,
 
    ) -> Result<(), SyncError>;
 
}
 
trait ReplaceBoolTrue {
 
    fn replace_with_true(&mut self) -> bool;
 
}
 
impl ReplaceBoolTrue for bool {
 
    fn replace_with_true(&mut self) -> bool {
 
@@ -298,93 +299,94 @@ impl Connector {
 
        );
 
        for &child in comm.neighborhood.children.iter() {
 
            comm.endpoint_manager.send_to_comms(child, &msg)?;
 
        }
 
        let ret = match decision {
 
            Decision::Failure => {
 
                // dropping {branching_proto_components, branching_native}
 
                Err(Se::RoundFailure)
 
            }
 
            Decision::Success(predicate) => {
 
                // commit changes to component states
 
                cu.proto_components.clear();
 
                cu.proto_components.extend(
 
                    // consume branching proto components
 
                    branching_proto_components
 
                        .into_iter()
 
                        .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))),
 
                );
 
                log!(
 
                    cu.logger,
 
                    "End round with (updated) component states {:?}",
 
                    cu.proto_components.keys()
 
                );
 
                // consume native
 
                Ok(Some(branching_native.collapse_with(&predicate)))
 
                Ok(Some(branching_native.collapse_with(&mut *cu.logger, &predicate)))
 
            }
 
        };
 
        log!(cu.logger, "Sync round ending! Cleaning up");
 
        // dropping {solution_storage, payloads_to_get}
 
        ret
 
    }
 

	
 
    fn sync_reach_decision(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        branching_native: &mut BranchingNative,
 
        branching_proto_components: &mut HashMap<ProtoComponentId, BranchingProtoComponent>,
 
        mut solution_storage: SolutionStorage,
 
        mut payloads_to_get: Vec<(PortId, SendPayloadMsg)>,
 
        mut deadline: Option<Instant>,
 
    ) -> Result<Decision, SyncError> {
 
        let mut already_requested_failure = false;
 
        if branching_native.branches.is_empty() {
 
            log!(cu.logger, "Native starts with no branches! Failure!");
 
            match comm.neighborhood.parent {
 
                Some(parent) => {
 
                    if already_requested_failure.replace_with_true() {
 
                        Self::request_failure(cu, comm, parent)?
 
                    } else {
 
                        log!(cu.logger, "Already requested failure");
 
                    }
 
                }
 
                None => {
 
                    log!(cu.logger, "No parent. Deciding on failure");
 
                    return Ok(Decision::Failure);
 
                }
 
            }
 
        }
 
        log!(cu.logger, "Done translating native batches into branches");
 

	
 
        // run all proto components to their sync blocker
 
        log!(
 
            cu.logger,
 
            "Running all {} proto components to their sync blocker...",
 
            branching_proto_components.len()
 
        );
 
        for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() {
 
            let BranchingProtoComponent { ports, branches } = proto_component;
 
            let mut swap = HashMap::default();
 
            // initially, no components have .ended==true
 
            let mut blocked = HashMap::default();
 
            // drain from branches --> blocked
 
            let cd = CyclicDrainer::new(branches, &mut swap, &mut blocked);
 
            BranchingProtoComponent::drain_branches_to_blocked(
 
                cd,
 
                cu,
 
                &mut solution_storage,
 
                &mut payloads_to_get,
 
                proto_component_id,
 
                ports,
 
            )?;
 
            // swap the blocked branches back
 
            std::mem::swap(&mut blocked, branches);
 
            if branches.is_empty() {
 
                log!(cu.logger, "{:?} has become inconsistent!", proto_component_id);
 
                if let Some(parent) = comm.neighborhood.parent {
 
                    if already_requested_failure.replace_with_true() {
 
                        Self::request_failure(cu, comm, parent)?
 
                    } else {
 
                        log!(cu.logger, "Already requested failure");
 
                    }
 
                } else {
 
                    log!(cu.logger, "As the leader, deciding on timeout");
 
                    return Ok(Decision::Failure);
 
@@ -630,48 +632,54 @@ impl Connector {
 
    }
 
}
 
impl BranchingNative {
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        solution_storage: &mut SolutionStorage,
 
        getter: PortId,
 
        send_payload_msg: &SendPayloadMsg,
 
    ) {
 
        log!(cu.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg);
 
        assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
 
        let mut draining = HashMap::default();
 
        let finished = &mut self.branches;
 
        std::mem::swap(&mut draining, finished);
 
        for (predicate, mut branch) in draining.drain() {
 
            log!(cu.logger, "visiting native branch {:?} with {:?}", &branch, &predicate);
 
            // check if this branch expects to receive it
 
            let var = cu.port_info.firing_var_for(getter);
 
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
 
                let was = branch.gotten.insert(getter, send_payload_msg.payload.clone());
 
                assert!(was.is_none());
 
                branch.to_get.remove(&getter);
 
                if branch.to_get.is_empty() {
 
                    log!(
 
                        cu.logger,
 
                        "new native solution with {:?} (to_get.is_empty()) with gotten {:?}",
 
                        &predicate,
 
                        &branch.gotten
 
                    );
 
                    let route = Route::LocalComponent(ComponentId::Native);
 
                    solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.logger,
 
                        route,
 
                        predicate.clone(),
 
                    );
 
                }
 
            };
 
            if predicate.query(var) != Some(true) {
 
                // optimization. Don't bother trying this branch
 
                log!(
 
                    cu.logger,
 
                    "skipping branch with {:?} that doesn't want the message (fastpath)",
 
                    &predicate
 
                );
 
                finished.insert(predicate, branch);
 
                continue;
 
            }
 
            use CommonSatResult as Csr;
 
            match predicate.common_satisfier(&send_payload_msg.predicate) {
 
                Csr::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(
 
                        cu.logger,
 
@@ -694,52 +702,59 @@ impl BranchingNative {
 
                    log!(
 
                        cu.logger,
 
                        "payload pred {:?} covers branch pred {:?}",
 
                        &predicate2,
 
                        &predicate
 
                    );
 
                    finished.insert(predicate, branch);
 
                    finished.insert(predicate2, branch2);
 
                }
 
                Csr::New(predicate2) => {
 
                    // fork branch, give fork the message and the new predicate. original branch untouched
 
                    let mut branch2 = branch.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
                    log!(
 
                        cu.logger,
 
                        "new subsuming pred created {:?}. forking and feeding",
 
                        &predicate2
 
                    );
 
                    finished.insert(predicate, branch);
 
                    finished.insert(predicate2, branch2);
 
                }
 
            }
 
        }
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> RoundOk {
 
    fn collapse_with(self, logger: &mut dyn Logger, solution_predicate: &Predicate) -> RoundOk {
 
        log!(
 
            logger,
 
            "Collapsing native with {} branch preds {:?}",
 
            self.branches.len(),
 
            self.branches.keys()
 
        );
 
        for (branch_predicate, branch) in self.branches {
 
            if solution_predicate.satisfies(&branch_predicate) {
 
            if branch.to_get.is_empty() && solution_predicate.satisfies(&branch_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                log!(logger, "Collapsed native has gotten {:?}", &gotten);
 
                return RoundOk { batch_index: index, gotten };
 
            }
 
        }
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 
// |putter, m| {
 
//     let getter = *cu.port_info.peers.get(&putter).unwrap();
 
//     payloads_to_get.push((getter, m));
 
// },
 
impl BranchingProtoComponent {
 
    fn drain_branches_to_blocked(
 
        cd: CyclicDrainer<Predicate, ProtoComponentBranch>,
 
        cu: &mut ConnectorUnphased,
 
        solution_storage: &mut SolutionStorage,
 
        payload_msg_sender: &mut impl PayloadMsgSender,
 
        proto_component_id: ProtoComponentId,
 
        ports: &HashSet<PortId>,
 
    ) -> Result<(), SyncError> {
 
        cd.cylic_drain(|mut predicate, mut branch, mut drainer| {
 
            let mut ctx = SyncProtoContext {
 
                logger: &mut *cu.logger,
 
                predicate: &predicate,
 
                port_info: &cu.port_info,
 
@@ -750,48 +765,49 @@ impl BranchingProtoComponent {
 
                cu.logger,
 
                "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
 
                proto_component_id,
 
                &predicate,
 
                &blocker,
 
            );
 
            use SyncBlocker as B;
 
            match blocker {
 
                B::Inconsistent => {
 
                    // branch is inconsistent. throw it away
 
                    drop((predicate, branch));
 
                }
 
                B::SyncBlockEnd => {
 
                    // make concrete all variables
 
                    for &port in ports.iter() {
 
                        let var = cu.port_info.firing_var_for(port);
 
                        predicate.assigned.entry(var).or_insert(false);
 
                    }
 
                    // submit solution for this component
 
                    solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.logger,
 
                        Route::LocalComponent(ComponentId::Proto(proto_component_id)),
 
                        predicate.clone(),
 
                    );
 
                    branch.ended = true;
 
                    // move to "blocked"
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::CouldntReadMsg(port) => {
 
                    // move to "blocked"
 
                    assert!(!branch.inbox.contains_key(&port));
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::CouldntCheckFiring(port) => {
 
                    // sanity check
 
                    let var = cu.port_info.firing_var_for(port);
 
                    assert!(predicate.query(var).is_none());
 
                    // keep forks in "unblocked"
 
                    drainer.add_input(predicate.clone().inserted(var, false), branch.clone());
 
                    drainer.add_input(predicate.inserted(var, true), branch);
 
                }
 
                B::PutMsg(putter, payload) => {
 
                    // sanity check
 
                    assert_eq!(Some(&Putter), cu.port_info.polarities.get(&putter));
 
                    // overwrite assignment
 
                    let var = cu.port_info.firing_var_for(putter);
 
                    let was = predicate.assigned.insert(var, true);
 
                    if was == Some(false) {
 
                        log!(cu.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var);
 
@@ -811,48 +827,53 @@ impl BranchingProtoComponent {
 
    }
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        solution_storage: &mut SolutionStorage,
 
        proto_component_id: ProtoComponentId,
 
        payload_msg_sender: &mut impl PayloadMsgSender,
 
        getter: PortId,
 
        send_payload_msg: &SendPayloadMsg,
 
    ) -> Result<(), SyncError> {
 
        let logger = &mut *cu.logger;
 
        log!(
 
            logger,
 
            "feeding proto component {:?} getter {:?} {:?}",
 
            proto_component_id,
 
            getter,
 
            &send_payload_msg
 
        );
 
        let BranchingProtoComponent { branches, ports } = self;
 
        let mut unblocked = HashMap::default();
 
        let mut blocked = HashMap::default();
 
        // partition drain from branches -> {unblocked, blocked}
 
        log!(logger, "visiting {} blocked branches...", branches.len());
 
        for (predicate, mut branch) in branches.drain() {
 
            if branch.ended {
 
                log!(logger, "Skipping ended branch with {:?}", &predicate);
 
                blocked.insert(predicate, branch);
 
                continue;
 
            }
 
            use CommonSatResult as Csr;
 
            log!(logger, "visiting branch with pred {:?}", &predicate);
 
            match predicate.common_satisfier(&send_payload_msg.predicate) {
 
                Csr::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(logger, "skipping branch");
 
                    blocked.insert(predicate, branch);
 
                }
 
                Csr::Equivalent | Csr::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    log!(logger, "feeding this branch without altering its predicate");
 
                    branch.feed_msg(getter, send_payload_msg.payload.clone());
 
                    unblocked.insert(predicate, branch);
 
                }
 
                Csr::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate. original branch untouched
 
                    log!(logger, "Forking this branch, giving it the predicate of the msg");
 
                    let mut branch2 = branch.clone();
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
                    branch2.feed_msg(getter, send_payload_msg.payload.clone());
 
                    blocked.insert(predicate, branch);
 
                    unblocked.insert(predicate2, branch2);
 
                }
 
                Csr::New(predicate2) => {
 
@@ -864,57 +885,57 @@ impl BranchingProtoComponent {
 
                    unblocked.insert(predicate2, branch2);
 
                }
 
            }
 
        }
 
        log!(logger, "blocked {:?} unblocked {:?}", blocked.len(), unblocked.len());
 
        // drain from unblocked --> blocked
 
        let mut swap = HashMap::default();
 
        let cd = CyclicDrainer::new(&mut unblocked, &mut swap, &mut blocked);
 
        BranchingProtoComponent::drain_branches_to_blocked(
 
            cd,
 
            cu,
 
            solution_storage,
 
            payload_msg_sender,
 
            proto_component_id,
 
            ports,
 
        )?;
 
        // swap the blocked branches back
 
        std::mem::swap(&mut blocked, branches);
 
        log!(cu.logger, "component settles down with branches: {:?}", branches.keys());
 
        Ok(())
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
 
        let BranchingProtoComponent { ports, branches } = self;
 
        for (branch_predicate, branch) in branches {
 
            if branch_predicate.satisfies(solution_predicate) {
 
            if branch.ended && branch_predicate.satisfies(solution_predicate) {
 
                let ProtoComponentBranch { state, .. } = branch;
 
                return ProtoComponent { state, ports };
 
            }
 
        }
 
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
    }
 
    fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self {
 
        let branch = ProtoComponentBranch { inbox: Default::default(), state };
 
        let branch = ProtoComponentBranch { inbox: Default::default(), state, ended: false };
 
        Self { ports, branches: hashmap! { Predicate::default() => branch  } }
 
    }
 
}
 
impl SolutionStorage {
 
    fn new(routes: impl Iterator<Item = Route>) -> Self {
 
        let mut subtree_id_to_index: HashMap<Route, usize> = Default::default();
 
        let mut subtree_solutions = vec![];
 
        for key in routes {
 
            subtree_id_to_index.insert(key, subtree_solutions.len());
 
            subtree_solutions.push(Default::default())
 
        }
 
        Self {
 
            subtree_solutions,
 
            subtree_id_to_index,
 
            old_local: Default::default(),
 
            new_local: Default::default(),
 
        }
 
    }
 
    fn is_clear(&self) -> bool {
 
        self.subtree_id_to_index.is_empty()
 
            && self.subtree_solutions.is_empty()
 
            && self.old_local.is_empty()
 
            && self.new_local.is_empty()
 
    }
src/runtime/endpoints.rs
Show inline comments
 
use super::*;
 

	
 
struct MonitoredReader<R: Read> {
 
    bytes: usize,
 
    r: R,
 
}
 
#[derive(Debug)]
 
enum TryRecyAnyError {
 
    Timeout,
 
    PollFailed,
 
    EndpointError { error: EndpointError, index: usize },
 
}
 

	
 
/////////////////////
 
impl Endpoint {
 
    pub(super) fn try_recv<T: serde::de::DeserializeOwned>(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        _logger: &mut dyn Logger,
 
    ) -> Result<Option<T>, EndpointError> {
 
        use EndpointError::*;
 
        // populate inbox as much as possible
 
        'read_loop: loop {
 
            let res = self.stream.read_to_end(&mut self.inbox);
 
            endptlog!(logger, "Stream read to end {:?}", &res);
 
            match res {
 
                Err(e) if would_block(&e) => break 'read_loop,
 
                Ok(0) => break 'read_loop,
 
                Ok(_) => (),
 
                Err(_e) => return Err(BrokenEndpoint),
 
            }
 
        }
 
        let mut monitored = MonitoredReader::from(&self.inbox[..]);
 
        match bincode::deserialize_from(&mut monitored) {
 
            Ok(msg) => {
 
                let msg_size = monitored.bytes_read();
 
                self.inbox.drain(0..(msg_size.try_into().unwrap()));
 
                Ok(Some(msg))
 
            }
 
            Err(e) => match *e {
 
                bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => {
 
                    Ok(None)
 
                }
src/runtime/ffi.rs
Show inline comments
 
@@ -220,49 +220,51 @@ pub unsafe extern "C" fn connector_add_net_port(
 
    port: *mut PortId,
 
    addr_str_ptr: *const u8,
 
    addr_str_len: usize,
 
    port_polarity: Polarity,
 
    endpoint_polarity: EndpointPolarity,
 
) -> ErrorCode {
 
    StoredError::tl_clear();
 
    let addr_bytes = &*slice_from_parts(addr_str_ptr, addr_str_len);
 
    let addr_str = match std::str::from_utf8(addr_bytes) {
 
        Ok(addr_str) => addr_str,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            return -1;
 
        }
 
    };
 
    let sock_address: SocketAddr = match addr_str.parse() {
 
        Ok(addr) => addr,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            return -2;
 
        }
 
    };
 
    match connector.new_net_port(port_polarity, sock_address, endpoint_polarity) {
 
        Ok(p) => {
 
            port.write(p);
 
            if !port.is_null() {
 
                port.write(p);
 
            }
 
            0
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -3
 
        }
 
    }
 
}
 

	
 
/// Connects this connector to the distributed system of connectors reachable through endpoints,
 
/// Usable in setup state, and changes the state to communication.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_connect(
 
    connector: &mut Connector,
 
    timeout_millis: i64,
 
) -> ErrorCode {
 
    StoredError::tl_clear();
 
    let option_timeout_millis: Option<u64> = TryFrom::try_from(timeout_millis).ok();
 
    let timeout = option_timeout_millis.map(Duration::from_millis);
 
    match connector.connect(timeout) {
 
        Ok(()) => 0,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
@@ -339,55 +341,57 @@ pub unsafe extern "C" fn connector_next_batch(connector: &mut Connector) -> isiz
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_sync(connector: &mut Connector, timeout_millis: i64) -> isize {
 
    StoredError::tl_clear();
 
    let option_timeout_millis: Option<u64> = TryFrom::try_from(timeout_millis).ok();
 
    let timeout = option_timeout_millis.map(Duration::from_millis);
 
    match connector.sync(timeout) {
 
        Ok(n) => n as isize,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_gotten_bytes(
 
    connector: &mut Connector,
 
    port: PortId,
 
    len: *mut usize,
 
    out_len: *mut usize,
 
) -> *const u8 {
 
    StoredError::tl_clear();
 
    match connector.gotten(port) {
 
        Ok(payload_borrow) => {
 
            let slice = payload_borrow.as_slice();
 
            len.write(slice.len());
 
            if !out_len.is_null() {
 
                out_len.write(slice.len());
 
            }
 
            slice.as_ptr()
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            std::ptr::null()
 
        }
 
    }
 
}
 

	
 
// #[no_mangle]
 
// unsafe extern "C" fn connector_gotten_payload(
 
//     connector: &mut Connector,
 
//     port: PortId,
 
// ) -> *const Payload {
 
//     StoredError::tl_clear();
 
//     match connector.gotten(port) {
 
//         Ok(payload_borrow) => payload_borrow,
 
//         Err(err) => {
 
//             StoredError::tl_debug_store(&err);
 
//             std::ptr::null()
 
//         }
 
//     }
 
// }
 

	
src/runtime/logging.rs
Show inline comments
 
use super::*;
 

	
 
impl FileLogger {
 
    pub fn new(connector_id: ConnectorId, file: std::fs::File) -> Self {
 
        Self(connector_id, file)
 
    }
 
}
 
impl VecLogger {
 
    pub fn new(connector_id: ConnectorId) -> Self {
 
        Self(connector_id, Default::default())
 
    }
 
}
 
/////////////////
 
impl Logger for DummyLogger {
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write {
 
        self
 
    }
 
}
 
impl Logger for VecLogger {
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write {
 
        let _ = write!(&mut self.1, "CID({}): ", self.0);
 
        let _ = write!(&mut self.1, "CID({}) at {:?} ", self.0, Instant::now());
 
        self
 
    }
 
}
 
impl Logger for FileLogger {
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write {
 
        let _ = write!(&mut self.1, "CID({}): ", self.0);
 
        let _ = write!(&mut self.1, "CID({}) at {:?} ", self.0, Instant::now());
 
        &mut self.1
 
    }
 
}
 
///////////////////
 
impl Drop for VecLogger {
 
    fn drop(&mut self) {
 
        let stdout = std::io::stderr();
 
        let mut lock = stdout.lock();
 
        writeln!(lock, "--- DROP LOG DUMP ---").unwrap();
 
        let _ = std::io::Write::write(&mut lock, self.1.as_slice());
 
    }
 
}
 
impl std::io::Write for VecLogger {
 
    fn flush(&mut self) -> Result<(), std::io::Error> {
 
        Ok(())
 
    }
 
    fn write(&mut self, data: &[u8]) -> Result<usize, std::io::Error> {
 
        self.1.extend_from_slice(data);
 
        Ok(data.len())
 
    }
 
}
 
impl std::io::Write for DummyLogger {
 
    fn flush(&mut self) -> Result<(), std::io::Error> {
 
        Ok(())
src/runtime/mod.rs
Show inline comments
 
@@ -196,99 +196,103 @@ pub struct NonsyncProtoContext<'a> {
 
    port_info: &'a mut PortInfo,
 
    id_manager: &'a mut IdManager,
 
    proto_component_ports: &'a mut HashSet<PortId>,
 
    unrun_components: &'a mut Vec<(ProtoComponentId, ProtoComponent)>,
 
}
 
pub struct SyncProtoContext<'a> {
 
    logger: &'a mut dyn Logger,
 
    predicate: &'a Predicate,
 
    port_info: &'a PortInfo,
 
    inbox: &'a HashMap<PortId, Payload>,
 
}
 
////////////////
 
pub fn would_block(err: &std::io::Error) -> bool {
 
    err.kind() == std::io::ErrorKind::WouldBlock
 
}
 
impl<T: std::cmp::Ord> VecSet<T> {
 
    fn new(mut vec: Vec<T>) -> Self {
 
        vec.sort();
 
        vec.dedup();
 
        Self { vec }
 
    }
 
    fn contains(&self, element: &T) -> bool {
 
        self.vec.binary_search(element).is_ok()
 
    }
 
    fn insert(&mut self, element: T) -> bool {
 
        match self.vec.binary_search(&element) {
 
            Ok(_) => false,
 
            Err(index) => {
 
                self.vec.insert(index, element);
 
                true
 
            }
 
        }
 
    }
 
    // fn insert(&mut self, element: T) -> bool {
 
    //     match self.vec.binary_search(&element) {
 
    //         Ok(_) => false,
 
    //         Err(index) => {
 
    //             self.vec.insert(index, element);
 
    //             true
 
    //         }
 
    //     }
 
    // }
 
    fn iter(&self) -> std::slice::Iter<T> {
 
        self.vec.iter()
 
    }
 
}
 
impl PortInfo {
 
    fn firing_var_for(&self, port: PortId) -> FiringVar {
 
        FiringVar(match self.polarities.get(&port).unwrap() {
 
            Getter => port,
 
            Putter => *self.peers.get(&port).unwrap(),
 
        })
 
    }
 
}
 
impl IdManager {
 
    fn new(connector_id: ConnectorId) -> Self {
 
        Self {
 
            connector_id,
 
            port_suffix_stream: Default::default(),
 
            proto_component_suffix_stream: Default::default(),
 
        }
 
    }
 
    fn new_port_id(&mut self) -> PortId {
 
        Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into()
 
    }
 
    fn new_proto_component_id(&mut self) -> ProtoComponentId {
 
        Id {
 
            connector_id: self.connector_id,
 
            u32_suffix: self.proto_component_suffix_stream.next(),
 
        }
 
        .into()
 
    }
 
}
 
impl Drop for Connector {
 
    fn drop(&mut self) {
 
        log!(&mut *self.unphased.logger, "Connector dropping. Goodbye!");
 
    }
 
}
 
impl Connector {
 
    fn random_id() -> ConnectorId {
 
        type Bytes8 = [u8; std::mem::size_of::<ConnectorId>()];
 
        let mut bytes = Bytes8::default();
 
        getrandom::getrandom(&mut bytes).unwrap();
 
        unsafe { std::mem::transmute::<Bytes8, ConnectorId>(bytes) }
 
        unsafe {
 
            let mut bytes = std::mem::MaybeUninit::<Bytes8>::uninit();
 
            // getrandom is the canonical crate for a small, secure rng
 
            getrandom::getrandom(&mut *bytes.as_mut_ptr()).unwrap();
 
            // safe! representations of all valid Byte8 values are valid ConnectorId values
 
            std::mem::transmute::<_, _>(bytes.assume_init())
 
        }
 
    }
 
    pub fn swap_logger(&mut self, mut new_logger: Box<dyn Logger>) -> Box<dyn Logger> {
 
        std::mem::swap(&mut self.unphased.logger, &mut new_logger);
 
        new_logger
 
    }
 
    pub fn get_logger(&mut self) -> &mut dyn Logger {
 
        &mut *self.unphased.logger
 
    }
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        let cu = &mut self.unphased;
 
        // adds two new associated ports, related to each other, and exposed to the native
 
        let [o, i] = [cu.id_manager.new_port_id(), cu.id_manager.new_port_id()];
 
        cu.native_ports.insert(o);
 
        cu.native_ports.insert(i);
 
        // {polarity, peer, route} known. {} unknown.
 
        cu.port_info.polarities.insert(o, Putter);
 
        cu.port_info.polarities.insert(i, Getter);
 
        cu.port_info.peers.insert(o, i);
 
        cu.port_info.peers.insert(i, o);
 
        let route = Route::LocalComponent(ComponentId::Native);
 
        cu.port_info.routes.insert(o, route);
 
        cu.port_info.routes.insert(i, route);
 
        log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
        [o, i]
src/runtime/setup.rs
Show inline comments
 
@@ -125,137 +125,158 @@ fn new_endpoint_manager(
 
    ) -> Result<Todo, ConnectError> {
 
        let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity {
 
            let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
 
                .expect("mio::TcpStream connect should not fail!");
 
            poll.registry().register(&mut stream, token, BOTH).unwrap();
 
            TodoEndpoint::Endpoint(Endpoint { stream, inbox: vec![] })
 
        } else {
 
            let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
 
                .map_err(|_| BindFailed(endpoint_setup.sock_addr))?;
 
            poll.registry().register(&mut listener, token, BOTH).unwrap();
 
            TodoEndpoint::Accepting(listener)
 
        };
 
        Ok(Todo {
 
            todo_endpoint,
 
            local_port,
 
            sent_local_port: false,
 
            recv_peer_port: None,
 
            endpoint_setup: endpoint_setup.clone(),
 
        })
 
    }
 
    ////////////////////////////////////////////
 

	
 
    // 1. Start to construct EndpointManager
 
    const WAKER_TOKEN: Token = Token(usize::MAX);
 
    const WAKER_PERIOD: Duration = Duration::from_millis(90);
 
    const WAKER_PERIOD: Duration = Duration::from_millis(300);
 

	
 
    assert!(endpoint_setups.len() < WAKER_TOKEN.0); // using MAX usize as waker token
 

	
 
    let mut waker_continue_signal: Option<Arc<AtomicBool>> = None;
 
    let mut poll = Poll::new().map_err(|_| PollInitFailed)?;
 
    let mut events = Events::with_capacity(endpoint_setups.len() * 2 + 4);
 
    let mut polled_undrained = IndexSet::default();
 
    let mut delayed_messages = vec![];
 

	
 
    // 2. create a registered (TcpListener/Endpoint) for passive / active respectively
 
    let mut todos = endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, (local_port, endpoint_setup))| {
 
            init_todo(Token(index), *local_port, endpoint_setup, &mut poll)
 
        })
 
        .collect::<Result<Vec<Todo>, ConnectError>>()?;
 

	
 
    // 3. Using poll to drive progress:
 
    //    - accept an incoming connection for each TcpListener (turning them into endpoints too)
 
    //    - for each endpoint, send the local PortId
 
    //    - for each endpoint, recv the peer's PortId, and
 

	
 
    // all in connect_failed are NOT registered with Poll
 
    let mut connect_failed: HashSet<usize> = Default::default();
 

	
 
    let mut setup_incomplete: HashSet<usize> = (0..todos.len()).collect();
 
    while !setup_incomplete.is_empty() {
 
        let remaining = if let Some(deadline) = deadline {
 
            Some(deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?)
 
        } else {
 
            None
 
        };
 
        poll.poll(&mut events, remaining).map_err(|_| PollFailed)?;
 
        for event in events.iter() {
 
            let token = event.token();
 
            let Token(index) = token;
 
            let todo: &mut Todo = &mut todos[index];
 
            if token == WAKER_TOKEN {
 
                log!(logger, "Notification from waker");
 
                log!(
 
                    logger,
 
                    "Notification from waker. connect_failed is {:?}",
 
                    connect_failed.iter()
 
                );
 
                assert!(waker_continue_signal.is_some());
 
                for index in connect_failed.drain() {
 
                    let todo: &mut Todo = &mut todos[index];
 
                    log!(
 
                        logger,
 
                        "Restarting connection with endpoint {:?} {:?}",
 
                        index,
 
                        todo.endpoint_setup.sock_addr
 
                    );
 
                    match &mut todo.todo_endpoint {
 
                        TodoEndpoint::Endpoint(endpoint) => {
 
                            let mut new_stream = TcpStream::connect(todo.endpoint_setup.sock_addr)
 
                                .expect("mio::TcpStream connect should not fail!");
 
                            poll.registry().deregister(&mut endpoint.stream).unwrap();
 
                            std::mem::swap(&mut endpoint.stream, &mut new_stream);
 
                            poll.registry().register(&mut endpoint.stream, token, BOTH).unwrap();
 
                            poll.registry()
 
                                .register(&mut endpoint.stream, Token(index), BOTH)
 
                                .unwrap();
 
                        }
 
                        _ => unreachable!(),
 
                    }
 
                }
 
            } else {
 
                let todo: &mut Todo = &mut todos[index];
 
                // FIRST try convert this into an endpoint
 
                if let TodoEndpoint::Accepting(listener) = &mut todo.todo_endpoint {
 
                    match listener.accept() {
 
                        Ok((mut stream, peer_addr)) => {
 
                            poll.registry().deregister(listener).unwrap();
 
                            poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                            log!(
 
                                logger,
 
                                "Endpoint[{}] accepted a connection from {:?}",
 
                                index,
 
                                peer_addr
 
                            );
 
                            let endpoint = Endpoint { stream, inbox: vec![] };
 
                            todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint);
 
                        }
 
                        Err(e) if would_block(&e) => {
 
                            log!(logger, "Spurious wakeup on listener {:?}", index)
 
                        }
 
                        Err(_) => {
 
                            log!(logger, "accept() failure on index {}", index);
 
                            return Err(AcceptFailed(listener.local_addr().unwrap()));
 
                        }
 
                    }
 
                }
 
                if let TodoEndpoint::Endpoint(endpoint) = &mut todo.todo_endpoint {
 
                    if event.is_error() {
 
                        if todo.endpoint_setup.endpoint_polarity == EndpointPolarity::Passive {
 
                            // right now you cannot retry an acceptor.
 
                            return Err(AcceptFailed(endpoint.stream.local_addr().unwrap()));
 
                        }
 
                        connect_failed.insert(index);
 
                        if connect_failed.insert(index) {
 
                            log!(
 
                                logger,
 
                                "Connection failed for {:?}. List is {:?}",
 
                                index,
 
                                connect_failed.iter()
 
                            );
 
                            poll.registry().deregister(&mut endpoint.stream).unwrap();
 
                        } else {
 
                            // spurious wakeup
 
                            continue;
 
                        }
 

	
 
                        if waker_continue_signal.is_none() {
 
                            log!(logger, "First connect failure. Starting waker thread");
 
                            let waker =
 
                                Arc::new(mio::Waker::new(poll.registry(), WAKER_TOKEN).unwrap());
 
                            let wcs = Arc::new(AtomicBool::from(true));
 
                            let wcs2 = wcs.clone();
 
                            std::thread::spawn(move || {
 
                                while wcs2.load(std::sync::atomic::Ordering::SeqCst) {
 
                                    std::thread::sleep(WAKER_PERIOD);
 
                                    let _ = waker.wake();
 
                                }
 
                            });
 
                            waker_continue_signal = Some(wcs);
 
                        }
 
                        continue;
 
                    }
 
                    if connect_failed.contains(&index) {
 
                        // spurious wakeup
 
                        continue;
 
                    }
 
                    if !setup_incomplete.contains(&index) {
 
                        // spurious wakeup
 
                        continue;
 
                    }
src/runtime/tests.rs
Show inline comments
 
@@ -5,51 +5,61 @@ use reowolf::{
 
    EndpointPolarity::{Active, Passive},
 
    Polarity::{Getter, Putter},
 
    *,
 
};
 
use std::{fs::File, net::SocketAddr, path::Path, sync::Arc, time::Duration};
 
//////////////////////////////////////////
 
fn next_test_addr() -> SocketAddr {
 
    use std::{
 
        net::{Ipv4Addr, SocketAddrV4},
 
        sync::atomic::{AtomicU16, Ordering::SeqCst},
 
    };
 
    static TEST_PORT: AtomicU16 = AtomicU16::new(5_000);
 
    let port = TEST_PORT.fetch_add(1, SeqCst);
 
    SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()
 
}
 

	
 
fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connector {
 
    let _ = std::fs::create_dir(dir_path); // we will check failure soon
 
    let path = dir_path.join(format!("cid_{:?}.txt", connector_id));
 
    let file = File::create(path).unwrap();
 
    let file_logger = Box::new(FileLogger::new(connector_id, file));
 
    Connector::new(file_logger, MINIMAL_PROTO.clone(), connector_id, 8)
 
}
 

	
 
static MINIMAL_PDL: &'static [u8] = b"
 
primitive together(in ia, in ib, out oa, out ob){
 
  while(true) synchronous() {
 
    if(fires(ia)) {
 
      put(oa, get(ia));
 
      put(ob, get(ib));
 
    }
 
  } 
 
}
 
";
 
lazy_static::lazy_static! {
 
    static ref MINIMAL_PROTO: Arc<ProtocolDescription> = {
 
        Arc::new(reowolf::ProtocolDescription::parse(b"").unwrap())
 
        Arc::new(reowolf::ProtocolDescription::parse(MINIMAL_PDL).unwrap())
 
    };
 
}
 
lazy_static::lazy_static! {
 
    static ref TEST_MSG: Payload = {
 
        Payload::from(b"hello" as &[u8])
 
    };
 
}
 

	
 
//////////////////////////////////////////
 

	
 
#[test]
 
fn basic_connector() {
 
    Connector::new(Box::new(DummyLogger), MINIMAL_PROTO.clone(), 0, 0);
 
}
 

	
 
#[test]
 
fn basic_logged_connector() {
 
    let test_log_path = Path::new("./logs/basic_logged_connector");
 
    file_logged_connector(0, test_log_path);
 
}
 

	
 
#[test]
 
fn new_port_pair() {
 
    let test_log_path = Path::new("./logs/new_port_pair");
 
@@ -496,24 +506,76 @@ fn chain_connect() {
 
            c.new_net_port(Putter, sock_addrs[3], Passive).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            c.new_net_port(Getter, sock_addrs[3], Active).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn net_self_loop() {
 
    let test_log_path = Path::new("./logs/net_self_loop");
 
    let sock_addr = next_test_addr();
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let p = c.new_net_port(Putter, sock_addr, Active).unwrap();
 
    let g = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(p, TEST_MSG.clone()).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(Some(Duration::from_millis(500))).unwrap();
 
}
 

	
 
#[test]
 
fn nobody_connects_active() {
 
    let test_log_path = Path::new("./logs/nobody_connects_active");
 
    let sock_addr = next_test_addr();
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let _g = c.new_net_port(Getter, sock_addr, Active).unwrap();
 
    c.connect(Some(Duration::from_secs(5))).unwrap_err();
 
}
 
#[test]
 
fn nobody_connects_passive() {
 
    let test_log_path = Path::new("./logs/nobody_connects_passive");
 
    let sock_addr = next_test_addr();
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let _g = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
    c.connect(Some(Duration::from_secs(5))).unwrap_err();
 
}
 

	
 
#[test]
 
fn together() {
 
    let test_log_path = Path::new("./logs/together");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p0, p1] = c.new_port_pair();
 
            let p2 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            let p3 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            let [p4, p5] = c.new_port_pair();
 
            c.add_component(b"together", &[p1, p2, p3, p4]).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.get(p5).unwrap();
 
            c.sync(Some(Duration::from_millis(500))).unwrap();
 
            c.gotten(p5).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let [p0, p1] = c.new_port_pair();
 
            let p2 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
 
            let p3 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let [p4, p5] = c.new_port_pair();
 
            c.add_component(b"together", &[p1, p2, p3, p4]).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.get(p5).unwrap();
 
            c.sync(Some(Duration::from_millis(500))).unwrap();
 
            c.gotten(p5).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
src/test/connector.rs
Show inline comments
 
deleted file
src/test/mod.rs
Show inline comments
 
deleted file
src/test/setup.rs
Show inline comments
 
deleted file
0 comments (0 inline, 0 general)