Changeset - 1db6ec21a437
[Not reviewed]
0 5 0
Christopher Esterhuyse - 5 years ago 2020-07-21 12:13:33
christopher.esterhuyse@gmail.com
more elegant on-demand native branch indexing with a speculative var
5 files changed with 78 insertions and 50 deletions:
0 comments (0 inline, 0 general)
src/ffi/mod.rs
Show inline comments
 
@@ -269,13 +269,13 @@ pub unsafe extern "C" fn connector_add_net_port(
 
/// - a utf-8 encoded BIND socket addresses (i.e., "local"),
 
/// - a utf-8 encoded CONNECT socket addresses (i.e., "peer"),
 
/// returns [P, G] via out pointers [putter, getter],
 
/// - where P is a Putter port that sends messages into the socket
 
/// - where G is a Getter port that recvs messages from the socket
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_udp_port(
 
pub unsafe extern "C" fn connector_add_udp_port_pair(
 
    connector: &mut Connector,
 
    putter: *mut PortId,
 
    getter: *mut PortId,
 
    local_addr_str_ptr: *const u8,
 
    local_addr_str_len: usize,
 
    peer_addr_str_ptr: *const u8,
 
@@ -287,13 +287,13 @@ pub unsafe extern "C" fn connector_add_udp_port(
 
        Err(errcode) => return errcode,
 
    };
 
    let peer = match tl_socketaddr_from_raw(peer_addr_str_ptr, peer_addr_str_len) {
 
        Ok(local) => local,
 
        Err(errcode) => return errcode,
 
    };
 
    match connector.new_udp_port(local, peer) {
 
    match connector.new_udp_mediator_component(local, peer) {
 
        Ok([p, g]) => {
 
            if !putter.is_null() {
 
                putter.write(p);
 
            }
 
            if !getter.is_null() {
 
                getter.write(g);
src/ffi/socket_api.rs
Show inline comments
 
@@ -24,14 +24,14 @@ struct MaybeConnector {
 
    // 2. putter and getter are ports in the native interface with the appropriate polarities
 
    // 3. peer_addr always mirrors connector's single udp socket's connect addr. both are overwritten together.
 
    peer_addr: SocketAddr,
 
    connector_bound: Option<ConnectorBound>,
 
}
 
#[derive(Default)]
 
struct CspStorage {
 
    fd_to_mc: HashMap<c_int, RwLock<MaybeConnector>>,
 
struct FdcStorage {
 
    fd_to_c: HashMap<c_int, RwLock<MaybeConnector>>,
 
    fd_allocator: FdAllocator,
 
}
 
fn trivial_peer_addr() -> SocketAddr {
 
    // SocketAddrV4::new isn't a constant-time func
 
    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0))
 
}
 
@@ -58,13 +58,13 @@ impl FdAllocator {
 
    }
 
    fn free(&mut self, fd: c_int) {
 
        self.freed.push(fd);
 
    }
 
}
 
lazy_static::lazy_static! {
 
    static ref CSP_STORAGE: RwLock<CspStorage> = Default::default();
 
    static ref FDC_STORAGE: RwLock<FdcStorage> = Default::default();
 
}
 
impl MaybeConnector {
 
    fn connect(&mut self, peer_addr: SocketAddr) -> c_int {
 
        self.peer_addr = peer_addr;
 
        if let Some(ConnectorBound { connector, .. }) = &mut self.connector_bound {
 
            if connector.get_mut_udp_sock(0).unwrap().connect(peer_addr).is_err() {
 
@@ -107,25 +107,25 @@ impl MaybeConnector {
 
}
 
///////////////////////////////////////////////////////////////////
 

	
 
#[no_mangle]
 
pub extern "C" fn rw_socket(_domain: c_int, _type: c_int) -> c_int {
 
    // ignoring domain and type
 
    let mut w = if let Ok(w) = CSP_STORAGE.write() { w } else { return FD_LOCK_POISONED };
 
    let mut w = if let Ok(w) = FDC_STORAGE.write() { w } else { return FD_LOCK_POISONED };
 
    let fd = w.fd_allocator.alloc();
 
    let mc = MaybeConnector { peer_addr: trivial_peer_addr(), connector_bound: None };
 
    w.fd_to_mc.insert(fd, RwLock::new(mc));
 
    w.fd_to_c.insert(fd, RwLock::new(mc));
 
    fd
 
}
 

	
 
#[no_mangle]
 
pub extern "C" fn rw_close(fd: c_int, _how: c_int) -> c_int {
 
    // ignoring HOW
 
    let mut w = if let Ok(w) = CSP_STORAGE.write() { w } else { return FD_LOCK_POISONED };
 
    w.fd_allocator.free(fd);
 
    if w.fd_to_mc.remove(&fd).is_some() {
 
    let mut w = if let Ok(w) = FDC_STORAGE.write() { w } else { return FD_LOCK_POISONED };
 
    if w.fd_to_c.remove(&fd).is_some() {
 
        w.fd_allocator.free(fd);
 
        ERR_OK
 
    } else {
 
        CLOSE_FAIL
 
    }
 
}
 

	
 
@@ -133,40 +133,41 @@ pub extern "C" fn rw_close(fd: c_int, _how: c_int) -> c_int {
 
pub unsafe extern "C" fn rw_bind(
 
    fd: c_int,
 
    local_addr: *const SocketAddr,
 
    _addr_len: usize,
 
) -> c_int {
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    if mc.connector_bound.is_some() {
 
        return WRONG_STATE;
 
    }
 
    mc.connector_bound = {
 
        let mut connector = Connector::new(
 
            Box::new(crate::DummyLogger),
 
            crate::TRIVIAL_PD.clone(),
 
            Connector::random_id(),
 
        );
 
        let [putter, getter] = connector.new_udp_port(local_addr.read(), mc.peer_addr).unwrap();
 
        let [putter, getter] =
 
            connector.new_udp_mediator_component(local_addr.read(), mc.peer_addr).unwrap();
 
        Some(ConnectorBound { connector, putter, getter })
 
    };
 
    ERR_OK
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_connect(
 
    fd: c_int,
 
    peer_addr: *const SocketAddr,
 
    _address_len: usize,
 
) -> c_int {
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    mc.connect(peer_addr.read())
 
}
 

	
 
#[no_mangle]
 
@@ -174,14 +175,14 @@ pub unsafe extern "C" fn rw_send(
 
    fd: c_int,
 
    bytes_ptr: *const c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
) -> isize {
 
    // ignoring flags
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD as isize };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    mc.send(bytes_ptr, bytes_len)
 
}
 

	
 
#[no_mangle]
 
@@ -189,14 +190,14 @@ pub unsafe extern "C" fn rw_recv(
 
    fd: c_int,
 
    bytes_ptr: *mut c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
) -> isize {
 
    // ignoring flags
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD as isize };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    mc.recv(bytes_ptr, bytes_len)
 
}
 

	
 
#[no_mangle]
 
@@ -205,14 +206,14 @@ pub unsafe extern "C" fn rw_sendto(
 
    bytes_ptr: *mut c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
    peer_addr: *const SocketAddr,
 
    _addr_len: usize,
 
) -> isize {
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD as isize };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    // copy currently connected peer addr
 
    let connected = mc.peer_addr;
 
    // connect to given peer_addr
 
    match mc.connect(peer_addr.read()) {
 
@@ -236,14 +237,14 @@ pub unsafe extern "C" fn rw_recvfrom(
 
    bytes_ptr: *mut c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
    peer_addr: *const SocketAddr,
 
    _addr_len: usize,
 
) -> isize {
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD as isize };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    // copy currently connected peer addr
 
    let connected = mc.peer_addr;
 
    // connect to given peer_addr
 
    match mc.connect(peer_addr.read()) {
src/runtime/communication.rs
Show inline comments
 
@@ -20,13 +20,13 @@ struct RoundCtx {
 
}
 
struct BranchingNative {
 
    branches: HashMap<Predicate, NativeBranch>,
 
}
 
#[derive(Clone, Debug)]
 
struct NativeBranch {
 
    index: usize,
 
    batch_index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
#[derive(Debug)]
 
struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
@@ -300,16 +300,15 @@ impl Connector {
 
        // Explore all native branches eagerly. Find solutions, buffer messages, etc.
 
        log!(
 
            cu.inner.logger,
 
            "Translating {} native batches into branches...",
 
            comm.native_batches.len()
 
        );
 
        let native_branch_spec_var = rctx.spec_var_stream.next();
 
        log!(cu.inner.logger, "Native branch spec var is {:?}", native_branch_spec_var);
 
        let mut native_spec_assign_stream = None;
 
        let mut branching_native = BranchingNative { branches: Default::default() };
 
        'native_branches: for ((native_branch, index), branch_spec_val) in
 
        'native_branches: for ((native_branch, batch_index), branch_spec_val) in
 
            comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain())
 
        {
 
            let NativeBatch { to_get, to_put } = native_branch;
 
            let predicate = {
 
                let mut predicate = Predicate::default();
 
                // assign trues for ports that fire
 
@@ -320,49 +319,72 @@ impl Connector {
 
                    predicate.assigned.insert(var, SpecVal::FIRING);
 
                }
 
                // assign falses for all silent (not firing) ports
 
                for &port in cu.inner.native_ports.difference(&firing_ports) {
 
                    let var = cu.inner.port_info.spec_var_for(port);
 
                    if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) {
 
                        log!(cu.inner.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var);
 
                        log!(cu.inner.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", batch_index, var);
 
                        continue 'native_branches;
 
                    }
 
                }
 
                // this branch is consistent. distinguish it with a unique var:val mapping and proceed
 
                predicate.inserted(native_branch_spec_var, branch_spec_val)
 
                predicate
 
            };
 
            log!(
 
                cu.inner.logger,
 
                "Native branch index={:?} has consistent {:?}",
 
                index,
 
                "Native batch_index={:?} has consistent {:?}",
 
                batch_index,
 
                &predicate
 
            );
 
            // send all outgoing messages (by buffering them)
 
            for (putter, payload) in to_put {
 
                let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                log!(cu.inner.logger, "Native branch {} sending msg {:?}", index, &msg);
 
                log!(cu.inner.logger, "Native branch {} sending msg {:?}", batch_index, &msg);
 
                rctx.getter_buffer.putter_add(cu, putter, msg);
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            let branch = NativeBranch { batch_index, gotten: Default::default(), to_get };
 
            if let Some(old_branch) = branching_native.branches.remove(&predicate) {
 
                let (var, vals): (SpecVar, [SpecVal; 2]) = {
 
                    let (var, val_iter) = native_spec_assign_stream.get_or_insert_with(|| {
 
                        (rctx.spec_var_stream.next(), SpecVal::iter_domain())
 
                    });
 
                    let vals = [
 
                        val_iter.next().expect("Exhausted specval space!"),
 
                        val_iter.next().expect("Exhausted specval space!"),
 
                    ];
 
                    (*var, vals)
 
                };
 
                log!(
 
                    cu.inner.logger,
 
                    "Branch collision on {:?}. Going to distinguish them with a spec var {:?} ands vals {:?}",
 
                    &predicate,
 
                    var,
 
                    vals
 
                );
 
                branching_native
 
                    .branches
 
                    .insert(predicate.clone().inserted(var, vals[0]), old_branch);
 
                branching_native.branches.insert(predicate.inserted(var, vals[1]), branch);
 
            } else {
 
                // no branch collision
 
                branching_native.branches.insert(predicate, branch);
 
            }
 
        }
 
        for (predicate, branch) in branching_native.branches.iter() {
 
            if branch.is_ended() {
 
                log!(
 
                    cu.inner.logger,
 
                    "Native submitting solution for batch {} with {:?}",
 
                    index,
 
                    "Native submitting solution for branch {} with {:?}",
 
                    branch.batch_index,
 
                    &predicate
 
                );
 
                rctx.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut *cu.inner.logger,
 
                    SubtreeId::LocalComponent(ComponentId::Native),
 
                    predicate.clone(),
 
                );
 
            }
 
            if let Some(_) = branching_native.branches.insert(predicate, branch) {
 
                // thanks to the native_branch_spec_var, each batch has a distinct predicate
 
                unreachable!()
 
            }
 
        }
 
        // restore the invariant: !native_batches.is_empty()
 
        comm.native_batches.push(Default::default());
 

	
 
        comm.endpoint_manager
 
            .udp_endpoints_round_start(&mut *cu.inner.logger, &mut rctx.spec_var_stream);
 
@@ -854,15 +876,15 @@ impl BranchingNative {
 
                "Considering native branch {:?} with to_get {:?} gotten {:?}",
 
                &branch_predicate,
 
                &branch.to_get,
 
                &branch.gotten
 
            );
 
            if branch.is_ended() && branch_predicate.assigns_subset(solution_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                let NativeBranch { batch_index, gotten, .. } = branch;
 
                log!(logger, "Collapsed native has gotten {:?}", &gotten);
 
                return RoundOk { batch_index: index, gotten };
 
                return RoundOk { batch_index, gotten };
 
            }
 
        }
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 
impl BranchingProtoComponent {
src/runtime/setup.rs
Show inline comments
 
@@ -22,13 +22,18 @@ impl Connector {
 
            phased: ConnectorPhased::Setup(Box::new(ConnectorSetup {
 
                net_endpoint_setups: Default::default(),
 
                udp_endpoint_setups: Default::default(),
 
            })),
 
        }
 
    }
 
    pub fn new_udp_port(
 
    /// Conceptually, this returning [p0, g1] is sugar for:
 
    /// 1. create port pair [p0, g0]
 
    /// 2. create port pair [p1, g1]
 
    /// 3. create udp component with interface of moved ports [p1, g0]
 
    /// 4. return [p0, g1]
 
    pub fn new_udp_mediator_component(
 
        &mut self,
 
        local_addr: SocketAddr,
 
        peer_addr: SocketAddr,
 
    ) -> Result<[PortId; 2], WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
src/runtime/tests.rs
Show inline comments
 
@@ -674,34 +674,34 @@ fn multi_recover() {
 

	
 
#[test]
 
fn udp_self_connect() {
 
    let test_log_path = Path::new("./logs/udp_self_connect");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.new_udp_port(sock_addrs[1], sock_addrs[0]).unwrap();
 
    c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.new_udp_mediator_component(sock_addrs[1], sock_addrs[0]).unwrap();
 
    c.connect(SEC1).unwrap();
 
}
 

	
 
#[test]
 
fn solo_udp_put_success() {
 
    let test_log_path = Path::new("./logs/solo_udp_put_success");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p0, _] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
    let [p0, _] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(p0, TEST_MSG.clone()).unwrap();
 
    c.sync(MS300).unwrap();
 
}
 

	
 
#[test]
 
fn solo_udp_get_fail() {
 
    let test_log_path = Path::new("./logs/solo_udp_get_fail");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, p0] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
    let [_, p0] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.get(p0).unwrap();
 
    c.sync(MS300).unwrap_err();
 
}
 

	
 
#[test]
 
@@ -711,13 +711,13 @@ fn reowolf_to_udp() {
 
    let barrier = std::sync::Barrier::new(2);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // reowolf thread
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p0, _] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
            let [p0, _] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.sync(MS300).unwrap();
 
            barrier.wait();
 
        });
 
        s.spawn(|_| {
 
@@ -741,13 +741,13 @@ fn udp_to_reowolf() {
 
    let barrier = std::sync::Barrier::new(2);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // reowolf thread
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [_, p0] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
            let [_, p0] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.get(p0).unwrap();
 
            c.sync(SEC5).unwrap();
 
            assert_eq!(c.gotten(p0).unwrap().as_slice(), TEST_MSG_BYTES);
 
            barrier.wait();
 
        });
 
@@ -773,13 +773,13 @@ fn udp_reowolf_swap() {
 
    let barrier = std::sync::Barrier::new(2);
 
    scope(|s| {
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // reowolf thread
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p0, p1] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
            let [p0, p1] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.get(p1).unwrap();
 
            c.sync(SEC5).unwrap();
 
            assert_eq!(c.gotten(p1).unwrap().as_slice(), TEST_MSG_BYTES);
 
            barrier.wait();
0 comments (0 inline, 0 general)