Changeset - b972060a1f92
[Not reviewed]
0 4 4
Christopher Esterhuyse - 5 years ago 2020-10-09 15:07:15
christopher.esterhuyse@gmail.com
more optimization-stage benchmarking examples
8 files changed with 298 insertions and 35 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
Show inline comments
 
[package]
 
name = "reowolf_rs"
 
version = "0.1.4"
 
authors = [
 
	"Christopher Esterhuyse <esterhuy@cwi.nl, christopher.esterhuyse@gmail.com>",
 
	"Hans-Dieter Hiep <hdh@cwi.nl>"
 
]
 
edition = "2018"
 

	
 
[dependencies]
 
# convenience macros
 
maplit = "1.0.2"
 
derive_more = "0.99.2"
 

	
 
# runtime
 
bincode = "1.3.1"
 
serde = { version = "1.0.114", features = ["derive"] }
 
getrandom = "0.1.14" # tiny crate. used to guess controller-id
 

	
 
# network
 
mio = { version = "0.7.0", package = "mio", features = ["udp", "tcp", "os-poll"] }
 
socket2 = { version = "0.3.12", optional = true }
 

	
 
# protocol
 
backtrace = "0.3"
 
lazy_static = "1.4.0"
 

	
 
# ffi
 

	
 
# socket ffi
 
libc = { version = "^0.2", optional = true }
 
os_socketaddr = { version = "0.1.0", optional = true }
 

	
 
[dev-dependencies]
 
# test-generator = "0.3.0"
 
crossbeam-utils = "0.7.2"
 
lazy_static = "1.4.0"
 

	
 
[lib]
 
crate-type = [
 
	"rlib", # for use as a Rust dependency. 
 
	"cdylib" # for FFI use, typically C.
 
]
 

	
 
[features]
 
default = ["ffi", "session_optimization"]
 
default = ["ffi"]
 
ffi = [] # see src/ffi/mod.rs
 
ffi_pseudo_socket_api = ["ffi", "libc", "os_socketaddr"]# see src/ffi/pseudo_socket_api.rs.
 
endpoint_logging = [] # see src/macros.rs
 
session_optimization = [] # see src/runtime/setup.rs
 
\ No newline at end of file
examples/bench_27/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
    int i, rounds;
 
    char optimized = argv[1][0];
 
    rounds = atoi(argv[2]);
 
    printf("optimized %c, rounds %d\n", optimized, rounds);
 

	
 
    unsigned char pdl[] = "\
 
    primitive xrouter(in a, out b, out c) {\
 
        while(true) synchronous {\
 
            if(fires(a)) {\
 
                if(fires(b)) put(b, get(a));\
 
                else         put(c, get(a));\
 
            }\
 
        }\
 
    }\
 
    ";
 
    Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    Connector * c = connector_new_with_id(pd, 0);
 
    PortId ports[8];
 
    if(optimized=='y') {
 
        connector_add_port_pair(c, &ports[0], &ports[1]);
 
        connector_add_port_pair(c, &ports[2], &ports[7]); // 3,4,5,6 uninitialized
 
        connector_add_component(c, "sync", 4, ports+1, 2);
 
        printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    } else {
 
        for(i=0; i<4; i++) {
 
            connector_add_port_pair(c, &ports[i*2+0], &ports[i*2+1]);
 
        }
 
        connector_add_component(c, "xrouter", 7, (PortId[]) {ports[1],ports[2],ports[4]}, 3);
 
        printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
        connector_add_component(c, "merger" , 6, (PortId[]) {ports[3],ports[5],ports[6]}, 3);
 
        printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    }
 
    connector_connect(c, -1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
    size_t msg_len = 1000;
 
    char * msg = malloc(msg_len);
 
    memset(msg, 42, msg_len);
 
    
 
    clock_t begin = clock();
 
    for (i=0; i<rounds; i++) {
 
        connector_put_bytes(c, ports[0], msg, msg_len);
 
        connector_get(c, ports[7]);
 
        connector_sync(c, -1);
 
    }
 
    clock_t end = clock();
 
    double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
    printf("Time Spent: %f\n", time_spent);
 

	
 
    free(msg);
 
    return 0;
 
}
 
