Changeset - 9485a0862e90
[Not reviewed]
1 7 6
Christopher Esterhuyse - 5 years ago 2020-09-01 10:53:08
christopher.esterhuyse@gmail.com
cleaner logging, and more options for connector creation exposed to C API
13 files changed with 212 insertions and 31 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -43,7 +43,7 @@ crate-type = [
 
]
 

	
 
[features]
 
default = ["ffi", "session_optimization"]
 
default = ["ffi"]
 
ffi = [] # see src/ffi/mod.rs
 
ffi_pseudo_socket_api = ["ffi", "libc", "os_socketaddr"]# see src/ffi/pseudo_socket_api.rs.
 
endpoint_logging = [] # see src/macros.rs
examples/bench_1/main_16.c
Show inline comments
 
file renamed from examples/bench_1/main.c to examples/bench_1/main_16.c
 
@@ -2,7 +2,7 @@
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./bench_1.txt";
 
	char logpath[] = "./1_16.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	rw_err_peek(c);
 
	
 
@@ -23,8 +23,6 @@ int main(int argc, char** argv) {
 
	for(i=0; i<10; i++) {
 
		connector_put_bytes(c, putter, msg_ptr, msg_len);
 
		rw_err_peek(c);
 
		
 
		// ... reach new consistent state within 1000ms deadline.
 
		connector_sync(c, -1);
 
		rw_err_peek(c);
 
	}
examples/bench_1/main_16k.c
Show inline comments
 
new file 100644
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./1_16k.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	rw_err_peek(c);
 
	
 
	PortId putter, getter;
 
	FfiSocketAddr local_addr = {{0, 0, 0, 0}, 8000};
 
	FfiSocketAddr peer_addr =  {{8, 8, 8, 1}, 8001};
 
	rw_err_peek(c);
 
	connector_add_udp_mediator_component(c, &putter, &getter, local_addr, peer_addr);
 
	connector_connect(c, -1);
 
	rw_err_peek(c);
 
	
 
	// Prepare a message to send
 
	size_t msg_len = 16000;
 
	char * msg_ptr = malloc(msg_len);
 
	memset(msg_ptr, 42, msg_len);
 
	
 
	int i;
 
	for(i=0; i<10; i++) {
 
		connector_put_bytes(c, putter, msg_ptr, msg_len);
 
		rw_err_peek(c);
 
		connector_sync(c, -1);
 
		rw_err_peek(c);
 
	}
 
	
 
	printf("Exiting\n");
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	free(msg_ptr);
 
	sleep(1.0);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_2/main_16.c
Show inline comments
 
new file 100644
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./2_16.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	rw_err_peek(c);
 
	
 
	PortId putter, getter;
 
	rw_err_peek(c);
 
	connector_add_port_pair(c, &putter, &getter);
 
	connector_connect(c, -1);
 
	rw_err_peek(c);
 
	
 
	// Prepare a message to send
 
	size_t msg_len = 16;
 
	char * msg_ptr = malloc(msg_len);
 
	memset(msg_ptr, 42, msg_len);
 
	
 
	int i;
 
	for(i=0; i<10; i++) {
 
		connector_put_bytes(c, putter, msg_ptr, msg_len);
 
		connector_get(c, getter);
 
		rw_err_peek(c);
 
		connector_sync(c, -1);
 
		rw_err_peek(c);
 
	}
 
	
 
	printf("Exiting\n");
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	free(msg_ptr);
 
	sleep(1.0);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_2/main_16k.c
Show inline comments
 
new file 100644
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./2_16k.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	rw_err_peek(c);
 
	
 
	PortId putter, getter;
 
	rw_err_peek(c);
 
	connector_add_port_pair(c, &putter, &getter);
 
	connector_connect(c, -1);
 
	rw_err_peek(c);
 
	
 
	// Prepare a message to send
 
	size_t msg_len = 16000;
 
	char * msg_ptr = malloc(msg_len);
 
	memset(msg_ptr, 42, msg_len);
 
	
 
	int i;
 
	for(i=0; i<10; i++) {
 
		connector_put_bytes(c, putter, msg_ptr, msg_len);
 
		connector_get(c, getter);
 
		rw_err_peek(c);
 
		connector_sync(c, -1);
 
		rw_err_peek(c);
 
	}
 
	
 
	printf("Exiting\n");
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	free(msg_ptr);
 
	sleep(1.0);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_3/getter.c
Show inline comments
 
new file 100644
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./3_16_getter.txt";
 
	Connector * c = connector_new_logging_with_id(pd, logpath, sizeof(logpath)-1, 0);
 
	rw_err_peek(c);
 
	
 
	PortId getter;
 
	FfiSocketAddr addr = {{192, 168, 1, 124}, 8009};
 
	rw_err_peek(c);
 
	connector_add_net_port(c, &getter, addr, Polarity_Getter, EndpointPolarity_Passive);
 
	connector_connect(c, -1);
 
	rw_err_peek(c);
 
	
 
	int i;
 
	for(i=0; i<10; i++) {
 
		connector_get(c, getter);
 
		rw_err_peek(c);
 
		connector_sync(c, -1);
 
		rw_err_peek(c);
 
	}
 
	
 
	printf("Exiting\n");
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	sleep(1.0);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_3/putter.c
Show inline comments
 
new file 100644
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./3_16_putter.txt";
 
	Connector * c = connector_new_logging_with_id(pd, logpath, sizeof(logpath)-1, 1);
 
	rw_err_peek(c);
 
	
 
	PortId putter;
 
	FfiSocketAddr addr = {{192, 168, 1, 124}, 8009};
 
	rw_err_peek(c);
 
	connector_add_net_port(c, &putter, addr, Polarity_Putter, EndpointPolarity_Active);
 
	connector_connect(c, -1);
 
	rw_err_peek(c);
 
	
 
	// Prepare a message to send
 
	size_t msg_len = 16;
 
	char * msg_ptr = malloc(msg_len);
 
	memset(msg_ptr, 42, msg_len);
 
	
 
	int i;
 
	for(i=0; i<10; i++) {
 
		connector_put_bytes(c, putter, msg_ptr, msg_len);
 
		rw_err_peek(c);
 
		connector_sync(c, -1);
 
		rw_err_peek(c);
 
	}
 
	
 
	printf("Exiting\n");
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	free(msg_ptr);
 
	sleep(1.0);
 
	return 0;
 
}
 
