Changeset - d1a70dfdafba
[Not reviewed]
0 8 0
Christopher Esterhuyse - 5 years ago 2020-06-24 08:09:12
christopher.esterhuyse@gmail.com
more robust error handling
8 files changed with 255 insertions and 132 deletions:
0 comments (0 inline, 0 general)
.gitignore
Show inline comments
 
target
 
/.idea
 
**/*.rs.bk
 
Cargo.lock
 
main
 
examples/*/*.exe
 
examples/reowolf*
 
\ No newline at end of file
 
examples/reowolf*
 
logs
 
\ No newline at end of file
src/lib.rs
Show inline comments
 
@@ -5,12 +5,12 @@ mod common;
 
mod protocol;
 
mod runtime;
 

	
 
// #[cfg(test)]
 
// mod test;
 

	
 
pub use common::Polarity;
 
pub use common::{ControllerId, Polarity, PortId};
 
pub use protocol::ProtocolDescription;
 
pub use runtime::{error, Connector, EndpointSetup, StringLogger};
 
pub use runtime::{error, Connector, EndpointSetup, FileLogger, VecLogger};
 

	
 
// #[cfg(feature = "ffi")]
 
// pub use runtime::ffi;
src/runtime/communication.rs
Show inline comments
 
@@ -98,25 +98,25 @@ impl Connector {
 
                    return Err(MultipleOpsOnPort);
 
                }
 
                Ok(())
 
            }
 
        }
 
    }
 
    pub fn sync(&mut self, timeout: Duration) -> Result<usize, SyncError> {
 
    pub fn sync(&mut self, timeout: Option<Duration>) -> Result<usize, SyncError> {
 
        use SyncError::*;
 
        match &mut self.phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Communication {
 
                round_index,
 
                neighborhood,
 
                native_batches,
 
                endpoint_manager,
 
                round_result,
 
                ..
 
            } => {
 
                let deadline = Instant::now() + timeout;
 
                let mut deadline = timeout.map(|to| Instant::now() + to);
 
                let logger: &mut dyn Logger = &mut *self.logger;
 
                // 1. run all proto components to Nonsync blockers
 
                log!(
 
                    logger,
 
                    "~~~ SYNC called with timeout {:?}; starting round {}",
 
                    &timeout,
 
@@ -344,14 +344,41 @@ impl Connector {
 
                    }
 

	
 
                    // stuck! make progress by receiving a msg
 
                    // try recv messages arriving through endpoints
 
                    log!(logger, "No decision yet. Let's recv an endpoint msg...");
 
                    {
 
                        let (endpoint_index, msg) =
 
                            endpoint_manager.try_recv_any(deadline).unwrap();
 
                        let (endpoint_index, msg) = loop {
 
                            match endpoint_manager.try_recv_any_comms(deadline)? {
 
                                None => {
 
                                    log!(
 
                                        logger,
 
                                        "Reached user-defined deadling without decision..."
 
                                    );
 
                                    if let Some(parent) = neighborhood.parent {
 
                                        log!(
 
                                            logger,
 
                                            "Sending failure request to parent index {}",
 
                                            parent
 
                                        );
 
                                        let msg = Msg::CommMsg(CommMsg {
 
                                            round_index: *round_index,
 
                                            contents: CommMsgContents::Suggest {
 
                                                suggestion: Decision::Failure,
 
                                            },
 
                                        });
 
                                        endpoint_manager.send_to(parent, &msg).unwrap();
 
                                    } else {
 
                                        log!(logger, "As the leader, deciding on timeout");
 
                                        break 'undecided Decision::Failure;
 
                                    }
 
                                    deadline = None;
 
                                }
 
                                Some((endpoint_index, msg)) => break (endpoint_index, msg),
 
                            }
 
                        };
 
                        log!(logger, "Received from endpoint {} msg {:?}", endpoint_index, &msg);
 
                        let comm_msg_contents = match msg {
 
                            Msg::SetupMsg(..) => {
 
                                log!(logger, "Discarding setup message; that phase is over");
 
                                continue 'undecided;
 
                            }
src/runtime/endpoints.rs
Show inline comments
 
use super::*;
 

	
 
struct MonitoredReader<R: Read> {
 
    bytes: usize,
 
    r: R,
 
}
 
#[derive(Debug)]
 
enum TryRecyAnyError {
 
    Timeout,
 
    PollFailed,
 
    EndpointError { error: EndpointError, index: usize },
 
}
 

	
 
/////////////////////
 

	
 
impl Endpoint {
 
    pub fn try_recv<T: serde::de::DeserializeOwned>(&mut self) -> Result<Option<T>, EndpointError> {
 
        use EndpointError::*;
 
@@ -33,22 +39,54 @@ impl Endpoint {
 
                _ => Err(MalformedMessage),
 
                // println!("SERDE ERRKIND {:?}", e);
 
                // Err(MalformedMessage)
 
            },
 
        }
 
    }
 
    pub fn send<T: serde::ser::Serialize>(&mut self, msg: &T) -> Result<(), ()> {
 
        bincode::serialize_into(&mut self.stream, msg).map_err(drop)
 
    pub fn send<T: serde::ser::Serialize>(&mut self, msg: &T) -> Result<(), EndpointError> {
 
        bincode::serialize_into(&mut self.stream, msg).map_err(|_| EndpointError::BrokenEndpoint)
 
    }
 
}
 

	
 
impl EndpointManager {
 
    pub fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), ()> {
 
    pub fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> {
 
        let endpoint = &mut self.endpoint_exts[index].endpoint;
 
        endpoint.send(msg).map_err(|err| {
 
            ConnectError::EndpointSetupError(endpoint.stream.local_addr().unwrap(), err)
 
        })
 
    }
 
    pub fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), EndpointError> {
 
        self.endpoint_exts[index].endpoint.send(msg)
 
    }
 
    pub fn try_recv_any(&mut self, deadline: Instant) -> Result<(usize, Msg), TryRecyAnyError> {
 
    pub fn try_recv_any_comms(
 
        &mut self,
 
        deadline: Option<Instant>,
 
    ) -> Result<Option<(usize, Msg)>, SyncError> {
 
        use {SyncError as Se, TryRecyAnyError as Trae};
 
        match self.try_recv_any(deadline) {
 
            Ok(tup) => Ok(Some(tup)),
 
            Err(Trae::Timeout) => Ok(None),
 
            Err(Trae::PollFailed) => Err(Se::PollFailed),
 
            Err(Trae::EndpointError { error, index }) => Err(Se::BrokenEndpoint(index)),
 
        }
 
    }
 
    pub fn try_recv_any_setup(
 
        &mut self,
 
        deadline: Option<Instant>,
 
    ) -> Result<(usize, Msg), ConnectError> {
 
        use {ConnectError as Ce, TryRecyAnyError as Trae};
 
        self.try_recv_any(deadline).map_err(|err| match err {
 
            Trae::Timeout => Ce::Timeout,
 
            Trae::PollFailed => Ce::PollFailed,
 
            Trae::EndpointError { error, index } => Ce::EndpointSetupError(
 
                self.endpoint_exts[index].endpoint.stream.local_addr().unwrap(),
 
                error,
 
            ),
 
        })
 
    }
 
    fn try_recv_any(&mut self, deadline: Option<Instant>) -> Result<(usize, Msg), TryRecyAnyError> {
 
        use TryRecyAnyError::*;
 
        // 1. try messages already buffered
 
        if let Some(x) = self.undelayed_messages.pop() {
 
            return Ok(x);
 
        }
 
        loop {
 
@@ -63,14 +101,18 @@ impl EndpointManager {
 
                        self.polled_undrained.insert(index);
 
                    }
 
                    return Ok((index, msg));
 
                }
 
            }
 
            // 3. No message yet. Do we have enough time to poll?
 
            let remaining = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?;
 
            self.poll.poll(&mut self.events, Some(remaining)).map_err(|_| PollFailed)?;
 
            let remaining = if let Some(deadline) = deadline {
 
                Some(deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?)
 
            } else {
 
                None
 
            };
 
            self.poll.poll(&mut self.events, remaining).map_err(|_| PollFailed)?;
 
            for event in self.events.iter() {
 
                let Token(index) = event.token();
 
                self.polled_undrained.insert(index);
 
            }
 
            self.events.clear();
 
        }
src/runtime/error.rs
Show inline comments
 
use crate::common::*;
 

	
 
#[derive(Debug)]
 
#[derive(Debug, Clone)]
 
pub enum EndpointError {
 
    MalformedMessage,
 
    BrokenEndpoint,
 
}
 
#[derive(Debug)]
 
pub enum TryRecyAnyError {
 
    Timeout,
 
    PollFailed,
 
    EndpointError { error: EndpointError, index: usize },
 
    BrokenEndpoint(usize),
 
}
 
#[derive(Debug, Clone)]
 
pub enum SyncError {
 
    Timeout,
 
    NotConnected,
 
    InconsistentProtoComponent(ProtoComponentId),
 
    IndistinguishableBatches([usize; 2]),
 
    DistributedTimeout,
 
    PollFailed,
 
    BrokenEndpoint(usize),
 
}
 
#[derive(Debug)]
 
pub enum PortOpError {
 
    WrongPolarity,
 
    NotConnected,
 
    MultipleOpsOnPort,
 
@@ -35,6 +30,17 @@ pub enum GottenError {
 
}
 

	
 
#[derive(Debug, Eq, PartialEq)]
 
pub enum NextBatchError {
 
    NotConnected,
 
}
 
#[derive(Debug)]
 
pub enum ConnectError {
 
    BindFailed(SocketAddr),
 
    PollInitFailed,
 
    Timeout,
 
    PollFailed,
 
    AcceptFailed(SocketAddr),
 
    AlreadyConnected,
 
    PortPeerPolarityMismatch(PortId),
 
    EndpointSetupError(SocketAddr, EndpointError),
 
}
src/runtime/mod.rs
Show inline comments
 
@@ -72,19 +72,20 @@ pub struct Endpoint {
 
#[derive(Debug, Clone)]
 
pub struct ProtoComponent {
 
    state: ComponentState,
 
    ports: HashSet<PortId>,
 
}
 
pub trait Logger: Debug {
 
    fn line_writer(&mut self) -> &mut dyn std::fmt::Write;
 
    fn dump_log(&self, w: &mut dyn std::io::Write);
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write;
 
}
 
#[derive(Debug)]
 
pub struct StringLogger(ControllerId, String);
 
pub struct VecLogger(ControllerId, Vec<u8>);
 
#[derive(Debug)]
 
pub struct DummyLogger;
 
#[derive(Debug)]
 
pub struct FileLogger(ControllerId, std::fs::File);
 
#[derive(Debug, Clone)]
 
pub struct EndpointSetup {
 
    pub sock_addr: SocketAddr,
 
    pub is_active: bool,
 
}
 
#[derive(Debug)]
 
@@ -200,13 +201,25 @@ impl IdManager {
 
            controller_id: self.controller_id,
 
            u32_suffix: self.proto_component_suffix_stream.next(),
 
        }
 
        .into()
 
    }
 
}
 
impl Drop for Connector {
 
    fn drop(&mut self) {
 
        log!(&mut *self.logger, "Connector dropping. Goodbye!");
 
    }
 
}
 
impl Connector {
 
    pub fn swap_logger(&mut self, mut new_logger: Box<dyn Logger>) -> Box<dyn Logger> {
 
        std::mem::swap(&mut self.logger, &mut new_logger);
 
        new_logger
 
    }
 
    pub fn get_logger(&mut self) -> &mut dyn Logger {
 
        &mut *self.logger
 
    }
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        // adds two new associated ports, related to each other, and exposed to the native
 
        let [o, i] = [self.id_manager.new_port_id(), self.id_manager.new_port_id()];
 
        self.native_ports.insert(o);
 
        self.native_ports.insert(i);
 
        // {polarity, peer, route} known. {} unknown.
 
@@ -257,67 +270,61 @@ impl Connector {
 
            },
 
        );
 
        Ok(())
 
    }
 
}
 
impl Logger for DummyLogger {
 
    fn line_writer(&mut self) -> &mut dyn std::fmt::Write {
 
        impl std::fmt::Write for DummyLogger {
 
            fn write_str(&mut self, _: &str) -> Result<(), std::fmt::Error> {
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write {
 
        impl std::io::Write for DummyLogger {
 
            fn flush(&mut self) -> Result<(), std::io::Error> {
 
                Ok(())
 
            }
 
            fn write(&mut self, bytes: &[u8]) -> Result<usize, std::io::Error> {
 
                Ok(bytes.len())
 
            }
 
        }
 
        self
 
    }
 
    fn dump_log(&self, _: &mut dyn std::io::Write) {}
 
}
 
impl StringLogger {
 
impl VecLogger {
 
    pub fn new(controller_id: ControllerId) -> Self {
 
        Self(controller_id, String::default())
 
        Self(controller_id, Default::default())
 
    }
 
}
 
impl Drop for StringLogger {
 
impl Drop for VecLogger {
 
    fn drop(&mut self) {
 
        let stdout = std::io::stderr();
 
        let mut lock = stdout.lock();
 
        writeln!(lock, "--- DROP LOG DUMP ---").unwrap();
 
        self.dump_log(&mut lock);
 
        // lock.flush().unwrap();
 
        // std::thread::sleep(Duration::from_millis(50));
 
        let _ = std::io::Write::write(&mut lock, self.1.as_slice());
 
    }
 
}
 
impl Logger for StringLogger {
 
    fn line_writer(&mut self) -> &mut dyn std::fmt::Write {
 
        use std::fmt::Write;
 
impl Logger for VecLogger {
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write {
 
        let _ = write!(&mut self.1, "\nCID({}): ", self.0);
 
        self
 
    }
 
    fn dump_log(&self, w: &mut dyn std::io::Write) {
 
        let _ = w.write(self.1.as_bytes());
 
}
 
impl FileLogger {
 
    pub fn new(controller_id: ControllerId, file: std::fs::File) -> Self {
 
        Self(controller_id, file)
 
    }
 
}
 
impl std::fmt::Write for StringLogger {
 
    fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> {
 
        self.1.write_str(s)
 
impl Logger for FileLogger {
 
    fn line_writer(&mut self) -> &mut dyn std::io::Write {
 
        let _ = write!(&mut self.1, "\nCID({}): ", self.0);
 
        &mut self.1
 
    }
 
}
 
impl Connector {
 
    pub fn get_logger(&self) -> &dyn Logger {
 
        &*self.logger
 
impl std::io::Write for VecLogger {
 
    fn flush(&mut self) -> Result<(), std::io::Error> {
 
        Ok(())
 
    }
 
    pub fn print_state(&self) {
 
        let stdout = std::io::stdout();
 
        let mut lock = stdout.lock();
 
        writeln!(
 
            lock,
 
            "--- Connector with ControllerId={:?}.\n::LOG_DUMP:\n",
 
            self.id_manager.controller_id
 
        )
 
        .unwrap();
 
        self.get_logger().dump_log(&mut lock);
 
        writeln!(lock, "\n\nDEBUG_PRINT:\n{:#?}\n", self).unwrap();
 
    fn write(&mut self, data: &[u8]) -> Result<usize, std::io::Error> {
 
        self.1.extend_from_slice(data);
 
        Ok(data.len())
 
    }
 
}
 
impl Predicate {
 
    #[inline]
 
    pub fn inserted(mut self, k: FiringVar, v: bool) -> Self {
 
        self.assigned.insert(k, v);
src/runtime/setup.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::*;
 
use std::io::ErrorKind::WouldBlock;
 

	
 
impl Connector {
 
    pub fn new_simple(
 
        proto_description: Arc<ProtocolDescription>,
 
        controller_id: ControllerId,
 
    ) -> Self {
 
        let logger = Box::new(StringLogger::new(controller_id));
 
        let logger = Box::new(DummyLogger);
 
        // let logger = Box::new(DummyLogger);
 
        let surplus_sockets = 8;
 
        let surplus_sockets = 2;
 
        Self::new(logger, proto_description, controller_id, surplus_sockets)
 
    }
 
    pub fn new(
 
        logger: Box<dyn Logger>,
 
        proto_description: Arc<ProtocolDescription>,
 
        controller_id: ControllerId,
 
@@ -49,21 +50,22 @@ impl Connector {
 
                endpoint_setups.push((p, endpoint_setup));
 
                Ok(p)
 
            }
 
            ConnectorPhased::Communication { .. } => Err(()),
 
        }
 
    }
 
    pub fn connect(&mut self, timeout: Duration) -> Result<(), ()> {
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<(), ConnectError> {
 
        use ConnectError::*;
 
        match &mut self.phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(self.logger, "Call to connecting in connected state");
 
                Err(())
 
                Err(AlreadyConnected)
 
            }
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                log!(self.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                let deadline = Instant::now() + timeout;
 
                let deadline = timeout.map(|to| Instant::now() + to);
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let mut endpoint_manager = new_endpoint_manager(
 
                    &mut *self.logger,
 
                    endpoint_setups,
 
                    &mut self.port_info,
 
                    deadline,
 
@@ -78,12 +80,13 @@ impl Connector {
 
                    self.id_manager.controller_id,
 
                    &mut *self.logger,
 
                    &mut endpoint_manager,
 
                    deadline,
 
                )?;
 
                log!(self.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                log!(self.logger, "connect() finished. setup phase complete");
 
                // TODO session optimization goes here
 
                self.phased = ConnectorPhased::Communication {
 
                    round_index: 0,
 
                    endpoint_manager,
 
                    neighborhood,
 
                    mem_inbox: Default::default(),
 
@@ -97,15 +100,16 @@ impl Connector {
 
}
 

	
 
fn new_endpoint_manager(
 
    logger: &mut dyn Logger,
 
    endpoint_setups: &[(PortId, EndpointSetup)],
 
    port_info: &mut PortInfo,
 
    deadline: Instant,
 
) -> Result<EndpointManager, ()> {
 
    deadline: Option<Instant>,
 
) -> Result<EndpointManager, ConnectError> {
 
    ////////////////////////////////////////////
 
    use ConnectError::*;
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    struct Todo {
 
        todo_endpoint: TodoEndpoint,
 
        local_port: PortId,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
@@ -116,60 +120,76 @@ fn new_endpoint_manager(
 
    }
 
    fn init_todo(
 
        token: Token,
 
        local_port: PortId,
 
        endpoint_setup: &EndpointSetup,
 
        poll: &mut Poll,
 
    ) -> Result<Todo, ()> {
 
    ) -> Result<Todo, ConnectError> {
 
        let todo_endpoint = if endpoint_setup.is_active {
 
            let mut stream = TcpStream::connect(endpoint_setup.sock_addr).map_err(drop)?;
 
            let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
 
                .expect("mio::TcpStream connect should not fail!");
 
            poll.registry().register(&mut stream, token, BOTH).unwrap();
 
            TodoEndpoint::Endpoint(Endpoint { stream, inbox: vec![] })
 
        } else {
 
            let mut listener = TcpListener::bind(endpoint_setup.sock_addr).map_err(drop)?;
 
            let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
 
                .map_err(|_| BindFailed(endpoint_setup.sock_addr))?;
 
            poll.registry().register(&mut listener, token, BOTH).unwrap();
 
            TodoEndpoint::Listener(listener)
 
        };
 
        Ok(Todo { todo_endpoint, local_port, sent_local_port: false, recv_peer_port: None })
 
    };
 
    ////////////////////////////////////////////
 

	
 
    // 1. Start to construct EndpointManager
 
    let mut poll = Poll::new().map_err(drop)?;
 
    let mut poll = Poll::new().map_err(|_| PollInitFailed)?;
 
    let mut events = Events::with_capacity(64);
 
    let mut polled_undrained = IndexSet::<usize>::default();
 
    let mut delayed_messages = vec![];
 

	
 
    // 2. create a registered (TcpListener/Endpoint) for passive / active respectively
 
    let mut todos = endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, (local_port, endpoint_setup))| {
 
            init_todo(Token(index), *local_port, endpoint_setup, &mut poll)
 
        })
 
        .collect::<Result<Vec<Todo>, _>>()?;
 
        .collect::<Result<Vec<Todo>, ConnectError>>()?;
 

	
 
    // 3. Using poll to drive progress:
 
    //    - accept an incoming connection for each TcpListener (turning them into endpoints too)
 
    //    - for each endpoint, send the local PortId
 
    //    - for each endpoint, recv the peer's PortId, and
 
    let mut setup_incomplete: HashSet<usize> = (0..todos.len()).collect();
 
    while !setup_incomplete.is_empty() {
 
        let remaining = deadline.checked_duration_since(Instant::now()).ok_or(())?;
 
        poll.poll(&mut events, Some(remaining)).map_err(drop)?;
 
        let remaining = if let Some(deadline) = deadline {
 
            Some(deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?)
 
        } else {
 
            None
 
        };
 
        poll.poll(&mut events, remaining).map_err(|_| PollFailed)?;
 
        for event in events.iter() {
 
            let token = event.token();
 
            let Token(index) = token;
 
            let todo: &mut Todo = &mut todos[index];
 
            if let TodoEndpoint::Listener(listener) = &mut todo.todo_endpoint {
 
                let (mut stream, peer_addr) = listener.accept().map_err(drop)?;
 
                poll.registry().deregister(listener).unwrap();
 
                poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                log!(logger, "Endpoint[{}] accepted a connection from {:?}", index, peer_addr);
 
                let endpoint = Endpoint { stream, inbox: vec![] };
 
                todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint);
 
                match listener.accept() {
 
                    Ok((mut stream, peer_addr)) => {
 
                        poll.registry().deregister(listener).unwrap();
 
                        poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                        log!(
 
                            logger,
 
                            "Endpoint[{}] accepted a connection from {:?}",
 
                            index,
 
                            peer_addr
 
                        );
 
                        let endpoint = Endpoint { stream, inbox: vec![] };
 
                        todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint);
 
                    }
 
                    Err(e) if e.kind() == WouldBlock => {}
 
                    Err(_) => return Err(AcceptFailed(listener.local_addr().unwrap())),
 
                }
 
            }
 
            match todo {
 
                Todo {
 
                    todo_endpoint: TodoEndpoint::Endpoint(endpoint),
 
                    local_port,
 
                    sent_local_port,
 
@@ -182,26 +202,37 @@ fn new_endpoint_manager(
 
                    let local_polarity = *port_info.polarities.get(local_port).unwrap();
 
                    if event.is_writable() && !*sent_local_port {
 
                        let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
 
                            polarity: local_polarity,
 
                            port: *local_port,
 
                        }));
 
                        endpoint.send(&msg)?;
 
                        endpoint
 
                            .send(&msg)
 
                            .map_err(|e| {
 
                                EndpointSetupError(endpoint.stream.local_addr().unwrap(), e)
 
                            })
 
                            .unwrap();
 
                        log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
 
                        *sent_local_port = true;
 
                    }
 
                    if event.is_readable() && recv_peer_port.is_none() {
 
                        let maybe_msg = endpoint.try_recv().map_err(drop)?;
 
                        let maybe_msg = endpoint.try_recv().map_err(|e| {
 
                            EndpointSetupError(endpoint.stream.local_addr().unwrap(), e)
 
                        })?;
 
                        if maybe_msg.is_some() && !endpoint.inbox.is_empty() {
 
                            polled_undrained.insert(index);
 
                        }
 
                        match maybe_msg {
 
                            None => {} // msg deserialization incomplete
 
                            Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => {
 
                                log!(logger, "endpoint[{}] got peer info {:?}", index, peer_info);
 
                                assert!(peer_info.polarity != local_polarity);
 
                                if peer_info.polarity == local_polarity {
 
                                    return Err(ConnectError::PortPeerPolarityMismatch(
 
                                        *local_port,
 
                                    ));
 
                                }
 
                                *recv_peer_port = Some(peer_info.port);
 
                                // 1. finally learned the peer of this port!
 
                                port_info.peers.insert(*local_port, peer_info.port);
 
                                // 2. learned the info of this peer port
 
                                port_info.polarities.insert(peer_info.port, peer_info.polarity);
 
                                port_info.peers.insert(peer_info.port, *local_port);
 
@@ -248,32 +279,32 @@ fn new_endpoint_manager(
 
}
 

	
 
fn init_neighborhood(
 
    controller_id: ControllerId,
 
    logger: &mut dyn Logger,
 
    em: &mut EndpointManager,
 
    deadline: Instant,
 
) -> Result<Neighborhood, ()> {
 
    deadline: Option<Instant>,
 
) -> Result<Neighborhood, ConnectError> {
 
    use {Msg::SetupMsg as S, SetupMsg::*};
 
    log!(logger, "beginning neighborhood construction");
 
    // 1. broadcast my ID as the first echo. await reply from all neighbors
 
    let echo = S(LeaderEcho { maybe_leader: controller_id });
 
    let mut awaiting = HashSet::with_capacity(em.endpoint_exts.len());
 
    for (index, ee) in em.endpoint_exts.iter_mut().enumerate() {
 
    for index in 0..em.endpoint_exts.len() {
 
        log!(logger, "{:?}'s initial echo to {:?}, {:?}", controller_id, index, &echo);
 
        ee.endpoint.send(&echo)?;
 
        em.send_to_setup(index, &echo)?;
 
        awaiting.insert(index);
 
    }
 

	
 
    // 2. Receive incoming replies. whenever a higher-id echo arrives,
 
    //    adopt it as leader, sender as parent, and reset the await set.
 
    let mut parent: Option<usize> = None;
 
    let mut my_leader = controller_id;
 
    em.undelay_all();
 
    'echo_loop: while !awaiting.is_empty() || parent.is_some() {
 
        let (index, msg) = em.try_recv_any(deadline).map_err(drop)?;
 
        let (index, msg) = em.try_recv_any_setup(deadline)?;
 
        log!(logger, "GOT from index {:?} msg {:?}", &index, &msg);
 
        match msg {
 
            S(LeaderAnnounce { leader }) => {
 
                // someone else completed the echo and became leader first!
 
                // the sender is my parent
 
                parent = Some(index);
 
@@ -287,13 +318,13 @@ fn init_neighborhood(
 
                    Less => { /* ignore this wave */ }
 
                    Equal => {
 
                        awaiting.remove(&index);
 
                        if awaiting.is_empty() {
 
                            if let Some(p) = parent {
 
                                // return the echo to my parent
 
                                em.send_to(p, &S(LeaderEcho { maybe_leader }))?;
 
                                em.send_to_setup(p, &S(LeaderEcho { maybe_leader }))?;
 
                            } else {
 
                                // wave completed!
 
                                break 'echo_loop;
 
                            }
 
                        }
 
                    }
 
@@ -304,21 +335,21 @@ fn init_neighborhood(
 
                        my_leader = maybe_leader;
 
                        let echo = S(LeaderEcho { maybe_leader: my_leader });
 
                        awaiting.clear();
 
                        if em.endpoint_exts.len() == 1 {
 
                            // immediately reply to parent
 
                            log!(logger, "replying echo to parent {:?} immediately", index);
 
                            em.send_to(index, &echo)?;
 
                            em.send_to_setup(index, &echo)?;
 
                        } else {
 
                            for (index2, ee) in em.endpoint_exts.iter_mut().enumerate() {
 
                            for index2 in 0..em.endpoint_exts.len() {
 
                                if index2 == index {
 
                                    // don't propagate echo to my parent
 
                                    continue;
 
                                }
 
                                log!(logger, "repeating echo {:?} to {:?}", &echo, index2);
 
                                ee.endpoint.send(&echo)?;
 
                                em.send_to_setup(index2, &echo)?;
 
                                awaiting.insert(index2);
 
                            }
 
                        }
 
                    }
 
                }
 
            }
 
@@ -343,27 +374,27 @@ fn init_neighborhood(
 
    log!(logger, "DONE WITH ECHO! Leader has cid={:?}", my_leader);
 

	
 
    // 3. broadcast leader announcement (except to parent: confirm they are your parent)
 
    //    in this loop, every node sends 1 message to each neighbor
 
    //    await 1 message from all non-parents.
 
    let msg_for_non_parents = S(LeaderAnnounce { leader: my_leader });
 
    for (index, ee) in em.endpoint_exts.iter_mut().enumerate() {
 
    for index in 0..em.endpoint_exts.len() {
 
        let msg = if Some(index) == parent {
 
            &S(YouAreMyParent)
 
        } else {
 
            awaiting.insert(index);
 
            &msg_for_non_parents
 
        };
 
        log!(logger, "ANNOUNCING to {:?} {:?}", index, msg);
 
        ee.endpoint.send(msg)?;
 
        em.send_to_setup(index, msg)?;
 
    }
 
    let mut children = Vec::default();
 
    em.undelay_all();
 
    while !awaiting.is_empty() {
 
        log!(logger, "awaiting {:?}", &awaiting);
 
        let (index, msg) = em.try_recv_any(deadline).map_err(drop)?;
 
        let (index, msg) = em.try_recv_any_setup(deadline)?;
 
        match msg {
 
            S(YouAreMyParent) => {
 
                assert!(awaiting.remove(&index));
 
                children.push(index);
 
            }
 
            S(LeaderAnnounce { leader }) => {
src/runtime/tests.rs
Show inline comments
 
use crate as reowolf;
 
use crossbeam_utils::thread::scope;
 
use reowolf::{Connector, EndpointSetup, Polarity::*, ProtocolDescription};
 
use reowolf::{
 
    Polarity::{Getter, Putter},
 
    *,
 
};
 
use std::net::SocketAddr;
 
use std::{sync::Arc, time::Duration};
 

	
 
fn next_test_addr() -> SocketAddr {
 
    use std::{
 
        net::{Ipv4Addr, SocketAddrV4},
 
@@ -46,72 +49,72 @@ fn new_net_port() {
 
    let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
}
 

	
 
#[test]
 
fn trivial_connect() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
}
 

	
 
#[test]
 
fn single_node_connect() {
 
    let sock_addr = next_test_addr();
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let _ = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap();
 
    let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
}
 

	
 
#[test]
 
fn multithreaded_connect() {
 
    let sock_addr = next_test_addr();
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
            let _ = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1);
 
            let _ = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn put_no_sync() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [o, _] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
}
 

	
 
#[test]
 
fn wrong_polarity_bad() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(i, (b"hi" as &[_]).into()).unwrap_err();
 
}
 

	
 
#[test]
 
fn dup_put_bad() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [o, _] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap_err();
 
}
 

	
 
#[test]
 
fn trivial_sync() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.sync(Duration::from_secs(1)).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.sync(Some(Duration::from_secs(1))).unwrap();
 
}
 

	
 
#[test]
 
fn unconnected_gotten_err() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [_, i] = c.new_port_pair();
 
@@ -119,122 +122,122 @@ fn unconnected_gotten_err() {
 
}
 

	
 
#[test]
 
fn connected_gotten_err_no_round() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    assert_eq!(reowolf::error::GottenError::NoPreviousRound, c.gotten(i).unwrap_err());
 
}
 

	
 
#[test]
 
fn connected_gotten_err_ungotten() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.sync(Duration::from_secs(1)).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.sync(Some(Duration::from_secs(1))).unwrap();
 
    assert_eq!(reowolf::error::GottenError::PortDidntGet, c.gotten(i).unwrap_err());
 
}
 

	
 
#[test]
 
fn native_polarity_checks() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [o, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    // fail...
 
    c.get(o).unwrap_err();
 
    c.put(i, (b"hi" as &[_]).into()).unwrap_err();
 
    // succeed..
 
    c.get(i).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
}
 

	
 
#[test]
 
fn native_multiple_gets() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.get(i).unwrap();
 
    c.get(i).unwrap_err();
 
}
 

	
 
#[test]
 
fn next_batch() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    c.next_batch().unwrap_err();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.next_batch().unwrap();
 
    c.next_batch().unwrap();
 
    c.next_batch().unwrap();
 
}
 

	
 
#[test]
 
fn native_self_msg() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [o, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.get(i).unwrap();
 
    c.put(o, (b"hi" as &[_]).into()).unwrap();
 
    c.sync(Duration::from_secs(1)).unwrap();
 
    c.sync(Some(Duration::from_secs(1))).unwrap();
 
}
 

	
 
#[test]
 
fn two_natives_msg() {
 
    let sock_addr = next_test_addr();
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
            let g = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.get(g).unwrap();
 
            c.sync(Duration::from_secs(1)).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
            c.gotten(g).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1);
 
            let p = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.put(p, (b"hello" as &[_]).into()).unwrap();
 
            c.sync(Duration::from_secs(1)).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn trivial_nondet() {
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [_, i] = c.new_port_pair();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.get(i).unwrap();
 
    // getting 0 batch
 
    c.next_batch().unwrap();
 
    // silent 1 batch
 
    assert_eq!(1, c.sync(Duration::from_secs(1)).unwrap());
 
    assert_eq!(1, c.sync(Some(Duration::from_secs(1))).unwrap());
 
    c.gotten(i).unwrap_err();
 
}
 

	
 
#[test]
 
fn connector_pair_nondet() {
 
    let sock_addr = next_test_addr();
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
            let g = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.next_batch().unwrap();
 
            c.get(g).unwrap();
 
            assert_eq!(1, c.sync(Duration::from_secs(1)).unwrap());
 
            assert_eq!(1, c.sync(Some(Duration::from_secs(1))).unwrap());
 
            c.gotten(g).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1);
 
            let p = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.put(p, (b"hello" as &[_]).into()).unwrap();
 
            c.sync(Duration::from_secs(1)).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
@@ -242,13 +245,13 @@ fn cannot_use_moved_ports() {
 
    /*
 
    native p|-->|g sync
 
    */
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1);
 
    let [p, g] = c.new_port_pair();
 
    c.add_component(b"sync", &[g, p]).unwrap();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(p, (b"hello" as &[_]).into()).unwrap_err();
 
    c.get(g).unwrap_err();
 
}
 

	
 
#[test]
 
fn sync_sync() {
 
@@ -257,48 +260,54 @@ fn sync_sync() {
 
           g1|<--|p1
 
    */
 
    let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0);
 
    let [p0, g0] = c.new_port_pair();
 
    let [p1, g1] = c.new_port_pair();
 
    c.add_component(b"sync", &[g0, p1]).unwrap();
 
    c.connect(Duration::from_secs(1)).unwrap();
 
    c.connect(Some(Duration::from_secs(1))).unwrap();
 
    c.put(p0, (b"hello" as &[_]).into()).unwrap();
 
    c.get(g1).unwrap();
 
    c.sync(Duration::from_secs(1)).unwrap();
 
    c.sync(Some(Duration::from_secs(1))).unwrap();
 
    c.gotten(g1).unwrap();
 
}
 

	
 
fn file_logged_connector(controller_id: ControllerId, path: &str) -> Connector {
 
    let file = std::fs::File::create(path).unwrap();
 
    let file_logger = Box::new(FileLogger::new(controller_id, file));
 
    Connector::new(file_logger, MINIMAL_PROTO.clone(), controller_id, 8)
 
}
 

	
 
#[test]
 
fn double_net_connect() {
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    scope(|s| {
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 2);
 
            let mut c = file_logged_connector(0, "./logs/double_net_a.txt");
 
            let [_p, _g] = [
 
                c.new_net_port(Putter, EndpointSetup { sock_addr: sock_addrs[0], is_active: true })
 
                    .unwrap(),
 
                c.new_net_port(Getter, EndpointSetup { sock_addr: sock_addrs[1], is_active: true })
 
                    .unwrap(),
 
            ];
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 3);
 
            let mut c = file_logged_connector(1, "./logs/double_net_b.txt");
 
            let [_g, _p] = [
 
                c.new_net_port(
 
                    Getter,
 
                    EndpointSetup { sock_addr: sock_addrs[0], is_active: false },
 
                )
 
                .unwrap(),
 
                c.new_net_port(
 
                    Putter,
 
                    EndpointSetup { sock_addr: sock_addrs[1], is_active: false },
 
                )
 
                .unwrap(),
 
            ];
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
@@ -311,44 +320,44 @@ fn distributed_msg_bounce() {
 
    scope(|s| {
 
        s.spawn(|_| {
 
            /*
 
            native | sync p|-->
 
                   |      g|<--
 
            */
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 4);
 
            let mut c = file_logged_connector(0, "./logs/distributed_msg_bounce_a.txt");
 
            let [p, g] = [
 
                c.new_net_port(Putter, EndpointSetup { sock_addr: sock_addrs[0], is_active: true })
 
                    .unwrap(),
 
                c.new_net_port(Getter, EndpointSetup { sock_addr: sock_addrs[1], is_active: true })
 
                    .unwrap(),
 
            ];
 
            c.add_component(b"sync", &[g, p]).unwrap();
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.sync(Duration::from_secs(1)).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
        });
 
        s.spawn(|_| {
 
            /*
 
            native p|-->
 
                   g|<--
 
            */
 
            let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 5);
 
            let mut c = file_logged_connector(1, "./logs/distributed_msg_bounce_b.txt");
 
            let [g, p] = [
 
                c.new_net_port(
 
                    Getter,
 
                    EndpointSetup { sock_addr: sock_addrs[0], is_active: false },
 
                )
 
                .unwrap(),
 
                c.new_net_port(
 
                    Putter,
 
                    EndpointSetup { sock_addr: sock_addrs[1], is_active: false },
 
                )
 
                .unwrap(),
 
            ];
 
            c.connect(Duration::from_secs(1)).unwrap();
 
            c.connect(Some(Duration::from_secs(1))).unwrap();
 
            c.put(p, (b"hello" as &[_]).into()).unwrap();
 
            c.get(g).unwrap();
 
            c.sync(Duration::from_secs(1)).unwrap();
 
            c.sync(Some(Duration::from_secs(1))).unwrap();
 
            c.gotten(g).unwrap();
 
        });
 
    })
 
    .unwrap();
 
}
0 comments (0 inline, 0 general)