Changeset - 70e2e44574a6
[Not reviewed]
0 3 0
Christopher Esterhuyse - 5 years ago 2020-07-24 09:11:36
christopher.esterhuyse@gmail.com
added minor util + doc comments to connector API
3 files changed with 18 insertions and 7 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
[package]
 
name = "reowolf_rs"
 
version = "0.1.4"
 
authors = [
 
	"Christopher Esterhuyse <esterhuy@cwi.nl, christopher.esterhuyse@gmail.com>",
 
	"Hans-Dieter Hiep <hdh@cwi.nl>"
 
]
 
edition = "2018"
 

	
 
[dependencies]
 
# convenience macros
 
maplit = "1.0.2"
 
derive_more = "0.99.2"
 

	
 
# runtime
 
bincode = "1.3.1"
 
serde = { version = "1.0.114", features = ["derive"] }
 
getrandom = "0.1.14" # tiny crate. used to guess controller-id
 

	
 
# network
 
mio = { version = "0.7.0", package = "mio", features = ["udp", "tcp", "os-poll"] }
 
socket2 = { version = "0.3.12", optional = true }
 

	
 
# protocol
 
backtrace = "0.3"
 
lazy_static = "1.4.0"
 

	
 
# ffi
 

	
 
# socket ffi
 
libc = { version = "^0.2", optional = true }
 
os_socketaddr = { version = "0.1.0", optional = true }
 

	
 
[dev-dependencies]
 
# test-generator = "0.3.0"
 
crossbeam-utils = "0.7.2"
 
lazy_static = "1.4.0"
 

	
 
[lib]
 
# compile target: dynamically linked library using C ABI
 
crate-type = ["cdylib"]
 

	
 
[features]
 
default = ["ffi", "session_optimization", "ffi_pseudo_socket_api"]
 
default = ["ffi", "session_optimization"]
 
ffi = [] # see src/ffi/mod.rs
 
ffi_pseudo_socket_api = ["ffi", "libc", "os_socketaddr"]# see src/ffi/pseudo_socket_api.rs
 
endpoint_logging = [] # see src/macros.rs
 
session_optimization = [] # see src/runtime/setup.rs
 
\ No newline at end of file
src/ffi/pseudo_socket_api.rs
Show inline comments
 
use super::*;
 

	
 
use libc::{sockaddr, socklen_t};
 
use core::ops::DerefMut;
 
use libc::{sockaddr, socklen_t};
 
use std::{collections::HashMap, ffi::c_void, net::SocketAddr, os::raw::c_int, sync::RwLock};
 
///////////////////////////////////////////////////////////////////
 

	
 
struct FdAllocator {
 
    next: Option<c_int>,
 
    freed: Vec<c_int>,
 
}
 
enum ConnectorComplexPhased {
 
    Setup { local: Option<SocketAddr>, peer: Option<SocketAddr> },
 
    Communication { putter: PortId, getter: PortId },
 
}
 
struct ConnectorComplex {
 
    // invariant: .connector.phased and .phased are variants Setup/Communication in lockstep.
 
    connector: Connector,
 
    phased: ConnectorComplexPhased,
 
}
 
#[derive(Default)]
 
struct CcMap {
 
    fd_to_cc: HashMap<c_int, RwLock<ConnectorComplex>>,
 
    fd_allocator: FdAllocator,
 
}
 
///////////////////////////////////////////////////////////////////
 
unsafe fn payload_from_raw(bytes_ptr: *const c_void, bytes_len: usize) -> Payload {
 
    let bytes_ptr = std::mem::transmute(bytes_ptr);
 
    let bytes = &*slice_from_raw_parts(bytes_ptr, bytes_len);
 
    Payload::from(bytes)
 
}
 
unsafe fn libc_to_std_sockaddr(addr: *const sockaddr, addr_len: socklen_t) -> Option<SocketAddr> {
 
    os_socketaddr::OsSocketAddr::from_raw_parts(addr as _, addr_len as usize).into_addr()
 
}
 
impl Default for FdAllocator {
 
    fn default() -> Self {
 
        Self {
 
            next: Some(0), // positive values used only
 
            freed: vec![],
 
        }
 
        // negative FDs aren't used s.t. they are available for error signalling
 
        Self { next: Some(0), freed: vec![] }
 
    }
 
}
 
