Changeset - 3801521d486d
[Not reviewed]
0 7 0
Christopher Esterhuyse - 5 years ago 2020-07-23 11:30:09
christopher.esterhuyse@gmail.com
more pseudo-socket FFI. now correctly supporting recvfrom and sendto. testing on linux todo
7 files changed with 123 insertions and 94 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
[package]
 
name = "reowolf_rs"
 
version = "0.1.4"
 
authors = [
 
	"Christopher Esterhuyse <esterhuy@cwi.nl, christopher.esterhuyse@gmail.com>",
 
	"Hans-Dieter Hiep <hdh@cwi.nl>"
 
]
 
edition = "2018"
 

	
 
[dependencies]
 
# convenience macros
 
maplit = "1.0.2"
 
derive_more = "0.99.2"
 

	
 
# runtime
 
bincode = "1.3.1"
 
serde = { version = "1.0.114", features = ["derive"] }
 
getrandom = "0.1.14" # tiny crate. used to guess controller-id
 

	
 
# network
 
mio = { version = "0.7.0", package = "mio", features = ["udp", "tcp", "os-poll"] }
 
socket2 = { version = "0.3.12", optional = true }
 

	
 
# protocol
 
backtrace = "0.3"
 
lazy_static = "1.4.0"
 

	
 
# ffi
 

	
 
# socket ffi
 
libc = { version = "^0.2", optional = true }
 
os_socketaddr = { version = "0.1.0", optional = true }
 

	
 
[dev-dependencies]
 
# test-generator = "0.3.0"
 
crossbeam-utils = "0.7.2"
 
lazy_static = "1.4.0"
 

	
 
[lib]
 
# compile target: dynamically linked library using C ABI
 
crate-type = ["cdylib"]
 

	
 
[features]
 
default = ["ffi", "session_optimization", "ffi_pseudo_socket_api"]
 
default = ["ffi", "session_optimization"]
 
ffi = [] # see src/ffi/mod.rs
 
ffi_pseudo_socket_api = ["ffi", "libc", "os_socketaddr"]# see src/ffi/pseudo_socket_api.rs
 
endpoint_logging = [] # see src/macros.rs
 
session_optimization = [] # see src/runtime/setup.rs
 
\ No newline at end of file
src/ffi/mod.rs
Show inline comments
 
use crate::{common::*, runtime::*};
 
use core::{cell::RefCell, convert::TryFrom};
 
use std::os::raw::c_int;
 
use std::slice::from_raw_parts as slice_from_raw_parts;
 

	
 
#[cfg(all(target_os = "linux", feature = "ffi_pseudo_socket_api"))]
 
pub mod pseudo_socket_api;
 
// #[cfg(all(target_os = "linux", feature = "ffi_pseudo_socket_api"))]
 
// pub mod pseudo_socket_api;
 

	
 
// Temporary simplfication: ignore ipv6. To revert, just refactor this structure and its usages
 
#[repr(C)]
 
pub struct FfiSocketAddr {
 
    pub ipv4: [u8; 4],
 
    pub port: u16,
 
}
 
impl Into<SocketAddr> for FfiSocketAddr {
 
    fn into(self) -> SocketAddr {
 
        (self.ipv4, self.port).into()
 
    }
 
}
 

	
 
///////////////////////////////////////////////
 
#[derive(Default)]
 
struct StoredError {
 
    // invariant: len is zero IFF its occupied
 
    // contents are 1+ bytes because we also store the NULL TERMINATOR
 
    buf: Vec<u8>,
 
}
 
impl StoredError {
 
    const NULL_TERMINATOR: u8 = 0;
 
    fn clear(&mut self) {
 
        // no null terminator either!
 
        self.buf.clear();
 
    }
 
    fn debug_store<E: Debug>(&mut self, error: &E) {
 
        let _ = write!(&mut self.buf, "{:?}", error);
 
        self.buf.push(Self::NULL_TERMINATOR);
 
    }
 
    fn tl_debug_store<E: Debug>(error: &E) {
 
        STORED_ERROR.with(|stored_error| {
 
            let mut stored_error = stored_error.borrow_mut();
 
            stored_error.clear();
 
            stored_error.debug_store(error);
 
        })
 
    }
 
    fn bytes_store(&mut self, bytes: &[u8]) {
 
        let _ = self.buf.write_all(bytes);
 
        self.buf.push(Self::NULL_TERMINATOR);
 
    }
 
    fn tl_bytes_store(bytes: &[u8]) {
 
        STORED_ERROR.with(|stored_error| {
 
            let mut stored_error = stored_error.borrow_mut();
 
            stored_error.clear();
 
            stored_error.bytes_store(bytes);
 
        })
 
    }
 
    fn tl_clear() {
 
        STORED_ERROR.with(|stored_error| {
 
            let mut stored_error = stored_error.borrow_mut();
 
            stored_error.clear();
 
        })
 
    }
 
    fn tl_bytes_peek() -> (*const u8, usize) {
 
        STORED_ERROR.with(|stored_error| {
 
            let stored_error = stored_error.borrow();
 
            match stored_error.buf.len() {
 
                0 => (core::ptr::null(), 0), // no error!
 
                n => {
 
                    // stores an error of length n-1 AND a NULL TERMINATOR
 
                    (stored_error.buf.as_ptr(), n - 1)
 
                }
 
            }
 
        })
 
    }
 
}
 
thread_local! {
 
    static STORED_ERROR: RefCell<StoredError> = RefCell::new(StoredError::default());
 
}
 

	
 
pub const ERR_OK: c_int = 0;
 
pub const ERR_REOWOLF: c_int = -1;
 
pub const WRONG_STATE: c_int = -2;
 
pub const CC_MAP_LOCK_POISONED: c_int = -3;
 
pub const CLOSE_FAIL: c_int = -4;
 
pub const BAD_FD: c_int = -5;
 
pub const CONNECT_FAILED: c_int = -6;
 
pub const WOULD_BLOCK: c_int = -7;
 
pub const BAD_SOCKADDR: c_int = -8;
 
pub const SEND_BEFORE_CONNECT: c_int = -9;
 

	
 
///////////////////// REOWOLF //////////////////////////
 

	
 
/// Returns length (via out pointer) and pointer (via return value) of the last Reowolf error.
 
/// - pointer is NULL iff there was no last error
 
/// - data at pointer is null-delimited
 
/// - len does NOT include the length of the null-delimiter
 
/// If len is NULL, it will not written to.
 
#[no_mangle]
 
pub unsafe extern "C" fn reowolf_error_peek(len: *mut usize) -> *const u8 {
 
    let (err_ptr, err_len) = StoredError::tl_bytes_peek();
 
    if !len.is_null() {
 
        len.write(err_len);
 
    }
 
    err_ptr
 
}
 

	
 
///////////////////// PROTOCOL DESCRIPTION //////////////////////////
 

	
 
/// Parses the utf8-encoded string slice to initialize a new protocol description object.
 
/// - On success, initializes `out` and returns 0
 
/// - On failure, stores an error string (see `reowolf_error_peek`) and returns -1
 
#[no_mangle]
 
pub unsafe extern "C" fn protocol_description_parse(
 
    pdl: *const u8,
 
    pdl_len: usize,
 
) -> *mut Arc<ProtocolDescription> {
 
    StoredError::tl_clear();
 
    match ProtocolDescription::parse(&*slice_from_raw_parts(pdl, pdl_len)) {
 
        Ok(new) => Box::into_raw(Box::new(Arc::new(new))),
 
        Err(err) => {
 
            StoredError::tl_bytes_store(err.as_bytes());
 
            std::ptr::null_mut()
 
        }
 
    }
 
}
 

	
 
/// Destroys the given initialized protocol description and frees its resources.
 
#[no_mangle]
 
pub unsafe extern "C" fn protocol_description_destroy(pd: *mut Arc<ProtocolDescription>) {
 
    drop(Box::from_raw(pd))
 
}
 

	
 
/// Given an initialized protocol description, initializes `out` with a clone which can be independently created or destroyed.
 
#[no_mangle]
 
pub unsafe extern "C" fn protocol_description_clone(
 
    pd: &Arc<ProtocolDescription>,
 
) -> *mut Arc<ProtocolDescription> {
 
    Box::into_raw(Box::new(pd.clone()))
 
}
 

	
 
///////////////////// CONNECTOR //////////////////////////
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new_logging(
 
    pd: &Arc<ProtocolDescription>,
 
    path_ptr: *const u8,
 
    path_len: usize,
 
) -> *mut Connector {
 
    StoredError::tl_clear();
 
    let path_bytes = &*slice_from_raw_parts(path_ptr, path_len);
 
    let path_str = match std::str::from_utf8(path_bytes) {
 
        Ok(path_str) => path_str,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            return std::ptr::null_mut();
 
        }
 
    };
 
    match std::fs::File::create(path_str) {
 
        Ok(file) => {
 
            let connector_id = Connector::random_id();
 
            let file_logger = Box::new(FileLogger::new(connector_id, file));
 
            let c = Connector::new(file_logger, pd.clone(), connector_id);
 
            Box::into_raw(Box::new(c))
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            std::ptr::null_mut()
 
        }
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_print_debug(connector: &mut Connector) {
 
    println!("Debug print dump {:#?}", connector);
 
}
 

	
 
/// Initializes `out` with a new connector using the given protocol description as its configuration.
 
/// The connector uses the given (internal) connector ID.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new(pd: &Arc<ProtocolDescription>) -> *mut Connector {
 
    let c = Connector::new(Box::new(DummyLogger), pd.clone(), Connector::random_id());
 
    Box::into_raw(Box::new(c))
 
}
 

	
 
/// Destroys the given a pointer to the connector on the heap, freeing its resources.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_destroy(connector: *mut Connector) {
 
    drop(Box::from_raw(connector))
 
}
 

	
 
/// Given an initialized connector in setup or connecting state,
 
/// - Creates a new directed port pair with logical channel putter->getter,
 
/// - adds the ports to the native component's interface,
 
/// - and returns them using the given out pointers.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_port_pair(
 
    connector: &mut Connector,
 
    out_putter: *mut PortId,
 
    out_getter: *mut PortId,
 
) {
 
    let [o, i] = connector.new_port_pair();
 
    out_putter.write(o);
 
    out_getter.write(i);
 
}
 

	
 
/// Given
 
/// - an initialized connector in setup or connecting state,
 
/// - a string slice for the component's identifier in the connector's configured protocol description,
 
/// - a set of ports (represented as a slice; duplicates are ignored) in the native component's interface,
 
