Changeset - d600dd299dde
[Not reviewed]
1 8 0
Christopher Esterhuyse - 5 years ago 2020-06-25 10:23:53
christopher.esterhuyse@gmail.com
additional FFI
9 files changed with 355 insertions and 196 deletions:
0 comments (0 inline, 0 general)
cbindgen.toml
Show inline comments
 
language = "C"
 
header = "/* CBindgen generated */"
 
include_guard = "REOWOLF_HEADER_DEFINED"
 
\ No newline at end of file
 
include_guard = "REOWOLF_HEADER_DEFINED"
 
[parse]
 
parse_deps = false
 
\ No newline at end of file
reowolf_old.h
Show inline comments
 
deleted file
src/common.rs
Show inline comments
 
@@ -13,85 +13,90 @@ pub use core::{
 
pub use indexmap::{IndexMap, IndexSet};
 
pub use maplit::{hashmap, hashset};
 
pub use mio::{
 
    net::{TcpListener, TcpStream},
 
    Events, Interest, Poll, Token,
 
};
 
pub use std::{
 
    collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
 
    convert::TryInto,
 
    io::{Read, Write},
 
    net::SocketAddr,
 
    sync::Arc,
 
    time::Instant,
 
};
 
pub use Polarity::*;
 

	
 
///////////////////// DEFS /////////////////////
 

	
 
pub type ConnectorId = u32;
 
pub type PortSuffix = u32;
 

	
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(C)]
 
pub struct Id {
 
    pub(crate) connector_id: ConnectorId,
 
    pub(crate) u32_suffix: PortSuffix,
 
}
 

	
 
#[derive(Debug, Default)]
 
pub struct U32Stream {
 
    next: u32,
 
}
 

	
 
// globally unique
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(transparent)]
 
pub struct PortId(Id);
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
pub struct FiringVar(pub(crate) PortId);
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
pub struct ProtoComponentId(Id);
 

	
 
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
 
#[repr(C)]
 
pub struct Payload(Arc<Vec<u8>>);
 

	
 
#[derive(
 
    Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(C)]
 
pub enum Polarity {
 
    Putter, // output port (from the perspective of the component)
 
    Getter, // input port (from the perspective of the component)
 
}
 
#[derive(
 
    Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(C)]
 
pub enum EndpointPolarity {
 
    Active,  // calls connect()
 
    Passive, // calls bind() listen() accept()
 
}
 

	
 
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
 
pub enum AddComponentError {
 
    NoSuchComponent,
 
    NonPortTypeParameters,
 
    CannotMovePort(PortId),
 
    WrongNumberOfParamaters { expected: usize },
 
    UnknownPort(PortId),
 
    WrongPortPolarity { port: PortId, expected_polarity: Polarity },
 
    DuplicateMovedPort(PortId),
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub enum NonsyncBlocker {
 
    Inconsistent,
 
    ComponentExit,
 
    SyncBlockStart,
 
}
 

	
 
#[derive(Debug, Clone)]
src/protocol/mod.rs
Show inline comments
 
mod arena;
 
mod ast;
 
mod eval;
 
pub mod inputsource;
 
mod lexer;
 
mod library;
 
mod parser;
 

	
 
use crate::common::*;
 
use crate::protocol::ast::*;
 
use crate::protocol::eval::*;
 
use crate::protocol::inputsource::*;
 
use crate::protocol::parser::*;
 

	
 
#[derive(serde::Serialize, serde::Deserialize)]
 
#[repr(C)]
 
pub struct ProtocolDescription {
 
    heap: Heap,
 
    source: InputSource,
 
    root: RootId,
 
}
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
pub struct ComponentState {
 
    prompt: Prompt,
 
}
 
pub enum EvalContext<'a> {
 
    Nonsync(&'a mut NonsyncProtoContext<'a>),
 
    Sync(&'a mut SyncProtoContext<'a>),
 
    // None,
 
}
 
//////////////////////////////////////////////
 

	
 
impl std::fmt::Debug for ProtocolDescription {
 
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
 
        write!(f, "(A big honkin' protocol description)")
 
    }
 
}
 
impl ProtocolDescription {
 