impl FdAllocator {
 
    fn alloc(&mut self) -> c_int {
 
        if let Some(fd) = self.freed.pop() {
 
            return fd;
 
        }
 
        if let Some(fd) = self.next {
 
            self.next = fd.checked_add(1);
 
            return fd;
 
        }
 
        panic!("No more Connector FDs to allocate!")
 
    }
 
    fn free(&mut self, fd: c_int) {
 
        self.freed.push(fd);
 
    }
 
}
 
lazy_static::lazy_static! {
 
    static ref CC_MAP: RwLock<CcMap> = Default::default();
 
}
 
impl ConnectorComplex {
 
    fn try_become_connected(&mut self) {
 
        match self.phased {
 
            ConnectorComplexPhased::Setup { local: Some(local), peer: Some(peer) } => {
 
                // complete setup
 
                let [putter, getter] = self.connector.new_udp_mediator_component(local, peer).unwrap();
 
                let [putter, getter] =
 
                    self.connector.new_udp_mediator_component(local, peer).unwrap();
 
                self.connector.connect(None).unwrap();
 
                self.phased = ConnectorComplexPhased::Communication { putter, getter }
 
            }
 
            _ => {} // setup incomplete
 
        }
 
    }
 
}
 
/////////////////////////////////
 
#[no_mangle]
 
pub extern "C" fn rw_socket(_domain: c_int, _type: c_int) -> c_int {
 
    // ignoring domain and type
 
    // get writer lock
 
    let mut w = if let Ok(w) = CC_MAP.write() { w } else { return LOCK_POISONED };
 
    let fd = w.fd_allocator.alloc();
 
    let cc = ConnectorComplex {
 
        connector: Connector::new(
 
            Box::new(crate::DummyLogger),
 
            crate::TRIVIAL_PD.clone(),
 
            Connector::random_id(),
 
        ),
 
        phased: ConnectorComplexPhased::Setup { local: None, peer: None },
 
    };
 
    w.fd_to_cc.insert(fd, RwLock::new(cc));
 
    fd
 
}
 
#[no_mangle]
 
pub extern "C" fn rw_close(fd: c_int, _how: c_int) -> c_int {
 
    // ignoring HOW
 
    // get writer lock
 
    let mut w = if let Ok(w) = CC_MAP.write() { w } else { return LOCK_POISONED };
 
    if w.fd_to_cc.remove(&fd).is_some() {
 
        w.fd_allocator.free(fd);
 
        ERR_OK
 
    } else {
 
        CLOSE_FAIL
 
    }
 
}
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_bind(fd: c_int, addr: *const sockaddr, addr_len: socklen_t) -> c_int {
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    let addr = match libc_to_std_sockaddr(addr, addr_len) {
 
        Some(addr) => addr,
 
        _ => return BAD_SOCKADDR,
 
    };
 
    // get outer reader, inner writer locks
 
    let r = if let Ok(r) = CC_MAP.read() { r } else { return LOCK_POISONED };
 
    let cc = if let Some(cc) = r.fd_to_cc.get(&fd) { cc } else { return BAD_FD };
 
    let mut cc = if let Ok(cc) = cc.write() { cc } else { return LOCK_POISONED };
src/runtime/mod.rs
Show inline comments
 
@@ -334,96 +334,107 @@ impl PortInfo {
 
        SpecVar(match self.polarities.get(&port).unwrap() {
 
            Getter => port,
 
            Putter => *self.peers.get(&port).unwrap(),
 
        })
 
    }
 
}
 
impl SpecVarStream {
 
    fn next(&mut self) -> SpecVar {
 
        let phantom_port: PortId =
 
            Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }
 
                .into();
 
        SpecVar(phantom_port)
 
    }
 
}
 
impl IdManager {
 
    fn new(connector_id: ConnectorId) -> Self {
 
        Self {
 
            connector_id,
 
            port_suffix_stream: Default::default(),
 
            proto_component_suffix_stream: Default::default(),
 
        }
 
    }
 