\ No newline at end of file
examples/make.py
Show inline comments
 
import os, glob, subprocess, time
 
import os, glob, subprocess, time, sys
 
script_path = os.path.dirname(os.path.realpath(__file__));
 
for c_file in glob.glob(script_path + "/*/*.c", recursive=False):
 
  if sys.platform != "linux" and sys.platform != "linux2" and "interop" in c_file:
 
    print("Not Linux! skipping", c_file)
 
    continue
 
  print("compiling", c_file)
 
  args = [
 
    "gcc",          # compiler
reowolf.h
Show inline comments
 
@@ -47,7 +47,9 @@ typedef uint32_t U32Suffix;
 
typedef struct {
 
  ConnectorId connector_id;
 
  U32Suffix u32_suffix;
 
} PortId;
 
} Id;
 

	
 
typedef Id PortId;
 

	
 
typedef struct {
 
  uint8_t ipv4[4];
 
@@ -132,6 +134,11 @@ Connector *connector_new_logging(const Arc_ProtocolDescription *pd,
 
                                 const uint8_t *path_ptr,
 
                                 uintptr_t path_len);
 

	
 
Connector *connector_new_logging_with_id(const Arc_ProtocolDescription *pd,
 
                                         const uint8_t *path_ptr,
 
                                         uintptr_t path_len,
 
                                         ConnectorId connector_id);
 

	
 
intptr_t connector_next_batch(Connector *connector);
 

	
 
void connector_print_debug(Connector *connector);
src/common.rs
Show inline comments
 
@@ -53,7 +53,7 @@ pub struct U32Stream {
 
)]
 
#[repr(transparent)]
 
pub struct PortId(Id);
 
#[derive(Default, Clone, Ord, PartialOrd)]
 
#[derive(Default, Eq, PartialEq, Clone, Ord, PartialOrd)]
 
pub struct Payload(Arc<Vec<u8>>);
 
#[derive(
 
    Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd, serde::Serialize, serde::Deserialize,
 
@@ -89,15 +89,6 @@ pub(crate) enum SyncBlocker {
 
pub(crate) struct DenseDebugHex<'a>(pub &'a [u8]);
 

	
 
///////////////////// IMPL /////////////////////
 
impl Eq for Payload {}
 
impl PartialEq for Payload {
 
    fn eq(&self, other: &Self) -> bool {
 
        // self.as_slice() == other.as_slice()
 
        let res = self.as_slice() == other.as_slice();
 
        println!("CMP RESULT IS.... {}", res);
 
        res
 
    }
 
}
 
impl IdParts for Id {
 
