Changeset - d600dd299dde
[Not reviewed]
1 8 0
Christopher Esterhuyse - 5 years ago 2020-06-25 10:23:53
christopher.esterhuyse@gmail.com
additional FFI
9 files changed with 355 insertions and 196 deletions:
0 comments (0 inline, 0 general)
cbindgen.toml
Show inline comments
 
language = "C"
 
header = "/* CBindgen generated */"
 
include_guard = "REOWOLF_HEADER_DEFINED"
 
\ No newline at end of file
 
include_guard = "REOWOLF_HEADER_DEFINED"
 
[parse]
 
parse_deps = false
 
\ No newline at end of file
reowolf_old.h
Show inline comments
 
deleted file
src/common.rs
Show inline comments
 
@@ -34,6 +34,7 @@ pub type PortSuffix = u32;
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(C)]
 
pub struct Id {
 
    pub(crate) connector_id: ConnectorId,
 
    pub(crate) u32_suffix: PortSuffix,
 
@@ -48,6 +49,7 @@ pub struct U32Stream {
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(transparent)]
 
pub struct PortId(Id);
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
@@ -59,11 +61,13 @@ pub struct FiringVar(pub(crate) PortId);
 
pub struct ProtoComponentId(Id);
 

	
 
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
 
#[repr(C)]
 
pub struct Payload(Arc<Vec<u8>>);
 

	
 
#[derive(
 
    Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(C)]
 
pub enum Polarity {
 
    Putter, // output port (from the perspective of the component)
 
    Getter, // input port (from the perspective of the component)
 
@@ -71,6 +75,7 @@ pub enum Polarity {
 
#[derive(
 
    Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(C)]
 
pub enum EndpointPolarity {
 
    Active,  // calls connect()
 
    Passive, // calls bind() listen() accept()
src/protocol/mod.rs
Show inline comments
 
@@ -13,6 +13,7 @@ use crate::protocol::inputsource::*;
 
use crate::protocol::parser::*;
 

	
 
#[derive(serde::Serialize, serde::Deserialize)]
 
#[repr(C)]
 
pub struct ProtocolDescription {
 
    heap: Heap,
 
    source: InputSource,
src/runtime/communication.rs
Show inline comments
 
@@ -53,13 +53,11 @@ impl Connector {
 
        let Self { phased, .. } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NoPreviousRound),
 
            ConnectorPhased::Communication(ConnectorCommunication { round_result, .. }) => {
 
                match round_result {
 
                    Err(_) => Err(PreviousSyncFailed),
 
                    Ok(None) => Err(NoPreviousRound),
 
                    Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(PortDidntGet),
 
                }
 
            }
 
            ConnectorPhased::Communication(comm) => match &comm.round_result {
 
                Err(_) => Err(PreviousSyncFailed),
 
                Ok(None) => Err(NoPreviousRound),
 
                Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(PortDidntGet),
 
            },
 
        }
 
    }
 
