diff --git a/examples/bench_15/main.c b/examples/bench_15/main.c index 08a6b43fe743aa5203f65349ad757b943d0b9cc4..b1db4ed36ca51834a7dd8382ceffe7d571be5adb 100644 --- a/examples/bench_15/main.c +++ b/examples/bench_15/main.c @@ -19,11 +19,11 @@ int main(int argc, char** argv) { seen_delim = true; continue; } else if(seen_delim) { - printf("putter"); + printf("getter"); p = Polarity_Getter; ep = EndpointPolarity_Passive; } else { - printf("getter"); + printf("putter"); p = Polarity_Putter; ep = EndpointPolarity_Active; } @@ -33,9 +33,10 @@ int main(int argc, char** argv) { } printf("Added all ports!\n"); connector_connect(c, -1); + printf("Connect OK!\n"); clock_t begin = clock(); - for (i=0; i<10000; i++) { + for (i=0; i<100000; i++) { connector_sync(c, -1); } clock_t end = clock(); diff --git a/examples/bench_18/main.c b/examples/bench_18/main.c index 50db8b452bfce9a9004f81104acc280bdd6016e3..a6b8b3e3233c2740a78ac2ae2c41f11e8060612f 100644 --- a/examples/bench_18/main.c +++ b/examples/bench_18/main.c @@ -34,7 +34,7 @@ int main(int argc, char** argv) { clock_t begin = clock(); char msg[] = "Hello, world!"; - for (i=0; i<1000; i++) { + for (i=0; i<1000000; i++) { for(j=0; j +#include "../../reowolf.h" +#include "../utility.c" +int main(int argc, char** argv) { + // same as bench 15 but connecting to 87.210.104.102 and getting at 0.0.0.0 + // also, doing 10k reps (down from 100k) to save time + int i, cid; + cid = atoi(argv[1]); + printf("cid %d\n", cid); + printf("Error str `%s`\n", reowolf_error_peek(NULL)); + + unsigned char pdl[] = ""; + Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); + Connector * c = connector_new_with_id(pd, cid); + + bool seen_delim = false; + for(i=2; i +#include "../../reowolf.h" +#include "../utility.c" + + +int main(int argc, char** argv) { + // one of two processes: {leader, follower} + // where a set of `par_msgs` messages are sent leader->follower after + // looping follower->leader->follower `msg_loops` times. + int i, j, cid, msg_loops, par_msgs; + char is_leader; + is_leader = argv[1][0]; + msg_loops = atoi(argv[2]); + par_msgs = atoi(argv[3]); + // argv[4..8] encodes peer IP + printf("is_leader %c, msg_loops %d, par_msgs %d\n", is_leader, msg_loops, par_msgs); + cid = is_leader=='y'; // cid := { leader:1, follower:0 } + + unsigned char pdl[] = ""; + Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); + Connector * c = connector_new_with_id(pd, cid); + PortId native_ports[par_msgs]; + FfiSocketAddr peer_addr = { + { + atoi(argv[4]), + atoi(argv[5]), + atoi(argv[6]), + atoi(argv[7]) + }, 0/*dummy value*/}; + int port = 7000; + + // for each parallel message + for(i=0; i +#include "../../reowolf.h" +#include "../utility.c" + + +int main(int argc, char** argv) { + // same as bench 21 but with parametric message length + int i, j, cid, msg_loops, par_msgs, msg_len; + char is_leader; + is_leader = argv[1][0]; + msg_loops = atoi(argv[2]); + par_msgs = atoi(argv[3]); + msg_len = atoi(argv[8]); + + // argv[4..8] encodes peer IP + printf("is_leader %c, msg_loops %d, par_msgs %d, msg_len %d\n", is_leader, msg_loops, par_msgs, msg_len); + cid = is_leader=='y'; // cid := { leader:1, follower:0 } + + unsigned char pdl[] = ""; + Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1); + Connector * c = connector_new_with_id(pd, cid); + PortId native_ports[par_msgs]; + FfiSocketAddr peer_addr = { + { + atoi(argv[4]), + atoi(argv[5]), + atoi(argv[6]), + atoi(argv[7]) + }, 0/*dummy value*/}; + int port = 7000; + + // for each parallel message + for(i=0; i { - bytes: usize, - r: R, -} - enum PollAndPopulateError { PollFailed, Timeout, @@ -51,14 +43,15 @@ impl NetEndpoint { DenseDebugHex(&self.inbox[..before_len]), DenseDebugHex(&self.inbox[before_len..]), ); - // Try deserialize from the inbox, monitoring how many bytes - // the deserialiation process consumes. In the event of - // success, this makes clear where the message ends - let mut monitored = MonitoredReader::from(&self.inbox[..]); + // Try deserialize from the inbox. `reading_slice' is updated by read() + // in-place to truncate the read part. In the event of success, + // the message bytes are contained in the truncated prefix + let mut reading_slice = self.inbox.as_slice(); + let before_len = reading_slice.len(); use bincode::config::Options; - match Self::bincode_opts().deserialize_from(&mut monitored) { + match Self::bincode_opts().deserialize_from(&mut reading_slice) { Ok(msg) => { - let msg_size = monitored.bytes_read(); + let msg_size = before_len - reading_slice.len(); // inbox[..msg_size] was deserialized into one message! self.inbox.drain(..msg_size); log!( @@ -83,14 +76,21 @@ impl NetEndpoint { } // Send the given serializable type into the stream - pub(super) fn send( + pub(super) fn send( &mut self, msg: &T, + io_byte_buffer: &mut IoByteBuffer, ) -> Result<(), NetEndpointError> { use bincode::config::Options; use NetEndpointError as Nee; - Self::bincode_opts() - .serialize_into(&mut self.stream, msg) + // Create a buffer for our bytes: a slice of the io_byte_buffer + let mut buf_slice = io_byte_buffer.as_mut_slice(); + // serialize into the slice, truncating as its filled + Self::bincode_opts().serialize_into(&mut buf_slice, msg).expect("Serialize failed!"); + // written segment is the part missing from buf_slice. Write this as one segment to the TCP stream + let wrote = IoByteBuffer::CAPACITY - buf_slice.len(); + self.stream + .write_all(&io_byte_buffer.as_mut_slice()[..wrote]) .map_err(|_| Nee::BrokenNetEndpoint)?; let _ = self.stream.flush(); Ok(()) @@ -109,7 +109,7 @@ impl EndpointManager { // Used pervasively, allows for some brevity with the ? operator. pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> { let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint; - net_endpoint.send(msg).map_err(|err| { + net_endpoint.send(msg, &mut self.io_byte_buffer).map_err(|err| { ConnectError::NetEndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err) }) } @@ -123,7 +123,9 @@ impl EndpointManager { ) -> Result<(), UnrecoverableSyncError> { use UnrecoverableSyncError as Use; let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint; - net_endpoint.send(msg).map_err(|_| Use::BrokenNetEndpoint { index }) + net_endpoint + .send(msg, &mut self.io_byte_buffer) + .map_err(|_| Use::BrokenNetEndpoint { index }) } /// Receive the first message of any kind at all. @@ -266,7 +268,7 @@ impl EndpointManager { } } // try receive a udp message - let recv_buffer = self.udp_in_buffer.as_mut_slice(); + let recv_buffer = self.io_byte_buffer.as_mut_slice(); while let Some(index) = self.udp_endpoint_store.polled_undrained.pop() { let ee = &mut self.udp_endpoint_store.endpoint_exts[index]; if let Some(bytes_written) = ee.sock.recv(recv_buffer).ok() { @@ -438,23 +440,6 @@ impl Debug for NetEndpoint { .finish() } } -impl From for MonitoredReader { - fn from(r: R) -> Self { - Self { r, bytes: 0 } - } -} -impl MonitoredReader { - pub(super) fn bytes_read(&self) -> usize { - self.bytes - } -} -impl Read for MonitoredReader { - fn read(&mut self, buf: &mut [u8]) -> Result { - let n = self.r.read(buf)?; - self.bytes += n; - Ok(n) - } -} impl Into for SetupMsg { fn into(self) -> Msg { Msg::SetupMsg(self) diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index ba664998818ec5995e083ac074dc2eab9a78310b..1d8a9841ca0c7ec58c62c60d59f4dd9d7e189dcb 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -261,7 +261,7 @@ struct IdManager { } // Newtype wrapper around a byte buffer, used for UDP mediators to receive incoming datagrams. -struct UdpInBuffer { +struct IoByteBuffer { byte_vec: Vec, } @@ -285,7 +285,7 @@ struct EndpointManager { undelayed_messages: Vec<(usize, Msg)>, // ready to yield net_endpoint_store: EndpointStore, udp_endpoint_store: EndpointStore, - udp_in_buffer: UdpInBuffer, + io_byte_buffer: IoByteBuffer, } // A storage of endpoints, which keeps track of which components have raised @@ -902,7 +902,7 @@ impl Debug for SpecVal { self.0.fmt(f) } } -impl Default for UdpInBuffer { +impl Default for IoByteBuffer { fn default() -> Self { let mut byte_vec = Vec::with_capacity(Self::CAPACITY); unsafe { @@ -912,15 +912,15 @@ impl Default for UdpInBuffer { Self { byte_vec } } } -impl UdpInBuffer { - const CAPACITY: usize = u16::MAX as usize; +impl IoByteBuffer { + const CAPACITY: usize = u16::MAX as usize + 1000; fn as_mut_slice(&mut self) -> &mut [u8] { self.byte_vec.as_mut_slice() } } -impl Debug for UdpInBuffer { +impl Debug for IoByteBuffer { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "UdpInBuffer") + write!(f, "IoByteBuffer") } } diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 53556e16b76f60bd91f77ee79d2fdd5a2f8704fb..a86688f7e5cf6daea809e23b33c9f67f6fa35756 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -317,6 +317,7 @@ fn setup_endpoints_and_pair_ports( let [mut net_polled_undrained, udp_polled_undrained] = [VecSet::default(), VecSet::default()]; let mut delayed_messages = vec![]; let mut last_retry_at = Instant::now(); + let mut io_byte_buffer = IoByteBuffer::default(); // Create net/udp todo structures, each already registered with poll let mut net_todos = net_endpoint_setups @@ -501,7 +502,7 @@ fn setup_endpoints_and_pair_ports( port: net_todo.endpoint_setup.getter_for_incoming, })); net_endpoint - .send(&msg) + .send(&msg, &mut io_byte_buffer) .map_err(|e| { Ce::NetEndpointSetupError( net_endpoint.stream.local_addr().unwrap(), @@ -622,7 +623,7 @@ fn setup_endpoints_and_pair_ports( endpoint_exts: udp_endpoint_exts, polled_undrained: udp_polled_undrained, }, - udp_in_buffer: Default::default(), + io_byte_buffer, }; Ok((endpoint_manager, extra_port_info)) }