Changeset - 1db6ec21a437
[Not reviewed]
0 5 0
Christopher Esterhuyse - 5 years ago 2020-07-21 12:13:33
christopher.esterhuyse@gmail.com
more elegant on-demand native branch indexing with a speculative var
5 files changed with 77 insertions and 49 deletions:
0 comments (0 inline, 0 general)
src/ffi/mod.rs
Show inline comments
 
@@ -179,211 +179,211 @@ pub unsafe extern "C" fn connector_new(pd: &Arc<ProtocolDescription>) -> *mut Co
 
    let c = Connector::new(Box::new(DummyLogger), pd.clone(), Connector::random_id());
 
    Box::into_raw(Box::new(c))
 
}
 

	
 
/// Destroys the given a pointer to the connector on the heap, freeing its resources.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_destroy(connector: *mut Connector) {
 
    drop(Box::from_raw(connector))
 
}
 

	
 
/// Given an initialized connector in setup or connecting state,
 
/// - Creates a new directed port pair with logical channel putter->getter,
 
/// - adds the ports to the native component's interface,
 
/// - and returns them using the given out pointers.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_port_pair(
 
    connector: &mut Connector,
 
    out_putter: *mut PortId,
 
    out_getter: *mut PortId,
 
) {
 
    let [o, i] = connector.new_port_pair();
 
    out_putter.write(o);
 
    out_getter.write(i);
 
}
 

	
 
/// Given
 
/// - an initialized connector in setup or connecting state,
 
/// - a string slice for the component's identifier in the connector's configured protocol description,
 
/// - a set of ports (represented as a slice; duplicates are ignored) in the native component's interface,
 
/// the connector creates a new (internal) protocol component C, such that the set of native ports are moved to C.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_component(
 
    connector: &mut Connector,
 
    ident_ptr: *const u8,
 
    ident_len: usize,
 
    ports_ptr: *const PortId,
 
    ports_len: usize,
 
) -> c_int {
 
    StoredError::tl_clear();
 
    match connector.add_component(
 
        &*slice_from_raw_parts(ident_ptr, ident_len),
 
        &*slice_from_raw_parts(ports_ptr, ports_len),
 
    ) {
 
        Ok(()) => ERR_OK,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            ERR_REOWOLF
 
        }
 
    }
 
}
 

	
 
/// Given
 
/// - an initialized connector in setup or connecting state,
 
/// - a utf-8 encoded socket address,
 
/// - the logical polarity of P,
 
/// - the "physical" polarity in {Active, Passive} of the endpoint through which P's peer will be discovered,
 
/// returns P, a port newly added to the native interface.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_net_port(
 
    connector: &mut Connector,
 
    port: *mut PortId,
 
    addr_str_ptr: *const u8,
 
    addr_str_len: usize,
 
    port_polarity: Polarity,
 
    endpoint_polarity: EndpointPolarity,
 
) -> c_int {
 
    StoredError::tl_clear();
 
    let addr = match tl_socketaddr_from_raw(addr_str_ptr, addr_str_len) {
 
        Ok(local) => local,
 
        Err(errcode) => return errcode,
 
    };
 
    match connector.new_net_port(port_polarity, addr, endpoint_polarity) {
 
        Ok(p) => {
 
            if !port.is_null() {
 
                port.write(p);
 
            }
 
            ERR_OK
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            ERR_REOWOLF
 
        }
 
    }
 
}
 

	
 
/// Given
 
/// - an initialized connector in setup or connecting state,
 
/// - a utf-8 encoded BIND socket addresses (i.e., "local"),
 
/// - a utf-8 encoded CONNECT socket addresses (i.e., "peer"),
 
/// returns [P, G] via out pointers [putter, getter],
 
/// - where P is a Putter port that sends messages into the socket
 
/// - where G is a Getter port that recvs messages from the socket
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_udp_port(
 
pub unsafe extern "C" fn connector_add_udp_port_pair(
 
    connector: &mut Connector,
 
    putter: *mut PortId,
 
    getter: *mut PortId,
 
    local_addr_str_ptr: *const u8,
 
    local_addr_str_len: usize,
 
    peer_addr_str_ptr: *const u8,
 
    peer_addr_str_len: usize,
 
) -> c_int {
 
    StoredError::tl_clear();
 
    let local = match tl_socketaddr_from_raw(local_addr_str_ptr, local_addr_str_len) {
 
        Ok(local) => local,
 
        Err(errcode) => return errcode,
 
    };
 
    let peer = match tl_socketaddr_from_raw(peer_addr_str_ptr, peer_addr_str_len) {
 
        Ok(local) => local,
 
        Err(errcode) => return errcode,
 
    };
 
    match connector.new_udp_port(local, peer) {
 
    match connector.new_udp_mediator_component(local, peer) {
 
        Ok([p, g]) => {
 
            if !putter.is_null() {
 
                putter.write(p);
 
            }
 
            if !getter.is_null() {
 
                getter.write(g);
 
            }
 
            ERR_OK
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            ERR_REOWOLF
 
        }
 
    }
 
}
 

	
 
/// Connects this connector to the distributed system of connectors reachable through endpoints,
 
/// Usable in setup state, and changes the state to communication.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_connect(
 
    connector: &mut Connector,
 
    timeout_millis: i64,
 
) -> c_int {
 
    StoredError::tl_clear();
 
    let option_timeout_millis: Option<u64> = TryFrom::try_from(timeout_millis).ok();
 
    let timeout = option_timeout_millis.map(Duration::from_millis);
 
    match connector.connect(timeout) {
 
        Ok(()) => ERR_OK,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            ERR_REOWOLF
 
        }
 
    }
 
}
 

	
 
// #[no_mangle]
 
// pub unsafe extern "C" fn connector_put_payload(
 
//     connector: &mut Connector,
 
//     port: PortId,
 
//     payload: *mut Payload,
 
// ) -> c_int {
 
//     match connector.put(port, payload.read()) {
 
//         Ok(()) => 0,
 
//         Err(err) => {
 
//             StoredError::tl_debug_store(&err);
 
//             -1
 
//         }
 
//     }
 
// }
 

	
 
// #[no_mangle]
 
// pub unsafe extern "C" fn connector_put_payload_cloning(
 
//     connector: &mut Connector,
 
//     port: PortId,
 
//     payload: &Payload,
 
// ) -> c_int {
 
//     match connector.put(port, payload.clone()) {
 
//         Ok(()) => 0,
 
//         Err(err) => {
 