/// the connector creates a new (internal) protocol component C, such that the set of native ports are moved to C.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_component(
 
    connector: &mut Connector,
 
    ident_ptr: *const u8,
 
    ident_len: usize,
 
    ports_ptr: *const PortId,
 
    ports_len: usize,
 
) -> c_int {
 
    StoredError::tl_clear();
 
    match connector.add_component(
 
        &*slice_from_raw_parts(ident_ptr, ident_len),
 
        &*slice_from_raw_parts(ports_ptr, ports_len),
 
    ) {
 
        Ok(()) => ERR_OK,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            ERR_REOWOLF
 
        }
 
    }
 
}
 

	
 
/// Given
 
/// - an initialized connector in setup or connecting state,
 
/// - a utf-8 encoded socket address,
 
/// - the logical polarity of P,
 
/// - the "physical" polarity in {Active, Passive} of the endpoint through which P's peer will be discovered,
 
/// returns P, a port newly added to the native interface.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_net_port(
 
    connector: &mut Connector,
 
    port: *mut PortId,
 
    addr: FfiSocketAddr,
 
    port_polarity: Polarity,
 
    endpoint_polarity: EndpointPolarity,
 
) -> c_int {
 
    StoredError::tl_clear();
 
    match connector.new_net_port(port_polarity, addr.into(), endpoint_polarity) {
 
        Ok(p) => {
 
            if !port.is_null() {
 
                port.write(p);
 
            }
 
            ERR_OK
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            ERR_REOWOLF
 
        }
 
    }
 
}
 

	
 
/// Given
 
/// - an initialized connector in setup or connecting state,
 
/// - a utf-8 encoded BIND socket addresses (i.e., "local"),
 
/// - a utf-8 encoded CONNECT socket addresses (i.e., "peer"),
 
/// returns [P, G] via out pointers [putter, getter],
 
/// - where P is a Putter port that sends messages into the socket
 
/// - where G is a Getter port that recvs messages from the socket
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_udp_port_pair(
 
    connector: &mut Connector,
 
    putter: *mut PortId,
 
    getter: *mut PortId,
 
    local_addr: FfiSocketAddr,
 
    peer_addr: FfiSocketAddr,
 
) -> c_int {
 
    StoredError::tl_clear();
 
    match connector.new_udp_mediator_component(local_addr.into(), peer_addr.into()) {
 
        Ok([p, g]) => {
src/ffi/pseudo_socket_api.rs
Show inline comments
 
use super::*;
 

	
 
use libc::{sockaddr, socklen_t};
 
use std::{
 
    collections::HashMap,
 
    ffi::c_void,
 
    net::{Ipv4Addr, SocketAddr, SocketAddrV4},
 
    os::raw::c_int,
 
    sync::RwLock,
 
};
 
use libc::{sockaddr, socklen_t};
 
///////////////////////////////////////////////////////////////////
 

	
 
struct FdAllocator {
 
    next: Option<c_int>,
 
    freed: Vec<c_int>,
 
}
 
struct ConnectorBound {
 
    connector: Connector,
 
    putter: PortId,
 
    getter: PortId,
 
}
 
struct ConnectorComplex {
 
    // invariants:
 
    // 1. connector is a upd-socket singleton
 
    // 2. putter and getter are ports in the native interface with the appropriate polarities
 
    // 3. peer_addr always mirrors connector's single udp socket's connect addr. both are overwritten together.
 
    peer_addr: SocketAddr,
 
    // 3. connected_to always mirrors connector's single udp socket's connect addr. both are overwritten together.
 
    conencted_to: Option<SocketAddr>,
 
    connector_bound: Option<ConnectorBound>,
 
}
 
#[derive(Default)]
 
struct CcMap {
 
    fd_to_cc: HashMap<c_int, RwLock<ConnectorComplex>>,
 
    fd_allocator: FdAllocator,
 
}
 
///////////////////////////////////////////////////////////////////
 
unsafe fn addr_from_raw(addr: *const sockaddr, addr_len: socklen_t) -> Option<SocketAddr> {
 
    os_socketaddr::OsSocketAddr::from_raw_parts(addr as _, addr_len as usize).into_addr()
 
}
 
fn trivial_peer_addr() -> SocketAddr {
 
fn dummy_peer_addr() -> SocketAddr {
 
    // SocketAddrV4::new isn't a constant-time func
 
    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0))
 
    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 0), 8000))
 
}
 
impl Default for FdAllocator {
 
    fn default() -> Self {
 
        Self {
 
            next: Some(0), // positive values used only
 
            freed: vec![],
 
        }
 
    }
 
}
 
impl FdAllocator {
 
    fn alloc(&mut self) -> c_int {
 
        if let Some(fd) = self.freed.pop() {
 
            return fd;
 
        }
 
        if let Some(fd) = self.next {
 
            self.next = fd.checked_add(1);
 
            return fd;
 
        }
 
        panic!("No more Connector FDs to allocate!")
 
    }
 
    fn free(&mut self, fd: c_int) {
 
        self.freed.push(fd);
 
    }
 
}
 
lazy_static::lazy_static! {
 
    static ref CC_MAP: RwLock<CcMap> = Default::default();
 
}
 
impl ConnectorComplex {
 
    fn connect(&mut self, peer_addr: SocketAddr) -> c_int {
 
        self.peer_addr = peer_addr;
 
        if let Some(ConnectorBound { connector, .. }) = &mut self.connector_bound {
 
            if connector.get_mut_udp_sock(0).unwrap().connect(peer_addr).is_err() {
 
                return CONNECT_FAILED;
 
            }
 
        }
 
        ERR_OK
 
    }
 
    unsafe fn send(&mut self, bytes_ptr: *const c_void, bytes_len: usize) -> isize {
 
        if let Some(ConnectorBound { connector, putter, .. }) = &mut self.connector_bound {
 
            match connector_put_bytes(connector, *putter, bytes_ptr as _, bytes_len) {
 
                ERR_OK => connector_sync(connector, -1),
 
                err => err as isize,
 
            }
 
        } else {
 
            WRONG_STATE as isize // not bound!
 
        }
 
    }
 
    unsafe fn recv(&mut self, bytes_ptr: *const c_void, bytes_len: usize) -> isize {
 
        if let Some(ConnectorBound { connector, getter, .. }) = &mut self.connector_bound {
 
            connector_get(connector, *getter);
 
            match connector_sync(connector, -1) {
 
                0 => {
 
                    // batch index 0 means OK
 
                    let slice = connector.gotten(*getter).unwrap().as_slice();
 
                    let copied_bytes = slice.len().min(bytes_len);
 
                    std::ptr::copy_nonoverlapping(
 
                        slice.as_ptr(),
 
                        bytes_ptr as *mut u8,
 
                        copied_bytes,
 
                    );
 
                    copied_bytes as isize
 
                }
 
                err => return err as isize,
 
            }
 
        } else {
 
            WRONG_STATE as isize // not bound!
 
        }
 
    }
 
}
 

	
 
///////////////////////////////////////////////////////////////////
 

	
 
#[no_mangle]
 
pub extern "C" fn rw_socket(_domain: c_int, _type: c_int) -> c_int {
 
    // ignoring domain and type
 
    // get writer lock
 
    let mut w = if let Ok(w) = CC_MAP.write() { w } else { return CC_MAP_LOCK_POISONED };
 
    let fd = w.fd_allocator.alloc();
 
    let cc = ConnectorComplex { peer_addr: trivial_peer_addr(), connector_bound: None };
 
    let cc = ConnectorComplex { peer_addr: dummy_peer_addr(), connector_bound: None };
 
    w.fd_to_cc.insert(fd, RwLock::new(cc));
 
    fd
 
}
 

	
 
#[no_mangle]
 
