Changeset - 7b9df91324ad
[Not reviewed]
0 5 2
Christopher Esterhuyse - 5 years ago 2020-07-13 16:52:23
christopher.esterhuyse@gmail.com
fixed unintentional name collisions in socket-micking API
7 files changed with 133 insertions and 34 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -34,11 +34,11 @@ lazy_static = "1.4.0"
 

	
 
[lib]
 
# compile target: dynamically linked library using C ABI
 
crate-type = ["cdylib"]
 

	
 
[features]
 
default = ["ffi", "session_optimization"]
 
default = ["ffi", "session_optimization", "ffi_socket_api"]
 
ffi = [] # see src/ffi.rs
 
ffi_socket_api = ["ffi", "lazy_static", "atomic_refcell"]
 
endpoint_logging = [] # see src/macros.rs
 
session_optimization = [] # see src/runtime/setup.rs
 
\ No newline at end of file
examples/pres_1/amy.c
Show inline comments
 
new file 100644
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	// Create a connector...
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./pres_1_amy.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	// ... with 1 outgoing network connection
 
	PortId p0;
 
	char addr_str[] = "127.0.0.1:8000";
 
	connector_add_net_port(
 
		c, &p0, addr_str, sizeof(addr_str)-1, Polarity_Getter, EndpointPolarity_Active);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	// Connect! Begin communication. 5000ms timeout
 
	connector_connect(c, 5000);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	// Ask to receive a message...
 
	connector_get(c, p0);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	// ... or timeout within 1000ms.
 
	connector_sync(c, 1000);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	// Print the message we received
 
	size_t msg_len;
 
	const char * msg_ptr = connector_gotten_bytes(c, p0, &msg_len);
 
	printf("Got msg `%.*s`\n", msg_len, msg_ptr);
 
	
 
	printf("Exiting\n");
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	sleep(1.0);
 
	return 0;
 
}
 
\ No newline at end of file
examples/pres_1/bob.c
Show inline comments
 
new file 100644
 

	
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 
int main(int argc, char** argv) {
 
	// Create a connector...
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./pres_1_bob.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	// ... with 1 outgoing network connection
 
	PortId p0;
 
	char addr_str[] = "127.0.0.1:8000";
 
	connector_add_net_port(
 
		c, &p0, addr_str, sizeof(addr_str)-1, Polarity_Putter, EndpointPolarity_Passive);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	// Connect (5000ms timeout). Begin communication. 
 
	connector_connect(c, 5000);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	// Send a single 2-byte message...
 
	connector_put_bytes(c, p0, "hi", 2);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	// ... and acknowledge receipt within 1000ms.
 
	connector_sync(c, 1000);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	printf("Exiting\n");
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	sleep(1.0);
 
	return 0;
 
}
 
\ No newline at end of file
examples/utility.c
Show inline comments
 
#include <stdio.h>
 
#include <stdlib.h>
 
#include <errno.h>
 
#include <string.h>
 
#include <unistd.h>
 

	
 
// allocates a buffer!
 