    pub fn next_batch(&mut self) -> Result<usize, NextBatchError> {
 
@@ -68,9 +66,9 @@ impl Connector {
 
        let Self { phased, .. } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication(ConnectorCommunication { native_batches, .. }) => {
 
                native_batches.push(Default::default());
 
                Ok(native_batches.len() - 1)
 
            ConnectorPhased::Communication(comm) => {
 
                comm.native_batches.push(Default::default());
 
                Ok(comm.native_batches.len() - 1)
 
            }
 
        }
 
    }
 
@@ -91,8 +89,8 @@ impl Connector {
 
        }
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication(ConnectorCommunication { native_batches, .. }) => {
 
                let batch = native_batches.last_mut().unwrap(); // length >= invariant
 
            ConnectorPhased::Communication(comm) => {
 
                let batch = comm.native_batches.last_mut().unwrap(); // length >= invariant
 
                Ok(batch)
 
            }
 
        }
src/runtime/error.rs
Show inline comments
 
@@ -51,3 +51,8 @@ pub enum GottenError {
 
pub enum NextBatchError {
 
    NotConnected,
 
}
 

	
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum NewNetPortError {
 
    AlreadyConnected,
 
}
src/runtime/ffi.rs
Show inline comments
 
use super::*;
 

	
 
use core::cell::RefCell;
 
use std::os::raw::{c_char, c_int, c_uchar, c_uint};
 
use core::convert::TryFrom;
 
use std::slice::from_raw_parts as slice_from_parts;
 
// use std::os::raw::{c_char, c_int, c_uchar, c_uint};
 

	
 
#[derive(Default)]
 
struct StoredError {
 
@@ -15,24 +17,35 @@ impl StoredError {
 
        // no null terminator either!
 
        self.buf.clear();
 
    }
 
    fn store<E: Debug>(&mut self, error: &E) {
 
        write!(&mut self.buf, "{:?}", error);
 
    fn debug_store<E: Debug>(&mut self, error: &E) {
 
        let _ = write!(&mut self.buf, "{:?}", error);
 
        self.buf.push(Self::NULL_TERMINATOR);
 
    }
 
    fn tl_store<E: Debug>(error: &E) {
 
    fn tl_debug_store<E: Debug>(error: &E) {
 
        STORED_ERROR.with(|stored_error| {
 
            let mut stored_error = stored_error.borrow_mut();
 
            stored_error.clear();
 
            stored_error.store(error);
 
            stored_error.debug_store(error);
 
        })
 
    }
 
    // fn bytes_store(&mut self, bytes: &[u8]) {
 
    //     let _ = self.buf.write_all(bytes);
 
    //     self.buf.push(Self::NULL_TERMINATOR);
 
    // }
 
    // fn tl_bytes_store(bytes: &[u8]) {
 
    //     STORED_ERROR.with(|stored_error| {
 
    //         let mut stored_error = stored_error.borrow_mut();
 
    //         stored_error.clear();
 
    //         stored_error.bytes_store(bytes);
 
    //     })
 
    // }
 
    fn tl_clear() {
 
        STORED_ERROR.with(|stored_error| {
 
            let mut stored_error = stored_error.borrow_mut();
 
            stored_error.clear();
 
        })
 
    }
 
    fn tl_raw_peek() -> (*const u8, usize) {
 
    fn tl_bytes_peek() -> (*const u8, usize) {
 
        STORED_ERROR.with(|stored_error| {
 
            let stored_error = stored_error.borrow();
 
            match stored_error.buf.len() {
 
@@ -51,7 +64,7 @@ thread_local! {
 

	
 
type ErrorCode = i32;
 

	
 
//////////////////////////////////////
 
///////////////////// REOWOLF //////////////////////////
 

	
 
/// Returns length (via out pointer) and pointer (via return value) of the last Reowolf error.
 
/// - pointer is NULL iff there was no last error
 
@@ -59,51 +72,331 @@ type ErrorCode = i32;
 
/// - len does NOT include the length of the null-delimiter
 
#[no_mangle]
 
pub unsafe extern "C" fn reowolf_error_peek(len: *mut usize) -> *const u8 {
 
    let (err_ptr, err_len) = StoredError::tl_raw_peek();
 
    let (err_ptr, err_len) = StoredError::tl_bytes_peek();
 
    len.write(err_len);
 
    err_ptr
 
}
 

	
 
///////////////////// PROTOCOL DESCRIPTION //////////////////////////
 

	
 
/// Parses the utf8-encoded string slice to initialize a new protocol description object.
 
/// - On success, initializes `out` and returns 0
 
/// - On failure, stores an error string (see `reowolf_error_peek`) and returns -1
 
#[no_mangle]
 
pub unsafe extern "C" fn protocol_description_parse(
 
    pdl: *const u8,
 
    pdl_len: usize,
 
    pd: *mut Arc<ProtocolDescription>,
 
    out: *mut Arc<ProtocolDescription>,
 
) -> ErrorCode {
 
    StoredError::tl_clear();
 
    let slice: *const [u8] = std::slice::from_raw_parts(pdl, pdl_len);
 
    let slice: &[u8] = &*slice;
 
    match ProtocolDescription::parse(slice) {
 
    match ProtocolDescription::parse(&*slice_from_parts(pdl, pdl_len)) {
 
        Ok(new) => {
 
            pd.write(Arc::new(new));
 
            out.write(Arc::new(new));
 
            0
 
        }
 
        Err(err) => {
 
            StoredError::tl_store(&err);
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
/// Destroys the given initialized protocol description and frees its resources.
 
#[no_mangle]
 
pub unsafe extern "C" fn protocol_description_destroy(pd: Arc<ProtocolDescription>) {
 
    drop(pd)
 
pub unsafe extern "C" fn protocol_description_destroy(pd: *mut Arc<ProtocolDescription>) {
 
    drop(Box::from_raw(pd))
 
}
 

	
 
/// Given an initialized protocol description, initializes `out` with a clone which can be independently created or destroyed.
 
#[no_mangle]
 
pub unsafe extern "C" fn protocol_description_clone(
 
    pd: &Arc<ProtocolDescription>,
 
) -> Arc<ProtocolDescription> {
 
    pd.clone()
 
    out: *mut Arc<ProtocolDescription>,
 
) {
 
    out.write(pd.clone());
 
}
 

	
 
///////////////////// CONNECTOR //////////////////////////
 

	
 
/// Initializes `out` with a new connector using the given protocol description as its configuration.
 
/// The connector is assigned a random (internal) connector ID.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new(pd: &Arc<ProtocolDescription>, out: *mut Connector) {
 
    connector_new_with_id(pd, random_connector_id(), out)
 
}
 

	
 
/// Initializes `out` with a new connector using the given protocol description as its configuration.
 
/// The connector uses the given (internal) connector ID.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new_with_id(
 
    pd: &Arc<ProtocolDescription>,
 
    cid: ConnectorId,
 
    out: *mut Connector,
 
) {
 
    out.write(Connector::new(Box::new(DummyLogger), pd.clone(), cid, 8))
 
}
 

	
 
/// Destroys the given initialized connector and frees its resources.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_destroy(connector: Connector) {
 
    drop(connector)
 
}
 

	
 
/// Given an initialized connector in setup or connecting state,
 
/// - Creates a new directed port pair with logical channel putter->getter,
 
/// - adds the ports to the native component's interface,
 
/// - and returns them using the given out pointers.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_port_pair(
 
    connector: &mut Connector,
 
    out_putter: *mut PortId,
 
    out_getter: *mut PortId,
 
) {
 
    let [o, i] = connector.new_port_pair();
 
    out_putter.write(o);
 
    out_getter.write(i);
 
}
 

	
 
/// Given
 
/// - an initialized connector in setup or connecting state,
 
/// - a string slice for the component's identifier in the connector's configured protocol description,
 
/// - a set of ports (represented as a slice; duplicates are ignored) in the native component's interface,
 
/// the connector creates a new (internal) protocol component C, such that the set of native ports are moved to C.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_component(
 
    connector: &mut Connector,
 
    ident_ptr: *const u8,
 
    ident_len: usize,
 
    ports_ptr: *const PortId,
 
    ports_len: usize,
 
) -> ErrorCode {
 
    StoredError::tl_clear();
 
    match connector.add_component(
 
        &*slice_from_parts(ident_ptr, ident_len),
 
        &*slice_from_parts(ports_ptr, ports_len),
 
    ) {
 
        Ok(()) => 0,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
/// Given
 
/// - an initialized connector in setup or connecting state,
 
/// - a utf-8 encoded socket address,
 
/// - the logical polarity of P,
 
/// - the "physical" polarity in {Active, Passive} of the endpoint through which P's peer will be discovered,
 
/// returns P, a port newly added to the native interface.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_net_port(
 
    connector: &mut Connector,
 
    addr_str_ptr: *const u8,
 
    addr_str_len: usize,
 
    port_polarity: Polarity,
 
    endpoint_polarity: EndpointPolarity,
 
    port: *mut PortId,
 
) -> ErrorCode {
 
    StoredError::tl_clear();
 
    let addr_bytes = &*slice_from_parts(addr_str_ptr, addr_str_len);
 
    let addr_str = match std::str::from_utf8(addr_bytes) {
 
        Ok(addr_str) => addr_str,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            return -1;
 
        }
 
    };
 
    let sock_address: SocketAddr = match addr_str.parse() {
 
        Ok(addr) => addr,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            return -2;
 
        }
 
    };
 
    match connector.new_net_port(port_polarity, sock_address, endpoint_polarity) {
 
        Ok(p) => {
 
            port.write(p);
 
            0
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -3
 
        }
 
    }
 
}
 

	
 
/// Connects this connector to the distributed system of connectors reachable through endpoints,
 
/// Usable in setup state, and changes the state to communication.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_connect(
 
    connector: &mut Connector,
 
    timeout_millis: i64,
 
) -> ErrorCode {
 
    StoredError::tl_clear();
 
    let option_timeout_millis: Option<u64> = TryFrom::try_from(timeout_millis).ok();
 
    let timeout = option_timeout_millis.map(Duration::from_millis);
 
    match connector.connect(timeout) {
 
        Ok(()) => 0,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
// #[no_mangle]
 
// pub extern "C" fn connector_new(pd: *const Arc<ProtocolDescription>) -> *mut Connector {
 
//     Box::into_raw(Box::new(Connector::default()))
 
// }
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_put_payload(
 
    connector: &mut Connector,
 
    port: PortId,
 
    payload: *mut Payload,
 
) -> ErrorCode {
 
    match connector.put(port, payload.read()) {
 
        Ok(()) => 0,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
// /// Creates and returns Reowolf Connector structure allocated on the heap.
 
// #[no_mangle]
 
// pub extern "C" fn connector_with_controller_id(controller_id: ControllerId) -> *mut Connector {
 
//     Box::into_raw(Box::new(Connector::Unconfigured(Unconfigured { controller_id })))
 
// }
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_put_payload_cloning(
 
    connector: &mut Connector,
 
    port: PortId,
 
    payload: &Payload,
 
) -> ErrorCode {
 
    match connector.put(port, payload.clone()) {
 
        Ok(()) => 0,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
/// Convenience function combining the functionalities of
 
/// "payload_new" with "connector_put_payload".
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_put_bytes(
 
    connector: &mut Connector,
 
    port: PortId,
 
    bytes_ptr: *const u8,
 
    bytes_len: usize,
 
) -> ErrorCode {
 
    StoredError::tl_clear();
 
    let payload: Payload = {
 
        let payload: *mut Payload = std::ptr::null_mut(); // uninitialized
 
        payload_new(bytes_ptr, bytes_len, payload); // initializes;
 
        payload.read()
 
    };
 
    match connector.put(port, payload) {
 
        Ok(()) => 0,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_get(connector: &mut Connector, port: PortId) -> ErrorCode {
 
    match connector.get(port) {
 
        Ok(()) => 0,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_next_batch(connector: &mut Connector) -> isize {
 
    match connector.next_batch() {
 
        Ok(n) => n as isize,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_sync(connector: &mut Connector, timeout_millis: i64) -> isize {
 
    StoredError::tl_clear();
 
    let option_timeout_millis: Option<u64> = TryFrom::try_from(timeout_millis).ok();
 
    let timeout = option_timeout_millis.map(Duration::from_millis);
 
    match connector.sync(timeout) {
 
        Ok(n) => n as isize,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_gotten_bytes(
 
    connector: &mut Connector,
 
    port: PortId,
 
    len: *mut usize,
 
) -> *const u8 {
 
    StoredError::tl_clear();
 
    match connector.gotten(port) {
 
        Ok(payload_borrow) => {
 
            let slice = payload_borrow.as_slice();
 
            len.write(slice.len());
 
            slice.as_ptr()
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            std::ptr::null()
 
        }
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_gotten_payload(
 
    connector: &mut Connector,
 
    port: PortId,
 
) -> *const Payload {
 
    StoredError::tl_clear();
 
    match connector.gotten(port) {
 
        Ok(payload_borrow) => payload_borrow,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            std::ptr::null()
 
        }
 
    }
 
}
 

	
 
///////////////////// PAYLOAD //////////////////////////
 
#[no_mangle]
 
pub unsafe extern "C" fn payload_new(
 
    bytes_ptr: *const u8,
 
    bytes_len: usize,
 
    out_payload: *mut Payload,
 
) {
 
    let bytes: &[u8] = &*slice_from_parts(bytes_ptr, bytes_len);
 
    out_payload.write(Payload::from(bytes));
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn payload_destroy(payload: *mut Payload) {
 
    drop(Box::from_raw(payload))
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn payload_clone(payload: &Payload, out_payload: *mut Payload) {
 
    out_payload.write(payload.clone())
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn payload_peek_bytes(payload: &Payload, bytes_len: *mut usize) -> *const u8 {
 
    let slice = payload.as_slice();
 
    bytes_len.write(slice.len());
 
    slice.as_ptr()
 
}
src/runtime/mod.rs
Show inline comments
 
@@ -151,6 +151,7 @@ pub struct PortInfo {
 
    routes: HashMap<PortId, Route>,
 
}
 
#[derive(Debug)]
 
// #[repr(C)]
 
pub struct Connector {
 
    unphased: ConnectorUnphased,
 
    phased: ConnectorPhased,
 
@@ -176,7 +177,7 @@ pub struct ConnectorUnphased {
 
#[derive(Debug)]
 
pub enum ConnectorPhased {
 
    Setup { endpoint_setups: Vec<(PortId, EndpointSetup)>, surplus_sockets: u16 },
 
    Communication(ConnectorCommunication),
 
    Communication(Box<ConnectorCommunication>),
 
}
 
#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
pub struct Predicate {
 
@@ -203,7 +204,12 @@ pub struct SyncProtoContext<'a> {
 
    inbox: &'a HashMap<PortId, Payload>,
 
}
 
////////////////
 

	
 
pub fn random_connector_id() -> ConnectorId {
 
    type Bytes8 = [u8; std::mem::size_of::<ConnectorId>()];
 
    let mut bytes = Bytes8::default();
 
    getrandom::getrandom(&mut bytes).unwrap();
 
    unsafe { std::mem::transmute::<Bytes8, ConnectorId>(bytes) }
 
}
 
pub fn would_block(err: &std::io::Error) -> bool {
 
    err.kind() == std::io::ErrorKind::WouldBlock
 
}
src/runtime/setup.rs
Show inline comments
 
@@ -26,9 +26,10 @@ impl Connector {
 
        polarity: Polarity,
 
        sock_addr: SocketAddr,
 
        endpoint_polarity: EndpointPolarity,
 
    ) -> Result<PortId, ()> {
 
    ) -> Result<PortId, NewNetPortError> {
 
        let Self { unphased: up, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication { .. } => Err(NewNetPortError::AlreadyConnected),
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                let endpoint_setup = EndpointSetup { sock_addr, endpoint_polarity };
 
                let p = up.id_manager.new_port_id();
 
@@ -46,7 +47,6 @@ impl Connector {
 
                endpoint_setups.push((p, endpoint_setup));
 
                Ok(p)
 
            }
 
            ConnectorPhased::Communication { .. } => Err(()),
 
        }
 
    }
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<(), ConnectError> {
 
@@ -90,7 +90,7 @@ impl Connector {
 
                };
 
                session_optimize(cu, &mut comm, deadline)?;
 
                log!(cu.logger, "connect() finished. setup phase complete");
 
                self.phased = ConnectorPhased::Communication(comm);
 
                self.phased = ConnectorPhased::Communication(Box::new(comm));
 
                Ok(())
 
            }
 
        }
 
@@ -141,10 +141,6 @@ fn new_endpoint_manager(
 
            recv_peer_port: None,
 
            endpoint_setup: endpoint_setup.clone(),
 
        })
 
    };
 
    struct WakerState {
 
        continue_signal: Arc<AtomicBool>,
 
        failed_indices: HashSet<usize>,
 
    }
 
    ////////////////////////////////////////////
 

	
0 comments (0 inline, 0 general)