\ No newline at end of file
examples/bench_28/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
int main(int argc, char** argv) {
 
    int i, rounds;
 
    char optimized = argv[1][0];
 
    rounds = atoi(argv[2]);
 
    printf("optimized %c, rounds %d\n", optimized, rounds);
 

	
 
    unsigned char pdl[] = "";
 
    Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    Connector * c = connector_new_with_id(pd, 0);
 
    PortId ports[6];
 
    for(i=0; i<3; i++) {
 
        connector_add_port_pair(c, &ports[i*2+0], &ports[i*2+1]);
 
    }
 
    connector_add_component(c, "sync", 4, ports+1, 2);
 
    if(optimized=='y') {
 
        connector_add_component(c, "forward", 7, ports+3, 2);
 
    } else {
 
        connector_add_component(c, "sync",    4, ports+3, 2);
 
    }
 
    connector_connect(c, -1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
    size_t msg_len = 1000;
 
    char * msg = malloc(msg_len);
 
    memset(msg, 42, msg_len);
 
    
 
    clock_t begin = clock();
 
    for (i=0; i<rounds; i++) {
 
        connector_put_bytes(c, ports[0], msg, msg_len);
 
        connector_get(c, ports[5]);
 
        connector_sync(c, -1);
 
    }
 
    clock_t end = clock();
 
    double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
    printf("Time Spent: %f\n", time_spent);
 

	
 
    free(msg);
 
    return 0;
 
}
 
\ No newline at end of file
examples/bench_29/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
FfiSocketAddr addr_new(const uint8_t ipv4[4], uint16_t port) {
 
    FfiSocketAddr x;
 
    x.port = port;
 
    memcpy(x.ipv4, ipv4, sizeof(uint8_t)*4);
 
    return x;
 
}
 
int main(int argc, char** argv) {
 
    int i, rounds;
 
    char optimized = argv[1][0];
 
    char sender = argv[2][0];
 
    rounds = atoi(argv[3]);
 
    uint8_t ipv4[4] = { atoi(argv[4]), atoi(argv[5]), atoi(argv[6]), atoi(argv[7]) };
 
    size_t msg_len = atoi(argv[8]);
 

	
 
    printf("optimized %c, sender %c, rounds %d, addr %d.%d.%d.%d, msg_len %d\n",
 
        optimized, sender, rounds, ipv4[0], ipv4[1], ipv4[2], ipv4[3], msg_len);
 

	
 
    unsigned char pdl[] = "\
 
    primitive filter(in i, out o) {\
 
        while(true) synchronous() {\
 
            msg m = get(i);\
 
            if(m[0] == 0) put(o, m);\
 
        }\
 
    }\
 
    ";
 
    Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    Connector * c = connector_new_with_id(pd, sender=='y'?1:0);
 
    PortId ports[3]; // orientation: 0->1->2 (subsets may be initialized) sender puts on 0. !sender gets on 2. 
 
    char ident[] = "filter";
 
    FfiSocketAddr addr = addr_new(ipv4, 7000);
 
    if(sender=='y') {
 
        Polarity p = Polarity_Putter;
 
        EndpointPolarity ep = EndpointPolarity_Active;
 
        if(optimized=='y') {
 
            // 3 ports: (native)0-->1(filter)2-->(NETWORK)
 
            connector_add_port_pair(c, &ports[0], &ports[1]);
 
            connector_add_net_port(c, &ports[2], addr, p, ep);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
            connector_add_component(c, ident, sizeof(ident)-1, ports+1, 2);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
        } else {
 
            // 1 port
 
            connector_add_net_port(c, &ports[0], addr, p, ep);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
        }
 
    } else {
 
        Polarity p = Polarity_Getter;
 
        EndpointPolarity ep = EndpointPolarity_Passive;
 
        if(optimized=='y') {
 
            // 1 port
 
            connector_add_net_port(c, &ports[2], addr, p, ep);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
        } else {
 
            // 3 ports: (NETWORK)-->0(filter)1-->2(native)
 
            connector_add_net_port(c, &ports[0], addr, p, ep);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
            connector_add_port_pair(c, &ports[1], &ports[2]);
 
            connector_add_component(c, ident, sizeof(ident)-1, ports, 2);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
        }
 
    }
 
    connector_connect(c, -1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
    char * msg = malloc(msg_len);
 
    memset(msg, 42, msg_len);
 
    
 
    clock_t begin = clock();
 
    for (i=0; i<rounds; i++) {
 
        if(sender=='y') {
 
            msg[0] = (char) i%2;
 
            connector_put_bytes(c, ports[0], msg, msg_len);
 
            // always put
 
        } else {
 
            // no-get option
 
            connector_next_batch(c);
 
            // get option
 
            connector_get(c, ports[2]);
 
        }
 
        connector_sync(c, -1);
 
    }
 
    clock_t end = clock();
 
    double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
    printf("Time Spent: %f\n", time_spent);
 

	
 
    free(msg);
 
    return 0;
 
}
 
\ No newline at end of file
examples/bench_30/main.c
Show inline comments
 
new file 100644
 
#include <time.h>
 
#include "../../reowolf.h"
 
#include "../utility.c"
 
FfiSocketAddr addr_new(const uint8_t ipv4[4], uint16_t port) {
 
    FfiSocketAddr x;
 
    x.port = port;
 
    memcpy(x.ipv4, ipv4, sizeof(uint8_t)*4);
 
    return x;
 
}
 
int main(int argc, char** argv) {
 
    int i, rounds;
 
    char optimized = argv[1][0];
 
    char sender = argv[2][0];
 
    rounds = atoi(argv[3]);
 
    uint8_t ipv4[4] = { atoi(argv[4]), atoi(argv[5]), atoi(argv[6]), atoi(argv[7]) };
 
    size_t msg_len = atoi(argv[8]);
 

	
 
    printf("optimized %c, sender %c, rounds %d, addr %d.%d.%d.%d, msg_len %d\n",
 
        optimized, sender, rounds, ipv4[0], ipv4[1], ipv4[2], ipv4[3], msg_len);
 

	
 
    unsigned char pdl[] = "";
 
    Arc_ProtocolDescription * pd = protocol_description_parse(pdl, sizeof(pdl)-1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
    Connector * c = connector_new_with_id(pd, sender=='y'?1:0);
 

	
 
    PortId ports[5]; // sender always puts 0, receiver always gets 3, 4
 
    char ident[] = "replicator";
 
    FfiSocketAddr addrs[2] = {
 
        addr_new(ipv4, 7000),
 
        addr_new(ipv4, 7001)
 
    };
 
    if(sender=='y') {
 
        Polarity p = Polarity_Putter;
 
        EndpointPolarity ep = EndpointPolarity_Active;
 
        if(optimized=='y') {
 
            // 1 port: (native)0-->(NETWORK)
 
            connector_add_net_port(c, &ports[0], addrs[0], p, ep);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
        } else {
 
            // 4 ports: (native)0-->1(replicator)2-->(NETWORK)
 
            //                                   3-->(NETWORK)
 
            connector_add_port_pair(c, &ports[0], &ports[1]);
 
            connector_add_net_port(c, &ports[2], addrs[0], p, ep);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
            connector_add_net_port(c, &ports[3], addrs[1], p, ep);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
            connector_add_component(c, ident, sizeof(ident)-1, ports+1, 3);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
        }
 
    } else {
 
        Polarity p = Polarity_Getter;
 
        EndpointPolarity ep = EndpointPolarity_Passive;
 
        if(optimized=='y') {
 
            // 5 ports: (NETWORK)-->0(replicator)1-->3(native)
 
            //                                   2-->4
 
            connector_add_net_port(c, &ports[0], addrs[0], p, ep);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
            connector_add_port_pair(c, &ports[1], &ports[3]);
 
            connector_add_port_pair(c, &ports[2], &ports[4]);
 
            connector_add_component(c, ident, sizeof(ident)-1, ports, 3);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
        } else {
 
            // 2 ports: (NETWORK)-->3(native)
 
            //                   -->4
 
            connector_add_net_port(c, &ports[3], addrs[0], p, ep);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
            connector_add_net_port(c, &ports[4], addrs[1], p, ep);
 
            printf("Error str `%s`\n", reowolf_error_peek(NULL));
 
        }
 
    }
 
    connector_connect(c, -1);
 
    printf("Error str `%s`\n", reowolf_error_peek(NULL));
 

	
 
    char * msg = malloc(msg_len);
 
    memset(msg, 42, msg_len);
 
    
 
    clock_t begin = clock();
 
    for (i=0; i<rounds; i++) {
 
        if(sender=='y') {
 
            connector_put_bytes(c, ports[0], msg, msg_len);
 
        } else {
 
            connector_get(c, ports[3]);
 
            connector_get(c, ports[4]);
 
        }
 
        connector_sync(c, -1);
 
    }
 
    clock_t end = clock();
 
    double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;
 
    printf("Time Spent: %f\n", time_spent);
 

	
 
    free(msg);
 
    return 0;
 
}
 
\ No newline at end of file
src/protocol/inputsource.rs
Show inline comments
 
use std::fmt;
 
use std::fs::File;
 
use std::io;
 
use std::path::Path;
 

	
 
use backtrace::Backtrace;
 

	
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
pub struct InputSource {
 
    filename: String,
 
    input: Vec<u8>,
 
    line: usize,
 
    column: usize,
 
    offset: usize,
 
}
 

	
 
static STD_LIB_PDL: &'static [u8] = b"
 
primitive forward(in i, out o) {
 
    while(true) synchronous() put(o, get(i));
 
}
 
primitive sync(in i, out o) {
 
    while(true) synchronous() if(fires(i)) put(o, get(i));
 
}
 
primitive alternator(in i, out l, out r) {
 
    while(true) {
 
        synchronous() if(fires(i)) put(l, get(i));
 
        synchronous() if(fires(i)) put(r, get(i));
 
    }
 
}
 
primitive replicator(in i, out l, out r) {
 
    while(true) synchronous() if(fires(i)) {
 
        msg m = get(i);
 
        put(l, m);
 
        put(r, m);
 
    while(true) synchronous {
 
        if(fires(i)) {
 
            msg m = get(i);
 
            put(l, m);
 
            put(r, m);
 
        }
 
    }
 
}
 
primitive merger(in l, in r, out o) {
 
    while(true) synchronous {
 
        if(fires(l))      put(o, get(l));
 
        else if(fires(r)) put(o, get(r));
 
    }
 
}";
 

	
 
impl InputSource {
 
    // Constructors
 
    pub fn new<R: io::Read, S: ToString>(filename: S, reader: &mut R) -> io::Result<InputSource> {
 
        let mut vec = STD_LIB_PDL.to_vec();
 
        reader.read_to_end(&mut vec)?;
 
        Ok(InputSource {
 
            filename: filename.to_string(),
 
            input: vec,
 
            line: 1,
 
            column: 1,
 
            offset: 0,
 
        })
 
    }
 
    // Constructor helpers
 
    pub fn from_file(path: &Path) -> io::Result<InputSource> {
 
        let filename = path.file_name();
 
        match filename {
 
            Some(filename) => {
 
                let mut f = File::open(path)?;
 
                InputSource::new(filename.to_string_lossy(), &mut f)
 
            }
 
            None => Err(io::Error::new(io::ErrorKind::NotFound, "Invalid path")),
 
        }
 
    }
 
    pub fn from_string(string: &str) -> io::Result<InputSource> {
 
        let buffer = Box::new(string);
 
        let mut bytes = buffer.as_bytes();
 
        InputSource::new(String::new(), &mut bytes)
 
    }
 
    pub fn from_buffer(buffer: &[u8]) -> io::Result<InputSource> {
 
        InputSource::new(String::new(), &mut Box::new(buffer))
 
    }
 
    // Internal methods
 
    pub fn pos(&self) -> InputPosition {
 
        InputPosition { line: self.line, column: self.column, offset: self.offset }
 
    }
 
    pub fn error<S: ToString>(&self, message: S) -> ParseError {
 
        self.pos().parse_error(message)
 
    }
 
    pub fn is_eof(&self) -> bool {
 
        self.next() == None
 
    }
 
    pub fn next(&self) -> Option<u8> {
 
        if self.offset < self.input.len() {
 
            Some((*self.input)[self.offset])
 
        } else {
 
            None
 
        }
 
    }
 
    pub fn lookahead(&self, pos: usize) -> Option<u8> {
 
        if let Some(x) = usize::checked_add(self.offset, pos) {
 
            if x < self.input.len() {
 
                return Some((*self.input)[x]);
 
            }
 
        }
 
        None
 
    }
 
    pub fn consume(&mut self) {
 
        match self.next() {
 
            Some(x) if x == b'\r' && self.lookahead(1) != Some(b'\n') || x == b'\n' => {
 
                self.line += 1;
 
                self.offset += 1;
 
                self.column = 1;
 
            }
 
            Some(_) => {
 
                self.offset += 1;
 
                self.column += 1;
 
            }
 
            None => {}
 
        }
 
    }
 
}
 

	
 
impl fmt::Display for InputSource {
 
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 
        self.pos().fmt(f)
 
    }
 
}
 

	
 
#[derive(Debug, Clone, Copy, Default, serde::Serialize, serde::Deserialize)]
 
pub struct InputPosition {
 
    line: usize,
 
    column: usize,
 
    offset: usize,
 
}
 

	
 
impl InputPosition {
 
    fn context<'a>(&self, source: &'a InputSource) -> &'a [u8] {
 
        let start = self.offset - (self.column - 1);
 
        let mut end = self.offset;
 
        while end < source.input.len() {
 
            let cur = (*source.input)[end];
 
            if cur == b'\n' || cur == b'\r' {
 
                break;
 
            }
 
            end += 1;
 
        }
 
        &source.input[start..end]
 
    }
 
    fn parse_error<S: ToString>(&self, message: S) -> ParseError {
 
        ParseError { position: *self, message: message.to_string(), backtrace: Backtrace::new() }
 
    }
 
    fn eval_error<S: ToString>(&self, message: S) -> EvalError {
 
        EvalError { position: *self, message: message.to_string(), backtrace: Backtrace::new() }
 
    }
 
}
 

	
 
impl fmt::Display for InputPosition {
 
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 
        write!(f, "{}:{}", self.line, self.column)
 
    }
 
}
 

	
 
pub trait SyntaxElement {
 
    fn position(&self) -> InputPosition;
 
    fn error<S: ToString>(&self, message: S) -> EvalError {
 
        self.position().eval_error(message)
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct ParseError {
 
    position: InputPosition,
 
    message: String,
 
    backtrace: Backtrace,
 
}
 

	
 
impl ParseError {
 
    pub fn new<S: ToString>(position: InputPosition, message: S) -> ParseError {
 
        ParseError { position, message: message.to_string(), backtrace: Backtrace::new() }
 
    }
 
    // Diagnostic methods
 
    pub fn write<A: io::Write>(&self, source: &InputSource, writer: &mut A) -> io::Result<()> {
 
        if !source.filename.is_empty() {
 
            writeln!(
 
                writer,
 
                "Parse error at {}:{}: {}",
 
                source.filename, self.position, self.message
 
            )?;
 
        } else {
 
            writeln!(writer, "Parse error at {}: {}", self.position, self.message)?;
 
        }
 
        let line = self.position.context(source);
 
        writeln!(writer, "{}", String::from_utf8_lossy(line))?;
 
        let mut arrow: Vec<u8> = Vec::new();
 
        for pos in 1..self.position.column {
 
            let c = line[pos - 1];
 
            if c == b'\t' {
 
                arrow.push(b'\t')
 
            } else {
 
                arrow.push(b' ')
 
            }
 
        }
 
        arrow.push(b'^');
 
        writeln!(writer, "{}", String::from_utf8_lossy(&arrow))
 
    }
 
    pub fn print(&self, source: &InputSource) {
 
        self.write(source, &mut std::io::stdout()).unwrap()
 
    }
 
    pub fn display<'a>(&'a self, source: &'a InputSource) -> DisplayParseError<'a> {
 
        DisplayParseError::new(self, source)
 
    }
 
}
 

	
 
impl From<ParseError> for io::Error {
 
    fn from(_: ParseError) -> io::Error {
 
        io::Error::new(io::ErrorKind::InvalidInput, "parse error")
 
    }
 
}
 

	
 
#[derive(Clone, Copy)]
 
pub struct DisplayParseError<'a> {
 
    error: &'a ParseError,
 
    source: &'a InputSource,
 
}
 

	
 
impl DisplayParseError<'_> {
 
    fn new<'a>(error: &'a ParseError, source: &'a InputSource) -> DisplayParseError<'a> {
 
        DisplayParseError { error, source }
 
    }
 
}
 

	
 