    fn id_parts(self) -> (ConnectorId, U32Suffix) {
 
        (self.connector_id, self.u32_suffix)
src/ffi/mod.rs
Show inline comments
 
@@ -140,10 +140,11 @@ pub unsafe extern "C" fn protocol_description_clone(
 
///////////////////// CONNECTOR //////////////////////////
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new_logging(
 
pub unsafe extern "C" fn connector_new_logging_with_id(
 
    pd: &Arc<ProtocolDescription>,
 
    path_ptr: *const u8,
 
    path_len: usize,
 
    connector_id: ConnectorId,
 
) -> *mut Connector {
 
    StoredError::tl_clear();
 
    let path_bytes = &*slice_from_raw_parts(path_ptr, path_len);
 
@@ -156,7 +157,6 @@ pub unsafe extern "C" fn connector_new_logging(
 
    };
 
    match std::fs::File::create(path_str) {
 
        Ok(file) => {
 
            let connector_id = Connector::random_id();
 
            let file_logger = Box::new(FileLogger::new(connector_id, file));
 
            let c = Connector::new(file_logger, pd.clone(), connector_id);
 
            Box::into_raw(Box::new(c))
 
@@ -167,6 +167,14 @@ pub unsafe extern "C" fn connector_new_logging(
 
        }
 
    }
 
}
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new_logging(
 
    pd: &Arc<ProtocolDescription>,
 
    path_ptr: *const u8,
 
    path_len: usize,
 
) -> *mut Connector {
 
    connector_new_logging_with_id(pd, path_ptr, path_len, Connector::random_id())
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_print_debug(connector: &mut Connector) {
src/macros.rs
Show inline comments
 
@@ -3,17 +3,17 @@ Change the definition of these macros to control the logging level statically
 
*/
 

	
 
macro_rules! log {
 
    (@ENDPT, $logger:expr, $($arg:tt)*) => {{
 
        // ignore
 
    }};
 
    (@COMM_NB, $logger:expr, $($arg:tt)*) => {{
 
    (@BENCH, $logger:expr, $($arg:tt)*) => {{
 
        if let Some(w) = $logger.line_writer() {
 
            let _ = writeln!(w, $($arg)*);
 
        }
 
    }};
 
    (@ENDPT, $logger:expr, $($arg:tt)*) => {{
 
        // ignore
 
    }};
 
    ($logger:expr, $($arg:tt)*) => {{
 
        if let Some(w) = $logger.line_writer() {
 
            let _ = writeln!(w, $($arg)*);
 
        }
 
        // if let Some(w) = $logger.line_writer() {
 
        //     let _ = writeln!(w, $($arg)*);
 
        // }
 
    }};
 
}
src/runtime/communication.rs
Show inline comments
 
@@ -204,12 +204,12 @@ impl Connector {
 
        use SyncError as Se;
 
        //////////////////////////////////
 
        log!(
 
            @COMM_NB,
 
            cu.inner.logger,
 
            "~~~ SYNC called with timeout {:?}; starting round {}",
 
            &timeout,
 
            comm.round_index
 
        );
 
        log!(@BENCH, cu.inner.logger, "");
 

	
 
        // 1. run all proto components to Nonsync blockers
 
        // NOTE: original components are immutable until Decision::Success
 
@@ -255,11 +255,11 @@ impl Connector {
 
            }
 
        }
 
        log!(
 
            @COMM_NB,
 
            cu.inner.logger,
 
            "All {} proto components are now done with Nonsync phase",
 
            branching_proto_components.len(),
 
        );
 
        log!(@BENCH, cu.inner.logger, "");
 

	
 
        // Create temp structures needed for the synchronous phase of the round
 
        let mut rctx = RoundCtx {
 
@@ -286,7 +286,8 @@ impl Connector {
 
            getter_buffer: Default::default(),
 
            deadline: timeout.map(|to| Instant::now() + to),
 
        };
 
        log!(@COMM_NB, cu.inner.logger, "Round context structure initialized");
 
        log!(cu.inner.logger, "Round context structure initialized");
 
        log!(@BENCH, cu.inner.logger, "");
 

	
 
        // Explore all native branches eagerly. Find solutions, buffer messages, etc.
 
        log!(
 
@@ -361,7 +362,8 @@ impl Connector {
 
        // restore the invariant: !native_batches.is_empty()
 
        comm.native_batches.push(Default::default());
 
        // Call to another big method; keep running this round until a distributed decision is reached
 
        log!(@COMM_NB, cu.inner.logger, "Searching for decision...");
 
        log!(cu.inner.logger, "Searching for decision...");
 
        log!(@BENCH, cu.inner.logger, "");
 
        let decision = Self::sync_reach_decision(
 
            cu,
 
            comm,
 
@@ -369,7 +371,8 @@ impl Connector {
 
            &mut branching_proto_components,
 
            &mut rctx,
 
        )?;
 
        log!(@COMM_NB, cu.inner.logger, "Committing to decision {:?}!", &decision);
 
        log!(cu.inner.logger, "Committing to decision {:?}!", &decision);
 
        log!(@BENCH, cu.inner.logger, "");
 
        comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.inner.logger, &decision)?;
 

	
 
        // propagate the decision to children
 
@@ -411,7 +414,8 @@ impl Connector {
 
                Ok(Some(branching_native.collapse_with(&mut *cu.inner.logger, &predicate)))
 
            }
 
        };
 
        log!(@COMM_NB, cu.inner.logger, "Sync round ending! Cleaning up");
 
        log!(cu.inner.logger, "Sync round ending! Cleaning up");
 
        log!(@BENCH, cu.inner.logger, "");
 
        ret
 
    }
 

	
0 comments (0 inline, 0 general)