//             StoredError::tl_debug_store(&err);
 
//             -1
 
//         }
 
//     }
 
// }
 

	
 
/// Convenience function combining the functionalities of
 
/// "payload_new" with "connector_put_payload".
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_put_bytes(
 
    connector: &mut Connector,
 
    port: PortId,
 
    bytes_ptr: *const u8,
 
    bytes_len: usize,
 
) -> c_int {
 
    StoredError::tl_clear();
 
    let bytes = &*slice_from_raw_parts(bytes_ptr, bytes_len);
 
    match connector.put(port, Payload::from(bytes)) {
 
        Ok(()) => ERR_OK,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            ERR_REOWOLF
 
        }
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_get(connector: &mut Connector, port: PortId) -> c_int {
 
    StoredError::tl_clear();
 
    match connector.get(port) {
 
        Ok(()) => ERR_OK,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            ERR_REOWOLF
 
        }
 
    }
 
}
src/ffi/socket_api.rs
Show inline comments
 
use super::*;
 

	
 
use std::{
 
    collections::HashMap,
 
    ffi::c_void,
 
    net::{Ipv4Addr, SocketAddr, SocketAddrV4},
 
    os::raw::c_int,
 
    sync::RwLock,
 
};
 
///////////////////////////////////////////////////////////////////
 

	
 
struct FdAllocator {
 
    next: Option<c_int>,
 
    freed: Vec<c_int>,
 
}
 
struct ConnectorBound {
 
    connector: Connector,
 
    putter: PortId,
 
    getter: PortId,
 
}
 
struct MaybeConnector {
 
    // invariants:
 
    // 1. connector is a upd-socket singleton
 
    // 2. putter and getter are ports in the native interface with the appropriate polarities
 
    // 3. peer_addr always mirrors connector's single udp socket's connect addr. both are overwritten together.
 
    peer_addr: SocketAddr,
 
    connector_bound: Option<ConnectorBound>,
 
}
 
#[derive(Default)]
 
struct CspStorage {
 
    fd_to_mc: HashMap<c_int, RwLock<MaybeConnector>>,
 
struct FdcStorage {
 
    fd_to_c: HashMap<c_int, RwLock<MaybeConnector>>,
 
    fd_allocator: FdAllocator,
 
}
 
fn trivial_peer_addr() -> SocketAddr {
 
    // SocketAddrV4::new isn't a constant-time func
 
    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0))
 
}
 
///////////////////////////////////////////////////////////////////
 

	
 
impl Default for FdAllocator {
 
    fn default() -> Self {
 
        Self {
 
            next: Some(0), // positive values used only
 
            freed: vec![],
 
        }
 
    }
 
}
 
impl FdAllocator {
 
    fn alloc(&mut self) -> c_int {
 
        if let Some(fd) = self.freed.pop() {
 
            return fd;
 
        }
 
        if let Some(fd) = self.next {
 
            self.next = fd.checked_add(1);
 
            return fd;
 
        }
 
        panic!("No more Connector FDs to allocate!")
 
    }
 
    fn free(&mut self, fd: c_int) {
 
        self.freed.push(fd);
 
    }
 
}
 
lazy_static::lazy_static! {
 
    static ref CSP_STORAGE: RwLock<CspStorage> = Default::default();
 
    static ref FDC_STORAGE: RwLock<FdcStorage> = Default::default();
 
}
 
impl MaybeConnector {
 
    fn connect(&mut self, peer_addr: SocketAddr) -> c_int {
 
        self.peer_addr = peer_addr;
 
        if let Some(ConnectorBound { connector, .. }) = &mut self.connector_bound {
 
            if connector.get_mut_udp_sock(0).unwrap().connect(peer_addr).is_err() {
 
                return CONNECT_FAILED;
 
            }
 
        }
 
        ERR_OK
 
    }
 
    unsafe fn send(&mut self, bytes_ptr: *const c_void, bytes_len: usize) -> isize {
 
        if let Some(ConnectorBound { connector, putter, .. }) = &mut self.connector_bound {
 
            match connector_put_bytes(connector, *putter, bytes_ptr as _, bytes_len) {
 
                ERR_OK => connector_sync(connector, -1),
 
                err => err as isize,
 
            }
 
        } else {
 
            WRONG_STATE as isize // not bound!
 
        }
 
    }
 
    unsafe fn recv(&mut self, bytes_ptr: *const c_void, bytes_len: usize) -> isize {
 
        if let Some(ConnectorBound { connector, getter, .. }) = &mut self.connector_bound {
 
            connector_get(connector, *getter);
 
            match connector_sync(connector, -1) {
 
                0 => {
 
                    // batch index 0 means OK
 
                    let slice = connector.gotten(*getter).unwrap().as_slice();
 
                    let copied_bytes = slice.len().min(bytes_len);
 
                    std::ptr::copy_nonoverlapping(
 
                        slice.as_ptr(),
 
                        bytes_ptr as *mut u8,
 
                        copied_bytes,
 
                    );
 
                    copied_bytes as isize
 
                }
 
                err => return err as isize,
 
            }
 
        } else {
 
            WRONG_STATE as isize // not bound!
 
        }
 
    }
 
}
 
///////////////////////////////////////////////////////////////////
 

	
 
#[no_mangle]
 
pub extern "C" fn rw_socket(_domain: c_int, _type: c_int) -> c_int {
 
    // ignoring domain and type
 
    let mut w = if let Ok(w) = CSP_STORAGE.write() { w } else { return FD_LOCK_POISONED };
 
    let mut w = if let Ok(w) = FDC_STORAGE.write() { w } else { return FD_LOCK_POISONED };
 
    let fd = w.fd_allocator.alloc();
 
    let mc = MaybeConnector { peer_addr: trivial_peer_addr(), connector_bound: None };
 
    w.fd_to_mc.insert(fd, RwLock::new(mc));
 
    w.fd_to_c.insert(fd, RwLock::new(mc));
 
    fd
 
}
 

	
 
#[no_mangle]
 
