Changeset - 65390fb1cdbc
[Not reviewed]
0 9 1
Christopher Esterhuyse - 5 years ago 2020-06-24 12:12:27
christopher.esterhuyse@gmail.com
bugfixing and rewrote dist algs to be more readable
10 files changed with 455 insertions and 252 deletions:
0 comments (0 inline, 0 general)
src/common.rs
Show inline comments
 
@@ -28,14 +28,14 @@ pub use Polarity::*;
 

	
 
///////////////////// DEFS /////////////////////
 

	
 
pub type ControllerId = u32;
 
pub type ConnectorId = u32;
 
pub type PortSuffix = u32;
 

	
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
pub struct Id {
 
    pub(crate) controller_id: ControllerId,
 
    pub(crate) connector_id: ConnectorId,
 
    pub(crate) u32_suffix: PortSuffix,
 
}
 

	
 
@@ -179,17 +179,17 @@ impl From<Vec<u8>> for Payload {
 
}
 
impl Debug for PortId {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "PID<{},{}>", self.0.controller_id, self.0.u32_suffix)
 
        write!(f, "PID<{},{}>", self.0.connector_id, self.0.u32_suffix)
 
    }
 
}
 
impl Debug for FiringVar {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "VID<{},{}>", (self.0).0.controller_id, (self.0).0.u32_suffix)
 
        write!(f, "VID<{},{}>", (self.0).0.connector_id, (self.0).0.u32_suffix)
 
    }
 
}
 
impl Debug for ProtoComponentId {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "ProtoComponentId({},{})", self.0.controller_id, self.0.u32_suffix)
 
        write!(f, "ProtoComponentId({},{})", self.0.connector_id, self.0.u32_suffix)
 
    }
 
}
 