pub extern "C" fn rw_close(fd: c_int, _how: c_int) -> c_int {
 
    // ignoring HOW
 
    // get writer lock
 
    let mut w = if let Ok(w) = CC_MAP.write() { w } else { return CC_MAP_LOCK_POISONED };
 
    if w.fd_to_cc.remove(&fd).is_some() {
 
        w.fd_allocator.free(fd);
 
        ERR_OK
 
    } else {
 
        CLOSE_FAIL
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_bind(fd: c_int, addr: *const sockaddr, addr_len: socklen_t) -> c_int {
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    let addr = match addr_from_raw(addr, addr_len) {
 
        Some(addr) => addr,
 
        _ => return BAD_SOCKADDR,
 
    };
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    // get outer reader, inner writer locks
 
    let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED };
 
    let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD };
 
    let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED };
 
    let cc: &mut ConnectorComplex = &mut cc;
 
    if cc.connector_bound.is_some() {
 
        // already bound!
 
        return WRONG_STATE;
 
    }
 
    cc.connector_bound = {
 
        let mut connector = Connector::new(
 
            Box::new(crate::DummyLogger),
 
            crate::TRIVIAL_PD.clone(),
 
            Connector::random_id(),
 
        );
 
        let [putter, getter] = connector.new_udp_mediator_component(addr, cc.peer_addr).unwrap();
 
        // maintain invariant: if cc.connected_to.is_some():
 
        //   cc.connected_to matches the connected address of the socket
 
        let peer_addr = cc.connected_to.unwrap_or_with(dummy_peer_addr);
 
        let [putter, getter] = connector.new_udp_mediator_component(addr, peer_addr).unwrap();
 
        Some(ConnectorBound { connector, putter, getter })
 
    };
 
    ERR_OK
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_connect(
 
    fd: c_int,
 
    addr: *const sockaddr,
 
    addr_len: socklen_t,
 
) -> c_int {
 
    let addr = match addr_from_raw(addr, addr_len) {
 
        Some(addr) => addr,
 
        _ => return BAD_SOCKADDR,
 
    };
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    // get outer reader, inner writer locks
 
    let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED };
 
    let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD };
 
    let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED };
 
    let cc: &mut ConnectorComplex = &mut cc;
 
    cc.connect(addr)
 
    if let Some(ConnectorBound { connector, .. }) = &mut cc.connector_bound {
 
        // already bound. maintain invariant by overwriting the socket's connection (DUMMY or otherwise)
 
        if connector.get_mut_udp_ee(0).unwrap().sock.connect(peer_addr).is_err() {
 
            return CONNECT_FAILED;
 
        }
 
    }
 
    cc.connected_to = Some(addr);
 
    ERR_OK
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_send(
 
    fd: c_int,
 
    bytes_ptr: *const c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
) -> isize {
 
    // ignoring flags
 
    // get outer reader, inner writer locks
 
    let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize };
 
    let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize };
 
    let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize };
 
    let cc: &mut ConnectorComplex = &mut cc;
 
    cc.send(bytes_ptr, bytes_len)
 
    if cc.connected_to.is_none() {
 
        return SEND_BEFORE_CONNECT;
 
    }
 
    if let Some(ConnectorBound { connector, putter, .. }) = &mut cc.connector_bound {
 
        // is bound
 
        let bytes = &*slice_from_raw_parts(bytes_ptr, bytes_len);
 
        connector.put(putter, Payload::from_bytes(bytes)).unwrap();
 
        connector.sync(connector, None).unwrap();
 
        bytes_len as isize
 
    } else {
 
        // is not bound
 
        WRONG_STATE as isize
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_recv(
 
pub unsafe extern "C" fn rw_sendto(
 
    fd: c_int,
 
    bytes_ptr: *mut c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
    addr: *const sockaddr,
 
    addr_len: socklen_t,
 
) -> isize {
 
    // ignoring flags
 
    let addr = match addr_from_raw(addr, addr_len) {
 
        Some(addr) => addr,
 
        _ => return BAD_SOCKADDR,
 
    };
 
    // get outer reader, inner writer locks
 
    let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize };
 
    let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize };
 
    let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize };
 
    let cc: &mut ConnectorComplex = &mut cc;
 
    cc.recv(bytes_ptr, bytes_len)
 
    if let Some(ConnectorBound { connector, putter, .. }) = &mut cc.connector_bound {
 
        // is bound
 
        // (temporarily) break invariant
 
        if connector.get_mut_udp_ee(0).unwrap().sock.connect(addr).is_err() {
 
            // invariant not broken. nevermind
 
            return CONNECT_FAILED;
 
        }
 
        // invariant broken...
 
        let bytes = &*slice_from_raw_parts(bytes_ptr, bytes_len);
 
        connector.put(putter, Payload::from_bytes(bytes)).unwrap();
 
        connector.sync(connector, None).unwrap();
 
        let old_addr = cc.connected_to.unwrap_or_with(dummy_peer_addr)
 
        connector.get_mut_udp_ee(0).unwrap().sock.connect(addr).unwrap();
 
        // ...invariant restored
 
        bytes_len as isize
 
    } else {
 
        // is not bound
 
        WRONG_STATE as isize
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_sendto(
 
pub unsafe extern "C" fn rw_recv(
 
    fd: c_int,
 
    bytes_ptr: *mut c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
    addr: *const sockaddr,
 
    addr_len: socklen_t,
 
) -> isize {
 
    let addr = match addr_from_raw(addr, addr_len) {
 
        Some(addr) => addr,
 
        _ => return BAD_SOCKADDR as isize,
 
    };
 
    // ignoring flags
 
    // get outer reader, inner writer locks
 
    let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize };
 
    let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize };
 
    let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize };
 
    let cc: &mut ConnectorComplex = &mut cc;
 
    // copy currently old_addr
 
    let old_addr = cc.peer_addr;
 
    // connect to given peer_addr
 
    match cc.connect(addr) {
 
        e if e != ERR_OK => return e as isize,
 
        _ => {}
 
    }
 
    // send
 
    let ret = cc.send(bytes_ptr, bytes_len);
 
    // restore old_addr
 
    match cc.connect(old_addr) {
 
        e if e != ERR_OK => return e as isize,
 
        _ => {}
 
    if let Some(ConnectorBound { connector, getter, .. }) = &mut self.connector_bound {
 
        connector.get(getter).unwrap();
 
        // this call BLOCKS until it succeeds, and its got no reason to fail
 
        connector.sync(connector, None).unwrap();
 
        // copy from gotten to caller's buffer (truncating if necessary)
 
        let slice = connector.gotten(*getter).unwrap().as_slice();
 
        let cpy_msg_bytes = slice.len().min(bytes_len);
 
        std::ptr::copy_nonoverlapping(slice.as_ptr(), bytes_ptr as *mut u8, cpy_msg_bytes);
 
        // return number of bytes sent
 
        cpy_msg_bytes as isize
 
    } else {
 
        WRONG_STATE as isize // not bound!
 
    }
 
    ret
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_recvfrom(
 
    fd: c_int,
 
    bytes_ptr: *mut c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
    addr: *const sockaddr,
 
    addr_len: socklen_t,
 
    addr: *mut sockaddr,
 
    addr_len: *mut socklen_t,
 
) -> isize {
 
    let addr = match addr_from_raw(addr, addr_len) {
 
        Some(addr) => addr,
 
        _ => return BAD_SOCKADDR as isize,
 
    };
 
    // ignoring flags
 
    // get outer reader, inner writer locks
 
    let r = if let Ok(r) = CC_MAP.read() { r } else { return CC_MAP_LOCK_POISONED as isize };
 
    let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD as isize };
 
    let mut cc = if let Ok(cc) = cc.write() { cc } else { return CC_MAP_LOCK_POISONED as isize };
 
    let cc: &mut ConnectorComplex = &mut cc;
 
    // copy currently old_addr
 
    let old_addr = cc.peer_addr;
 
    // connect to given peer_addr
 
    match cc.connect(addr) {
 
        e if e != ERR_OK => return e as isize,
 
        _ => {}
 
    }
 
    // send
 
    let ret = cc.send(bytes_ptr, bytes_len);
 
    // restore old_addr
 
    match cc.connect(old_addr) {
 
        e if e != ERR_OK => return e as isize,
 
        _ => {}
 
    if let Some(ConnectorBound { connector, getter, .. }) = &mut self.connector_bound {
 
        connector.get(getter).unwrap();
 
        // this call BLOCKS until it succeeds, and its got no reason to fail
 
        connector.sync(connector, None).unwrap();
 
        // overwrite addr and addr_len
 
        let addr = connector.get_mut_udp_ee(0).unwrap().received_from_this_round.unwrap();
 
        let os_addr = os_socketaddr::OsSocketAddr::from(addr);
 
        let cpy_addr_bytes = (*addr_len).min(os_addr.capacity());
 
        // ptr-return addr bytes (truncated to addr_len)
 
        std::ptr::copy_nonoverlapping(os_addr.as_ptr(), addr as *mut u8, cpy_addr_bytes);
 
        // ptr-return true addr size
 
        *addr_len = os_addr.capacity(); 
 
        // copy from gotten to caller's buffer (truncating if necessary)
 
        let slice = connector.gotten(*getter).unwrap().as_slice();
 
        let cpy_msg_bytes = slice.len().min(bytes_len);
 
        std::ptr::copy_nonoverlapping(slice.as_ptr(), bytes_ptr as *mut u8, cpy_msg_bytes);
 
        // return number of bytes received
 
        cpy_msg_bytes as isize
 
    } else {
 
        WRONG_STATE as isize // not bound!
 
    }
 
    ret
 
}
src/runtime/communication.rs
Show inline comments
 
use super::*;
 
use crate::common::*;
 
use core::ops::{Deref, DerefMut};
 

	
 
////////////////
 
// Guard protecting an incrementally unfoldable slice of MapTempGuard elements
 
struct MapTempsGuard<'a, K, V>(&'a mut [HashMap<K, V>]);
 
// Type protecting a temporary map; At the start and end of the Guard's lifetime, self.0.is_empty() must be true
 
struct MapTempGuard<'a, K, V>(&'a mut HashMap<K, V>);
 

	
 
#[derive(Default)]
 
struct GetterBuffer {
 
    getters_and_sends: Vec<(PortId, SendPayloadMsg)>,
 
}
 
struct RoundCtx {
 
    solution_storage: SolutionStorage,
 
    spec_var_stream: SpecVarStream,
 
    getter_buffer: GetterBuffer,
 
    deadline: Option<Instant>,
 
}
 
struct BranchingNative {
 
    branches: HashMap<Predicate, NativeBranch>,
 
}
 
#[derive(Clone, Debug)]
 
struct NativeBranch {
 
    index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
#[derive(Debug)]
 
struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
    // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<SubtreeId, usize>,
 
}
 
#[derive(Debug)]
 
struct BranchingProtoComponent {
 
    ports: HashSet<PortId>,
 
    branches: HashMap<Predicate, ProtoComponentBranch>,
 
}
 
#[derive(Debug, Clone)]
 
struct ProtoComponentBranch {
 
    state: ComponentState,
 
    inner: ProtoComponentBranchInner,
 
    ended: bool,
 
}
 
struct CyclicDrainer<'a, K: Eq + Hash, V> {
 
    input: &'a mut HashMap<K, V>,
 
    inner: CyclicDrainInner<'a, K, V>,
 
}
 
struct CyclicDrainInner<'a, K: Eq + Hash, V> {
 
    swap: &'a mut HashMap<K, V>,
 
    output: &'a mut HashMap<K, V>,
 
}
 
trait ReplaceBoolTrue {
 
    fn replace_with_true(&mut self) -> bool;
 
}
 
impl ReplaceBoolTrue for bool {
 
    fn replace_with_true(&mut self) -> bool {
 
        let was = *self;
 
        *self = true;
 
        !was
 
    }
 
}
 

	
 
////////////////
 
impl<'a, K, V> MapTempsGuard<'a, K, V> {
 
    fn reborrow(&mut self) -> MapTempsGuard<'_, K, V> {
 
        MapTempsGuard(self.0)
 
    }
 
    fn split_first_mut(self) -> (MapTempGuard<'a, K, V>, MapTempsGuard<'a, K, V>) {
 
        let (head, tail) = self.0.split_first_mut().expect("Cache exhausted");
 
        (MapTempGuard::new(head), MapTempsGuard(tail))
 
    }
 
}
 
impl<'a, K, V> MapTempGuard<'a, K, V> {
 
    fn new(map: &'a mut HashMap<K, V>) -> Self {
 
        assert!(map.is_empty()); // sanity check
 
        Self(map)
 
    }
 
}
 
impl<'a, K, V> Drop for MapTempGuard<'a, K, V> {
 
    fn drop(&mut self) {
 
        assert!(self.0.is_empty()); // sanity check
 
    }
 
}
 
impl<'a, K, V> Deref for MapTempGuard<'a, K, V> {
 
    type Target = HashMap<K, V>;
 
    fn deref(&self) -> &<Self as Deref>::Target {
 
        self.0
 
    }
 
}
 
impl<'a, K, V> DerefMut for MapTempGuard<'a, K, V> {
 
    fn deref_mut(&mut self) -> &mut <Self as Deref>::Target {
 
        self.0
 
    }
 
}
 
impl RoundCtxTrait for RoundCtx {
 
    fn get_deadline(&self) -> &Option<Instant> {
 
        &self.deadline
 
    }
 
    fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) {
 
        self.getter_buffer.getter_add(getter, msg)
 
    }
 
}
 
impl Connector {
 
