Changeset - 6c04a99de862
[Not reviewed]
0 9 0
Christopher Esterhuyse - 5 years ago 2020-07-15 12:24:04
christopher.esterhuyse@gmail.com
fleshed out socket api. did bugfix: proto components remember whether they did put/get
9 files changed with 100 insertions and 41 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -19,15 +19,15 @@ getrandom = "0.1.14" # tiny crate. used to guess controller-id
 

	
 
# network
 
mio = { version = "0.7.0", package = "mio", features = ["udp", "tcp", "os-poll"] }
 

	
 
# protocol
 
backtrace = "0.3"
 
lazy_static = "1.4.0"
 

	
 
# socket ffi
 
lazy_static = { version = "1.4.0", optional = true}
 
atomic_refcell = { version = "0.1.6", optional = true }
 

	
 
[dev-dependencies]
 
# test-generator = "0.3.0"
 
crossbeam-utils = "0.7.2"
 
lazy_static = "1.4.0"
 
@@ -36,9 +36,9 @@ lazy_static = "1.4.0"
 
# compile target: dynamically linked library using C ABI
 
crate-type = ["cdylib"]
 

	
 
[features]
 
default = ["ffi", "ffi_socket_api"] # // "session_optimization", 
 
ffi = [] # see src/ffi.rs
 
ffi_socket_api = ["ffi", "lazy_static", "atomic_refcell"]
 
ffi_socket_api = ["ffi", "atomic_refcell"]
 
endpoint_logging = [] # see src/macros.rs
 
session_optimization = [] # see src/runtime/setup.rs
 
\ No newline at end of file
examples/make.py
Show inline comments
 
import os, glob, subprocess, time
 
script_path = os.path.dirname(os.path.realpath(__file__));
 