char * buffer_pdl(char * filename) {
 
	FILE *f = fopen(filename, "rb");
 
	if (f == NULL) {
 
		printf("Opening pdl file returned errno %d!\n", errno);
src/ffi/socket_api.rs
Show inline comments
 
use super::*;
 
use atomic_refcell::AtomicRefCell;
 
use mio::net::UdpSocket;
 
use std::{collections::HashMap, ffi::c_void, net::SocketAddr, os::raw::c_int, sync::RwLock};
 
///////////////////////////////////////////////////////////////////
 

	
 
type ConnectorFd = c_int;
 
struct Connector {}
 
struct FdAllocator {
 
@@ -17,12 +18,13 @@ enum MaybeConnector {
 
#[derive(Default)]
 
struct ConnectorStorage {
 
    fd_to_connector: HashMap<ConnectorFd, AtomicRefCell<MaybeConnector>>,
 
    fd_allocator: FdAllocator,
 
}
 
///////////////////////////////////////////////////////////////////
 

	
 
impl Default for FdAllocator {
 
    fn default() -> Self {
 
        Self {
 
            next: Some(0), // positive values used only
 
            freed: vec![],
 
        }
 
@@ -46,64 +48,74 @@ impl FdAllocator {
 
lazy_static::lazy_static! {
 
    static ref CONNECTOR_STORAGE: RwLock<ConnectorStorage> = Default::default();
 
}
 
///////////////////////////////////////////////////////////////////
 

	
 
#[no_mangle]
 
pub extern "C" fn socket(_domain: c_int, _type: c_int) -> c_int {
 
pub extern "C" fn rw_socket(_domain: c_int, _type: c_int) -> c_int {
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    let w = if let Some(w) = CONNECTOR_STORAGE.write() { w } else { return FD_LOCK_POISONED };
 
    let mut w = if let Ok(w) = CONNECTOR_STORAGE.write() { w } else { return FD_LOCK_POISONED };
 
    let fd = w.fd_allocator.alloc();
 
    w.fd_to_connector.insert(fd, AtomicRefCell::new(MaybeConnector::New));
 
    fd
 
}
 

	
 
#[no_mangle]
 
pub extern "C" fn close(fd: ConnectorFd, _how: c_int) -> c_int {
 
pub extern "C" fn rw_close(fd: ConnectorFd, _how: c_int) -> c_int {
 
    // ignoring HOW
 
    let w = if let Some(w) = CONNECTOR_STORAGE.write() { w } else { return FD_LOCK_POISONED };
 
    let mut w = if let Ok(w) = CONNECTOR_STORAGE.write() { w } else { return FD_LOCK_POISONED };
 
    w.fd_allocator.free(fd);
 
    if w.fd_to_connector.remove(&fd).is_some() {
 
        ERR_OK
 
    } else {
 
        CLOSE_FAIL
 
    }
 
}
 

	
 
#[no_mangle]
 
pub extern "C" fn bind(fd: ConnectorFd, address: *const SocketAddr, _address_len: usize) -> c_int {
 
pub unsafe extern "C" fn rw_bind(
 
    fd: ConnectorFd,
 
    address: *const SocketAddr,
 
    _address_len: usize,
 
) -> c_int {
 
    use MaybeConnector as Mc;
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    let r = if let Some(r) = CONNECTOR_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let r = if let Ok(r) = CONNECTOR_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let mc = if let Some(mc) = r.fd_to_connector.get(&fd) { mc } else { return BAD_FD };
 
    let mc: &mut Mc = &mut maybe_conn.borrow_mut();
 
    let mc: &mut Mc = &mut mc.borrow_mut();
 
    let _ = if let Mc::New = mc { () } else { return WRONG_STATE };
 
    *mc = Mc::Bound(address.read());
 
    ERR_OK
 
}
 

	
 
#[no_mangle]
 
pub extern "C" fn connect(
 
pub extern "C" fn rw_connect(
 
    fd: ConnectorFd,
 
    _address: *const SocketAddr,
 
    _address_len: usize,
 
) -> c_int {
 
    use MaybeConnector as Mc;
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    let r = if let Some(r) = CONNECTOR_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let r = if let Ok(r) = CONNECTOR_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let mc = if let Some(mc) = r.fd_to_connector.get(&fd) { mc } else { return BAD_FD };
 
    let mc: &mut Mc = &mut maybe_conn.borrow_mut();
 
    let local = if let Mc::Bound(local) = mc { local } else { return WRONG_STATE };
 
    let mc: &mut Mc = &mut mc.borrow_mut();
 
    let _local = if let Mc::Bound(local) = mc { local } else { return WRONG_STATE };
 
    *mc = Mc::Connected(Connector {});
 
    ERR_OK
 
}
 
#[no_mangle]
 
pub extern "C" fn send(fd: ConnectorFd, msg: *const c_void, len: usize, flags: c_int) -> isize {
 
pub extern "C" fn rw_send(
 
    fd: ConnectorFd,
 
    _msg: *const c_void,
 
    _len: usize,
 
    _flags: c_int,
 
) -> isize {
 
    use MaybeConnector as Mc;
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    let r = if let Some(r) = CONNECTOR_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let mc = if let Some(mc) = r.fd_to_connector.get(&fd) { mc } else { return BAD_FD };
 
    let mc: &mut Mc = &mut maybe_conn.borrow_mut();
 
    let c = if let Mc::Connected(c) = mc { c } else { return WRONG_STATE };
 
    let r =
 
        if let Ok(r) = CONNECTOR_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_connector.get(&fd) { mc } else { return BAD_FD as isize };
 
    let mc: &mut Mc = &mut mc.borrow_mut();
 
    let _c = if let Mc::Connected(c) = mc { c } else { return WRONG_STATE as isize };
 
    // TODO
 
    ERR_OK
 
    ERR_OK as isize
 
}
src/runtime/communication.rs
Show inline comments
 
@@ -67,34 +67,43 @@ impl RoundCtxTrait for RoundCtx {
 
    }
 
    fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) {
 
        self.getter_buffer.getter_add(getter, msg)
 
    }
 
}
 
impl Connector {
 
    fn get_comm_mut(&mut self) -> Option<&mut ConnectorCommunication> {
 
        if let ConnectorPhased::Communication(comm) = &mut self.phased {
 
            Some(comm)
 
        } else {
 
            None
 
        }
 
    }
 
    // pub(crate) fn get_mut_udp_sock(&mut self, index: usize) -> Option<&mut UdpSocket> {
 
    //     let sock = &mut self
 
    //         .get_comm_mut()?
 
    //         .endpoint_manager
 
    //         .udp_endpoint_store
 
    //         .endpoint_exts
 
    //         .get_mut(index)?
 
    //         .sock;
 
    //     Some(sock)
 
    // }
 
    pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> {
 
        use GottenError as Ge;
 
        let Self { phased, .. } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(Ge::NoPreviousRound),
 
            ConnectorPhased::Communication(comm) => match &comm.round_result {
 
                Err(_) => Err(Ge::PreviousSyncFailed),
 
                Ok(None) => Err(Ge::NoPreviousRound),
 
                Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(Ge::PortDidntGet),
 
            },
 
        let comm = self.get_comm_mut().ok_or(Ge::NoPreviousRound)?;
 
        match &comm.round_result {
 
            Err(_) => Err(Ge::PreviousSyncFailed),
 
            Ok(None) => Err(Ge::NoPreviousRound),
 
            Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(Ge::PortDidntGet),
 
        }
 
    }
 
    pub fn next_batch(&mut self) -> Result<usize, WrongStateError> {
 
        // returns index of new batch
 
        let Self { phased, .. } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(WrongStateError),
 
            ConnectorPhased::Communication(comm) => {
 
                comm.native_batches.push(Default::default());
 
                Ok(comm.native_batches.len() - 1)
 
            }
 
        }
 
        let comm = self.get_comm_mut().ok_or(WrongStateError)?;
 
        comm.native_batches.push(Default::default());
 
        Ok(comm.native_batches.len() - 1)
 
    }
 
    fn port_op_access(
 
        &mut self,
 
        port: PortId,
 
        expect_polarity: Polarity,
 
    ) -> Result<&mut NativeBatch, PortOpError> {
src/runtime/setup.rs
Show inline comments
 
@@ -200,12 +200,13 @@ fn new_endpoint_manager(
 
    // 2. Create net/udp TODOs, each already registered with poll
 
    let mut net_todos = net_endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, endpoint_setup)| {
 
            let token = TokenTarget::NetEndpoint { index }.into();
 
            log!(logger, "Net endpoint {} beginning setup with {:?}", index, &endpoint_setup);
 
            let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity {
 
                let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
 
                    .expect("mio::TcpStream connect should not fail!");
 
                poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                TodoEndpoint::NetEndpoint(NetEndpoint { stream, inbox: vec![] })
 
            } else {
0 comments (0 inline, 0 general)