    fn get_comm_mut(&mut self) -> Option<&mut ConnectorCommunication> {
 
        if let ConnectorPhased::Communication(comm) = &mut self.phased {
 
            Some(comm)
 
        } else {
 
            None
 
        }
 
    }
 
    // #[cfg(ffi_socket_api)]
 
    pub(crate) fn get_mut_udp_sock(&mut self, index: usize) -> Option<&mut UdpSocket> {
 
        let sock = &mut self
 
            .get_comm_mut()?
 
            .endpoint_manager
 
            .udp_endpoint_store
 
            .endpoint_exts
 
            .get_mut(index)?
 
            .sock;
 
        Some(sock)
 
    pub(crate) fn get_mut_udp_ee(&mut self, index: usize) -> Option<&mut UdpEndpointExt> {
 
        self.get_comm_mut()?.endpoint_manager.udp_endpoint_store.endpoint_exts.get_mut(index)
 
    }
 
    pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> {
 
        use GottenError as Ge;
 
        let comm = self.get_comm_mut().ok_or(Ge::NoPreviousRound)?;
 
        match &comm.round_result {
 
            Err(_) => Err(Ge::PreviousSyncFailed),
 
            Ok(None) => Err(Ge::NoPreviousRound),
 
            Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(Ge::PortDidntGet),
 
        }
 
    }
 
    pub fn next_batch(&mut self) -> Result<usize, WrongStateError> {
 
        // returns index of new batch
 
        let comm = self.get_comm_mut().ok_or(WrongStateError)?;
 
        comm.native_batches.push(Default::default());
 
        Ok(comm.native_batches.len() - 1)
 
    }
 
    fn port_op_access(
 
        &mut self,
 
        port: PortId,
 
        expect_polarity: Polarity,
 
    ) -> Result<&mut NativeBatch, PortOpError> {
 
        use PortOpError as Poe;
 
        let Self { unphased: cu, phased } = self;
 
        if !cu.inner.native_ports.contains(&port) {
 
            return Err(Poe::PortUnavailable);
 
        }
 
        match cu.inner.port_info.polarities.get(&port) {
 
            Some(p) if *p == expect_polarity => {}
 
            Some(_) => return Err(Poe::WrongPolarity),
 
            None => return Err(Poe::UnknownPolarity),
 
        }
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(Poe::NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                let batch = comm.native_batches.last_mut().unwrap(); // length >= 1 is invariant
 
                Ok(batch)
 
            }
 
        }
 
    }
 
    pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> {
 
        use PortOpError as Poe;
 
        let batch = self.port_op_access(port, Putter)?;
 
        if batch.to_put.contains_key(&port) {
 
            Err(Poe::MultipleOpsOnPort)
 
        } else {
 
            batch.to_put.insert(port, payload);
 
            Ok(())
 
        }
 
    }
 
    pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> {
 
        use PortOpError as Poe;
 
        let batch = self.port_op_access(port, Getter)?;
 
        if batch.to_get.insert(port) {
 
            Ok(())
 
        } else {
 
            Err(Poe::MultipleOpsOnPort)
 
        }
 
    }
 
    // entrypoint for caller. overwrites round result enum, and returns what happened
 
    pub fn sync(&mut self, timeout: Option<Duration>) -> Result<usize, SyncError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(SyncError::NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                match &comm.round_result {
 
                    Err(SyncError::Unrecoverable(e)) => {
 
                        log!(cu.inner.logger, "Attempted to start sync round, but previous error {:?} was unrecoverable!", e);
 
                        return Err(SyncError::Unrecoverable(e.clone()));
 
                    }
 
                    _ => {}
 
                }
 
                comm.round_result = Self::connected_sync(cu, comm, timeout);
 
                comm.round_index += 1;
 
                match &comm.round_result {
 
                    Ok(None) => unreachable!(),
 
                    Ok(Some(ok_result)) => Ok(ok_result.batch_index),
 
                    Err(sync_error) => Err(sync_error.clone()),
 
                }
 
            }
 
        }
 
    }
 
    // private function. mutates state but returns with round
 
    // result ASAP (allows for convenient error return with ?)
 
    fn connected_sync(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        timeout: Option<Duration>,
 
    ) -> Result<Option<RoundOk>, SyncError> {
 
        //////////////////////////////////
 
        use SyncError as Se;
 
        //////////////////////////////////
 
        log!(
 
            cu.inner.logger,
 
            "~~~ SYNC called with timeout {:?}; starting round {}",
 
            &timeout,
 
            comm.round_index
 
        );
 

	
 
        // 1. run all proto components to Nonsync blockers
 
        // NOTE: original components are immutable until Decision::Success
 
        let mut branching_proto_components =
 
            HashMap::<ProtoComponentId, BranchingProtoComponent>::default();
 
        let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> =
 
            cu.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect();
 
        log!(cu.inner.logger, "Nonsync running {} proto components...", unrun_components.len());
 
        // drains unrun_components, and populates branching_proto_components.
 
        while let Some((proto_component_id, mut component)) = unrun_components.pop() {
 
            // TODO coalesce fields
 
            log!(
 
                cu.inner.logger,
 
                "Nonsync running proto component with ID {:?}. {} to go after this",
 
                proto_component_id,
 
                unrun_components.len()
 
            );
 
            let mut ctx = NonsyncProtoContext {
 
                cu_inner: &mut cu.inner,
 
                proto_component_id,
 
                unrun_components: &mut unrun_components,
 
                proto_component_ports: &mut cu
 
                    .proto_components
 
                    .get_mut(&proto_component_id)
 
                    .unwrap() // unrun_components' keys originate from proto_components
 
                    .ports,
 
            };
 
            let blocker = component.state.nonsync_run(&mut ctx, &cu.proto_description);
 
            log!(
 
                cu.inner.logger,
 
                "proto component {:?} ran to nonsync blocker {:?}",
 
                proto_component_id,
 
                &blocker
 
            );
 
            use NonsyncBlocker as B;
 
            match blocker {
 
                B::ComponentExit => drop(component),
 
                B::Inconsistent => return Err(Se::InconsistentProtoComponent(proto_component_id)),
 
                B::SyncBlockStart => {
 
                    branching_proto_components
 
                        .insert(proto_component_id, BranchingProtoComponent::initial(component));
 
                }
 
            }
 
        }
 
        log!(
 
            cu.inner.logger,
 
            "All {} proto components are now done with Nonsync phase",
 
            branching_proto_components.len(),
 
        );
 

	
 
        // Create temp structures needed for the synchronous phase of the round
 
        let mut rctx = RoundCtx {
 
            solution_storage: {
 
                let n = std::iter::once(SubtreeId::LocalComponent(ComponentId::Native));
 
                let c = cu
 
                    .proto_components
 
                    .keys()
 
                    .map(|&id| SubtreeId::LocalComponent(ComponentId::Proto(id)));
 
                let e = comm
 
                    .neighborhood
 
                    .children
 
                    .iter()
 
                    .map(|&index| SubtreeId::NetEndpoint { index });
 
                let subtree_id_iter = n.chain(c).chain(e);
 
                log!(
 
                    cu.inner.logger,
 
                    "Children in subtree are: {:?}",
 
                    subtree_id_iter.clone().collect::<Vec<_>>()
 
                );
 
                SolutionStorage::new(subtree_id_iter)
 
            },
 
            spec_var_stream: cu.inner.id_manager.new_spec_var_stream(),
 
            getter_buffer: Default::default(),
 
            deadline: timeout.map(|to| Instant::now() + to),
 
        };
 
        log!(cu.inner.logger, "Round context structure initialized");
 

	
 
        // Explore all native branches eagerly. Find solutions, buffer messages, etc.
 
        log!(
 
            cu.inner.logger,
 
            "Translating {} native batches into branches...",
 
            comm.native_batches.len()
 
        );
 
        let native_spec_var = rctx.spec_var_stream.next();
 
        log!(cu.inner.logger, "Native branch spec var is {:?}", native_spec_var);
 
        let mut branching_native = BranchingNative { branches: Default::default() };
 
        'native_branches: for ((native_branch, index), branch_spec_val) in
 
            comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain())
 
        {
 
            let NativeBatch { to_get, to_put } = native_branch;
 
            let predicate = {
 
                let mut predicate = Predicate::default();
 
                // assign trues for ports that fire
 
                let firing_ports: HashSet<PortId> =
 
                    to_get.iter().chain(to_put.keys()).copied().collect();
src/runtime/endpoints.rs
Show inline comments
 
@@ -45,383 +45,384 @@ impl NetEndpoint {
 
            Ok(msg) => {
 
                let msg_size = monitored.bytes_read();
 
                self.inbox.drain(0..(msg_size.try_into().unwrap()));
 
                endptlog!(
 
                    logger,
 
                    "Yielding msg. Inbox len {}-{}=={}: [{:?}]",
 
                    self.inbox.len() + msg_size,
 
                    msg_size,
 
                    self.inbox.len(),
 
                    DenseDebugHex(&self.inbox[..]),
 
                );
 
                Ok(Some(msg))
 
            }
 
            Err(e) => match *e {
 
                bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => {
 
                    Ok(None)
 
                }
 
                _ => Err(Nee::MalformedMessage),
 
            },
 
        }
 
    }
 
    pub(super) fn send<T: serde::ser::Serialize>(
 
        &mut self,
 
        msg: &T,
 
    ) -> Result<(), NetEndpointError> {
 
        use bincode::config::Options;
 
        use NetEndpointError as Nee;
 
        Self::bincode_opts()
 
            .serialize_into(&mut self.stream, msg)
 
            .map_err(|_| Nee::BrokenNetEndpoint)
 
    }
 
}
 

	
 
impl EndpointManager {
 
    pub(super) fn index_iter(&self) -> Range<usize> {
 
        0..self.num_net_endpoints()
 
    }
 
    pub(super) fn num_net_endpoints(&self) -> usize {
 
        self.net_endpoint_store.endpoint_exts.len()
 
    }
 
    pub(super) fn send_to_comms(
 
        &mut self,
 
        index: usize,
 
        msg: &Msg,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        use UnrecoverableSyncError as Use;
 
        let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
 
        net_endpoint.send(msg).map_err(|_| Use::BrokenNetEndpoint { index })
 
    }
 
    pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> {
 
        let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
 
        net_endpoint.send(msg).map_err(|err| {
 
            ConnectError::NetEndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err)
 
        })
 
    }
 

	
 
    /// Receive the first message of any kind at all.
 
    /// Why not return SetupMsg? Because often this message will be forwarded to several others,
 
    /// and by returning a Msg, it can be serialized in-place (NetEndpoints allow the sending of Msg types!)
 
    pub(super) fn try_recv_any_setup(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        deadline: &Option<Instant>,
 
    ) -> Result<(usize, Msg), ConnectError> {
 
        ///////////////////////////////////////////
 
        fn map_trane(
 
            trane: TryRecvAnyNetError,
 
            net_endpoint_store: &EndpointStore<NetEndpointExt>,
 
        ) -> ConnectError {
 
            ConnectError::NetEndpointSetupError(
 
                net_endpoint_store.endpoint_exts[trane.index]
 
                    .net_endpoint
 
                    .stream
 
                    .local_addr()
 
                    .unwrap(), // stream must already be connected
 
                trane.error,
 
            )
 
        }
 
        ///////////////////////////////////////////
 
        // try yield undelayed net message
 
        if let Some(tup) = self.undelayed_messages.pop() {
 
            endptlog!(logger, "RECV undelayed_msg {:?}", &tup);
 
            return Ok(tup);
 
        }
 
        loop {
 
            // try recv from some polled undrained NET endpoint
 
            if let Some(tup) = self
 
                .try_recv_undrained_net(logger)
 
                .map_err(|trane| map_trane(trane, &self.net_endpoint_store))?
 
            {
 
                return Ok(tup);
 
            }
 
            // poll if time remains
 
            self.poll_and_populate(logger, deadline)?;
 
        }
 
    }
 

	
 
    // drops all Setup messages,
 
    // buffers all future round messages,
 
    // drops all previous round messages,
 
    // enqueues all current round SendPayload messages using round_ctx.getter_add
 
    // returns the first comm_ctrl_msg encountered
 
    // only polls until SOME message is enqueued
 
    pub(super) fn try_recv_any_comms(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        port_info: &PortInfo,
 
        round_ctx: &mut impl RoundCtxTrait,
 
        round_index: usize,
 
    ) -> Result<CommRecvOk, UnrecoverableSyncError> {
 
        ///////////////////////////////////////////
 
        impl EndpointManager {
 
            fn handle_msg(
 
                &mut self,
 
                logger: &mut dyn Logger,
 
                round_ctx: &mut impl RoundCtxTrait,
 
                net_index: usize,
 
                msg: Msg,
 
                round_index: usize,
 
                some_message_enqueued: &mut bool,
 
            ) -> Option<(usize, CommCtrlMsg)> {
 
                let comm_msg_contents = match msg {
 
                    Msg::SetupMsg(..) => return None,
 
                    Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(&round_index) {
 
                        Ordering::Equal => comm_msg.contents,
 
                        Ordering::Less => {
 
                            log!(
 
                                logger,
 
                                "We are in round {}, but msg is for round {}. Discard",
 
                                comm_msg.round_index,
 
                                round_index,
 
                            );
 
                            return None;
 
                        }
 
                        Ordering::Greater => {
 
                            log!(
 
                                logger,
 
                                "We are in round {}, but msg is for round {}. Buffer",
 
                                comm_msg.round_index,
 
                                round_index,
 
                            );
 
                            self.delayed_messages.push((net_index, Msg::CommMsg(comm_msg)));
 
                            return None;
 
                        }
 
                    },
 
                };
 
                match comm_msg_contents {
 
                    CommMsgContents::CommCtrl(comm_ctrl_msg) => Some((net_index, comm_ctrl_msg)),
 
                    CommMsgContents::SendPayload(send_payload_msg) => {
 
                        let getter =
 
                            self.net_endpoint_store.endpoint_exts[net_index].getter_for_incoming;
 
                        round_ctx.getter_add(getter, send_payload_msg);
 
                        *some_message_enqueued = true;
 
                        None
 
                    }
 
                }
 
            }
 
        }
 
        use {PollAndPopulateError as Pape, UnrecoverableSyncError as Use};
 
        ///////////////////////////////////////////
 
        let mut some_message_enqueued = false;
 
        // try yield undelayed net message
 
        while let Some((net_index, msg)) = self.undelayed_messages.pop() {
 
            if let Some((net_index, msg)) = self.handle_msg(
 
                logger,
 
                round_ctx,
 
                net_index,
 
                msg,
 
                round_index,
 
                &mut some_message_enqueued,
 
            ) {
 
                return Ok(CommRecvOk::NewControlMsg { net_index, msg });
 
            }
 
        }
 
        loop {
 
            // try receive a net message
 
            while let Some((net_index, msg)) = self.try_recv_undrained_net(logger)? {
 
                if let Some((net_index, msg)) = self.handle_msg(
 
                    logger,
 
                    round_ctx,
 
                    net_index,
 
                    msg,
 
                    round_index,
 
                    &mut some_message_enqueued,
 
                ) {
 
                    return Ok(CommRecvOk::NewControlMsg { net_index, msg });
 
                }
 
            }
 
            // try receive a udp message
 
            let recv_buffer = self.udp_in_buffer.as_mut_slice();
 
            while let Some(index) = self.udp_endpoint_store.polled_undrained.pop() {
 
                let ee = &mut self.udp_endpoint_store.endpoint_exts[index];
 
                if let Some(bytes_written) = ee.sock.recv(recv_buffer).ok() {
 
                if let Some((bytes_written, from)) = ee.sock.recv_from(recv_buffer).ok() {
 
                    // I received a payload!
 
                    self.udp_endpoint_store.polled_undrained.insert(index);
 
                    if !ee.received_this_round {
 
                    if !ee.received_from_this_round.is_none() {
 
                        let payload = Payload::from(&recv_buffer[..bytes_written]);
 
                        let port_spec_var = port_info.spec_var_for(ee.getter_for_incoming);
 
                        let predicate = Predicate::singleton(port_spec_var, SpecVal::FIRING);
 
                        round_ctx.getter_add(
 
                            ee.getter_for_incoming,
 
                            SendPayloadMsg { payload, predicate },
 
                        );
 
                        some_message_enqueued = true;
 
                        ee.received_this_round = true;
 
                        ee.received_from_this_round = Some(from);
 
                    } else {
 
                        // lose the message!
 
                    }
 
                }
 
            }
 
            if some_message_enqueued {
 
                return Ok(CommRecvOk::NewPayloadMsgs);
 
            }
 
            // poll if time remains
 
            match self.poll_and_populate(logger, round_ctx.get_deadline()) {
 
                Ok(()) => {} // continue looping
 
                Err(Pape::Timeout) => return Ok(CommRecvOk::TimeoutWithoutNew),
 
                Err(Pape::PollFailed) => return Err(Use::PollFailed),
 
            }
 
        }
 
    }
 
    fn try_recv_undrained_net(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
    ) -> Result<Option<(usize, Msg)>, TryRecvAnyNetError> {
 
        while let Some(index) = self.net_endpoint_store.polled_undrained.pop() {
 
            let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
 
            if let Some(msg) = net_endpoint
 
                .try_recv(logger)
 
                .map_err(|error| TryRecvAnyNetError { error, index })?
 
            {
 
                endptlog!(logger, "RECV polled_undrained {:?}", &msg);
 
                if !net_endpoint.inbox.is_empty() {
 
                    // there may be another message waiting!
 
                    self.net_endpoint_store.polled_undrained.insert(index);
 
                }
 
                return Ok(Some((index, msg)));
 
            }
 
        }
 
        Ok(None)
 
    }
 
    fn poll_and_populate(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        deadline: &Option<Instant>,
 
    ) -> Result<(), PollAndPopulateError> {
 
        use PollAndPopulateError as Pape;
 
        // No message yet. Do we have enough time to poll?
 
        let remaining = if let Some(deadline) = deadline {
 
            Some(deadline.checked_duration_since(Instant::now()).ok_or(Pape::Timeout)?)
 
        } else {
 
            None
 
        };
 
        // Yes we do! Poll with remaining time as poll deadline
 
        self.poll.poll(&mut self.events, remaining).map_err(|_| Pape::PollFailed)?;
 
        for event in self.events.iter() {
 
            match TokenTarget::from(event.token()) {
 
                TokenTarget::Waker => {
 
                    // Can ignore. Residual event from endpoint manager setup procedure
 
                }
 
                TokenTarget::NetEndpoint { index } => {
 
                    self.net_endpoint_store.polled_undrained.insert(index);
 
                    endptlog!(
 
                        logger,
 
                        "RECV poll event {:?} for NET endpoint index {:?}. undrained: {:?}",
 
                        &event,
 
                        index,
 
                        self.net_endpoint_store.polled_undrained.iter()
 
                    );
 
                }
 
                TokenTarget::UdpEndpoint { index } => {
 
                    self.udp_endpoint_store.polled_undrained.insert(index);
 
                    endptlog!(
 
                        logger,
 
                        "RECV poll event {:?} for UDP endpoint index {:?}. undrained: {:?}",
 
                        &event,
 
                        index,
 
                        self.udp_endpoint_store.polled_undrained.iter()
 
                    );
 
                }
 
            }
 
        }
 
        self.events.clear();
 
        Ok(())
 
    }
 
    pub(super) fn undelay_all(&mut self) {
 
        if self.undelayed_messages.is_empty() {
 
            // fast path
 
            std::mem::swap(&mut self.delayed_messages, &mut self.undelayed_messages);
 
            return;
 
        }
 
        // slow path
 
        self.undelayed_messages.extend(self.delayed_messages.drain(..));
 
    }
 
    pub(super) fn udp_endpoints_round_start(&mut self, logger: &mut dyn Logger) {
 
        log!(
 
            logger,
 
            "Starting round for {} udp endpoints",
 
            self.udp_endpoint_store.endpoint_exts.len()
 
        );
 
        for ee in self.udp_endpoint_store.endpoint_exts.iter_mut() {
 
            ee.received_this_round = false;
 
            ee.received_from_this_round = None;
 
        }
 
    }
 
    pub(super) fn udp_endpoints_round_end(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        decision: &Decision,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        // retain received_from_this_round for use in pseudo_socket_api::recv_from
 
        log!(
 
            logger,
 
            "Ending round for {} udp endpoints",
 
            self.udp_endpoint_store.endpoint_exts.len()
 
        );
 
        use UnrecoverableSyncError as Use;
 
        if let Decision::Success(solution_predicate) = decision {
 
            'endpoint_loop: for (index, ee) in
 
                self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate()
 
            {
 
                for (payload_predicate, payload) in ee.outgoing_payloads.drain() {
 
                    if payload_predicate.assigns_subset(solution_predicate) {
 
                        ee.sock.send(payload.as_slice()).map_err(|e| {
 
                            println!("{:?}", e);
 
                            Use::BrokenUdpEndpoint { index }
 
                        })?;
 
                        log!(
 
                            logger,
 
                            "Sent payload {:?} with pred {:?} through Udp endpoint {}",
 
                            &payload,
 
                            &payload_predicate,
 
                            index
 
                        );
 
                        continue 'endpoint_loop; // send at most one payload per endpoint per round
 
                    }
 
                }
 
                log!(logger, "Sent no message through Udp endpoint {}", index);
 
            }
 
        }
 
        Ok(())
 
    }
 
}
 
impl Debug for NetEndpoint {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_struct("Endpoint").field("inbox", &self.inbox).finish()
 
    }
 
}
 
