diff --git a/Cargo.toml b/Cargo.toml index e555a0d97ebe3d1cb46ebae6cdf79c9fd3a41ca2..d6978deb8b50cbaffd0d11dd124dccbef5758d6e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,7 @@ crate-type = [ ] [features] -default = ["ffi"] +default = ["ffi", "session_optimization"] ffi = [] # see src/ffi/mod.rs ffi_pseudo_socket_api = ["ffi", "libc", "os_socketaddr"]# see src/ffi/pseudo_socket_api.rs. endpoint_logging = [] # see src/macros.rs diff --git a/examples/bench_1/main.c b/examples/bench_01/main.c similarity index 100% rename from examples/bench_1/main.c rename to examples/bench_01/main.c diff --git a/examples/bench_2/main.c b/examples/bench_02/main.c similarity index 100% rename from examples/bench_2/main.c rename to examples/bench_02/main.c diff --git a/examples/bench_3/main.c b/examples/bench_03/main.c similarity index 100% rename from examples/bench_3/main.c rename to examples/bench_03/main.c diff --git a/examples/bench_4/main.c b/examples/bench_04/main.c similarity index 100% rename from examples/bench_4/main.c rename to examples/bench_04/main.c diff --git a/examples/bench_5/main.c b/examples/bench_05/main.c similarity index 100% rename from examples/bench_5/main.c rename to examples/bench_05/main.c diff --git a/examples/bench_6/main.c b/examples/bench_06/main.c similarity index 100% rename from examples/bench_6/main.c rename to examples/bench_06/main.c diff --git a/examples/bench_7/main.c b/examples/bench_07/main.c similarity index 100% rename from examples/bench_7/main.c rename to examples/bench_07/main.c diff --git a/examples/bench_8/main.c b/examples/bench_08/main.c similarity index 100% rename from examples/bench_8/main.c rename to examples/bench_08/main.c diff --git a/examples/bench_9/main.c b/examples/bench_09/main.c similarity index 100% rename from examples/bench_9/main.c rename to examples/bench_09/main.c diff --git a/examples/bench_23/main.c b/examples/bench_23/main.c new file mode 100644 index 0000000000000000000000000000000000000000..0e02672f40a12dc197bee343f23f56f4cbe300c2 --- /dev/null +++ b/examples/bench_23/main.c @@ -0,0 +1,122 @@ +#include +#include "../../reowolf.h" +#include "../utility.c" +int main(int argc, char** argv) { + int i; + + // unsigned char pdl[] = "\ + // primitive xrouter(in a, out b, out c) {\ + // while(true) synchronous {\ + // if(fires(a)) {\ + // if(fires(b)) put(b, get(a));\ + // else put(c, get(a));\ + // }\ + // }\ + // }" + // ; + unsigned char pdl[] = "\ + primitive lossy(in a, out b) {\ + while(true) synchronous {\ + if(fires(a)) {\ + msg m = get(a);\ + if(fires(b)) put(b, m);\ + }\ + }\ + }\ + primitive sync_drain(in a, in b) {\ + while(true) synchronous {\ + if(fires(a)) {\ + get(a);\ + get(b);\ + }\ + }\ + }\ + composite xrouter(in a, out b, out c) {\ + channel d -> e;\ + channel f -> g;\ + channel h -> i;\ + channel j -> k;\ + channel l -> m;\ + channel n -> o;\ + channel p -> q;\ + channel r -> s;\ + channel t -> u;\ + new replicator(a, d, f);\ + new replicator(g, t, h);\ + new lossy(e, l);\ + new lossy(i, j);\ + new replicator(m, b, p);\ + new replicator(k, n, c);\ + new merger(q, o, r);\ + new sync_drain(u, s);\ + }" + ; + Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); + Connector * c = connector_new_with_id(pd, 0); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + + PortId ports[6]; + for(i=0; i<3; i++) { + connector_add_port_pair(c, &ports[2*i], &ports[2*i+1]); + } + // [native~~~~~~~~~~] + // 0 1 2 3 4 5 + // | ^ | ^ | ^ + // `--` `--` `--` + char ident[] = "xrouter"; + connector_add_component( + c, + ident, + sizeof(ident)-1, + (PortId[]) { ports[1], ports[2], ports[4] }, + 3); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + + // [native~~~~~~~~~~] + // 0 3 5 + // V ^ ^ + // 1 2 4 + // [xrouter~~~~~~~~~] + connector_connect(c, -1); + printf("Connect OK!\n"); + + int msg_len = 1000; + char * msg = malloc(msg_len); + memset(msg, 42, msg_len); + + { + clock_t begin = clock(); + for (i=0; i<100000; i++) { + connector_put_bytes(c, ports[0], msg, msg_len); + connector_get(c, ports[3]); + connector_sync(c, -1); + } + clock_t end = clock(); + double time_spent = (double)(end - begin) / CLOCKS_PER_SEC; + printf("First: %f\n", time_spent); + } + { + clock_t begin = clock(); + for (i=0; i<100000; i++) { + connector_put_bytes(c, ports[0], msg, msg_len); + connector_get(c, ports[5]); + connector_sync(c, -1); + } + clock_t end = clock(); + double time_spent = (double)(end - begin) / CLOCKS_PER_SEC; + printf("Second: %f\n", time_spent); + } + { + clock_t begin = clock(); + for (i=0; i<100000; i++) { + connector_put_bytes(c, ports[0], msg, msg_len); + connector_get(c, ports[3 + (i%2)*2]); + connector_sync(c, -1); + } + clock_t end = clock(); + double time_spent = (double)(end - begin) / CLOCKS_PER_SEC; + printf("Alternating: %f\n", time_spent); + } + free(msg); + return 0; +} \ No newline at end of file diff --git a/examples/bench_24/main.c b/examples/bench_24/main.c new file mode 100644 index 0000000000000000000000000000000000000000..a8558558a022b50f9918999e806a7b1350cd6d93 --- /dev/null +++ b/examples/bench_24/main.c @@ -0,0 +1,78 @@ +#include +#include "../../reowolf.h" +#include "../utility.c" +int main(int argc, char** argv) { + int i, j; + + unsigned char pdl[] = "\ + primitive fifo1_init(msg m, in a, out b) {\ + while(true) synchronous {\ + if(m != null && fires(b)) {\ + put(b, m);\ + m = null;\ + } else if (m == null && fires(a)) {\ + m = get(a);\ + }\ + }\ + }\ + composite fifo1_full(in a, out b) {\ + new fifo1_init(create(0), a, b);\ + }\ + composite fifo1(in a, out b) {\ + new fifo1_init(null, a, b);\ + }\ + composite sequencer3(out a, out b, out c) {\ + channel d -> e;\ + channel f -> g;\ + channel h -> i;\ + channel j -> k;\ + channel l -> m;\ + channel n -> o;\ + new fifo1_full(o, d);\ + new replicator(e, f, a);\ + new fifo1(g, h);\ + new replicator(i, j, b);\ + new fifo1(k, l);\ + new replicator(m, n, c);\ + }" + ; + // unsigned char pdl[] = "\ + // primitive sequencer3(out a, out b, out c) {\ + // int i = 0;\ + // while(true) synchronous {\ + // out to = a;\ + // if (i==1) to = b;\ + // else if(i==2) to = c;\ + // if(fires(to)) {\ + // put(to, create(0));\ + // i = (i + 1)%3;\ + // }\ + // }\ + // }" + ; + Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); + Connector * c = connector_new_with_id(pd, 0); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + + PortId putters[3], getters[3]; + for(i=0; i<3; i++) { + connector_add_port_pair(c, &putters[i], &getters[i]); + } + char ident[] = "sequencer3"; + connector_add_component(c, ident, sizeof(ident)-1, putters, 3); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + connector_connect(c, -1); + printf("Connect OK!\n"); + + clock_t begin = clock(); + for (i=0; i<1000000/3; i++) { + for (j=0; j<3; j++) { + connector_get(c, getters[j]); + connector_sync(c, -1); + } + } + clock_t end = clock(); + double time_spent = (double)(end - begin) / CLOCKS_PER_SEC; + printf("Time taken: %f\n", time_spent); + return 0; +} \ No newline at end of file diff --git a/examples/bench_25/main.c b/examples/bench_25/main.c new file mode 100644 index 0000000000000000000000000000000000000000..fff5774e8558f366b1d23c3ab6cd58b936b11ddc --- /dev/null +++ b/examples/bench_25/main.c @@ -0,0 +1,60 @@ +#include +#include "../../reowolf.h" +#include "../utility.c" +int main(int argc, char** argv) { + int i, j, syncs; + syncs = atoi(argv[1]); + printf("syncs %d\n", syncs); + + unsigned char pdl[] = ""; + Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + char logpath[] = "./bench_11.txt"; + Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1); + + PortId native_putter, native_getter; + connector_add_port_pair(c, &native_putter, &native_getter); + for (i=0; i +#include "../../reowolf.h" +#include "../utility.c" +FfiSocketAddr addr_new(const uint8_t ipv4[4], uint16_t port) { + FfiSocketAddr x; + x.port = port; + int i; + for(i=0; i<4; i++) { + x.ipv4[i] = ipv4[i]; + } + return x; +} +int main(int argc, char** argv) { + int i, rounds; + char leader; + uint8_t ipv4[4] = { atoi(argv[1]), atoi(argv[2]), atoi(argv[3]), atoi(argv[4]) }; + leader = argv[5][0]; + rounds = atoi(argv[6]); + printf("leader %c, rounds %d, peer at %d.%d.%d.%d\n", + leader, rounds, ipv4[0], ipv4[1], ipv4[2], ipv4[3]); + + unsigned char pdl[] = ""; + Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + Connector * c = connector_new_with_id(pd, leader=='y'?1:0); + PortId ports[2]; + if(leader=='y') { + EndpointPolarity ep = EndpointPolarity_Active; + connector_add_net_port(c, &ports[0], addr_new(ipv4, 7005), Polarity_Putter, ep); + connector_add_net_port(c, &ports[1], addr_new(ipv4, 7006), Polarity_Getter, ep); + } else { + EndpointPolarity ep = EndpointPolarity_Passive; + connector_add_net_port(c, &ports[0], addr_new(ipv4, 7005), Polarity_Getter, ep); + connector_add_net_port(c, &ports[1], addr_new(ipv4, 7006), Polarity_Putter, ep); + char ident[] = "sync"; + connector_add_component(c, ident, sizeof(ident)-1, ports, 2); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + } + connector_connect(c, -1); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + + size_t msg_len = 1000; + char * msg = malloc(msg_len); + memset(msg, 42, msg_len); + + clock_t begin = clock(); + for (i=0; i, - peers: HashMap, -} - impl TokenTarget { // subdivides the domain of usize into // [NET_ENDPOINT][UDP_ENDPOINT ] @@ -208,61 +202,92 @@ impl Connector { Err(Ce::AlreadyConnected) } ConnectorPhased::Setup(setup) => { - log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout); - let deadline = timeout.map(|to| Instant::now() + to); - // connect all endpoints in parallel; send and receive peer ids through ports - let (mut endpoint_manager, mut extra_port_info) = setup_endpoints_and_pair_ports( - &mut *cu.logger, - &setup.net_endpoint_setups, - &setup.udp_endpoint_setups, - &cu.ips.port_info, - &deadline, - )?; - log!( - cu.logger, - "Successfully connected {} endpoints. info now {:#?} {:#?}", - endpoint_manager.net_endpoint_store.endpoint_exts.len(), - &cu.ips.port_info, - &endpoint_manager, - ); - // leader election and tree construction. Learn our role in the consensus tree, - // from learning who are our children/parents (neighbors) in the consensus tree. - let neighborhood = init_neighborhood( - cu.ips.id_manager.connector_id, - &mut *cu.logger, - &mut endpoint_manager, - &deadline, - )?; - log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood); - // Put it all together with an initial round index of zero. - let mut comm = ConnectorCommunication { - round_index: 0, - endpoint_manager, - neighborhood, - native_batches: vec![Default::default()], - round_result: Ok(None), // no previous round yet + // Ideally, we'd simply clone `cu` in its entirety, and pass it to + // `connect_inner`, such that a successful connection discards the original. + // We need to work around the `logger` not being clonable; + // solution? create a dummy clone, and use mem-swap to ensure the + // single real logger is wherever it needs to be. + let mut cu_clone = ConnectorUnphased { + logger: Box::new(DummyLogger), + proto_components: cu.proto_components.clone(), + native_component_id: cu.native_component_id.clone(), + ips: cu.ips.clone(), + proto_description: cu.proto_description.clone(), }; - if cfg!(feature = "session_optimization") { - // Perform the session optimization procedure, which may modify the - // internals of the connector, rerouting ports, moving around connectors etc. - session_optimize(cu, &mut comm, &deadline)?; - } - log!(cu.logger, "connect() finished. setup phase complete"); - // Connect procedure successful! Commit changes by... - // ... commiting new port info for ConnectorUnphased - for (port, info) in extra_port_info.info.drain() { - cu.ips.port_info.owned.entry(info.owner).or_default().insert(port); - cu.ips.port_info.map.insert(port, info); - } - for (port, peer) in extra_port_info.peers.drain() { - cu.ips.port_info.map.get_mut(&port).unwrap().peer = Some(peer); + // cu has REAL logger... + std::mem::swap(&mut cu.logger, &mut cu_clone.logger); + // ... cu_clone has REAL logger. + match Self::connect_inner(cu_clone, setup, timeout) { + Ok(connected_connector) => { + *self = connected_connector; + Ok(()) + } + Err((err, mut logger)) => { + // Put the original logger back in place (in self.unphased, AKA `cu`). + // cu_clone has REAL logger... + std::mem::swap(&mut cu.logger, &mut logger); + // ... cu has REAL logger. + Err(err) + } } - // ... replacing the connector's phase to "communication" - *phased = ConnectorPhased::Communication(Box::new(comm)); - Ok(()) } } } + fn connect_inner( + mut cu: ConnectorUnphased, + setup: &ConnectorSetup, + timeout: Option, + ) -> Result)> { + log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout); + let deadline = timeout.map(|to| Instant::now() + to); + let mut try_complete = || { + // connect all endpoints in parallel; send and receive peer ids through ports + let mut endpoint_manager = setup_endpoints_and_pair_ports( + &mut *cu.logger, + &setup.net_endpoint_setups, + &setup.udp_endpoint_setups, + &mut cu.ips.port_info, + &deadline, + )?; + log!( + cu.logger, + "Successfully connected {} endpoints. info now {:#?} {:#?}", + endpoint_manager.net_endpoint_store.endpoint_exts.len(), + &cu.ips.port_info, + &endpoint_manager, + ); + // leader election and tree construction. Learn our role in the consensus tree, + // from learning who are our children/parents (neighbors) in the consensus tree. + let neighborhood = init_neighborhood( + cu.ips.id_manager.connector_id, + &mut *cu.logger, + &mut endpoint_manager, + &deadline, + )?; + log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood); + // Put it all together with an initial round index of zero. + let mut comm = ConnectorCommunication { + round_index: 0, + endpoint_manager, + neighborhood, + native_batches: vec![Default::default()], + round_result: Ok(None), // no previous round yet + }; + if cfg!(feature = "session_optimization") { + // Perform the session optimization procedure, which may modify the + // internals of the connector, rerouting ports, moving around connectors etc. + session_optimize(&mut cu, &mut comm, &deadline)?; + } + log!(cu.logger, "connect() finished. setup phase complete"); + Ok(comm) + }; + match try_complete() { + Ok(comm) => { + Ok(Self { unphased: cu, phased: ConnectorPhased::Communication(Box::new(comm)) }) + } + Err(err) => Err((err, cu.logger)), + } + } } // Given a set of net_ and udp_ endpoints to setup, @@ -275,9 +300,9 @@ fn setup_endpoints_and_pair_ports( logger: &mut dyn Logger, net_endpoint_setups: &[NetEndpointSetup], udp_endpoint_setups: &[UdpEndpointSetup], - port_info: &PortInfoMap, + port_info: &mut PortInfoMap, deadline: &Option, -) -> Result<(EndpointManager, ExtraPortInfo), ConnectError> { +) -> Result { use ConnectError as Ce; const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE); const RETRY_PERIOD: Duration = Duration::from_millis(200); @@ -305,12 +330,9 @@ fn setup_endpoints_and_pair_ports( Accepting(TcpListener), // awaiting it's peer initiating the connection PeerInfoRecving(NetEndpoint), // awaiting info about peer port through the channel } - //////////////////////////////////////////// // Start to construct our return values - // let mut waker_state: Option> = None; - let mut extra_port_info = ExtraPortInfo::default(); let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?; let mut events = Events::with_capacity((net_endpoint_setups.len() + udp_endpoint_setups.len()) * 2 + 4); @@ -540,20 +562,25 @@ fn setup_endpoints_and_pair_ports( } net_todo.recv_peer_port = Some(peer_info.port); // finally learned the peer of this port! - extra_port_info.peers.insert( - net_todo.endpoint_setup.getter_for_incoming, - peer_info.port, - ); + port_info + .map + .get_mut(&net_todo.endpoint_setup.getter_for_incoming) + .unwrap() + .peer = Some(peer_info.port); // learned the info of this peer port - if !port_info.map.contains_key(&peer_info.port) { - let info = PortInfo { + port_info.map.entry(peer_info.port).or_insert({ + port_info + .owned + .entry(peer_info.owner) + .or_default() + .insert(peer_info.port); + PortInfo { peer: Some(net_todo.endpoint_setup.getter_for_incoming), polarity: peer_info.polarity, owner: peer_info.owner, route: Route::NetEndpoint { index }, - }; - extra_port_info.info.insert(peer_info.port, info); - } + } + }); } Some(inappropriate_msg) => { log!( @@ -625,7 +652,7 @@ fn setup_endpoints_and_pair_ports( }, io_byte_buffer, }; - Ok((endpoint_manager, extra_port_info)) + Ok(endpoint_manager) } // Given a fully-formed endpoint manager, @@ -1015,6 +1042,7 @@ fn apply_my_optimizations( endpoint_incoming_to_getter, } = session_info; // simply overwrite the contents + // println!("BEFORE: {:#?}\n{:#?}", cu, comm); cu.ips.port_info = port_info; assert!(cu.ips.port_info.invariant_preserved()); cu.proto_components = proto_components; @@ -1028,5 +1056,6 @@ fn apply_my_optimizations( { ee.getter_for_incoming = getter; } + // println!("AFTER: {:#?}\n{:#?}", cu, comm); Ok(()) }