    fn new_spec_var_stream(&self) -> SpecVarStream {
 
        // Spec var stream starts where the current port_id stream ends, with gap of SKIP_N.
 
        // This gap is entirely unnecessary (i.e. 0 is fine)
 
        // It's purpose is only to make SpecVars easier to spot in logs.
 
        // E.g. spot the spec var: { v0_0, v1_2, v1_103 }
 
        const SKIP_N: u32 = 100;
 
        let port_suffix_stream = self.port_suffix_stream.clone().n_skipped(SKIP_N);
 
        SpecVarStream { connector_id: self.connector_id, port_suffix_stream }
 
    }
 
    fn new_port_id(&mut self) -> PortId {
 
        Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into()
 
    }
 
    fn new_proto_component_id(&mut self) -> ProtoComponentId {
 
        Id {
 
            connector_id: self.connector_id,
 
            u32_suffix: self.proto_component_suffix_stream.next(),
 
        }
 
        .into()
 
    }
 
}
 
impl Drop for Connector {
 
    fn drop(&mut self) {
 
        log!(&mut *self.unphased.inner.logger, "Connector dropping. Goodbye!");
 
    }
 
}
 
impl Connector {
 
    pub fn is_connected(&self) -> bool {
 
        // If designed for Rust usage, connectors would be exposed as an enum type from the start.
 
        // consequently, this "phased" business would also include connector variants and this would
 
        // get a lot closer to the connector impl. itself.
 
        // Instead, the C-oriented implementation doesn't distinguish connector states as types,
 
        // and distinguish them as enum variants instead
 
        match self.phased {
 
            ConnectorPhased::Setup(..) => false,
 
            ConnectorPhased::Communication(..) => true,
 
        }
 
    }
 
    pub(crate) fn random_id() -> ConnectorId {
 
        type Bytes8 = [u8; std::mem::size_of::<ConnectorId>()];
 
        unsafe {
 
            let mut bytes = std::mem::MaybeUninit::<Bytes8>::uninit();
 
            // getrandom is the canonical crate for a small, secure rng
 
            getrandom::getrandom(&mut *bytes.as_mut_ptr()).unwrap();
 
            // safe! representations of all valid Byte8 values are valid ConnectorId values
 
            std::mem::transmute::<_, _>(bytes.assume_init())
 
        }
 
    }
 
    pub fn swap_logger(&mut self, mut new_logger: Box<dyn Logger>) -> Box<dyn Logger> {
 
        std::mem::swap(&mut self.unphased.inner.logger, &mut new_logger);
 
        new_logger
 
    }
 
    pub fn get_logger(&mut self) -> &mut dyn Logger {
 
        &mut *self.unphased.inner.logger
 
    }
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        let cu = &mut self.unphased;
 
        // adds two new associated ports, related to each other, and exposed to the native
 
        let [o, i] = [cu.inner.id_manager.new_port_id(), cu.inner.id_manager.new_port_id()];
 
        cu.inner.native_ports.insert(o);
 
        cu.inner.native_ports.insert(i);
 
        // {polarity, peer, route} known. {} unknown.
 
        cu.inner.port_info.polarities.insert(o, Putter);
 
        cu.inner.port_info.polarities.insert(i, Getter);
 
        cu.inner.port_info.peers.insert(o, i);
 
        cu.inner.port_info.peers.insert(i, o);
 
        let route = Route::LocalComponent(ComponentId::Native);
 
        cu.inner.port_info.routes.insert(o, route);
 
        cu.inner.port_info.routes.insert(i, route);
 
        log!(cu.inner.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
        [o, i]
 
    }
 
    pub fn add_component(
 
        &mut self,
 
        identifier: &[u8],
 
        ports: &[PortId],
 
    ) -> Result<(), AddComponentError> {
 
        // called by the USER. moves ports owned by the NATIVE
 
        use AddComponentError as Ace;
 
        // 1. check if this is OK
 
        let cu = &mut self.unphased;
 
        let polarities = cu.proto_description.component_polarities(identifier)?;
 
        if polarities.len() != ports.len() {
 
            return Err(Ace::WrongNumberOfParamaters { expected: polarities.len() });
 
        }
 
        for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) {
0 comments (0 inline, 0 general)