Changeset - 1bacc6467d19
[Not reviewed]
Christopher Esterhuyse - 5 years ago 2020-06-19 19:16:14
christopher.esterhuyse@gmail.com
rebuilding setup phase. logging is vastly improved
5 files changed with 240 insertions and 40 deletions:
0 comments (0 inline, 0 general)
Cargo.toml
 
[package]
name = "reowolf_rs"
version = "0.1.3"
authors = [
	"Christopher Esterhuyse <esterhuy@cwi.nl, christopher.esterhuyse@gmail.com>",
	"Hans-Dieter Hiep <hdh@cwi.nl>"
]
edition = "2018"

[dependencies]
# convenience macros
maplit = "1.0.2"
derive_more = "0.99.2"

# runtime
bincode = "1.2.1"
serde = { version = "1.0.112", features = ["derive"] }
getrandom = "0.1.14" # tiny crate. used to guess controller-id
take_mut = "0.2.2"
indexmap = "1.3.0" # hashsets/hashmaps with efficient arbitrary element removal

# network
integer-encoding = "1.0.7"
byteorder = "1.3.2"
mio = "0.6.21"
# mio = "0.6.21"
mio-extras = "2.0.6"
mio07 = { version = "0.7.0", package = "mio", features = ["tcp", "os-poll"] }
mio = { version = "0.7.0", package = "mio", features = ["tcp", "os-poll"] }

# protocol
# id-arena = "2.2.1"
backtrace = "0.3"

[dev-dependencies]
test-generator = "0.3.0"
crossbeam-utils = "0.7.0"
lazy_static = "1.4.0"

[lib]
# compile target: dynamically linked library using C ABI
crate-type = ["cdylib"]

[features]
default = ["ffi"]
ffi = [] # no feature dependencies
\ No newline at end of file
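The dependency lines above juxtapose the old mio 0.6 setup (mio, mio-extras, and the mio07 alias) with a plain mio dependency on 0.7 via Cargo's package key, matching the mio::Interest imports in src/common.rs below. For orientation, a minimal sketch of the mio 0.7 registration/poll pattern the new setup code relies on; poll_once is an illustrative helper, not code from this crate:

use mio::{net::TcpListener, Events, Interest, Poll, Token};
use std::time::Duration;

// In mio 0.7, Interest replaces 0.6's Ready/PollOpt, and event sources are
// registered through poll.registry() rather than on Poll directly.
fn poll_once(addr: std::net::SocketAddr) -> std::io::Result<()> {
    let mut poll = Poll::new()?;
    let mut events = Events::with_capacity(64);
    let mut listener = TcpListener::bind(addr)?;
    poll.registry().register(&mut listener, Token(0), Interest::READABLE)?;
    poll.poll(&mut events, Some(Duration::from_millis(100)))?;
    for event in events.iter() {
        // the Token identifies which registered source is ready
        let Token(index) = event.token();
        println!("source {} readable: {}", index, event.is_readable());
    }
    Ok(())
}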
src/common.rs
 
///////////////////// PRELUDE /////////////////////

pub use crate::protocol::{ComponentState, ProtocolDescription};
pub use crate::runtime::{NonsyncContext, SyncContext};

pub use core::{
    cmp::Ordering,
    fmt::{Debug, Formatter},
    hash::{Hash, Hasher},
    ops::{Range, RangeFrom},
    time::Duration,
};
pub use indexmap::{IndexMap, IndexSet};
pub use maplit::{hashmap, hashset};
pub use mio::{
    net::{TcpListener, TcpStream},
    Event, Evented, Events, Poll, PollOpt, Ready, Token,
    Events, Interest, Poll, Token,
};
pub use std::{
    collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
    convert::TryInto,
    io::{Read, Write},
    net::SocketAddr,
    sync::Arc,
    time::Instant,
};
pub use Polarity::*;

///////////////////// DEFS /////////////////////

pub type ControllerId = u32;
pub type PortSuffix = u32;

// globally unique
#[derive(
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
)]
pub struct PortId {
    pub(crate) controller_id: ControllerId,
    pub(crate) port_index: PortSuffix,
}
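A PortId is globally unique because it pairs the owning connector's ControllerId with a locally allocated PortSuffix. The IdManager and IntStream structs in src/runtime/mod.rs below hold exactly this state; their methods fall outside the hunks shown, so the following is only a hypothetical sketch of how fresh ids could be minted:

// Self-contained sketch with local stand-ins for the crate's types.
type ControllerId = u32;
type PortSuffix = u32;

#[derive(Debug, Default)]
struct IntStream {
    next: u32,
}
impl IntStream {
    // hands out 0, 1, 2, ... exactly once each
    fn next(&mut self) -> u32 {
        let n = self.next;
        self.next += 1;
        n
    }
}

struct IdManager {
    controller_id: ControllerId,
    port_suffix_stream: IntStream,
}
impl IdManager {
    // unique across connectors, assuming controller ids themselves never collide
    fn new_port_id(&mut self) -> (ControllerId, PortSuffix) {
        (self.controller_id, self.port_suffix_stream.next())
    }
}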
src/runtime/mod.rs
 
@@ -41,101 +41,102 @@ pub(crate) enum SetupMsg {
 
    YouAreMyParent,
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub(crate) struct CommMsg {
 
    pub round_index: usize,
 
    pub contents: CommMsgContents,
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub(crate) enum CommMsgContents {
 
    SendPayload { payload_predicate: Predicate, payload: Payload },
 
    Elaborate { partial_oracle: Predicate }, // SINKWARD
 
    Failure,                                 // SINKWARD
 
    Announce { decision: Decision },         // SINKAWAYS
 
}
 
#[derive(Debug, PartialEq)]
 
pub(crate) enum CommonSatResult {
 
    FormerNotLatter,
 
    LatterNotFormer,
 
    Equivalent,
 
    New(Predicate),
 
    Nonexistant,
 
}
 
pub struct Endpoint {
 
    inbox: Vec<u8>,
 
    stream: mio07::net::TcpStream,
 
    stream: TcpStream,
 
}
 
#[derive(Debug, Default)]
 
pub struct IntStream {
 
    next: u32,
 
}
 
#[derive(Debug)]
 
pub struct IdManager {
 
    controller_id: ControllerId,
 
    port_suffix_stream: IntStream,
 
}
 
#[derive(Debug)]
 
pub struct ProtoComponent {
 
    state: ComponentState,
 
    ports: HashSet<PortId>,
 
}
 
#[derive(Debug)]
 
pub enum InpRoute {
 
    NativeComponent,
 
    ProtoComponent { index: usize },
 
    Endpoint { index: usize },
 
}
 
pub trait Logger: Debug {
 
    fn line_writer(&mut self) -> &mut dyn std::fmt::Write;
 
    fn dump_log(&self, w: &mut dyn std::io::Write);
 
}
 
#[derive(Debug, Clone)]
 
pub struct EndpointSetup {
 
    pub polarity: Polarity,
 
    pub sock_addr: SocketAddr,
 
    pub is_active: bool,
 
}
 
#[derive(Debug)]
 
pub struct EndpointExt {
 
    endpoint: Endpoint,
 
    inp_for_emerging_msgs: PortId,
 
}
 
#[derive(Debug)]
 
pub struct Neighborhood {
 
    parent: Option<usize>,
 
    children: Vec<usize>, // ordered, deduplicated
 
}
 
#[derive(Debug)]
 
pub struct MemInMsg {
 
    inp: PortId,
 
    msg: Payload,
 
}
 
#[derive(Debug)]
 
pub struct EndpointPoller {
 
    poll: mio07::Poll,
 
    events: mio07::Events,
 
    undrained_endpoints: HashSet<usize>,
 
    delayed_inp_messages: Vec<(PortId, Msg)>,
 
    poll: Poll,
 
    events: Events,
 
    undrained_endpoints: IndexSet<usize>,
 
    delayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>,
 
}
 
#[derive(Debug)]
 
pub struct Connector {
 
    logger: Box<dyn Logger>,
 
    proto_description: Arc<ProtocolDescription>,
 
    id_manager: IdManager,
 
    native_ports: HashSet<PortId>,
 
    proto_components: Vec<ProtoComponent>,
 
    outp_to_inp: HashMap<PortId, PortId>,
 
    inp_to_route: HashMap<PortId, InpRoute>,
 
    phased: ConnectorPhased,
 
}
 
#[derive(Debug)]
 
pub enum ConnectorPhased {
 
    Setup {
 
        endpoint_setups: Vec<(PortId, EndpointSetup)>,
 
        surplus_sockets: u16,
 
    },
 
    Communication {
 
        endpoint_poller: EndpointPoller,
 
        endpoint_exts: Vec<EndpointExt>,
 
        neighborhood: Neighborhood,
 
        mem_inbox: Vec<MemInMsg>,
 
    },
 
@@ -144,49 +145,96 @@ pub enum ConnectorPhased {
 
pub struct StringLogger(ControllerId, String);
 
#[derive(Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
pub(crate) struct Predicate {
 
    pub assigned: BTreeMap<PortId, bool>,
 
}
 
#[derive(Debug, Default)]
 
struct SyncBatch {
 
    puts: HashMap<PortId, Payload>,
 
    gets: HashSet<PortId>,
 
}
 
pub struct MonitoredReader<R: Read> {
 
    bytes: usize,
 
    r: R,
 
}
 
pub enum EndpointRecvErr {
 
    MalformedMessage,
 
    BrokenEndpoint,
 
}
 
pub struct SyncContext<'a> {
 
    connector: &'a mut Connector,
 
}
 
pub struct NonsyncContext<'a> {
 
    connector: &'a mut Connector,
 
}
 
enum TryRecyAnyError {
 
    Timeout,
 
    PollFailed,
 
    EndpointRecvErr { error: EndpointRecvErr, index: usize },
 
    BrokenEndpoint(usize),
 
}
 
////////////////
 
impl EndpointPoller {
 
    fn try_recv_any(
 
        &mut self,
 
        endpoint_exts: &mut [EndpointExt],
 
        deadline: Instant,
 
    ) -> Result<(usize, Msg), TryRecyAnyError> {
 
        use TryRecyAnyError::*;
 
        // 1. try messages already buffered
 
        if let Some(x) = self.undelayed_messages.pop() {
 
            return Ok(x);
 
        }
 
        // 2. try read from sockets nonblocking
 
        while let Some(index) = self.undrained_endpoints.pop() {
 
            if let Some(msg) = endpoint_exts[index]
 
                .endpoint
 
                .try_recv()
 
                .map_err(|error| EndpointRecvErr { error, index })?
 
            {
 
                return Ok((index, msg));
 
            }
 
        }
 
        // 3. poll for progress
 
        loop {
 
            let remaining = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?;
 
            self.poll.poll(&mut self.events, Some(remaining)).map_err(|_| PollFailed)?;
 
            for event in self.events.iter() {
 
                let Token(index) = event.token();
 
                if let Some(msg) = endpoint_exts[index]
 
                    .endpoint
 
                    .try_recv()
 
                    .map_err(|error| EndpointRecvErr { error, index })?
 
                {
 
                    return Ok((index, msg));
 
                }
 
            }
 
        }
 
    }
 
    fn undelay_all(&mut self) {
 
        self.undelayed_messages.extend(self.delayed_messages.drain(..));
 
    }
 
}
 
impl Debug for Endpoint {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_struct("Endpoint").field("inbox", &self.inbox).finish()
 
    }
 
}
 
impl NonsyncContext<'_> {
 
    pub fn new_component(&mut self, moved_ports: HashSet<PortId>, init_state: ComponentState) {
 
        todo!()
 
    }
 
    pub fn new_channel(&mut self) -> [PortId; 2] {
 
        todo!()
 
    }
 
}
 
impl SyncContext<'_> {
 
    pub fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        todo!()
 
    }
 
    pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        todo!()
 
    }
 
}
 
impl<R: Read> From<R> for MonitoredReader<R> {
 
    fn from(r: R) -> Self {
 
        Self { r, bytes: 0 }
 
@@ -270,51 +318,63 @@ impl Endpoint {
 
            }
 
        }
 
        let mut monitored = MonitoredReader::from(&self.inbox[..]);
 
        match bincode::deserialize_from(&mut monitored) {
 
            Ok(msg) => {
 
                let msg_size = monitored.bytes_read();
 
                self.inbox.drain(0..(msg_size.try_into().unwrap()));
 
                Ok(Some(msg))
 
            }
 
            Err(e) => match *e {
 
                bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => {
 
                    Ok(None)
 
                }
 
                _ => Err(MalformedMessage),
 
                // println!("SERDE ERRKIND {:?}", e);
 
                // Err(MalformedMessage)
 
            },
 
        }
 
    }
 
    fn send<T: serde::ser::Serialize>(&mut self, msg: &T) -> Result<(), ()> {
 
        bincode::serialize_into(&mut self.stream, msg).map_err(drop)
 
    }
 
}
 
impl Connector {
 
    fn get_logger(&self) -> &dyn Logger {
 
    pub fn get_logger(&self) -> &dyn Logger {
 
        &*self.logger
 
    }
 
    pub fn print_state(&self) {
 
        let stdout = std::io::stdout();
 
        let mut lock = stdout.lock();
 
        writeln!(
 
            lock,
 
            "--- Connector with ControllerId={:?}.\n::LOG_DUMP:\n",
 
            self.id_manager.controller_id
 
        )
 
        .unwrap();
 
        self.get_logger().dump_log(&mut lock);
 
        writeln!(lock, "DEBUG_PRINT:\n{:#?}\n", self).unwrap();
 
    }
 
}
 

	
 
// #[derive(Debug)]
 
// pub enum Connector {
 
//     Unconfigured(Unconfigured),
 
//     Configured(Configured),
 
//     Connected(Connected), // TODO consider boxing. currently takes up a lot of stack space
 
// }
 
// #[derive(Debug)]
 
// pub struct Unconfigured {
 
//     pub controller_id: ControllerId,
 
// }
 
// #[derive(Debug)]
 
// pub struct Configured {
 
//     controller_id: ControllerId,
 
//     polarities: Vec<Polarity>,
 
//     bindings: HashMap<usize, PortBinding>,
 
//     protocol_description: Arc<ProtocolD>,
 
//     main_component: Vec<u8>,
 
//     logger: String,
 
// }
 
// #[derive(Debug)]
 
// pub struct Connected {
 
//     native_interface: Vec<(PortId, Polarity)>,
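Endpoint::try_recv above deserializes one message from the front of its inbox and then drains exactly the number of bytes bincode consumed, which is what MonitoredReader::bytes_read reports. The Read impl of MonitoredReader is outside the hunks shown; a minimal sketch, assuming it simply counts the bytes handed out by the inner reader:

use std::io::Read;

// Wraps any reader and records how many bytes have been read through it.
struct MonitoredReader<R: Read> {
    bytes: usize,
    r: R,
}

impl<R: Read> Read for MonitoredReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let n = self.r.read(buf)?;
        self.bytes += n;
        Ok(n)
    }
}

impl<R: Read> MonitoredReader<R> {
    // after a successful bincode::deserialize_from, this equals the size of the
    // serialized message, so the caller can drain that prefix from its inbox
    fn bytes_read(&self) -> usize {
        self.bytes
    }
}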
src/runtime/my_tests.rs
 
use crate as reowolf;
 
use reowolf::Polarity::*;
 
use crossbeam_utils::thread::scope;
 
use reowolf::{Connector, EndpointSetup, Polarity::*, ProtocolDescription};
 
use std::net::SocketAddr;
 
use std::{sync::Arc, time::Duration};
 

	
 
fn next_test_addr() -> SocketAddr {
 
    use std::{
 
        net::{Ipv4Addr, SocketAddrV4},
 
        sync::atomic::{AtomicU16, Ordering::SeqCst},
 
    };
 
    static TEST_PORT: AtomicU16 = AtomicU16::new(5_000);
 
    let port = TEST_PORT.fetch_add(1, SeqCst);
 
    SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()
 
}
 

	
 
lazy_static::lazy_static! {
 
    static ref MINIMAL_PROTO: Arc<reowolf::ProtocolDescription> =
 
    static ref MINIMAL_PROTO: Arc<ProtocolDescription> =
 
        { Arc::new(reowolf::ProtocolDescription::parse(b"").unwrap()) };
 
}
 

	
 
#[test]
 
fn simple_connector() {
 
    let c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    println!("{:#?}", c);
 
}
 

	
 
#[test]
 
fn add_port_pair() {
 
    let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [_, _] = c.add_port_pair();
 
    let [_, _] = c.add_port_pair();
 
    println!("{:#?}", c);
 
}
 

	
 
#[test]
 
fn add_sync() {
 
    let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [o, i] = c.add_port_pair();
 
    c.add_component(b"sync", &[i, o]).unwrap();
 
    println!("{:#?}", c);
 
}
 

	
 
#[test]
 
fn add_net_port() {
 
    let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let sock_addr = next_test_addr();
 
    let _ = c
 
        .add_net_port(reowolf::EndpointSetup { polarity: Getter, sock_addr, is_active: false })
 
        .unwrap();
 
    let _ = c
 
        .add_net_port(reowolf::EndpointSetup { polarity: Putter, sock_addr, is_active: true })
 
        .unwrap();
 
    let _ =
 
        c.add_net_port(EndpointSetup { polarity: Getter, sock_addr, is_active: false }).unwrap();
 
    let _ = c.add_net_port(EndpointSetup { polarity: Putter, sock_addr, is_active: true }).unwrap();
 
    println!("{:#?}", c);
 
}
 

	
 
#[test]
 
fn trivial_connect() {
 
    let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    println!("{:#?}", c);
 
}
 

	
 
#[test]
 
fn single_node_connect() {
 
    let mut c = reowolf::Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let sock_addr = next_test_addr();
 
    let _ = c
 
        .add_net_port(reowolf::EndpointSetup { polarity: Getter, sock_addr, is_active: false })
 
        .unwrap();
 
    let _ = c
 
        .add_net_port(reowolf::EndpointSetup { polarity: Putter, sock_addr, is_active: true })
 
        .unwrap();
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let _ =
 
        c.add_net_port(EndpointSetup { polarity: Getter, sock_addr, is_active: false }).unwrap();
 
    let _ = c.add_net_port(EndpointSetup { polarity: Putter, sock_addr, is_active: true }).unwrap();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    println!("{:#?}", c);
 
    c.get_logger().dump_log(&mut std::io::stdout().lock());
 
}
 

	
 
#[test]
 
fn multithreaded_connect() {
 
    let sock_addr = next_test_addr();
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
            let es = EndpointSetup { polarity: Getter, sock_addr, is_active: true };
 
            let _ = c.add_net_port(es).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.print_state();
 
        });
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1);
 
            let es = EndpointSetup { polarity: Putter, sock_addr, is_active: false };
 
            let _ = c.add_net_port(es).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.print_state();
 
        });
 
    })
 
    .unwrap();
 
}
src/runtime/setup2.rs
 
@@ -94,116 +94,118 @@ impl Connector {
 
                self.inp_to_route
 
                    .insert(*port, InpRoute::ProtoComponent { index: proto_component_index });
 
            }
 
        }
 
        Ok(())
 
    }
 
    pub fn connect(&mut self, timeout: Duration) -> Result<(), ()> {
 
        match &mut self.phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(self.logger, "Call to connecting in connected state");
 
                Err(())
 
            }
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                log!(self.logger, "Call to connecting in setup state. Timeout {:?}", timeout);
 
                let deadline = Instant::now() + timeout;
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let (mut endpoint_exts, mut endpoint_poller) = init_endpoints(
 
                    &mut *self.logger,
 
                    endpoint_setups,
 
                    &mut self.inp_to_route,
 
                    deadline,
 
                )?;
 
                log!(self.logger, "Successfully connected {} endpoints", endpoint_exts.len());
 
                // leader election and tree construction
 
                let neighborhood =
 
                    init_neighborhood(&mut *self.logger, &mut endpoint_exts, &mut endpoint_poller)?;
 
                let neighborhood = init_neighborhood(
 
                    self.id_manager.controller_id,
 
                    &mut *self.logger,
 
                    &mut endpoint_exts,
 
                    &mut endpoint_poller,
 
                    deadline,
 
                )?;
 
                log!(self.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                // TODO session optimization goes here
 
                self.phased = ConnectorPhased::Communication {
 
                    endpoint_poller,
 
                    endpoint_exts,
 
                    neighborhood,
 
                    mem_inbox: Default::default(),
 
                };
 
                Ok(())
 
            }
 
        }
 
    }
 
}
 

	
 
fn init_endpoints(
 
    logger: &mut dyn Logger,
 
    endpoint_setups: &[(PortId, EndpointSetup)],
 
    inp_to_route: &mut HashMap<PortId, InpRoute>,
 
    deadline: Instant,
 
) -> Result<(Vec<EndpointExt>, EndpointPoller), ()> {
 
    use mio07::{
 
        net::{TcpListener, TcpStream},
 
        Events, Interest, Poll, Token,
 
    };
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    struct Todo {
 
        todo_endpoint: TodoEndpoint,
 
        endpoint_setup: EndpointSetup,
 
        local_port: PortId,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
    }
 
    enum TodoEndpoint {
 
        Listener(TcpListener),
 
        Endpoint(Endpoint),
 
    }
 
    fn init(
 
        token: Token,
 
        local_port: PortId,
 
        endpoint_setup: &EndpointSetup,
 
        poll: &mut Poll,
 
    ) -> Result<Todo, ()> {
 
        let todo_endpoint = if endpoint_setup.is_active {
 
            let mut stream = TcpStream::connect(endpoint_setup.sock_addr).map_err(drop)?;
 
            poll.registry().register(&mut stream, token, BOTH).unwrap();
 
            TodoEndpoint::Endpoint(Endpoint { stream, inbox: vec![] })
 
        } else {
 
            let mut listener = TcpListener::bind(endpoint_setup.sock_addr).map_err(drop)?;
 
            poll.registry().register(&mut listener, token, BOTH).unwrap();
 
            TodoEndpoint::Listener(listener)
 
        };
 
        Ok(Todo {
 
            todo_endpoint,
 
            endpoint_setup: endpoint_setup.clone(),
 
            local_port,
 
            sent_local_port: false,
 
            recv_peer_port: None,
 
        })
 
    };
 
    ////////////////////////
 

	
 
    let mut ep = EndpointPoller {
 
        poll: Poll::new().map_err(drop)?,
 
        events: Events::with_capacity(64),
 
        undrained_endpoints: Default::default(),
 
        delayed_inp_messages: Default::default(),
 
        delayed_messages: Default::default(),
 
        undelayed_messages: Default::default(),
 
    };
 

	
 
    let mut todos = endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, (local_port, endpoint_setup))| {
 
            init(Token(index), *local_port, endpoint_setup, &mut ep.poll)
 
        })
 
        .collect::<Result<Vec<Todo>, _>>()?;
 

	
 
    let mut unfinished: HashSet<usize> = (0..todos.len()).collect();
 
    while !unfinished.is_empty() {
 
        let remaining = deadline.checked_duration_since(Instant::now()).ok_or(())?;
 
        ep.poll.poll(&mut ep.events, Some(remaining)).map_err(drop)?;
 
        for event in ep.events.iter() {
 
            let token = event.token();
 
            let Token(index) = token;
 
            let todo: &mut Todo = &mut todos[index];
 
            if let TodoEndpoint::Listener(listener) = &mut todo.todo_endpoint {
 
                let (mut stream, peer_addr) = listener.accept().map_err(drop)?;
 
                ep.poll.registry().deregister(listener).unwrap();
 
                ep.poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                log!(logger, "Endpoint({}) accepted a connection from {:?}", index, peer_addr);
 
                let endpoint = Endpoint { stream, inbox: vec![] };
 
@@ -243,33 +245,154 @@ fn init_endpoints(
 
                    if *sent_local_port && recv_peer_port.is_some() {
 
                        unfinished.remove(&index);
 
                        log!(logger, "endpoint[{}] is finished!", index);
 
                    }
 
                }
 
                Todo { todo_endpoint: TodoEndpoint::Listener(_), .. } => unreachable!(),
 
            }
 
        }
 
        ep.events.clear();
 
    }
 
    let endpoint_exts = todos
 
        .into_iter()
 
        .map(|Todo { todo_endpoint, recv_peer_port, .. }| EndpointExt {
 
            endpoint: match todo_endpoint {
 
                TodoEndpoint::Endpoint(endpoint) => endpoint,
 
                TodoEndpoint::Listener(..) => unreachable!(),
 
            },
 
            inp_for_emerging_msgs: recv_peer_port.unwrap(),
 
        })
 
        .collect();
 
    Ok((endpoint_exts, ep))
 
}
 

	
 
fn init_neighborhood(
 
    controller_id: ControllerId,
 
    logger: &mut dyn Logger,
 
    endpoint_exts: &mut [EndpointExt],
 
    endpoint_poller: &mut EndpointPoller,
 
    ep: &mut EndpointPoller,
 
    deadline: Instant,
 
) -> Result<Neighborhood, ()> {
 
    log!(logger, "Time to construct my neighborhood");
 
    let parent = None;
 
    let children = Default::default();
 
    log!(logger, "beginning neighborhood construction");
 
    use Msg::SetupMsg as S;
 
    use SetupMsg::*;
 

	
 
    // 1. broadcast my ID as the first echo. await reply from all neighbors
 
    let echo = S(LeaderEcho { maybe_leader: controller_id });
 
    let mut awaiting = HashSet::with_capacity(endpoint_exts.len());
 
    for (index, ee) in endpoint_exts.iter_mut().enumerate() {
 
        log!(logger, "{:?}'s initial echo to {:?}, {:?}", controller_id, index, &echo);
 
        ee.endpoint.send(&echo)?;
 
        awaiting.insert(index);
 
    }
 

	
 
    // 2. Receive incoming replies. whenever a higher-id echo arrives,
 
    //    adopt it as leader, sender as parent, and reset the await set.
 
    let mut parent: Option<usize> = None;
 
    let mut my_leader = controller_id;
 
    ep.undelay_all();
 
    'echo_loop: while !awaiting.is_empty() || parent.is_some() {
 
        let (index, msg) = ep.try_recv_any(endpoint_exts, deadline).map_err(drop)?;
 
        log!(logger, "GOT from index {:?} msg {:?}", &index, &msg);
 
        match msg {
 
            S(LeaderAnnounce { leader }) => {
 
                // someone else completed the echo and became leader first!
 
                // the sender is my parent
 
                parent = Some(index);
 
                my_leader = leader;
 
                awaiting.clear();
 
                break 'echo_loop;
 
            }
 
            S(LeaderEcho { maybe_leader }) => {
 
                use Ordering::*;
 
                match maybe_leader.cmp(&my_leader) {
 
                    Less => { /* ignore */ }
 
                    Equal => {
 
                        awaiting.remove(&index);
 
                        if awaiting.is_empty() {
 
                            if let Some(p) = parent {
 
                                // return the echo to my parent
 
                                endpoint_exts[p].endpoint.send(&S(LeaderEcho { maybe_leader }))?;
 
                            } else {
 
                                // DECIDE!
 
                                break 'echo_loop;
 
                            }
 
                        }
 
                    }
 
                    Greater => {
 
                        // join new echo
 
                        log!(logger, "Setting leader to index {:?}", index);
 
                        parent = Some(index);
 
                        my_leader = maybe_leader;
 
                        let echo = S(LeaderEcho { maybe_leader: my_leader });
 
                        awaiting.clear();
 
                        if endpoint_exts.len() == 1 {
 
                            // immediately reply to parent
 
                            log!(logger, "replying echo to parent {:?} immediately", index);
 
                            endpoint_exts[index].endpoint.send(&echo)?;
 
                        } else {
 
                            for (index2, ee) in endpoint_exts.iter_mut().enumerate() {
 
                                if index2 == index {
 
                                    continue;
 
                                }
 
                                log!(logger, "repeating echo {:?} to {:?}", &echo, index2);
 
                                ee.endpoint.send(&echo)?;
 
                                awaiting.insert(index2);
 
                            }
 
                        }
 
                    }
 
                }
 
            }
 
            inappropriate_msg => ep.delayed_messages.push((index, inappropriate_msg)),
 
        }
 
    }
 
    match parent {
 
        None => assert_eq!(
 
            my_leader, controller_id,
 
            "I've got no parent, but I consider {:?} the leader?",
 
            my_leader
 
        ),
 
        Some(parent) => assert_ne!(
 
            my_leader, controller_id,
 
            "I have {:?} as parent, but I consider myself ({:?}) the leader?",
 
            parent, controller_id
 
        ),
 
    }
 

	
 
    log!(logger, "DONE WITH ECHO! Leader has cid={:?}", my_leader);
 

	
 
    // 3. broadcast leader announcement (except to parent: confirm they are your parent)
 
    //    in this loop, every node sends 1 message to each neighbor
 
    // await 1 message from all non-parents
 
    let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader });
 
    for (index, ee) in endpoint_exts.iter_mut().enumerate() {
 
        let msg = if Some(index) == parent {
 
            &S(YouAreMyParent)
 
        } else {
 
            awaiting.insert(index);
 
            &msg_for_non_parents
 
        };
 
        log!(logger, "ANNOUNCING to {:?} {:?}", index, msg);
 
        ee.endpoint.send(msg)?;
 
    }
 
    let mut children = Vec::default();
 
    ep.undelay_all();
 
    while !awaiting.is_empty() {
 
        let (index, msg) = ep.try_recv_any(endpoint_exts, deadline).map_err(drop)?;
 
        match msg {
 
            S(YouAreMyParent) => {
 
                assert!(awaiting.remove(&index));
 
                children.push(index);
 
            }
 
            S(SetupMsg::LeaderAnnounce { leader }) => {
 
                assert!(awaiting.remove(&index));
 
                assert!(leader == my_leader);
 
                assert!(Some(index) != parent);
 
                // they wouldn't send me this if they considered me their parent
 
            }
 
            inappropriate_msg => ep.delayed_messages.push((index, inappropriate_msg)),
 
        }
 
    }
 
    children.sort();
 
    children.dedup();
 
    Ok(Neighborhood { parent, children })
 
}
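The setup code above reports progress exclusively through log!(logger, ...) against the Logger trait from src/runtime/mod.rs (line_writer / dump_log). The macro definition itself is not part of the hunks shown; a minimal sketch of a compatible definition, assuming it simply writes one formatted line per call:

// Hypothetical macro, shown only to clarify how the Logger trait is driven;
// the crate's real log! may differ (e.g. in prefixes or flushing).
macro_rules! log {
    ($logger:expr, $($arg:tt)*) => {{
        use std::fmt::Write as _; // for writeln! on &mut dyn std::fmt::Write
        let w = $logger.line_writer();
        let _ = writeln!(w, $($arg)*);
    }};
}

Under that reading, StringLogger(ControllerId, String) can implement line_writer by handing out &mut self.1 and dump_log by copying the accumulated string into the given std::io::Write, which is what Connector::print_state relies on.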