impl fmt::Display for DisplayParseError<'_> {
 
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 
        let mut vec: Vec<u8> = Vec::new();
 
        match self.error.write(self.source, &mut vec) {
 
            Err(_) => {
 
                return fmt::Result::Err(fmt::Error);
 
            }
 
            Ok(_) => {}
 
        }
 
        write!(f, "{}", String::from_utf8_lossy(&vec))
 
    }
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub struct EvalError {
 
    position: InputPosition,
 
    message: String,
 
    backtrace: Backtrace,
 
}
 

	
 
impl EvalError {
 
    pub fn new<S: ToString>(position: InputPosition, message: S) -> EvalError {
 
        EvalError { position, message: message.to_string(), backtrace: Backtrace::new() }
 
    }
 
    // Diagnostic methods
 
    pub fn write<A: io::Write>(&self, source: &InputSource, writer: &mut A) -> io::Result<()> {
 
        if !source.filename.is_empty() {
 
            writeln!(
 
                writer,
 
                "Evaluation error at {}:{}: {}",
 
                source.filename, self.position, self.message
 
            )?;
 
        } else {
 
            writeln!(writer, "Evaluation error at {}: {}", self.position, self.message)?;
 
        }
 
        let line = self.position.context(source);
 
        writeln!(writer, "{}", String::from_utf8_lossy(line))?;
 
        let mut arrow: Vec<u8> = Vec::new();
 
        for pos in 1..self.position.column {
 
            let c = line[pos - 1];
 
            if c == b'\t' {
 
                arrow.push(b'\t')
 
            } else {
 
                arrow.push(b' ')
 
            }
 
        }
 
        arrow.push(b'^');
 
        writeln!(writer, "{}", String::from_utf8_lossy(&arrow))
 
    }
 
    pub fn print(&self, source: &InputSource) {
 
        self.write(source, &mut std::io::stdout()).unwrap()
 
    }
 
    pub fn display<'a>(&'a self, source: &'a InputSource) -> DisplayEvalError<'a> {
 
        DisplayEvalError::new(self, source)
 
    }
 
}
 

	
 
impl From<EvalError> for io::Error {
 
    fn from(_: EvalError) -> io::Error {
 
        io::Error::new(io::ErrorKind::InvalidInput, "eval error")
 
    }
 
}
 

	
 
#[derive(Clone, Copy)]
 
pub struct DisplayEvalError<'a> {
 
    error: &'a EvalError,
 
    source: &'a InputSource,
 
}
 

	
 
impl DisplayEvalError<'_> {
 
    fn new<'a>(error: &'a EvalError, source: &'a InputSource) -> DisplayEvalError<'a> {
 
        DisplayEvalError { error, source }
 
    }
 
}
 

	
 
impl fmt::Display for DisplayEvalError<'_> {
 
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 
        let mut vec: Vec<u8> = Vec::new();
 
        match self.error.write(self.source, &mut vec) {
 
            Err(_) => {
 
                return fmt::Result::Err(fmt::Error);
 
            }
 
            Ok(_) => {}
 
        }
 
        write!(f, "{}", String::from_utf8_lossy(&vec))
 
    }
 
}
 

	
 
// #[cfg(test)]
 
// mod tests {
 
//     use super::*;
 

	
 
//     #[test]
 
//     fn test_from_string() {
 
//         let mut is = InputSource::from_string("#version 100\n").unwrap();
 
//         assert!(is.input.len() == 13);
 
//         assert!(is.line == 1);
 
//         assert!(is.column == 1);
 
//         assert!(is.offset == 0);
 
//         let ps = is.pos();
 
//         assert!(ps.line == 1);
 
//         assert!(ps.column == 1);
 
//         assert!(ps.offset == 0);
 
//         assert!(is.next() == Some(b'#'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'v'));
 
//         assert!(is.lookahead(1) == Some(b'e'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'e'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'r'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b's'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'i'));
 
//         is.consume();
 
//         {
 
//             let ps = is.pos();
 
//             assert_eq!(b"#version 100", ps.context(&is));
 
//             let er = is.error("hello world!");
 
//             let mut vec: Vec<u8> = Vec::new();
 
//             er.write(&is, &mut vec).unwrap();
 
//             assert_eq!(
 
//                 "Parse error at 1:7: hello world!\n#version 100\n      ^\n",
 
//                 String::from_utf8_lossy(&vec)
 
//             );
 
//         }
 
//         assert!(is.next() == Some(b'o'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'n'));
 
//         is.consume();
 
//         assert!(is.input.len() == 13);
 
//         assert!(is.line == 1);
 
//         assert!(is.column == 9);
 
//         assert!(is.offset == 8);
 
//         assert!(is.next() == Some(b' '));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'1'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'0'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'0'));
 
//         is.consume();
 
//         assert!(is.input.len() == 13);
 
//         assert!(is.line == 1);
 
//         assert!(is.column == 13);
 
//         assert!(is.offset == 12);
 
//         assert!(is.next() == Some(b'\n'));
 
//         is.consume();
 
//         assert!(is.input.len() == 13);
 
//         assert!(is.line == 2);
 
//         assert!(is.column == 1);
 
//         assert!(is.offset == 13);
 
//         {
 
//             let ps = is.pos();
 
//             assert_eq!(b"", ps.context(&is));
 
//         }
 
//         assert!(is.next() == None);
 
//         is.consume();
 
//         assert!(is.next() == None);
 
//     }
 

	
 
//     #[test]
 
//     fn test_split() {
 
//         let mut is = InputSource::from_string("#version 100\n").unwrap();
 
//         let backup = is.clone();
 
//         assert!(is.next() == Some(b'#'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'v'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'e'));
 
//         is.consume();
 
//         is = backup;
 
//         assert!(is.next() == Some(b'#'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'v'));
 
//         is.consume();
 
//         assert!(is.next() == Some(b'e'));
 
//         is.consume();
 
//     }
 
// }
src/runtime/setup.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::*;
 

	
 
impl TokenTarget {
 
    // subdivides the domain of usize into
 
    // [NET_ENDPOINT][UDP_ENDPOINT  ]
 
    // ^0            ^usize::MAX/2   ^usize::MAX
 
    const HALFWAY_INDEX: usize = usize::MAX / 2;
 
    const MAX_INDEX: usize = usize::MAX;
 
}
 
impl From<Token> for TokenTarget {
 
    fn from(Token(index): Token) -> Self {
 
        if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) {
 
            TokenTarget::UdpEndpoint { index: shifted }
 
        } else {
 
            TokenTarget::NetEndpoint { index }
 
        }
 
    }
 
}
 
impl Into<Token> for TokenTarget {
 
    fn into(self) -> Token {
 
        match self {
 
            TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX),
 
            TokenTarget::NetEndpoint { index } => Token(index),
 
        }
 
    }
 
}
 
impl Connector {
 
    /// Create a new connector structure with the given protocol description (via Arc to facilitate sharing).
 
    /// The resulting connector will start in the setup phase, and cannot be used for communication until the
 
    /// `connect` procedure completes.
 
    /// # Safety
 
    /// The correctness of the system's underlying distributed algorithms requires that no two
 
    /// connectors have the same ID. If the user does not know the identifiers of other connectors in the
 
    /// system, it is advised to guess it using Connector::random_id (relying on the exceptionally low probability of an error).
 
    /// Sessions with duplicate connector identifiers will not result in any memory unsafety, but cannot be guaranteed
 
    /// to preserve their configured protocols.
 
    /// Fortunately, in most realistic cases, the presence of duplicate connector identifiers will result in an
 
    /// error during `connect`, observed as a peer misbehaving.
 
    pub fn new(
 
        mut logger: Box<dyn Logger>,
 
        proto_description: Arc<ProtocolDescription>,
 
        connector_id: ConnectorId,
 
    ) -> Self {
 
        log!(&mut *logger, "Created with connector_id {:?}", connector_id);
 
        let mut id_manager = IdManager::new(connector_id);
 
        let native_component_id = id_manager.new_component_id();
 
        Self {
 
            unphased: ConnectorUnphased {
 
                proto_description,
 
                proto_components: Default::default(),
 
                logger,
 
                native_component_id,
 
                ips: IdAndPortState { id_manager, port_info: Default::default() },
 
            },
 
            phased: ConnectorPhased::Setup(Box::new(ConnectorSetup {
 
                net_endpoint_setups: Default::default(),
 
                udp_endpoint_setups: Default::default(),
 
            })),
 
        }
 
    }
 

	
 
    /// Conceptually, this returning [p0, g1] is sugar for:
 
    /// 1. create port pair [p0, g0]
 
    /// 2. create port pair [p1, g1]
 
    /// 3. create udp component with interface of moved ports [p1, g0]
 
    /// 4. return [p0, g1]
 
    pub fn new_udp_mediator_component(
 
        &mut self,
 
        local_addr: SocketAddr,
 
        peer_addr: SocketAddr,
 
    ) -> Result<[PortId; 2], WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                let udp_index = setup.udp_endpoint_setups.len();
 
                let udp_cid = cu.ips.id_manager.new_component_id();
 
                // allocates 4 new port identifiers, two for each logical channel,
 
                // one channel per direction (into and out of the component)
 
                let mut npid = || cu.ips.id_manager.new_port_id();
 
                let [nin, nout, uin, uout] = [npid(), npid(), npid(), npid()];
 
                // allocate the native->udp_mediator channel's ports
 