impl<R: Read> From<R> for MonitoredReader<R> {
 
    fn from(r: R) -> Self {
 
        Self { r, bytes: 0 }
 
    }
 
}
 
impl<R: Read> MonitoredReader<R> {
 
    pub(super) fn bytes_read(&self) -> usize {
 
        self.bytes
 
    }
 
}
 
impl<R: Read> Read for MonitoredReader<R> {
 
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
 
        let n = self.r.read(buf)?;
 
        self.bytes += n;
 
        Ok(n)
 
    }
 
}
 
impl Into<Msg> for SetupMsg {
 
    fn into(self) -> Msg {
 
        Msg::SetupMsg(self)
 
    }
 
}
 
impl From<PollAndPopulateError> for ConnectError {
 
    fn from(pape: PollAndPopulateError) -> ConnectError {
 
        use {ConnectError as Ce, PollAndPopulateError as Pape};
 
        match pape {
 
            Pape::PollFailed => Ce::PollFailed,
 
            Pape::Timeout => Ce::Timeout,
 
        }
 
    }
 
}
 
impl From<TryRecvAnyNetError> for UnrecoverableSyncError {
 
    fn from(trane: TryRecvAnyNetError) -> UnrecoverableSyncError {
 
        let TryRecvAnyNetError { index, .. } = trane;
 
        UnrecoverableSyncError::BrokenNetEndpoint { index }
 
    }
 
}
src/runtime/mod.rs
Show inline comments
 
/// cbindgen:ignore
 
mod communication;
 
/// cbindgen:ignore
 
mod endpoints;
 
pub mod error;
 
/// cbindgen:ignore
 
mod logging;
 
/// cbindgen:ignore
 
mod setup;
 

	
 
#[cfg(test)]
 
mod tests;
 

	
 
use crate::common::*;
 
use error::*;
 
use mio::net::UdpSocket;
 

	
 
#[derive(Debug)]
 
pub struct Connector {
 
    unphased: ConnectorUnphased,
 
    phased: ConnectorPhased,
 
}
 
pub trait Logger: Debug + Send + Sync {
 
    fn line_writer(&mut self) -> Option<&mut dyn std::io::Write>;
 
}
 
#[derive(Debug)]
 
pub struct VecLogger(ConnectorId, Vec<u8>);
 
#[derive(Debug)]
 
pub struct DummyLogger;
 
#[derive(Debug)]
 
pub struct FileLogger(ConnectorId, std::fs::File);
 
pub(crate) struct NonsyncProtoContext<'a> {
 
    cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds
 
    proto_component_ports: &'a mut HashSet<PortId>, // sub-structure of component
 
    unrun_components: &'a mut Vec<(ProtoComponentId, ProtoComponent)>, // lives for Nonsync phase
 
    proto_component_id: ProtoComponentId,     // KEY in id->component map
 
}
 
pub(crate) struct SyncProtoContext<'a> {
 
    cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds
 
    branch_inner: &'a mut ProtoComponentBranchInner, // sub-structure of component branch
 
    predicate: &'a Predicate,                 // KEY in pred->branch map
 
}
 
#[derive(Debug)]
 
pub(crate) struct UdpEndpointExt {
 
    sock: UdpSocket, // already bound and connected
 
    received_from_this_round: Option<SocketAddr>,
 
    outgoing_payloads: HashMap<Predicate, Payload>,
 
    getter_for_incoming: PortId,
 
}
 
#[derive(Default, Debug, Clone)]
 
struct ProtoComponentBranchInner {
 
    untaken_choice: Option<u16>,
 
    did_put_or_get: HashSet<PortId>,
 
    inbox: HashMap<PortId, Payload>,
 
}
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
struct SpecVar(PortId);
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
struct SpecVal(u16);
 
#[derive(Debug)]
 
struct RoundOk {
 
    batch_index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
}
 
#[derive(Default)]
 
struct VecSet<T: std::cmp::Ord> {
 
    // invariant: ordered, deduplicated
 
    vec: Vec<T>,
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
enum ComponentId {
 
    Native,
 
    Proto(ProtoComponentId),
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
enum Route {
 
    LocalComponent(ComponentId),
 
    NetEndpoint { index: usize },
 
    UdpEndpoint { index: usize },
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
enum SubtreeId {
 
    LocalComponent(ComponentId),
 
    NetEndpoint { index: usize },
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct MyPortInfo {
 
    polarity: Polarity,
 
    port: PortId,
 
}
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
enum Decision {
 
    Failure,
 
