Changeset - 1db6ec21a437
[Not reviewed]
0 5 0
Christopher Esterhuyse - 5 years ago 2020-07-21 12:13:33
christopher.esterhuyse@gmail.com
more elegant on-demand native branch indexing with a speculative var
5 files changed with 78 insertions and 50 deletions:
0 comments (0 inline, 0 general)
src/ffi/mod.rs
Show inline comments
 
@@ -263,43 +263,43 @@ pub unsafe extern "C" fn connector_add_net_port(
 
        }
 
    }
 
}
 

	
 
/// Given
 
/// - an initialized connector in setup or connecting state,
 
/// - a utf-8 encoded BIND socket addresses (i.e., "local"),
 
/// - a utf-8 encoded CONNECT socket addresses (i.e., "peer"),
 
/// returns [P, G] via out pointers [putter, getter],
 
/// - where P is a Putter port that sends messages into the socket
 
/// - where G is a Getter port that recvs messages from the socket
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_udp_port(
 
pub unsafe extern "C" fn connector_add_udp_port_pair(
 
    connector: &mut Connector,
 
    putter: *mut PortId,
 
    getter: *mut PortId,
 
    local_addr_str_ptr: *const u8,
 
    local_addr_str_len: usize,
 
    peer_addr_str_ptr: *const u8,
 
    peer_addr_str_len: usize,
 
) -> c_int {
 
    StoredError::tl_clear();
 
    let local = match tl_socketaddr_from_raw(local_addr_str_ptr, local_addr_str_len) {
 
        Ok(local) => local,
 
        Err(errcode) => return errcode,
 
    };
 
    let peer = match tl_socketaddr_from_raw(peer_addr_str_ptr, peer_addr_str_len) {
 
        Ok(local) => local,
 
        Err(errcode) => return errcode,
 
    };
 
    match connector.new_udp_port(local, peer) {
 
    match connector.new_udp_mediator_component(local, peer) {
 
        Ok([p, g]) => {
 
            if !putter.is_null() {
 
                putter.write(p);
 
            }
 
            if !getter.is_null() {
 
                getter.write(g);
 
            }
 
            ERR_OK
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            ERR_REOWOLF
src/ffi/socket_api.rs
Show inline comments
 
@@ -18,26 +18,26 @@ struct ConnectorBound {
 
    putter: PortId,
 
    getter: PortId,
 
}
 
struct MaybeConnector {
 
    // invariants:
 
    // 1. connector is a upd-socket singleton
 
    // 2. putter and getter are ports in the native interface with the appropriate polarities
 
    // 3. peer_addr always mirrors connector's single udp socket's connect addr. both are overwritten together.
 
    peer_addr: SocketAddr,
 
    connector_bound: Option<ConnectorBound>,
 
}
 
#[derive(Default)]
 
struct CspStorage {
 
    fd_to_mc: HashMap<c_int, RwLock<MaybeConnector>>,
 
struct FdcStorage {
 
    fd_to_c: HashMap<c_int, RwLock<MaybeConnector>>,
 
    fd_allocator: FdAllocator,
 
}
 
fn trivial_peer_addr() -> SocketAddr {
 
    // SocketAddrV4::new isn't a constant-time func
 
    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0))
 
}
 
///////////////////////////////////////////////////////////////////
 

	
 
impl Default for FdAllocator {
 
    fn default() -> Self {
 
        Self {
 
            next: Some(0), // positive values used only
 
@@ -52,25 +52,25 @@ impl FdAllocator {
 
        }
 
        if let Some(fd) = self.next {
 
            self.next = fd.checked_add(1);
 
            return fd;
 
        }
 
        panic!("No more Connector FDs to allocate!")
 
    }
 
    fn free(&mut self, fd: c_int) {
 
        self.freed.push(fd);
 
    }
 
}
 
lazy_static::lazy_static! {
 
    static ref CSP_STORAGE: RwLock<CspStorage> = Default::default();
 
    static ref FDC_STORAGE: RwLock<FdcStorage> = Default::default();
 
}
 
impl MaybeConnector {
 
    fn connect(&mut self, peer_addr: SocketAddr) -> c_int {
 
        self.peer_addr = peer_addr;
 
        if let Some(ConnectorBound { connector, .. }) = &mut self.connector_bound {
 
            if connector.get_mut_udp_sock(0).unwrap().connect(peer_addr).is_err() {
 
                return CONNECT_FAILED;
 
            }
 
        }
 
        ERR_OK
 
    }
 
    unsafe fn send(&mut self, bytes_ptr: *const c_void, bytes_len: usize) -> isize {
 
@@ -101,124 +101,125 @@ impl MaybeConnector {
 
                err => return err as isize,
 
            }
 
        } else {
 
            WRONG_STATE as isize // not bound!
 
        }
 
    }
 
}
 
///////////////////////////////////////////////////////////////////
 

	
 
#[no_mangle]
 
pub extern "C" fn rw_socket(_domain: c_int, _type: c_int) -> c_int {
 
    // ignoring domain and type
 
    let mut w = if let Ok(w) = CSP_STORAGE.write() { w } else { return FD_LOCK_POISONED };
 
    let mut w = if let Ok(w) = FDC_STORAGE.write() { w } else { return FD_LOCK_POISONED };
 
    let fd = w.fd_allocator.alloc();
 
    let mc = MaybeConnector { peer_addr: trivial_peer_addr(), connector_bound: None };
 
    w.fd_to_mc.insert(fd, RwLock::new(mc));
 
    w.fd_to_c.insert(fd, RwLock::new(mc));
 
    fd
 
}
 

	
 
#[no_mangle]
 
pub extern "C" fn rw_close(fd: c_int, _how: c_int) -> c_int {
 
    // ignoring HOW
 
    let mut w = if let Ok(w) = CSP_STORAGE.write() { w } else { return FD_LOCK_POISONED };
 
    w.fd_allocator.free(fd);
 
    if w.fd_to_mc.remove(&fd).is_some() {
 
    let mut w = if let Ok(w) = FDC_STORAGE.write() { w } else { return FD_LOCK_POISONED };
 
    if w.fd_to_c.remove(&fd).is_some() {
 
        w.fd_allocator.free(fd);
 
        ERR_OK
 
    } else {
 
        CLOSE_FAIL
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_bind(
 
    fd: c_int,
 
    local_addr: *const SocketAddr,
 
    _addr_len: usize,
 
) -> c_int {
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    if mc.connector_bound.is_some() {
 
        return WRONG_STATE;
 
    }
 
    mc.connector_bound = {
 
        let mut connector = Connector::new(
 
            Box::new(crate::DummyLogger),
 
            crate::TRIVIAL_PD.clone(),
 
            Connector::random_id(),
 
        );
 
        let [putter, getter] = connector.new_udp_port(local_addr.read(), mc.peer_addr).unwrap();
 
        let [putter, getter] =
 
            connector.new_udp_mediator_component(local_addr.read(), mc.peer_addr).unwrap();
 
        Some(ConnectorBound { connector, putter, getter })
 
    };
 
    ERR_OK
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_connect(
 
    fd: c_int,
 
    peer_addr: *const SocketAddr,
 
    _address_len: usize,
 
) -> c_int {
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    mc.connect(peer_addr.read())
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_send(
 
    fd: c_int,
 
    bytes_ptr: *const c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
) -> isize {
 
    // ignoring flags
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD as isize };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    mc.send(bytes_ptr, bytes_len)
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_recv(
 
    fd: c_int,
 
    bytes_ptr: *mut c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
) -> isize {
 
    // ignoring flags
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD as isize };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    mc.recv(bytes_ptr, bytes_len)
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_sendto(
 
    fd: c_int,
 
    bytes_ptr: *mut c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
    peer_addr: *const SocketAddr,
 
    _addr_len: usize,
 
) -> isize {
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD as isize };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    // copy currently connected peer addr
 
    let connected = mc.peer_addr;
 
    // connect to given peer_addr
 
    match mc.connect(peer_addr.read()) {
 
        e if e != ERR_OK => return e as isize,
 
        _ => {}
 
    }
 
    // send
 
    let ret = mc.send(bytes_ptr, bytes_len);
 
    // restore connected peer addr
 
@@ -230,26 +231,26 @@ pub unsafe extern "C" fn rw_sendto(
 
}
 

	
 
#[no_mangle]
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_recvfrom(
 
    fd: c_int,
 
    bytes_ptr: *mut c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
    peer_addr: *const SocketAddr,
 
    _addr_len: usize,
 
) -> isize {
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD as isize };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    // copy currently connected peer addr
 
    let connected = mc.peer_addr;
 
    // connect to given peer_addr
 
    match mc.connect(peer_addr.read()) {
 
        e if e != ERR_OK => return e as isize,
 
        _ => {}
 
    }
 
    // send
 
    let ret = mc.send(bytes_ptr, bytes_len);
 
    // restore connected peer addr
src/runtime/communication.rs
Show inline comments
 
@@ -14,25 +14,25 @@ struct GetterBuffer {
 
}
 
struct RoundCtx {
 
    solution_storage: SolutionStorage,
 
    spec_var_stream: SpecVarStream,
 
    getter_buffer: GetterBuffer,
 
    deadline: Option<Instant>,
 
}
 
struct BranchingNative {
 
    branches: HashMap<Predicate, NativeBranch>,
 
}
 
#[derive(Clone, Debug)]
 
struct NativeBranch {
 
    index: usize,
 
    batch_index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
#[derive(Debug)]
 
struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
    // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<SubtreeId, usize>,
 
}
 
#[derive(Debug)]
 
@@ -294,81 +294,103 @@ impl Connector {
 
            spec_var_stream: cu.inner.id_manager.new_spec_var_stream(),
 
            getter_buffer: Default::default(),
 
            deadline: timeout.map(|to| Instant::now() + to),
 
        };
 
        log!(cu.inner.logger, "Round context structure initialized");
 

	
 
        // Explore all native branches eagerly. Find solutions, buffer messages, etc.
 
        log!(
 
            cu.inner.logger,
 
            "Translating {} native batches into branches...",
 
            comm.native_batches.len()
 
        );
 
        let native_branch_spec_var = rctx.spec_var_stream.next();
 
        log!(cu.inner.logger, "Native branch spec var is {:?}", native_branch_spec_var);
 
        let mut native_spec_assign_stream = None;
 
        let mut branching_native = BranchingNative { branches: Default::default() };
 
        'native_branches: for ((native_branch, index), branch_spec_val) in
 
        'native_branches: for ((native_branch, batch_index), branch_spec_val) in
 
            comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain())
 
        {
 
            let NativeBatch { to_get, to_put } = native_branch;
 
            let predicate = {
 
                let mut predicate = Predicate::default();
 
                // assign trues for ports that fire
 
                let firing_ports: HashSet<PortId> =
 
                    to_get.iter().chain(to_put.keys()).copied().collect();
 
                for &port in to_get.iter().chain(to_put.keys()) {
 
                    let var = cu.inner.port_info.spec_var_for(port);
 
                    predicate.assigned.insert(var, SpecVal::FIRING);
 
                }
 
                // assign falses for all silent (not firing) ports
 
                for &port in cu.inner.native_ports.difference(&firing_ports) {
 
                    let var = cu.inner.port_info.spec_var_for(port);
 
                    if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) {
 
                        log!(cu.inner.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var);
 
                        log!(cu.inner.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", batch_index, var);
 
                        continue 'native_branches;
 
                    }
 
                }
 
                // this branch is consistent. distinguish it with a unique var:val mapping and proceed
 
                predicate.inserted(native_branch_spec_var, branch_spec_val)
 
                predicate
 
            };
 
            log!(
 
                cu.inner.logger,
 
                "Native branch index={:?} has consistent {:?}",
 
                index,
 
                "Native batch_index={:?} has consistent {:?}",
 
                batch_index,
 
                &predicate
 
            );
 
            // send all outgoing messages (by buffering them)
 
            for (putter, payload) in to_put {
 
                let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                log!(cu.inner.logger, "Native branch {} sending msg {:?}", index, &msg);
 
                log!(cu.inner.logger, "Native branch {} sending msg {:?}", batch_index, &msg);
 
                rctx.getter_buffer.putter_add(cu, putter, msg);
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            let branch = NativeBranch { batch_index, gotten: Default::default(), to_get };
 
            if let Some(old_branch) = branching_native.branches.remove(&predicate) {
 
                let (var, vals): (SpecVar, [SpecVal; 2]) = {
 
                    let (var, val_iter) = native_spec_assign_stream.get_or_insert_with(|| {
 
                        (rctx.spec_var_stream.next(), SpecVal::iter_domain())
 
                    });
 
                    let vals = [
 
                        val_iter.next().expect("Exhausted specval space!"),
 
                        val_iter.next().expect("Exhausted specval space!"),
 
                    ];
 
                    (*var, vals)
 
                };
 
                log!(
 
                    cu.inner.logger,
 
                    "Branch collision on {:?}. Going to distinguish them with a spec var {:?} ands vals {:?}",
 
                    &predicate,
 
                    var,
 
                    vals
 
                );
 
                branching_native
 
                    .branches
 
                    .insert(predicate.clone().inserted(var, vals[0]), old_branch);
 
                branching_native.branches.insert(predicate.inserted(var, vals[1]), branch);
 
            } else {
 
                // no branch collision
 
                branching_native.branches.insert(predicate, branch);
 
            }
 
        }
 
        for (predicate, branch) in branching_native.branches.iter() {
 
            if branch.is_ended() {
 
                log!(
 
                    cu.inner.logger,
 
                    "Native submitting solution for batch {} with {:?}",
 
                    index,
 
                    "Native submitting solution for branch {} with {:?}",
 
                    branch.batch_index,
 
                    &predicate
 
                );
 
                rctx.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut *cu.inner.logger,
 
                    SubtreeId::LocalComponent(ComponentId::Native),
 
                    predicate.clone(),
 
                );
 
            }
 
            if let Some(_) = branching_native.branches.insert(predicate, branch) {
 
                // thanks to the native_branch_spec_var, each batch has a distinct predicate
 
                unreachable!()
 
            }
 
        }
 
        // restore the invariant: !native_batches.is_empty()
 
        comm.native_batches.push(Default::default());
 

	
 
        comm.endpoint_manager
 
            .udp_endpoints_round_start(&mut *cu.inner.logger, &mut rctx.spec_var_stream);
 
        // Call to another big method; keep running this round until a distributed decision is reached
 
        let decision = Self::sync_reach_decision(
 
            cu,
 
            comm,
 
            &mut branching_native,
 
            &mut branching_proto_components,
 
@@ -848,27 +870,27 @@ impl BranchingNative {
 
            self.branches.len(),
 
            self.branches.keys()
 
        );
 
        for (branch_predicate, branch) in self.branches {
 
            log!(
 
                logger,
 
                "Considering native branch {:?} with to_get {:?} gotten {:?}",
 
                &branch_predicate,
 
                &branch.to_get,
 
                &branch.gotten
 
            );
 
            if branch.is_ended() && branch_predicate.assigns_subset(solution_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                let NativeBranch { batch_index, gotten, .. } = branch;
 
                log!(logger, "Collapsed native has gotten {:?}", &gotten);
 
                return RoundOk { batch_index: index, gotten };
 
                return RoundOk { batch_index, gotten };
 
            }
 
        }
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 
impl BranchingProtoComponent {
 
    fn drain_branches_to_blocked(
 
        cd: CyclicDrainer<Predicate, ProtoComponentBranch>,
 
        cu: &mut ConnectorUnphased,
 
        rctx: &mut RoundCtx,
 
        proto_component_id: ProtoComponentId,
 
        ports: &HashSet<PortId>,
src/runtime/setup.rs
Show inline comments
 
@@ -16,25 +16,30 @@ impl Connector {
 
                    logger,
 
                    id_manager: IdManager::new(connector_id),
 
                    native_ports: Default::default(),
 
                    port_info: Default::default(),
 
                },
 
            },
 
            phased: ConnectorPhased::Setup(Box::new(ConnectorSetup {
 
                net_endpoint_setups: Default::default(),
 
                udp_endpoint_setups: Default::default(),
 
            })),
 
        }
 
    }
 
    pub fn new_udp_port(
 
    /// Conceptually, this returning [p0, g1] is sugar for:
 
    /// 1. create port pair [p0, g0]
 
    /// 2. create port pair [p1, g1]
 
    /// 3. create udp component with interface of moved ports [p1, g0]
 
    /// 4. return [p0, g1]
 
    pub fn new_udp_mediator_component(
 
        &mut self,
 
        local_addr: SocketAddr,
 
        peer_addr: SocketAddr,
 
    ) -> Result<[PortId; 2], WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                let udp_index = setup.udp_endpoint_setups.len();
 
                let mut npid = || cu.inner.id_manager.new_port_id();
 
                let [nin, nout, uin, uout] = [npid(), npid(), npid(), npid()];
 
                cu.inner.native_ports.insert(nin);
src/runtime/tests.rs
Show inline comments
 
@@ -668,62 +668,62 @@ fn multi_recover() {
 
                assert_eq!(res.is_ok(), succeeds);
 
            }
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn udp_self_connect() {
 
    let test_log_path = Path::new("./logs/udp_self_connect");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.new_udp_port(sock_addrs[1], sock_addrs[0]).unwrap();
 
    c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.new_udp_mediator_component(sock_addrs[1], sock_addrs[0]).unwrap();
 
    c.connect(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn solo_udp_put_success() {
 
    let test_log_path = Path::new("./logs/solo_udp_put_success");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p0, _] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
    let [p0, _] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(p0, TEST_MSG.clone()).unwrap();
 
    c.sync(MS300).unwrap();
 
}
 

	
 
#[test]
 
fn solo_udp_get_fail() {
 
    let test_log_path = Path::new("./logs/solo_udp_get_fail");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, p0] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
    let [_, p0] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.get(p0).unwrap();
 
    c.sync(MS300).unwrap_err();
 
}
 

	
 
#[test]
 
fn reowolf_to_udp() {
 
    let test_log_path = Path::new("./logs/reowolf_to_udp");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let barrier = std::sync::Barrier::new(2);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // reowolf thread
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p0, _] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
            let [p0, _] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.sync(MS300).unwrap();
 
            barrier.wait();
 
        });
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // udp thread
 
            let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap();
 
            udp.connect(sock_addrs[0]).unwrap();
 
            let mut buf = new_u8_buffer(256);
 
            let len = udp.recv(&mut buf).unwrap();
 
@@ -735,25 +735,25 @@ fn reowolf_to_udp() {
 
}
 

	
 
#[test]
 
fn udp_to_reowolf() {
 
    let test_log_path = Path::new("./logs/udp_to_reowolf");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let barrier = std::sync::Barrier::new(2);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // reowolf thread
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [_, p0] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
            let [_, p0] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.get(p0).unwrap();
 
            c.sync(SEC5).unwrap();
 
            assert_eq!(c.gotten(p0).unwrap().as_slice(), TEST_MSG_BYTES);
 
            barrier.wait();
 
        });
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // udp thread
 
            let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap();
 
            udp.connect(sock_addrs[0]).unwrap();
 
            for _ in 0..15 {
 
@@ -767,25 +767,25 @@ fn udp_to_reowolf() {
 
}
 

	
 
#[test]
 
fn udp_reowolf_swap() {
 
    let test_log_path = Path::new("./logs/udp_reowolf_swap");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let barrier = std::sync::Barrier::new(2);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // reowolf thread
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p0, p1] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
            let [p0, p1] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.get(p1).unwrap();
 
            c.sync(SEC5).unwrap();
 
            assert_eq!(c.gotten(p1).unwrap().as_slice(), TEST_MSG_BYTES);
 
            barrier.wait();
 
        });
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // udp thread
 
            let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap();
 
            udp.connect(sock_addrs[0]).unwrap();
0 comments (0 inline, 0 general)