                cu.ips.port_info.map.insert(
 
                    nout,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
                        polarity: Putter,
 
                        peer: Some(uin),
 
                        owner: cu.native_component_id,
 
                    },
 
                );
 
                cu.ips.port_info.map.insert(
 
                    uin,
 
                    PortInfo {
 
                        route: Route::UdpEndpoint { index: udp_index },
 
                        polarity: Getter,
 
                        peer: Some(uin),
 
                        owner: udp_cid,
 
                    },
 
                );
 
                // allocate the udp_mediator->native channel's ports
 
                cu.ips.port_info.map.insert(
 
                    uout,
 
                    PortInfo {
 
                        route: Route::UdpEndpoint { index: udp_index },
 
                        polarity: Putter,
 
                        peer: Some(uin),
 
                        owner: udp_cid,
 
                    },
 
                );
 
                cu.ips.port_info.map.insert(
 
                    nin,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
                        polarity: Getter,
 
                        peer: Some(uout),
 
                        owner: cu.native_component_id,
 
                    },
 
                );
 
                // allocate the two ports owned by the UdpMediator component
 
                // Remember to setup this UdpEndpoint setup during `connect` later.
 
                setup.udp_endpoint_setups.push(UdpEndpointSetup {
 
                    local_addr,
 
                    peer_addr,
 
                    getter_for_incoming: nin,
 
                });
 

	
 
                // update owned sets
 
                cu.ips
 
                    .port_info
 
                    .owned
 
                    .entry(cu.native_component_id)
 
                    .or_default()
 
                    .extend([nin, nout].iter().copied());
 
                cu.ips.port_info.owned.insert(udp_cid, maplit::hashset! {uin, uout});
 
                // Return the native's output, input port pair
 
                Ok([nout, nin])
 
            }
 
        }
 
    }
 

	
 
    /// Adds a "dangling" port to the connector in the setup phase,
 
    /// to be formed into channel during the connect procedure with the given
 
    /// transport layer information.
 
    pub fn new_net_port(
 
        &mut self,
 
        polarity: Polarity,
 
        sock_addr: SocketAddr,
 
        endpoint_polarity: EndpointPolarity,
 
    ) -> Result<PortId, WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                // allocate a single dangling port with a `None` peer (for now)
 
                let new_pid = cu.ips.id_manager.new_port_id();
 
                cu.ips.port_info.map.insert(
 
                    new_pid,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
                        peer: None,
 
                        owner: cu.native_component_id,
 
                        polarity,
 
                    },
 
                );
 
                log!(
 
                    cu.logger,
 
                    "Added net port {:?} with polarity {:?} addr {:?} endpoint_polarity {:?}",
 
                    new_pid,
 
                    polarity,
 
                    &sock_addr,
 
                    endpoint_polarity
 
                );
 
                // Remember to setup this NetEndpoint setup during `connect` later.
 
                setup.net_endpoint_setups.push(NetEndpointSetup {
 
                    sock_addr,
 
                    endpoint_polarity,
 
                    getter_for_incoming: new_pid,
 
                });
 
                // update owned set
 
                cu.ips.port_info.owned.entry(cu.native_component_id).or_default().insert(new_pid);
 
                Ok(new_pid)
 
            }
 
        }
 
    }
 

	
 
    /// Finalizes the connector's setup procedure and forms a distributed system with
 
    /// all other connectors reachable through network channels. This procedure represents
 
    /// a synchronization barrier, and upon successful return, the connector can no longer add new network ports,
 
    /// but is ready to begin the first communication round.
 
    /// Initially, the connector has a singleton set of _batches_, the only element of which is empty.
 
    /// This single element starts off selected. The selected batch is modified with `put` and `get`,
 
    /// and new batches are added and selected with `next_batch`. See `sync` for an explanation of the
 
    /// purpose of these batches.
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<(), ConnectError> {
 
        use ConnectError as Ce;
 
        let Self { unphased: cu, phased } = self;
 
        match &phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(cu.logger, "Call to connecting in connected state");
 
                Err(Ce::AlreadyConnected)
 
            }
 
            ConnectorPhased::Setup(setup) => {
 
                // Idea: Clone `self.unphased`, and then pass the replica to
 
                // `connect_inner` to do the work, attempting to create a new connector structure
 
                // in connected state without encountering any errors.
 
                // If anything goes wrong during `connect_inner`, we simply keep the original `cu`.
 

	
 
                // Ideally, we'd simply clone `cu` in its entirety.
 
                // However, it isn't clonable, because of the pesky logger.
 
                // Solution: the original and clone ConnectorUnphased structures
 
                // 'share' the original logger by using `mem::swap` strategically to pass a dummy back and forth,
 
                // such that the real logger is wherever we need it to be without violating any invariants.
 
                let mut cu_clone = ConnectorUnphased {
 
                    logger: Box::new(DummyLogger),
 
                    proto_components: cu.proto_components.clone(),
 
                    native_component_id: cu.native_component_id.clone(),
 
                    ips: cu.ips.clone(),
 
                    proto_description: cu.proto_description.clone(),
 
                };
 
                // cu has REAL logger...
 
                std::mem::swap(&mut cu.logger, &mut cu_clone.logger);
 
                // ... cu_clone has REAL logger.
 
                match Self::connect_inner(cu_clone, setup, timeout) {
 
                    Ok(connected_connector) => {
 
                        *self = connected_connector;
 
                        Ok(())
 
                    }
 
                    Err((err, mut logger)) => {
 
                        // Put the original logger back in place (in self.unphased, AKA `cu`).
 
                        // cu_clone has REAL logger...
 
                        std::mem::swap(&mut cu.logger, &mut logger);
 
                        // ... cu has REAL logger.
 
                        Err(err)
 
                    }
 
                }
 
            }
 
        }
 
    }
 

	
 
    // Given an immutable setup structure, and my own (cloned) ConnetorUnphased,
 
    // attempt to complete the setup procedure and return a new connector in Connected state.
 
    // If anything goes wrong, throw everything in the bin, except for the Logger, which is
 
    // the only structure that sees lasting effects of the failed attempt.
 
    fn connect_inner(
 
        mut cu: ConnectorUnphased,
 
        setup: &ConnectorSetup,
 
        timeout: Option<Duration>,
 
    ) -> Result<Self, (ConnectError, Box<dyn Logger>)> {
 
        log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
        let deadline = timeout.map(|to| Instant::now() + to);
 
        // `try_complete` is a helper function, which DOES NOT own `cu`, and returns ConnectError on err.
 
        // This outer function takes its output and wraps it alongside `cu` (which it owns)
 
        // as appropriate for Err(...) and OK(...) cases.
 
        let mut try_complete = || {
 
            // connect all endpoints in parallel; send and receive peer ids through ports
 
            let mut endpoint_manager = setup_endpoints_and_pair_ports(
 
                &mut *cu.logger,
 
                &setup.net_endpoint_setups,
 
                &setup.udp_endpoint_setups,
 
                &mut cu.ips.port_info,
 
                &deadline,
 
            )?;
 
            log!(
 
                cu.logger,
 
                "Successfully connected {} endpoints. info now {:#?} {:#?}",
 
                endpoint_manager.net_endpoint_store.endpoint_exts.len(),
 
                &cu.ips.port_info,
 
                &endpoint_manager,
 
            );
 
            // leader election and tree construction. Learn our role in the consensus tree,
 
            // from learning who are our children/parents (neighbors) in the consensus tree.
 
            let neighborhood = init_neighborhood(
 
                cu.ips.id_manager.connector_id,
 
                &mut *cu.logger,
 
                &mut endpoint_manager,
 
                &deadline,
 
            )?;
 
            log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
            // Put it all together with an initial round index of zero.
 
            let mut comm = ConnectorCommunication {
 
                round_index: 0,
 
                endpoint_manager,
 
                neighborhood,
 
                native_batches: vec![Default::default()],
 
                round_result: Ok(None), // no previous round yet
 
            };
 
            if cfg!(feature = "session_optimization") {
 
                // Perform the session optimization procedure, which may modify the
 
                // internals of the connector, rerouting ports, moving around connectors etc.
 
                session_optimize(&mut cu, &mut comm, &deadline)?;
 
            }
 
            log!(cu.logger, "connect() finished. setup phase complete");
 
            Ok(comm)
 
        };
 
        match try_complete() {
 
            Ok(comm) => {
 
                Ok(Self { unphased: cu, phased: ConnectorPhased::Communication(Box::new(comm)) })
 
            }
 
            Err(err) => Err((err, cu.logger)),
 
        }
 
    }
 
}
 

	
 
// Given a set of net_ and udp_ endpoints to setup,
 
// port information to flesh out (by discovering peers through channels)
 
// and a deadline in which to do it,
 
// try to return:
 
// - An EndpointManager, containing all the set up endpoints
 
// - new information about ports acquired through the newly-created channels
 