    Success(Predicate),
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum Msg {
 
    SetupMsg(SetupMsg),
 
    CommMsg(CommMsg),
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum SetupMsg {
 
    MyPortInfo(MyPortInfo),
 
    LeaderWave { wave_leader: ConnectorId },
 
    LeaderAnnounce { tree_leader: ConnectorId },
 
    YouAreMyParent,
 
    SessionGather { unoptimized_map: HashMap<ConnectorId, SessionInfo> },
 
    SessionScatter { optimized_map: HashMap<ConnectorId, SessionInfo> },
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct SessionInfo {
 
    serde_proto_description: SerdeProtocolDescription,
 
    port_info: PortInfo,
 
    endpoint_incoming_to_getter: Vec<PortId>,
 
    proto_components: HashMap<ProtoComponentId, ProtoComponent>,
 
}
 
#[derive(Debug, Clone)]
 
struct SerdeProtocolDescription(Arc<ProtocolDescription>);
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct CommMsg {
 
    round_index: usize,
 
    contents: CommMsgContents,
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum CommMsgContents {
 
    SendPayload(SendPayloadMsg),
 
    CommCtrl(CommCtrlMsg),
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum CommCtrlMsg {
 
    Suggest { suggestion: Decision }, // SINKWARD
 
    Announce { decision: Decision },  // SINKAWAYS
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct SendPayloadMsg {
 
    predicate: Predicate,
 
    payload: Payload,
 
}
 
#[derive(Debug, PartialEq)]
 
enum AssignmentUnionResult {
 
    FormerNotLatter,
 
    LatterNotFormer,
 
    Equivalent,
 
    New(Predicate),
 
    Nonexistant,
 
}
 
struct NetEndpoint {
 
    inbox: Vec<u8>,
 
    stream: TcpStream,
 
}
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
struct ProtoComponent {
 
    state: ComponentState,
 
    ports: HashSet<PortId>,
 
}
 
#[derive(Debug, Clone)]
 
struct NetEndpointSetup {
 
    getter_for_incoming: PortId,
 
    sock_addr: SocketAddr,
 
    endpoint_polarity: EndpointPolarity,
 
}
 

	
 
#[derive(Debug, Clone)]
 
struct UdpEndpointSetup {
 
    getter_for_incoming: PortId,
 
    local_addr: SocketAddr,
 
    peer_addr: SocketAddr,
 
}
 
#[derive(Debug)]
 
struct NetEndpointExt {
 
    net_endpoint: NetEndpoint,
 
    getter_for_incoming: PortId,
 
}
 
#[derive(Debug)]
 
struct Neighborhood {
 
    parent: Option<usize>,
 
    children: VecSet<usize>,
 
}
 
#[derive(Debug)]
 
struct IdManager {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
    proto_component_suffix_stream: U32Stream,
 
}
 
#[derive(Debug)]
 
struct UdpInBuffer {
 
    byte_vec: Vec<u8>,
 
}
 
#[derive(Debug)]
 
struct SpecVarStream {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
}
 
#[derive(Debug)]
 
struct EndpointManager {
 
    // invariants:
 
    // 1. net and udp endpoints are registered with poll. Poll token computed with TargetToken::into
 
    // 2. Events is empty
 
    poll: Poll,
 
    events: Events,
 
    delayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>,
 
    net_endpoint_store: EndpointStore<NetEndpointExt>,
 
    udp_endpoint_store: EndpointStore<UdpEndpointExt>,
 
    udp_in_buffer: UdpInBuffer,
 
}
 
#[derive(Debug)]
 
struct EndpointStore<T> {
 
    endpoint_exts: Vec<T>,
 
    polled_undrained: VecSet<usize>,
 
}
 
#[derive(Debug)]
 
struct UdpEndpointExt {
 
    sock: UdpSocket, // already bound and connected
 
    received_this_round: bool,
 
    outgoing_payloads: HashMap<Predicate, Payload>,
 
    getter_for_incoming: PortId,
 
}
 
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
 
struct PortInfo {
 
    polarities: HashMap<PortId, Polarity>,
 
    peers: HashMap<PortId, PortId>,
 
    routes: HashMap<PortId, Route>,
 
}
 
#[derive(Debug)]
 
struct ConnectorCommunication {
 
    round_index: usize,
 
    endpoint_manager: EndpointManager,
 
    neighborhood: Neighborhood,
 
    native_batches: Vec<NativeBatch>,
 
    round_result: Result<Option<RoundOk>, SyncError>,
 
}
 
#[derive(Debug)]
 
struct ConnectorUnphased {
 
    proto_description: Arc<ProtocolDescription>,
 
    proto_components: HashMap<ProtoComponentId, ProtoComponent>,
 
    inner: ConnectorUnphasedInner,
 
}
 
#[derive(Debug)]
 
struct ConnectorUnphasedInner {
 
    logger: Box<dyn Logger>,
 
    id_manager: IdManager,
 
    native_ports: HashSet<PortId>,
 
    port_info: PortInfo,
 
}
 
#[derive(Debug)]
 
struct ConnectorSetup {
 
    net_endpoint_setups: Vec<NetEndpointSetup>,
 
    udp_endpoint_setups: Vec<UdpEndpointSetup>,
 
}
 
#[derive(Debug)]
 
enum ConnectorPhased {
 
    Setup(Box<ConnectorSetup>),
 
    Communication(Box<ConnectorCommunication>),
 
}
 
#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
struct Predicate {
 
    assigned: BTreeMap<SpecVar, SpecVal>,
 
}
 
#[derive(Debug, Default)]
 
struct NativeBatch {
 
    // invariant: putters' and getters' polarities respected
 
    to_put: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
 
enum TokenTarget {
 
    NetEndpoint { index: usize },
 
    UdpEndpoint { index: usize },
 
    Waker,
 
}
 
trait RoundCtxTrait {
 
    fn get_deadline(&self) -> &Option<Instant>;
 
    fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg);
 
}
 
enum CommRecvOk {
 
    TimeoutWithoutNew,
 
    NewPayloadMsgs,
 
    NewControlMsg { net_index: usize, msg: CommCtrlMsg },
 
}
 
////////////////
 
fn would_block(err: &std::io::Error) -> bool {
 
    err.kind() == std::io::ErrorKind::WouldBlock
 
}
 
impl TokenTarget {
 
    const HALFWAY_INDEX: usize = usize::MAX / 2;
 
    const MAX_INDEX: usize = usize::MAX;
 
    const WAKER_TOKEN: usize = Self::MAX_INDEX;
 
}
 
impl From<Token> for TokenTarget {
 
    fn from(Token(index): Token) -> Self {
 
        if index == Self::WAKER_TOKEN {
 
            TokenTarget::Waker
 
        } else if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) {
 
            TokenTarget::UdpEndpoint { index: shifted }
 
        } else {
 
            TokenTarget::NetEndpoint { index }
 
        }
 
    }
 
}
 
impl Into<Token> for TokenTarget {
 
    fn into(self) -> Token {
 
        match self {
 
            TokenTarget::Waker => Token(Self::WAKER_TOKEN),
 
            TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX),
 
            TokenTarget::NetEndpoint { index } => Token(index),
 
        }
 
    }
 
}
 
impl<T: std::cmp::Ord> VecSet<T> {
 
    fn new(mut vec: Vec<T>) -> Self {
 
        vec.sort();
 
        vec.dedup();
 
        Self { vec }
 
    }
 
    fn contains(&self, element: &T) -> bool {
 
        self.vec.binary_search(element).is_ok()
 
    }
 
    fn insert(&mut self, element: T) -> bool {
 
        match self.vec.binary_search(&element) {
 
            Ok(_) => false,
 
            Err(index) => {
 
                self.vec.insert(index, element);
 
                true
 
            }
 
        }
 
    }
 
    fn iter(&self) -> std::slice::Iter<T> {
 
        self.vec.iter()
 
    }
 
    fn pop(&mut self) -> Option<T> {
 
        self.vec.pop()
 
    }
 
}
 
impl PortInfo {
 
    fn spec_var_for(&self, port: PortId) -> SpecVar {
 
        SpecVar(match self.polarities.get(&port).unwrap() {
 
            Getter => port,
 
            Putter => *self.peers.get(&port).unwrap(),
 
        })
 
    }
 
}
 
impl SpecVarStream {
 
    fn next(&mut self) -> SpecVar {
 
        let phantom_port: PortId =
 
            Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }
 
                .into();
 
        SpecVar(phantom_port)
 
    }
 
}
 
impl IdManager {
 
    fn new(connector_id: ConnectorId) -> Self {
 
        Self {
 
            connector_id,
 
            port_suffix_stream: Default::default(),
 
            proto_component_suffix_stream: Default::default(),
 
        }
 
    }
 
    fn new_spec_var_stream(&self) -> SpecVarStream {
 
        // Spec var stream starts where the current port_id stream ends, with gap of SKIP_N.
 
        // This gap is entirely unnecessary (i.e. 0 is fine)
 
        // It's purpose is only to make SpecVars easier to spot in logs.
 
        // E.g. spot the spec var: { v0_0, v1_2, v1_103 }
 
        const SKIP_N: u32 = 100;
 
        let port_suffix_stream = self.port_suffix_stream.clone().n_skipped(SKIP_N);
 
        SpecVarStream { connector_id: self.connector_id, port_suffix_stream }
 
    }
 
    fn new_port_id(&mut self) -> PortId {
 
        Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into()
 
    }
 
    fn new_proto_component_id(&mut self) -> ProtoComponentId {
 
        Id {
 
            connector_id: self.connector_id,
 
            u32_suffix: self.proto_component_suffix_stream.next(),
 
        }
 
        .into()
 
    }
 
}
 
impl Drop for Connector {
 
    fn drop(&mut self) {
 
        log!(&mut *self.unphased.inner.logger, "Connector dropping. Goodbye!");
 
    }
 
}
 
impl Connector {
 
    pub(crate) fn random_id() -> ConnectorId {
 
        type Bytes8 = [u8; std::mem::size_of::<ConnectorId>()];
 
        unsafe {
 
            let mut bytes = std::mem::MaybeUninit::<Bytes8>::uninit();
 
            // getrandom is the canonical crate for a small, secure rng
 
            getrandom::getrandom(&mut *bytes.as_mut_ptr()).unwrap();
 
            // safe! representations of all valid Byte8 values are valid ConnectorId values
 
            std::mem::transmute::<_, _>(bytes.assume_init())
 
        }
 
    }
 
    pub fn swap_logger(&mut self, mut new_logger: Box<dyn Logger>) -> Box<dyn Logger> {
 
        std::mem::swap(&mut self.unphased.inner.logger, &mut new_logger);
 
        new_logger
 
    }
 
    pub fn get_logger(&mut self) -> &mut dyn Logger {
 
        &mut *self.unphased.inner.logger
 
    }
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        let cu = &mut self.unphased;
 
        // adds two new associated ports, related to each other, and exposed to the native
 
        let [o, i] = [cu.inner.id_manager.new_port_id(), cu.inner.id_manager.new_port_id()];
 
        cu.inner.native_ports.insert(o);
 
        cu.inner.native_ports.insert(i);
 
        // {polarity, peer, route} known. {} unknown.
 
        cu.inner.port_info.polarities.insert(o, Putter);
 
        cu.inner.port_info.polarities.insert(i, Getter);
src/runtime/setup.rs
Show inline comments
 
@@ -322,385 +322,385 @@ fn new_endpoint_manager(
 
                                // successfully accepted the active peer
 
                                // reusing the token, but now for the stream and not the listener
 
                                poll.registry().deregister(listener).unwrap();
 
                                poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                                log!(
 
                                    logger,
 
                                    "Endpoint[{}] accepted a connection from {:?}",
 
                                    index,
 
                                    peer_addr
 
                                );
 
                                let net_endpoint = NetEndpoint { stream, inbox: vec![] };
 
                                net_todo.todo_endpoint = TodoEndpoint::NetEndpoint(net_endpoint);
 
                            }
 
                        }
 
                    }
 
                    if let TodoEndpoint::NetEndpoint(net_endpoint) = &mut net_todo.todo_endpoint {
 
                        if event.is_error() {
 
                            if net_todo.endpoint_setup.endpoint_polarity
 
                                == EndpointPolarity::Passive
 
                            {
 
                                // right now you cannot retry an acceptor. return failure
 
                                return Err(Ce::AcceptFailed(
 
                                    net_endpoint.stream.local_addr().unwrap(),
 
                                ));
 
                            }
 
                            // this actively-connecting endpoint failed to connect!
 
                            if net_connect_retry_later.insert(index) {
 
                                log!(
 
                                    logger,
 
                                    "Connection failed for {:?}. List is {:?}",
 
                                    index,
 
                                    net_connect_retry_later.iter()
 
                                );
 
                                poll.registry().deregister(&mut net_endpoint.stream).unwrap();
 
                            } else {
 
                                // spurious wakeup. already scheduled to retry connect later
 
                                continue;
 
                            }
 
                            if waker_state.is_none() {
 
                                log!(logger, "First connect failure. Starting waker thread");
 
                                let arc = Arc::new(WakerState {
 
                                    waker: mio::Waker::new(
 
                                        poll.registry(),
 
                                        TokenTarget::Waker.into(),
 
                                    )
 
                                    .unwrap(),
 
                                    continue_signal: true.into(),
 
                                });
 
                                let moved_arc = arc.clone();
 
                                waker_state = Some(arc);
 
                                std::thread::spawn(move || moved_arc.waker_loop());
 
                            }
 
                            continue;
 
                        }
 
                        // event wasn't ERROR
 
                        if net_connect_retry_later.contains(&index) {
 
                            // spurious wakeup. already scheduled to retry connect later
 
                            continue;
 
                        }
 
                        if !setup_incomplete.contains(&token_target) {
 
                            // spurious wakeup. this endpoint has already been completed!
 
                            if event.is_readable() {
 
                                net_polled_undrained.insert(index);
 
                            }
 
                            continue;
 
                        }
 
                        let local_polarity = *port_info
 
                            .polarities
 
                            .get(&net_todo.endpoint_setup.getter_for_incoming)
 
                            .unwrap();
 
                        if event.is_writable() && !net_todo.sent_local_port {
 
                            // can write and didn't send setup msg yet? Do so!
 
                            let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
 
                                polarity: local_polarity,
 
                                port: net_todo.endpoint_setup.getter_for_incoming,
 
                            }));
 
                            net_endpoint
 
                                .send(&msg)
 
                                .map_err(|e| {
 
                                    Ce::NetEndpointSetupError(
 
                                        net_endpoint.stream.local_addr().unwrap(),
 
                                        e,
 
                                    )
 
                                })
 
                                .unwrap();
 
                            log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
 
                            net_todo.sent_local_port = true;
 
                        }
 
                        if event.is_readable() && net_todo.recv_peer_port.is_none() {
 
                            // can read and didn't recv setup msg yet? Do so!
 
                            let maybe_msg = net_endpoint.try_recv(logger).map_err(|e| {
 
                                Ce::NetEndpointSetupError(
 
                                    net_endpoint.stream.local_addr().unwrap(),
 
                                    e,
 
                                )
 
                            })?;
 
                            if maybe_msg.is_some() && !net_endpoint.inbox.is_empty() {
 
                                net_polled_undrained.insert(index);
 
                            }
 
                            match maybe_msg {
 
                                None => {} // msg deserialization incomplete
 
                                Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => {
 
                                    log!(
 
                                        logger,
 
                                        "endpoint[{}] got peer info {:?}",
 
                                        index,
 
                                        peer_info
 
                                    );
 
                                    if peer_info.polarity == local_polarity {
 
                                        return Err(ConnectError::PortPeerPolarityMismatch(
 
                                            net_todo.endpoint_setup.getter_for_incoming,
 
                                        ));
 
                                    }
 
                                    net_todo.recv_peer_port = Some(peer_info.port);
 
                                    // 1. finally learned the peer of this port!
 
                                    port_info.peers.insert(
 
                                        net_todo.endpoint_setup.getter_for_incoming,
 
                                        peer_info.port,
 
                                    );
 
                                    // 2. learned the info of this peer port
 
                                    port_info.polarities.insert(peer_info.port, peer_info.polarity);
 
                                    port_info.peers.insert(
 
                                        peer_info.port,
 
                                        net_todo.endpoint_setup.getter_for_incoming,
 
                                    );
 
                                    if let Some(route) = port_info.routes.get(&peer_info.port) {
 
                                        // check just for logging purposes
 
                                        log!(
 
                                            logger,
 
                                            "Special case! Route to peer {:?} already known to be {:?}. Leave untouched",
 
                                            peer_info.port,
 
                                            route
 
                                        );
 
                                    }
 
                                    port_info
 
                                        .routes
 
                                        .entry(peer_info.port)
 
                                        .or_insert(Route::NetEndpoint { index });
 
                                }
 
                                Some(inappropriate_msg) => {
 
                                    log!(
 
                                        logger,
 
                                        "delaying msg {:?} during channel setup phase",
 
                                        inappropriate_msg
 
                                    );
 
                                    delayed_messages.push((index, inappropriate_msg));
 
                                }
 
                            }
 
                        }
 