pub extern "C" fn rw_close(fd: c_int, _how: c_int) -> c_int {
 
    // ignoring HOW
 
    let mut w = if let Ok(w) = CSP_STORAGE.write() { w } else { return FD_LOCK_POISONED };
 
    let mut w = if let Ok(w) = FDC_STORAGE.write() { w } else { return FD_LOCK_POISONED };
 
    if w.fd_to_c.remove(&fd).is_some() {
 
        w.fd_allocator.free(fd);
 
    if w.fd_to_mc.remove(&fd).is_some() {
 
        ERR_OK
 
    } else {
 
        CLOSE_FAIL
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_bind(
 
    fd: c_int,
 
    local_addr: *const SocketAddr,
 
    _addr_len: usize,
 
) -> c_int {
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    if mc.connector_bound.is_some() {
 
        return WRONG_STATE;
 
    }
 
    mc.connector_bound = {
 
        let mut connector = Connector::new(
 
            Box::new(crate::DummyLogger),
 
            crate::TRIVIAL_PD.clone(),
 
            Connector::random_id(),
 
        );
 
        let [putter, getter] = connector.new_udp_port(local_addr.read(), mc.peer_addr).unwrap();
 
        let [putter, getter] =
 
            connector.new_udp_mediator_component(local_addr.read(), mc.peer_addr).unwrap();
 
        Some(ConnectorBound { connector, putter, getter })
 
    };
 
    ERR_OK
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_connect(
 
    fd: c_int,
 
    peer_addr: *const SocketAddr,
 
    _address_len: usize,
 
) -> c_int {
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    mc.connect(peer_addr.read())
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_send(
 
    fd: c_int,
 
    bytes_ptr: *const c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
) -> isize {
 
    // ignoring flags
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD as isize };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    mc.send(bytes_ptr, bytes_len)
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_recv(
 
    fd: c_int,
 
    bytes_ptr: *mut c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
) -> isize {
 
    // ignoring flags
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD as isize };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    mc.recv(bytes_ptr, bytes_len)
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_sendto(
 
    fd: c_int,
 
    bytes_ptr: *mut c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
    peer_addr: *const SocketAddr,
 
    _addr_len: usize,
 
) -> isize {
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD as isize };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    // copy currently connected peer addr
 
    let connected = mc.peer_addr;
 
    // connect to given peer_addr
 
    match mc.connect(peer_addr.read()) {
 
        e if e != ERR_OK => return e as isize,
 
        _ => {}
 
    }
 
    // send
 
    let ret = mc.send(bytes_ptr, bytes_len);
 
    // restore connected peer addr
 
    match mc.connect(connected) {
 
        e if e != ERR_OK => return e as isize,
 
        _ => {}
 
    }
 
    ret
 
}
 

	
 
#[no_mangle]
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_recvfrom(
 
    fd: c_int,
 
    bytes_ptr: *mut c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
    peer_addr: *const SocketAddr,
 
    _addr_len: usize,
 
) -> isize {
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD as isize };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    // copy currently connected peer addr
 
    let connected = mc.peer_addr;
 
    // connect to given peer_addr
 
    match mc.connect(peer_addr.read()) {
 
        e if e != ERR_OK => return e as isize,
 
        _ => {}
 
    }
 
    // send
 
    let ret = mc.send(bytes_ptr, bytes_len);
 
    // restore connected peer addr
 
    match mc.connect(connected) {
 
        e if e != ERR_OK => return e as isize,
 
        _ => {}
 
    }
 
    ret
 
}
src/runtime/communication.rs
Show inline comments
 
use super::*;
 
use crate::common::*;
 
use core::ops::{Deref, DerefMut};
 

	
 
////////////////
 
// Guard protecting an incrementally unfoldable slice of MapTempGuard elements
 
struct MapTempsGuard<'a, K, V>(&'a mut [HashMap<K, V>]);
 
// Type protecting a temporary map; At the start and end of the Guard's lifetime, self.0.is_empty() must be true
 
struct MapTempGuard<'a, K, V>(&'a mut HashMap<K, V>);
 

	
 
#[derive(Default)]
 
struct GetterBuffer {
 
    getters_and_sends: Vec<(PortId, SendPayloadMsg)>,
 
}
 
struct RoundCtx {
 
    solution_storage: SolutionStorage,
 
    spec_var_stream: SpecVarStream,
 
    getter_buffer: GetterBuffer,
 
    deadline: Option<Instant>,
 
}
 
struct BranchingNative {
 
    branches: HashMap<Predicate, NativeBranch>,
 
}
 
#[derive(Clone, Debug)]
 
struct NativeBranch {
 
    index: usize,
 
    batch_index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
#[derive(Debug)]
 
struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
    // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<SubtreeId, usize>,
 
}
 
#[derive(Debug)]
 
struct BranchingProtoComponent {
 
    ports: HashSet<PortId>,
 
    branches: HashMap<Predicate, ProtoComponentBranch>,
 
}
 
#[derive(Debug, Clone)]
 
struct ProtoComponentBranch {
 
    state: ComponentState,
 
    inner: ProtoComponentBranchInner,
 
    ended: bool,
 
}
 
struct CyclicDrainer<'a, K: Eq + Hash, V> {
 
    input: &'a mut HashMap<K, V>,
 
    inner: CyclicDrainInner<'a, K, V>,
 
}
 
struct CyclicDrainInner<'a, K: Eq + Hash, V> {
 
    swap: &'a mut HashMap<K, V>,
 
    output: &'a mut HashMap<K, V>,
 
}
 
trait ReplaceBoolTrue {
 
    fn replace_with_true(&mut self) -> bool;
 
}
 
impl ReplaceBoolTrue for bool {
 
    fn replace_with_true(&mut self) -> bool {
 
        let was = *self;
 
        *self = true;
 
        !was
 
    }
 
}
 

	
 
////////////////
 
impl<'a, K, V> MapTempsGuard<'a, K, V> {
 
    fn reborrow(&mut self) -> MapTempsGuard<'_, K, V> {
 
        MapTempsGuard(self.0)
 
    }
 
    fn split_first_mut(self) -> (MapTempGuard<'a, K, V>, MapTempsGuard<'a, K, V>) {
 
        let (head, tail) = self.0.split_first_mut().expect("Cache exhausted");
 
        (MapTempGuard::new(head), MapTempsGuard(tail))
 
    }
 
}
 
impl<'a, K, V> MapTempGuard<'a, K, V> {
 
    fn new(map: &'a mut HashMap<K, V>) -> Self {
 
        assert!(map.is_empty()); // sanity check
 
        Self(map)
 
    }
 
}
 
impl<'a, K, V> Drop for MapTempGuard<'a, K, V> {
 
    fn drop(&mut self) {
 
        assert!(self.0.is_empty()); // sanity check
 
    }
 
}
 
impl<'a, K, V> Deref for MapTempGuard<'a, K, V> {
 
    type Target = HashMap<K, V>;
 