fn setup_endpoints_and_pair_ports(
 
    logger: &mut dyn Logger,
 
    net_endpoint_setups: &[NetEndpointSetup],
 
    udp_endpoint_setups: &[UdpEndpointSetup],
 
    port_info: &mut PortInfoMap,
 
    deadline: &Option<Instant>,
 
) -> Result<EndpointManager, ConnectError> {
 
    use ConnectError as Ce;
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    const RETRY_PERIOD: Duration = Duration::from_millis(200);
 

	
 
    // The data for a net endpoint's setup in progress
 
    struct NetTodo {
 
        // becomes completed once sent_local_port && recv_peer_port.is_some()
 
        // we send local port if we haven't already and we receive a writable event
 
        // we recv peer port if we haven't already and we receive a readbale event
 
        todo_endpoint: NetTodoEndpoint,
 
        endpoint_setup: NetEndpointSetup,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
    }
 

	
 
    // The data for a udp endpoint's setup in progress
 
    struct UdpTodo {
 
        // becomes completed once we receive our first writable event
 
        getter_for_incoming: PortId,
 
        sock: UdpSocket,
 
    }
 

	
 
    // Substructure of `NetTodo`, which represents the endpoint itself
 
    enum NetTodoEndpoint {
 
        Accepting(TcpListener),       // awaiting it's peer initiating the connection
 
        PeerInfoRecving(NetEndpoint), // awaiting info about peer port through the channel
 
    }
 
    ////////////////////////////////////////////
 

	
 
    // Start to construct our return values
 
    let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?;
 
    let mut events =
 
        Events::with_capacity((net_endpoint_setups.len() + udp_endpoint_setups.len()) * 2 + 4);
 
    let [mut net_polled_undrained, udp_polled_undrained] = [VecSet::default(), VecSet::default()];
 
    let mut delayed_messages = vec![];
 
    let mut last_retry_at = Instant::now();
 
    let mut io_byte_buffer = IoByteBuffer::default();
 

	
 
    // Create net/udp todo structures, each already registered with poll
 
    let mut net_todos = net_endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, endpoint_setup)| {
 
            let token = TokenTarget::NetEndpoint { index }.into();
 
            log!(logger, "Net endpoint {} beginning setup with {:?}", index, &endpoint_setup);
 
            let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity {
 
                let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
 
                    .map_err(|_| Ce::TcpInvalidConnect(endpoint_setup.sock_addr))?;
 
                poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                NetTodoEndpoint::PeerInfoRecving(NetEndpoint { stream, inbox: vec![] })
 
            } else {
 
                let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
 
                    .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?;
 
                poll.registry().register(&mut listener, token, BOTH).unwrap();
 
                NetTodoEndpoint::Accepting(listener)
 
            };
 
            Ok(NetTodo {
 
                todo_endpoint,
 
                sent_local_port: false,
 
                recv_peer_port: None,
 
                endpoint_setup: endpoint_setup.clone(),
 
            })
 
        })
 
        .collect::<Result<Vec<NetTodo>, ConnectError>>()?;
 
    let udp_todos = udp_endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, endpoint_setup)| {
 
            let mut sock = UdpSocket::bind(endpoint_setup.local_addr)
 
                .map_err(|_| Ce::BindFailed(endpoint_setup.local_addr))?;
 
            sock.connect(endpoint_setup.peer_addr)
 
                .map_err(|_| Ce::UdpConnectFailed(endpoint_setup.peer_addr))?;
 
            poll.registry()
 
                .register(&mut sock, TokenTarget::UdpEndpoint { index }.into(), Interest::WRITABLE)
 
                .unwrap();
 
            Ok(UdpTodo { sock, getter_for_incoming: endpoint_setup.getter_for_incoming })
 
        })
 
        .collect::<Result<Vec<UdpTodo>, ConnectError>>()?;
 

	
 
    // Initially no net connections have failed, and all udp and net endpoint setups are incomplete
 
    let mut net_connect_to_retry: HashSet<usize> = Default::default();
 
    let mut setup_incomplete: HashSet<TokenTarget> = {
 
        let net_todo_targets_iter =
 
            (0..net_todos.len()).map(|index| TokenTarget::NetEndpoint { index });
 
        let udp_todo_targets_iter =
 
            (0..udp_todos.len()).map(|index| TokenTarget::UdpEndpoint { index });
 
        net_todo_targets_iter.chain(udp_todo_targets_iter).collect()
 
    };
 
    // progress by reacting to poll events. continue until every endpoint is set up
 
    while !setup_incomplete.is_empty() {
 
        // recompute the timeout for the poll call
 
        let remaining = match (deadline, net_connect_to_retry.is_empty()) {
 
            (None, true) => None,
 
            (None, false) => Some(RETRY_PERIOD),
 
            (Some(deadline), is_empty) => {
 
                let dur_to_timeout =
 
                    deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?;
 
                Some(if is_empty { dur_to_timeout } else { dur_to_timeout.min(RETRY_PERIOD) })
 
            }
 
        };
 
        // block until either
 
        // (a) `events` has been populated with 1+ elements
 
        // (b) timeout elapses, or
 
        // (c) RETRY_PERIOD elapses
 
        poll.poll(&mut events, remaining).map_err(|_| Ce::PollFailed)?;
 
        if last_retry_at.elapsed() > RETRY_PERIOD {
 
            // Retry all net connections and reset `last_retry_at`
 
            last_retry_at = Instant::now();
 
            for net_index in net_connect_to_retry.drain() {
 
                // Restart connect procedure for this net endpoint
 
                let net_todo = &mut net_todos[net_index];
 
                log!(
 
                    logger,
 
                    "Restarting connection with endpoint {:?} {:?}",
 
                    net_index,
 
                    net_todo.endpoint_setup.sock_addr
 
                );
 
                match &mut net_todo.todo_endpoint {
 
                    NetTodoEndpoint::PeerInfoRecving(endpoint) => {
 
                        let mut new_stream = TcpStream::connect(net_todo.endpoint_setup.sock_addr)
 
                            .expect("mio::TcpStream connect should not fail!");
 
                        std::mem::swap(&mut endpoint.stream, &mut new_stream);
 
                        let token = TokenTarget::NetEndpoint { index: net_index }.into();
 
                        poll.registry().register(&mut endpoint.stream, token, BOTH).unwrap();
 
                    }
 
                    _ => unreachable!(),
 
                }
 
            }
 
        }
 
        for event in events.iter() {
 
            let token = event.token();
 
            // figure out which endpoint the event belonged to
 
            let token_target = TokenTarget::from(token);
 
            match token_target {
 
                TokenTarget::UdpEndpoint { index } => {
 
                    // UdpEndpoints are easy to complete.
 
                    // Their setup event just has to succeed without error
 
                    if !setup_incomplete.contains(&token_target) {
 
                        // spurious wakeup. this endpoint has already been set up!
 
                        continue;
 
                    }
 
                    let udp_todo: &UdpTodo = &udp_todos[index];
 
                    if event.is_error() {
 
                        return Err(Ce::BindFailed(udp_todo.sock.local_addr().unwrap()));
 
                    }
 
                    setup_incomplete.remove(&token_target);
 
                }
 
                TokenTarget::NetEndpoint { index } => {
 
                    // NetEndpoints are complex to complete,
 
                    // they must accept/connect to their peer,
 
                    // and then exchange port info successfully
 
                    let net_todo = &mut net_todos[index];
 
                    if let NetTodoEndpoint::Accepting(listener) = &mut net_todo.todo_endpoint {
 
                        // Passive endpoint that will first try accept the peer's connection
 
                        match listener.accept() {
 
                            Err(e) if err_would_block(&e) => continue, // spurious wakeup
 
                            Err(_) => {
 
                                log!(logger, "accept() failure on index {}", index);
 
                                return Err(Ce::AcceptFailed(listener.local_addr().unwrap()));
 
                            }
 
                            Ok((mut stream, peer_addr)) => {
 
                                // successfully accepted the active peer
 
                                // reusing the token, but now for the stream and not the listener
 
                                poll.registry().deregister(listener).unwrap();
 
                                poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                                log!(
 
                                    logger,
 
                                    "Endpoint[{}] accepted a connection from {:?}",
 
                                    index,
 
                                    peer_addr
 
                                );
 
                                let net_endpoint = NetEndpoint { stream, inbox: vec![] };
 
                                net_todo.todo_endpoint =
 
                                    NetTodoEndpoint::PeerInfoRecving(net_endpoint);
 
                            }
 
                        }
 
                    }
 
                    // OK now let's try and finish exchanging port info
 
                    if let NetTodoEndpoint::PeerInfoRecving(net_endpoint) =
 
                        &mut net_todo.todo_endpoint
 
                    {
 
                        if event.is_error() {
 
                            // event signals some error! :(
 
                            if net_todo.endpoint_setup.endpoint_polarity
 
                                == EndpointPolarity::Passive
 
                            {
 
                                // breaking as the acceptor is currently unrecoverable
 
                                return Err(Ce::AcceptFailed(
 
                                    net_endpoint.stream.local_addr().unwrap(),
 
                                ));
 
                            }
 
                            // this actively-connecting endpoint failed to connect!
 
                            // We will schedule it for a retry
 
                            net_connect_to_retry.insert(index);
 
                            continue;
 
                        }
 
                        // event wasn't ERROR
 
                        if net_connect_to_retry.contains(&index) {
 
                            // spurious wakeup. already scheduled to retry connect later
 
                            continue;
 
                        }
 
                        if !setup_incomplete.contains(&token_target) {
 
                            // spurious wakeup. this endpoint has already been completed!
 
                            if event.is_readable() {
 
                                net_polled_undrained.insert(index);
 
                            }
 
                            continue;
 
                        }
 
                        let local_info = port_info
 
                            .map
 
                            .get(&net_todo.endpoint_setup.getter_for_incoming)
 
                            .expect("Net Setup's getter port info isn't known"); // unreachable
 
                        if event.is_writable() && !net_todo.sent_local_port {
 
                            // can write and didn't send setup msg yet? Do so!
 
                            let _ = net_endpoint.stream.set_nodelay(true);
 
                            let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
 
                                owner: local_info.owner,
 
                                polarity: local_info.polarity,
 
                                port: net_todo.endpoint_setup.getter_for_incoming,
 
                            }));
 
                            net_endpoint
 
                                .send(&msg, &mut io_byte_buffer)
 
                                .map_err(|e| {
 
                                    Ce::NetEndpointSetupError(
 
                                        net_endpoint.stream.local_addr().unwrap(),
 
                                        e,
 
                                    )
 
                                })
 
                                .unwrap();
 
                            log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
 
                            net_todo.sent_local_port = true;
 
                        }
 
                        if event.is_readable() && net_todo.recv_peer_port.is_none() {
 
                            // can read and didn't finish recving setup msg yet? Do so!
 
                            let maybe_msg = net_endpoint.try_recv(logger).map_err(|e| {
 
                                Ce::NetEndpointSetupError(
 
                                    net_endpoint.stream.local_addr().unwrap(),
 
                                    e,
 
                                )
 
                            })?;
 
                            if maybe_msg.is_some() && !net_endpoint.inbox.is_empty() {
 
                                net_polled_undrained.insert(index);
 
                            }
 
                            match maybe_msg {
 
                                None => {} // msg deserialization incomplete
 
                                Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => {
 
                                    log!(
 
                                        logger,
 
                                        "endpoint[{}] got peer info {:?}",
 
                                        index,
 
                                        peer_info
 
                                    );
 
                                    if peer_info.polarity == local_info.polarity {
 
                                        return Err(ConnectError::PortPeerPolarityMismatch(
 
                                            net_todo.endpoint_setup.getter_for_incoming,
 
                                        ));
 
                                    }
 
                                    net_todo.recv_peer_port = Some(peer_info.port);
 
                                    // finally learned the peer of this port!
 
                                    port_info
 
                                        .map
 
                                        .get_mut(&net_todo.endpoint_setup.getter_for_incoming)
 
                                        .unwrap()
 
                                        .peer = Some(peer_info.port);
 
                                    // learned the info of this peer port
 
                                    port_info.map.entry(peer_info.port).or_insert({
 
                                        port_info
 
                                            .owned
 
                                            .entry(peer_info.owner)
 
                                            .or_default()
 
                                            .insert(peer_info.port);
 
                                        PortInfo {
 
                                            peer: Some(net_todo.endpoint_setup.getter_for_incoming),
 
                                            polarity: peer_info.polarity,
 
                                            owner: peer_info.owner,
 
                                            route: Route::NetEndpoint { index },
 
                                        }
 
                                    });
 
                                }
 
                                Some(inappropriate_msg) => {
 
                                    log!(
 
                                        logger,
 
                                        "delaying msg {:?} during channel setup phase",
 
                                        inappropriate_msg
 
                                    );
 
                                    delayed_messages.push((index, inappropriate_msg));
 
                                }
 
                            }
 
                        }
 
                        // is the setup for this net_endpoint now complete?
 
                        if net_todo.sent_local_port && net_todo.recv_peer_port.is_some() {
 
                            // yes! connected, sent my info and received peer's info
 
                            setup_incomplete.remove(&token_target);
 
                            log!(logger, "endpoint[{}] is finished!", index);
 
                        }
 
                    }
 
                }
 
            }
 
        }
 
        events.clear();
 
    }
 
    log!(logger, "Endpoint setup complete! Cleaning up and building structures");
 
    let net_endpoint_exts = net_todos
 
        .into_iter()
 
        .enumerate()
 
        .map(|(index, NetTodo { todo_endpoint, endpoint_setup, .. })| NetEndpointExt {
 
            net_endpoint: match todo_endpoint {
 
                NetTodoEndpoint::PeerInfoRecving(mut net_endpoint) => {
 
                    let token = TokenTarget::NetEndpoint { index }.into();
 
                    poll.registry()
 
                        .reregister(&mut net_endpoint.stream, token, Interest::READABLE)
 
                        .unwrap();
 
                    net_endpoint
 
                }
 
                _ => unreachable!(),
 
            },
 
            getter_for_incoming: endpoint_setup.getter_for_incoming,
 
        })
 
        .collect();
 
    let udp_endpoint_exts = udp_todos
 
        .into_iter()
 
