diff --git a/Cargo.toml b/Cargo.toml index d79e155dc03beb1018665b10cb3c5c9de4b754f4..b22eea2544e553c4a4f43ce37ffab0605f2626b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,9 +22,9 @@ mio = { version = "0.7.0", package = "mio", features = ["udp", "tcp", "os-poll"] # protocol backtrace = "0.3" +lazy_static = "1.4.0" # socket ffi -lazy_static = { version = "1.4.0", optional = true} atomic_refcell = { version = "0.1.6", optional = true } [dev-dependencies] @@ -39,6 +39,6 @@ crate-type = ["cdylib"] [features] default = ["ffi", "ffi_socket_api"] # // "session_optimization", ffi = [] # see src/ffi.rs -ffi_socket_api = ["ffi", "lazy_static", "atomic_refcell"] +ffi_socket_api = ["ffi", "atomic_refcell"] endpoint_logging = [] # see src/macros.rs session_optimization = [] # see src/runtime/setup.rs \ No newline at end of file diff --git a/examples/make.py b/examples/make.py index 7f208d92df6dc51097c91f70a9eaa970741cc429..40da5721455880d48e559995ce51e7f9875c29cb 100644 --- a/examples/make.py +++ b/examples/make.py @@ -4,6 +4,7 @@ for c_file in glob.glob(script_path + "/*/*.c", recursive=False): print("compiling", c_file) args = [ "gcc", # compiler + "-std=c11" # C11 mode "-L", # lib path flag "./", # where to look for libs "-lreowolf_rs", # add lib called "reowolf_rs" diff --git a/examples/pres_5/bob.c b/examples/pres_5/bob.c index 8454c9cfc2f4f72eb5da965ea540be54fd13e2fc..41d053143df4bb8c97590e6e0ec7123c9024deea 100644 --- a/examples/pres_5/bob.c +++ b/examples/pres_5/bob.c @@ -29,7 +29,7 @@ int main(int argc, char** argv) { rw_err_peek(c); for(int round=0; round<3; round++) { - printf("\----------Round %d\n", round); + printf("----------Round %d\n", round); connector_get(c, ports[3]); rw_err_peek(c); connector_sync(c, 1000); diff --git a/src/ffi/socket_api.rs b/src/ffi/socket_api.rs index d5869c9b181fbd42ec571fa7c149862fdfdd7633..8e701d1c72ff1c5cc800e2dbca35c14a3150757f 100644 --- a/src/ffi/socket_api.rs +++ b/src/ffi/socket_api.rs @@ -1,23 +1,21 @@ use super::*; use atomic_refcell::AtomicRefCell; -use mio::net::UdpSocket; + use std::{collections::HashMap, ffi::c_void, net::SocketAddr, os::raw::c_int, sync::RwLock}; /////////////////////////////////////////////////////////////////// -type ConnectorFd = c_int; -struct Connector {} struct FdAllocator { - next: Option, - freed: Vec, + next: Option, + freed: Vec, } enum MaybeConnector { New, - Bound(SocketAddr), - Connected(Connector), + Bound { local_addr: SocketAddr }, + Connected { connector: Connector, putter: PortId, getter: PortId }, } #[derive(Default)] struct ConnectorStorage { - fd_to_connector: HashMap>, + fd_to_connector: HashMap>, fd_allocator: FdAllocator, } /////////////////////////////////////////////////////////////////// @@ -31,7 +29,7 @@ impl Default for FdAllocator { } } impl FdAllocator { - fn alloc(&mut self) -> ConnectorFd { + fn alloc(&mut self) -> c_int { if let Some(fd) = self.freed.pop() { return fd; } @@ -41,7 +39,7 @@ impl FdAllocator { } panic!("No more Connector FDs to allocate!") } - fn free(&mut self, fd: ConnectorFd) { + fn free(&mut self, fd: c_int) { self.freed.push(fd); } } @@ -60,7 +58,7 @@ pub extern "C" fn rw_socket(_domain: c_int, _type: c_int) -> c_int { } #[no_mangle] -pub extern "C" fn rw_close(fd: ConnectorFd, _how: c_int) -> c_int { +pub extern "C" fn rw_close(fd: c_int, _how: c_int) -> c_int { // ignoring HOW let mut w = if let Ok(w) = CONNECTOR_STORAGE.write() { w } else { return FD_LOCK_POISONED }; w.fd_allocator.free(fd); @@ -73,9 +71,9 @@ pub extern "C" fn rw_close(fd: ConnectorFd, _how: c_int) -> c_int { #[no_mangle] pub unsafe extern "C" fn rw_bind( - fd: ConnectorFd, - address: *const SocketAddr, - _address_len: usize, + fd: c_int, + local_addr: *const SocketAddr, + _addr_len: usize, ) -> c_int { use MaybeConnector as Mc; // assuming _domain is AF_INET and _type is SOCK_DGRAM @@ -83,14 +81,14 @@ pub unsafe extern "C" fn rw_bind( let mc = if let Some(mc) = r.fd_to_connector.get(&fd) { mc } else { return BAD_FD }; let mc: &mut Mc = &mut mc.borrow_mut(); let _ = if let Mc::New = mc { () } else { return WRONG_STATE }; - *mc = Mc::Bound(address.read()); + *mc = Mc::Bound { local_addr: local_addr.read() }; ERR_OK } #[no_mangle] -pub extern "C" fn rw_connect( - fd: ConnectorFd, - _address: *const SocketAddr, +pub unsafe extern "C" fn rw_connect( + fd: c_int, + peer_addr: *const SocketAddr, _address_len: usize, ) -> c_int { use MaybeConnector as Mc; @@ -98,24 +96,75 @@ pub extern "C" fn rw_connect( let r = if let Ok(r) = CONNECTOR_STORAGE.read() { r } else { return FD_LOCK_POISONED }; let mc = if let Some(mc) = r.fd_to_connector.get(&fd) { mc } else { return BAD_FD }; let mc: &mut Mc = &mut mc.borrow_mut(); - let _local = if let Mc::Bound(local) = mc { local } else { return WRONG_STATE }; - *mc = Mc::Connected(Connector {}); + let local_addr = + if let Mc::Bound { local_addr } = mc { local_addr } else { return WRONG_STATE }; + let peer_addr = peer_addr.read(); + let (connector, [putter, getter]) = { + let mut c = Connector::new( + Box::new(DummyLogger), + crate::TRIVIAL_PD.clone(), + Connector::random_id(), + 8, + ); + let [putter, getter] = c.new_udp_port(*local_addr, peer_addr).unwrap(); + (c, [putter, getter]) + }; + *mc = Mc::Connected { connector, putter, getter }; ERR_OK } #[no_mangle] -pub extern "C" fn rw_send( - fd: ConnectorFd, - _msg: *const c_void, - _len: usize, +pub unsafe extern "C" fn rw_send( + fd: c_int, + bytes_ptr: *const c_void, + bytes_len: usize, _flags: c_int, ) -> isize { use MaybeConnector as Mc; - // assuming _domain is AF_INET and _type is SOCK_DGRAM + // ignoring flags let r = if let Ok(r) = CONNECTOR_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize }; let mc = if let Some(mc) = r.fd_to_connector.get(&fd) { mc } else { return BAD_FD as isize }; let mc: &mut Mc = &mut mc.borrow_mut(); - let _c = if let Mc::Connected(c) = mc { c } else { return WRONG_STATE as isize }; - // TODO - ERR_OK as isize + let (connector, putter) = if let Mc::Connected { connector, putter, .. } = mc { + (connector, *putter) + } else { + return WRONG_STATE as isize; + }; + match connector_put_bytes(connector, putter, bytes_ptr as _, bytes_len) { + ERR_OK => {} + err => return err as isize, + } + connector_sync(connector, -1) +} + +#[no_mangle] +pub unsafe extern "C" fn rw_recv( + fd: c_int, + bytes_ptr: *mut c_void, + bytes_len: usize, + _flags: c_int, +) -> isize { + use MaybeConnector as Mc; + // ignoring flags + let r = + if let Ok(r) = CONNECTOR_STORAGE.read() { r } else { return FD_LOCK_POISONED as isize }; + let mc = if let Some(mc) = r.fd_to_connector.get(&fd) { mc } else { return BAD_FD as isize }; + let mc: &mut Mc = &mut mc.borrow_mut(); + let (connector, getter) = if let Mc::Connected { connector, getter, .. } = mc { + (connector, *getter) + } else { + return WRONG_STATE as isize; + }; + match connector_get(connector, getter) { + ERR_OK => {} + err => return err as isize, + } + match connector_sync(connector, -1) { + 0 => {} // singleton batch index + err => return err as isize, + }; + let slice = connector.gotten(getter).unwrap().as_slice(); + let copied_bytes = slice.len().min(bytes_len); + std::ptr::copy_nonoverlapping(slice.as_ptr(), bytes_ptr as *mut u8, copied_bytes); + copied_bytes as isize } diff --git a/src/lib.rs b/src/lib.rs index a4722a2a606086970e955fff42acf7393fbc5c64..082d77bfa9d40d481101efa02e087b80434f5322 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,7 +6,7 @@ mod protocol; mod runtime; pub use common::{ConnectorId, EndpointPolarity, Payload, Polarity, PortId}; -pub use protocol::ProtocolDescription; +pub use protocol::{ProtocolDescription, TRIVIAL_PD}; pub use runtime::{error, Connector, DummyLogger, FileLogger, VecLogger}; #[cfg(feature = "ffi")] diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index ebf22a0ae740cc082787d58171826616053c445e..fd1b4735baca5202d4ead0a21a34176cf8ff76c1 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -6,6 +6,12 @@ mod lexer; // mod library; mod parser; +lazy_static::lazy_static! { + pub static ref TRIVIAL_PD: std::sync::Arc = { + std::sync::Arc::new(ProtocolDescription::parse(b"").unwrap()) + }; +} + use crate::common::*; use crate::protocol::ast::*; use crate::protocol::eval::*; diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index faa4436a0ab12df295bcdc9d94f7b99aaeba378e..45bc7fce1c6e4f49c64ef2c07544b32831b2fd6c 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -36,7 +36,7 @@ struct BranchingProtoComponent { } #[derive(Debug, Clone)] struct ProtoComponentBranch { - did_put: HashSet, + did_put_or_get: HashSet, inbox: HashMap, state: ComponentState, untaken_choice: Option, @@ -824,6 +824,7 @@ impl BranchingProtoComponent { predicate: &predicate, port_info: &cu.port_info, inbox: &branch.inbox, + did_put_or_get: &mut branch.did_put_or_get, }; let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description); log!( @@ -852,10 +853,7 @@ impl BranchingProtoComponent { // make concrete all variables for port in ports.iter() { let var = cu.port_info.spec_var_for(*port); - let should_have_fired = match cu.port_info.polarities.get(port).unwrap() { - Polarity::Getter => branch.inbox.contains_key(port), - Polarity::Putter => branch.did_put.contains(port), - }; + let should_have_fired = branch.did_put_or_get.contains(port); let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT); let did_fire = val == SpecVal::FIRING; if did_fire != should_have_fired { @@ -901,7 +899,7 @@ impl BranchingProtoComponent { drop((predicate, branch)); } else { // keep in "unblocked" - branch.did_put.insert(putter); + branch.did_put_or_get.insert(putter); log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; rctx.getter_buffer.putter_add(cu, putter, msg); @@ -1011,7 +1009,7 @@ impl BranchingProtoComponent { fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self { let branch = ProtoComponentBranch { inbox: Default::default(), - did_put: Default::default(), + did_put_or_get: Default::default(), state, ended: false, untaken_choice: None, @@ -1142,6 +1140,7 @@ impl SyncProtoContext<'_> { self.predicate.query(var).map(SpecVal::is_firing) } pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> { + self.did_put_or_get.insert(port); self.inbox.get(&port) } pub(crate) fn take_choice(&mut self) -> Option { diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index cbcce7fdc9a26443da268cd3de30c6022c456516..bfd91f31a6e551566f2b7ea929a2e7c20aba7206 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -20,7 +20,7 @@ pub struct Connector { unphased: ConnectorUnphased, phased: ConnectorPhased, } -pub trait Logger: Debug { +pub trait Logger: Debug + Send + Sync { fn line_writer(&mut self) -> Option<&mut dyn std::io::Write>; } #[derive(Debug)] @@ -39,6 +39,7 @@ pub(crate) struct NonsyncProtoContext<'a> { } pub(crate) struct SyncProtoContext<'a> { logger: &'a mut dyn Logger, + did_put_or_get: &'a mut HashSet, untaken_choice: &'a mut Option, predicate: &'a Predicate, port_info: &'a PortInfo, diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs index 0ec8e15677a52efb4c8473daec3baf1a21fd5ad2..d7d71a7a6eb2bd24ce08010375648fbcef3232e0 100644 --- a/src/runtime/tests.rs +++ b/src/runtime/tests.rs @@ -790,7 +790,10 @@ fn udp_reowolf_swap() { let udp = std::net::UdpSocket::bind(sock_addrs[1]).unwrap(); udp.connect(sock_addrs[0]).unwrap(); let mut buf = new_u8_buffer(256); - udp.send(TEST_MSG_BYTES).unwrap(); + for _ in 0..5 { + std::thread::sleep(Duration::from_millis(60)); + udp.send(TEST_MSG_BYTES).unwrap(); + } let len = udp.recv(&mut buf).unwrap(); assert_eq!(TEST_MSG_BYTES, &buf[0..len]); barrier.wait();