impl std::ops::Not for Polarity {
src/lib.rs
Show inline comments
 
@@ -8,7 +8,7 @@ mod runtime;
 
// #[cfg(test)]
 
// mod test;
 

	
 
pub use common::{ControllerId, Polarity, PortId};
 
pub use common::{ConnectorId, Polarity, PortId};
 
pub use protocol::ProtocolDescription;
 
pub use runtime::{error, Connector, EndpointSetup, FileLogger, VecLogger};
 

	
src/macros.rs
Show inline comments
 
macro_rules! endptlog {
 
    ($logger:expr, $($arg:tt)*) => {{
 
        // let w = $logger.line_writer();
 
        //    let _ = write!(w, "[ENDPT]");
 
        //    let _ = writeln!(w, $($arg)*);
 
    }};
 
}
 
macro_rules! log {
 
    ($logger:expr, $($arg:tt)*) => {{
 
        let _ = write!($logger.line_writer(), $($arg)*).unwrap();
 
        let _ = writeln!($logger.line_writer(), $($arg)*);
 
    }};
 
}
 
// macro_rules! assert_let {
 
//     ($pat:pat = $expr:expr => $work:expr) => {
 
//         if let $pat = $expr {
 
//             $work
 
//         } else {
 
//             panic!("assert_let failed");
 
//         }
 
//     };
 
// }
 

	
 
// #[test]
 
// fn assert_let() {
 
//     let x = Some(5);
 
//     let z = assert_let![Some(y) = x => {
 
//         println!("{:?}", y);
 
//         3
 
//     }];
 
//     println!("{:?}", z);
 
// }
 

	
 
// #[test]
 
// #[should_panic]
 
// fn must_let_panic() {
 
//     let x: Option<u32> = None;
 
//     assert_let![Some(y) = x => {
 
//         println!("{:?}", y);
 
//     }];
 
// }
src/runtime/communication.rs
Show inline comments
 
@@ -348,7 +348,7 @@ impl Connector {
 
                    log!(logger, "No decision yet. Let's recv an endpoint msg...");
 
                    {
 
                        let (endpoint_index, msg) = loop {
 
                            match endpoint_manager.try_recv_any_comms(deadline)? {
 
                            match endpoint_manager.try_recv_any_comms(logger, deadline)? {
 
                                None => {
 
                                    log!(
 
                                        logger,
 
@@ -422,7 +422,7 @@ impl Connector {
 
                            }
 
                            CommMsgContents::Suggest { suggestion } => {
 
                                // only accept this control msg through a child endpoint
 
                                if neighborhood.children.binary_search(&endpoint_index).is_ok() {
 
                                if neighborhood.children.contains(&endpoint_index) {
 
                                    match suggestion {
 
                                        Decision::Success(predicate) => {
 
                                            // child solution contributes to local solution
 
@@ -492,7 +492,7 @@ impl Connector {
 
                }
 

	
 
                *round_result = match decision {
 
                    Decision::Failure => Err(DistributedTimeout),
 
                    Decision::Failure => Err(RoundFailure),
 
                    Decision::Success(predicate) => {
 
                        // commit changes to component states
 
                        self.proto_components.clear();
src/runtime/endpoints.rs
Show inline comments
 
@@ -13,13 +13,21 @@ enum TryRecyAnyError {
 

	
 
/////////////////////
 

	
 
fn would_block(err: &std::io::Error) -> bool {
 
    err.kind() == std::io::ErrorKind::WouldBlock
 
}
 
impl Endpoint {
 
    pub fn try_recv<T: serde::de::DeserializeOwned>(&mut self) -> Result<Option<T>, EndpointError> {
 
    pub fn try_recv<T: serde::de::DeserializeOwned>(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
    ) -> Result<Option<T>, EndpointError> {
 
        use EndpointError::*;
 
        // populate inbox as much as possible
 
        'read_loop: loop {
 
            match self.stream.read_to_end(&mut self.inbox) {
 
                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break 'read_loop,
 
            let res = self.stream.read_to_end(&mut self.inbox);
 
            endptlog!(logger, "Stream read to end {:?}", &res);
 
            match res {
 
                Err(e) if would_block(&e) => break 'read_loop,
 
                Ok(0) => break 'read_loop,
 
                Ok(_) => (),
 
                Err(_e) => return Err(BrokenEndpoint),
 
@@ -48,6 +56,12 @@ impl Endpoint {
 
}
 

	
 
impl EndpointManager {
 
    pub fn index_iter(&self) -> Range<usize> {
 
        0..self.num_endpoints()
 
    }
 
    pub fn num_endpoints(&self) -> usize {
 
        self.endpoint_exts.len()
 
    }
 
    pub fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> {
 
        let endpoint = &mut self.endpoint_exts[index].endpoint;
 
        endpoint.send(msg).map_err(|err| {
 
@@ -59,22 +73,24 @@ impl EndpointManager {
 
    }
 
    pub fn try_recv_any_comms(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        deadline: Option<Instant>,
 
    ) -> Result<Option<(usize, Msg)>, SyncError> {
 
        use {SyncError as Se, TryRecyAnyError as Trae};
 
        match self.try_recv_any(deadline) {
 
        match self.try_recv_any(logger, deadline) {
 
            Ok(tup) => Ok(Some(tup)),
 
            Err(Trae::Timeout) => Ok(None),
 
            Err(Trae::PollFailed) => Err(Se::PollFailed),
 
            Err(Trae::EndpointError { error, index }) => Err(Se::BrokenEndpoint(index)),
 
            Err(Trae::EndpointError { error: _, index }) => Err(Se::BrokenEndpoint(index)),
 
        }
 
    }
 
    pub fn try_recv_any_setup(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        deadline: Option<Instant>,
 
    ) -> Result<(usize, Msg), ConnectError> {
 
        use {ConnectError as Ce, TryRecyAnyError as Trae};
 
        self.try_recv_any(deadline).map_err(|err| match err {
 
        self.try_recv_any(logger, deadline).map_err(|err| match err {
 
            Trae::Timeout => Ce::Timeout,
 
            Trae::PollFailed => Ce::PollFailed,
 
            Trae::EndpointError { error, index } => Ce::EndpointSetupError(
 
@@ -83,36 +99,56 @@ impl EndpointManager {
 
            ),
 
        })
 
    }
 
    fn try_recv_any(&mut self, deadline: Option<Instant>) -> Result<(usize, Msg), TryRecyAnyError> {
 
        use TryRecyAnyError::*;
 
    fn try_recv_any(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        deadline: Option<Instant>,
 
    ) -> Result<(usize, Msg), TryRecyAnyError> {
 
        use TryRecyAnyError as Trea;
 
        // 1. try messages already buffered
 
        if let Some(x) = self.undelayed_messages.pop() {
 
            endptlog!(logger, "RECV undelayed_msg {:?}", &x);
 
            return Ok(x);
 
        }
 
        loop {
 
            // 2. try read a message from an endpoint that raised an event with poll() but wasn't drained
 
            while let Some(index) = self.polled_undrained.pop() {
 
                let endpoint = &mut self.endpoint_exts[index].endpoint;
 
                if let Some(msg) =
 
                    endpoint.try_recv().map_err(|error| EndpointError { error, index })?
 
                if let Some(msg) = endpoint
 
                    .try_recv(logger)
 
                    .map_err(|error| Trea::EndpointError { error, index })?
 
                {
 
                    if !endpoint.inbox.is_empty() {
 
                        // there may be another message waiting!
 
                        self.polled_undrained.insert(index);
 
                    }
 
                    endptlog!(logger, "RECV polled_undrained {:?}", &msg);
 
                    // if !endpoint.inbox.is_empty() {
 
                    // there may be another message waiting!
 
                    self.polled_undrained.insert(index);
 
                    // }
 
                    return Ok((index, msg));
 
                }
 
            }
 
            // 3. No message yet. Do we have enough time to poll?
 
            let remaining = if let Some(deadline) = deadline {
 
                Some(deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?)
 
                Some(deadline.checked_duration_since(Instant::now()).ok_or(Trea::Timeout)?)
 
            } else {
 
                None
 
            };
 
            self.poll.poll(&mut self.events, remaining).map_err(|_| PollFailed)?;
 
            self.poll.poll(&mut self.events, remaining).map_err(|_| Trea::PollFailed)?;
 
            for event in self.events.iter() {
 
                let Token(index) = event.token();
 
                self.polled_undrained.insert(index);
 
                endptlog!(
 
                    logger,
 
                    "RECV poll event {:?} for endpoint index {:?}. undrained: {:?}",
 
                    &event,
 
                    index,
 
                    self.polled_undrained.iter()
 
                );
 
                if event.is_error() {
 
                    return Err(Trea::EndpointError {
 
                        error: EndpointError::BrokenEndpoint,
 
                        index,
 
                    });
 
                }
 
            }
 
            self.events.clear();
 
        }
src/runtime/error.rs
Show inline comments
 
use crate::common::*;
 

	
 
#[derive(Debug, Clone)]
 
pub enum EndpointError {
 
    MalformedMessage,
 
    BrokenEndpoint,
 
#[derive(Debug)]
 
pub enum ConnectError {
 
    BindFailed(SocketAddr),
 
    PollInitFailed,
 
    Timeout,
 
    PollFailed,
 
    AcceptFailed(SocketAddr),
 
    AlreadyConnected,
 
    PortPeerPolarityMismatch(PortId),
 
    EndpointSetupError(SocketAddr, EndpointError),
 
    SetupAlgMisbehavior,
 
}
 
////////////////////////
 
#[derive(Debug, Clone)]
 
pub enum SyncError {
 
    Timeout,
 
    NotConnected,
 
    InconsistentProtoComponent(ProtoComponentId),
 
    IndistinguishableBatches([usize; 2]),
 
    DistributedTimeout,
 
    RoundFailure,
 
    PollFailed,
 
    BrokenEndpoint(usize),
 
}
 
#[derive(Debug, Clone)]
 
pub enum EndpointError {
 
    MalformedMessage,
 
    BrokenEndpoint,
 
}
 
#[derive(Debug)]
 
pub enum PortOpError {
 
    WrongPolarity,
 
@@ -28,19 +40,7 @@ pub enum GottenError {
 
    PortDidntGet,
 
    PreviousSyncFailed,
 
}
 

	
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum NextBatchError {
 
    NotConnected,
 
}
 
#[derive(Debug)]
 
pub enum ConnectError {
 
    BindFailed(SocketAddr),
 
    PollInitFailed,
 
    Timeout,
 
    PollFailed,
 
    AcceptFailed(SocketAddr),
 
    AlreadyConnected,
 
    PortPeerPolarityMismatch(PortId),
 
    EndpointSetupError(SocketAddr, EndpointError),
 
}
src/runtime/logging.rs
Show inline comments
 
new file 100644
 
use super::*;
 

	
 
impl FileLogger {
 
    pub fn new(connector_id: ConnectorId, file: std::fs::File) -> Self {
 
        Self(connector_id, file)
 
    }
 
}
 
impl VecLogger {
 
    pub fn new(connector_id: ConnectorId) -> Self {
 
        Self(connector_id, Default::default())
 
    }
 
}
 
/////////////////
 
impl Logger for DummyLogger {
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write {
 
        self
 
    }
 
}
 
impl Logger for VecLogger {
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write {
 
        let _ = write!(&mut self.1, "CID({}): ", self.0);
 
        self
 
    }
 
}
 
impl Logger for FileLogger {
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write {
 
        let _ = write!(&mut self.1, "CID({}): ", self.0);
 
        &mut self.1
 
    }
 
}
 
///////////////////
 
impl Drop for VecLogger {
 
    fn drop(&mut self) {
 
        let stdout = std::io::stderr();
 
        let mut lock = stdout.lock();
 
        writeln!(lock, "--- DROP LOG DUMP ---").unwrap();
 
        let _ = std::io::Write::write(&mut lock, self.1.as_slice());
 
    }
 
}
 
impl std::io::Write for VecLogger {
 
    fn flush(&mut self) -> Result<(), std::io::Error> {
 
        Ok(())
 
    }
 
    fn write(&mut self, data: &[u8]) -> Result<usize, std::io::Error> {
 
        self.1.extend_from_slice(data);
 
        Ok(data.len())
 
    }
 
}
 
impl std::io::Write for DummyLogger {
 
    fn flush(&mut self) -> Result<(), std::io::Error> {
 
        Ok(())
 
    }
 
    fn write(&mut self, bytes: &[u8]) -> Result<usize, std::io::Error> {
 
        Ok(bytes.len())
 
    }
 
}
src/runtime/mod.rs
Show inline comments
 
mod communication;
 
mod endpoints;
 
pub mod error;
 
mod logging;
 
mod setup;
 

	
 
#[cfg(test)]
 
@@ -9,6 +10,11 @@ mod tests;
 
use crate::common::*;
 
use error::*;
 

	
 
#[derive(Debug)]
 
pub struct VecSet<T: std::cmp::Ord> {
 
    // invariant: ordered, deduplicated
 
    vec: Vec<T>,
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
 
pub enum LocalComponentId {
 
    Native,
 
@@ -37,8 +43,8 @@ pub enum Msg {
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
pub enum SetupMsg {
 
    MyPortInfo(MyPortInfo),
 
    LeaderEcho { maybe_leader: ControllerId },
 
    LeaderAnnounce { leader: ControllerId },
 
    LeaderWave { wave_leader: ConnectorId },
 
    LeaderAnnounce { tree_leader: ConnectorId },
 
    YouAreMyParent,
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
@@ -78,11 +84,11 @@ pub trait Logger: Debug {
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write;
 
}
 
#[derive(Debug)]
 
pub struct VecLogger(ControllerId, Vec<u8>);
 
pub struct VecLogger(ConnectorId, Vec<u8>);
 
#[derive(Debug)]
 
pub struct DummyLogger;
 
#[derive(Debug)]
 
pub struct FileLogger(ControllerId, std::fs::File);
 
pub struct FileLogger(ConnectorId, std::fs::File);
 
#[derive(Debug, Clone)]
 
pub struct EndpointSetup {
 
    pub sock_addr: SocketAddr,
 
@@ -96,7 +102,7 @@ pub struct EndpointExt {
 
#[derive(Debug)]
 
pub struct Neighborhood {
 
    parent: Option<usize>,
 
    children: Vec<usize>, // ordered, deduplicated
 
    children: VecSet<usize>,
 
}
 
#[derive(Debug)]
 
pub struct MemInMsg {
 
@@ -105,7 +111,7 @@ pub struct MemInMsg {
 
}
 
#[derive(Debug)]
 
pub struct IdManager {
 
    controller_id: ControllerId,
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
    proto_component_suffix_stream: U32Stream,
 
}
 
@@ -177,6 +183,19 @@ pub struct SyncProtoContext<'a> {
 
    inbox: &'a HashMap<PortId, Payload>,
 
}
 
////////////////
 
impl<T: std::cmp::Ord> VecSet<T> {
 
    fn iter(&self) -> impl Iterator<Item = &T> {
 
        self.vec.iter()
 
    }
 
    fn contains(&self, element: &T) -> bool {
 
        self.vec.binary_search(element).is_ok()
 
    }
 
    fn new(mut vec: Vec<T>) -> Self {
 
        vec.sort();
 
        vec.dedup();
 
        Self { vec }
 
    }
 
}
 
impl PortInfo {
 
    fn firing_var_for(&self, port: PortId) -> FiringVar {
 
        FiringVar(match self.polarities.get(&port).unwrap() {
 
@@ -186,19 +205,19 @@ impl PortInfo {
 
    }
 
}
 
impl IdManager {
 
    fn new(controller_id: ControllerId) -> Self {
 
    fn new(connector_id: ConnectorId) -> Self {
 
        Self {
 
            controller_id,
 
            connector_id,
 
            port_suffix_stream: Default::default(),
 
            proto_component_suffix_stream: Default::default(),
 
        }
 
    }
 
    fn new_port_id(&mut self) -> PortId {
 
        Id { controller_id: self.controller_id, u32_suffix: self.port_suffix_stream.next() }.into()
 
        Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into()
 
    }
 
    fn new_proto_component_id(&mut self) -> ProtoComponentId {
 
        Id {
 
            controller_id: self.controller_id,
 
            connector_id: self.connector_id,
 
            u32_suffix: self.proto_component_suffix_stream.next(),
 
        }
 
        .into()
 
@@ -272,58 +291,6 @@ impl Connector {
 
        Ok(())
 
    }
 
}
 
impl Logger for DummyLogger {
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write {
 
        impl std::io::Write for DummyLogger {
 
            fn flush(&mut self) -> Result<(), std::io::Error> {
 
                Ok(())
 
            }
 
            fn write(&mut self, bytes: &[u8]) -> Result<usize, std::io::Error> {
 
                Ok(bytes.len())
 
            }
 
        }
 
        self
 
    }
 
}
 
impl VecLogger {
 
    pub fn new(controller_id: ControllerId) -> Self {
 
        Self(controller_id, Default::default())
 
    }
 
}
 
impl Drop for VecLogger {
 
    fn drop(&mut self) {
 
        let stdout = std::io::stderr();
 
        let mut lock = stdout.lock();
 
        writeln!(lock, "--- DROP LOG DUMP ---").unwrap();
 
        let _ = std::io::Write::write(&mut lock, self.1.as_slice());
 
    }
 
}
 
impl Logger for VecLogger {
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write {
 
        let _ = write!(&mut self.1, "\nCID({}): ", self.0);
 
        self
 
    }
 
}
 
impl FileLogger {
 
    pub fn new(controller_id: ControllerId, file: std::fs::File) -> Self {
 
        Self(controller_id, file)
 
    }
 
}
 
impl Logger for FileLogger {
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write {
 
        let _ = write!(&mut self.1, "\nCID({}): ", self.0);
 
        &mut self.1
 
    }
 
}
 
impl std::io::Write for VecLogger {
 
    fn flush(&mut self) -> Result<(), std::io::Error> {
 
        Ok(())
 
    }
 
    fn write(&mut self, data: &[u8]) -> Result<usize, std::io::Error> {
 
        self.1.extend_from_slice(data);
 
        Ok(data.len())
 
    }
 
}
 
impl Predicate {
 
    #[inline]
 
    pub fn inserted(mut self, k: FiringVar, v: bool) -> Self {
src/runtime/setup.rs
Show inline comments
 
@@ -5,24 +5,25 @@ use std::io::ErrorKind::WouldBlock;
 
impl Connector {
 
    pub fn new_simple(
 
        proto_description: Arc<ProtocolDescription>,
 
        controller_id: ControllerId,
 
        connector_id: ConnectorId,
 
    ) -> Self {
 
        let logger = Box::new(DummyLogger);
 
        // let logger = Box::new(DummyLogger);
 
        let surplus_sockets = 2;
 
        Self::new(logger, proto_description, controller_id, surplus_sockets)
 
        Self::new(logger, proto_description, connector_id, surplus_sockets)
 
    }
 
    pub fn new(
 
        logger: Box<dyn Logger>,
 
        mut logger: Box<dyn Logger>,
 
        proto_description: Arc<ProtocolDescription>,
 
        controller_id: ControllerId,
 
        connector_id: ConnectorId,
 
        surplus_sockets: u16,
 
    ) -> Self {
 
        log!(&mut *logger, "Created with connector_id {:?}", connector_id);
 
        Self {
 
            proto_description,
 
            proto_components: Default::default(),
 
            logger,
 
            id_manager: IdManager::new(controller_id),
 
            id_manager: IdManager::new(connector_id),
 
            native_ports: Default::default(),
 
            port_info: Default::default(),
 
            phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets },
 
@@ -77,7 +78,7 @@ impl Connector {
 
                );
 
                // leader election and tree construction
 
                let neighborhood = init_neighborhood(
 
                    self.id_manager.controller_id,
 
                    self.id_manager.connector_id,
 
                    &mut *self.logger,
 
                    &mut endpoint_manager,
 
                    deadline,
 
@@ -215,7 +216,7 @@ fn new_endpoint_manager(
 
                        *sent_local_port = true;
 
                    }
 
                    if event.is_readable() && recv_peer_port.is_none() {
 
                        let maybe_msg = endpoint.try_recv().map_err(|e| {
 
                        let maybe_msg = endpoint.try_recv(logger).map_err(|e| {
 
                            EndpointSetupError(endpoint.stream.local_addr().unwrap(), e)
 
                        })?;
 
                        if maybe_msg.is_some() && !endpoint.inbox.is_empty() {
 
@@ -260,9 +261,15 @@ fn new_endpoint_manager(
 
    }
 
    let endpoint_exts = todos
 
        .into_iter()
 
        .map(|Todo { todo_endpoint, local_port, .. }| EndpointExt {
 
        .enumerate()
 
        .map(|(index, Todo { todo_endpoint, local_port, .. })| EndpointExt {
 
            endpoint: match todo_endpoint {
 
                TodoEndpoint::Endpoint(endpoint) => endpoint,
 
                TodoEndpoint::Endpoint(mut endpoint) => {
 
                    poll.registry()
 
                        .reregister(&mut endpoint.stream, Token(index), Interest::READABLE)
 
                        .unwrap();
 
                    endpoint
 
                }
 
                TodoEndpoint::Listener(..) => unreachable!(),
 
            },
 
            getter_for_incoming: local_port,
 
@@ -279,137 +286,192 @@ fn new_endpoint_manager(
 
}
 

	
 
fn init_neighborhood(
 
    controller_id: ControllerId,
 
    connector_id: ConnectorId,
 
    logger: &mut dyn Logger,
 
    em: &mut EndpointManager,
 
    deadline: Option<Instant>,
 
) -> Result<Neighborhood, ConnectError> {
 
    use {Msg::SetupMsg as S, SetupMsg::*};
 
    log!(logger, "beginning neighborhood construction");
 
    // 1. broadcast my ID as the first echo. await reply from all neighbors
 
    let echo = S(LeaderEcho { maybe_leader: controller_id });
 
    let mut awaiting = HashSet::with_capacity(em.endpoint_exts.len());
 
    for index in 0..em.endpoint_exts.len() {
 
        log!(logger, "{:?}'s initial echo to {:?}, {:?}", controller_id, index, &echo);
 
        em.send_to_setup(index, &echo)?;
 
        awaiting.insert(index);
 
    use {ConnectError::*, Msg::SetupMsg as S, SetupMsg::*};
 
    ////////////////////////////////
 
    #[derive(Debug)]
 
    struct WaveState {
 
        parent: Option<usize>,
 
        leader: ConnectorId,
 
    }
 

	
 
    // 2. Receive incoming replies. whenever a higher-id echo arrives,
 
    //    adopt it as leader, sender as parent, and reset the await set.
 
    let mut parent: Option<usize> = None;
 
    let mut my_leader = controller_id;
 
    em.undelay_all();
 
    'echo_loop: while !awaiting.is_empty() || parent.is_some() {
 
        let (index, msg) = em.try_recv_any_setup(deadline)?;
 
        log!(logger, "GOT from index {:?} msg {:?}", &index, &msg);
 
        match msg {
 
            S(LeaderAnnounce { leader }) => {
 
                // someone else completed the echo and became leader first!
 
                // the sender is my parent
 
                parent = Some(index);
 
                my_leader = leader;
 
                awaiting.clear();
 
                break 'echo_loop;
 
    fn do_wave(
 
        em: &mut EndpointManager,
 
        awaiting: &mut HashSet<usize>,
 
        ws: &WaveState,
 
    ) -> Result<(), ConnectError> {
 
        awaiting.clear();
 
        let msg = S(LeaderWave { wave_leader: ws.leader });
 
        for index in em.index_iter() {
 
            if Some(index) != ws.parent {
 
                em.send_to_setup(index, &msg)?;
 
                awaiting.insert(index);
 
            }
 
            S(LeaderEcho { maybe_leader }) => {
 
                use Ordering::*;
 
                match maybe_leader.cmp(&my_leader) {
 
                    Less => { /* ignore this wave */ }
 
                    Equal => {
 
                        awaiting.remove(&index);
 
                        if awaiting.is_empty() {
 
                            if let Some(p) = parent {
 
                                // return the echo to my parent
 
                                em.send_to_setup(p, &S(LeaderEcho { maybe_leader }))?;
 
                            } else {
 
                                // wave completed!
 
                                break 'echo_loop;
 
        }
 
        Ok(())
 
    }
 
    ///////////////////////
 
    /*
 
    Conceptually, we have two distinct disstributed algorithms back-to-back
 
    1. Leader election using echo algorithm with extinction.
 
        - Each connector initiates a wave tagged with their ID
 
        - Connectors participate in waves of GREATER ID, abandoning previous waves
 
        - Only the wave of the connector with GREATEST ID completes, whereupon they are the leader
 
    2. Tree construction
 
        - The leader broadcasts their leadership with msg A
 
        - Upon receiving their first announcement, connectors reply B, and send A to all peers
 
        - A controller exits once they have received A or B from each neighbor
 

	
 
    The actual implementation is muddier, because non-leaders aren't aware of termiantion of algorithm 1,
 
    so they rely on receipt of the leader's announcement to realize that algorithm 2 has begun.
 

	
 
    NOTE the distinction between PARENT and LEADER
 
    */
 
    log!(logger, "beginning neighborhood construction");
 
    if em.num_endpoints() == 0 {
 
        log!(logger, "Edge case of no neighbors! No parent an no children!");
 
        return Ok(Neighborhood { parent: None, children: VecSet::new(vec![]) });
 
    }
 
    log!(logger, "Have {} endpoints. Must participate in distributed alg.", em.num_endpoints());
 
    let mut awaiting = HashSet::with_capacity(em.num_endpoints());
 
    // 1+ neighbors. Leader can only be learned by receiving messages
 
    // loop ends when I know my sink tree parent (implies leader was elected)
 
    let election_result: WaveState = {
 
        // initially: No parent, I'm the best leader.
 
        let mut best_wave = WaveState { parent: None, leader: connector_id };
 
        // start a wave for this initial state
 
        do_wave(em, &mut awaiting, &best_wave)?;
 
        // with 1+ neighbors, progress is only made in response to incoming messages
 
        em.undelay_all();
 
        'election: loop {
 
            log!(logger, "Election loop. awaiting {:?}...", awaiting.iter());
 
            let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?;
 
            log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
            match msg {
 
                S(LeaderAnnounce { tree_leader }) => {
 
                    let election_result =
 
                        WaveState { leader: tree_leader, parent: Some(recv_index) };
 
                    log!(logger, "Election lost! Result {:?}", &election_result);
 
                    assert!(election_result.leader >= best_wave.leader);
 
                    assert_ne!(election_result.leader, connector_id);
 
                    break 'election election_result;
 
                }
 
                S(LeaderWave { wave_leader }) => {
 
                    use Ordering as O;
 
                    match wave_leader.cmp(&best_wave.leader) {
 
                        O::Less => log!(
 
                            logger,
 
                            "Ignoring wave with Id {:?}<{:?}",
 
                            wave_leader,
 
                            best_wave.leader
 
                        ),
 
                        O::Greater => {
 
                            log!(
 
                                logger,
 
                                "Joining wave with Id {:?}>{:?}",
 
                                wave_leader,
 
                                best_wave.leader
 
                            );
 
                            best_wave = WaveState { leader: wave_leader, parent: Some(recv_index) };
 
                            log!(logger, "New wave state {:?}", &best_wave);
 
                            do_wave(em, &mut awaiting, &best_wave)?;
 
                            if awaiting.is_empty() {
 
                                log!(logger, "Special case! Only neighbor is parent. Replying to {:?} msg {:?}", recv_index, &msg);
 
                                em.send_to_setup(recv_index, &msg)?;
 
                            }
 
                        }
 
                    }
 
                    Greater => {
 
                        // join new echo
 
                        log!(logger, "Setting leader to index {:?}", index);
 
                        parent = Some(index);
 
                        my_leader = maybe_leader;
 
                        let echo = S(LeaderEcho { maybe_leader: my_leader });
 
                        awaiting.clear();
 
                        if em.endpoint_exts.len() == 1 {
 
                            // immediately reply to parent
 
                            log!(logger, "replying echo to parent {:?} immediately", index);
 
                            em.send_to_setup(index, &echo)?;
 
                        } else {
 
                            for index2 in 0..em.endpoint_exts.len() {
 
                                if index2 == index {
 
                                    // don't propagate echo to my parent
 
                                    continue;
 
                        O::Equal => {
 
                            assert!(awaiting.remove(&recv_index));
 
                            log!(
 
                                logger,
 
                                "Wave reply from index {:?} for leader {:?}. Now awaiting {} replies",
 
                                recv_index,
 
                                best_wave.leader,
 
                                awaiting.len()
 
                            );
 
                            if awaiting.is_empty() {
 
                                if let Some(parent) = best_wave.parent {
 
                                    log!(
 
                                        logger,
 
                                        "Sub-wave done! replying to parent {:?} msg {:?}",
 
                                        parent,
 
                                        &msg
 
                                    );
 
                                    em.send_to_setup(parent, &msg)?;
 
                                } else {
 
                                    let election_result: WaveState = best_wave;
 
                                    log!(logger, "Election won! Result {:?}", &election_result);
 
                                    break 'election election_result;
 
                                }
 
                                log!(logger, "repeating echo {:?} to {:?}", &echo, index2);
 
                                em.send_to_setup(index2, &echo)?;
 
                                awaiting.insert(index2);
 
                            }
 
                        }
 
                    }
 
                }
 
            }
 
            inappropriate_msg => {
 
                log!(logger, "delaying msg {:?} during echo phase", inappropriate_msg);
 
                em.delayed_messages.push((index, inappropriate_msg))
 
                S(YouAreMyParent) | S(MyPortInfo(_)) => unreachable!(),
 
                comm_msg @ Msg::CommMsg { .. } => {
 
                    log!(logger, "delaying msg {:?} during election algorithm", comm_msg);
 
                    em.delayed_messages.push((recv_index, comm_msg));
 
                }
 
            }
 
        }
 
    }
 
    match parent {
 
        None => assert_eq!(
 
            my_leader, controller_id,
 
            "I've got no parent, but I consider {:?} the leader?",
 
            my_leader
 
        ),
 
        Some(parent) => assert_ne!(
 
            my_leader, controller_id,
 
            "I have {:?} as parent, but I consider myself ({:?}) the leader?",
 
            parent, controller_id
 
        ),
 
    }
 
    log!(logger, "DONE WITH ECHO! Leader has cid={:?}", my_leader);
 
    };
 

	
 
    // 3. broadcast leader announcement (except to parent: confirm they are your parent)
 
    //    in this loop, every node sends 1 message to each neighbor
 
    //    await 1 message from all non-parents.
 
    let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader });
 
    for index in 0..em.endpoint_exts.len() {
 
        let msg = if Some(index) == parent {
 
            &S(YouAreMyParent)
 
    // starting algorithm 2. Send a message to every neighbor
 
    log!(logger, "Starting tree construction. Step 1: send one msg per neighbor");
 
    awaiting.clear();
 
    for index in em.index_iter() {
 
        if Some(index) == election_result.parent {
 
            em.send_to_setup(index, &S(YouAreMyParent))?;
 
        } else {
 
            awaiting.insert(index);
 
            &msg_for_non_parents
 
        };
 
        log!(logger, "ANNOUNCING to {:?} {:?}", index, msg);
 
        em.send_to_setup(index, msg)?;
 
            em.send_to_setup(index, &S(LeaderAnnounce { tree_leader: election_result.leader }))?;
 
        }
 
    }
 
    let mut children = Vec::default();
 
    let mut children = vec![];
 
    em.undelay_all();
 
    while !awaiting.is_empty() {
 
        log!(logger, "awaiting {:?}", &awaiting);
 
        let (index, msg) = em.try_recv_any_setup(deadline)?;
 
        log!(logger, "Tree construction_loop loop. awaiting {:?}...", awaiting.iter());
 
        let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?;
 
        log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
        match msg {
 
            S(YouAreMyParent) => {
 
                assert!(awaiting.remove(&index));
 
                children.push(index);
 
            S(LeaderWave { .. }) => { /* old message */ }
 
            S(LeaderAnnounce { .. }) => {
 
                // not a child
 
                log!(
 
                    logger,
 
                    "Got reply from non-child index {:?}. Children: {:?}",
 
                    recv_index,
 
                    children.iter()
 
                );
 
                if !awaiting.remove(&recv_index) {
 
                    return Err(SetupAlgMisbehavior);
 
                }
 
            }
 
            S(LeaderAnnounce { leader }) => {
 
                assert!(awaiting.remove(&index));
 
                assert!(leader == my_leader);
 
                assert!(Some(index) != parent);
 
                // they wouldn't send me this if they considered me their parent
 
            S(YouAreMyParent) => {
 
                if !awaiting.remove(&recv_index) {
 
                    log!(
 
                        logger,
 
                        "Got reply from child index {:?}. Children before... {:?}",
 
                        recv_index,
 
                        children.iter()
 
                    );
 
                    return Err(SetupAlgMisbehavior);
 
                }
 
                children.push(recv_index);
 
            }
 
            inappropriate_msg => {
 
                log!(logger, "delaying msg {:?} during echo-reply phase", inappropriate_msg);
 
                em.delayed_messages.push((index, inappropriate_msg));
 
            S(MyPortInfo(_)) => unreachable!(),
 
            comm_msg @ Msg::CommMsg { .. } => {
 
                log!(logger, "delaying msg {:?} during election algorithm", comm_msg);
 
                em.delayed_messages.push((recv_index, comm_msg));
 
            }
 
        }
 
    }
 
    children.sort();
 
    children.dedup();
 
    Ok(Neighborhood { parent, children })
 
    children.shrink_to_fit();
 
    let neighborhood =
 
        Neighborhood { parent: election_result.parent, children: VecSet::new(children) };
 
    log!(logger, "Neighborhood constructed {:?}", &neighborhood);
 
    Ok(neighborhood)
 
}
src/runtime/tests.rs
Show inline comments
 
use crate as reowolf;
 
use crossbeam_utils::thread::scope;
 
use reowolf::{
 
    error::*,
 
    Polarity::{Getter, Putter},
 
    *,
 
};
 
@@ -270,10 +271,10 @@ fn sync_sync() {
 
    c.gotten(g1).unwrap();
 
}
 

	
 
fn file_logged_connector(controller_id: ControllerId, path: &str) -> Connector {
 
fn file_logged_connector(connector_id: ConnectorId, path: &str) -> Connector {
 
    let file = std::fs::File::create(path).unwrap();
 
    let file_logger = Box::new(FileLogger::new(controller_id, file));
 
    Connector::new(file_logger, MINIMAL_PROTO.clone(), controller_id, 8)
 
    let file_logger = Box::new(FileLogger::new(connector_id, file));
 
    Connector::new(file_logger, MINIMAL_PROTO.clone(), connector_id, 8)
 
}
 

	
 
#[test]
 
@@ -361,3 +362,105 @@ fn distributed_msg_bounce() {
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn local_timeout() {
 
    let mut c = file_logged_connector(0, "./logs/local_timeout.txt");
 
    let [_, g] = c.new_port_pair();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.get(g).unwrap();
 
    match c.sync(Some(Duration::from_millis(200))) {
 
        Err(SyncError::RoundFailure) => {}
 
        res => panic!("expeted timeout. but got {:?}", res),
 
    }
 
}
 

	
 
#[test]
 
fn parent_timeout() {
 
    let sock_addr = next_test_addr();
 
    scope(|s| {
 
        s.spawn(|_| {
 
            // parent; times out
 
            let mut c = file_logged_connector(999, "./logs/parent_timeout_a.txt");
 
            let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.sync(Some(Duration::from_millis(300))).unwrap_err(); // timeout
 
        });
 
        s.spawn(|_| {
 
            // child
 
            let mut c = file_logged_connector(000, "./logs/parent_timeout_b.txt");
 
            let g = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.get(g).unwrap(); // not matched by put
 
            c.sync(None).unwrap_err(); // no timeout
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn child_timeout() {
 
    let sock_addr = next_test_addr();
 
    scope(|s| {
 
        s.spawn(|_| {
 
            // child; times out
 
            let mut c = file_logged_connector(000, "./logs/child_timeout_a.txt");
 
            let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.sync(Some(Duration::from_millis(300))).unwrap_err(); // timeout
 
        });
 
        s.spawn(|_| {
 
            // parent
 
            let mut c = file_logged_connector(999, "./logs/child_timeout_b.txt");
 
            let g = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.get(g).unwrap(); // not matched by put
 
            c.sync(None).unwrap_err(); // no timeout
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn chain_connect() {
 
    let sock_addrs = [next_test_addr(), next_test_addr(), next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(0, "./logs/chain_connect_a.txt");
 
            c.new_net_port(Putter, EndpointSetup { sock_addr: sock_addrs[0], is_active: false })
 
                .unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(2, "./logs/chain_connect_b.txt");
 
            c.new_net_port(Getter, EndpointSetup { sock_addr: sock_addrs[0], is_active: true })
 
                .unwrap();
 
            c.new_net_port(Putter, EndpointSetup { sock_addr: sock_addrs[1], is_active: false })
 
                .unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = file_logged_connector(1, "./logs/chain_connect_c.txt");
 
            c.new_net_port(Getter, EndpointSetup { sock_addr: sock_addrs[1], is_active: true })
 
                .unwrap();
 
            // c.new_net_port(Putter, EndpointSetup { sock_addr: sock_addrs[2], is_active: false })
 
            //     .unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
        // s.spawn(|_| {
 
        //     let mut c = file_logged_connector(3, "./logs/chain_connect_d.txt");
 
        //     c.new_net_port(Getter, EndpointSetup { sock_addr: sock_addrs[2], is_active: true })
 
        //         .unwrap();
 
        //     c.new_net_port(Putter, EndpointSetup { sock_addr: sock_addrs[3], is_active: false })
 
        //         .unwrap();
 
        //     c.connect(Some(Duration::from_secs(1))).unwrap();
 
        // });
 
        // s.spawn(|_| {
 
        //     let mut c = file_logged_connector(4, "./logs/chain_connect_e.txt");
 
        //     c.new_net_port(Getter, EndpointSetup { sock_addr: sock_addrs[3], is_active: true })
 
        //         .unwrap();
 
        //     c.connect(Some(Duration::from_secs(1))).unwrap();
 
        // });
 
    })
 
    .unwrap();
 
}
0 comments (0 inline, 0 general)