@@ -649,423 +652,423 @@ fn setup_endpoints_and_pair_ports(
 
        .collect();
 
    let endpoint_manager = EndpointManager {
 
        poll,
 
        events,
 
        undelayed_messages: delayed_messages, // no longer delayed
 
        delayed_messages: Default::default(),
 
        net_endpoint_store: EndpointStore {
 
            endpoint_exts: net_endpoint_exts,
 
            polled_undrained: net_polled_undrained,
 
        },
 
        udp_endpoint_store: EndpointStore {
 
            endpoint_exts: udp_endpoint_exts,
 
            polled_undrained: udp_polled_undrained,
 
        },
 
        io_byte_buffer,
 
    };
 
    Ok(endpoint_manager)
 
}
 

	
 
// Given a fully-formed endpoint manager,
 
// construct the consensus tree with:
 
// 1. decentralized leader election
 
// 2. centralized tree construction
 
fn init_neighborhood(
 
    connector_id: ConnectorId,
 
    logger: &mut dyn Logger,
 
    em: &mut EndpointManager,
 
    deadline: &Option<Instant>,
 
) -> Result<Neighborhood, ConnectError> {
 
    use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
 

	
 
    // storage structure for the state of a distributed wave
 
    // (for readability)
 
    #[derive(Debug)]
 
    struct WaveState {
 
        parent: Option<usize>,
 
        leader: ConnectorId,
 
    }
 

	
 
    // kick off a leader-election wave rooted at myself
 
    // given the desired wave information
 
    // (e.g. don't inform my parent if they exist)
 
    fn do_wave(
 
        em: &mut EndpointManager,
 
        awaiting: &mut HashSet<usize>,
 
        ws: &WaveState,
 
    ) -> Result<(), ConnectError> {
 
        awaiting.clear();
 
        let msg = S(Sm::LeaderWave { wave_leader: ws.leader });
 
        for index in em.index_iter() {
 
            if Some(index) != ws.parent {
 
                em.send_to_setup(index, &msg)?;
 
                awaiting.insert(index);
 
            }
 
        }
 
        Ok(())
 
    }
 
    ///////////////////////
 
    /*
 
    Conceptually, we have two distinct disstributed algorithms back-to-back
 
    1. Leader election using echo algorithm with extinction.
 
        - Each connector initiates a wave tagged with their ID
 
        - Connectors participate in waves of GREATER ID, abandoning previous waves
 
        - Only the wave of the connector with GREATEST ID completes, whereupon they are the leader
 
    2. Tree construction
 
        - The leader broadcasts their leadership with msg A
 
        - Upon receiving their first announcement, connectors reply B, and send A to all peers
 
        - A controller exits once they have received A or B from each neighbor
 

	
 
    The actual implementation is muddier, because non-leaders aren't aware of termiantion of algorithm 1,
 
    so they rely on receipt of the leader's announcement to realize that algorithm 2 has begun.
 

	
 
    NOTE the distinction between PARENT and LEADER
 
    */
 
    log!(logger, "beginning neighborhood construction");
 
    if em.num_net_endpoints() == 0 {
 
        log!(logger, "Edge case of no neighbors! No parent an no children!");
 
        return Ok(Neighborhood { parent: None, children: VecSet::new(vec![]) });
 
    }
 
    log!(logger, "Have {} endpoints. Must participate in distributed alg.", em.num_net_endpoints());
 
    let mut awaiting = HashSet::with_capacity(em.num_net_endpoints());
 
    // 1+ neighbors. Leader can only be learned by receiving messages
 
    // loop ends when I know my sink tree parent (implies leader was elected)
 
    let election_result: WaveState = {
 
        // initially: No parent, I'm the best leader.
 
        let mut best_wave = WaveState { parent: None, leader: connector_id };
 
        // start a wave for this initial state
 
        do_wave(em, &mut awaiting, &best_wave)?;
 
        // with 1+ neighbors, progress is only made in response to incoming messages
 
        em.undelay_all();
 
        'election: loop {
 
            log!(logger, "Election loop. awaiting {:?}...", awaiting.iter());
 
            let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?;
 
            log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
            match msg {
 
                S(Sm::LeaderAnnounce { tree_leader }) => {
 
                    // A neighbor explicitly tells me who is the leader
 
                    // they become my parent, and I adopt their announced leader
 
                    let election_result =
 
                        WaveState { leader: tree_leader, parent: Some(recv_index) };
 
                    log!(logger, "Election lost! Result {:?}", &election_result);
 
                    assert!(election_result.leader >= best_wave.leader);
 
                    assert_ne!(election_result.leader, connector_id);
 
                    break 'election election_result;
 
                }
 
                S(Sm::LeaderWave { wave_leader }) => {
 
                    use Ordering as O;
 
                    match wave_leader.cmp(&best_wave.leader) {
 
                        O::Less => log!(
 
                            logger,
 
                            "Ignoring wave with Id {:?}<{:?}",
 
                            wave_leader,
 
                            best_wave.leader
 
                        ),
 
                        O::Greater => {
 
                            log!(
 
                                logger,
 
                                "Joining wave with Id {:?}>{:?}",
 
                                wave_leader,
 
                                best_wave.leader
 
                            );
 
                            best_wave = WaveState { leader: wave_leader, parent: Some(recv_index) };
 
                            log!(logger, "New wave state {:?}", &best_wave);
 
                            do_wave(em, &mut awaiting, &best_wave)?;
 
                            if awaiting.is_empty() {
 
                                log!(logger, "Special case! Only neighbor is parent. Replying to {:?} msg {:?}", recv_index, &msg);
 
                                em.send_to_setup(recv_index, &msg)?;
 
                            }
 
                        }
 
                        O::Equal => {
 
                            assert!(awaiting.remove(&recv_index));
 
                            log!(
 
                                logger,
 
                                "Wave reply from index {:?} for leader {:?}. Now awaiting {} replies",
 
                                recv_index,
 
                                best_wave.leader,
 
                                awaiting.len()
 
                            );
 
                            if awaiting.is_empty() {
 
                                if let Some(parent) = best_wave.parent {
 
                                    log!(
 
                                        logger,
 
                                        "Sub-wave done! replying to parent {:?} msg {:?}",
 
                                        parent,
 
                                        &msg
 
                                    );
 
                                    em.send_to_setup(parent, &msg)?;
 
                                } else {
 
                                    let election_result: WaveState = best_wave;
 
                                    log!(logger, "Election won! Result {:?}", &election_result);
 
                                    break 'election election_result;
 
                                }
 
                            }
 
                        }
 
                    }
 
                }
 
                msg @ S(Sm::YouAreMyParent) | msg @ S(Sm::MyPortInfo(_)) => {
 
                    log!(logger, "Endpont {:?} sent unexpected msg! {:?}", recv_index, &msg);
 
                    return Err(Ce::SetupAlgMisbehavior);
 
                }
 
                msg @ S(Sm::SessionScatter { .. })
 
                | msg @ S(Sm::SessionGather { .. })
 
                | msg @ Msg::CommMsg { .. } => {
 
                    log!(logger, "delaying msg {:?} during election algorithm", msg);
 
                    em.delayed_messages.push((recv_index, msg));
 
                }
 
            }
 
        }
 
    };
 

	
 
    // starting algorithm 2. Send a message to every neighbor
 
    // namely, send "YouAreMyParent" to parent (if they exist),
 
    // and LeaderAnnounce to everyone else
 
    log!(logger, "Starting tree construction. Step 1: send one msg per neighbor");
 
    awaiting.clear();
 
    for index in em.index_iter() {
 
        if Some(index) == election_result.parent {
 
            em.send_to_setup(index, &S(Sm::YouAreMyParent))?;
 
        } else {
 
            awaiting.insert(index);
 
            em.send_to_setup(
 
                index,
 
                &S(Sm::LeaderAnnounce { tree_leader: election_result.leader }),
 
            )?;
 
        }
 
    }
 
    // Receive one message from each neighbor to learn
 
    // whether they consider me their parent or not.
 
    let mut children = vec![];
 
    em.undelay_all();
 
    while !awaiting.is_empty() {
 
        log!(logger, "Tree construction_loop loop. awaiting {:?}...", awaiting.iter());
 
        let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?;
 
        log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
        match msg {
 
            S(Sm::LeaderAnnounce { .. }) => {
 
                // `recv_index` is not my child
 
                log!(
 
                    logger,
 
                    "Got reply from non-child index {:?}. Children: {:?}",
 
                    recv_index,
 
                    children.iter()
 
                );
 
                if !awaiting.remove(&recv_index) {
 
                    return Err(Ce::SetupAlgMisbehavior);
 
                }
 
            }
 
            S(Sm::YouAreMyParent) => {
 
                if !awaiting.remove(&recv_index) {
 
                    log!(
 
                        logger,
 
                        "Got reply from child index {:?}. Children before... {:?}",
 
                        recv_index,
 
                        children.iter()
 
                    );
 
                    return Err(Ce::SetupAlgMisbehavior);
 
                }
 
                // `recv_index` is my child
 
                children.push(recv_index);
 
            }
 
            msg @ S(Sm::MyPortInfo(_)) | msg @ S(Sm::LeaderWave { .. }) => {
 
                log!(logger, "discarding old message {:?} during election", msg);
 
            }
 
            msg @ S(Sm::SessionScatter { .. })
 
            | msg @ S(Sm::SessionGather { .. })
 
            | msg @ Msg::CommMsg { .. } => {
 
                log!(logger, "delaying msg {:?} during election", msg);
 
                em.delayed_messages.push((recv_index, msg));
 
            }
 
        }
 
    }
 
    // Neighborhood complete!
 
    children.shrink_to_fit();
 
    let neighborhood =
 
        Neighborhood { parent: election_result.parent, children: VecSet::new(children) };
 
    log!(logger, "Neighborhood constructed {:?}", &neighborhood);
 
    Ok(neighborhood)
 
}
 

	
 