                        // is the setup for this net_endpoint now complete?
 
                        if net_todo.sent_local_port && net_todo.recv_peer_port.is_some() {
 
                            // yes! connected, sent my info and received peer's info
 
                            setup_incomplete.remove(&token_target);
 
                            log!(logger, "endpoint[{}] is finished!", index);
 
                        }
 
                    }
 
                }
 
            }
 
        }
 
        events.clear();
 
    }
 
    log!(logger, "Endpoint setup complete! Cleaning up and building structures");
 
    if let Some(ws) = waker_state.take() {
 
        ws.waker_stop();
 
    }
 
    let net_endpoint_exts = net_todos
 
        .into_iter()
 
        .enumerate()
 
        .map(|(index, Todo { todo_endpoint, endpoint_setup, .. })| NetEndpointExt {
 
            net_endpoint: match todo_endpoint {
 
                TodoEndpoint::NetEndpoint(mut net_endpoint) => {
 
                    let token = TokenTarget::NetEndpoint { index }.into();
 
                    poll.registry()
 
                        .reregister(&mut net_endpoint.stream, token, Interest::READABLE)
 
                        .unwrap();
 
                    net_endpoint
 
                }
 
                _ => unreachable!(),
 
            },
 
            getter_for_incoming: endpoint_setup.getter_for_incoming,
 
        })
 
        .collect();
 
    let udp_endpoint_exts = udp_todos
 
        .into_iter()
 
        .enumerate()
 
        .map(|(index, udp_todo)| {
 
            let UdpTodo { mut sock, getter_for_incoming } = udp_todo;
 
            let token = TokenTarget::UdpEndpoint { index }.into();
 
            poll.registry().reregister(&mut sock, token, Interest::READABLE).unwrap();
 
            UdpEndpointExt {
 
                sock,
 
                outgoing_payloads: Default::default(),
 
                received_this_round: false,
 
                received_from_this_round: None,
 
                getter_for_incoming,
 
            }
 
        })
 
        .collect();
 
    Ok(EndpointManager {
 
        poll,
 
        events,
 
        undelayed_messages: delayed_messages, // no longer delayed
 
        delayed_messages: Default::default(),
 
        net_endpoint_store: EndpointStore {
 
            endpoint_exts: net_endpoint_exts,
 
            polled_undrained: net_polled_undrained,
 
        },
 
        udp_endpoint_store: EndpointStore {
 
            endpoint_exts: udp_endpoint_exts,
 
            polled_undrained: udp_polled_undrained,
 
        },
 
        udp_in_buffer: Default::default(),
 
    })
 
}
 

	
 
fn init_neighborhood(
 
    connector_id: ConnectorId,
 
    logger: &mut dyn Logger,
 
    em: &mut EndpointManager,
 
    deadline: &Option<Instant>,
 
) -> Result<Neighborhood, ConnectError> {
 
    ////////////////////////////////
 
    use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
 
    #[derive(Debug)]
 
    struct WaveState {
 
        parent: Option<usize>,
 
        leader: ConnectorId,
 
    }
 
    fn do_wave(
 
        em: &mut EndpointManager,
 
        awaiting: &mut HashSet<usize>,
 
        ws: &WaveState,
 
    ) -> Result<(), ConnectError> {
 
        awaiting.clear();
 
        let msg = S(Sm::LeaderWave { wave_leader: ws.leader });
 
        for index in em.index_iter() {
 
            if Some(index) != ws.parent {
 
                em.send_to_setup(index, &msg)?;
 
                awaiting.insert(index);
 
            }
 
        }
 
        Ok(())
 
    }
 
    ///////////////////////
 
    /*
 
    Conceptually, we have two distinct disstributed algorithms back-to-back
 
    1. Leader election using echo algorithm with extinction.
 
        - Each connector initiates a wave tagged with their ID
 
        - Connectors participate in waves of GREATER ID, abandoning previous waves
 
        - Only the wave of the connector with GREATEST ID completes, whereupon they are the leader
 
    2. Tree construction
 
        - The leader broadcasts their leadership with msg A
 
        - Upon receiving their first announcement, connectors reply B, and send A to all peers
 
        - A controller exits once they have received A or B from each neighbor
 

	
 
    The actual implementation is muddier, because non-leaders aren't aware of termiantion of algorithm 1,
 
    so they rely on receipt of the leader's announcement to realize that algorithm 2 has begun.
 

	
 
    NOTE the distinction between PARENT and LEADER
 
    */
 
    log!(logger, "beginning neighborhood construction");
 
    if em.num_net_endpoints() == 0 {
 
        log!(logger, "Edge case of no neighbors! No parent an no children!");
 
        return Ok(Neighborhood { parent: None, children: VecSet::new(vec![]) });
 
    }
 
    log!(logger, "Have {} endpoints. Must participate in distributed alg.", em.num_net_endpoints());
 
    let mut awaiting = HashSet::with_capacity(em.num_net_endpoints());
 
    // 1+ neighbors. Leader can only be learned by receiving messages
 
    // loop ends when I know my sink tree parent (implies leader was elected)
 
    let election_result: WaveState = {
 
        // initially: No parent, I'm the best leader.
 
        let mut best_wave = WaveState { parent: None, leader: connector_id };
 
        // start a wave for this initial state
 
        do_wave(em, &mut awaiting, &best_wave)?;
 
        // with 1+ neighbors, progress is only made in response to incoming messages
 
        em.undelay_all();
 
        'election: loop {
 
            log!(logger, "Election loop. awaiting {:?}...", awaiting.iter());
 
            let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?;
 
            log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
            match msg {
 
                S(Sm::LeaderAnnounce { tree_leader }) => {
 
                    let election_result =
 
                        WaveState { leader: tree_leader, parent: Some(recv_index) };
 
                    log!(logger, "Election lost! Result {:?}", &election_result);
 
                    assert!(election_result.leader >= best_wave.leader);
 
                    assert_ne!(election_result.leader, connector_id);
 
                    break 'election election_result;
 
                }
 
                S(Sm::LeaderWave { wave_leader }) => {
 
                    use Ordering as O;
 
                    match wave_leader.cmp(&best_wave.leader) {
 
                        O::Less => log!(
 
                            logger,
 
                            "Ignoring wave with Id {:?}<{:?}",
 
                            wave_leader,
 
                            best_wave.leader
 
                        ),
 
                        O::Greater => {
 
                            log!(
 
                                logger,
 
                                "Joining wave with Id {:?}>{:?}",
 
                                wave_leader,
 
                                best_wave.leader
 
                            );
 
                            best_wave = WaveState { leader: wave_leader, parent: Some(recv_index) };
 
                            log!(logger, "New wave state {:?}", &best_wave);
 
                            do_wave(em, &mut awaiting, &best_wave)?;
 
                            if awaiting.is_empty() {
 
                                log!(logger, "Special case! Only neighbor is parent. Replying to {:?} msg {:?}", recv_index, &msg);
 
                                em.send_to_setup(recv_index, &msg)?;
 
                            }
 
                        }
 
                        O::Equal => {
 
                            assert!(awaiting.remove(&recv_index));
 
                            log!(
 
                                logger,
 
                                "Wave reply from index {:?} for leader {:?}. Now awaiting {} replies",
 
                                recv_index,
 
                                best_wave.leader,
 
                                awaiting.len()
 
                            );
 
                            if awaiting.is_empty() {
 
                                if let Some(parent) = best_wave.parent {
 
                                    log!(
 
                                        logger,
 
                                        "Sub-wave done! replying to parent {:?} msg {:?}",
 
                                        parent,
 
                                        &msg
 
                                    );
 
                                    em.send_to_setup(parent, &msg)?;
 
                                } else {
 
                                    let election_result: WaveState = best_wave;
 
                                    log!(logger, "Election won! Result {:?}", &election_result);
 
                                    break 'election election_result;
 
                                }
 
                            }
 
                        }
 
                    }
 
                }
 
                msg @ S(Sm::YouAreMyParent) | msg @ S(Sm::MyPortInfo(_)) => {
 
                    log!(logger, "Endpont {:?} sent unexpected msg! {:?}", recv_index, &msg);
 
                    return Err(Ce::SetupAlgMisbehavior);
 
                }
 
                msg @ S(Sm::SessionScatter { .. })
 
                | msg @ S(Sm::SessionGather { .. })
 
                | msg @ Msg::CommMsg { .. } => {
 
                    log!(logger, "delaying msg {:?} during election algorithm", msg);
 
                    em.delayed_messages.push((recv_index, msg));
 
                }
 
            }
 
        }
 
    };
 

	
 
    // starting algorithm 2. Send a message to every neighbor
 
    log!(logger, "Starting tree construction. Step 1: send one msg per neighbor");
 
    awaiting.clear();
 
    for index in em.index_iter() {
 
        if Some(index) == election_result.parent {
 
            em.send_to_setup(index, &S(Sm::YouAreMyParent))?;
 
        } else {
 
            awaiting.insert(index);
 
            em.send_to_setup(
 
                index,
 
                &S(Sm::LeaderAnnounce { tree_leader: election_result.leader }),
 
            )?;
 
        }
 
    }
 
    let mut children = vec![];
 
    em.undelay_all();
 
    while !awaiting.is_empty() {
 
        log!(logger, "Tree construction_loop loop. awaiting {:?}...", awaiting.iter());
 
        let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?;
 
        log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
        match msg {
 
            S(Sm::LeaderAnnounce { .. }) => {
 
                // not a child
 
                log!(
 
                    logger,
 
                    "Got reply from non-child index {:?}. Children: {:?}",
 
                    recv_index,
 
                    children.iter()
 
                );
 
                if !awaiting.remove(&recv_index) {
 
                    return Err(Ce::SetupAlgMisbehavior);
 
                }
0 comments (0 inline, 0 general)