    pub fn parse(buffer: &[u8]) -> Result<Self, String> {
 
        let mut heap = Heap::new();
src/runtime/communication.rs
Show inline comments
 
@@ -32,88 +32,86 @@ struct ProtoComponentBranch {
 
struct CyclicDrainer<'a, K: Eq + Hash, V> {
 
    input: &'a mut HashMap<K, V>,
 
    inner: CyclicDrainInner<'a, K, V>,
 
}
 
struct CyclicDrainInner<'a, K: Eq + Hash, V> {
 
    swap: &'a mut HashMap<K, V>,
 
    output: &'a mut HashMap<K, V>,
 
}
 
trait PayloadMsgSender {
 
    fn putter_send(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        putter: PortId,
 
        msg: SendPayloadMsg,
 
    ) -> Result<(), SyncError>;
 
}
 

	
 
////////////////
 
impl Connector {
 
    pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> {
 
        use GottenError::*;
 
        let Self { phased, .. } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NoPreviousRound),
 
            ConnectorPhased::Communication(ConnectorCommunication { round_result, .. }) => {
 
                match round_result {
 
                    Err(_) => Err(PreviousSyncFailed),
 
                    Ok(None) => Err(NoPreviousRound),
 
                    Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(PortDidntGet),
 
                }
 
            }
 
            ConnectorPhased::Communication(comm) => match &comm.round_result {
 
                Err(_) => Err(PreviousSyncFailed),
 
                Ok(None) => Err(NoPreviousRound),
 
                Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(PortDidntGet),
 
            },
 
        }
 
    }
 
    pub fn next_batch(&mut self) -> Result<usize, NextBatchError> {
 
        // returns index of new batch
 
        use NextBatchError::*;
 
        let Self { phased, .. } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication(ConnectorCommunication { native_batches, .. }) => {
 
                native_batches.push(Default::default());
 
                Ok(native_batches.len() - 1)
 
            ConnectorPhased::Communication(comm) => {
 
                comm.native_batches.push(Default::default());
 
                Ok(comm.native_batches.len() - 1)
 
            }
 
        }
 
    }
 
    fn port_op_access(
 
        &mut self,
 
        port: PortId,
 
        expect_polarity: Polarity,
 
    ) -> Result<&mut NativeBatch, PortOpError> {
 
        use PortOpError::*;
 
        let Self { unphased, phased } = self;
 
        if !unphased.native_ports.contains(&port) {
 
            return Err(PortUnavailable);
 
        }
 
        match unphased.port_info.polarities.get(&port) {
 
            Some(p) if *p == expect_polarity => {}
 
            Some(_) => return Err(WrongPolarity),
 
            None => return Err(UnknownPolarity),
 
        }
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication(ConnectorCommunication { native_batches, .. }) => {
 
                let batch = native_batches.last_mut().unwrap(); // length >= invariant
 
            ConnectorPhased::Communication(comm) => {
 
                let batch = comm.native_batches.last_mut().unwrap(); // length >= invariant
 
                Ok(batch)
 
            }
 
        }
 
    }
 
    pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> {
 
        use PortOpError::*;
 
        let batch = self.port_op_access(port, Putter)?;
 
        if batch.to_put.contains_key(&port) {
 
            Err(MultipleOpsOnPort)
 
        } else {
 
            batch.to_put.insert(port, payload);
 
            Ok(())
 
        }
 
    }
 
    pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> {
 
        use PortOpError::*;
 
        let batch = self.port_op_access(port, Getter)?;
 
        if batch.to_get.insert(port) {
 
            Ok(())
 
        } else {
 
            Err(MultipleOpsOnPort)
 
        }
 
    }
 
    // entrypoint for caller. overwrites round result enum, and returns what happened
src/runtime/error.rs
Show inline comments
 
@@ -30,24 +30,29 @@ pub enum MalformedStateError {
 
}
 
#[derive(Debug, Clone)]
 
pub enum EndpointError {
 
    MalformedMessage,
 
    BrokenEndpoint,
 
}
 
#[derive(Debug)]
 
pub enum PortOpError {
 
    WrongPolarity,
 
    UnknownPolarity,
 
    NotConnected,
 