// Connectors collect a map of type ConnectorId=>SessionInfo,
 
// representing a global view of the session's state at the leader.
 
// The leader rewrites its contents however they like (currently: nothing happens)
 
// and the map is again broadcasted, for each peer to make their local changes to
 
// reflect the results of the rewrite.
 
fn session_optimize(
 
    cu: &mut ConnectorUnphased,
 
    comm: &mut ConnectorCommunication,
 
    deadline: &Option<Instant>,
 
) -> Result<(), ConnectError> {
 
    use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
 
    log!(cu.logger, "Beginning session optimization");
 
    // populate session_info_map from a message per child
 
    let mut unoptimized_map: HashMap<ConnectorId, SessionInfo> = Default::default();
 
    let mut awaiting: HashSet<usize> = comm.neighborhood.children.iter().copied().collect();
 
    comm.endpoint_manager.undelay_all();
 
    while !awaiting.is_empty() {
 
        log!(
 
            cu.logger,
 
            "Session gather loop. awaiting info from children {:?}...",
 
            awaiting.iter()
 
        );
 
        let (recv_index, msg) =
 
            comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?;
 
        log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
        match msg {
 
            S(Sm::SessionGather { unoptimized_map: child_unoptimized_map }) => {
 
                if !awaiting.remove(&recv_index) {
 
                    log!(
 
                        cu.logger,
 
                        "Wasn't expecting session info from {:?}. Got {:?}",
 
                        recv_index,
 
                        &child_unoptimized_map
 
                    );
 
                    return Err(Ce::SetupAlgMisbehavior);
 
                }
 
                unoptimized_map.extend(child_unoptimized_map.into_iter());
 
            }
 
            msg @ S(Sm::YouAreMyParent)
 
            | msg @ S(Sm::MyPortInfo(..))
 
            | msg @ S(Sm::LeaderAnnounce { .. })
 
            | msg @ S(Sm::LeaderWave { .. }) => {
 
                log!(cu.logger, "discarding old message {:?} during election", msg);
 
            }
 
            msg @ S(Sm::SessionScatter { .. }) => {
 
                log!(
 
                    cu.logger,
 
                    "Endpoint {:?} sent unexpected scatter! {:?} I've not contributed yet!",
 
                    recv_index,
 
                    &msg
 
                );
 
                return Err(Ce::SetupAlgMisbehavior);
 
            }
 
            msg @ Msg::CommMsg(..) => {
 
                log!(cu.logger, "delaying msg {:?} during session optimization", msg);
 
                comm.endpoint_manager.delayed_messages.push((recv_index, msg));
 
            }
 
        }
 
    }
 
    log!(
 
        cu.logger,
 
        "Gathered all children's maps. ConnectorId set is... {:?}",
 
        unoptimized_map.keys()
 
    );
 
    // add my own session info to the map
 
    let my_session_info = SessionInfo {
 
        port_info: cu.ips.port_info.clone(),
 
        proto_components: cu.proto_components.clone(),
 
        serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()),
 
        endpoint_incoming_to_getter: comm
 
            .endpoint_manager
 
            .net_endpoint_store
 
            .endpoint_exts
 
            .iter()
 
            .map(|ee| ee.getter_for_incoming)
 
            .collect(),
 
    };
 
    unoptimized_map.insert(cu.ips.id_manager.connector_id, my_session_info);
 
    log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map);
 
    // acquire the optimized info...
 
    let optimized_map = if let Some(parent) = comm.neighborhood.parent {
 
        // ... as a message from my parent
 
        log!(cu.logger, "Forwarding gathered info to parent {:?}", parent);
 
        let msg = S(Sm::SessionGather { unoptimized_map });
 
        comm.endpoint_manager.send_to_setup(parent, &msg)?;
 
        'scatter_loop: loop {
 
            log!(
 
                cu.logger,
 
                "Session scatter recv loop. awaiting info from children {:?}...",
 
                awaiting.iter()
 
            );
 
            let (recv_index, msg) =
 
                comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?;
 
            log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
            match msg {
 
                S(Sm::SessionScatter { optimized_map }) => {
 
                    if recv_index != parent {
 
                        log!(cu.logger, "I expected the scatter from my parent only!");
 
                        return Err(Ce::SetupAlgMisbehavior);
 
                    }
 
                    break 'scatter_loop optimized_map;
 
                }
 
                msg @ Msg::CommMsg { .. } => {
 
                    log!(cu.logger, "delaying msg {:?} during scatter recv", msg);
 
                    comm.endpoint_manager.delayed_messages.push((recv_index, msg));
 
                }
 
                msg @ S(Sm::SessionGather { .. })
 
                | msg @ S(Sm::YouAreMyParent)
 
                | msg @ S(Sm::MyPortInfo(..))
 
                | msg @ S(Sm::LeaderAnnounce { .. })
 
                | msg @ S(Sm::LeaderWave { .. }) => {
 
                    log!(cu.logger, "discarding old message {:?} during election", msg);
 
                }
 
            }
 
        }
 
    } else {
 
        // by computing it myself
 
        log!(cu.logger, "I am the leader! I will optimize this session");
 
        leader_session_map_optimize(&mut *cu.logger, unoptimized_map)?
 
    };
 
    log!(
 
        cu.logger,
 
        "Optimized info map is {:?}. Sending to children {:?}",
 
        &optimized_map,
 
        comm.neighborhood.children.iter()
 
    );
 
    log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map);
 
    // extract my own ConnectorId's entry
 
    let optimized_info =
 
        optimized_map.get(&cu.ips.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone();
 
    // broadcast the optimized session info to my children
 
    let msg = S(Sm::SessionScatter { optimized_map });
 
    for &child in comm.neighborhood.children.iter() {
 
        comm.endpoint_manager.send_to_setup(child, &msg)?;
 
    }
 
    // apply local optimizations
 
    apply_my_optimizations(cu, comm, optimized_info)?;
 
    log!(cu.logger, "Session optimizations applied");
 
    Ok(())
 
}
 

	
 
// Defines the optimization function, consuming an optimized map,
 
// and returning an optimized map.
 
fn leader_session_map_optimize(
 
    logger: &mut dyn Logger,
 
    unoptimized_map: HashMap<ConnectorId, SessionInfo>,
 
    mut m: HashMap<ConnectorId, SessionInfo>,
 
) -> Result<HashMap<ConnectorId, SessionInfo>, ConnectError> {
 
    log!(logger, "Session map optimize START");
 
    // currently, it's the identity function
 
    log!(logger, "Session map optimize END");
 
    Ok(unoptimized_map)
 
    Ok(m)
 
}
 

	
 
// Modify the given connector's internals to reflect
 
// the given session info
 
fn apply_my_optimizations(
 
    cu: &mut ConnectorUnphased,
 
    comm: &mut ConnectorCommunication,
 
    session_info: SessionInfo,
 
) -> Result<(), ConnectError> {
 
    let SessionInfo {
 
        proto_components,
 
        port_info,
 
        serde_proto_description,
 
        endpoint_incoming_to_getter,
 
    } = session_info;
 
    // simply overwrite the contents
 
    // println!("BEFORE: {:#?}\n{:#?}", cu, comm);
 
    println!("BEFORE: {:#?}\n{:#?}", cu, comm);
 
    cu.ips.port_info = port_info;
 
    assert!(cu.ips.port_info.invariant_preserved());
 
    cu.proto_components = proto_components;
 
    cu.proto_description = serde_proto_description.0;
 
    for (ee, getter) in comm
 
        .endpoint_manager
 
        .net_endpoint_store
 
        .endpoint_exts
 
        .iter_mut()
 
        .zip(endpoint_incoming_to_getter)
 
    {
 
        ee.getter_for_incoming = getter;
 
    }
 
    // println!("AFTER: {:#?}\n{:#?}", cu, comm);
 
    Ok(())
 
}
src/runtime/tests.rs
Show inline comments
 
