Changeset - 9485a0862e90
[Not reviewed]
1 7 6
Christopher Esterhuyse - 5 years ago 2020-09-01 10:53:08
christopher.esterhuyse@gmail.com
cleaner logging, and more options for connector creation exposed to C API
13 files changed with 212 insertions and 31 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
@@ -40,11 +40,11 @@ lazy_static = "1.4.0"
 
crate-type = [
 
	"rlib", # for use as a Rust dependency. 
 
	"cdylib" # for FFI use, typically C.
 
]
 

	
 
[features]
 
default = ["ffi", "session_optimization"]
 
default = ["ffi"]
 
ffi = [] # see src/ffi/mod.rs
 
ffi_pseudo_socket_api = ["ffi", "libc", "os_socketaddr"]# see src/ffi/pseudo_socket_api.rs.
 
endpoint_logging = [] # see src/macros.rs
 
session_optimization = [] # see src/runtime/setup.rs
 
\ No newline at end of file
examples/bench_1/main_16.c
Show inline comments
 
file renamed from examples/bench_1/main.c to examples/bench_1/main_16.c
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./bench_1.txt";
 
	char logpath[] = "./1_16.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	rw_err_peek(c);
 
	
 
	PortId putter, getter;
 
	FfiSocketAddr local_addr = {{0, 0, 0, 0}, 8000};
 
	FfiSocketAddr peer_addr =  {{8, 8, 8, 1}, 8001};
 
@@ -20,14 +20,12 @@ int main(int argc, char** argv) {
 
	memset(msg_ptr, 42, msg_len);
 
	
 
	int i;
 
	for(i=0; i<10; i++) {
 
		connector_put_bytes(c, putter, msg_ptr, msg_len);
 
		rw_err_peek(c);
 
		
 
		// ... reach new consistent state within 1000ms deadline.
 
		connector_sync(c, -1);
 
		rw_err_peek(c);
 
	}
 
	
 
	printf("Exiting\n");
 
	protocol_description_destroy(pd);
examples/bench_1/main_16k.c
Show inline comments
 
new file 100644
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./1_16k.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	rw_err_peek(c);
 
	
 
	PortId putter, getter;
 
	FfiSocketAddr local_addr = {{0, 0, 0, 0}, 8000};
 
	FfiSocketAddr peer_addr =  {{8, 8, 8, 1}, 8001};
 
	rw_err_peek(c);
 
	connector_add_udp_mediator_component(c, &putter, &getter, local_addr, peer_addr);
 
	connector_connect(c, -1);
 
	rw_err_peek(c);
 
	
 
	// Prepare a message to send
 
	size_t msg_len = 16000;
 
	char * msg_ptr = malloc(msg_len);
 
	memset(msg_ptr, 42, msg_len);
 
	
 
	int i;
 
	for(i=0; i<10; i++) {
 
		connector_put_bytes(c, putter, msg_ptr, msg_len);
 
		rw_err_peek(c);
 
		connector_sync(c, -1);
 
		rw_err_peek(c);
 
	}
 
	
 
	printf("Exiting\n");
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	free(msg_ptr);
 
	sleep(1.0);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_2/main_16.c
Show inline comments
 
new file 100644
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./2_16.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	rw_err_peek(c);
 
	
 
	PortId putter, getter;
 
	rw_err_peek(c);
 
	connector_add_port_pair(c, &putter, &getter);
 
	connector_connect(c, -1);
 
	rw_err_peek(c);
 
	
 
	// Prepare a message to send
 
	size_t msg_len = 16;
 
	char * msg_ptr = malloc(msg_len);
 
	memset(msg_ptr, 42, msg_len);
 
	
 
	int i;
 
	for(i=0; i<10; i++) {
 
		connector_put_bytes(c, putter, msg_ptr, msg_len);
 
		connector_get(c, getter);
 
		rw_err_peek(c);
 
		connector_sync(c, -1);
 
		rw_err_peek(c);
 
	}
 
	
 
	printf("Exiting\n");
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	free(msg_ptr);
 
	sleep(1.0);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_2/main_16k.c
Show inline comments
 
new file 100644
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./2_16k.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 
	rw_err_peek(c);
 
	
 
	PortId putter, getter;
 
	rw_err_peek(c);
 
	connector_add_port_pair(c, &putter, &getter);
 
	connector_connect(c, -1);
 
	rw_err_peek(c);
 
	
 
	// Prepare a message to send
 
	size_t msg_len = 16000;
 
	char * msg_ptr = malloc(msg_len);
 
	memset(msg_ptr, 42, msg_len);
 
	
 
	int i;
 
	for(i=0; i<10; i++) {
 
		connector_put_bytes(c, putter, msg_ptr, msg_len);
 
		connector_get(c, getter);
 
		rw_err_peek(c);
 
		connector_sync(c, -1);
 
		rw_err_peek(c);
 
	}
 
	
 
	printf("Exiting\n");
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	free(msg_ptr);
 
	sleep(1.0);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_3/getter.c
Show inline comments
 
new file 100644
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./3_16_getter.txt";
 
	Connector * c = connector_new_logging_with_id(pd, logpath, sizeof(logpath)-1, 0);
 
	rw_err_peek(c);
 
	
 
	PortId getter;
 
	FfiSocketAddr addr = {{192, 168, 1, 124}, 8009};
 
	rw_err_peek(c);
 
	connector_add_net_port(c, &getter, addr, Polarity_Getter, EndpointPolarity_Passive);
 
	connector_connect(c, -1);
 
	rw_err_peek(c);
 
	
 
	int i;
 
	for(i=0; i<10; i++) {
 
		connector_get(c, getter);
 
		rw_err_peek(c);
 
		connector_sync(c, -1);
 
		rw_err_peek(c);
 
	}
 
	
 
	printf("Exiting\n");
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	sleep(1.0);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_3/putter.c
Show inline comments
 
new file 100644
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	Arc_ProtocolDescription * pd = protocol_description_parse("", 0);
 
	char logpath[] = "./3_16_putter.txt";
 
	Connector * c = connector_new_logging_with_id(pd, logpath, sizeof(logpath)-1, 1);
 
	rw_err_peek(c);
 
	
 
	PortId putter;
 
	FfiSocketAddr addr = {{192, 168, 1, 124}, 8009};
 
	rw_err_peek(c);
 
	connector_add_net_port(c, &putter, addr, Polarity_Putter, EndpointPolarity_Active);
 
	connector_connect(c, -1);
 
	rw_err_peek(c);
 
	
 
	// Prepare a message to send
 
	size_t msg_len = 16;
 
	char * msg_ptr = malloc(msg_len);
 
	memset(msg_ptr, 42, msg_len);
 
	
 
	int i;
 
	for(i=0; i<10; i++) {
 
		connector_put_bytes(c, putter, msg_ptr, msg_len);
 
		rw_err_peek(c);
 
		connector_sync(c, -1);
 
		rw_err_peek(c);
 
	}
 
	
 
	printf("Exiting\n");
 
	protocol_description_destroy(pd);
 
	connector_destroy(c);
 
	free(msg_ptr);
 
	sleep(1.0);
 
	return 0;
 
}
 
