Changeset - ef80a75d0e6b
[Not reviewed]
1 6 3
Christopher Esterhuyse - 5 years ago 2020-10-06 17:16:43
christopher.esterhuyse@gmail.com
minor simplifications
10 files changed with 283 insertions and 63 deletions:
0 comments (0 inline, 0 general)
examples/bench_15/main.c
Show inline comments
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, cid;
 
	cid = atoi(argv[1]);
 
	printf("cid %d\n", cid);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
	unsigned char pdl[] = "";
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	Connector * c = connector_new_with_id(pd, cid);
 

	
 
	bool seen_delim = false;
 
	for(i=2; i<argc; i++) {
 
		EndpointPolarity ep;
 
		Polarity p;
 
		if(argv[i][0] == '.') {
 
			seen_delim = true;
 
			continue;
 
		} else if(seen_delim) {
 
			printf("putter");
 
			printf("getter");
 
			p = Polarity_Getter;
 
			ep = EndpointPolarity_Passive;
 
		} else {
 
			printf("getter");
 
			printf("putter");
 
			p = Polarity_Putter;
 
			ep = EndpointPolarity_Active;
 
		}
 
		FfiSocketAddr addr = {{127, 0, 0, 1}, atoi(argv[i])};
 
		printf("@%d\n", addr.port);
 
		connector_add_net_port(c, NULL, addr, p, ep);
 
	}
 
	printf("Added all ports!\n");
 
	connector_connect(c, -1);
 
	printf("Connect OK!\n");
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<10000; i++) {
 
	for (i=0; i<100000; i++) {
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_18/main.c
Show inline comments
 
@@ -13,36 +13,36 @@ int main(int argc, char** argv) {
 
	ports_used = atoi(argv[5]);
 
	do_puts = argv[6][0]; // 't' or 'f'
 
	do_gets = argv[7][0];
 
	printf("cid %d, min_putter %d, min_getter %d, ports_tot %d, ports_used %d, do_puts %c, do_gets %c\n",
 
		cid, min_putter, min_getter, ports_tot, ports_used, do_puts, do_gets);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
	unsigned char pdl[] = "";
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	Connector * c = connector_new_with_id(pd, cid);
 
	PortId putters[ports_tot], getters[ports_tot];
 
	for(i=0; i<ports_tot; i++) {
 
		connector_add_net_port(c, &putters[i],
 
			(FfiSocketAddr){{127, 0, 0, 1}, min_putter+i},
 
			Polarity_Putter, EndpointPolarity_Active);
 
		connector_add_net_port(c, &getters[i],
 
			(FfiSocketAddr){{127, 0, 0, 1}, min_getter+i},
 
			Polarity_Getter, EndpointPolarity_Active);
 
	}
 
	connector_connect(c, -1);
 
	printf("connect ok!\n");
 
	
 
	clock_t begin = clock();
 
	char msg[] = "Hello, world!";
 
	for (i=0; i<1000; i++) {
 
	for (i=0; i<1000000; i++) {
 
		for(j=0; j<ports_used; j++) {
 
			if(do_gets=='y') connector_get(c, getters[j]);
 
			if(do_puts=='y') connector_put_bytes(c, putters[j], msg, sizeof(msg)-1);
 
		}
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_20/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	// same as bench 15 but connecting to 87.210.104.102 and getting at 0.0.0.0
 
	// also, doing 10k reps (down from 100k) to save time
 
	int i, cid;
 
	cid = atoi(argv[1]);
 
	printf("cid %d\n", cid);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
	unsigned char pdl[] = "";
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	Connector * c = connector_new_with_id(pd, cid);
 

	
 
	bool seen_delim = false;
 
	for(i=2; i<argc; i++) {
 
		EndpointPolarity ep;
 
		Polarity p;
 
		FfiSocketAddr addr;
 
		if(argv[i][0] == '.') {
 
			seen_delim = true;
 
			continue;
 
		} else if(seen_delim) {
 
			addr = (FfiSocketAddr) {{0, 0, 0, 0}, atoi(argv[i])};
 
			printf("getter");
 
			p = Polarity_Getter;
 
			ep = EndpointPolarity_Passive;
 
		} else {
 
			addr = (FfiSocketAddr) {{87, 210, 104, 102}, atoi(argv[i])};
 
			printf("putter");
 
			p = Polarity_Putter;
 
			ep = EndpointPolarity_Active;
 
		}
 
		printf("@%d\n", addr.port);
 
		connector_add_net_port(c, NULL, addr, p, ep);
 
	}
 
	printf("Added all ports!\n");
 
	connector_connect(c, -1);
 
	printf("Connect OK!\n");
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<150; i++) {
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_21/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 

	
 
int main(int argc, char** argv) {
 
	// one of two processes: {leader, follower}
 
	// where a set of `par_msgs` messages are sent leader->follower after
 
	// looping follower->leader->follower `msg_loops` times.
 
	int i, j, cid, msg_loops, par_msgs;
 
	char is_leader;
 
	is_leader = argv[1][0];
 
	msg_loops = atoi(argv[2]);
 
	par_msgs = atoi(argv[3]);
 
	// argv[4..8] encodes peer IP
 
	printf("is_leader %c, msg_loops %d, par_msgs %d\n", is_leader, msg_loops, par_msgs);
 
	cid = is_leader=='y'; // cid := { leader:1, follower:0 }
 

	
 
	unsigned char pdl[] = "";
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	Connector * c = connector_new_with_id(pd, cid);
 
	PortId native_ports[par_msgs];
 
	FfiSocketAddr peer_addr = {
 
		{
 
			atoi(argv[4]),
 
			atoi(argv[5]),
 
			atoi(argv[6]),
 
			atoi(argv[7])
 
		}, 0/*dummy value*/};
 
	int port = 7000;
 
	
 
	// for each parallel message 
 
	for(i=0; i<par_msgs; i++) {
 
		if(is_leader == 'y') {
 
			peer_addr.port = port++;
 
			connector_add_net_port(
 
				c, &native_ports[i], peer_addr, Polarity_Putter, EndpointPolarity_Active);
 
			printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
		}
 

	
 
		for(j=0; j<msg_loops; j++) {
 
			PortId loop_getter, loop_putter;
 

	
 
			// create {putter, getter} port pair
 
			connector_add_net_port(
 
				c, &loop_getter,
 
				(FfiSocketAddr) {{0,0,0,0}, port++},
 
				Polarity_Getter, EndpointPolarity_Passive);
 
			printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
			peer_addr.port = port++;
 
			connector_add_net_port(
 
				c, &loop_putter, peer_addr, Polarity_Putter, EndpointPolarity_Active);
 
			printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
			connector_add_component(c, "forward", 7,
 
				(PortId[]){loop_getter, loop_putter}, 2);
 
			printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
		}
 

	
 
		if(is_leader != 'y') {
 
			connector_add_net_port(
 
				c, &native_ports[i],
 
				(FfiSocketAddr) {{0,0,0,0}, port++},
 
				Polarity_Getter, EndpointPolarity_Passive);
 
			printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
		}
 
	}
 
	printf("Added all ports!\n");
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	printf("Connect OK!\n");
 
	
 
	clock_t begin = clock();
 
	char msg[] = "Hello, world!";
 
	for(i=0; i<250; i++) {
 
		if(is_leader == 'y') {
 
			for(j=0; j<par_msgs; j++) {
 
				connector_put_bytes(c, native_ports[j], msg, sizeof(msg)-1);
 
			}
 
		} else {
 
			for(j=0; j<par_msgs; j++) {
 
				connector_get(c, native_ports[j]);
 
			}
 
		}
 
		connector_sync(c, -1);
 
	}
 
	
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_22/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 

	
 

	
 
int main(int argc, char** argv) {
 
	// same as bench 21 but with parametric message length
 
	int i, j, cid, msg_loops, par_msgs, msg_len;
 
	char is_leader;
 
	is_leader = argv[1][0];
 
	msg_loops = atoi(argv[2]);
 
	par_msgs = atoi(argv[3]);
 
	msg_len = atoi(argv[8]);
 

	
 
	// argv[4..8] encodes peer IP
 
	printf("is_leader %c, msg_loops %d, par_msgs %d, msg_len %d\n", is_leader, msg_loops, par_msgs, msg_len);
 
	cid = is_leader=='y'; // cid := { leader:1, follower:0 }
 

	
 
	unsigned char pdl[] = "";
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	Connector * c = connector_new_with_id(pd, cid);
 
	PortId native_ports[par_msgs];
 
	FfiSocketAddr peer_addr = {
 
		{
 
			atoi(argv[4]),
 
			atoi(argv[5]),
 
			atoi(argv[6]),
 
			atoi(argv[7])
 
		}, 0/*dummy value*/};
 
	int port = 7000;
 
	
 
	// for each parallel message 
 
	for(i=0; i<par_msgs; i++) {
 
		if(is_leader == 'y') {
 
			peer_addr.port = port++;
 
			connector_add_net_port(
 
				c, &native_ports[i], peer_addr, Polarity_Putter, EndpointPolarity_Active);
 
			printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
		}
 

	
 
		for(j=0; j<msg_loops; j++) {
 
			PortId loop_getter, loop_putter;
 

	
 
			// create {putter, getter} port pair
 
			connector_add_net_port(
 
				c, &loop_getter,
 
				(FfiSocketAddr) {{0,0,0,0}, port++},
 
				Polarity_Getter, EndpointPolarity_Passive);
 
			printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
			peer_addr.port = port++;
 
			connector_add_net_port(
 
				c, &loop_putter, peer_addr, Polarity_Putter, EndpointPolarity_Active);
 
			printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
			connector_add_component(c, "forward", 7,
 
				(PortId[]){loop_getter, loop_putter}, 2);
 
			printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
		}
 

	
 
		if(is_leader != 'y') {
 
			connector_add_net_port(
 
				c, &native_ports[i],
 
				(FfiSocketAddr) {{0,0,0,0}, port++},
 
				Polarity_Getter, EndpointPolarity_Passive);
 
			printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
		}
 
	}
 
	printf("Added all ports!\n");
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	printf("Connect OK!\n");
 

	
 
	char * msg = malloc(msg_len);
 
	memset(msg, 42, msg_len);
 
	
 
	clock_t begin = clock();
 
	for(i=0; i<100000; i++) {
 
		if(is_leader == 'y') {
 
			for(j=0; j<par_msgs; j++) {
 
				connector_put_bytes(c, native_ports[j], msg, msg_len);
 
			}
 
		} else {
 
			for(j=0; j<par_msgs; j++) {
 
				connector_get(c, native_ports[j]);
 
			}
 
		}
 
		connector_sync(c, -1);
 
	}
 
	
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	free(msg);
 
	return 0;
 
}
 
\ No newline at end of file
examples/bench_8/main.c
Show inline comments
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
	int i, forwards;
 
	int i, forwards, msglen;
 
	forwards = atoi(argv[1]);
 
	printf("forwards %d\n", forwards);
 
	msglen = atoi(argv[2]);
 
	printf("forwards %d, msglen %d\n", forwards, msglen);
 
	unsigned char pdl[] = ""; 
 
	Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	char logpath[] = "./bench_8.txt";
 
	Connector * c = connector_new_logging(pd, logpath, sizeof(logpath)-1);
 

	
 
	PortId native_putter, native_getter;
 
	connector_add_port_pair(c, &native_putter, &native_getter);
 
	for (i=0; i<forwards; i++) {
 
		PortId putter, getter;
 
		connector_add_port_pair(c, &putter, &getter);
 
		// native ports: {native_putter, native_getter, putter, getter}
 
		char ident[] = "forward"; // defined in reowolf's stdlib 
 
		// thread a forward component onto native_tail
 
		connector_add_component(c, ident, sizeof(ident)-1, (PortId[]){native_getter, putter}, 2);
 
		// native ports: {native_putter, getter}
 
		printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
		native_getter = getter;
 
	}
 
	connector_connect(c, -1);
 
	printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
	
 
	
 
	size_t msg_len = 0xffff;
 
	char * msg = malloc(msg_len);
 
	memset(msg, 42, msg_len);
 
	char * msg = malloc(msglen);
 
	memset(msg, 42, msglen);
 
	
 
	clock_t begin = clock();
 
	for (i=0; i<1000000; i++) {
 
		connector_put_bytes(c, native_putter, msg, msg_len);
 
	for (i=0; i<100000; i++) {
 
		connector_put_bytes(c, native_putter, msg, msglen);
 
		connector_get(c, native_getter);
 
		connector_sync(c, -1);
 
	}
 
	clock_t end = clock();
 
	double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
	printf("Time taken: %f\n", time_spent);
 
	return 0;
 
}
 
\ No newline at end of file
examples/zoop.sh
Show inline comments
 
deleted file
src/runtime/endpoints.rs
Show inline comments
 
use super::*;
 

	
 
// A wrapper for some Read type, delegating read calls
 
// to the contained Read structure, but snooping on
 
// the number of bytes it reads, to be inspected later.
 
struct MonitoredReader<R: Read> {
 
    bytes: usize,
 
    r: R,
 
}
 

	
 
enum PollAndPopulateError {
 
    PollFailed,
 
    Timeout,
 
}
 

	
 
struct TryRecvAnyNetError {
 
    error: NetEndpointError,
 
    index: usize,
 
}
 
/////////////////////
 
impl NetEndpoint {
 
    // Returns the bincode configuration the NetEndpoint uses pervasively
 
    // for configuration on ser/de operations.
 
    fn bincode_opts() -> impl bincode::config::Options {
 
        // uses variable-length encoding everywhere; great!
 
        bincode::config::DefaultOptions::default()
 
    }
 

	
 
    // Attempt to return some deserializable T-type from
 
    // the inbox or network stream
 
    pub(super) fn try_recv<T: serde::de::DeserializeOwned>(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
    ) -> Result<Option<T>, NetEndpointError> {
 
        use NetEndpointError as Nee;
 
        // populate inbox with bytes as much as possible (mio::TcpStream is nonblocking)
 
        let before_len = self.inbox.len();
 
        'read_loop: loop {
 
            let res = self.stream.read_to_end(&mut self.inbox);
 
            match res {
 
                Err(e) if err_would_block(&e) => break 'read_loop,
 
                Ok(0) => break 'read_loop,
 
                Ok(_) => (),
 
                Err(_e) => return Err(Nee::BrokenNetEndpoint),
 
            }
 
        }
 
        log!(
 
            @ENDPT,
 
            logger,
 
            "Inbox bytes [{:x?}| {:x?}]",
 
            DenseDebugHex(&self.inbox[..before_len]),
 
            DenseDebugHex(&self.inbox[before_len..]),
 
        );
 
        // Try deserialize from the inbox, monitoring how many bytes
 
        // the deserialiation process consumes. In the event of
 
        // success, this makes clear where the message ends
 
        let mut monitored = MonitoredReader::from(&self.inbox[..]);
 
        // Try deserialize from the inbox. `reading_slice' is updated by read()
 
        // in-place to truncate the read part. In the event of success,
 
        // the message bytes are contained in the truncated prefix
 
        let mut reading_slice = self.inbox.as_slice();
 
        let before_len = reading_slice.len();
 
        use bincode::config::Options;
 
        match Self::bincode_opts().deserialize_from(&mut monitored) {
 
        match Self::bincode_opts().deserialize_from(&mut reading_slice) {
 
            Ok(msg) => {
 
                let msg_size = monitored.bytes_read();
 
                let msg_size = before_len - reading_slice.len();
 
                // inbox[..msg_size] was deserialized into one message!
 
                self.inbox.drain(..msg_size);
 
                log!(
 
                    @ENDPT,
 
                    logger,
 
                    "Yielding msg. Inbox len {}-{}=={}: [{:?}]",
 
                    self.inbox.len() + msg_size,
 
                    msg_size,
 
                    self.inbox.len(),
 
                    DenseDebugHex(&self.inbox[..]),
 
                );
 
                Ok(Some(msg))
 
            }
 
            Err(e) => match *e {
 
                bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => {
 
                    // Contents of inbox insufficient for deserializing a message
 
                    Ok(None)
 
                }
 
                _ => Err(Nee::MalformedMessage),
 
            },
 
        }
 
    }
 

	
 
    // Send the given serializable type into the stream
 
    pub(super) fn send<T: serde::ser::Serialize>(
 
    pub(super) fn send<T: serde::ser::Serialize + Debug>(
 
        &mut self,
 
        msg: &T,
 
        io_byte_buffer: &mut IoByteBuffer,
 
    ) -> Result<(), NetEndpointError> {
 
        use bincode::config::Options;
 
        use NetEndpointError as Nee;
 
        Self::bincode_opts()
 
            .serialize_into(&mut self.stream, msg)
 
        // Create a buffer for our bytes: a slice of the io_byte_buffer
 
        let mut buf_slice = io_byte_buffer.as_mut_slice();
 
        // serialize into the slice, truncating as its filled
 
        Self::bincode_opts().serialize_into(&mut buf_slice, msg).expect("Serialize failed!");
 
        // written segment is the part missing from buf_slice. Write this as one segment to the TCP stream
 
        let wrote = IoByteBuffer::CAPACITY - buf_slice.len();
 
        self.stream
 
            .write_all(&io_byte_buffer.as_mut_slice()[..wrote])
 
            .map_err(|_| Nee::BrokenNetEndpoint)?;
 
        let _ = self.stream.flush();
 
        Ok(())
 
    }
 
}
 

	
 
impl EndpointManager {
 
    pub(super) fn index_iter(&self) -> Range<usize> {
 
        0..self.num_net_endpoints()
 
    }
 
    pub(super) fn num_net_endpoints(&self) -> usize {
 
        self.net_endpoint_store.endpoint_exts.len()
 
    }
 

	
 
    // Setup-phase particular send procedure.
 
    // Used pervasively, allows for some brevity with the ? operator.
 
    pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> {
 
        let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
 
        net_endpoint.send(msg).map_err(|err| {
 
        net_endpoint.send(msg, &mut self.io_byte_buffer).map_err(|err| {
 
            ConnectError::NetEndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err)
 
        })
 
    }
 

	
 
    // Communication-phase particular send procedure.
 
    // Used pervasively, allows for some brevity with the ? operator.
 
    pub(super) fn send_to_comms(
 
        &mut self,
 
        index: usize,
 
        msg: &Msg,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        use UnrecoverableSyncError as Use;
 
        let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
 
        net_endpoint.send(msg).map_err(|_| Use::BrokenNetEndpoint { index })
 
        net_endpoint
 
            .send(msg, &mut self.io_byte_buffer)
 
            .map_err(|_| Use::BrokenNetEndpoint { index })
 
    }
 

	
 
    /// Receive the first message of any kind at all.
 
    /// Why not return SetupMsg? Because often this message will be forwarded to several others,
 
    /// and by returning a Msg, it can be serialized in-place (NetEndpoints allow the sending of Msg types!)
 
    pub(super) fn try_recv_any_setup(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        deadline: &Option<Instant>,
 
    ) -> Result<(usize, Msg), ConnectError> {
 
        // helper function, mapping a TryRecvAnySetup type error
 
        // into a ConnectError
 
        fn map_trane(
 
            trane: TryRecvAnyNetError,
 
            net_endpoint_store: &EndpointStore<NetEndpointExt>,
 
        ) -> ConnectError {
 
            ConnectError::NetEndpointSetupError(
 
                net_endpoint_store.endpoint_exts[trane.index]
 
                    .net_endpoint
 
                    .stream
 
                    .local_addr()
 
                    .unwrap(), // stream must already be connected
 
                trane.error,
 
            )
 
@@ -245,49 +247,49 @@ impl EndpointManager {
 
        // pop undelayed messages, handling them. Return the first CommCtrlMsg popped
 
        while let Some((net_index, msg)) = self.undelayed_messages.pop() {
 
            if let Some((net_index, msg)) =
 
                self.handle_msg(cu, rctx, net_index, msg, round_index, &mut some_message_enqueued)
 
            {
 
                return Ok(CommRecvOk::NewControlMsg { net_index, msg });
 
            }
 
        }
 
        loop {
 
            // drain endpoints of incoming messages (without blocking)
 
            // return first CommCtrlMsg received
 
            while let Some((net_index, msg)) = self.try_recv_undrained_net(cu.logger())? {
 
                if let Some((net_index, msg)) = self.handle_msg(
 
                    cu,
 
                    rctx,
 
                    net_index,
 
                    msg,
 
                    round_index,
 
                    &mut some_message_enqueued,
 
                ) {
 
                    return Ok(CommRecvOk::NewControlMsg { net_index, msg });
 
                }
 
            }
 
            // try receive a udp message
 
            let recv_buffer = self.udp_in_buffer.as_mut_slice();
 
            let recv_buffer = self.io_byte_buffer.as_mut_slice();
 
            while let Some(index) = self.udp_endpoint_store.polled_undrained.pop() {
 
                let ee = &mut self.udp_endpoint_store.endpoint_exts[index];
 
                if let Some(bytes_written) = ee.sock.recv(recv_buffer).ok() {
 
                    // I received a payload!
 
                    self.udp_endpoint_store.polled_undrained.insert(index);
 
                    if !ee.received_this_round {
 
                        let payload = Payload::from(&recv_buffer[..bytes_written]);
 
                        let port_spec_var = rctx.ips.port_info.spec_var_for(ee.getter_for_incoming);
 
                        let predicate = Predicate::singleton(port_spec_var, SpecVal::FIRING);
 
                        rctx.getter_push(
 
                            ee.getter_for_incoming,
 
                            SendPayloadMsg { payload, predicate },
 
                        );
 
                        some_message_enqueued = true;
 
                        ee.received_this_round = true;
 
                    } else {
 
                        // lose the message!
 
                    }
 
                }
 
            }
 
            if some_message_enqueued {
 
                return Ok(CommRecvOk::NewPayloadMsgs);
 
            }
 
            // poll if time remains
 
@@ -417,61 +419,44 @@ impl EndpointManager {
 
                }
 
                ee.received_this_round = false;
 
            }
 
        }
 
        Ok(())
 
    }
 
}
 
impl Debug for NetEndpoint {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        struct DebugStream<'a>(&'a TcpStream);
 
        impl Debug for DebugStream<'_> {
 
            fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
                f.debug_struct("Endpoint")
 
                    .field("local_addr", &self.0.local_addr())
 
                    .field("peer_addr", &self.0.peer_addr())
 
                    .finish()
 
            }
 
        }
 
        f.debug_struct("Endpoint")
 
            .field("inbox", &self.inbox)
 
            .field("stream", &DebugStream(&self.stream))
 
            .finish()
 
    }
 
}
 
impl<R: Read> From<R> for MonitoredReader<R> {
 
    fn from(r: R) -> Self {
 
        Self { r, bytes: 0 }
 
    }
 
}
 
impl<R: Read> MonitoredReader<R> {
 
    pub(super) fn bytes_read(&self) -> usize {
 
        self.bytes
 
    }
 
}
 
impl<R: Read> Read for MonitoredReader<R> {
 
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
 
        let n = self.r.read(buf)?;
 
        self.bytes += n;
 
        Ok(n)
 
    }
 
}
 
impl Into<Msg> for SetupMsg {
 
    fn into(self) -> Msg {
 
        Msg::SetupMsg(self)
 
    }
 
}
 
impl From<PollAndPopulateError> for ConnectError {
 
    fn from(pape: PollAndPopulateError) -> ConnectError {
 
        use {ConnectError as Ce, PollAndPopulateError as Pape};
 
        match pape {
 
            Pape::PollFailed => Ce::PollFailed,
 
            Pape::Timeout => Ce::Timeout,
 
        }
 
    }
 
}
 
impl From<TryRecvAnyNetError> for UnrecoverableSyncError {
 
    fn from(trane: TryRecvAnyNetError) -> UnrecoverableSyncError {
 
        let TryRecvAnyNetError { index, .. } = trane;
 
        UnrecoverableSyncError::BrokenNetEndpoint { index }
 
    }
 
}
src/runtime/mod.rs
Show inline comments
 
@@ -240,73 +240,73 @@ struct NetEndpointExt {
 
#[derive(Debug)]
 
struct UdpEndpointExt {
 
    sock: UdpSocket, // already bound and connected
 
    received_this_round: bool,
 
    outgoing_payloads: HashMap<Predicate, Payload>,
 
    getter_for_incoming: PortId,
 
}
 

	
 
// Meta-data for the connector: its role in the consensus tree.
 
#[derive(Debug)]
 
struct Neighborhood {
 
    parent: Option<usize>,
 
    children: VecSet<usize>,
 
}
 

	
 
// Manages the connector's ID, and manages allocations for connector/port IDs.
 
#[derive(Debug, Clone)]
 
struct IdManager {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
    component_suffix_stream: U32Stream,
 
}
 

	
 
// Newtype wrapper around a byte buffer, used for UDP mediators to receive incoming datagrams.
 
struct UdpInBuffer {
 
struct IoByteBuffer {
 
    byte_vec: Vec<u8>,
 
}
 

	
 
// A generator of speculative variables. Created on-demand during the synchronous round
 
// by the IdManager.
 
#[derive(Debug)]
 
struct SpecVarStream {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
}
 

	
 
// Manages the messy state of the various endpoints, pollers, buffers, etc.
 
#[derive(Debug)]
 
struct EndpointManager {
 
    // invariants:
 
    // 1. net and udp endpoints are registered with poll with tokens computed with TargetToken::into
 
    // 2. Events is empty
 
    poll: Poll,
 
    events: Events,
 
    delayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>, // ready to yield
 
    net_endpoint_store: EndpointStore<NetEndpointExt>,
 
    udp_endpoint_store: EndpointStore<UdpEndpointExt>,
 
    udp_in_buffer: UdpInBuffer,
 
    io_byte_buffer: IoByteBuffer,
 
}
 

	
 
// A storage of endpoints, which keeps track of which components have raised
 
// an event during poll(), signifying that they need to be checked for new incoming data
 
#[derive(Debug)]
 
struct EndpointStore<T> {
 
    endpoint_exts: Vec<T>,
 
    polled_undrained: VecSet<usize>,
 
}
 

	
 
// The information associated with a port identifier, designed for local storage.
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct PortInfo {
 
    owner: ComponentId,
 
    peer: Option<PortId>,
 
    polarity: Polarity,
 
    route: Route,
 
}
 

	
 
// Similar to `PortInfo`, but designed for communication during the setup procedure.
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct MyPortInfo {
 
    polarity: Polarity,
 
    port: PortId,
 
@@ -881,46 +881,46 @@ impl IdParts for SpecVar {
 
    }
 
}
 
impl Debug for SpecVar {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        let (a, b) = self.id_parts();
 
        write!(f, "v{}_{}", a, b)
 
    }
 
}
 
impl SpecVal {
 
    const FIRING: Self = SpecVal(1);
 
    const SILENT: Self = SpecVal(0);
 
    fn is_firing(self) -> bool {
 
        self == Self::FIRING
 
        // all else treated as SILENT
 
    }
 
    fn iter_domain() -> impl Iterator<Item = Self> {
 
        (0..).map(SpecVal)
 
    }
 
}
 
impl Debug for SpecVal {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        self.0.fmt(f)
 
    }
 
}
 
impl Default for UdpInBuffer {
 
impl Default for IoByteBuffer {
 
    fn default() -> Self {
 
        let mut byte_vec = Vec::with_capacity(Self::CAPACITY);
 
        unsafe {
 
            // safe! this vector is guaranteed to have sufficient capacity
 
            byte_vec.set_len(Self::CAPACITY);
 
        }
 
        Self { byte_vec }
 
    }
 
}
 
impl UdpInBuffer {
 
    const CAPACITY: usize = u16::MAX as usize;
 
impl IoByteBuffer {
 
    const CAPACITY: usize = u16::MAX as usize + 1000;
 
    fn as_mut_slice(&mut self) -> &mut [u8] {
 
        self.byte_vec.as_mut_slice()
 
    }
 
}
 

	
 
impl Debug for UdpInBuffer {
 
impl Debug for IoByteBuffer {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "UdpInBuffer")
 
        write!(f, "IoByteBuffer")
 
    }
 
}
src/runtime/setup.rs
Show inline comments
 
@@ -296,48 +296,49 @@ fn setup_endpoints_and_pair_ports(
 
    // The data for a udp endpoint's setup in progress
 
    struct UdpTodo {
 
        // becomes completed once we receive our first writable event
 
        getter_for_incoming: PortId,
 
        sock: UdpSocket,
 
    }
 

	
 
    // Substructure of `NetTodo`, which represents the endpoint itself
 
    enum NetTodoEndpoint {
 
        Accepting(TcpListener),       // awaiting it's peer initiating the connection
 
        PeerInfoRecving(NetEndpoint), // awaiting info about peer port through the channel
 
    }
 

	
 
    ////////////////////////////////////////////
 

	
 
    // Start to construct our return values
 
    // let mut waker_state: Option<Arc<WakerState>> = None;
 
    let mut extra_port_info = ExtraPortInfo::default();
 
    let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?;
 
    let mut events =
 
        Events::with_capacity((net_endpoint_setups.len() + udp_endpoint_setups.len()) * 2 + 4);
 
    let [mut net_polled_undrained, udp_polled_undrained] = [VecSet::default(), VecSet::default()];
 
    let mut delayed_messages = vec![];
 
    let mut last_retry_at = Instant::now();
 
    let mut io_byte_buffer = IoByteBuffer::default();
 

	
 
    // Create net/udp todo structures, each already registered with poll
 
    let mut net_todos = net_endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, endpoint_setup)| {
 
            let token = TokenTarget::NetEndpoint { index }.into();
 
            log!(logger, "Net endpoint {} beginning setup with {:?}", index, &endpoint_setup);
 
            let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity {
 
                let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
 
                    .map_err(|_| Ce::TcpInvalidConnect(endpoint_setup.sock_addr))?;
 
                poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                NetTodoEndpoint::PeerInfoRecving(NetEndpoint { stream, inbox: vec![] })
 
            } else {
 
                let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
 
                    .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?;
 
                poll.registry().register(&mut listener, token, BOTH).unwrap();
 
                NetTodoEndpoint::Accepting(listener)
 
            };
 
            Ok(NetTodo {
 
                todo_endpoint,
 
                sent_local_port: false,
 
                recv_peer_port: None,
 
                endpoint_setup: endpoint_setup.clone(),
 
@@ -480,49 +481,49 @@ fn setup_endpoints_and_pair_ports(
 
                        if net_connect_to_retry.contains(&index) {
 
                            // spurious wakeup. already scheduled to retry connect later
 
                            continue;
 
                        }
 
                        if !setup_incomplete.contains(&token_target) {
 
                            // spurious wakeup. this endpoint has already been completed!
 
                            if event.is_readable() {
 
                                net_polled_undrained.insert(index);
 
                            }
 
                            continue;
 
                        }
 
                        let local_info = port_info
 
                            .map
 
                            .get(&net_todo.endpoint_setup.getter_for_incoming)
 
                            .expect("Net Setup's getter port info isn't known"); // unreachable
 
                        if event.is_writable() && !net_todo.sent_local_port {
 
                            // can write and didn't send setup msg yet? Do so!
 
                            let _ = net_endpoint.stream.set_nodelay(true);
 
                            let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
 
                                owner: local_info.owner,
 
                                polarity: local_info.polarity,
 
                                port: net_todo.endpoint_setup.getter_for_incoming,
 
                            }));
 
                            net_endpoint
 
                                .send(&msg)
 
                                .send(&msg, &mut io_byte_buffer)
 
                                .map_err(|e| {
 
                                    Ce::NetEndpointSetupError(
 
                                        net_endpoint.stream.local_addr().unwrap(),
 
                                        e,
 
                                    )
 
                                })
 
                                .unwrap();
 
                            log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
 
                            net_todo.sent_local_port = true;
 
                        }
 
                        if event.is_readable() && net_todo.recv_peer_port.is_none() {
 
                            // can read and didn't finish recving setup msg yet? Do so!
 
                            let maybe_msg = net_endpoint.try_recv(logger).map_err(|e| {
 
                                Ce::NetEndpointSetupError(
 
                                    net_endpoint.stream.local_addr().unwrap(),
 
                                    e,
 
                                )
 
                            })?;
 
                            if maybe_msg.is_some() && !net_endpoint.inbox.is_empty() {
 
                                net_polled_undrained.insert(index);
 
                            }
 
                            match maybe_msg {
 
                                None => {} // msg deserialization incomplete
 
                                Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => {
 
@@ -601,49 +602,49 @@ fn setup_endpoints_and_pair_ports(
 
            let UdpTodo { mut sock, getter_for_incoming } = udp_todo;
 
            let token = TokenTarget::UdpEndpoint { index }.into();
 
            poll.registry().reregister(&mut sock, token, Interest::READABLE).unwrap();
 
            UdpEndpointExt {
 
                sock,
 
                outgoing_payloads: Default::default(),
 
                received_this_round: false,
 
                getter_for_incoming,
 
            }
 
        })
 
        .collect();
 
    let endpoint_manager = EndpointManager {
 
        poll,
 
        events,
 
        undelayed_messages: delayed_messages, // no longer delayed
 
        delayed_messages: Default::default(),
 
        net_endpoint_store: EndpointStore {
 
            endpoint_exts: net_endpoint_exts,
 
            polled_undrained: net_polled_undrained,
 
        },
 
        udp_endpoint_store: EndpointStore {
 
            endpoint_exts: udp_endpoint_exts,
 
            polled_undrained: udp_polled_undrained,
 
        },
 
        udp_in_buffer: Default::default(),
 
        io_byte_buffer,
 
    };
 
    Ok((endpoint_manager, extra_port_info))
 
}
 

	
 
// Given a fully-formed endpoint manager,
 
// construct the consensus tree with:
 
// 1. decentralized leader election
 
// 2. centralized tree construction
 
fn init_neighborhood(
 
    connector_id: ConnectorId,
 
    logger: &mut dyn Logger,
 
    em: &mut EndpointManager,
 
    deadline: &Option<Instant>,
 
) -> Result<Neighborhood, ConnectError> {
 
    use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
 

	
 
    // storage structure for the state of a distributed wave
 
    // (for readability)
 
    #[derive(Debug)]
 
    struct WaveState {
 
        parent: Option<usize>,
 
        leader: ConnectorId,
 
    }
 

	
0 comments (0 inline, 0 general)