@@ -928,411 +928,384 @@ fn many_rounds_mem() {
 
#[test]
 
fn pdl_reo_lossy() {
 
    let pdl = b"
 
    primitive lossy(in a, out b) {
 
        while(true) synchronous {
 
            msg m = null;
 
            if(fires(a)) {
 
                m = get(a);
 
                if(fires(b)) {
 
                    put(b, m);
 
                }
 
            }
 
        }
 
    }
 
    ";
 
    reowolf::ProtocolDescription::parse(pdl).unwrap();
 
}
 

	
 
#[test]
 
fn pdl_reo_fifo1() {
 
    let pdl = b"
 
    primitive fifo1(in a, out b) {
 
        msg m = null;
 
        while(true) synchronous {
 
            if(m == null) {
 
                if(fires(a)) m=get(a);
 
            } else {
 
                if(fires(b)) put(b, m);
 
                m = null;
 
            }
 
        }
 
    }
 
    ";
 
    reowolf::ProtocolDescription::parse(pdl).unwrap();
 
}
 

	
 
#[test]
 
fn pdl_reo_fifo1full() {
 
    let test_log_path = Path::new("./logs/pdl_reo_fifo1full");
 
    let pdl = b"
 
    primitive fifo1full(in a, out b) {
 
        msg m = create(0);
 
        while(true) synchronous {
 
            if(m == null) {
 
                if(fires(a)) m=get(a);
 
            } else {
 
                if(fires(b)) put(b, m);
 
                m = null;
 
            }
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 
    let [_p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"fifo1full", &[g0, p1]).unwrap();
 
    c.connect(None).unwrap();
 
    c.get(g1).unwrap();
 
    c.sync(None).unwrap();
 
    assert_eq!(0, c.gotten(g1).unwrap().len());
 
}
 

	
 
#[test]
 
fn pdl_msg_consensus() {
 
    let test_log_path = Path::new("./logs/pdl_msg_consensus");
 
    let pdl = b"
 
    primitive msgconsensus(in a, in b) {
 
        while(true) synchronous {
 
            msg x = get(a);
 
            msg y = get(b);
 
            assert(x == y);
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"msgconsensus", &[g0, g1]).unwrap();
 
    c.connect(None).unwrap();
 
    c.put(p0, Payload::from(b"HELLO" as &[_])).unwrap();
 
    c.put(p1, Payload::from(b"HELLO" as &[_])).unwrap();
 
    c.sync(SEC1).unwrap();
 

	
 
    c.put(p0, Payload::from(b"HEY" as &[_])).unwrap();
 
    c.put(p1, Payload::from(b"HELLO" as &[_])).unwrap();
 
    c.sync(SEC1).unwrap_err();
 
}
 

	
 
#[test]
 
fn sequencer3_prim() {
 
    let test_log_path = Path::new("./logs/sequencer3_prim");
 
    let pdl = b"
 
    primitive sequencer3(out a, out b, out c) {
 
        int i = 0;
 
        while(true) synchronous {
 
            out to = a;
 
            if     (i==1) to = b;
 
            else if(i==2) to = c;
 
            if(fires(to)) {
 
                put(to, create(0));
 
                i = (i + 1)%3;
 
            }
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) sequencer3, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    let [p2, g2] = c.new_port_pair();
 
    c.add_component(b"sequencer3", &[p0, p1, p2]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    let which_of_three = move |c: &mut Connector| {
 
        // setup three sync batches. sync. return which succeeded
 
        c.get(g0).unwrap();
 
        c.next_batch().unwrap();
 
        c.get(g1).unwrap();
 
        c.next_batch().unwrap();
 
        c.get(g2).unwrap();
 
        c.sync(None).unwrap()
 
    };
 

	
 
    const TEST_ROUNDS: usize = 50;
 
    // check that the batch index for rounds 0..TEST_ROUNDS are [0, 1, 2, 0, 1, 2, ...]
 
    for expected_batch_idx in (0..=2).cycle().take(TEST_ROUNDS) {
 
        // silent round
 
        assert_eq!(0, c.sync(None).unwrap());
 
        // non silent round
 
        assert_eq!(expected_batch_idx, which_of_three(&mut c));
 
    }
 
}
 

	
 
#[test]
 
fn sequencer3_comp() {
 
    let test_log_path = Path::new("./logs/sequencer3_comp");
 
    let pdl = b"
 
    primitive fifo1_init(msg m, in a, out b) {
 
        while(true) synchronous {
 
            if(m != null && fires(b)) {
 
                put(b, m);
 
                m = null;
 
            } else if (m == null && fires(a)) {
 
                m = get(a);
 
            }
 
        }
 
    }
 
    composite fifo1_full(in a, out b) {
 
        new fifo1_init(create(0), a, b);
 
    }
 
    composite fifo1(in a, out b) {
 
        new fifo1_init(null, a, b);
 
    }
 
    composite sequencer3(out a, out b, out c) {
 
        channel d -> e;
 
        channel f -> g;
 
        channel h -> i;
 
        channel j -> k;
 
        channel l -> m;
 
        channel n -> o;
 

	
 
        new fifo1_full(o, d);
 
        new replicator(e, f, a);
 
        new fifo1(g, h);
 
        new replicator(i, j, b);
 
        new fifo1(k, l);
 
        new replicator(m, n, c);
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) sequencer3, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    let [p2, g2] = c.new_port_pair();
 
    c.add_component(b"sequencer3", &[p0, p1, p2]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    let which_of_three = move |c: &mut Connector| {
 
        // setup three sync batches. sync. return which succeeded
 
        c.get(g0).unwrap();
 
        c.next_batch().unwrap();
 
        c.get(g1).unwrap();
 
        c.next_batch().unwrap();
 
        c.get(g2).unwrap();
 
        c.sync(SEC1).unwrap()
 
    };
 

	
 
    const TEST_ROUNDS: usize = 50;
 
    // check that the batch index for rounds 0..TEST_ROUNDS are [0, 1, 2, 0, 1, 2, ...]
 
    for expected_batch_idx in (0..=2).cycle().take(TEST_ROUNDS) {
 
        // silent round
 
        assert_eq!(0, c.sync(SEC1).unwrap());
 
        // non silent round
 
        assert_eq!(expected_batch_idx, which_of_three(&mut c));
 
    }
 
}
 

	
 
enum XRouterItem {
 
    Silent,
 
    GetA,
 
    GetB,
 
}
 
// Hardcoded pseudo-random sequence of round behaviors for the native component
 
const XROUTER_ITEMS: &[XRouterItem] = {
 
    use XRouterItem::{GetA as A, GetB as B, Silent as S};
 
    &[
 
        B, A, S, B, A, A, B, S, B, S, A, A, S, B, B, S, B, S, B, B, S, B, B, A, B, B, A, B, A, B,
 
        S, B, S, B, S, A, S, B, A, S, B, A, B, S, B, S, B, S, S, B, B, A, A, A, S, S, S, B, A, A,
 
        A, S, S, B, B, B, A, B, S, S, A, A, B, A, B, B, A, A, A, B, A, B, S, A, B, S, A, A, B, S,
 
    ]
 
};
 

	
 
#[test]
 
fn xrouter_prim() {
 
    let test_log_path = Path::new("./logs/xrouter_prim");
 
    let pdl = b"
 
    primitive xrouter(in a, out b, out c) {
 
        while(true) synchronous {
 
            if(fires(a)) {
 
                if(fires(b)) put(b, get(a));
 
                else         put(c, get(a));
 
            }
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) xrouter2, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    let [p2, g2] = c.new_port_pair();
 
    c.add_component(b"xrouter", &[g0, p1, p2]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    let now = std::time::Instant::now();
 
    for item in XROUTER_ITEMS.iter() {
 
        match item {
 
            XRouterItem::Silent => {}
 
            XRouterItem::GetA => {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                c.get(g1).unwrap();
 
            }
 
            XRouterItem::GetB => {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                c.get(g2).unwrap();
 
            }
 
        }
 
        assert_eq!(0, c.sync(SEC1).unwrap());
 
    }
 
    println!("PRIM {:?}", now.elapsed());
 
}
 
#[test]
 
fn xrouter_comp() {
 
    let test_log_path = Path::new("./logs/xrouter_comp");
 
    let pdl = b"
 
    primitive lossy(in a, out b) {
 
        while(true) synchronous {
 
            if(fires(a)) {
 
                msg m = get(a);
 
                if(fires(b)) put(b, m);
 
            }
 
        }
 
    }
 
    primitive sync_drain(in a, in b) {
 
        while(true) synchronous {
 
            if(fires(a)) {
 
                get(a);
 
                get(b);
 
            }
 
        }
 
    }
 
    composite xrouter(in a, out b, out c) {
 
        channel d -> e;
 
        channel f -> g;
 
        channel h -> i;
 
        channel j -> k;
 
        channel l -> m;
 
        channel n -> o;
 
        channel p -> q;
 
        channel r -> s;
 
        channel t -> u;
 

	
 
        new replicator(a, d, f);
 
        new replicator(g, t, h);
 
        new lossy(e, l);
 
        new lossy(i, j);
 
        new replicator(m, b, p);
 
        new replicator(k, n, c);
 
        new merger(q, o, r);
 
        new sync_drain(u, s);
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) xrouter2, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    let [p2, g2] = c.new_port_pair();
 
    c.add_component(b"xrouter", &[g0, p1, p2]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    let now = std::time::Instant::now();
 
    for item in XROUTER_ITEMS.iter() {
 
        match item {
 
            XRouterItem::Silent => {}
 
            XRouterItem::GetA => {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                c.get(g1).unwrap();
 
            }
 
            XRouterItem::GetB => {
 
                c.put(p0, TEST_MSG.clone()).unwrap();
 
                c.get(g2).unwrap();
 
            }
 
        }
 
        assert_eq!(0, c.sync(SEC1).unwrap());
 
    }
 
    println!("COMP {:?}", now.elapsed());
 
}
 

	
 
#[test]
 
fn count_stream() {
 
    let test_log_path = Path::new("./logs/count_stream");
 
    let pdl = b"
 
    primitive count_stream(out o) {
 
        msg m = create(1);
 
        m[0] = 0;
 
        while(true) synchronous {
 
            put(o, m);
 
            m[0] += 1;
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) sequencer3, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    c.add_component(b"count_stream", &[p0]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    for expecting in 0u8..16 {
 
        c.get(g0).unwrap();
 
        c.sync(None).unwrap();
 
        assert_eq!(&[expecting], c.gotten(g0).unwrap().as_slice());
 
    }
 
}
 

	
 
#[test]
 
fn for_msg_byte() {
 
    let test_log_path = Path::new("./logs/for_msg_byte");
 
    let pdl = b"
 
    primitive for_msg_byte(out o) {
 
        byte i = 0;
 
        while(i<8) {
 
            msg m = create(1);
 
            m[0] = i;
 
            synchronous() put(o, m);
 
            i++;
 
        }
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) sequencer3, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    c.add_component(b"for_msg_byte", &[p0]).unwrap();
 
    c.connect(None).unwrap();
 

	
 
    for expecting in 0u8..8 {
 
        c.get(g0).unwrap();
 
        c.sync(None).unwrap();
 
        assert_eq!(&[expecting], c.gotten(g0).unwrap().as_slice());
 
    }
 
    c.sync(None).unwrap();
 
}
 

	
 
#[test]
 
fn message_concat() {
 
    // Note: PDL quirks:
 
    // 1. declarations as first lines of a scope
 
    // 2. var names cannot be prefixed by types. Eg `msg_concat` prohibited.
 
    let test_log_path = Path::new("./logs/message_concat");
 
    let pdl = b"
 
    primitive message_concat(out o) {
 
        msg a = create(1);
 
        msg b = create(1);
 
        a[0] = 0;
 
        b[0] = 1;
 
        synchronous() put(o, a+b);
 
    }
 
    ";
 
    let pd = reowolf::ProtocolDescription::parse(pdl).unwrap();
 
    let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd));
 

	
 
    // setup a session between (a) native, and (b) sequencer3, connected by 3 ports.
 
    let [p0, g0] = c.new_port_pair();
 
    c.add_component(b"message_concat", &[p0]).unwrap();
 
    c.connect(None).unwrap();
 
    c.get(g0).unwrap();
 
    c.sync(None).unwrap();
 
    assert_eq!(&[0, 1], c.gotten(g0).unwrap().as_slice());
 
}
0 comments (0 inline, 0 general)