    fn deref(&self) -> &<Self as Deref>::Target {
 
        self.0
 
    }
 
}
 
impl<'a, K, V> DerefMut for MapTempGuard<'a, K, V> {
 
    fn deref_mut(&mut self) -> &mut <Self as Deref>::Target {
 
        self.0
 
    }
 
}
 
impl RoundCtxTrait for RoundCtx {
 
    fn get_deadline(&self) -> &Option<Instant> {
 
        &self.deadline
 
    }
 
    fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) {
 
        self.getter_buffer.getter_add(getter, msg)
 
    }
 
}
 
impl Connector {
 
    fn get_comm_mut(&mut self) -> Option<&mut ConnectorCommunication> {
 
        if let ConnectorPhased::Communication(comm) = &mut self.phased {
 
            Some(comm)
 
        } else {
 
            None
 
        }
 
    }
 
    // #[cfg(ffi_socket_api)]
 
    pub(crate) fn get_mut_udp_sock(&mut self, index: usize) -> Option<&mut UdpSocket> {
 
        let sock = &mut self
 
            .get_comm_mut()?
 
            .endpoint_manager
 
            .udp_endpoint_store
 
            .endpoint_exts
 
@@ -210,249 +210,271 @@ impl Connector {
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        timeout: Option<Duration>,
 
    ) -> Result<Option<RoundOk>, SyncError> {
 
        //////////////////////////////////
 
        use SyncError as Se;
 
        //////////////////////////////////
 
        log!(
 
            cu.inner.logger,
 
            "~~~ SYNC called with timeout {:?}; starting round {}",
 
            &timeout,
 
            comm.round_index
 
        );
 

	
 
        // 1. run all proto components to Nonsync blockers
 
        // NOTE: original components are immutable until Decision::Success
 
        let mut branching_proto_components =
 
            HashMap::<ProtoComponentId, BranchingProtoComponent>::default();
 
        let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> =
 
            cu.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect();
 
        log!(cu.inner.logger, "Nonsync running {} proto components...", unrun_components.len());
 
        // drains unrun_components, and populates branching_proto_components.
 
        while let Some((proto_component_id, mut component)) = unrun_components.pop() {
 
            // TODO coalesce fields
 
            log!(
 
                cu.inner.logger,
 
                "Nonsync running proto component with ID {:?}. {} to go after this",
 
                proto_component_id,
 
                unrun_components.len()
 
            );
 
            let mut ctx = NonsyncProtoContext {
 
                cu_inner: &mut cu.inner,
 
                proto_component_id,
 
                unrun_components: &mut unrun_components,
 
                proto_component_ports: &mut cu
 
                    .proto_components
 
                    .get_mut(&proto_component_id)
 
                    .unwrap() // unrun_components' keys originate from proto_components
 
                    .ports,
 
            };
 
            let blocker = component.state.nonsync_run(&mut ctx, &cu.proto_description);
 
            log!(
 
                cu.inner.logger,
 
                "proto component {:?} ran to nonsync blocker {:?}",
 
                proto_component_id,
 
                &blocker
 
            );
 
            use NonsyncBlocker as B;
 
            match blocker {
 
                B::ComponentExit => drop(component),
 
                B::Inconsistent => return Err(Se::InconsistentProtoComponent(proto_component_id)),
 
                B::SyncBlockStart => {
 
                    branching_proto_components
 
                        .insert(proto_component_id, BranchingProtoComponent::initial(component));
 
                }
 
            }
 
        }
 
        log!(
 
            cu.inner.logger,
 
            "All {} proto components are now done with Nonsync phase",
 
            branching_proto_components.len(),
 
        );
 

	
 
        // Create temp structures needed for the synchronous phase of the round
 
        let mut rctx = RoundCtx {
 
            solution_storage: {
 
                let n = std::iter::once(SubtreeId::LocalComponent(ComponentId::Native));
 
                let c = cu
 
                    .proto_components
 
                    .keys()
 
                    .map(|&id| SubtreeId::LocalComponent(ComponentId::Proto(id)));
 
                let e = comm
 
                    .neighborhood
 
                    .children
 
                    .iter()
 
                    .map(|&index| SubtreeId::NetEndpoint { index });
 
                let subtree_id_iter = n.chain(c).chain(e);
 
                log!(
 
                    cu.inner.logger,
 
                    "Children in subtree are: {:?}",
 
                    subtree_id_iter.clone().collect::<Vec<_>>()
 
                );
 
                SolutionStorage::new(subtree_id_iter)
 
            },
 
            spec_var_stream: cu.inner.id_manager.new_spec_var_stream(),
 
            getter_buffer: Default::default(),
 
            deadline: timeout.map(|to| Instant::now() + to),
 
        };
 
        log!(cu.inner.logger, "Round context structure initialized");
 

	
 
        // Explore all native branches eagerly. Find solutions, buffer messages, etc.
 
        log!(
 
            cu.inner.logger,
 
            "Translating {} native batches into branches...",
 
            comm.native_batches.len()
 
        );
 
        let native_branch_spec_var = rctx.spec_var_stream.next();
 
        log!(cu.inner.logger, "Native branch spec var is {:?}", native_branch_spec_var);
 
        let mut native_spec_assign_stream = None;
 
        let mut branching_native = BranchingNative { branches: Default::default() };
 
        'native_branches: for ((native_branch, index), branch_spec_val) in
 
        'native_branches: for ((native_branch, batch_index), branch_spec_val) in
 
            comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain())
 
        {
 
            let NativeBatch { to_get, to_put } = native_branch;
 
            let predicate = {
 
                let mut predicate = Predicate::default();
 
                // assign trues for ports that fire
 
                let firing_ports: HashSet<PortId> =
 
                    to_get.iter().chain(to_put.keys()).copied().collect();
 
                for &port in to_get.iter().chain(to_put.keys()) {
 
                    let var = cu.inner.port_info.spec_var_for(port);
 
                    predicate.assigned.insert(var, SpecVal::FIRING);
 
                }
 
                // assign falses for all silent (not firing) ports
 
                for &port in cu.inner.native_ports.difference(&firing_ports) {
 
                    let var = cu.inner.port_info.spec_var_for(port);
 
                    if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) {
 
                        log!(cu.inner.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var);
 
                        log!(cu.inner.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", batch_index, var);
 
                        continue 'native_branches;
 
                    }
 
                }
 
                // this branch is consistent. distinguish it with a unique var:val mapping and proceed
 
                predicate.inserted(native_branch_spec_var, branch_spec_val)
 
                predicate
 
            };
 
            log!(
 
                cu.inner.logger,
 
                "Native branch index={:?} has consistent {:?}",
 
                index,
 
                "Native batch_index={:?} has consistent {:?}",
 
                batch_index,
 
                &predicate
 
            );
 
            // send all outgoing messages (by buffering them)
 
            for (putter, payload) in to_put {
 
                let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                log!(cu.inner.logger, "Native branch {} sending msg {:?}", index, &msg);
 
                log!(cu.inner.logger, "Native branch {} sending msg {:?}", batch_index, &msg);
 
                rctx.getter_buffer.putter_add(cu, putter, msg);
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            let branch = NativeBranch { batch_index, gotten: Default::default(), to_get };
 
            if let Some(old_branch) = branching_native.branches.remove(&predicate) {
 
                let (var, vals): (SpecVar, [SpecVal; 2]) = {
 
                    let (var, val_iter) = native_spec_assign_stream.get_or_insert_with(|| {
 
                        (rctx.spec_var_stream.next(), SpecVal::iter_domain())
 
                    });
 
                    let vals = [
 
                        val_iter.next().expect("Exhausted specval space!"),
 
                        val_iter.next().expect("Exhausted specval space!"),
 
                    ];
 
                    (*var, vals)
 
                };
 
                log!(
 
                    cu.inner.logger,
 
                    "Branch collision on {:?}. Going to distinguish them with a spec var {:?} ands vals {:?}",
 
                    &predicate,
 
                    var,
 
                    vals
 
                );
 
                branching_native
 
                    .branches
 
                    .insert(predicate.clone().inserted(var, vals[0]), old_branch);
 
                branching_native.branches.insert(predicate.inserted(var, vals[1]), branch);
 
            } else {
 
                // no branch collision
 
                branching_native.branches.insert(predicate, branch);
 
            }
 
        }
 
        for (predicate, branch) in branching_native.branches.iter() {
 
            if branch.is_ended() {
 
                log!(
 
                    cu.inner.logger,
 
                    "Native submitting solution for batch {} with {:?}",
 
                    index,
 
                    "Native submitting solution for branch {} with {:?}",
 
                    branch.batch_index,
 
                    &predicate
 
                );
 
                rctx.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut *cu.inner.logger,
 
                    SubtreeId::LocalComponent(ComponentId::Native),
 
                    predicate.clone(),
 
                );
 
            }
 
            if let Some(_) = branching_native.branches.insert(predicate, branch) {
 
                // thanks to the native_branch_spec_var, each batch has a distinct predicate
 
                unreachable!()
 
            }
 
        }
 
        // restore the invariant: !native_batches.is_empty()
 
        comm.native_batches.push(Default::default());
 

	
 
        comm.endpoint_manager
 
            .udp_endpoints_round_start(&mut *cu.inner.logger, &mut rctx.spec_var_stream);
 
        // Call to another big method; keep running this round until a distributed decision is reached
 
        let decision = Self::sync_reach_decision(
 
            cu,
 
            comm,
 
            &mut branching_native,
 
            &mut branching_proto_components,
 
            &mut rctx,
 
        )?;
 
        log!(cu.inner.logger, "Committing to decision {:?}!", &decision);
 
        comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.inner.logger, &decision)?;
 

	
 
