Changeset - 6c04a99de862
[Not reviewed]
0 9 0
Christopher Esterhuyse - 5 years ago 2020-07-15 12:24:04
christopher.esterhuyse@gmail.com
fleshed out socket api. did bugfix: proto components remember whether they did put/get
9 files changed with 99 insertions and 40 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -22,9 +22,9 @@ mio = { version = "0.7.0", package = "mio", features = ["udp", "tcp", "os-poll"]
 

	
 
# protocol
 
backtrace = "0.3"
 
lazy_static = "1.4.0"
 

	
 
# socket ffi
 
lazy_static = { version = "1.4.0", optional = true}
 
atomic_refcell = { version = "0.1.6", optional = true }
 

	
 
[dev-dependencies]
 
@@ -39,6 +39,6 @@ crate-type = ["cdylib"]
 
[features]
 
default = ["ffi", "ffi_socket_api"] # // "session_optimization", 
 
ffi = [] # see src/ffi.rs
 
ffi_socket_api = ["ffi", "lazy_static", "atomic_refcell"]
 
ffi_socket_api = ["ffi", "atomic_refcell"]
 
endpoint_logging = [] # see src/macros.rs
 
session_optimization = [] # see src/runtime/setup.rs
 
\ No newline at end of file
examples/make.py
Show inline comments
 
@@ -4,6 +4,7 @@ for c_file in glob.glob(script_path + "/*/*.c", recursive=False):
 
  print("compiling", c_file)
 
  args = [
 
    "gcc",          # compiler
 
    "-std=c11"      # C11 mode
 
    "-L",           # lib path flag
 
    "./",           # where to look for libs
 
    "-lreowolf_rs", # add lib called "reowolf_rs"
examples/pres_5/bob.c
Show inline comments
 
@@ -29,7 +29,7 @@ int main(int argc, char** argv) {
 
	rw_err_peek(c);
 

	
 
	for(int round=0; round<3; round++) {
 
		printf("\----------Round %d\n", round);
 
		printf("----------Round %d\n", round);
 
		connector_get(c, ports[3]);
 
		rw_err_peek(c);
 
		connector_sync(c, 1000);
src/ffi/socket_api.rs
Show inline comments
 
use super::*;
 
use atomic_refcell::AtomicRefCell;
 
use mio::net::UdpSocket;
 

	
 
use std::{collections::HashMap, ffi::c_void, net::SocketAddr, os::raw::c_int, sync::RwLock};
 
///////////////////////////////////////////////////////////////////
 

	
 
type ConnectorFd = c_int;
 
struct Connector {}
 
struct FdAllocator {
 
    next: Option<ConnectorFd>,
 
    freed: Vec<ConnectorFd>,
 
    next: Option<c_int>,
 
    freed: Vec<c_int>,
 
}
 
enum MaybeConnector {
 
    New,
 
    Bound(SocketAddr),
 
    Connected(Connector),
 
    Bound { local_addr: SocketAddr },
 
    Connected { connector: Connector, putter: PortId, getter: PortId },
 
}
 
#[derive(Default)]
 
struct ConnectorStorage {
 
    fd_to_connector: HashMap<ConnectorFd, AtomicRefCell<MaybeConnector>>,
 
    fd_to_connector: HashMap<c_int, AtomicRefCell<MaybeConnector>>,
 
    fd_allocator: FdAllocator,
 
}
 
///////////////////////////////////////////////////////////////////
 
@@ -31,7 +29,7 @@ impl Default for FdAllocator {
 
    }
 
}
 
impl FdAllocator {
 
    fn alloc(&mut self) -> ConnectorFd {
 
    fn alloc(&mut self) -> c_int {
 
        if let Some(fd) = self.freed.pop() {
 
            return fd;
 
        }
 
@@ -41,7 +39,7 @@ impl FdAllocator {
 
        }
 
        panic!("No more Connector FDs to allocate!")
 
    }
 
    fn free(&mut self, fd: ConnectorFd) {
 
    fn free(&mut self, fd: c_int) {
 
        self.freed.push(fd);
 
    }
 
}
 
@@ -60,7 +58,7 @@ pub extern "C" fn rw_socket(_domain: c_int, _type: c_int) -> c_int {
 
}
 

	
 
#[no_mangle]
 
pub extern "C" fn rw_close(fd: ConnectorFd, _how: c_int) -> c_int {
 
pub extern "C" fn rw_close(fd: c_int, _how: c_int) -> c_int {
 
    // ignoring HOW
 
    let mut w = if let Ok(w) = CONNECTOR_STORAGE.write() { w } else { return FD_LOCK_POISONED };
 
    w.fd_allocator.free(fd);
 
@@ -73,9 +71,9 @@ pub extern "C" fn rw_close(fd: ConnectorFd, _how: c_int) -> c_int {
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_bind(
 
    fd: ConnectorFd,
 
    address: *const SocketAddr,
 
    _address_len: usize,
 
    fd: c_int,
 
    local_addr: *const SocketAddr,
 
    _addr_len: usize,
 
) -> c_int {
 
    use MaybeConnector as Mc;
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
@@ -83,14 +81,14 @@ pub unsafe extern "C" fn rw_bind(
 
    let mc = if let Some(mc) = r.fd_to_connector.get(&fd) { mc } else { return BAD_FD };
 
    let mc: &mut Mc = &mut mc.borrow_mut();
 
    let _ = if let Mc::New = mc { () } else { return WRONG_STATE };
 
    *mc = Mc::Bound(address.read());
 
    *mc = Mc::Bound { local_addr: local_addr.read() };
 
    ERR_OK
 
}
 

	
 
#[no_mangle]
 
pub extern "C" fn rw_connect(
 
    fd: ConnectorFd,
 
    _address: *const SocketAddr,
 
pub unsafe extern "C" fn rw_connect(
 
    fd: c_int,
 
    peer_addr: *const SocketAddr,
 
    _address_len: usize,
 
) -> c_int {
 
    use MaybeConnector as Mc;
 
@@ -98,24 +96,75 @@ pub extern "C" fn rw_connect(
 
    let r = if let Ok(r) = CONNECTOR_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let mc = if let Some(mc) = r.fd_to_connector.get(&fd) { mc } else { return BAD_FD };
 
    let mc: &mut Mc = &mut mc.borrow_mut();
 
    let _local = if let Mc::Bound(local) = mc { local } else { return WRONG_STATE };
 
    *mc = Mc::Connected(Connector {});
 
    let local_addr =
 
        if let Mc::Bound { local_addr } = mc { local_addr } else { return WRONG_STATE };
 
    let peer_addr = peer_addr.read();
 
    let (connector, [putter, getter]) = {
 
        let mut c = Connector::new(
 
            Box::new(DummyLogger),
 
            crate::TRIVIAL_PD.clone(),
 
            Connector::random_id(),
 
            8,
 
        );
 
        let [putter, getter] = c.new_udp_port(*local_addr, peer_addr).unwrap();
 
        (c, [putter, getter])
 
    };
 
    *mc = Mc::Connected { connector, putter, getter };
 
    ERR_OK
 
}
 
#[no_mangle]
 
pub extern "C" fn rw_send(
 
    fd: ConnectorFd,
 
    _msg: *const c_void,
 
    _len: usize,
 
pub unsafe extern "C" fn rw_send(
 
    fd: c_int,
 
    bytes_ptr: *const c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
) -> isize {
 
    use MaybeConnector as Mc;
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    // ignoring flags
 
    let r =
 
        if let Ok(r) = CONNECTOR_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_connector.get(&fd) { mc } else { return BAD_FD as isize };
 
    let mc: &mut Mc = &mut mc.borrow_mut();
 
    let (connector, putter) = if let Mc::Connected { connector, putter, .. } = mc {
 
        (connector, *putter)
 
    } else {
 
        return WRONG_STATE as isize;
 
    };
 
    match connector_put_bytes(connector, putter, bytes_ptr as _, bytes_len) {
 
        ERR_OK => {}
 
        err => return err as isize,
 
    }
 
    connector_sync(connector, -1)
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_recv(
 
    fd: c_int,
 
    bytes_ptr: *mut c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
) -> isize {
 
    use MaybeConnector as Mc;
 
    // ignoring flags
 
    let r =
 
        if let Ok(r) = CONNECTOR_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_connector.get(&fd) { mc } else { return BAD_FD as isize };
 
    let mc: &mut Mc = &mut mc.borrow_mut();
 
    let _c = if let Mc::Connected(c) = mc { c } else { return WRONG_STATE as isize };
 
    // TODO
 
    ERR_OK as isize
 
    let (connector, getter) = if let Mc::Connected { connector, getter, .. } = mc {
 
        (connector, *getter)
 
    } else {
 
        return WRONG_STATE as isize;
 
    };
 
    match connector_get(connector, getter) {
 
        ERR_OK => {}
 
        err => return err as isize,
 
    }
 
    match connector_sync(connector, -1) {
 
        0 => {} // singleton batch index
 
        err => return err as isize,
 
    };
 
    let slice = connector.gotten(getter).unwrap().as_slice();
 
    let copied_bytes = slice.len().min(bytes_len);
 
    std::ptr::copy_nonoverlapping(slice.as_ptr(), bytes_ptr as *mut u8, copied_bytes);
 
    copied_bytes as isize
 
}
src/lib.rs
Show inline comments
 
@@ -6,7 +6,7 @@ mod protocol;
 
mod runtime;
 

	
 
pub use common::{ConnectorId, EndpointPolarity, Payload, Polarity, PortId};
 
pub use protocol::ProtocolDescription;
 
pub use protocol::{ProtocolDescription, TRIVIAL_PD};
 
pub use runtime::{error, Connector, DummyLogger, FileLogger, VecLogger};
 

	
 
#[cfg(feature = "ffi")]
src/protocol/mod.rs
Show inline comments
 
@@ -6,6 +6,12 @@ mod lexer;
 
// mod library;
 
mod parser;
 

	
 
lazy_static::lazy_static! {
 
    pub static ref TRIVIAL_PD: std::sync::Arc<ProtocolDescription> = {
 
        std::sync::Arc::new(ProtocolDescription::parse(b"").unwrap())
 
    };
 
}
 

	
 
use crate::common::*;
 
use crate::protocol::ast::*;
 
use crate::protocol::eval::*;
src/runtime/communication.rs
Show inline comments
 
@@ -36,7 +36,7 @@ struct BranchingProtoComponent {
 
}
 
#[derive(Debug, Clone)]
 
struct ProtoComponentBranch {
 
    did_put: HashSet<PortId>,
 
    did_put_or_get: HashSet<PortId>,
 
    inbox: HashMap<PortId, Payload>,
 
    state: ComponentState,
 
    untaken_choice: Option<u16>,
 
@@ -824,6 +824,7 @@ impl BranchingProtoComponent {
 
                predicate: &predicate,
 
                port_info: &cu.port_info,
 
                inbox: &branch.inbox,
 
                did_put_or_get: &mut branch.did_put_or_get,
 
            };
 
            let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description);
 
            log!(
 
@@ -852,10 +853,7 @@ impl BranchingProtoComponent {
 
                    // make concrete all variables
 
                    for port in ports.iter() {
 
                        let var = cu.port_info.spec_var_for(*port);
 
                        let should_have_fired = match cu.port_info.polarities.get(port).unwrap() {
 
                            Polarity::Getter => branch.inbox.contains_key(port),
 
                            Polarity::Putter => branch.did_put.contains(port),
 
                        };
 
                        let should_have_fired = branch.did_put_or_get.contains(port);
 
                        let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
 
                        let did_fire = val == SpecVal::FIRING;
 
                        if did_fire != should_have_fired {
 
@@ -901,7 +899,7 @@ impl BranchingProtoComponent {
 
                        drop((predicate, branch));
 
                    } else {
 
                        // keep in "unblocked"
 
                        branch.did_put.insert(putter);
 
                        branch.did_put_or_get.insert(putter);
 
                        log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
 
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                        rctx.getter_buffer.putter_add(cu, putter, msg);
 
@@ -1011,7 +1009,7 @@ impl BranchingProtoComponent {
 
    fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self {
 
        let branch = ProtoComponentBranch {
 
            inbox: Default::default(),
 
            did_put: Default::default(),
 
            did_put_or_get: Default::default(),
 
            state,
 
            ended: false,
 
            untaken_choice: None,
 
@@ -1142,6 +1140,7 @@ impl SyncProtoContext<'_> {
 
        self.predicate.query(var).map(SpecVal::is_firing)
 
    }
 
    pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        self.did_put_or_get.insert(port);
 
        self.inbox.get(&port)
 
    }
 
    pub(crate) fn take_choice(&mut self) -> Option<u16> {
src/runtime/mod.rs
Show inline comments
 
@@ -20,7 +20,7 @@ pub struct Connector {
 
    unphased: ConnectorUnphased,
 
    phased: ConnectorPhased,
 
}
 
pub trait Logger: Debug {
 
pub trait Logger: Debug + Send + Sync {
 
    fn line_writer(&mut self) -> Option<&mut dyn std::io::Write>;
 
}
 
#[derive(Debug)]
 
@@ -39,6 +39,7 @@ pub(crate) struct NonsyncProtoContext<'a> {
 
}
 
pub(crate) struct SyncProtoContext<'a> {
 
    logger: &'a mut dyn Logger,
 
    did_put_or_get: &'a mut HashSet<PortId>,
 
    untaken_choice: &'a mut Option<u16>,
 
    predicate: &'a Predicate,
 
    port_info: &'a PortInfo,
src/runtime/tests.rs
Show inline comments
 
@@ -790,7 +790,10 @@ fn udp_reowolf_swap() {
 
            let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap();
 
            udp.connect(sock_addrs[0]).unwrap();
 
            let mut buf = new_u8_buffer(256);
 
            for _ in 0..5 {
 
                std::thread::sleep(Duration::from_millis(60));
 
                udp.send(TEST_MSG_BYTES).unwrap();
 
            }
 
            let len = udp.recv(&mut buf).unwrap();
 
            assert_eq!(TEST_MSG_BYTES, &buf[0..len]);
 
            barrier.wait();
0 comments (0 inline, 0 general)