From 9485a0862e9094ea0049509248281908580c990d 2020-09-01 10:53:08 From: Christopher Esterhuyse Date: 2020-09-01 10:53:08 Subject: [PATCH] cleaner logging, and more options for connector creation exposed to C API --- diff --git a/Cargo.toml b/Cargo.toml index d6978deb8b50cbaffd0d11dd124dccbef5758d6e..e555a0d97ebe3d1cb46ebae6cdf79c9fd3a41ca2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,7 @@ crate-type = [ ] [features] -default = ["ffi", "session_optimization"] +default = ["ffi"] ffi = [] # see src/ffi/mod.rs ffi_pseudo_socket_api = ["ffi", "libc", "os_socketaddr"]# see src/ffi/pseudo_socket_api.rs. endpoint_logging = [] # see src/macros.rs diff --git a/examples/bench_1/main.c b/examples/bench_1/main_16.c similarity index 90% rename from examples/bench_1/main.c rename to examples/bench_1/main_16.c index 7d1a5d43f11425e47879ac6ae716e5449ad4f875..4762e8c0d402ae7b39f9556874b2347435553b0d 100644 --- a/examples/bench_1/main.c +++ b/examples/bench_1/main_16.c @@ -2,7 +2,7 @@ #include "../utility.c" int main(int argc, char** argv) { Arc_ProtocolDescription * pd = protocol_description_parse("", 0); - char logpath[] = "./bench_1.txt"; + char logpath[] = "./1_16.txt"; Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); rw_err_peek(c); @@ -23,8 +23,6 @@ int main(int argc, char** argv) { for(i=0; i<10; i++) { connector_put_bytes(c, putter, msg_ptr, msg_len); rw_err_peek(c); - - // ... reach new consistent state within 1000ms deadline. connector_sync(c, -1); rw_err_peek(c); } diff --git a/examples/bench_1/main_16k.c b/examples/bench_1/main_16k.c new file mode 100644 index 0000000000000000000000000000000000000000..2992d1f8cbba7b692176e23827cfa91b9d6f15a9 --- /dev/null +++ b/examples/bench_1/main_16k.c @@ -0,0 +1,36 @@ +#include "../../reowolf.h" +#include "../utility.c" +int main(int argc, char** argv) { + Arc_ProtocolDescription * pd = protocol_description_parse("", 0); + char logpath[] = "./1_16k.txt"; + Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); + rw_err_peek(c); + + PortId putter, getter; + FfiSocketAddr local_addr = {{0, 0, 0, 0}, 8000}; + FfiSocketAddr peer_addr = {{8, 8, 8, 1}, 8001}; + rw_err_peek(c); + connector_add_udp_mediator_component(c, &putter, &getter, local_addr, peer_addr); + connector_connect(c, -1); + rw_err_peek(c); + + // Prepare a message to send + size_t msg_len = 16000; + char * msg_ptr = malloc(msg_len); + memset(msg_ptr, 42, msg_len); + + int i; + for(i=0; i<10; i++) { + connector_put_bytes(c, putter, msg_ptr, msg_len); + rw_err_peek(c); + connector_sync(c, -1); + rw_err_peek(c); + } + + printf("Exiting\n"); + protocol_description_destroy(pd); + connector_destroy(c); + free(msg_ptr); + sleep(1.0); + return 0; +} \ No newline at end of file diff --git a/examples/bench_2/main_16.c b/examples/bench_2/main_16.c new file mode 100644 index 0000000000000000000000000000000000000000..3aa6d6a4481cf2096d03b59db450650bd4b82db9 --- /dev/null +++ b/examples/bench_2/main_16.c @@ -0,0 +1,35 @@ +#include "../../reowolf.h" +#include "../utility.c" +int main(int argc, char** argv) { + Arc_ProtocolDescription * pd = protocol_description_parse("", 0); + char logpath[] = "./2_16.txt"; + Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); + rw_err_peek(c); + + PortId putter, getter; + rw_err_peek(c); + connector_add_port_pair(c, &putter, &getter); + connector_connect(c, -1); + rw_err_peek(c); + + // Prepare a message to send + size_t msg_len = 16; + char * msg_ptr = malloc(msg_len); + memset(msg_ptr, 42, msg_len); + + int i; + for(i=0; i<10; i++) { + connector_put_bytes(c, putter, msg_ptr, msg_len); + connector_get(c, getter); + rw_err_peek(c); + connector_sync(c, -1); + rw_err_peek(c); + } + + printf("Exiting\n"); + protocol_description_destroy(pd); + connector_destroy(c); + free(msg_ptr); + sleep(1.0); + return 0; +} \ No newline at end of file diff --git a/examples/bench_2/main_16k.c b/examples/bench_2/main_16k.c new file mode 100644 index 0000000000000000000000000000000000000000..91322df69359eda16180ea6353d511a9bc133088 --- /dev/null +++ b/examples/bench_2/main_16k.c @@ -0,0 +1,35 @@ +#include "../../reowolf.h" +#include "../utility.c" +int main(int argc, char** argv) { + Arc_ProtocolDescription * pd = protocol_description_parse("", 0); + char logpath[] = "./2_16k.txt"; + Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); + rw_err_peek(c); + + PortId putter, getter; + rw_err_peek(c); + connector_add_port_pair(c, &putter, &getter); + connector_connect(c, -1); + rw_err_peek(c); + + // Prepare a message to send + size_t msg_len = 16000; + char * msg_ptr = malloc(msg_len); + memset(msg_ptr, 42, msg_len); + + int i; + for(i=0; i<10; i++) { + connector_put_bytes(c, putter, msg_ptr, msg_len); + connector_get(c, getter); + rw_err_peek(c); + connector_sync(c, -1); + rw_err_peek(c); + } + + printf("Exiting\n"); + protocol_description_destroy(pd); + connector_destroy(c); + free(msg_ptr); + sleep(1.0); + return 0; +} \ No newline at end of file diff --git a/examples/bench_3/getter.c b/examples/bench_3/getter.c new file mode 100644 index 0000000000000000000000000000000000000000..7e9de8a9e4c753780e21df16a34ac328b7a21452 --- /dev/null +++ b/examples/bench_3/getter.c @@ -0,0 +1,29 @@ +#include "../../reowolf.h" +#include "../utility.c" +int main(int argc, char** argv) { + Arc_ProtocolDescription * pd = protocol_description_parse("", 0); + char logpath[] = "./3_16_getter.txt"; + Connector * c = connector_new_logging_with_id(pd, logpath, sizeof(logpath)-1, 0); + rw_err_peek(c); + + PortId getter; + FfiSocketAddr addr = {{192, 168, 1, 124}, 8009}; + rw_err_peek(c); + connector_add_net_port(c, &getter, addr, Polarity_Getter, EndpointPolarity_Passive); + connector_connect(c, -1); + rw_err_peek(c); + + int i; + for(i=0; i<10; i++) { + connector_get(c, getter); + rw_err_peek(c); + connector_sync(c, -1); + rw_err_peek(c); + } + + printf("Exiting\n"); + protocol_description_destroy(pd); + connector_destroy(c); + sleep(1.0); + return 0; +} \ No newline at end of file diff --git a/examples/bench_3/putter.c b/examples/bench_3/putter.c new file mode 100644 index 0000000000000000000000000000000000000000..4d08f71ca2e9a68570db242a08d39b62619a5db0 --- /dev/null +++ b/examples/bench_3/putter.c @@ -0,0 +1,35 @@ +#include "../../reowolf.h" +#include "../utility.c" +int main(int argc, char** argv) { + Arc_ProtocolDescription * pd = protocol_description_parse("", 0); + char logpath[] = "./3_16_putter.txt"; + Connector * c = connector_new_logging_with_id(pd, logpath, sizeof(logpath)-1, 1); + rw_err_peek(c); + + PortId putter; + FfiSocketAddr addr = {{192, 168, 1, 124}, 8009}; + rw_err_peek(c); + connector_add_net_port(c, &putter, addr, Polarity_Putter, EndpointPolarity_Active); + connector_connect(c, -1); + rw_err_peek(c); + + // Prepare a message to send + size_t msg_len = 16; + char * msg_ptr = malloc(msg_len); + memset(msg_ptr, 42, msg_len); + + int i; + for(i=0; i<10; i++) { + connector_put_bytes(c, putter, msg_ptr, msg_len); + rw_err_peek(c); + connector_sync(c, -1); + rw_err_peek(c); + } + + printf("Exiting\n"); + protocol_description_destroy(pd); + connector_destroy(c); + free(msg_ptr); + sleep(1.0); + return 0; +} \ No newline at end of file diff --git a/examples/make.py b/examples/make.py index ff0c60e9d2377f5d63ce0667adcd8ceec7b0a6e1..db5e3c140b7f3e454680fe6ca835ee575b2528da 100644 --- a/examples/make.py +++ b/examples/make.py @@ -1,6 +1,9 @@ -import os, glob, subprocess, time +import os, glob, subprocess, time, sys script_path = os.path.dirname(os.path.realpath(__file__)); for c_file in glob.glob(script_path + "/*/*.c", recursive=False): + if sys.platform != "linux" and sys.platform != "linux2" and "interop" in c_file: + print("Not Linux! skipping", c_file) + continue print("compiling", c_file) args = [ "gcc", # compiler diff --git a/reowolf.h b/reowolf.h index a889f774ede6e40b1b87fffcdf0276e5cea897f0..00b4878a26c0a0459ae4ad00130891ac941cf557 100644 --- a/reowolf.h +++ b/reowolf.h @@ -47,7 +47,9 @@ typedef uint32_t U32Suffix; typedef struct { ConnectorId connector_id; U32Suffix u32_suffix; -} PortId; +} Id; + +typedef Id PortId; typedef struct { uint8_t ipv4[4]; @@ -132,6 +134,11 @@ Connector *connector_new_logging(const Arc_ProtocolDescription *pd, const uint8_t *path_ptr, uintptr_t path_len); +Connector *connector_new_logging_with_id(const Arc_ProtocolDescription *pd, + const uint8_t *path_ptr, + uintptr_t path_len, + ConnectorId connector_id); + intptr_t connector_next_batch(Connector *connector); void connector_print_debug(Connector *connector); diff --git a/src/common.rs b/src/common.rs index be3216cc2148d4453dd477faf1831ff740689ae5..4a9fc999d47977834f69e25ca118e5462a7356d5 100644 --- a/src/common.rs +++ b/src/common.rs @@ -53,7 +53,7 @@ pub struct U32Stream { )] #[repr(transparent)] pub struct PortId(Id); -#[derive(Default, Clone, Ord, PartialOrd)] +#[derive(Default, Eq, PartialEq, Clone, Ord, PartialOrd)] pub struct Payload(Arc>); #[derive( Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd, serde::Serialize, serde::Deserialize, @@ -89,15 +89,6 @@ pub(crate) enum SyncBlocker { pub(crate) struct DenseDebugHex<'a>(pub &'a [u8]); ///////////////////// IMPL ///////////////////// -impl Eq for Payload {} -impl PartialEq for Payload { - fn eq(&self, other: &Self) -> bool { - // self.as_slice() == other.as_slice() - let res = self.as_slice() == other.as_slice(); - println!("CMP RESULT IS.... {}", res); - res - } -} impl IdParts for Id { fn id_parts(self) -> (ConnectorId, U32Suffix) { (self.connector_id, self.u32_suffix) diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index e834e056417fb2dfbbb2e832d885776fbb669633..bc353a9a8a7bcd86aac1fb1ee11d230c8c2ef012 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -140,10 +140,11 @@ pub unsafe extern "C" fn protocol_description_clone( ///////////////////// CONNECTOR ////////////////////////// #[no_mangle] -pub unsafe extern "C" fn connector_new_logging( +pub unsafe extern "C" fn connector_new_logging_with_id( pd: &Arc, path_ptr: *const u8, path_len: usize, + connector_id: ConnectorId, ) -> *mut Connector { StoredError::tl_clear(); let path_bytes = &*slice_from_raw_parts(path_ptr, path_len); @@ -156,7 +157,6 @@ pub unsafe extern "C" fn connector_new_logging( }; match std::fs::File::create(path_str) { Ok(file) => { - let connector_id = Connector::random_id(); let file_logger = Box::new(FileLogger::new(connector_id, file)); let c = Connector::new(file_logger, pd.clone(), connector_id); Box::into_raw(Box::new(c)) @@ -167,6 +167,14 @@ pub unsafe extern "C" fn connector_new_logging( } } } +#[no_mangle] +pub unsafe extern "C" fn connector_new_logging( + pd: &Arc, + path_ptr: *const u8, + path_len: usize, +) -> *mut Connector { + connector_new_logging_with_id(pd, path_ptr, path_len, Connector::random_id()) +} #[no_mangle] pub unsafe extern "C" fn connector_print_debug(connector: &mut Connector) { diff --git a/src/macros.rs b/src/macros.rs index 4de21d183a57c7118fd88d946721607a92512703..ec1917ee0461819d67d420c31e395d5bc7dcac36 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -3,17 +3,17 @@ Change the definition of these macros to control the logging level statically */ macro_rules! log { - (@ENDPT, $logger:expr, $($arg:tt)*) => {{ - // ignore - }}; - (@COMM_NB, $logger:expr, $($arg:tt)*) => {{ + (@BENCH, $logger:expr, $($arg:tt)*) => {{ if let Some(w) = $logger.line_writer() { let _ = writeln!(w, $($arg)*); } }}; + (@ENDPT, $logger:expr, $($arg:tt)*) => {{ + // ignore + }}; ($logger:expr, $($arg:tt)*) => {{ - if let Some(w) = $logger.line_writer() { - let _ = writeln!(w, $($arg)*); - } + // if let Some(w) = $logger.line_writer() { + // let _ = writeln!(w, $($arg)*); + // } }}; } diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 8f334e9e53205752c55b7b5b795ae58f5343db4c..09c7fb9545a9b9275a89efef775b0cb6370d58de 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -204,12 +204,12 @@ impl Connector { use SyncError as Se; ////////////////////////////////// log!( - @COMM_NB, cu.inner.logger, "~~~ SYNC called with timeout {:?}; starting round {}", &timeout, comm.round_index ); + log!(@BENCH, cu.inner.logger, ""); // 1. run all proto components to Nonsync blockers // NOTE: original components are immutable until Decision::Success @@ -255,11 +255,11 @@ impl Connector { } } log!( - @COMM_NB, cu.inner.logger, "All {} proto components are now done with Nonsync phase", branching_proto_components.len(), ); + log!(@BENCH, cu.inner.logger, ""); // Create temp structures needed for the synchronous phase of the round let mut rctx = RoundCtx { @@ -286,7 +286,8 @@ impl Connector { getter_buffer: Default::default(), deadline: timeout.map(|to| Instant::now() + to), }; - log!(@COMM_NB, cu.inner.logger, "Round context structure initialized"); + log!(cu.inner.logger, "Round context structure initialized"); + log!(@BENCH, cu.inner.logger, ""); // Explore all native branches eagerly. Find solutions, buffer messages, etc. log!( @@ -361,7 +362,8 @@ impl Connector { // restore the invariant: !native_batches.is_empty() comm.native_batches.push(Default::default()); // Call to another big method; keep running this round until a distributed decision is reached - log!(@COMM_NB, cu.inner.logger, "Searching for decision..."); + log!(cu.inner.logger, "Searching for decision..."); + log!(@BENCH, cu.inner.logger, ""); let decision = Self::sync_reach_decision( cu, comm, @@ -369,7 +371,8 @@ impl Connector { &mut branching_proto_components, &mut rctx, )?; - log!(@COMM_NB, cu.inner.logger, "Committing to decision {:?}!", &decision); + log!(cu.inner.logger, "Committing to decision {:?}!", &decision); + log!(@BENCH, cu.inner.logger, ""); comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.inner.logger, &decision)?; // propagate the decision to children @@ -411,7 +414,8 @@ impl Connector { Ok(Some(branching_native.collapse_with(&mut *cu.inner.logger, &predicate))) } }; - log!(@COMM_NB, cu.inner.logger, "Sync round ending! Cleaning up"); + log!(cu.inner.logger, "Sync round ending! Cleaning up"); + log!(@BENCH, cu.inner.logger, ""); ret }