for c_file in glob.glob(script_path + "/*/*.c", recursive=False):
 
  print("compiling", c_file)
 
  args = [
 
    "gcc",          # compiler
 
    "-std=c11"      # C11 mode
 
    "-L",           # lib path flag
 
    "./",           # where to look for libs
 
    "-lreowolf_rs", # add lib called "reowolf_rs"
 
    "-Wl,-R./",     # pass -R flag to linker: produce relocatable object
 
    c_file,         # input source file
 
    "-o",           # output flag
examples/pres_5/bob.c
Show inline comments
 
@@ -26,13 +26,13 @@ int main(int argc, char** argv) {
 
	
 
	// Connect with peers (5000ms timeout).
 
	connector_connect(c, 5000);
 
	rw_err_peek(c);
 

	
 
	for(int round=0; round<3; round++) {
 
		printf("\----------Round %d\n", round);
 
		printf("----------Round %d\n", round);
 
		connector_get(c, ports[3]);
 
		rw_err_peek(c);
 
		connector_sync(c, 1000);
 
		rw_err_peek(c);
 

	
 
		size_t msg_len = 0;
src/ffi/socket_api.rs
Show inline comments
 
use super::*;
 
use atomic_refcell::AtomicRefCell;
 
use mio::net::UdpSocket;
 

	
 
use std::{collections::HashMap, ffi::c_void, net::SocketAddr, os::raw::c_int, sync::RwLock};
 
///////////////////////////////////////////////////////////////////
 

	
 
type ConnectorFd = c_int;
 
struct Connector {}
 
struct FdAllocator {
 
    next: Option<ConnectorFd>,
 
    freed: Vec<ConnectorFd>,
 
    next: Option<c_int>,
 
    freed: Vec<c_int>,
 
}
 
enum MaybeConnector {
 
    New,
 
    Bound(SocketAddr),
 
    Connected(Connector),
 
    Bound { local_addr: SocketAddr },
 
    Connected { connector: Connector, putter: PortId, getter: PortId },
 
}
 
#[derive(Default)]
 
struct ConnectorStorage {
 
    fd_to_connector: HashMap<ConnectorFd, AtomicRefCell<MaybeConnector>>,
 
    fd_to_connector: HashMap<c_int, AtomicRefCell<MaybeConnector>>,
 
    fd_allocator: FdAllocator,
 
}
 
///////////////////////////////////////////////////////////////////
 

	
 
impl Default for FdAllocator {
 
    fn default() -> Self {
 
@@ -28,23 +26,23 @@ impl Default for FdAllocator {
 
            next: Some(0), // positive values used only
 
            freed: vec![],
 
        }
 
    }
 
}
 
impl FdAllocator {
 
    fn alloc(&mut self) -> ConnectorFd {
 
    fn alloc(&mut self) -> c_int {
 
        if let Some(fd) = self.freed.pop() {
 
            return fd;
 
        }
 
        if let Some(fd) = self.next {
 
            self.next = fd.checked_add(1);
 
            return fd;
 
        }
 
        panic!("No more Connector FDs to allocate!")
 
    }
 
    fn free(&mut self, fd: ConnectorFd) {
 
    fn free(&mut self, fd: c_int) {
 
        self.freed.push(fd);
 
    }
 
}
 
lazy_static::lazy_static! {
 
    static ref CONNECTOR_STORAGE: RwLock<ConnectorStorage> = Default::default();
 
}
 
@@ -57,65 +55,116 @@ pub extern "C" fn rw_socket(_domain: c_int, _type: c_int) -> c_int {
 
    let fd = w.fd_allocator.alloc();
 
    w.fd_to_connector.insert(fd, AtomicRefCell::new(MaybeConnector::New));
 
    fd
 
}
 

	
 
#[no_mangle]
 
pub extern "C" fn rw_close(fd: ConnectorFd, _how: c_int) -> c_int {
 
pub extern "C" fn rw_close(fd: c_int, _how: c_int) -> c_int {
 
    // ignoring HOW
 
    let mut w = if let Ok(w) = CONNECTOR_STORAGE.write() { w } else { return FD_LOCK_POISONED };
 
    w.fd_allocator.free(fd);
 
    if w.fd_to_connector.remove(&fd).is_some() {
 
        ERR_OK
 
    } else {
 
        CLOSE_FAIL
 
    }
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_bind(
 
    fd: ConnectorFd,
 
    address: *const SocketAddr,
 
    _address_len: usize,
 
    fd: c_int,
 
    local_addr: *const SocketAddr,
 
    _addr_len: usize,
 
) -> c_int {
 
    use MaybeConnector as Mc;
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    let r = if let Ok(r) = CONNECTOR_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let mc = if let Some(mc) = r.fd_to_connector.get(&fd) { mc } else { return BAD_FD };
 
    let mc: &mut Mc = &mut mc.borrow_mut();
 
    let _ = if let Mc::New = mc { () } else { return WRONG_STATE };
 
    *mc = Mc::Bound(address.read());
 
    *mc = Mc::Bound { local_addr: local_addr.read() };
 
    ERR_OK
 
}
 

	
 
#[no_mangle]
 
pub extern "C" fn rw_connect(
 
    fd: ConnectorFd,
 
    _address: *const SocketAddr,
 
pub unsafe extern "C" fn rw_connect(
 
    fd: c_int,
 
    peer_addr: *const SocketAddr,
 
    _address_len: usize,
 
) -> c_int {
 
    use MaybeConnector as Mc;
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    let r = if let Ok(r) = CONNECTOR_STORAGE.read() { r } else { return FD_LOCK_POISONED };
 
    let mc = if let Some(mc) = r.fd_to_connector.get(&fd) { mc } else { return BAD_FD };
 
    let mc: &mut Mc = &mut mc.borrow_mut();
 
    let _local = if let Mc::Bound(local) = mc { local } else { return WRONG_STATE };
 
    *mc = Mc::Connected(Connector {});
 
    let local_addr =
 
        if let Mc::Bound { local_addr } = mc { local_addr } else { return WRONG_STATE };
 
    let peer_addr = peer_addr.read();
 
    let (connector, [putter, getter]) = {
 
        let mut c = Connector::new(
 
            Box::new(DummyLogger),
 
            crate::TRIVIAL_PD.clone(),
 
            Connector::random_id(),
 
            8,
 
        );
 
        let [putter, getter] = c.new_udp_port(*local_addr, peer_addr).unwrap();
 
        (c, [putter, getter])
 
    };
 
    *mc = Mc::Connected { connector, putter, getter };
 
    ERR_OK
 
}
 
#[no_mangle]
 
pub extern "C" fn rw_send(
 
    fd: ConnectorFd,
 
    _msg: *const c_void,
 
    _len: usize,
 
pub unsafe extern "C" fn rw_send(
 
    fd: c_int,
 
    bytes_ptr: *const c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
) -> isize {
 
    use MaybeConnector as Mc;
 
    // assuming _domain is AF_INET and _type is SOCK_DGRAM
 
    // ignoring flags
 
    let r =
 
        if let Ok(r) = CONNECTOR_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_connector.get(&fd) { mc } else { return BAD_FD as isize };
 
    let mc: &mut Mc = &mut mc.borrow_mut();
 
    let _c = if let Mc::Connected(c) = mc { c } else { return WRONG_STATE as isize };
 
    // TODO
 
    ERR_OK as isize
 
    let (connector, putter) = if let Mc::Connected { connector, putter, .. } = mc {
 
        (connector, *putter)
 
    } else {
 
        return WRONG_STATE as isize;
 
    };
 
    match connector_put_bytes(connector, putter, bytes_ptr as _, bytes_len) {
 
        ERR_OK => {}
 
        err => return err as isize,
 
    }
 
    connector_sync(connector, -1)
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn rw_recv(
 
    fd: c_int,
 
    bytes_ptr: *mut c_void,
 
    bytes_len: usize,
 
    _flags: c_int,
 
) -> isize {
 
    use MaybeConnector as Mc;
 
    // ignoring flags
 
    let r =
 
        if let Ok(r) = CONNECTOR_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize };
 
    let mc = if let Some(mc) = r.fd_to_connector.get(&fd) { mc } else { return BAD_FD as isize };
 
    let mc: &mut Mc = &mut mc.borrow_mut();
 
    let (connector, getter) = if let Mc::Connected { connector, getter, .. } = mc {
 
        (connector, *getter)
 
    } else {
 
        return WRONG_STATE as isize;
 
    };
 
    match connector_get(connector, getter) {
 
        ERR_OK => {}
 
        err => return err as isize,
 
    }
 
    match connector_sync(connector, -1) {
 
        0 => {} // singleton batch index
 
        err => return err as isize,
 
    };
 
    let slice = connector.gotten(getter).unwrap().as_slice();
 
    let copied_bytes = slice.len().min(bytes_len);
 
    std::ptr::copy_nonoverlapping(slice.as_ptr(), bytes_ptr as *mut u8, copied_bytes);
 
    copied_bytes as isize
 
}
src/lib.rs
Show inline comments
 
@@ -3,11 +3,11 @@ mod macros;
 

	
 
mod common;
 
mod protocol;
 
mod runtime;
 

	
 
pub use common::{ConnectorId, EndpointPolarity, Payload, Polarity, PortId};
 
pub use protocol::ProtocolDescription;
 
pub use protocol::{ProtocolDescription, TRIVIAL_PD};
 
pub use runtime::{error, Connector, DummyLogger, FileLogger, VecLogger};
 

	
 
#[cfg(feature = "ffi")]
 
pub mod ffi;
src/protocol/mod.rs
Show inline comments
 
@@ -3,12 +3,18 @@ mod ast;
 
mod eval;
 
pub(crate) mod inputsource;
 
mod lexer;
 
// mod library;
 
mod parser;
 

	
 
lazy_static::lazy_static! {
 
    pub static ref TRIVIAL_PD: std::sync::Arc<ProtocolDescription> = {
 
        std::sync::Arc::new(ProtocolDescription::parse(b"").unwrap())
 
    };
 
}
 

	
 
use crate::common::*;
 
use crate::protocol::ast::*;
 
use crate::protocol::eval::*;
 
use crate::protocol::inputsource::*;
 
use crate::protocol::parser::*;
 

	
src/runtime/communication.rs
Show inline comments
 
@@ -33,13 +33,13 @@ struct SolutionStorage {
 
struct BranchingProtoComponent {
 
    ports: HashSet<PortId>,
 
    branches: HashMap<Predicate, ProtoComponentBranch>,
 
}
 
#[derive(Debug, Clone)]
 
struct ProtoComponentBranch {
 
    did_put: HashSet<PortId>,
 
    did_put_or_get: HashSet<PortId>,
 
    inbox: HashMap<PortId, Payload>,
 
    state: ComponentState,
 
    untaken_choice: Option<u16>,
 
    ended: bool,
 
}
 
struct CyclicDrainer<'a, K: Eq + Hash, V> {
 
@@ -821,12 +821,13 @@ impl BranchingProtoComponent {
 
            let mut ctx = SyncProtoContext {
 
                untaken_choice: &mut branch.untaken_choice,
 
                logger: &mut *cu.logger,
 
                predicate: &predicate,
 
                port_info: &cu.port_info,
 
                inbox: &branch.inbox,
 
                did_put_or_get: &mut branch.did_put_or_get,
 
            };
 
            let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description);
 
            log!(
 
                cu.logger,
 
                "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
 
                proto_component_id,
 
@@ -849,16 +850,13 @@ impl BranchingProtoComponent {
 
                    drop((predicate, branch));
 
                }
 
                B::SyncBlockEnd => {
 
                    // make concrete all variables
 
                    for port in ports.iter() {
 
                        let var = cu.port_info.spec_var_for(*port);
 
                        let should_have_fired = match cu.port_info.polarities.get(port).unwrap() {
 
                            Polarity::Getter => branch.inbox.contains_key(port),
 
                            Polarity::Putter => branch.did_put.contains(port),
 
                        };
 
                        let should_have_fired = branch.did_put_or_get.contains(port);
 
                        let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
 
                        let did_fire = val == SpecVal::FIRING;
 
                        if did_fire != should_have_fired {
 
                            log!(cu.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", port, var, val, did_fire, should_have_fired);
 
                            // IMPLICIT inconsistency
 
                            drop((predicate, branch));
 
@@ -898,13 +896,13 @@ impl BranchingProtoComponent {
 
                    if was == Some(SpecVal::SILENT) {
 
                        log!(cu.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var);
 
                        // discard forever
 
                        drop((predicate, branch));
 
                    } else {
 
                        // keep in "unblocked"
 
                        branch.did_put.insert(putter);
 
                        branch.did_put_or_get.insert(putter);
 
                        log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
 
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                        rctx.getter_buffer.putter_add(cu, putter, msg);
 
                        drainer.add_input(predicate, branch);
 
                    }
 
                }
 
@@ -1008,13 +1006,13 @@ impl BranchingProtoComponent {
 
        }
 
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
    }
 
    fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self {
 
        let branch = ProtoComponentBranch {
 
            inbox: Default::default(),
 
            did_put: Default::default(),
 
            did_put_or_get: Default::default(),
 
            state,
 
            ended: false,
 
            untaken_choice: None,
 
        };
 
        Self { ports, branches: hashmap! { Predicate::default() => branch  } }
 
    }
 
@@ -1139,12 +1137,13 @@ impl GetterBuffer {
 
impl SyncProtoContext<'_> {
 
    pub(crate) fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        let var = self.port_info.spec_var_for(port);
 
        self.predicate.query(var).map(SpecVal::is_firing)
 
    }
 
    pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        self.did_put_or_get.insert(port);
 
        self.inbox.get(&port)
 
    }
 
    pub(crate) fn take_choice(&mut self) -> Option<u16> {
 
        self.untaken_choice.take()
 
    }
 
}
src/runtime/mod.rs
Show inline comments
 
@@ -17,13 +17,13 @@ use mio::net::UdpSocket;
 

	
 
#[derive(Debug)]
 
pub struct Connector {
 
    unphased: ConnectorUnphased,
 
    phased: ConnectorPhased,
 
}
 
pub trait Logger: Debug {
 
pub trait Logger: Debug + Send + Sync {
 
    fn line_writer(&mut self) -> Option<&mut dyn std::io::Write>;
 
}
 
#[derive(Debug)]
 
pub struct VecLogger(ConnectorId, Vec<u8>);
 
#[derive(Debug)]
 
pub struct DummyLogger;
 
@@ -36,12 +36,13 @@ pub(crate) struct NonsyncProtoContext<'a> {
 
    id_manager: &'a mut IdManager,
 
    proto_component_ports: &'a mut HashSet<PortId>,
 
    unrun_components: &'a mut Vec<(ProtoComponentId, ProtoComponent)>,
 
}
 
pub(crate) struct SyncProtoContext<'a> {
 
    logger: &'a mut dyn Logger,
 
    did_put_or_get: &'a mut HashSet<PortId>,
 
    untaken_choice: &'a mut Option<u16>,
 
    predicate: &'a Predicate,
 
    port_info: &'a PortInfo,
 
    inbox: &'a HashMap<PortId, Payload>,
 
}
 

	
src/runtime/tests.rs
Show inline comments
 
@@ -787,13 +787,16 @@ fn udp_reowolf_swap() {
 
        s.spawn(|_| {
 
            barrier.wait();
 
            // udp thread
 
            let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap();
 
            udp.connect(sock_addrs[0]).unwrap();
 
            let mut buf = new_u8_buffer(256);
 
            udp.send(TEST_MSG_BYTES).unwrap();
 
            for _ in 0..5 {
 
                std::thread::sleep(Duration::from_millis(60));
 
                udp.send(TEST_MSG_BYTES).unwrap();
 
            }
 
            let len = udp.recv(&mut buf).unwrap();
 
            assert_eq!(TEST_MSG_BYTES, &buf[0..len]);
 
            barrier.wait();
 
        });
 
    })
 
    .unwrap();
0 comments (0 inline, 0 general)