Changeset - ef80a75d0e6b
[Not reviewed]
1 6 3
Christopher Esterhuyse - 5 years ago 2020-10-06 17:16:43
christopher.esterhuyse@gmail.com
minor simplifications
10 files changed with 283 insertions and 63 deletions:
0 comments (0 inline, 0 general)
examples/bench_15/main.c
Show inline comments
 
@@ -19,11 +19,11 @@ int main(int argc, char** argv) {
 
			seen_delim = true;
 
			continue;
 
		} else if(seen_delim) {
 
			printf("putter");
 
			printf("getter");
 
			p = Polarity_Getter;
 
			ep = EndpointPolarity_Passive;
 
		} else {
 
			printf("getter");
 
			printf("putter");
 
			p = Polarity_Putter;
 
			ep = EndpointPolarity_Active;
 
		}
 
@@ -33,9 +33,10 @@ int main(int argc, char** argv) {
 
	}
 
	printf("Added all ports!\n");
 
	connector_connect(c, -1);
 
	printf("Connect OK!\n");
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<10000; i++) {
 
	for (i=0; i<100000; i++) {
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
examples/bench_18/main.c
Show inline comments
 
@@ -34,7 +34,7 @@ int main(int argc, char** argv) {
 
	
 
	clock_t begin = clock();
 
	char msg[] = "Hello, world!";
 
	for (i=0; i<1000; i++) {
 
	for (i=0; i<1000000; i++) {
 
		for(j=0; j<ports_used; j++) {
 
			if(do_gets=='y') connector_get(c, getters[j]);
 
			if(do_puts=='y') connector_put_bytes(c, putters[j], msg, sizeof(msg)-1);
examples/bench_20/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	// same as bench 15 but connecting to 87.210.104.102 and getting at 0.0.0.0
 
	// also, doing 10k reps (down from 100k) to save time
 
	int i, cid;
 
	cid = atoi(argv[1]);
 
	printf("cid %d\n", cid);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
	unsigned char pdl[] = "";
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	Connector * c = connector_new_with_id(pd, cid);
 

	
 
	bool seen_delim = false;
 
	for(i=2; i<argc; i++) {
 
		EndpointPolarity ep;
 
		Polarity p;
 
		FfiSocketAddr addr;
 
		if(argv[i][0] == '.') {
 
			seen_delim = true;
 
			continue;
 
		} else if(seen_delim) {
 
			addr = (FfiSocketAddr) {{0, 0, 0, 0}, atoi(argv[i])};
 
			printf("getter");
 
			p = Polarity_Getter;
 
			ep = EndpointPolarity_Passive;
 
		} else {
 
			addr = (FfiSocketAddr) {{87, 210, 104, 102}, atoi(argv[i])};
 
			printf("putter");
 
			p = Polarity_Putter;
 
			ep = EndpointPolarity_Active;
 
		}
 
		printf("@%d\n", addr.port);
 
		connector_add_net_port(c, NULL, addr, p, ep);
 
	}
 
	printf("Added all ports!\n");
 
	connector_connect(c, -1);
 
	printf("Connect OK!\n");
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<150; i++) {
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_21/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 

	
 
int main(int argc, char** argv) {
 
	// one of two processes: {leader, follower}
 
	// where a set of `par_msgs` messages are sent leader->follower after
 
	// looping follower->leader->follower `msg_loops` times.
 
	int i, j, cid, msg_loops, par_msgs;
 
	char is_leader;
 
	is_leader = argv[1][0];
 
	msg_loops = atoi(argv[2]);
 
	par_msgs = atoi(argv[3]);
 
	// argv[4..8] encodes peer IP
 
	printf("is_leader %c, msg_loops %d, par_msgs %d\n", is_leader, msg_loops, par_msgs);
 
	cid = is_leader=='y'; // cid := { leader:1, follower:0 }
 

	
 
	unsigned char pdl[] = "";
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	Connector * c = connector_new_with_id(pd, cid);
 
	PortId native_ports[par_msgs];
 
	FfiSocketAddr peer_addr = {
 
		{
 
			atoi(argv[4]),
 
			atoi(argv[5]),
 
			atoi(argv[6]),
 
			atoi(argv[7])
 
		}, 0/*dummy value*/};
 
	int port = 7000;
 
	
 
	// for each parallel message 
 
	for(i=0; i<par_msgs; i++) {
 
		if(is_leader == 'y') {
 
			peer_addr.port = port++;
 
			connector_add_net_port(
 
				c, &native_ports[i], peer_addr, Polarity_Putter, EndpointPolarity_Active);
 
			printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
		}
 

	
 
		for(j=0; j<msg_loops; j++) {
 
			PortId loop_getter, loop_putter;
 

	
 
			// create {putter, getter} port pair
 
			connector_add_net_port(
 
				c, &loop_getter,
 
				(FfiSocketAddr) {{0,0,0,0}, port++},
 
				Polarity_Getter, EndpointPolarity_Passive);
 
			printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
			peer_addr.port = port++;
 
			connector_add_net_port(
 
				c, &loop_putter, peer_addr, Polarity_Putter, EndpointPolarity_Active);
 
			printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
			connector_add_component(c, "forward", 7,
 
				(PortId[]){loop_getter, loop_putter}, 2);
 
			printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
		}
 

	
 
		if(is_leader != 'y') {
 
			connector_add_net_port(
 
				c, &native_ports[i],
 
				(FfiSocketAddr) {{0,0,0,0}, port++},
 
				Polarity_Getter, EndpointPolarity_Passive);
 
			printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
		}
 
	}
 
	printf("Added all ports!\n");
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	printf("Connect OK!\n");
 
	
 
	clock_t begin = clock();
 
	char msg[] = "Hello, world!";
 
	for(i=0; i<250; i++) {
 
		if(is_leader == 'y') {
 
			for(j=0; j<par_msgs; j++) {
 
				connector_put_bytes(c, native_ports[j], msg, sizeof(msg)-1);
 
			}
 
		} else {
 
			for(j=0; j<par_msgs; j++) {
 
				connector_get(c, native_ports[j]);
 
			}
 
		}
 
		connector_sync(c, -1);
 
	}
 
	
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_22/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 

	
 
int main(int argc, char** argv) {
 
	// same as bench 21 but with parametric message length
 
	int i, j, cid, msg_loops, par_msgs, msg_len;
 
	char is_leader;
 
	is_leader = argv[1][0];
 
	msg_loops = atoi(argv[2]);
 
	par_msgs = atoi(argv[3]);
 
	msg_len = atoi(argv[8]);
 

	
 
	// argv[4..8] encodes peer IP
 
	printf("is_leader %c, msg_loops %d, par_msgs %d, msg_len %d\n", is_leader, msg_loops, par_msgs, msg_len);
 
	cid = is_leader=='y'; // cid := { leader:1, follower:0 }
 

	
 
	unsigned char pdl[] = "";
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	Connector * c = connector_new_with_id(pd, cid);
 
	PortId native_ports[par_msgs];
 
	FfiSocketAddr peer_addr = {
 
		{
 
			atoi(argv[4]),
 
			atoi(argv[5]),
 
			atoi(argv[6]),
 
			atoi(argv[7])
 
		}, 0/*dummy value*/};
 
	int port = 7000;
 
	
 
	// for each parallel message 
 
	for(i=0; i<par_msgs; i++) {
 
		if(is_leader == 'y') {
 
			peer_addr.port = port++;
 
			connector_add_net_port(
 
				c, &native_ports[i], peer_addr, Polarity_Putter, EndpointPolarity_Active);
 
			printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
		}
 

	
 
		for(j=0; j<msg_loops; j++) {
 
			PortId loop_getter, loop_putter;
 

	
 
			// create {putter, getter} port pair
 
			connector_add_net_port(
 
				c, &loop_getter,
 
				(FfiSocketAddr) {{0,0,0,0}, port++},
 
				Polarity_Getter, EndpointPolarity_Passive);
 
			printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
			peer_addr.port = port++;
 
			connector_add_net_port(
 
				c, &loop_putter, peer_addr, Polarity_Putter, EndpointPolarity_Active);
 
			printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
			connector_add_component(c, "forward", 7,
 
				(PortId[]){loop_getter, loop_putter}, 2);
 
			printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
		}
 

	
 
		if(is_leader != 'y') {
 
			connector_add_net_port(
 
				c, &native_ports[i],
 
				(FfiSocketAddr) {{0,0,0,0}, port++},
 
				Polarity_Getter, EndpointPolarity_Passive);
 
			printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
		}
 
	}
 
	printf("Added all ports!\n");
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	printf("Connect OK!\n");
 

	
 
	char * msg = malloc(msg_len);
 
	memset(msg, 42, msg_len);
 
	
 
	clock_t begin = clock();
 
	for(i=0; i<100000; i++) {
 
		if(is_leader == 'y') {
 
			for(j=0; j<par_msgs; j++) {
 
				connector_put_bytes(c, native_ports[j], msg, msg_len);
 
			}
 
		} else {
 
			for(j=0; j<par_msgs; j++) {
 
				connector_get(c, native_ports[j]);
 
			}
 
		}
 
		connector_sync(c, -1);
 
	}
 
	
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	free(msg);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_8/main.c
Show inline comments
 
@@ -2,9 +2,10 @@
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, forwards;
 
	int i, forwards, msglen;
 
	forwards = atoi(argv[1]);
 
	printf("forwards %d\n", forwards);
 
	msglen = atoi(argv[2]);
 
	printf("forwards %d, msglen %d\n", forwards, msglen);
 
	unsigned char pdl[] = ""; 
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
@@ -27,14 +28,12 @@ int main(int argc, char** argv) {
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	
 
	size_t msg_len = 0xffff;
 
	char * msg = malloc(msg_len);
 
	memset(msg, 42, msg_len);
 
	char * msg = malloc(msglen);
 
	memset(msg, 42, msglen);
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<1000000; i++) {
 
		connector_put_bytes(c, native_putter, msg, msg_len);
 
	for (i=0; i<100000; i++) {
 
		connector_put_bytes(c, native_putter, msg, msglen);
 
		connector_get(c, native_getter);
 
		connector_sync(c, -1);
 
	}
examples/zoop.sh
Show inline comments
 
deleted file
src/runtime/endpoints.rs
Show inline comments
 
use super::*;
 

	
 
// A wrapper for some Read type, delegating read calls
 
// to the contained Read structure, but snooping on
 
// the number of bytes it reads, to be inspected later.
 
struct MonitoredReader<R: Read> {
 
    bytes: usize,
 
    r: R,
 
}
 

	
 
enum PollAndPopulateError {
 
    PollFailed,
 
    Timeout,
 
@@ -51,14 +43,15 @@ impl NetEndpoint {
 
            DenseDebugHex(&self.inbox[..before_len]),
 
            DenseDebugHex(&self.inbox[before_len..]),
 
        );
 
        // Try deserialize from the inbox, monitoring how many bytes
 
        // the deserialiation process consumes. In the event of
 
        // success, this makes clear where the message ends
 
        let mut monitored = MonitoredReader::from(&self.inbox[..]);
 
        // Try deserialize from the inbox. `reading_slice' is updated by read()
 
        // in-place to truncate the read part. In the event of success,
 
        // the message bytes are contained in the truncated prefix
 
        let mut reading_slice = self.inbox.as_slice();
 
        let before_len = reading_slice.len();
 
        use bincode::config::Options;
 
        match Self::bincode_opts().deserialize_from(&mut monitored) {
 
        match Self::bincode_opts().deserialize_from(&mut reading_slice) {
 
            Ok(msg) => {
 
                let msg_size = monitored.bytes_read();
 
                let msg_size = before_len - reading_slice.len();
 
                // inbox[..msg_size] was deserialized into one message!
 
                self.inbox.drain(..msg_size);
 
                log!(
 
@@ -83,14 +76,21 @@ impl NetEndpoint {
 
    }
 

	
 
    // Send the given serializable type into the stream
 
    pub(super) fn send<T: serde::ser::Serialize>(
 
    pub(super) fn send<T: serde::ser::Serialize + Debug>(
 
        &mut self,
 
        msg: &T,
 
        io_byte_buffer: &mut IoByteBuffer,
 
    ) -> Result<(), NetEndpointError> {
 
        use bincode::config::Options;
 
        use NetEndpointError as Nee;
 
        Self::bincode_opts()
 
            .serialize_into(&mut self.stream, msg)
 
        // Create a buffer for our bytes: a slice of the io_byte_buffer
 
        let mut buf_slice = io_byte_buffer.as_mut_slice();
 
        // serialize into the slice, truncating as its filled
 
        Self::bincode_opts().serialize_into(&mut buf_slice, msg).expect("Serialize failed!");
 
        // written segment is the part missing from buf_slice. Write this as one segment to the TCP stream
 
        let wrote = IoByteBuffer::CAPACITY - buf_slice.len();
 
        self.stream
 
            .write_all(&io_byte_buffer.as_mut_slice()[..wrote])
 
            .map_err(|_| Nee::BrokenNetEndpoint)?;
 
        let _ = self.stream.flush();
 
        Ok(())
 
@@ -109,7 +109,7 @@ impl EndpointManager {
 
    // Used pervasively, allows for some brevity with the ? operator.
 
    pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> {
 
        let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
 
        net_endpoint.send(msg).map_err(|err| {
 
        net_endpoint.send(msg, &mut self.io_byte_buffer).map_err(|err| {
 
            ConnectError::NetEndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err)
 
        })
 
    }
 
@@ -123,7 +123,9 @@ impl EndpointManager {
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        use UnrecoverableSyncError as Use;
 
        let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
 
        net_endpoint.send(msg).map_err(|_| Use::BrokenNetEndpoint { index })
 
        net_endpoint
 
            .send(msg, &mut self.io_byte_buffer)
 
            .map_err(|_| Use::BrokenNetEndpoint { index })
 
    }
 

	
 
    /// Receive the first message of any kind at all.
 
@@ -266,7 +268,7 @@ impl EndpointManager {
 
                }
 
            }
 
            // try receive a udp message
 
            let recv_buffer = self.udp_in_buffer.as_mut_slice();
 
            let recv_buffer = self.io_byte_buffer.as_mut_slice();
 
            while let Some(index) = self.udp_endpoint_store.polled_undrained.pop() {
 
                let ee = &mut self.udp_endpoint_store.endpoint_exts[index];
 
                if let Some(bytes_written) = ee.sock.recv(recv_buffer).ok() {
 
@@ -438,23 +440,6 @@ impl Debug for NetEndpoint {
 
            .finish()
 
    }
 
}
 
impl<R: Read> From<R> for MonitoredReader<R> {
 
    fn from(r: R) -> Self {
 
        Self { r, bytes: 0 }
 
    }
 
}
 
impl<R: Read> MonitoredReader<R> {
 
    pub(super) fn bytes_read(&self) -> usize {
 
        self.bytes
 
    }
 
}
 
impl<R: Read> Read for MonitoredReader<R> {
 
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
 
        let n = self.r.read(buf)?;
 
        self.bytes += n;
 
        Ok(n)
 
    }
 
}
 
impl Into<Msg> for SetupMsg {
 
    fn into(self) -> Msg {
 
        Msg::SetupMsg(self)
src/runtime/mod.rs
Show inline comments
 
@@ -261,7 +261,7 @@ struct IdManager {
 
}
 

	
 
// Newtype wrapper around a byte buffer, used for UDP mediators to receive incoming datagrams.
 
struct UdpInBuffer {
 
struct IoByteBuffer {
 
    byte_vec: Vec<u8>,
 
}
 

	
 
@@ -285,7 +285,7 @@ struct EndpointManager {
 
    undelayed_messages: Vec<(usize, Msg)>, // ready to yield
 
    net_endpoint_store: EndpointStore<NetEndpointExt>,
 
    udp_endpoint_store: EndpointStore<UdpEndpointExt>,
 
    udp_in_buffer: UdpInBuffer,
 
    io_byte_buffer: IoByteBuffer,
 
}
 

	
 
// A storage of endpoints, which keeps track of which components have raised
 
@@ -902,7 +902,7 @@ impl Debug for SpecVal {
 
        self.0.fmt(f)
 
    }
 
}
 
impl Default for UdpInBuffer {
 
impl Default for IoByteBuffer {
 
    fn default() -> Self {
 
        let mut byte_vec = Vec::with_capacity(Self::CAPACITY);
 
        unsafe {
 
@@ -912,15 +912,15 @@ impl Default for UdpInBuffer {
 
        Self { byte_vec }
 
    }
 
}
 
impl UdpInBuffer {
 
    const CAPACITY: usize = u16::MAX as usize;
 
impl IoByteBuffer {
 
    const CAPACITY: usize = u16::MAX as usize + 1000;
 
    fn as_mut_slice(&mut self) -> &mut [u8] {
 
        self.byte_vec.as_mut_slice()
 
    }
 
}
 

	
 
impl Debug for UdpInBuffer {
 
impl Debug for IoByteBuffer {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "UdpInBuffer")
 
        write!(f, "IoByteBuffer")
 
    }
 
}
src/runtime/setup.rs
Show inline comments
 
@@ -317,6 +317,7 @@ fn setup_endpoints_and_pair_ports(
 
    let [mut net_polled_undrained, udp_polled_undrained] = [VecSet::default(), VecSet::default()];
 
    let mut delayed_messages = vec![];
 
    let mut last_retry_at = Instant::now();
 
    let mut io_byte_buffer = IoByteBuffer::default();
 

	
 
    // Create net/udp todo structures, each already registered with poll
 
    let mut net_todos = net_endpoint_setups
 
@@ -501,7 +502,7 @@ fn setup_endpoints_and_pair_ports(
 
                                port: net_todo.endpoint_setup.getter_for_incoming,
 
                            }));
 
                            net_endpoint
 
                                .send(&msg)
 
                                .send(&msg, &mut io_byte_buffer)
 
                                .map_err(|e| {
 
                                    Ce::NetEndpointSetupError(
 
                                        net_endpoint.stream.local_addr().unwrap(),
 
@@ -622,7 +623,7 @@ fn setup_endpoints_and_pair_ports(
 
            endpoint_exts: udp_endpoint_exts,
 
            polled_undrained: udp_polled_undrained,
 
        },
 
        udp_in_buffer: Default::default(),
 
        io_byte_buffer,
 
    };
 
    Ok((endpoint_manager, extra_port_info))
 
}
0 comments (0 inline, 0 general)