Changeset - 1db6ec21a437
[Not reviewed]
0 5 0
Christopher Esterhuyse - 5 years ago 2020-07-21 12:13:33
christopher.esterhuyse@gmail.com
more elegant on-demand native branch indexing with a speculative var
5 files changed with 78 insertions and 50 deletions:
0 comments (0 inline, 0 general)
src/ffi/mod.rs
Show inline comments
 
@@ -272,7 +272,7 @@ pub unsafe extern "C" fn connector_add_net_port(
 
/// - where P is a Putter port that sends messages into the socket
 
/// - where G is a Getter port that recvs messages from the socket
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_udp_port(
 
pub unsafe extern "C" fn connector_add_udp_port_pair(
 
    connector: &mut Connector,
 
    putter: *mut PortId,
 
    getter: *mut PortId,
 
@@ -290,7 +290,7 @@ pub unsafe extern "C" fn connector_add_udp_port(
 
        Ok(local) => local,
 
        Err(errcode) => return errcode,
 
    };
 
    match connector.new_udp_port(local, peer) {
 
    match connector.new_udp_mediator_component(local, peer) {
 
        Ok([p, g]) => {
 
            if !putter.is_null() {
 
                putter.write(p);
src/ffi/socket_api.rs
Show inline comments
 
@@ -27,8 +27,8 @@ struct MaybeConnector {
 
    connector_bound: Option<ConnectorBound>,
 
}
 
#[derive(Default)]
 
struct CspStorage {
 
    fd_to_mc: HashMap<c_int, RwLock<MaybeConnector>>,
 
struct FdcStorage {
 
    fd_to_c: HashMap<c_int, RwLock<MaybeConnector>>,
 
    fd_allocator: FdAllocator,
 
}
 
fn trivial_peer_addr() -> SocketAddr {
 
@@ -61,7 +61,7 @@ impl FdAllocator {
 
    }
 
}
 
lazy_static::lazy_static! {
 
    static ref CSP_STORAGE: RwLock<CspStorage> = Default::default();
 
    static ref FDC_STORAGE: RwLock<FdcStorage> = Default::default();
 
}
 
impl MaybeConnector {
 
    fn connect(&mut self, peer_addr: SocketAddr) -> c_int {
 
@@ -110,19 +110,19 @@ impl MaybeConnector {
 
#[no_mangle]
 
pub extern "C" fn rw_socket(_domain: c_int, _type: c_int) -> c_int {
 
    // ignoring domain and type
 
    let mut w = if let Ok(w) = CSP_STORAGE.write() { w } else { return FD_LOCK_POISONED };
 
    let mut w = if let Ok(w) = FDC_STORAGE.write() { w } else { return FD_LOCK_POISONED };
 
    let fd = w.fd_allocator.alloc();
 
    let mc = MaybeConnector { peer_addr: trivial_peer_addr(), connector_bound: None };
 
    w.fd_to_mc.insert(fd, RwLock::new(mc));
 
    w.fd_to_c.insert(fd, RwLock::new(mc));
 
    fd
 
}
 

	
 
#[no_mangle]
 
pub extern "C" fn rw_close(fd: c_int, _how: c_int) -> c_int {
 
    // ignoring HOW
 
    let mut w = if let Ok(w) = CSP_STORAGE.write() { w } else { return FD_LOCK_POISONED };
 
    w.fd_allocator.free(fd);
 
    if w.fd_to_mc.remove(&fd).is_some() {
 
    let mut w = if let Ok(w) = FDC_STORAGE.write() { w } else { return FD_LOCK_POISONED };
 
    if w.fd_to_c.remove(&fd).is_some() {
 
        w.fd_allocator.free(fd);
 
        ERR_OK
 
    } else {
 
        CLOSE_FAIL
 
@@ -136,8 +136,8 @@ pub unsafe extern "C" fn rw_bind(
 
    _addr_len: usize,
 
) -> c_int {
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    if mc.connector_bound.is_some() {
 
@@ -149,7 +149,8 @@ pub unsafe extern "C" fn rw_bind(
 
            crate::TRIVIAL_PD.clone(),
 
            Connector::random_id(),
 
        );
 
        let [putter, getter] = connector.new_udp_port(local_addr.read(), mc.peer_addr).unwrap();
 
        let [putter, getter] =
 
            connector.new_udp_mediator_component(local_addr.read(), mc.peer_addr).unwrap();
 
        Some(ConnectorBound { connector, putter, getter })
 
    };
 
    ERR_OK
 
@@ -162,8 +163,8 @@ pub unsafe extern "C" fn rw_connect(
 
    _address_len: usize,
 
) -> c_int {
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    mc.connect(peer_addr.read())
 
@@ -177,8 +178,8 @@ pub unsafe extern "C" fn rw_send(
 
    _flags: c_int,
 
) -> isize {
 
    // ignoring flags
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD as isize };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    mc.send(bytes_ptr, bytes_len)
 
@@ -192,8 +193,8 @@ pub unsafe extern "C" fn rw_recv(
 
    _flags: c_int,
 
) -> isize {
 
    // ignoring flags
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD as isize };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    mc.recv(bytes_ptr, bytes_len)
 
@@ -208,8 +209,8 @@ pub unsafe extern "C" fn rw_sendto(
 
    peer_addr: *const SocketAddr,
 
    _addr_len: usize,
 
) -> isize {
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD as isize };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    // copy currently connected peer addr
 
@@ -239,8 +240,8 @@ pub unsafe extern "C" fn rw_recvfrom(
 
    peer_addr: *const SocketAddr,
 
    _addr_len: usize,
 
) -> isize {
 
    let r = if let Ok(r) = CSP_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_mc.get(&fd) { mc } else { return BAD_FD as isize };
 
    let r = if let Ok(r) = FDC_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_c.get(&fd) { mc } else { return BAD_FD as isize };
 
    let mut mc = if let Ok(mc) = mc.write() { mc } else { return FD_LOCK_POISONED as isize };
 
    let mc: &mut MaybeConnector = &mut mc;
 
    // copy currently connected peer addr
src/runtime/communication.rs
Show inline comments
 
@@ -23,7 +23,7 @@ struct BranchingNative {
 
}
 
#[derive(Clone, Debug)]
 
struct NativeBranch {
 
    index: usize,
 
    batch_index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
@@ -303,10 +303,9 @@ impl Connector {
 
            "Translating {} native batches into branches...",
 
            comm.native_batches.len()
 
        );
 
        let native_branch_spec_var = rctx.spec_var_stream.next();
 
        log!(cu.inner.logger, "Native branch spec var is {:?}", native_branch_spec_var);
 
        let mut native_spec_assign_stream = None;
 
        let mut branching_native = BranchingNative { branches: Default::default() };
 
        'native_branches: for ((native_branch, index), branch_spec_val) in
 
        'native_branches: for ((native_branch, batch_index), branch_spec_val) in
 
            comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain())
 
        {
 
            let NativeBatch { to_get, to_put } = native_branch;
 
@@ -323,31 +322,58 @@ impl Connector {
 
                for &port in cu.inner.native_ports.difference(&firing_ports) {
 
                    let var = cu.inner.port_info.spec_var_for(port);
 
                    if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) {
 
                        log!(cu.inner.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var);
 
                        log!(cu.inner.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", batch_index, var);
 
                        continue 'native_branches;
 
                    }
 
                }
 
                // this branch is consistent. distinguish it with a unique var:val mapping and proceed
 
                predicate.inserted(native_branch_spec_var, branch_spec_val)
 
                predicate
 
            };
 
            log!(
 
                cu.inner.logger,
 
                "Native branch index={:?} has consistent {:?}",
 
                index,
 
                "Native batch_index={:?} has consistent {:?}",
 
                batch_index,
 
                &predicate
 
            );
 
            // send all outgoing messages (by buffering them)
 
            for (putter, payload) in to_put {
 
                let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                log!(cu.inner.logger, "Native branch {} sending msg {:?}", index, &msg);
 
                log!(cu.inner.logger, "Native branch {} sending msg {:?}", batch_index, &msg);
 
                rctx.getter_buffer.putter_add(cu, putter, msg);
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            let branch = NativeBranch { batch_index, gotten: Default::default(), to_get };
 
            if let Some(old_branch) = branching_native.branches.remove(&predicate) {
 
                let (var, vals): (SpecVar, [SpecVal; 2]) = {
 
                    let (var, val_iter) = native_spec_assign_stream.get_or_insert_with(|| {
 
                        (rctx.spec_var_stream.next(), SpecVal::iter_domain())
 
                    });
 
                    let vals = [
 
                        val_iter.next().expect("Exhausted specval space!"),
 
                        val_iter.next().expect("Exhausted specval space!"),
 
                    ];
 
                    (*var, vals)
 
                };
 
                log!(
 
                    cu.inner.logger,
 
                    "Branch collision on {:?}. Going to distinguish them with a spec var {:?} ands vals {:?}",
 
                    &predicate,
 
                    var,
 
                    vals
 
                );
 
                branching_native
 
                    .branches
 
                    .insert(predicate.clone().inserted(var, vals[0]), old_branch);
 
                branching_native.branches.insert(predicate.inserted(var, vals[1]), branch);
 
            } else {
 
                // no branch collision
 
                branching_native.branches.insert(predicate, branch);
 
            }
 
        }
 
        for (predicate, branch) in branching_native.branches.iter() {
 
            if branch.is_ended() {
 
                log!(
 
                    cu.inner.logger,
 
                    "Native submitting solution for batch {} with {:?}",
 
                    index,
 
                    "Native submitting solution for branch {} with {:?}",
 
                    branch.batch_index,
 
                    &predicate
 
                );
 
                rctx.solution_storage.submit_and_digest_subtree_solution(
 
@@ -356,10 +382,6 @@ impl Connector {
 
                    predicate.clone(),
 
                );
 
            }
 
            if let Some(_) = branching_native.branches.insert(predicate, branch) {
 
                // thanks to the native_branch_spec_var, each batch has a distinct predicate
 
                unreachable!()
 
            }
 
        }
 
        // restore the invariant: !native_batches.is_empty()
 
        comm.native_batches.push(Default::default());
 
@@ -857,9 +879,9 @@ impl BranchingNative {
 
                &branch.gotten
 
            );
 
            if branch.is_ended() && branch_predicate.assigns_subset(solution_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                let NativeBranch { batch_index, gotten, .. } = branch;
 
                log!(logger, "Collapsed native has gotten {:?}", &gotten);
 
                return RoundOk { batch_index: index, gotten };
 
                return RoundOk { batch_index, gotten };
 
            }
 
        }
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
src/runtime/setup.rs
Show inline comments
 
@@ -25,7 +25,12 @@ impl Connector {
 
            })),
 
        }
 
    }
 
    pub fn new_udp_port(
 
    /// Conceptually, this returning [p0, g1] is sugar for:
 
    /// 1. create port pair [p0, g0]
 
    /// 2. create port pair [p1, g1]
 
    /// 3. create udp component with interface of moved ports [p1, g0]
 
    /// 4. return [p0, g1]
 
    pub fn new_udp_mediator_component(
 
        &mut self,
 
        local_addr: SocketAddr,
 
        peer_addr: SocketAddr,
src/runtime/tests.rs
Show inline comments
 
@@ -677,8 +677,8 @@ fn udp_self_connect() {
 
    let test_log_path = Path::new("./logs/udp_self_connect");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.new_udp_port(sock_addrs[1], sock_addrs[0]).unwrap();
 
    c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.new_udp_mediator_component(sock_addrs[1], sock_addrs[0]).unwrap();
 
    c.connect(SEC1).unwrap();
 
}
 

	
 
@@ -687,7 +687,7 @@ fn solo_udp_put_success() {
 
    let test_log_path = Path::new("./logs/solo_udp_put_success");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [p0, _] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
    let [p0, _] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.put(p0, TEST_MSG.clone()).unwrap();
 
    c.sync(MS300).unwrap();
 
@@ -698,7 +698,7 @@ fn solo_udp_get_fail() {
 
    let test_log_path = Path::new("./logs/solo_udp_get_fail");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    let [_, p0] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
    let [_, p0] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.connect(SEC1).unwrap();
 
    c.get(p0).unwrap();
 
    c.sync(MS300).unwrap_err();
 
@@ -714,7 +714,7 @@ fn reowolf_to_udp() {
 
            barrier.wait();
 
            // reowolf thread
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p0, _] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
            let [p0, _] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.sync(MS300).unwrap();
 
@@ -744,7 +744,7 @@ fn udp_to_reowolf() {
 
            barrier.wait();
 
            // reowolf thread
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [_, p0] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
            let [_, p0] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.get(p0).unwrap();
 
            c.sync(SEC5).unwrap();
 
@@ -776,7 +776,7 @@ fn udp_reowolf_swap() {
 
            barrier.wait();
 
            // reowolf thread
 
            let mut c = file_logged_connector(0, test_log_path);
 
            let [p0, p1] = c.new_udp_port(sock_addrs[0], sock_addrs[1]).unwrap();
 
            let [p0, p1] = c.new_udp_mediator_component(sock_addrs[0], sock_addrs[1]).unwrap();
 
            c.connect(SEC1).unwrap();
 
            c.put(p0, TEST_MSG.clone()).unwrap();
 
            c.get(p1).unwrap();
0 comments (0 inline, 0 general)