\ No newline at end of file
examples/make.py
Show inline comments
 
import os, glob, subprocess, time
 
import os, glob, subprocess, time, sys
 
script_path = os.path.dirname(os.path.realpath(__file__));
 
for c_file in glob.glob(script_path + "/*/*.c", recursive=False):
 
  if sys.platform != "linux" and sys.platform != "linux2" and "interop" in c_file:
 
    print("Not Linux! skipping", c_file)
 
    continue
 
  print("compiling", c_file)
 
  args = [
 
    "gcc",          # compiler
 
    "-std=c11",     # C11 mode
 
    "-Wl,-R./",     # pass -R flag to linker: produce relocatable object
 
    c_file,         # input source file
reowolf.h
Show inline comments
 
@@ -44,13 +44,15 @@ typedef uint32_t ConnectorId;
 

	
 
typedef uint32_t U32Suffix;
 

	
 
typedef struct {
 
  ConnectorId connector_id;
 
  U32Suffix u32_suffix;
 
} PortId;
 
} Id;
 

	
 
typedef Id PortId;
 

	
 
typedef struct {
 
  uint8_t ipv4[4];
 
  uint16_t port;
 
} FfiSocketAddr;
 

	
 
@@ -129,12 +131,17 @@ const uint8_t *connector_gotten_bytes(Connector *connector, PortId port, uintptr
 
Connector *connector_new(const Arc_ProtocolDescription *pd);
 

	
 
Connector *connector_new_logging(const Arc_ProtocolDescription *pd,
 
                                 const uint8_t *path_ptr,
 
                                 uintptr_t path_len);
 

	
 
Connector *connector_new_logging_with_id(const Arc_ProtocolDescription *pd,
 
                                         const uint8_t *path_ptr,
 
                                         uintptr_t path_len,
 
                                         ConnectorId connector_id);
 

	
 
intptr_t connector_next_batch(Connector *connector);
 

	
 
void connector_print_debug(Connector *connector);
 

	
 
/**
 
 * Convenience function combining the functionalities of
src/common.rs
Show inline comments
 
@@ -50,13 +50,13 @@ pub struct U32Stream {
 
}
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(transparent)]
 
pub struct PortId(Id);
 
#[derive(Default, Clone, Ord, PartialOrd)]
 
#[derive(Default, Eq, PartialEq, Clone, Ord, PartialOrd)]
 
pub struct Payload(Arc<Vec<u8>>);
 
#[derive(
 
    Debug, Eq, PartialEq, Clone, Hash, Copy, Ord, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
#[repr(C)]
 
pub enum Polarity {
 
@@ -86,21 +86,12 @@ pub(crate) enum SyncBlocker {
 
    PutMsg(PortId, Payload),
 
    NondetChoice { n: u16 },
 
}
 
pub(crate) struct DenseDebugHex<'a>(pub &'a [u8]);
 

	
 
///////////////////// IMPL /////////////////////
 
impl Eq for Payload {}
 
impl PartialEq for Payload {
 
    fn eq(&self, other: &Self) -> bool {
 
        // self.as_slice() == other.as_slice()
 
        let res = self.as_slice() == other.as_slice();
 
        println!("CMP RESULT IS.... {}", res);
 
        res
 
    }
 
}
 
impl IdParts for Id {
 
    fn id_parts(self) -> (ConnectorId, U32Suffix) {
 
        (self.connector_id, self.u32_suffix)
 
    }
 
}
 
impl IdParts for PortId {
src/ffi/mod.rs
Show inline comments
 
@@ -137,39 +137,47 @@ pub unsafe extern "C" fn protocol_description_clone(
 
    Box::into_raw(Box::new(pd.clone()))
 
}
 

	
 
///////////////////// CONNECTOR //////////////////////////
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new_logging(
 
pub unsafe extern "C" fn connector_new_logging_with_id(
 
    pd: &Arc<ProtocolDescription>,
 
    path_ptr: *const u8,
 
    path_len: usize,
 
    connector_id: ConnectorId,
 
) -> *mut Connector {
 
    StoredError::tl_clear();
 
    let path_bytes = &*slice_from_raw_parts(path_ptr, path_len);
 
    let path_str = match std::str::from_utf8(path_bytes) {
 
        Ok(path_str) => path_str,
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            return std::ptr::null_mut();
 
        }
 
    };
 
    match std::fs::File::create(path_str) {
 
        Ok(file) => {
 
            let connector_id = Connector::random_id();
 
            let file_logger = Box::new(FileLogger::new(connector_id, file));
 
            let c = Connector::new(file_logger, pd.clone(), connector_id);
 
            Box::into_raw(Box::new(c))
 
        }
 
        Err(err) => {
 
            StoredError::tl_debug_store(&err);
 
            std::ptr::null_mut()
 
        }
 
    }
 
}
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_new_logging(
 
    pd: &Arc<ProtocolDescription>,
 
    path_ptr: *const u8,
 
    path_len: usize,
 
) -> *mut Connector {
 
    connector_new_logging_with_id(pd, path_ptr, path_len, Connector::random_id())
 
}
 

	
 
#[no_mangle]
 
pub unsafe extern "C" fn connector_print_debug(connector: &mut Connector) {
 
    println!("Debug print dump {:#?}", connector);
 
}
 

	
src/macros.rs
Show inline comments
 
/*
 
Change the definition of these macros to control the logging level statically
 
*/
 

	
 
macro_rules! log {
 
    (@ENDPT, $logger:expr, $($arg:tt)*) => {{
 
        // ignore
 
    }};
 
    (@COMM_NB, $logger:expr, $($arg:tt)*) => {{
 
    (@BENCH, $logger:expr, $($arg:tt)*) => {{
 
        if let Some(w) = $logger.line_writer() {
 
            let _ = writeln!(w, $($arg)*);
 
        }
 
    }};
 
    (@ENDPT, $logger:expr, $($arg:tt)*) => {{
 
        // ignore
 
    }};
 
    ($logger:expr, $($arg:tt)*) => {{
 
        if let Some(w) = $logger.line_writer() {
 
            let _ = writeln!(w, $($arg)*);
 
        }
 
        // if let Some(w) = $logger.line_writer() {
 
        //     let _ = writeln!(w, $($arg)*);
 
        // }
 
    }};
 
}
src/runtime/communication.rs
Show inline comments
 
@@ -201,18 +201,18 @@ impl Connector {
 
        timeout: Option<Duration>,
 
    ) -> Result<Option<RoundOk>, SyncError> {
 
        //////////////////////////////////
 
        use SyncError as Se;
 
        //////////////////////////////////
 
        log!(
 
            @COMM_NB,
 
            cu.inner.logger,
 
            "~~~ SYNC called with timeout {:?}; starting round {}",
 
            &timeout,
 
            comm.round_index
 
        );
 
        log!(@BENCH, cu.inner.logger, "");
 

	
 
        // 1. run all proto components to Nonsync blockers
 
        // NOTE: original components are immutable until Decision::Success
 
        let mut branching_proto_components =
 
            HashMap::<ProtoComponentId, BranchingProtoComponent>::default();
 
        let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> =
 
@@ -252,17 +252,17 @@ impl Connector {
 
                    branching_proto_components
 
                        .insert(proto_component_id, BranchingProtoComponent::initial(component));
 
                }
 
            }
 
        }
 
        log!(
 
            @COMM_NB,
 
            cu.inner.logger,
 
            "All {} proto components are now done with Nonsync phase",
 
            branching_proto_components.len(),
 
        );
 
        log!(@BENCH, cu.inner.logger, "");
 

	
 
        // Create temp structures needed for the synchronous phase of the round
 
        let mut rctx = RoundCtx {
 
            solution_storage: {
 
                let n = std::iter::once(SubtreeId::LocalComponent(ComponentId::Native));
 
                let c = cu
 
@@ -283,13 +283,14 @@ impl Connector {
 
                SolutionStorage::new(subtree_id_iter)
 
            },
 
            spec_var_stream: cu.inner.id_manager.new_spec_var_stream(),
 
            getter_buffer: Default::default(),
 
            deadline: timeout.map(|to| Instant::now() + to),
 
        };
 
        log!(@COMM_NB, cu.inner.logger, "Round context structure initialized");
 
        log!(cu.inner.logger, "Round context structure initialized");
 
        log!(@BENCH, cu.inner.logger, "");
 

	
 
        // Explore all native branches eagerly. Find solutions, buffer messages, etc.
 
        log!(
 
            cu.inner.logger,
 
            "Translating {} native batches into branches...",
 
            comm.native_batches.len()
 
@@ -358,21 +359,23 @@ impl Connector {
 
                unreachable!()
 
            }
 
        }
 
        // restore the invariant: !native_batches.is_empty()
 
        comm.native_batches.push(Default::default());
 
        // Call to another big method; keep running this round until a distributed decision is reached
 
        log!(@COMM_NB, cu.inner.logger, "Searching for decision...");
 
        log!(cu.inner.logger, "Searching for decision...");
 
        log!(@BENCH, cu.inner.logger, "");
 
        let decision = Self::sync_reach_decision(
 
            cu,
 
            comm,
 
            &mut branching_native,
 
            &mut branching_proto_components,
 
            &mut rctx,
 
        )?;
 
        log!(@COMM_NB, cu.inner.logger, "Committing to decision {:?}!", &decision);
 
        log!(cu.inner.logger, "Committing to decision {:?}!", &decision);
 
        log!(@BENCH, cu.inner.logger, "");
 
        comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.inner.logger, &decision)?;
 

	
 
        // propagate the decision to children
 
        let msg = Msg::CommMsg(CommMsg {
 
            round_index: comm.round_index,
 
            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Announce {
 
@@ -408,13 +411,14 @@ impl Connector {
 
                    cu.proto_components.keys()
 
                );
 
                // consume native
 
                Ok(Some(branching_native.collapse_with(&mut *cu.inner.logger, &predicate)))
 
            }
 
        };
 
        log!(@COMM_NB, cu.inner.logger, "Sync round ending! Cleaning up");
 
        log!(cu.inner.logger, "Sync round ending! Cleaning up");
 
        log!(@BENCH, cu.inner.logger, "");
 
        ret
 
    }
 

	
 
    fn sync_reach_decision(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
0 comments (0 inline, 0 general)