    MultipleOpsOnPort,
 
    PortUnavailable,
 
}
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum GottenError {
 
    NoPreviousRound,
 
    PortDidntGet,
 
    PreviousSyncFailed,
 
}
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum NextBatchError {
 
    NotConnected,
 
}
 

	
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum NewNetPortError {
 
    AlreadyConnected,
 
}
src/runtime/ffi.rs
Show inline comments
 
use super::*;
 

	
 
use core::cell::RefCell;
 
use std::os::raw::{c_char, c_int, c_uchar, c_uint};
 
use core::convert::TryFrom;
 
use std::slice::from_raw_parts as slice_from_parts;
 
// use std::os::raw::{c_char, c_int, c_uchar, c_uint};
 

	
 
#[derive(Default)]
 
struct StoredError {
 
    // invariant: len is zero IFF its occupied
 
    // contents are 1+ bytes because we also store the NULL TERMINATOR
 
    buf: Vec<u8>,
 
}
 
impl StoredError {
 
    const NULL_TERMINATOR: u8 = 0;
 
    fn clear(&mut self) {
 
        // no null terminator either!
 
        self.buf.clear();
 
    }
 
    fn store<E: Debug>(&mut self, error: &E) {
 
        write!(&mut self.buf, "{:?}", error);
 
    fn debug_store<E: Debug>(&mut self, error: &E) {
 
        let _ = write!(&mut self.buf, "{:?}", error);
 
        self.buf.push(Self::NULL_TERMINATOR);
 
    }
 
    fn tl_store<E: Debug>(error: &E) {
 
    fn tl_debug_store<E: Debug>(error: &E) {
 
        STORED_ERROR.with(|stored_error| {
 
            let mut stored_error = stored_error.borrow_mut();
 
            stored_error.clear();
 
            stored_error.store(error);
 
            stored_error.debug_store(error);
 
        })
 
    }
 
    // fn bytes_store(&mut self, bytes: &[u8]) {
 
    //     let _ = self.buf.write_all(bytes);
 
    //     self.buf.push(Self::NULL_TERMINATOR);
 
    // }
 
    // fn tl_bytes_store(bytes: &[u8]) {
 
    //     STORED_ERROR.with(|stored_error| {
 
    //         let mut stored_error = stored_error.borrow_mut();
 
    //         stored_error.clear();
 
    //         stored_error.bytes_store(bytes);
 
    //     })
 
    // }
 
    fn tl_clear() {
 
        STORED_ERROR.with(|stored_error| {
 
            let mut stored_error = stored_error.borrow_mut();
 
            stored_error.clear();
 
        })
 
    }
 
    fn tl_raw_peek() -> (*const u8, usize) {
 
    fn tl_bytes_peek() -> (*const u8, usize) {
 
        STORED_ERROR.with(|stored_error| {
 
            let stored_error = stored_error.borrow();
 
            match stored_error.buf.len() {
 
                0 => (core::ptr::null(), 0), // no error!
 
                n => {
 
                    // stores an error of length n-1 AND a NULL TERMINATOR
 
                    (stored_error.buf.as_ptr(), n - 1)
 
                }
 
            }
 
        })
 
    }
 
}
 
thread_local! {
 
    static STORED_ERROR: RefCell<StoredError> = RefCell::new(StoredError::default());
 
}
 

	
 
type ErrorCode = i32;
 

	
 
//////////////////////////////////////
 
///////////////////// REOWOLF //////////////////////////
 

	
 
/// Returns length (via out pointer) and pointer (via return value) of the last Reowolf error.
 
/// - pointer is NULL iff there was no last error
 
/// - data at pointer is null-delimited
 
/// - len does NOT include the length of the null-delimiter
 
#[no_mangle]
 
pub unsafe extern "C" fn reowolf_error_peek(len: *mut usize) -> *const u8 {
 
    let (err_ptr, err_len) = StoredError::tl_raw_peek();
 
    let (err_ptr, err_len) = StoredError::tl_bytes_peek();
 
    len.write(err_len);
 
    err_ptr
 
}
 

	
 
///////////////////// PROTOCOL DESCRIPTION //////////////////////////
 

	
 
/// Parses the utf8-encoded string slice to initialize a new protocol description object.
 
/// - On success, initializes `out` and returns 0
 
/// - On failure, stores an error string (see `reowolf_error_peek`) and returns -1
 
#[no_mangle]
 
pub unsafe extern "C" fn protocol_description_parse(
 
    pdl: *const u8,
 
    pdl_len: usize,
 
    pd: *mut Arc<ProtocolDescription>,
 
    out: *mut Arc<ProtocolDescription>,
 
) -> ErrorCode {
 
    StoredError::tl_clear();
 
    let slice: *const [u8] = std::slice::from_raw_parts(pdl, pdl_len);
 
    let slice: &[u8] = &*slice;
 
    match ProtocolDescription::parse(slice) {
 
    match ProtocolDescription::parse(&*slice_from_parts(pdl, pdl_len)) {
 
        Ok(new) => {
 
            pd.write(Arc::new(new));
 
            out.write(Arc::new(new));
 
            0
 
        }
 
        Err(err) => {
 
            StoredError::tl_store(&err);
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
/// Destroys the given initialized protocol description and frees its resources.
 
#[no_mangle]
 
pub unsafe extern "C" fn protocol_description_destroy(pd: Arc<ProtocolDescription>) {
 
    drop(pd)
 
pub unsafe extern "C" fn protocol_description_destroy(pd: *mut Arc<ProtocolDescription>) {
 
    drop(Box::from_raw(pd))
 
}
 

	
 
/// Given an initialized protocol description, initializes `out` with a clone which can be independently created or destroyed.
 
#[no_mangle]
 
pub unsafe extern "C" fn protocol_description_clone(
 
    pd: &Arc<ProtocolDescription>,
 
) -> Arc<ProtocolDescription> {
 
    pd.clone()
 
    out: *mut Arc<ProtocolDescription>,
 
) {
 
    out.write(pd.clone());
 
}
 

	
 
///////////////////// CONNECTOR //////////////////////////
 

	
 
/// Initializes `out` with a new connector using the given protocol description as its configuration.
 
/// The connector is assigned a random (internal) connector ID.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new(pd: &Arc<ProtocolDescription>, out: *mut Connector) {
 
    connector_new_with_id(pd, random_connector_id(), out)
 
}
 

	
 
/// Initializes `out` with a new connector using the given protocol description as its configuration.
 
/// The connector uses the given (internal) connector ID.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new_with_id(
 
    pd: &Arc<ProtocolDescription>,
 
    cid: ConnectorId,
 
    out: *mut Connector,
 
) {
 
    out.write(Connector::new(Box::new(DummyLogger), pd.clone(), cid, 8))
 
}
 

	
 
/// Destroys the given initialized connector and frees its resources.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_destroy(connector: Connector) {
 
    drop(connector)
 
}
 

	
 
/// Given an initialized connector in setup or connecting state,
 
/// - Creates a new directed port pair with logical channel putter->getter,
 
/// - adds the ports to the native component's interface,
 
/// - and returns them using the given out pointers.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_port_pair(
 
    connector: &mut Connector,
 
    out_putter: *mut PortId,
 
    out_getter: *mut PortId,
 
) {
 
    let [o, i] = connector.new_port_pair();
 
    out_putter.write(o);
 
    out_getter.write(i);
 
}
 

	
 
/// Given
 
/// - an initialized connector in setup or connecting state,
 
/// - a string slice for the component's identifier in the connector's configured protocol description,
 
/// - a set of ports (represented as a slice; duplicates are ignored) in the native component's interface,
 
/// the connector creates a new (internal) protocol component C, such that the set of native ports are moved to C.
 
/// Usable in {setup, communication} states.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_component(
 
    connector: &mut Connector,
 
    ident_ptr: *const u8,
 
    ident_len: usize,
 
    ports_ptr: *const PortId,
 
    ports_len: usize,
 
) -> ErrorCode {
 
    StoredError::tl_clear();
 
    match connector.add_component(
 
        &*slice_from_parts(ident_ptr, ident_len),
 
        &*slice_from_parts(ports_ptr, ports_len),
 
    ) {
 
        Ok(()) => 0,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
/// Given
 
/// - an initialized connector in setup or connecting state,
 
/// - a utf-8 encoded socket address,
 
/// - the logical polarity of P,
 
/// - the "physical" polarity in {Active, Passive} of the endpoint through which P's peer will be discovered,
 
/// returns P, a port newly added to the native interface.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_add_net_port(
 
    connector: &mut Connector,
 
    addr_str_ptr: *const u8,
 
    addr_str_len: usize,
 
    port_polarity: Polarity,
 
    endpoint_polarity: EndpointPolarity,
 
    port: *mut PortId,
 
) -> ErrorCode {
 
    StoredError::tl_clear();
 
    let addr_bytes = &*slice_from_parts(addr_str_ptr, addr_str_len);
 
    let addr_str = match std::str::from_utf8(addr_bytes) {
 
        Ok(addr_str) => addr_str,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            return -1;
 
        }
 
    };
 
    let sock_address: SocketAddr = match addr_str.parse() {
 
        Ok(addr) => addr,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            return -2;
 
        }
 
    };
 
    match connector.new_net_port(port_polarity, sock_address, endpoint_polarity) {
 
        Ok(p) => {
 
            port.write(p);
 
            0
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -3
 
        }
 
    }
 
}
 

	
 
/// Connects this connector to the distributed system of connectors reachable through endpoints,
 
/// Usable in setup state, and changes the state to communication.
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_connect(
 
    connector: &mut Connector,
 
    timeout_millis: i64,
 
) -> ErrorCode {
 
    StoredError::tl_clear();
 
    let option_timeout_millis: Option<u64> = TryFrom::try_from(timeout_millis).ok();
 
    let timeout = option_timeout_millis.map(Duration::from_millis);
 
    match connector.connect(timeout) {
 
        Ok(()) => 0,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
// #[no_mangle]
 
// pub extern "C" fn connector_new(pd: *const Arc<ProtocolDescription>) -> *mut Connector {
 
//     Box::into_raw(Box::new(Connector::default()))
 
// }
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_put_payload(
 
    connector: &mut Connector,
 
    port: PortId,
 
    payload: *mut Payload,
 
) -> ErrorCode {
 
    match connector.put(port, payload.read()) {
 
        Ok(()) => 0,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
// /// Creates and returns Reowolf Connector structure allocated on the heap.
 
// #[no_mangle]
 
// pub extern "C" fn connector_with_controller_id(controller_id: ControllerId) -> *mut Connector {
 
//     Box::into_raw(Box::new(Connector::Unconfigured(Unconfigured { controller_id })))
 
// }
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_put_payload_cloning(
 
    connector: &mut Connector,
 
    port: PortId,
 
    payload: &Payload,
 
) -> ErrorCode {
 
    match connector.put(port, payload.clone()) {
 
        Ok(()) => 0,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
/// Convenience function combining the functionalities of
 
/// "payload_new" with "connector_put_payload".
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_put_bytes(
 
    connector: &mut Connector,
 
    port: PortId,
 
    bytes_ptr: *const u8,
 
    bytes_len: usize,
 
) -> ErrorCode {
 
    StoredError::tl_clear();
 
    let payload: Payload = {
 
        let payload: *mut Payload = std::ptr::null_mut(); // uninitialized
 
        payload_new(bytes_ptr, bytes_len, payload); // initializes;
 
        payload.read()
 
    };
 
    match connector.put(port, payload) {
 
        Ok(()) => 0,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_get(connector: &mut Connector, port: PortId) -> ErrorCode {
 
    match connector.get(port) {
 
        Ok(()) => 0,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_next_batch(connector: &mut Connector) -> isize {
 
    match connector.next_batch() {
 
        Ok(n) => n as isize,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_sync(connector: &mut Connector, timeout_millis: i64) -> isize {
 
    StoredError::tl_clear();
 
    let option_timeout_millis: Option<u64> = TryFrom::try_from(timeout_millis).ok();
 
    let timeout = option_timeout_millis.map(Duration::from_millis);
 
    match connector.sync(timeout) {
 
        Ok(n) => n as isize,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            -1
 
        }
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_gotten_bytes(
 
    connector: &mut Connector,
 
    port: PortId,
 
    len: *mut usize,
 
) -> *const u8 {
 
    StoredError::tl_clear();
 
    match connector.gotten(port) {
 
        Ok(payload_borrow) => {
 
            let slice = payload_borrow.as_slice();
 
            len.write(slice.len());
 
            slice.as_ptr()
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            std::ptr::null()
 
        }
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_gotten_payload(
 
    connector: &mut Connector,
 
    port: PortId,
 
) -> *const Payload {
 
    StoredError::tl_clear();
 
    match connector.gotten(port) {
 
        Ok(payload_borrow) => payload_borrow,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            std::ptr::null()
 
        }
 
    }
 
}
 

	
 
///////////////////// PAYLOAD //////////////////////////
 
#[no_mangle]
 
pub unsafe extern "C" fn payload_new(
 
    bytes_ptr: *const u8,
 
    bytes_len: usize,
 
    out_payload: *mut Payload,
 
) {
 
    let bytes: &[u8] = &*slice_from_parts(bytes_ptr, bytes_len);
 
    out_payload.write(Payload::from(bytes));
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn payload_destroy(payload: *mut Payload) {
 
    drop(Box::from_raw(payload))
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn payload_clone(payload: &Payload, out_payload: *mut Payload) {
 
    out_payload.write(payload.clone())
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn payload_peek_bytes(payload: &Payload, bytes_len: *mut usize) -> *const u8 {
 
    let slice = payload.as_slice();
 
    bytes_len.write(slice.len());
 
    slice.as_ptr()
 
}
src/runtime/mod.rs
Show inline comments
 
@@ -130,101 +130,107 @@ pub struct MemInMsg {
 
pub struct IdManager {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
    proto_component_suffix_stream: U32Stream,
 
}
 
#[derive(Debug)]
 
pub struct EndpointManager {
 
    // invariants:
 
    // 1. endpoint N is registered READ | WRITE with poller
 
    // 2. Events is empty
 
    poll: Poll,
 
    events: Events,
 
    polled_undrained: IndexSet<usize>,
 
    delayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>,
 
    endpoint_exts: Vec<EndpointExt>,
 
}
 
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
 
pub struct PortInfo {
 
    polarities: HashMap<PortId, Polarity>,
 
    peers: HashMap<PortId, PortId>,
 
    routes: HashMap<PortId, Route>,
 
}
 
#[derive(Debug)]
 
// #[repr(C)]
 
pub struct Connector {
 
    unphased: ConnectorUnphased,
 
    phased: ConnectorPhased,
 
}
 
#[derive(Debug)]
 
pub struct ConnectorCommunication {
 
    round_index: usize,
 
    endpoint_manager: EndpointManager,
 
    neighborhood: Neighborhood,
 
    mem_inbox: Vec<MemInMsg>,
 
    native_batches: Vec<NativeBatch>,
 
    round_result: Result<Option<RoundOk>, SyncError>,
 
}
 
#[derive(Debug)]
 
pub struct ConnectorUnphased {
 
    proto_description: Arc<ProtocolDescription>,
 
    proto_components: HashMap<ProtoComponentId, ProtoComponent>,
 
    logger: Box<dyn Logger>,
 
    id_manager: IdManager,
 
    native_ports: HashSet<PortId>,
 
    port_info: PortInfo,
 
}
 
#[derive(Debug)]
 
pub enum ConnectorPhased {
 
    Setup { endpoint_setups: Vec<(PortId, EndpointSetup)>, surplus_sockets: u16 },
 
    Communication(ConnectorCommunication),
 
    Communication(Box<ConnectorCommunication>),
 
}
 
#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
pub struct Predicate {
 
    pub assigned: BTreeMap<FiringVar, bool>,
 
}
 
#[derive(Debug, Default)]
 
pub struct NativeBatch {
 
    // invariant: putters' and getters' polarities respected
 
    to_put: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
pub struct NonsyncProtoContext<'a> {
 
    logger: &'a mut dyn Logger,
 
    proto_component_id: ProtoComponentId,
 
    port_info: &'a mut PortInfo,
 
    id_manager: &'a mut IdManager,
 
    proto_component_ports: &'a mut HashSet<PortId>,
 
    unrun_components: &'a mut Vec<(ProtoComponentId, ProtoComponent)>,
 
}
 
pub struct SyncProtoContext<'a> {
 
    logger: &'a mut dyn Logger,
 
    predicate: &'a Predicate,
 
    port_info: &'a PortInfo,
 
    inbox: &'a HashMap<PortId, Payload>,
 
}
 
////////////////
 

	
 
pub fn random_connector_id() -> ConnectorId {
 
    type Bytes8 = [u8; std::mem::size_of::<ConnectorId>()];
 
    let mut bytes = Bytes8::default();
 
    getrandom::getrandom(&mut bytes).unwrap();
 
    unsafe { std::mem::transmute::<Bytes8, ConnectorId>(bytes) }
 
}
 
pub fn would_block(err: &std::io::Error) -> bool {
 
    err.kind() == std::io::ErrorKind::WouldBlock
 
}
 
impl<T: std::cmp::Ord> VecSet<T> {
 
    fn iter(&self) -> std::slice::Iter<T> {
 
        self.vec.iter()
 
    }
 
    fn contains(&self, element: &T) -> bool {
 
        self.vec.binary_search(element).is_ok()
 
    }
 
    fn new(mut vec: Vec<T>) -> Self {
 
        vec.sort();
 
        vec.dedup();
 
        Self { vec }
 
    }
 
}
 
impl PortInfo {
 
    fn firing_var_for(&self, port: PortId) -> FiringVar {
 
        FiringVar(match self.polarities.get(&port).unwrap() {
 
            Getter => port,
 
            Putter => *self.peers.get(&port).unwrap(),
 
        })
 
    }
 
}
src/runtime/setup.rs
Show inline comments
 
@@ -5,113 +5,113 @@ impl Connector {
 
    pub fn new(
 
        mut logger: Box<dyn Logger>,
 
        proto_description: Arc<ProtocolDescription>,
 
        connector_id: ConnectorId,
 
        surplus_sockets: u16,
 
    ) -> Self {
 
        log!(&mut *logger, "Created with connector_id {:?}", connector_id);
 
        Self {
 
            unphased: ConnectorUnphased {
 
                proto_description,
 
                proto_components: Default::default(),
 
                logger,
 
                id_manager: IdManager::new(connector_id),
 
                native_ports: Default::default(),
 
                port_info: Default::default(),
 
            },
 
            phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets },
 
        }
 
    }
 
    pub fn new_net_port(
 
        &mut self,
 
        polarity: Polarity,
 
        sock_addr: SocketAddr,
 
        endpoint_polarity: EndpointPolarity,
 
    ) -> Result<PortId, ()> {
 
    ) -> Result<PortId, NewNetPortError> {
 
        let Self { unphased: up, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication { .. } => Err(NewNetPortError::AlreadyConnected),
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                let endpoint_setup = EndpointSetup { sock_addr, endpoint_polarity };
 
                let p = up.id_manager.new_port_id();
 
                up.native_ports.insert(p);
 
                // {polarity, route} known. {peer} unknown.
 
                up.port_info.polarities.insert(p, polarity);
 
                up.port_info.routes.insert(p, Route::LocalComponent(ComponentId::Native));
 
                log!(
 
                    up.logger,
 
                    "Added net port {:?} with polarity {:?} and endpoint setup {:?} ",
 
                    p,
 
                    polarity,
 
                    &endpoint_setup
 
                );
 
                endpoint_setups.push((p, endpoint_setup));
 
                Ok(p)
 
            }
 
            ConnectorPhased::Communication { .. } => Err(()),
 
        }
 
    }
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<(), ConnectError> {
 
        use ConnectError::*;
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(cu.logger, "Call to connecting in connected state");
 
                Err(AlreadyConnected)
 
            }
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                let deadline = timeout.map(|to| Instant::now() + to);
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let mut endpoint_manager = new_endpoint_manager(
 
                    &mut *cu.logger,
 
                    endpoint_setups,
 
                    &mut cu.port_info,
 
                    deadline,
 
                )?;
 
                log!(
 
                    cu.logger,
 
                    "Successfully connected {} endpoints",
 
                    endpoint_manager.endpoint_exts.len()
 
                );
 
                // leader election and tree construction
 
                let neighborhood = init_neighborhood(
 
                    cu.id_manager.connector_id,
 
                    &mut *cu.logger,
 
                    &mut endpoint_manager,
 
                    deadline,
 
                )?;
 
                log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                let mut comm = ConnectorCommunication {
 
                    round_index: 0,
 
                    endpoint_manager,
 
                    neighborhood,
 
                    mem_inbox: Default::default(),
 
                    native_batches: vec![Default::default()],
 
                    round_result: Ok(None),
 
                };
 
                session_optimize(cu, &mut comm, deadline)?;
 
                log!(cu.logger, "connect() finished. setup phase complete");
 
                self.phased = ConnectorPhased::Communication(comm);
 
                self.phased = ConnectorPhased::Communication(Box::new(comm));
 
                Ok(())
 
            }
 
        }
 
    }
 
}
 
fn new_endpoint_manager(
 
    logger: &mut dyn Logger,
 
    endpoint_setups: &[(PortId, EndpointSetup)],
 
    port_info: &mut PortInfo,
 
    deadline: Option<Instant>,
 
) -> Result<EndpointManager, ConnectError> {
 
    ////////////////////////////////////////////
 
    use std::sync::atomic::AtomicBool;
 
    use ConnectError::*;
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    struct Todo {
 
        todo_endpoint: TodoEndpoint,
 
        endpoint_setup: EndpointSetup,
 
        local_port: PortId,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
    }
 
    enum TodoEndpoint {
 
        Accepting(TcpListener),
 
@@ -120,52 +120,48 @@ fn new_endpoint_manager(
 
    fn init_todo(
 
        token: Token,
 
        local_port: PortId,
 
        endpoint_setup: &EndpointSetup,
 
        poll: &mut Poll,
 
    ) -> Result<Todo, ConnectError> {
 
        let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity {
 
            let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
 
                .expect("mio::TcpStream connect should not fail!");
 
            poll.registry().register(&mut stream, token, BOTH).unwrap();
 
            TodoEndpoint::Endpoint(Endpoint { stream, inbox: vec![] })
 
        } else {
 
            let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
 
                .map_err(|_| BindFailed(endpoint_setup.sock_addr))?;
 
            poll.registry().register(&mut listener, token, BOTH).unwrap();
 
            TodoEndpoint::Accepting(listener)
 
        };
 
        Ok(Todo {
 
            todo_endpoint,
 
            local_port,
 
            sent_local_port: false,
 
            recv_peer_port: None,
 
            endpoint_setup: endpoint_setup.clone(),
 
        })
 
    };
 
    struct WakerState {
 
        continue_signal: Arc<AtomicBool>,
 
        failed_indices: HashSet<usize>,
 
    }
 
    ////////////////////////////////////////////
 

	
 
    // 1. Start to construct EndpointManager
 
    const WAKER_TOKEN: Token = Token(usize::MAX);
 
    const WAKER_PERIOD: Duration = Duration::from_millis(90);
 
    assert!(endpoint_setups.len() < WAKER_TOKEN.0); // using MAX usize as waker token
 
    let mut waker_continue_signal: Option<Arc<AtomicBool>> = None;
 
    let mut poll = Poll::new().map_err(|_| PollInitFailed)?;
 
    let mut events = Events::with_capacity(endpoint_setups.len() * 2 + 4);
 
    let mut polled_undrained = IndexSet::default();
 
    let mut delayed_messages = vec![];
 

	
 
    // 2. create a registered (TcpListener/Endpoint) for passive / active respectively
 
    let mut todos = endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, (local_port, endpoint_setup))| {
 
            init_todo(Token(index), *local_port, endpoint_setup, &mut poll)
 
        })
 
        .collect::<Result<Vec<Todo>, ConnectError>>()?;
 

	
 
    // 3. Using poll to drive progress:
 
    //    - accept an incoming connection for each TcpListener (turning them into endpoints too)
0 comments (0 inline, 0 general)