        // propagate the decision to children
 
        let msg = Msg::CommMsg(CommMsg {
 
            round_index: comm.round_index,
 
            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Announce {
 
                decision: decision.clone(),
 
            }),
 
        });
 
        log!(
 
            cu.inner.logger,
 
            "Announcing decision {:?} through child endpoints {:?}",
 
            &msg,
 
            &comm.neighborhood.children
 
        );
 
        for &child in comm.neighborhood.children.iter() {
 
            comm.endpoint_manager.send_to_comms(child, &msg)?;
 
        }
 
        let ret = match decision {
 
            Decision::Failure => {
 
                // dropping {branching_proto_components, branching_native}
 
                Err(Se::RoundFailure)
 
            }
 
            Decision::Success(predicate) => {
 
                // commit changes to component states
 
                cu.proto_components.clear();
 
                cu.proto_components.extend(
 
                    // consume branching proto components
 
                    branching_proto_components
 
                        .into_iter()
 
                        .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))),
 
                );
 
                log!(
 
                    cu.inner.logger,
 
                    "End round with (updated) component states {:?}",
 
                    cu.proto_components.keys()
 
                );
 
                // consume native
 
                Ok(Some(branching_native.collapse_with(&mut *cu.inner.logger, &predicate)))
 
            }
 
        };
 
        log!(cu.inner.logger, "Sync round ending! Cleaning up");
 
        ret
 
    }
 

	
 
    fn sync_reach_decision(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        branching_native: &mut BranchingNative,
 
        branching_proto_components: &mut HashMap<ProtoComponentId, BranchingProtoComponent>,
 
        rctx: &mut RoundCtx,
 
    ) -> Result<Decision, UnrecoverableSyncError> {
 
        let mut already_requested_failure = false;
 
        if branching_native.branches.is_empty() {
 
            log!(cu.inner.logger, "Native starts with no branches! Failure!");
 
            match comm.neighborhood.parent {
 
                Some(parent) => {
 
                    if already_requested_failure.replace_with_true() {
 
                        Self::request_failure(cu, comm, parent)?
 
                    } else {
 
                        log!(cu.inner.logger, "Already requested failure");
 
                    }
 
                }
 
                None => {
 
                    log!(cu.inner.logger, "No parent. Deciding on failure");
 
                    return Ok(Decision::Failure);
 
                }
 
            }
 
        }
 
        log!(cu.inner.logger, "Done translating native batches into branches");
 

	
 
        let mut pcb_temps_owner = <[HashMap<Predicate, ProtoComponentBranch>; 3]>::default();
 
        let mut pcb_temps = MapTempsGuard(&mut pcb_temps_owner);
 
        let mut bn_temp_owner = <HashMap<Predicate, NativeBranch>>::default();
 

	
 
        // run all proto components to their sync blocker
 
        log!(
 
            cu.inner.logger,
 
            "Running all {} proto components to their sync blocker...",
 
            branching_proto_components.len()
 
        );
 
@@ -764,195 +786,195 @@ impl BranchingNative {
 
                log!(
 
                    cu.inner.logger,
 
                    "skipping branch with {:?} that doesn't want the message (fastpath)",
 
                    &predicate
 
                );
 
                Self::insert_branch_merging(finished, predicate, branch);
 
                continue;
 
            }
 
            use AssignmentUnionResult as Aur;
 
            match predicate.assignment_union(&send_payload_msg.predicate) {
 
                Aur::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(
 
                        cu.inner.logger,
 
                        "skipping branch with {:?} that doesn't want the message (slowpath)",
 
                        &predicate
 
                    );
 
                    Self::insert_branch_merging(finished, predicate, branch);
 
                }
 
                Aur::Equivalent | Aur::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    feed_branch(&mut branch, &predicate);
 
                    log!(cu.inner.logger, "branch pred covers it! Accept the msg");
 
                    Self::insert_branch_merging(finished, predicate, branch);
 
                }
 
                Aur::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate. original branch untouched
 
                    let mut branch2 = branch.clone();
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
                    log!(
 
                        cu.inner.logger,
 
                        "payload pred {:?} covers branch pred {:?}",
 
                        &predicate2,
 
                        &predicate
 
                    );
 
                    Self::insert_branch_merging(finished, predicate, branch);
 
                    Self::insert_branch_merging(finished, predicate2, branch2);
 
                }
 
                Aur::New(predicate2) => {
 
                    // fork branch, give fork the message and the new predicate. original branch untouched
 
                    let mut branch2 = branch.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
                    log!(
 
                        cu.inner.logger,
 
                        "new subsuming pred created {:?}. forking and feeding",
 
                        &predicate2
 
                    );
 
                    Self::insert_branch_merging(finished, predicate, branch);
 
                    Self::insert_branch_merging(finished, predicate2, branch2);
 
                }
 
            }
 
        }
 
    }
 
    fn insert_branch_merging(
 
        branches: &mut HashMap<Predicate, NativeBranch>,
 
        predicate: Predicate,
 
        mut branch: NativeBranch,
 
    ) {
 
        let e = branches.entry(predicate);
 
        use std::collections::hash_map::Entry;
 
        match e {
 
            Entry::Vacant(ev) => {
 
                // no existing branch present. We insert it no problem. (The most common case)
 
                ev.insert(branch);
 
            }
 
            Entry::Occupied(mut eo) => {
 
                // Oh dear, there is already a branch with this predicate.
 
                // Rather than choosing either branch, we MERGE them.
 
                // This means taking the UNION of their .gotten and the INTERSECTION of their .to_get
 
                let old = eo.get_mut();
 
                for (k, v) in branch.gotten.drain() {
 
                    if old.gotten.insert(k, v).is_none() {
 
                        // added a gotten element in `branch` not already in `old`
 
                        old.to_get.remove(&k);
 
                    }
 
                }
 
            }
 
        }
 
    }
 
    fn collapse_with(self, logger: &mut dyn Logger, solution_predicate: &Predicate) -> RoundOk {
 
        log!(
 
            logger,
 
            "Collapsing native with {} branch preds {:?}",
 
            self.branches.len(),
 
            self.branches.keys()
 
        );
 
        for (branch_predicate, branch) in self.branches {
 
            log!(
 
                logger,
 
                "Considering native branch {:?} with to_get {:?} gotten {:?}",
 
                &branch_predicate,
 
                &branch.to_get,
 
                &branch.gotten
 
            );
 
            if branch.is_ended() && branch_predicate.assigns_subset(solution_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                let NativeBranch { batch_index, gotten, .. } = branch;
 
                log!(logger, "Collapsed native has gotten {:?}", &gotten);
 
                return RoundOk { batch_index: index, gotten };
 
                return RoundOk { batch_index, gotten };
 
            }
 
        }
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 
impl BranchingProtoComponent {
 
    fn drain_branches_to_blocked(
 
        cd: CyclicDrainer<Predicate, ProtoComponentBranch>,
 
        cu: &mut ConnectorUnphased,
 
        rctx: &mut RoundCtx,
 
        proto_component_id: ProtoComponentId,
 
        ports: &HashSet<PortId>,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        cd.cyclic_drain(|mut predicate, mut branch, mut drainer| {
 
            let mut ctx = SyncProtoContext {
 
                cu_inner: &mut cu.inner,
 
                predicate: &predicate,
 
                branch_inner: &mut branch.inner,
 
            };
 
            let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description);
 
            log!(
 
                cu.inner.logger,
 
                "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
 
                proto_component_id,
 
                &predicate,
 
                &blocker,
 
            );
 
            use SyncBlocker as B;
 
            match blocker {
 
                B::Inconsistent => drop((predicate, branch)), // EXPLICIT inconsistency
 
                B::NondetChoice { n } => {
 
                    let var = rctx.spec_var_stream.next();
 
                    for val in SpecVal::iter_domain().take(n as usize) {
 
                        let pred = predicate.clone().inserted(var, val);
 
                        let mut branch_n = branch.clone();
 
                        branch_n.inner.untaken_choice = Some(val.0);
 
                        drainer.add_input(pred, branch_n);
 
                    }
 
                }
 
                B::CouldntReadMsg(port) => {
 
                    // move to "blocked"
 
                    assert!(!branch.inner.inbox.contains_key(&port));
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::CouldntCheckFiring(port) => {
 
                    // sanity check
 
                    let var = cu.inner.port_info.spec_var_for(port);
 
                    assert!(predicate.query(var).is_none());
 
                    // keep forks in "unblocked"
 
                    drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone());
 
                    drainer.add_input(predicate.inserted(var, SpecVal::FIRING), branch);
 
                }
 
                B::PutMsg(putter, payload) => {
 
                    // sanity check
 
                    assert_eq!(Some(&Putter), cu.inner.port_info.polarities.get(&putter));
 
                    // overwrite assignment
 
                    let var = cu.inner.port_info.spec_var_for(putter);
 
                    let was = predicate.assigned.insert(var, SpecVal::FIRING);
 
                    if was == Some(SpecVal::SILENT) {
 
                        log!(cu.inner.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var);
 
                        // discard forever
 
                        drop((predicate, branch));
 
                    } else {
 
                        // keep in "unblocked"
 
                        branch.inner.did_put_or_get.insert(putter);
 
                        log!(cu.inner.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
 
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                        rctx.getter_buffer.putter_add(cu, putter, msg);
 
                        drainer.add_input(predicate, branch);
 
                    }
 
                }
 
                B::SyncBlockEnd => {
 
                    // make concrete all variables
 
                    for port in ports.iter() {
 
                        let var = cu.inner.port_info.spec_var_for(*port);
 
                        let should_have_fired = branch.inner.did_put_or_get.contains(port);
 
                        let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
 
                        let did_fire = val == SpecVal::FIRING;
 
                        if did_fire != should_have_fired {
 
                            log!(cu.inner.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", port, var, val, did_fire, should_have_fired);
 
                            // IMPLICIT inconsistency
 
                            drop((predicate, branch));
 
                            return Ok(());
 
                        }
 
                    }
 
                    // submit solution for this component
 
                    let subtree_id = SubtreeId::LocalComponent(ComponentId::Proto(proto_component_id));
 
                    rctx.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.inner.logger,
 
                        subtree_id,
 
                        predicate.clone(),
 
                    );
 
                    branch.ended = true;
 
                    // move to "blocked"
 
                    drainer.add_output(predicate, branch);
 
                }
src/runtime/setup.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::*;
 

	
 
impl Connector {
 
    pub fn new(
 
        mut logger: Box<dyn Logger>,
 
        proto_description: Arc<ProtocolDescription>,
 
        connector_id: ConnectorId,
 
    ) -> Self {
 
        log!(&mut *logger, "Created with connector_id {:?}", connector_id);
 
        Self {
 
            unphased: ConnectorUnphased {
 
                proto_description,
 
                proto_components: Default::default(),
 
                inner: ConnectorUnphasedInner {
 
                    logger,
 
                    id_manager: IdManager::new(connector_id),
 
                    native_ports: Default::default(),
 
                    port_info: Default::default(),
 
                },
 
            },
 
            phased: ConnectorPhased::Setup(Box::new(ConnectorSetup {
 
                net_endpoint_setups: Default::default(),
 
                udp_endpoint_setups: Default::default(),
 
            })),
 
        }
 
    }
 
    pub fn new_udp_port(
 
    /// Conceptually, this returning [p0, g1] is sugar for:
 
    /// 1. create port pair [p0, g0]
 
    /// 2. create port pair [p1, g1]
 
    /// 3. create udp component with interface of moved ports [p1, g0]
 
    /// 4. return [p0, g1]
 
    pub fn new_udp_mediator_component(
 
        &mut self,
 
        local_addr: SocketAddr,
 
        peer_addr: SocketAddr,
 
    ) -> Result<[PortId; 2], WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                let udp_index = setup.udp_endpoint_setups.len();
 
                let mut npid = || cu.inner.id_manager.new_port_id();
 
                let [nin, nout, uin, uout] = [npid(), npid(), npid(), npid()];
 
                cu.inner.native_ports.insert(nin);
 
                cu.inner.native_ports.insert(nout);
 
                cu.inner.port_info.polarities.insert(nin, Getter);
 
                cu.inner.port_info.polarities.insert(nout, Putter);
 
                cu.inner.port_info.polarities.insert(uin, Getter);
 
                cu.inner.port_info.polarities.insert(uout, Putter);
 
                cu.inner.port_info.peers.insert(nin, uout);
 
                cu.inner.port_info.peers.insert(nout, uin);
 
                cu.inner.port_info.peers.insert(uin, nout);
 
                cu.inner.port_info.peers.insert(uout, nin);
 
                cu.inner.port_info.routes.insert(nin, Route::LocalComponent(ComponentId::Native));
 
                cu.inner.port_info.routes.insert(nout, Route::LocalComponent(ComponentId::Native));
 
                cu.inner.port_info.routes.insert(uin, Route::UdpEndpoint { index: udp_index });
 
                cu.inner.port_info.routes.insert(uout, Route::UdpEndpoint { index: udp_index });
 
                setup.udp_endpoint_setups.push(UdpEndpointSetup {
 
                    local_addr,
 
                    peer_addr,
 
                    getter_for_incoming: nin,
 
                });
 
                Ok([nout, nin])
 
            }
 
        }
 
    }
 
    pub fn new_net_port(
 
        &mut self,
 
        polarity: Polarity,
 
        sock_addr: SocketAddr,
 
        endpoint_polarity: EndpointPolarity,
 
    ) -> Result<PortId, WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                let local_port = cu.inner.id_manager.new_port_id();
 
                cu.inner.native_ports.insert(local_port);
 
                // {polarity, route} known. {peer} unknown.
 
                cu.inner.port_info.polarities.insert(local_port, polarity);
 
                cu.inner
 
                    .port_info
 
                    .routes
 
                    .insert(local_port, Route::LocalComponent(ComponentId::Native));
 
                log!(
 
                    cu.inner.logger,
 
                    "Added net port {:?} with polarity {:?} addr {:?} endpoint_polarity {:?}",
 
                    local_port,
 
                    polarity,
 
                    &sock_addr,
 
                    endpoint_polarity
 
                );
 
                setup.net_endpoint_setups.push(NetEndpointSetup {
 
                    sock_addr,
 
                    endpoint_polarity,
 
                    getter_for_incoming: local_port,
 
                });
 
                Ok(local_port)
 
            }
 
        }
 
    }
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<(), ConnectError> {
 
        use ConnectError as Ce;
 
        let Self { unphased: cu, phased } = self;
 
        match &phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(cu.inner.logger, "Call to connecting in connected state");
 
                Err(Ce::AlreadyConnected)
 
            }
 
            ConnectorPhased::Setup(setup) => {
 
                log!(cu.inner.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                let deadline = timeout.map(|to| Instant::now() + to);
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let mut endpoint_manager = new_endpoint_manager(
 
                    &mut *cu.inner.logger,
 
                    &setup.net_endpoint_setups,
 
                    &setup.udp_endpoint_setups,
 
                    &mut cu.inner.port_info,
 
                    &deadline,
 
                )?;
 
                log!(
 
                    cu.inner.logger,
 
                    "Successfully connected {} endpoints",
 
                    endpoint_manager.net_endpoint_store.endpoint_exts.len()
 
                );
 
                // leader election and tree construction
 
                let neighborhood = init_neighborhood(
 
                    cu.inner.id_manager.connector_id,
src/runtime/tests.rs
Show inline comments
 
@@ -584,292 +584,292 @@ fn together() {
 
            let [p0, p1] = c.new_port_pair();
 
            let p2 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
 
            let p3 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let [p4, p5] = c.new_port_pair();
 
            c.add_component(b"together", &[p1, p2, p3, p4]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.get(p5).unwrap();
 
            c.sync(MS300).unwrap();
 
            c.gotten(p5).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn native_batch_distinguish() {
 
    let test_log_path = Path::new("./logs/native_batch_distinguish");
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.connect(SEC1).unwrap();
 
    c.next_batch().unwrap();
 
    c.sync(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn multirounds() {
 
    let test_log_path = Path::new("./logs/multirounds");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            for _ in 0..10 {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                c.get(p1).unwrap();
 
                c.sync(SEC1).unwrap();
 
            }
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            for _ in 0..10 {
 
                c.get(p0).unwrap();
 
                c.put(p1, TEST_MSG.clone()).unwrap();
 
                c.sync(SEC1).unwrap();
 
            }
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn multi_recover() {
 
    let test_log_path = Path::new("./logs/multi_recover");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let success_iter = [true, false].iter().copied().cycle().take(10);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            for succeeds in success_iter.clone() {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                if succeeds {
 
                    c.get(p1).unwrap();
 
                }
 
                let res = c.sync(MS300);
 
                assert_eq!(res.is_ok(), succeeds);
 
            }
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            for succeeds in success_iter.clone() {
 
                c.get(p0).unwrap();
 
                c.put(p1, TEST_MSG.clone()).unwrap();
 
                let res = c.sync(MS300);
 
                assert_eq!(res.is_ok(), succeeds);
 
            }
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn udp_self_connect() {
 
    let test_log_path = Path::new("./logs/udp_self_connect");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.new_udp_port(sock_addrs[1], sock_addrs[0]).unwrap();
 
    c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.new_udp_mediator_component(sock_addrs[1], sock_addrs[0]).unwrap();
 
    c.connect(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn solo_udp_put_success() {
 
    let test_log_path = Path::new("./logs/solo_udp_put_success");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p0, _] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
    let [p0, _] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(p0, TEST_MSG.clone()).unwrap();
 
    c.sync(MS300).unwrap();
 
}
 

	
 
#[test]
 
fn solo_udp_get_fail() {
 
    let test_log_path = Path::new("./logs/solo_udp_get_fail");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, p0] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
    let [_, p0] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.get(p0).unwrap();
 
    c.sync(MS300).unwrap_err();
 
}
 

	
 
#[test]
 
fn reowolf_to_udp() {
 
    let test_log_path = Path::new("./logs/reowolf_to_udp");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let barrier = std::sync::Barrier::new(2);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // reowolf thread
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p0, _] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
            let [p0, _] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.sync(MS300).unwrap();
 
            barrier.wait();
 
        });
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // udp thread
 
            let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap();
 
            udp.connect(sock_addrs[0]).unwrap();
 
            let mut buf = new_u8_buffer(256);
 
            let len = udp.recv(&mut buf).unwrap();
 
            assert_eq!(TEST_MSG_BYTES, &buf[0..len]);
 
            barrier.wait();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn udp_to_reowolf() {
 
    let test_log_path = Path::new("./logs/udp_to_reowolf");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let barrier = std::sync::Barrier::new(2);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // reowolf thread
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [_, p0] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
            let [_, p0] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.get(p0).unwrap();
 
            c.sync(SEC5).unwrap();
 
            assert_eq!(c.gotten(p0).unwrap().as_slice(), TEST_MSG_BYTES);
 
            barrier.wait();
 
        });
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // udp thread
 
            let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap();
 
            udp.connect(sock_addrs[0]).unwrap();
 
            for _ in 0..15 {
 
                udp.send(TEST_MSG_BYTES).unwrap();
 
                std::thread::sleep(MS100.unwrap());
 
            }
 
            barrier.wait();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn udp_reowolf_swap() {
 
    let test_log_path = Path::new("./logs/udp_reowolf_swap");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let barrier = std::sync::Barrier::new(2);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // reowolf thread
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p0, p1] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
            let [p0, p1] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.get(p1).unwrap();
 
            c.sync(SEC5).unwrap();
 
            assert_eq!(c.gotten(p1).unwrap().as_slice(), TEST_MSG_BYTES);
 
            barrier.wait();
 
        });
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // udp thread
 
            let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap();
 
            udp.connect(sock_addrs[0]).unwrap();
 
            let mut buf = new_u8_buffer(256);
 
            for _ in 0..5 {
 
                std::thread::sleep(Duration::from_millis(60));
 
                udp.send(TEST_MSG_BYTES).unwrap();
 
            }
 
            let len = udp.recv(&mut buf).unwrap();
 
            assert_eq!(TEST_MSG_BYTES, &buf[0..len]);
 
            barrier.wait();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn example_pres_3() {
 
    let test_log_path = Path::new("./logs/example_pres_3");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            // "amy"
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 
            // put {A} and FAIL
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap_err();
 
            // put {B} and FAIL
 
            c.put(p1, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap_err();
 
            // put {A, B} and SUCCEED
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.put(p1, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap();
 
        });
 
        s.spawn(|_| {
 
            // "bob"
 
            let mut c = file_logged_connector(1, test_log_path);
 
            let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
 
            c.connect(SEC1).unwrap();
 
            for _ in 0..2 {
 
                // get {A, B} and FAIL
 
                c.get(p0).unwrap();
 
                c.get(p1).unwrap();
 
                c.sync(SEC1).unwrap_err();
 
            }
 
            // get {A, B} and SUCCEED
 
            c.get(p0).unwrap();
 
            c.get(p1).unwrap();
 
            c.sync(SEC1).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn ac_not_b() {
 
    let test_log_path = Path::new("./logs/ac_not_b");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            // "amy"
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let p0 = c.new_net_port(Putter, sock_addrs[0], Active).unwrap();
 
            let p1 = c.new_net_port(Putter, sock_addrs[1], Active).unwrap();
 
            c.connect(SEC1).unwrap();
 

	
 
            // put both A and B
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.put(p1, TEST_MSG.clone()).unwrap();
 
            c.sync(SEC1).unwrap_err();
 
        });
 
        s.spawn(|_| {
 
            // "bob"
 
            let pdl = b"
 
            primitive ac_not_b(in a, in b, out c){
 
                // forward A to C but keep B silent
 
                synchronous{ put(c, get(a)); }
 
            }";
 
            let pd = Arc::new(reowolf::ProtocolDescription::parse(pdl).unwrap());
 
            let mut c = file_logged_configured_connector(1, test_log_path, pd);
 
            let p0 = c.new_net_port(Getter, sock_addrs[0], Passive).unwrap();
 
            let p1 = c.new_net_port(Getter, sock_addrs[1], Passive).unwrap();
0 comments (0 inline, 0 general)