Changeset - d9774c9084d7
[Not reviewed]
4 9 2
Christopher Esterhuyse - 5 years ago 2020-06-30 10:45:50
christopher.esterhuyse@gmail.com
more logging, testing, examples and bugfixes: (1) components remember whether they have submitted a solution; only those are considered when selecting a branch at the end of a round, (2) retrying active connections during setup phase were using the wrong index for looking up their TODO structure, (3) recently failed connections are deregistered from mio and reregistered after the retry process restarts s.t. they don't produce a storm of mio events
15 files changed with 257 insertions and 1046 deletions:
0 comments (0 inline, 0 general)
examples/a_swap/amy.c
Show inline comments
 
new file 100644
 
#include <stdio.h>
 
#include <string.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main(int argc, char** argv) {
 
    if(argc != 3) {
 
        printf("Expected arg[1] and arg[2] for use as addr str\n");
 
        exit(1);
 
    }            
 
    char * pdl_ptr = buffer_pdl("eg_protocols.pdl");
 
    size_t pdl_len = strlen(pdl_ptr);
 
    Arc_ProtocolDescription * pd = protocol_description_parse(pdl_ptr, pdl_len);
 
    char logpath[] = "./a_amy_log.txt";
 
    Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    
 
    PortId ports[6]; 
 
    connector_add_port_pair(c, &ports[0], &ports[1]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_net_port(c, &ports[2], argv[1], strlen(argv[1]), Getter, Passive);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_net_port(c, &ports[3], argv[2], strlen(argv[2]), Putter, Active);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_port_pair(c, &ports[4], &ports[5]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    // native {0,1,2,3,4,5}
 
    
 
    connector_add_component(c, "together", 8, &ports[1], 4);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    // native {0,5} together {1,2,3,4}
 
    
 
    connector_connect(c, 4000);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	connector_put_bytes(c, ports[0], "hi", 2);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	connector_get(c, ports[5]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
    connector_sync(c, 1000);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	size_t msg_len;
 
	const char * msg_ptr = connector_gotten_bytes(c, ports[5], &msg_len);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	printf("Got msg `%.*s`\n", msg_len, msg_ptr);
 
    
 
    protocol_description_destroy(pd);
 
    connector_destroy(c);
 
    return 0;
 
}
 
\ No newline at end of file
examples/a_swap/bob.c
Show inline comments
 
new file 100644
 
#include <stdio.h>
 
#include <string.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main(int argc, char** argv) {
 
    if(argc != 3) {
 
        printf("Expected arg[1] and arg[2] for use as addr str\n");
 
        exit(1);
 
    }            
 
    char * pdl_ptr = buffer_pdl("eg_protocols.pdl");
 
    size_t pdl_len = strlen(pdl_ptr);
 
    Arc_ProtocolDescription * pd = protocol_description_parse(pdl_ptr, pdl_len);
 
    char logpath[] = "./a_bob_log.txt";
 
    Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    
 
    PortId ports[6]; 
 
    connector_add_port_pair(c, &ports[0], &ports[1]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_net_port(c, &ports[2], argv[1], strlen(argv[1]), Getter, Passive);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_net_port(c, &ports[3], argv[2], strlen(argv[2]), Putter, Active);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    connector_add_port_pair(c, &ports[4], &ports[5]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    // native {0,1,2,3,4,5}
 
    
 
    connector_add_component(c, "together", 8, &ports[1], 4);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    // native {0,5} together {1,2,3,4}
 
    
 
    connector_connect(c, 4000);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	connector_put_bytes(c, ports[0], "hi", 2);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	connector_get(c, ports[5]);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
    connector_sync(c, 1000);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	size_t msg_len;
 
	const char * msg_ptr = connector_gotten_bytes(c, ports[5], &msg_len);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	printf("Got msg `%.*s`\n", msg_len, msg_ptr);
 
    
 
    protocol_description_destroy(pd);
 
    connector_destroy(c);
 
    return 0;
 
}
 
\ No newline at end of file
examples/eg_protocols.pdl
Show inline comments
 
primitive foo(){}
 
\ No newline at end of file
 
primitive together(in ia, in ib, out oa, out ob){
 
  while(true) synchronous() {
 
    if(fires(ia)) {
 
      put(oa, get(ia));
 
      put(ob, get(ib));
 
    }
 
  }	
 
}
 
\ No newline at end of file
examples/make.py
Show inline comments
 
import os, glob, subprocess
 
import os, glob, subprocess, time
 
script_path = os.path.dirname(os.path.realpath(__file__));
 
for c_file in glob.glob(script_path + "/*/*.c", recursive=False):
 
  print("compiling", c_file)
 
  args = [
 
    "gcc",          # compiler
 
    "-L",           # lib path flag
 
@@ -9,7 +9,8 @@ for c_file in glob.glob(script_path + "/*/*.c", recursive=False):
 
    "-lreowolf_rs", # add lib called "reowolf_rs"
 
    "-Wl,-R./",     # pass -R flag to linker: produce relocatable object
 
    c_file,         # input source file
 
    "-o",           # output flag
 
    c_file[:-2]     # output filename
 
  ];
 
  subprocess.run(args);
 
  subprocess.run(args)
 
input("Blocking until newline...");
examples/reowolf_rs.dll
Show inline comments
 
deleted file
 
binary diff not shown
src/runtime/communication.rs
Show inline comments
 
@@ -23,12 +23,13 @@ struct SolutionStorage {
 
struct BranchingProtoComponent {
 
    ports: HashSet<PortId>,
 
    branches: HashMap<Predicate, ProtoComponentBranch>,
 
}
 
#[derive(Debug, Clone)]
 
struct ProtoComponentBranch {
 
    ended: bool,
 
    inbox: HashMap<PortId, Payload>,
 
    state: ComponentState,
 
}
 
struct CyclicDrainer<'a, K: Eq + Hash, V> {
 
    input: &'a mut HashMap<K, V>,
 
    inner: CyclicDrainInner<'a, K, V>,
 
@@ -316,13 +317,13 @@ impl Connector {
 
                log!(
 
                    cu.logger,
 
                    "End round with (updated) component states {:?}",
 
                    cu.proto_components.keys()
 
                );
 
                // consume native
 
                Ok(Some(branching_native.collapse_with(&predicate)))
 
                Ok(Some(branching_native.collapse_with(&mut *cu.logger, &predicate)))
 
            }
 
        };
 
        log!(cu.logger, "Sync round ending! Cleaning up");
 
        // dropping {solution_storage, payloads_to_get}
 
        ret
 
    }
 
@@ -361,12 +362,13 @@ impl Connector {
 
            "Running all {} proto components to their sync blocker...",
 
            branching_proto_components.len()
 
        );
 
        for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() {
 
            let BranchingProtoComponent { ports, branches } = proto_component;
 
            let mut swap = HashMap::default();
 
            // initially, no components have .ended==true
 
            let mut blocked = HashMap::default();
 
            // drain from branches --> blocked
 
            let cd = CyclicDrainer::new(branches, &mut swap, &mut blocked);
 
            BranchingProtoComponent::drain_branches_to_blocked(
 
                cd,
 
                cu,
 
@@ -648,12 +650,18 @@ impl BranchingNative {
 
            let var = cu.port_info.firing_var_for(getter);
 
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
 
                let was = branch.gotten.insert(getter, send_payload_msg.payload.clone());
 
                assert!(was.is_none());
 
                branch.to_get.remove(&getter);
 
                if branch.to_get.is_empty() {
 
                    log!(
 
                        cu.logger,
 
                        "new native solution with {:?} (to_get.is_empty()) with gotten {:?}",
 
                        &predicate,
 
                        &branch.gotten
 
                    );
 
                    let route = Route::LocalComponent(ComponentId::Native);
 
                    solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.logger,
 
                        route,
 
                        predicate.clone(),
 
                    );
 
@@ -712,16 +720,23 @@ impl BranchingNative {
 
                    finished.insert(predicate, branch);
 
                    finished.insert(predicate2, branch2);
 
                }
 
            }
 
        }
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> RoundOk {
 
    fn collapse_with(self, logger: &mut dyn Logger, solution_predicate: &Predicate) -> RoundOk {
 
        log!(
 
            logger,
 
            "Collapsing native with {} branch preds {:?}",
 
            self.branches.len(),
 
            self.branches.keys()
 
        );
 
        for (branch_predicate, branch) in self.branches {
 
            if solution_predicate.satisfies(&branch_predicate) {
 
            if branch.to_get.is_empty() && solution_predicate.satisfies(&branch_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                log!(logger, "Collapsed native has gotten {:?}", &gotten);
 
                return RoundOk { batch_index: index, gotten };
 
            }
 
        }
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 
@@ -768,12 +783,13 @@ impl BranchingProtoComponent {
 
                    // submit solution for this component
 
                    solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.logger,
 
                        Route::LocalComponent(ComponentId::Proto(proto_component_id)),
 
                        predicate.clone(),
 
                    );
 
                    branch.ended = true;
 
                    // move to "blocked"
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::CouldntReadMsg(port) => {
 
                    // move to "blocked"
 
                    assert!(!branch.inbox.contains_key(&port));
 
@@ -829,12 +845,17 @@ impl BranchingProtoComponent {
 
        let BranchingProtoComponent { branches, ports } = self;
 
        let mut unblocked = HashMap::default();
 
        let mut blocked = HashMap::default();
 
        // partition drain from branches -> {unblocked, blocked}
 
        log!(logger, "visiting {} blocked branches...", branches.len());
 
        for (predicate, mut branch) in branches.drain() {
 
            if branch.ended {
 
                log!(logger, "Skipping ended branch with {:?}", &predicate);
 
                blocked.insert(predicate, branch);
 
                continue;
 
            }
 
            use CommonSatResult as Csr;
 
            log!(logger, "visiting branch with pred {:?}", &predicate);
 
            match predicate.common_satisfier(&send_payload_msg.predicate) {
 
                Csr::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(logger, "skipping branch");
 
@@ -882,21 +903,21 @@ impl BranchingProtoComponent {
 
        log!(cu.logger, "component settles down with branches: {:?}", branches.keys());
 
        Ok(())
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
 
        let BranchingProtoComponent { ports, branches } = self;
 
        for (branch_predicate, branch) in branches {
 
            if branch_predicate.satisfies(solution_predicate) {
 
            if branch.ended && branch_predicate.satisfies(solution_predicate) {
 
                let ProtoComponentBranch { state, .. } = branch;
 
                return ProtoComponent { state, ports };
 
            }
 
        }
 
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
    }
 
    fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self {
 
        let branch = ProtoComponentBranch { inbox: Default::default(), state };
 
        let branch = ProtoComponentBranch { inbox: Default::default(), state, ended: false };
 
        Self { ports, branches: hashmap! { Predicate::default() => branch  } }
 
    }
 
}
 
impl SolutionStorage {
 
    fn new(routes: impl Iterator<Item = Route>) -> Self {
 
        let mut subtree_id_to_index: HashMap<Route, usize> = Default::default();
src/runtime/endpoints.rs
Show inline comments
 
@@ -12,13 +12,13 @@ enum TryRecyAnyError {
 
}
 

	
 
/////////////////////
 
impl Endpoint {
 
    pub(super) fn try_recv<T: serde::de::DeserializeOwned>(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        _logger: &mut dyn Logger,
 
    ) -> Result<Option<T>, EndpointError> {
 
        use EndpointError::*;
 
        // populate inbox as much as possible
 
        'read_loop: loop {
 
            let res = self.stream.read_to_end(&mut self.inbox);
 
            endptlog!(logger, "Stream read to end {:?}", &res);
src/runtime/ffi.rs
Show inline comments
 
@@ -238,13 +238,15 @@ pub unsafe extern "C" fn connector_add_net_port(
 
            StoredError::tl_debug_store(&err);
 
            return -2;
 
        }
 
    };
 
    match connector.new_net_port(port_polarity, sock_address, endpoint_polarity) {
 
        Ok(p) => {
 
            port.write(p);
 
            if !port.is_null() {
 
                port.write(p);
 
            }
 
            0
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -3
 
        }
 
@@ -357,19 +359,21 @@ pub unsafe extern "C" fn connector_sync(connector: &mut Connector, timeout_milli
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_gotten_bytes(
 
    connector: &mut Connector,
 
    port: PortId,
 
    len: *mut usize,
 
    out_len: *mut usize,
 
) -> *const u8 {
 
    StoredError::tl_clear();
 
    match connector.gotten(port) {
 
        Ok(payload_borrow) => {
 
            let slice = payload_borrow.as_slice();
 
            len.write(slice.len());
 
            if !out_len.is_null() {
 
                out_len.write(slice.len());
 
            }
 
            slice.as_ptr()
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            std::ptr::null()
 
        }
src/runtime/logging.rs
Show inline comments
 
@@ -15,19 +15,19 @@ impl Logger for DummyLogger {
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write {
 
        self
 
    }
 
}
 
impl Logger for VecLogger {
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write {
 
        let _ = write!(&mut self.1, "CID({}): ", self.0);
 
        let _ = write!(&mut self.1, "CID({}) at {:?} ", self.0, Instant::now());
 
        self
 
    }
 
}
 
impl Logger for FileLogger {
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write {
 
        let _ = write!(&mut self.1, "CID({}): ", self.0);
 
        let _ = write!(&mut self.1, "CID({}) at {:?} ", self.0, Instant::now());
 
        &mut self.1
 
    }
 
}
 
///////////////////
 
impl Drop for VecLogger {
 
    fn drop(&mut self) {
src/runtime/mod.rs
Show inline comments
 
@@ -214,21 +214,21 @@ impl<T: std::cmp::Ord> VecSet<T> {
 
        vec.dedup();
 
        Self { vec }
 
    }
 
    fn contains(&self, element: &T) -> bool {
 
        self.vec.binary_search(element).is_ok()
 
    }
 
    fn insert(&mut self, element: T) -> bool {
 
        match self.vec.binary_search(&element) {
 
            Ok(_) => false,
 
            Err(index) => {
 
                self.vec.insert(index, element);
 
                true
 
            }
 
        }
 
    }
 
    // fn insert(&mut self, element: T) -> bool {
 
    //     match self.vec.binary_search(&element) {
 
    //         Ok(_) => false,
 
    //         Err(index) => {
 
    //             self.vec.insert(index, element);
 
    //             true
 
    //         }
 
    //     }
 
    // }
 
    fn iter(&self) -> std::slice::Iter<T> {
 
        self.vec.iter()
 
    }
 
}
 
impl PortInfo {
 
    fn firing_var_for(&self, port: PortId) -> FiringVar {
 
@@ -262,15 +262,19 @@ impl Drop for Connector {
 
        log!(&mut *self.unphased.logger, "Connector dropping. Goodbye!");
 
    }
 
}
 
impl Connector {
 
    fn random_id() -> ConnectorId {
 
        type Bytes8 = [u8; std::mem::size_of::<ConnectorId>()];
 
        let mut bytes = Bytes8::default();
 
        getrandom::getrandom(&mut bytes).unwrap();
 
        unsafe { std::mem::transmute::<Bytes8, ConnectorId>(bytes) }
 
        unsafe {
 
            let mut bytes = std::mem::MaybeUninit::<Bytes8>::uninit();
 
            // getrandom is the canonical crate for a small, secure rng
 
            getrandom::getrandom(&mut *bytes.as_mut_ptr()).unwrap();
 
            // safe! representations of all valid Byte8 values are valid ConnectorId values
 
            std::mem::transmute::<_, _>(bytes.assume_init())
 
        }
 
    }
 
    pub fn swap_logger(&mut self, mut new_logger: Box<dyn Logger>) -> Box<dyn Logger> {
 
        std::mem::swap(&mut self.unphased.logger, &mut new_logger);
 
        new_logger
 
    }
 
    pub fn get_logger(&mut self) -> &mut dyn Logger {
src/runtime/setup.rs
Show inline comments
 
@@ -143,13 +143,13 @@ fn new_endpoint_manager(
 
        })
 
    }
 
    ////////////////////////////////////////////
 

	
 
    // 1. Start to construct EndpointManager
 
    const WAKER_TOKEN: Token = Token(usize::MAX);
 
    const WAKER_PERIOD: Duration = Duration::from_millis(90);
 
    const WAKER_PERIOD: Duration = Duration::from_millis(300);
 

	
 
    assert!(endpoint_setups.len() < WAKER_TOKEN.0); // using MAX usize as waker token
 

	
 
    let mut waker_continue_signal: Option<Arc<AtomicBool>> = None;
 
    let mut poll = Poll::new().map_err(|_| PollInitFailed)?;
 
    let mut events = Events::with_capacity(endpoint_setups.len() * 2 + 4);
 
@@ -166,47 +166,56 @@ fn new_endpoint_manager(
 
        .collect::<Result<Vec<Todo>, ConnectError>>()?;
 

	
 
    // 3. Using poll to drive progress:
 
    //    - accept an incoming connection for each TcpListener (turning them into endpoints too)
 
    //    - for each endpoint, send the local PortId
 
    //    - for each endpoint, recv the peer's PortId, and
 

	
 
    // all in connect_failed are NOT registered with Poll
 
    let mut connect_failed: HashSet<usize> = Default::default();
 

	
 
    let mut setup_incomplete: HashSet<usize> = (0..todos.len()).collect();
 
    while !setup_incomplete.is_empty() {
 
        let remaining = if let Some(deadline) = deadline {
 
            Some(deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?)
 
        } else {
 
            None
 
        };
 
        poll.poll(&mut events, remaining).map_err(|_| PollFailed)?;
 
        for event in events.iter() {
 
            let token = event.token();
 
            let Token(index) = token;
 
            let todo: &mut Todo = &mut todos[index];
 
            if token == WAKER_TOKEN {
 
                log!(logger, "Notification from waker");
 
                log!(
 
                    logger,
 
                    "Notification from waker. connect_failed is {:?}",
 
                    connect_failed.iter()
 
                );
 
                assert!(waker_continue_signal.is_some());
 
                for index in connect_failed.drain() {
 
                    let todo: &mut Todo = &mut todos[index];
 
                    log!(
 
                        logger,
 
                        "Restarting connection with endpoint {:?} {:?}",
 
                        index,
 
                        todo.endpoint_setup.sock_addr
 
                    );
 
                    match &mut todo.todo_endpoint {
 
                        TodoEndpoint::Endpoint(endpoint) => {
 
                            let mut new_stream = TcpStream::connect(todo.endpoint_setup.sock_addr)
 
                                .expect("mio::TcpStream connect should not fail!");
 
                            poll.registry().deregister(&mut endpoint.stream).unwrap();
 
                            std::mem::swap(&mut endpoint.stream, &mut new_stream);
 
                            poll.registry().register(&mut endpoint.stream, token, BOTH).unwrap();
 
                            poll.registry()
 
                                .register(&mut endpoint.stream, Token(index), BOTH)
 
                                .unwrap();
 
                        }
 
                        _ => unreachable!(),
 
                    }
 
                }
 
            } else {
 
                let todo: &mut Todo = &mut todos[index];
 
                // FIRST try convert this into an endpoint
 
                if let TodoEndpoint::Accepting(listener) = &mut todo.todo_endpoint {
 
                    match listener.accept() {
 
                        Ok((mut stream, peer_addr)) => {
 
                            poll.registry().deregister(listener).unwrap();
 
                            poll.registry().register(&mut stream, token, BOTH).unwrap();
 
@@ -231,13 +240,25 @@ fn new_endpoint_manager(
 
                if let TodoEndpoint::Endpoint(endpoint) = &mut todo.todo_endpoint {
 
                    if event.is_error() {
 
                        if todo.endpoint_setup.endpoint_polarity == EndpointPolarity::Passive {
 
                            // right now you cannot retry an acceptor.
 
                            return Err(AcceptFailed(endpoint.stream.local_addr().unwrap()));
 
                        }
 
                        connect_failed.insert(index);
 
                        if connect_failed.insert(index) {
 
                            log!(
 
                                logger,
 
                                "Connection failed for {:?}. List is {:?}",
 
                                index,
 
                                connect_failed.iter()
 
                            );
 
                            poll.registry().deregister(&mut endpoint.stream).unwrap();
 
                        } else {
 
                            // spurious wakeup
 
                            continue;
 
                        }
 

	
 
                        if waker_continue_signal.is_none() {
 
                            log!(logger, "First connect failure. Starting waker thread");
 
                            let waker =
 
                                Arc::new(mio::Waker::new(poll.registry(), WAKER_TOKEN).unwrap());
 
                            let wcs = Arc::new(AtomicBool::from(true));
 
                            let wcs2 = wcs.clone();
src/runtime/tests.rs
Show inline comments
 
@@ -23,15 +23,25 @@ fn file_logged_connector(connector_id: ConnectorId, dir_path: &Path) -> Connecto
 
    let path = dir_path.join(format!("cid_{:?}.txt", connector_id));
 
    let file = File::create(path).unwrap();
 
    let file_logger = Box::new(FileLogger::new(connector_id, file));
 
    Connector::new(file_logger, MINIMAL_PROTO.clone(), connector_id, 8)
 
}
 

	
 
static MINIMAL_PDL: &'static [u8] = b"
 
primitive together(in ia, in ib, out oa, out ob){
 
  while(true) synchronous() {
 
    if(fires(ia)) {
 
      put(oa, get(ia));
 
      put(ob, get(ib));
 
    }
 
  } 
 
}
 
";
 
lazy_static::lazy_static! {
 
    static ref MINIMAL_PROTO: Arc<ProtocolDescription> = {
 
        Arc::new(reowolf::ProtocolDescription::parse(b"").unwrap())
 
        Arc::new(reowolf::ProtocolDescription::parse(MINIMAL_PDL).unwrap())
 
    };
 
}
 
lazy_static::lazy_static! {
 
    static ref TEST_MSG: Payload = {
 
        Payload::from(b"hello" as &[u8])
 
    };
 
@@ -514,6 +524,58 @@ fn net_self_loop() {
 
    let g = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(p, TEST_MSG.clone()).unwrap();
 
    c.get(g).unwrap();
 
    c.sync(Some(Duration::from_millis(500))).unwrap();
 
}
 

	
 
#[test]
 
fn nobody_connects_active() {
 
    let test_log_path = Path::new("./logs/nobody_connects_active");
 
    let sock_addr = next_test_addr();
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let _g = c.new_net_port(Getter, sock_addr, Active).unwrap();
 
    c.connect(Some(Duration::from_secs(5))).unwrap_err();
 
}
 
#[test]
 
fn nobody_connects_passive() {
 
    let test_log_path = Path::new("./logs/nobody_connects_passive");
 
    let sock_addr = next_test_addr();
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let _g = c.new_net_port(Getter, sock_addr, Passive).unwrap();
 
    c.connect(Some(Duration::from_secs(5))).unwrap_err();
 
}
 

	
 
#[test]
 
fn together() {
 
    let test_log_path = Path::new("./logs/together");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p0, p1] = c.new_port_pair();
 
            let p2 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            let p3 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            let [p4, p5] = c.new_port_pair();
 
            c.add_component(b"together", &[p1, p2, p3, p4]).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.get(p5).unwrap();
 
            c.sync(Some(Duration::from_millis(500))).unwrap();
 
            c.gotten(p5).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let [p0, p1] = c.new_port_pair();
 
            let p2 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
 
            let p3 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let [p4, p5] = c.new_port_pair();
 
            c.add_component(b"together", &[p1, p2, p3, p4]).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.get(p5).unwrap();
 
            c.sync(Some(Duration::from_millis(500))).unwrap();
 
            c.gotten(p5).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
src/test/connector.rs
Show inline comments
 
deleted file
src/test/mod.rs
Show inline comments
 
deleted file
src/test/setup.rs
Show inline comments
 
deleted file
0 comments (0 inline, 0 general)