Changeset - 1bacc6467d19
Christopher Esterhuyse - 2020-06-19 19:16:14
christopher.esterhuyse@gmail.com
rebuilding setup phase. logging is vastly improved
5 files changed with 240 insertions and 40 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
 
@@ -22,9 +22,9 @@ indexmap = "1.3.0" # hashsets/hashmaps with efficient arbitrary element removal
 
# network
 
integer-encoding = "1.0.7"
 
byteorder = "1.3.2"
 
mio = "0.6.21"
 
# mio = "0.6.21"
 
mio-extras = "2.0.6"
 
mio07 = { version = "0.7.0", package = "mio", features = ["tcp", "os-poll"] }
 
mio = { version = "0.7.0", package = "mio", features = ["tcp", "os-poll"] }
 

 
# protocol
 
# id-arena = "2.2.1"
src/common.rs
 
@@ -14,7 +14,7 @@ pub use indexmap::{IndexMap, IndexSet};
 
pub use maplit::{hashmap, hashset};
 
pub use mio::{
 
    net::{TcpListener, TcpStream},
 
    Event, Evented, Events, Poll, PollOpt, Ready, Token,
 
    Events, Interest, Poll, Token,
 
};
 
pub use std::{
 
    collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
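For reference, a minimal sketch of the mio 0.7 surface this migration moves onto: registration goes through poll.registry() with Interest values, replacing 0.6's Evented/Ready/PollOpt. The address, token, and timeout below are illustrative placeholders, not values from the crate.

use mio::{net::TcpStream, Events, Interest, Poll, Token};
use std::time::Duration;

fn mio07_sketch() -> std::io::Result<()> {
    let mut poll = Poll::new()?;
    let mut events = Events::with_capacity(64);
    // nonblocking connect; the address is a placeholder
    let mut stream = TcpStream::connect("127.0.0.1:8000".parse().unwrap())?;
    // 0.7 style: register through the registry, expressing Interest
    // (READABLE/WRITABLE) instead of Ready + PollOpt
    poll.registry().register(&mut stream, Token(0), Interest::READABLE.add(Interest::WRITABLE))?;
    poll.poll(&mut events, Some(Duration::from_millis(100)))?;
    for event in events.iter() {
        let Token(index) = event.token();
        println!("endpoint {} ready (readable={})", index, event.is_readable());
    }
    Ok(())
}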
src/runtime/mod.rs
 
@@ -62,7 +62,7 @@ pub(crate) enum CommonSatResult {
 
}
 
pub struct Endpoint {
 
    inbox: Vec<u8>,
 
    stream: mio07::net::TcpStream,
 
    stream: TcpStream,
 
}
 
#[derive(Debug, Default)]
 
pub struct IntStream {
 
@@ -111,10 +111,11 @@ pub struct MemInMsg {
 
}
 
#[derive(Debug)]
 
pub struct EndpointPoller {
 
    poll: mio07::Poll,
 
    events: mio07::Events,
 
    undrained_endpoints: HashSet<usize>,
 
    delayed_inp_messages: Vec<(PortId, Msg)>,
 
    poll: Poll,
 
    events: Events,
 
    undrained_endpoints: IndexSet<usize>,
 
    delayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>,
 
}
 
#[derive(Debug)]
 
pub struct Connector {
 
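The delayed_messages/undelayed_messages pair above replaces delayed_inp_messages and implements a park-and-replay scheme: a message that arrives during the wrong setup round is parked, and undelay_all (added further down) moves everything parked back into the buffered set so the next round reconsiders it before reading from the network again. A minimal standalone sketch of that scheme, with Mailbox as a hypothetical stand-in for EndpointPoller:

// Park-and-replay mailbox; Mailbox is an illustrative name, not a crate type.
struct Mailbox<M> {
    delayed: Vec<M>,
    undelayed: Vec<M>,
}

impl<M> Mailbox<M> {
    // park a message the current round cannot handle yet
    fn park(&mut self, msg: M) {
        self.delayed.push(msg);
    }
    // replay everything parked so far; called at the start of each round
    fn undelay_all(&mut self) {
        self.undelayed.extend(self.delayed.drain(..));
    }
    // buffered messages are consumed before reading from the network again
    fn next_buffered(&mut self) -> Option<M> {
        self.undelayed.pop()
    }
}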
@@ -165,7 +166,54 @@ pub struct SyncContext<'a> {
 
pub struct NonsyncContext<'a> {
 
    connector: &'a mut Connector,
 
}
 
enum TryRecyAnyError {
 
    Timeout,
 
    PollFailed,
 
    EndpointRecvErr { error: EndpointRecvErr, index: usize },
 
    BrokenEndpoint(usize),
 
}
 
////////////////
 
impl EndpointPoller {
 
    fn try_recv_any(
 
        &mut self,
 
        endpoint_exts: &mut [EndpointExt],
 
        deadline: Instant,
 
    ) -> Result<(usize, Msg), TryRecyAnyError> {
 
        use TryRecyAnyError::*;
 
        // 1. try messages already buffered
 
        if let Some(x) = self.undelayed_messages.pop() {
 
            return Ok(x);
 
        }
 
        // 2. try read from sockets nonblocking
 
        while let Some(index) = self.undrained_endpoints.pop() {
 
            if let Some(msg) = endpoint_exts[index]
 
                .endpoint
 
                .try_recv()
 
                .map_err(|error| EndpointRecvErr { error, index })?
 
            {
 
                return Ok((index, msg));
 
            }
 
        }
 
        // 3. poll for progress
 
        loop {
 
            let remaining = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?;
 
            self.poll.poll(&mut self.events, Some(remaining)).map_err(|_| PollFailed)?;
 
            for event in self.events.iter() {
 
                let Token(index) = event.token();
 
                if let Some(msg) = endpoint_exts[index]
 
                    .endpoint
 
                    .try_recv()
 
                    .map_err(|error| EndpointRecvErr { error, index })?
 
                {
 
                    return Ok((index, msg));
 
                }
 
            }
 
        }
 
    }
 
    fn undelay_all(&mut self) {
 
        self.undelayed_messages.extend(self.delayed_messages.drain(..));
 
    }
 
}
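try_recv_any above drains three sources in order: messages already undelayed, endpoints marked undrained (read nonblockingly), and finally the poller itself, bounded by the caller's deadline. A minimal sketch of just that last, deadline-bounded step; the error strings are placeholders for the crate's TryRecyAnyError variants:

use std::time::{Duration, Instant};

// One deadline-bounded poll step: never blocks past `deadline`, and surfaces
// expiry as an error instead of ever computing a negative duration.
fn poll_once(
    poll: &mut mio::Poll,
    events: &mut mio::Events,
    deadline: Instant,
) -> Result<(), &'static str> {
    let remaining: Duration =
        deadline.checked_duration_since(Instant::now()).ok_or("timeout")?;
    poll.poll(events, Some(remaining)).map_err(|_| "poll failed")
}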
 
impl Debug for Endpoint {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_struct("Endpoint").field("inbox", &self.inbox).finish()
 
@@ -291,9 +339,21 @@ impl Endpoint {
 
    }
 
}
 
impl Connector {
 
    fn get_logger(&self) -> &dyn Logger {
 
    pub fn get_logger(&self) -> &dyn Logger {
 
        &*self.logger
 
    }
 
    pub fn print_state(&self) {
 
        let stdout = std::io::stdout();
 
        let mut lock = stdout.lock();
 
        writeln!(
 
            lock,
 
            "--- Connector with ControllerId={:?}.\n::LOG_DUMP:\n",
 
            self.id_manager.controller_id
 
        )
 
        .unwrap();
 
        self.get_logger().dump_log(&mut lock);
 
        writeln!(lock, "DEBUG_PRINT:\n{:#?}\n", self).unwrap();
 
    }
 
}
 

 
// #[derive(Debug)]
src/runtime/my_tests.rs
 
use crate as reowolf;
 
use reowolf::Polarity::*;
 
use crossbeam_utils::thread::scope;
 
use reowolf::{Connector, EndpointSetup, Polarity::*, ProtocolDescription};
 
use std::net::SocketAddr;
 
use std::{sync::Arc, time::Duration};
 

 
@@ -14,19 +15,19 @@ fn next_test_addr() -> SocketAddr {
 
}
 

 
lazy_static::lazy_static! {
 
    static ref MINIMAL_PROTO: Arc<reowolf::ProtocolDescription> =
 
    static ref MINIMAL_PROTO: Arc<ProtocolDescription> =
 
        { Arc::new(reowolf::ProtocolDescription::parse(b"").unwrap()) };
 
}
 

 
#[test]
 
fn simple_connector() {
 
    let c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    println!("{:#?}", c);
 
}
 

 
#[test]
 
fn add_port_pair() {
 
    let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [_, _] = c.add_port_pair();
 
    let [_, _] = c.add_port_pair();
 
    println!("{:#?}", c);
 
@@ -34,7 +35,7 @@ fn add_port_pair() {
 

 
#[test]
 
fn add_sync() {
 
    let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [o, i] = c.add_port_pair();
 
    c.add_component(b"sync", &[i, o]).unwrap();
 
    println!("{:#?}", c);
 
@@ -42,35 +43,51 @@ fn add_sync() {
 

 
#[test]
 
fn add_net_port() {
 
    let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let sock_addr = next_test_addr();
 
    let _ = c
 
        .add_net_port(reowolf::EndpointSetup { polarity: Getter, sock_addr, is_active: false })
 
        .unwrap();
 
    let _ = c
 
        .add_net_port(reowolf::EndpointSetup { polarity: Putter, sock_addr, is_active: true })
 
        .unwrap();
 
    let _ =
 
        c.add_net_port(EndpointSetup { polarity: Getter, sock_addr, is_active: false }).unwrap();
 
    let _ = c.add_net_port(EndpointSetup { polarity: Putter, sock_addr, is_active: true }).unwrap();
 
    println!("{:#?}", c);
 
}
 

 
#[test]
 
fn trivial_connect() {
 
    let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    println!("{:#?}", c);
 
}
 

 
#[test]
 
fn single_node_connect() {
 
    let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let sock_addr = next_test_addr();
 
    let _ = c
 
        .add_net_port(reowolf::EndpointSetup { polarity: Getter, sock_addr, is_active: false })
 
        .unwrap();
 
    let _ = c
 
        .add_net_port(reowolf::EndpointSetup { polarity: Putter, sock_addr, is_active: true })
 
        .unwrap();
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let _ =
 
        c.add_net_port(EndpointSetup { polarity: Getter, sock_addr, is_active: false }).unwrap();
 
    let _ = c.add_net_port(EndpointSetup { polarity: Putter, sock_addr, is_active: true }).unwrap();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    println!("{:#?}", c);
 
    c.get_logger().dump_log(&mut std::io::stdout().lock());
 
}
 

 
#[test]
 
fn multithreaded_connect() {
 
    let sock_addr = next_test_addr();
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
            let es = EndpointSetup { polarity: Getter, sock_addr, is_active: true };
 
            let _ = c.add_net_port(es).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.print_state();
 
        });
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1);
 
            let es = EndpointSetup { polarity: Putter, sock_addr, is_active: false };
 
            let _ = c.add_net_port(es).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.print_state();
 
        });
 
    })
 
    .unwrap();
 
}
src/runtime/setup2.rs
 
@@ -115,8 +115,13 @@ impl Connector {
 
                )?;
 
                log!(self.logger, "Successfully connected {} endpoints", endpoint_exts.len());
 
                // leader election and tree construction
 
                let neighborhood =
 
                    init_neighborhood(&mut *self.logger, &mut endpoint_exts, &mut endpoint_poller)?;
 
                let neighborhood = init_neighborhood(
 
                    self.id_manager.controller_id,
 
                    &mut *self.logger,
 
                    &mut endpoint_exts,
 
                    &mut endpoint_poller,
 
                    deadline,
 
                )?;
 
                log!(self.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                // TODO session optimization goes here
 
                self.phased = ConnectorPhased::Communication {
 
@@ -137,10 +142,6 @@ fn init_endpoints(
 
    inp_to_route: &mut HashMap<PortId, InpRoute>,
 
    deadline: Instant,
 
) -> Result<(Vec<EndpointExt>, EndpointPoller), ()> {
 
    use mio07::{
 
        net::{TcpListener, TcpStream},
 
        Events, Interest, Poll, Token,
 
    };
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    struct Todo {
 
        todo_endpoint: TodoEndpoint,
 
@@ -182,7 +183,8 @@ fn init_endpoints(
 
        poll: Poll::new().map_err(drop)?,
 
        events: Events::with_capacity(64),
 
        undrained_endpoints: Default::default(),
 
        delayed_inp_messages: Default::default(),
 
        delayed_messages: Default::default(),
 
        undelayed_messages: Default::default(),
 
    };
 

 
    let mut todos = endpoint_setups
 
@@ -264,12 +266,133 @@ fn init_endpoints(
 
}
 

 
fn init_neighborhood(
 
    controller_id: ControllerId,
 
    logger: &mut dyn Logger,
 
    endpoint_exts: &mut [EndpointExt],
 
    endpoint_poller: &mut EndpointPoller,
 
    ep: &mut EndpointPoller,
 
    deadline: Instant,
 
) -> Result<Neighborhood, ()> {
 
    log!(logger, "Time to construct my neighborhood");
 
    let parent = None;
 
    let children = Default::default();
 
    log!(logger, "beginning neighborhood construction");
 
    use Msg::SetupMsg as S;
 
    use SetupMsg::*;
 

 
    // 1. broadcast my ID as the first echo. await reply from all neighbors
 
    let echo = S(LeaderEcho { maybe_leader: controller_id });
 
    let mut awaiting = HashSet::with_capacity(endpoint_exts.len());
 
    for (index, ee) in endpoint_exts.iter_mut().enumerate() {
 
        log!(logger, "{:?}'s initial echo to {:?}, {:?}", controller_id, index, &echo);
 
        ee.endpoint.send(&echo)?;
 
        awaiting.insert(index);
 
    }
 

 
    // 2. Receive incoming replies. whenever a higher-id echo arrives,
 
    //    adopt it as leader, sender as parent, and reset the await set.
 
    let mut parent: Option<usize> = None;
 
    let mut my_leader = controller_id;
 
    ep.undelay_all();
 
    'echo_loop: while !awaiting.is_empty() || parent.is_some() {
 
        let (index, msg) = ep.try_recv_any(endpoint_exts, deadline).map_err(drop)?;
 
        log!(logger, "GOT from index {:?} msg {:?}", &index, &msg);
 
        match msg {
 
            S(LeaderAnnounce { leader }) => {
 
                // someone else completed the echo and became leader first!
 
                // the sender is my parent
 
                parent = Some(index);
 
                my_leader = leader;
 
                awaiting.clear();
 
                break 'echo_loop;
 
            }
 
            S(LeaderEcho { maybe_leader }) => {
 
                use Ordering::*;
 
                match maybe_leader.cmp(&my_leader) {
 
                    Less => { /* ignore */ }
 
                    Equal => {
 
                        awaiting.remove(&index);
 
                        if awaiting.is_empty() {
 
                            if let Some(p) = parent {
 
                                // return the echo to my parent
 
                                endpoint_exts[p].endpoint.send(&S(LeaderEcho { maybe_leader }))?;
 
                            } else {
 
                                // DECIDE!
 
                                break 'echo_loop;
 
                            }
 
                        }
 
                    }
 
                    Greater => {
 
                        // join new echo
 
                        log!(logger, "Setting leader to index {:?}", index);
 
                        parent = Some(index);
 
                        my_leader = maybe_leader;
 
                        let echo = S(LeaderEcho { maybe_leader: my_leader });
 
                        awaiting.clear();
 
                        if endpoint_exts.len() == 1 {
 
                            // immediately reply to parent
 
                            log!(logger, "replying echo to parent {:?} immediately", index);
 
                            endpoint_exts[index].endpoint.send(&echo)?;
 
                        } else {
 
                            for (index2, ee) in endpoint_exts.iter_mut().enumerate() {
 
                                if index2 == index {
 
                                    continue;
 
                                }
 
                                log!(logger, "repeating echo {:?} to {:?}", &echo, index2);
 
                                ee.endpoint.send(&echo)?;
 
                                awaiting.insert(index2);
 
                            }
 
                        }
 
                    }
 
                }
 
            }
 
            inappropriate_msg => ep.delayed_messages.push((index, inappropriate_msg)),
 
        }
 
    }
 
    match parent {
 
        None => assert_eq!(
 
            my_leader, controller_id,
 
            "I've got no parent, but I consider {:?} the leader?",
 
            my_leader
 
        ),
 
        Some(parent) => assert_ne!(
 
            my_leader, controller_id,
 
            "I have {:?} as parent, but I consider myself ({:?}) the leader?",
 
            parent, controller_id
 
        ),
 
    }
 

 
    log!(logger, "DONE WITH ECHO! Leader has cid={:?}", my_leader);
 

 
    // 3. broadcast leader announcement (except to parent: confirm they are your parent)
 
    //    in this loop, every node sends 1 message to each neighbor
 
    // await 1 message from all non-parents
 
    let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader });
 
    for (index, ee) in endpoint_exts.iter_mut().enumerate() {
 
        let msg = if Some(index) == parent {
 
            &S(YouAreMyParent)
 
        } else {
 
            awaiting.insert(index);
 
            &msg_for_non_parents
 
        };
 
        log!(logger, "ANNOUNCING to {:?} {:?}", index, msg);
 
        ee.endpoint.send(msg)?;
 
    }
 
    let mut children = Vec::default();
 
    ep.undelay_all();
 
    while !awaiting.is_empty() {
 
        let (index, msg) = ep.try_recv_any(endpoint_exts, deadline).map_err(drop)?;
 
        match msg {
 
            S(YouAreMyParent) => {
 
                assert!(awaiting.remove(&index));
 
                children.push(index);
 
            }
 
            S(SetupMsg::LeaderAnnounce { leader }) => {
 
                assert!(awaiting.remove(&index));
 
                assert!(leader == my_leader);
 
                assert!(Some(index) != parent);
 
                // they wouldn't send me this if they considered me their parent
 
            }
 
            inappropriate_msg => ep.delayed_messages.push((index, inappropriate_msg)),
 
        }
 
    }
 
    children.sort();
 
    children.dedup();
 
    Ok(Neighborhood { parent, children })
 
}
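To make the echo phase above easier to follow, here is a single-threaded simulation of just the leader-election wave over an in-memory line topology. It omits TCP, the LeaderAnnounce/YouAreMyParent round, and delayed messages, and Node/SimMsg are illustrative names rather than crate types: the largest controller id wins the echo, and every other node records the neighbor it first heard the winning wave from as its parent.

use std::collections::{HashSet, VecDeque};

enum SimMsg {
    LeaderEcho { maybe_leader: u32 },
}

struct Node {
    id: u32,               // plays the role of ControllerId
    neighbors: Vec<usize>, // indices of neighboring nodes
    parent: Option<usize>,
    my_leader: u32,
    awaiting: HashSet<usize>,
}

fn main() {
    // line topology 0 - 1 - 2 with controller ids 5, 9, 3: node 1 should win
    let ids = [5u32, 9, 3];
    let topology: Vec<Vec<usize>> = vec![vec![1], vec![0, 2], vec![1]];
    let mut nodes: Vec<Node> = ids
        .iter()
        .zip(&topology)
        .map(|(&id, nbrs)| Node {
            id,
            neighbors: nbrs.clone(),
            parent: None,
            my_leader: id,
            awaiting: nbrs.iter().copied().collect(),
        })
        .collect();

    // 1. every node broadcasts its own id as the first echo
    let mut queue: VecDeque<(usize, usize, SimMsg)> = VecDeque::new();
    for (i, node) in nodes.iter().enumerate() {
        for &n in &node.neighbors {
            queue.push_back((n, i, SimMsg::LeaderEcho { maybe_leader: node.id }));
        }
    }

    // 2. deliver echoes until quiescence
    while let Some((to, from, SimMsg::LeaderEcho { maybe_leader })) = queue.pop_front() {
        use std::cmp::Ordering::*;
        let node = &mut nodes[to];
        match maybe_leader.cmp(&node.my_leader) {
            Less => { /* stale echo from a smaller wave: ignore */ }
            Equal => {
                node.awaiting.remove(&from);
                if node.awaiting.is_empty() {
                    if let Some(p) = node.parent {
                        // whole subtree has replied: return the echo to the parent
                        queue.push_back((p, to, SimMsg::LeaderEcho { maybe_leader }));
                    }
                    // else: every neighbor echoed my own wave back; I decide I am leader
                }
            }
            Greater => {
                // join the larger wave: the sender becomes my parent
                node.parent = Some(from);
                node.my_leader = maybe_leader;
                let rest: HashSet<usize> =
                    node.neighbors.iter().copied().filter(|&n| n != from).collect();
                node.awaiting = rest;
                if node.awaiting.is_empty() {
                    // leaf node: reply to the parent immediately
                    queue.push_back((from, to, SimMsg::LeaderEcho { maybe_leader }));
                } else {
                    for &n in node.neighbors.iter() {
                        if n != from {
                            queue.push_back((n, to, SimMsg::LeaderEcho { maybe_leader }));
                        }
                    }
                }
            }
        }
    }
    for (i, n) in nodes.iter().enumerate() {
        println!("node {} (cid {}): leader cid={}, parent={:?}", i, n.id, n.my_leader, n.parent);
    }
}

For the three-node line above, node 1 (cid 9) ends with parent None while nodes 0 and 2 both record node 1 as their parent, which is exactly the invariant the assert_eq!/assert_ne! pair at the end of init_neighborhood checks.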