Changeset - d67249fd4593
[Not reviewed]
0 8 0
Christopher Esterhuyse - 5 years ago 2020-07-02 12:35:40
christopher.esterhuyse@gmail.com
logging using hex, less glob imports, and endpoint_logging feature flag
8 files changed with 119 insertions and 91 deletions:
0 comments (0 inline, 0 general)
.gitignore
Show inline comments
 
target
 
/.idea
 
**/*.rs.bk
 
Cargo.lock
 
main
 
examples/*/*.exe
 
examples/*.dll
 
examples/reowolf*
 
examples/*.txt
 
logs
 
logs/*
 
logs/*/*
Cargo.toml
Show inline comments
 
@@ -27,13 +27,14 @@ backtrace = "0.3"
 

	
 
[dev-dependencies]
 
# test-generator = "0.3.0"
 
crossbeam-utils = "0.7.2"
 
lazy_static = "1.4.0"
 

	
 
[lib]
 
# compile target: dynamically linked library using C ABI
 
crate-type = ["cdylib"]
 

	
 
[features]
 
default = ["ffi"]
 
ffi = [] # no feature dependencies
 
\ No newline at end of file
 
ffi = [] # no feature dependencies
 
endpoint_logging = [] # see src/macros where a conditional check include endpoint logging
 
\ No newline at end of file
src/common.rs
Show inline comments
 
@@ -78,24 +78,25 @@ pub(crate) enum NonsyncBlocker {
 
    Inconsistent,
 
    ComponentExit,
 
    SyncBlockStart,
 
}
 
#[derive(Debug, Clone)]
 
pub(crate) enum SyncBlocker {
 
    Inconsistent,
 
    SyncBlockEnd,
 
    CouldntReadMsg(PortId),
 
    CouldntCheckFiring(PortId),
 
    PutMsg(PortId, Payload),
 
}
 
pub(crate) struct DenseDebugHex<'a>(pub &'a [u8]);
 

	
 
///////////////////// IMPL /////////////////////
 
impl U32Stream {
 
    pub(crate) fn next(&mut self) -> u32 {
 
        if self.next == u32::MAX {
 
            panic!("NO NEXT!")
 
        }
 
        self.next += 1;
 
        self.next - 1
 
    }
 
}
 
impl From<Id> for PortId {
 
@@ -167,25 +168,33 @@ impl Debug for PortId {
 
impl Debug for FiringVar {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "fvID({}'{})", (self.0).0.connector_id, (self.0).0.u32_suffix)
 
    }
 
}
 
impl Debug for ProtoComponentId {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "pcID({}'{})", self.0.connector_id, self.0.u32_suffix)
 
    }
 
}
 
impl Debug for Payload {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "Payload{:x?}", self.as_slice())
 
        write!(f, "Payload[{:?}]", DenseDebugHex(self.as_slice()))
 
    }
 
}
 
impl std::ops::Not for Polarity {
 
    type Output = Self;
 
    fn not(self) -> Self::Output {
 
        use Polarity::*;
 
        match self {
 
            Putter => Getter,
 
            Getter => Putter,
 
        }
 
    }
 
}
 
impl Debug for DenseDebugHex<'_> {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        for b in self.0 {
 
            write!(f, "{:02X?}", b)?;
 
        }
 
        Ok(())
 
    }
 
}
src/macros.rs
Show inline comments
 
macro_rules! endptlog {
 
    ($logger:expr, $($arg:tt)*) => {{
 
        // let w = $logger.line_writer();
 
        // let _ = writeln!(w, $($arg)*);
 
    	if cfg!(feature = "endpoint_logging") {
 
	        let w = $logger.line_writer();
 
	        let _ = writeln!(w, $($arg)*);
 
	    }
 
    }};
 
}
 
macro_rules! log {
 
    ($logger:expr, $($arg:tt)*) => {{
 
        let _ = writeln!($logger.line_writer(), $($arg)*);
 
    }};
 
}
src/runtime/communication.rs
Show inline comments
 
@@ -51,87 +51,87 @@ trait ReplaceBoolTrue {
 
}
 
impl ReplaceBoolTrue for bool {
 
    fn replace_with_true(&mut self) -> bool {
 
        let was = *self;
 
        *self = true;
 
        !was
 
    }
 
}
 

	
 
////////////////
 
impl Connector {
 
    pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> {
 
        use GottenError::*;
 
        use GottenError as Ge;
 
        let Self { phased, .. } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NoPreviousRound),
 
            ConnectorPhased::Setup { .. } => Err(Ge::NoPreviousRound),
 
            ConnectorPhased::Communication(comm) => match &comm.round_result {
 
                Err(_) => Err(PreviousSyncFailed),
 
                Ok(None) => Err(NoPreviousRound),
 
                Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(PortDidntGet),
 
                Err(_) => Err(Ge::PreviousSyncFailed),
 
                Ok(None) => Err(Ge::NoPreviousRound),
 
                Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(Ge::PortDidntGet),
 
            },
 
        }
 
    }
 
    pub fn next_batch(&mut self) -> Result<usize, NextBatchError> {
 
        // returns index of new batch
 
        use NextBatchError::*;
 
        use NextBatchError as Nbe;
 
        let Self { phased, .. } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Setup { .. } => Err(Nbe::NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                comm.native_batches.push(Default::default());
 
                Ok(comm.native_batches.len() - 1)
 
            }
 
        }
 
    }
 
    fn port_op_access(
 
        &mut self,
 
        port: PortId,
 
        expect_polarity: Polarity,
 
    ) -> Result<&mut NativeBatch, PortOpError> {
 
        use PortOpError::*;
 
        use PortOpError as Poe;
 
        let Self { unphased, phased } = self;
 
        if !unphased.native_ports.contains(&port) {
 
            return Err(PortUnavailable);
 
            return Err(Poe::PortUnavailable);
 
        }
 
        match unphased.port_info.polarities.get(&port) {
 
            Some(p) if *p == expect_polarity => {}
 
            Some(_) => return Err(WrongPolarity),
 
            None => return Err(UnknownPolarity),
 
            Some(_) => return Err(Poe::WrongPolarity),
 
            None => return Err(Poe::UnknownPolarity),
 
        }
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Setup { .. } => Err(Poe::NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                let batch = comm.native_batches.last_mut().unwrap(); // length >= 1 is invariant
 
                Ok(batch)
 
            }
 
        }
 
    }
 
    pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> {
 
        use PortOpError::*;
 
        use PortOpError as Poe;
 
        let batch = self.port_op_access(port, Putter)?;
 
        if batch.to_put.contains_key(&port) {
 
            Err(MultipleOpsOnPort)
 
            Err(Poe::MultipleOpsOnPort)
 
        } else {
 
            batch.to_put.insert(port, payload);
 
            Ok(())
 
        }
 
    }
 
    pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> {
 
        use PortOpError::*;
 
        use PortOpError as Poe;
 
        let batch = self.port_op_access(port, Getter)?;
 
        if batch.to_get.insert(port) {
 
            Ok(())
 
        } else {
 
            Err(MultipleOpsOnPort)
 
            Err(Poe::MultipleOpsOnPort)
 
        }
 
    }
 
    // entrypoint for caller. overwrites round result enum, and returns what happened
 
    pub fn sync(&mut self, timeout: Option<Duration>) -> Result<usize, SyncError> {
 
        let Self { unphased, phased } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(SyncError::NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                comm.round_result = Self::connected_sync(unphased, comm, timeout);
 
                match &comm.round_result {
 
                    Ok(None) => unreachable!(),
 
                    Ok(Some(ok_result)) => Ok(ok_result.batch_index),
 
@@ -414,34 +414,32 @@ impl Connector {
 
                        );
 
                    }
 
                    Some(Route::Endpoint { index }) => {
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::SendPayload(send_payload_msg),
 
                        });
 
                        comm.endpoint_manager.send_to_comms(*index, &msg)?;
 
                    }
 
                    Some(Route::LocalComponent(ComponentId::Native)) => branching_native.feed_msg(
 
                        cu,
 
                        &mut solution_storage,
 
                        // &mut Pay
 
                        getter,
 
                        &send_payload_msg,
 
                    ),
 
                    Some(Route::LocalComponent(ComponentId::Proto(proto_component_id))) => {
 
                        if let Some(branching_component) =
 
                            branching_proto_components.get_mut(proto_component_id)
 
                        {
 
                            let proto_component_id = *proto_component_id;
 
                            // let ConnectorUnphased { port_info, proto_description, .. } = cu;
 
                            branching_component.feed_msg(
 
                                cu,
 
                                &mut solution_storage,
 
                                proto_component_id,
 
                                &mut payloads_to_get,
 
                                getter,
 
                                &send_payload_msg,
 
                            )?;
 
                            if branching_component.branches.is_empty() {
 
                                log!(
 
                                    cu.logger,
 
                                    "{:?} has become inconsistent!",
src/runtime/endpoints.rs
Show inline comments
 
use super::*;
 

	
 
struct MonitoredReader<R: Read> {
 
    bytes: usize,
 
    r: R,
 
}
 
#[derive(Debug)]
 
enum TryRecyAnyError {
 
    Timeout,
 
    PollFailed,
 
    EndpointError { error: EndpointError, index: usize },
 
}
 

	
 
/////////////////////
 
impl Endpoint {
 
    fn bincode_opts() -> impl bincode::config::Options {
 
        bincode::config::DefaultOptions::default()
 
    }
 
    pub(super) fn try_recv<T: serde::de::DeserializeOwned>(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
    ) -> Result<Option<T>, EndpointError> {
 
        use EndpointError::*;
 
        use EndpointError as Ee;
 
        // populate inbox as much as possible
 
        let before_len = self.inbox.len();
 
        'read_loop: loop {
 
            let res = self.stream.read_to_end(&mut self.inbox);
 
            endptlog!(logger, "Stream read to end {:?}", &res);
 
            match res {
 
                Err(e) if would_block(&e) => break 'read_loop,
 
                Ok(0) => break 'read_loop,
 
                Ok(_) => (),
 
                Err(_e) => return Err(BrokenEndpoint),
 
                Err(_e) => return Err(Ee::BrokenEndpoint),
 
            }
 
        }
 
        endptlog!(logger, "Inbox bytes {:x?}", &self.inbox);
 
        endptlog!(
 
            logger,
 
            "Inbox bytes [{:x?}| {:x?}]",
 
            DenseDebugHex(&self.inbox[..before_len]),
 
            DenseDebugHex(&self.inbox[before_len..]),
 
        );
 
        let mut monitored = MonitoredReader::from(&self.inbox[..]);
 
        use bincode::config::Options;
 
        match Self::bincode_opts().deserialize_from(&mut monitored) {
 
            Ok(msg) => {
 
                let msg_size = monitored.bytes_read();
 
                self.inbox.drain(0..(msg_size.try_into().unwrap()));
 
                endptlog!(
 
                    logger,
 
                    "Yielding msg. Inbox len {}-{}=={}: [{:?}]",
 
                    self.inbox.len() + msg_size,
 
                    msg_size,
 
                    self.inbox.len(),
 
                    DenseDebugHex(&self.inbox[..]),
 
                );
 
                Ok(Some(msg))
 
            }
 
            Err(e) => match *e {
 
                bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => {
 
                    Ok(None)
 
                }
 
                _ => Err(MalformedMessage),
 
                _ => Err(Ee::MalformedMessage),
 
            },
 
        }
 
    }
 
    pub(super) fn send<T: serde::ser::Serialize>(&mut self, msg: &T) -> Result<(), EndpointError> {
 
        use bincode::config::Options;
 
        Self::bincode_opts()
 
            .serialize_into(&mut self.stream, msg)
 
            .map_err(|_| EndpointError::BrokenEndpoint)
 
        use EndpointError as Ee;
 
        Self::bincode_opts().serialize_into(&mut self.stream, msg).map_err(|_| Ee::BrokenEndpoint)
 
    }
 
}
 

	
 
impl EndpointManager {
 
    pub(super) fn index_iter(&self) -> Range<usize> {
 
        0..self.num_endpoints()
 
    }
 
    pub(super) fn num_endpoints(&self) -> usize {
 
        self.endpoint_exts.len()
 
    }
 
    pub(super) fn send_to_comms(&mut self, index: usize, msg: &Msg) -> Result<(), SyncError> {
 
        let endpoint = &mut self.endpoint_exts[index].endpoint;
 
        endpoint.send(msg).map_err(|_| SyncError::BrokenEndpoint(index))
 
    }
 
    pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> {
 
        let endpoint = &mut self.endpoint_exts[index].endpoint;
 
        endpoint.send(msg).map_err(|err| {
 
            ConnectError::EndpointSetupError(endpoint.stream.local_addr().unwrap(), err)
 
        })
 
    }
 
    pub(super) fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), EndpointError> {
 
        self.endpoint_exts[index].endpoint.send(msg)
 
    }
 
    pub(super) fn try_recv_any_comms(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        deadline: Option<Instant>,
 
    ) -> Result<Option<(usize, Msg)>, SyncError> {
 
        use {SyncError as Se, TryRecyAnyError as Trae};
 
        match self.try_recv_any(logger, deadline) {
 
            Ok(tup) => Ok(Some(tup)),
 
            Err(Trae::Timeout) => Ok(None),
 
            Err(Trae::PollFailed) => Err(Se::PollFailed),
 
            Err(Trae::EndpointError { error: _, index }) => Err(Se::BrokenEndpoint(index)),
 
        }
 
@@ -176,18 +184,17 @@ impl<R: Read> From<R> for MonitoredReader<R> {
 
impl<R: Read> MonitoredReader<R> {
 
    pub(super) fn bytes_read(&self) -> usize {
 
        self.bytes
 
    }
 
}
 
impl<R: Read> Read for MonitoredReader<R> {
 
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
 
        let n = self.r.read(buf)?;
 
        self.bytes += n;
 
        Ok(n)
 
    }
 
}
 

	
 
impl Into<Msg> for SetupMsg {
 
    fn into(self) -> Msg {
 
        Msg::SetupMsg(self)
 
    }
 
}
src/runtime/mod.rs
Show inline comments
 
@@ -84,25 +84,25 @@ enum Msg {
 
enum SetupMsg {
 
    MyPortInfo(MyPortInfo),
 
    LeaderWave { wave_leader: ConnectorId },
 
    LeaderAnnounce { tree_leader: ConnectorId },
 
    YouAreMyParent,
 
    SessionGather { unoptimized_map: HashMap<ConnectorId, SessionInfo> },
 
    SessionScatter { optimized_map: HashMap<ConnectorId, SessionInfo> },
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct SessionInfo {
 
    serde_proto_description: SerdeProtocolDescription,
 
    port_info: PortInfo,
 
    getter_for_incoming: Vec<PortId>,
 
    endpoint_incoming_to_getter: Vec<PortId>,
 
    proto_components: HashMap<ProtoComponentId, ProtoComponent>,
 
}
 
#[derive(Debug, Clone)]
 
struct SerdeProtocolDescription(Arc<ProtocolDescription>);
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct CommMsg {
 
    round_index: usize,
 
    contents: CommMsgContents,
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum CommMsgContents {
 
    SendPayload(SendPayloadMsg),
 
@@ -295,37 +295,37 @@ impl Connector {
 
        let route = Route::LocalComponent(ComponentId::Native);
 
        cu.port_info.routes.insert(o, route);
 
        cu.port_info.routes.insert(i, route);
 
        log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
        [o, i]
 
    }
 
    pub fn add_component(
 
        &mut self,
 
        identifier: &[u8],
 
        ports: &[PortId],
 
    ) -> Result<(), AddComponentError> {
 
        // called by the USER. moves ports owned by the NATIVE
 
        use AddComponentError::*;
 
        use AddComponentError as Ace;
 
        // 1. check if this is OK
 
        let cu = &mut self.unphased;
 
        let polarities = cu.proto_description.component_polarities(identifier)?;
 
        if polarities.len() != ports.len() {
 
            return Err(WrongNumberOfParamaters { expected: polarities.len() });
 
            return Err(Ace::WrongNumberOfParamaters { expected: polarities.len() });
 
        }
 
        for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) {
 
            if !cu.native_ports.contains(port) {
 
                return Err(UnknownPort(*port));
 
                return Err(Ace::UnknownPort(*port));
 
            }
 
            if expected_polarity != *cu.port_info.polarities.get(port).unwrap() {
 
                return Err(WrongPortPolarity { port: *port, expected_polarity });
 
                return Err(Ace::WrongPortPolarity { port: *port, expected_polarity });
 
            }
 
        }
 
        // 3. remove ports from old component & update port->route
 
        let new_id = cu.id_manager.new_proto_component_id();
 
        for port in ports.iter() {
 
            cu.port_info.routes.insert(*port, Route::LocalComponent(ComponentId::Proto(new_id)));
 
        }
 
        cu.native_ports.retain(|port| !ports.contains(port));
 
        // 4. add new component
 
        cu.proto_components.insert(
 
            new_id,
 
            ProtoComponent {
src/runtime/setup.rs
Show inline comments
 
@@ -41,30 +41,30 @@ impl Connector {
 
                    up.logger,
 
                    "Added net port {:?} with polarity {:?} and endpoint setup {:?} ",
 
                    p,
 
                    polarity,
 
                    &endpoint_setup
 
                );
 
                endpoint_setups.push((p, endpoint_setup));
 
                Ok(p)
 
            }
 
        }
 
    }
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<(), ConnectError> {
 
        use ConnectError::*;
 
        use ConnectError as Ce;
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(cu.logger, "Call to connecting in connected state");
 
                Err(AlreadyConnected)
 
                Err(Ce::AlreadyConnected)
 
            }
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                let deadline = timeout.map(|to| Instant::now() + to);
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let mut endpoint_manager = new_endpoint_manager(
 
                    &mut *cu.logger,
 
                    endpoint_setups,
 
                    &mut cu.port_info,
 
                    deadline,
 
                )?;
 
                log!(
 
@@ -94,25 +94,25 @@ impl Connector {
 
            }
 
        }
 
    }
 
}
 
fn new_endpoint_manager(
 
    logger: &mut dyn Logger,
 
    endpoint_setups: &[(PortId, EndpointSetup)],
 
    port_info: &mut PortInfo,
 
    deadline: Option<Instant>,
 
) -> Result<EndpointManager, ConnectError> {
 
    ////////////////////////////////////////////
 
    use std::sync::atomic::AtomicBool;
 
    use ConnectError::*;
 
    use ConnectError as Ce;
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    struct Todo {
 
        todo_endpoint: TodoEndpoint,
 
        endpoint_setup: EndpointSetup,
 
        local_port: PortId,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
    }
 
    enum TodoEndpoint {
 
        Accepting(TcpListener),
 
        Endpoint(Endpoint),
 
    }
 
@@ -120,75 +120,75 @@ fn new_endpoint_manager(
 
        token: Token,
 
        local_port: PortId,
 
        endpoint_setup: &EndpointSetup,
 
        poll: &mut Poll,
 
    ) -> Result<Todo, ConnectError> {
 
        let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity {
 
            let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
 
                .expect("mio::TcpStream connect should not fail!");
 
            poll.registry().register(&mut stream, token, BOTH).unwrap();
 
            TodoEndpoint::Endpoint(Endpoint { stream, inbox: vec![] })
 
        } else {
 
            let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
 
                .map_err(|_| BindFailed(endpoint_setup.sock_addr))?;
 
                .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?;
 
            poll.registry().register(&mut listener, token, BOTH).unwrap();
 
            TodoEndpoint::Accepting(listener)
 
        };
 
        Ok(Todo {
 
            todo_endpoint,
 
            local_port,
 
            sent_local_port: false,
 
            recv_peer_port: None,
 
            endpoint_setup: endpoint_setup.clone(),
 
        })
 
    }
 
    ////////////////////////////////////////////
 

	
 
    // 1. Start to construct EndpointManager
 
    const WAKER_TOKEN: Token = Token(usize::MAX);
 
    const WAKER_PERIOD: Duration = Duration::from_millis(300);
 

	
 
    assert!(endpoint_setups.len() < WAKER_TOKEN.0); // using MAX usize as waker token
 

	
 
    let mut waker_continue_signal: Option<Arc<AtomicBool>> = None;
 
    let mut poll = Poll::new().map_err(|_| PollInitFailed)?;
 
    let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?;
 
    let mut events = Events::with_capacity(endpoint_setups.len() * 2 + 4);
 
    let mut polled_undrained = VecSet::default();
 
    let mut delayed_messages = vec![];
 

	
 
    // 2. create a registered (TcpListener/Endpoint) for passive / active respectively
 
    let mut todos = endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, (local_port, endpoint_setup))| {
 
            init_todo(Token(index), *local_port, endpoint_setup, &mut poll)
 
        })
 
        .collect::<Result<Vec<Todo>, ConnectError>>()?;
 

	
 
    // 3. Using poll to drive progress:
 
    //    - accept an incoming connection for each TcpListener (turning them into endpoints too)
 
    //    - for each endpoint, send the local PortId
 
    //    - for each endpoint, recv the peer's PortId, and
 

	
 
    // all in connect_failed are NOT registered with Poll
 
    let mut connect_failed: HashSet<usize> = Default::default();
 

	
 
    let mut setup_incomplete: HashSet<usize> = (0..todos.len()).collect();
 
    while !setup_incomplete.is_empty() {
 
        let remaining = if let Some(deadline) = deadline {
 
            Some(deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?)
 
            Some(deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?)
 
        } else {
 
            None
 
        };
 
        poll.poll(&mut events, remaining).map_err(|_| PollFailed)?;
 
        poll.poll(&mut events, remaining).map_err(|_| Ce::PollFailed)?;
 
        for event in events.iter() {
 
            let token = event.token();
 
            let Token(index) = token;
 
            if token == WAKER_TOKEN {
 
                log!(
 
                    logger,
 
                    "Notification from waker. connect_failed is {:?}",
 
                    connect_failed.iter()
 
                );
 
                assert!(waker_continue_signal.is_some());
 
                for index in connect_failed.drain() {
 
                    let todo: &mut Todo = &mut todos[index];
 
@@ -223,33 +223,33 @@ fn new_endpoint_manager(
 
                                "Endpoint[{}] accepted a connection from {:?}",
 
                                index,
 
                                peer_addr
 
                            );
 
                            let endpoint = Endpoint { stream, inbox: vec![] };
 
                            todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint);
 
                        }
 
                        Err(e) if would_block(&e) => {
 
                            log!(logger, "Spurious wakeup on listener {:?}", index)
 
                        }
 
                        Err(_) => {
 
                            log!(logger, "accept() failure on index {}", index);
 
                            return Err(AcceptFailed(listener.local_addr().unwrap()));
 
                            return Err(Ce::AcceptFailed(listener.local_addr().unwrap()));
 
                        }
 
                    }
 
                }
 
                if let TodoEndpoint::Endpoint(endpoint) = &mut todo.todo_endpoint {
 
                    if event.is_error() {
 
                        if todo.endpoint_setup.endpoint_polarity == EndpointPolarity::Passive {
 
                            // right now you cannot retry an acceptor.
 
                            return Err(AcceptFailed(endpoint.stream.local_addr().unwrap()));
 
                            return Err(Ce::AcceptFailed(endpoint.stream.local_addr().unwrap()));
 
                        }
 
                        if connect_failed.insert(index) {
 
                            log!(
 
                                logger,
 
                                "Connection failed for {:?}. List is {:?}",
 
                                index,
 
                                connect_failed.iter()
 
                            );
 
                            poll.registry().deregister(&mut endpoint.stream).unwrap();
 
                        } else {
 
                            // spurious wakeup
 
                            continue;
 
@@ -279,33 +279,33 @@ fn new_endpoint_manager(
 
                        // spurious wakeup
 
                        continue;
 
                    }
 
                    let local_polarity = *port_info.polarities.get(&todo.local_port).unwrap();
 
                    if event.is_writable() && !todo.sent_local_port {
 
                        let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
 
                            polarity: local_polarity,
 
                            port: todo.local_port,
 
                        }));
 
                        endpoint
 
                            .send(&msg)
 
                            .map_err(|e| {
 
                                EndpointSetupError(endpoint.stream.local_addr().unwrap(), e)
 
                                Ce::EndpointSetupError(endpoint.stream.local_addr().unwrap(), e)
 
                            })
 
                            .unwrap();
 
                        log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
 
                        todo.sent_local_port = true;
 
                    }
 
                    if event.is_readable() && todo.recv_peer_port.is_none() {
 
                        let maybe_msg = endpoint.try_recv(logger).map_err(|e| {
 
                            EndpointSetupError(endpoint.stream.local_addr().unwrap(), e)
 
                            Ce::EndpointSetupError(endpoint.stream.local_addr().unwrap(), e)
 
                        })?;
 
                        if maybe_msg.is_some() && !endpoint.inbox.is_empty() {
 
                            polled_undrained.insert(index);
 
                        }
 
                        match maybe_msg {
 
                            None => {} // msg deserialization incomplete
 
                            Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => {
 
                                log!(logger, "endpoint[{}] got peer info {:?}", index, peer_info);
 
                                if peer_info.polarity == local_polarity {
 
                                    return Err(ConnectError::PortPeerPolarityMismatch(
 
                                        todo.local_port,
 
                                    ));
 
@@ -377,37 +377,37 @@ fn new_endpoint_manager(
 
        delayed_messages: Default::default(),
 
        endpoint_exts,
 
    })
 
}
 

	
 
fn init_neighborhood(
 
    connector_id: ConnectorId,
 
    logger: &mut dyn Logger,
 
    em: &mut EndpointManager,
 
    deadline: Option<Instant>,
 
) -> Result<Neighborhood, ConnectError> {
 
    ////////////////////////////////
 
    use {ConnectError::*, Msg::SetupMsg as S, SetupMsg::*};
 
    use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
 
    #[derive(Debug)]
 
    struct WaveState {
 
        parent: Option<usize>,
 
        leader: ConnectorId,
 
    }
 
    fn do_wave(
 
        em: &mut EndpointManager,
 
        awaiting: &mut HashSet<usize>,
 
        ws: &WaveState,
 
    ) -> Result<(), ConnectError> {
 
        awaiting.clear();
 
        let msg = S(LeaderWave { wave_leader: ws.leader });
 
        let msg = S(Sm::LeaderWave { wave_leader: ws.leader });
 
        for index in em.index_iter() {
 
            if Some(index) != ws.parent {
 
                em.send_to_setup(index, &msg)?;
 
                awaiting.insert(index);
 
            }
 
        }
 
        Ok(())
 
    }
 
    ///////////////////////
 
    /*
 
    Conceptually, we have two distinct disstributed algorithms back-to-back
 
    1. Leader election using echo algorithm with extinction.
 
@@ -436,33 +436,33 @@ fn init_neighborhood(
 
    let election_result: WaveState = {
 
        // initially: No parent, I'm the best leader.
 
        let mut best_wave = WaveState { parent: None, leader: connector_id };
 
        // start a wave for this initial state
 
        do_wave(em, &mut awaiting, &best_wave)?;
 
        // with 1+ neighbors, progress is only made in response to incoming messages
 
        em.undelay_all();
 
        'election: loop {
 
            log!(logger, "Election loop. awaiting {:?}...", awaiting.iter());
 
            let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?;
 
            log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
            match msg {
 
                S(LeaderAnnounce { tree_leader }) => {
 
                S(Sm::LeaderAnnounce { tree_leader }) => {
 
                    let election_result =
 
                        WaveState { leader: tree_leader, parent: Some(recv_index) };
 
                    log!(logger, "Election lost! Result {:?}", &election_result);
 
                    assert!(election_result.leader >= best_wave.leader);
 
                    assert_ne!(election_result.leader, connector_id);
 
                    break 'election election_result;
 
                }
 
                S(LeaderWave { wave_leader }) => {
 
                S(Sm::LeaderWave { wave_leader }) => {
 
                    use Ordering as O;
 
                    match wave_leader.cmp(&best_wave.leader) {
 
                        O::Less => log!(
 
                            logger,
 
                            "Ignoring wave with Id {:?}<{:?}",
 
                            wave_leader,
 
                            best_wave.leader
 
                        ),
 
                        O::Greater => {
 
                            log!(
 
                                logger,
 
                                "Joining wave with Id {:?}>{:?}",
 
@@ -495,242 +495,252 @@ fn init_neighborhood(
 
                                        &msg
 
                                    );
 
                                    em.send_to_setup(parent, &msg)?;
 
                                } else {
 
                                    let election_result: WaveState = best_wave;
 
                                    log!(logger, "Election won! Result {:?}", &election_result);
 
                                    break 'election election_result;
 
                                }
 
                            }
 
                        }
 
                    }
 
                }
 
                msg @ S(YouAreMyParent) | msg @ S(MyPortInfo(_)) => {
 
                msg @ S(Sm::YouAreMyParent) | msg @ S(Sm::MyPortInfo(_)) => {
 
                    log!(logger, "Endpont {:?} sent unexpected msg! {:?}", recv_index, &msg);
 
                    return Err(SetupAlgMisbehavior);
 
                    return Err(Ce::SetupAlgMisbehavior);
 
                }
 
                msg @ S(SessionScatter { .. })
 
                | msg @ S(SessionGather { .. })
 
                msg @ S(Sm::SessionScatter { .. })
 
                | msg @ S(Sm::SessionGather { .. })
 
                | msg @ Msg::CommMsg { .. } => {
 
                    log!(logger, "delaying msg {:?} during election algorithm", msg);
 
                    em.delayed_messages.push((recv_index, msg));
 
                }
 
            }
 
        }
 
    };
 

	
 
    // starting algorithm 2. Send a message to every neighbor
 
    log!(logger, "Starting tree construction. Step 1: send one msg per neighbor");
 
    awaiting.clear();
 
    for index in em.index_iter() {
 
        if Some(index) == election_result.parent {
 
            em.send_to_setup(index, &S(YouAreMyParent))?;
 
            em.send_to_setup(index, &S(Sm::YouAreMyParent))?;
 
        } else {
 
            awaiting.insert(index);
 
            em.send_to_setup(index, &S(LeaderAnnounce { tree_leader: election_result.leader }))?;
 
            em.send_to_setup(
 
                index,
 
                &S(Sm::LeaderAnnounce { tree_leader: election_result.leader }),
 
            )?;
 
        }
 
    }
 
    let mut children = vec![];
 
    em.undelay_all();
 
    while !awaiting.is_empty() {
 
        log!(logger, "Tree construction_loop loop. awaiting {:?}...", awaiting.iter());
 
        let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?;
 
        log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
        match msg {
 
            S(LeaderAnnounce { .. }) => {
 
            S(Sm::LeaderAnnounce { .. }) => {
 
                // not a child
 
                log!(
 
                    logger,
 
                    "Got reply from non-child index {:?}. Children: {:?}",
 
                    recv_index,
 
                    children.iter()
 
                );
 
                if !awaiting.remove(&recv_index) {
 
                    return Err(SetupAlgMisbehavior);
 
                    return Err(Ce::SetupAlgMisbehavior);
 
                }
 
            }
 
            S(YouAreMyParent) => {
 
            S(Sm::YouAreMyParent) => {
 
                if !awaiting.remove(&recv_index) {
 
                    log!(
 
                        logger,
 
                        "Got reply from child index {:?}. Children before... {:?}",
 
                        recv_index,
 
                        children.iter()
 
                    );
 
                    return Err(SetupAlgMisbehavior);
 
                    return Err(Ce::SetupAlgMisbehavior);
 
                }
 
                children.push(recv_index);
 
            }
 
            msg @ S(MyPortInfo(_)) | msg @ S(LeaderWave { .. }) => {
 
            msg @ S(Sm::MyPortInfo(_)) | msg @ S(Sm::LeaderWave { .. }) => {
 
                log!(logger, "discarding old message {:?} during election", msg);
 
            }
 
            msg @ S(SessionScatter { .. })
 
            | msg @ S(SessionGather { .. })
 
            msg @ S(Sm::SessionScatter { .. })
 
            | msg @ S(Sm::SessionGather { .. })
 
            | msg @ Msg::CommMsg { .. } => {
 
                log!(logger, "delaying msg {:?} during election", msg);
 
                em.delayed_messages.push((recv_index, msg));
 
            }
 
        }
 
    }
 
    children.shrink_to_fit();
 
    let neighborhood =
 
        Neighborhood { parent: election_result.parent, children: VecSet::new(children) };
 
    log!(logger, "Neighborhood constructed {:?}", &neighborhood);
 
    Ok(neighborhood)
 
}
 

	
 
fn session_optimize(
 
    cu: &mut ConnectorUnphased,
 
    comm: &mut ConnectorCommunication,
 
    deadline: Option<Instant>,
 
) -> Result<(), ConnectError> {
 
    ////////////////////////////////////////
 
    use {ConnectError::*, Msg::SetupMsg as S, SetupMsg::*};
 
    use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
 
    ////////////////////////////////////////
 
    log!(cu.logger, "Beginning session optimization");
 
    // populate session_info_map from a message per child
 
    let mut unoptimized_map: HashMap<ConnectorId, SessionInfo> = Default::default();
 
    let mut awaiting: HashSet<usize> = comm.neighborhood.children.iter().copied().collect();
 
    comm.endpoint_manager.undelay_all();
 
    while !awaiting.is_empty() {
 
        log!(
 
            cu.logger,
 
            "Session gather loop. awaiting info from children {:?}...",
 
            awaiting.iter()
 
        );
 
        let (recv_index, msg) =
 
            comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?;
 
        log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
        match msg {
 
            S(SessionGather { unoptimized_map: child_unoptimized_map }) => {
 
            S(Sm::SessionGather { unoptimized_map: child_unoptimized_map }) => {
 
                if !awaiting.remove(&recv_index) {
 
                    log!(
 
                        cu.logger,
 
                        "Wasn't expecting session info from {:?}. Got {:?}",
 
                        recv_index,
 
                        &child_unoptimized_map
 
                    );
 
                    return Err(SetupAlgMisbehavior);
 
                    return Err(Ce::SetupAlgMisbehavior);
 
                }
 
                unoptimized_map.extend(child_unoptimized_map.into_iter());
 
            }
 
            msg @ S(YouAreMyParent)
 
            | msg @ S(MyPortInfo(..))
 
            | msg @ S(LeaderAnnounce { .. })
 
            | msg @ S(LeaderWave { .. }) => {
 
            msg @ S(Sm::YouAreMyParent)
 
            | msg @ S(Sm::MyPortInfo(..))
 
            | msg @ S(Sm::LeaderAnnounce { .. })
 
            | msg @ S(Sm::LeaderWave { .. }) => {
 
                log!(cu.logger, "discarding old message {:?} during election", msg);
 
            }
 
            msg @ S(SessionScatter { .. }) => {
 
            msg @ S(Sm::SessionScatter { .. }) => {
 
                log!(
 
                    cu.logger,
 
                    "Endpoint {:?} sent unexpected scatter! {:?} I've not contributed yet!",
 
                    recv_index,
 
                    &msg
 
                );
 
                return Err(SetupAlgMisbehavior);
 
                return Err(Ce::SetupAlgMisbehavior);
 
            }
 
            msg @ Msg::CommMsg(..) => {
 
                log!(cu.logger, "delaying msg {:?} during session optimization", msg);
 
                comm.endpoint_manager.delayed_messages.push((recv_index, msg));
 
            }
 
        }
 
    }
 
    log!(
 
        cu.logger,
 
        "Gathered all children's maps. ConnectorId set is... {:?}",
 
        unoptimized_map.keys()
 
    );
 
    let my_session_info = SessionInfo {
 
        port_info: cu.port_info.clone(),
 
        proto_components: cu.proto_components.clone(),
 
        serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()),
 
        getter_for_incoming: comm
 
        endpoint_incoming_to_getter: comm
 
            .endpoint_manager
 
            .endpoint_exts
 
            .iter()
 
            .map(|ee| ee.getter_for_incoming)
 
            .collect(),
 
    };
 
    unoptimized_map.insert(cu.id_manager.connector_id, my_session_info);
 
    log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map);
 

	
 
    // acquire the optimized info...
 
    let optimized_map = if let Some(parent) = comm.neighborhood.parent {
 
        // ... as a message from my parent
 
        log!(cu.logger, "Forwarding gathered info to parent {:?}", parent);
 
        let msg = S(SessionGather { unoptimized_map });
 
        let msg = S(Sm::SessionGather { unoptimized_map });
 
        comm.endpoint_manager.send_to_setup(parent, &msg)?;
 
        'scatter_loop: loop {
 
            log!(
 
                cu.logger,
 
                "Session scatter recv loop. awaiting info from children {:?}...",
 
                awaiting.iter()
 
            );
 
            let (recv_index, msg) =
 
                comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?;
 
            log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
            match msg {
 
                S(SessionScatter { optimized_map }) => {
 
                S(Sm::SessionScatter { optimized_map }) => {
 
                    if recv_index != parent {
 
                        log!(cu.logger, "I expected the scatter from my parent only!");
 
                        return Err(SetupAlgMisbehavior);
 
                        return Err(Ce::SetupAlgMisbehavior);
 
                    }
 
                    break 'scatter_loop optimized_map;
 
                }
 
                msg @ Msg::CommMsg { .. } => {
 
                    log!(cu.logger, "delaying msg {:?} during scatter recv", msg);
 
                    comm.endpoint_manager.delayed_messages.push((recv_index, msg));
 
                }
 
                msg @ S(SessionGather { .. })
 
                | msg @ S(YouAreMyParent)
 
                | msg @ S(MyPortInfo(..))
 
                | msg @ S(LeaderAnnounce { .. })
 
                | msg @ S(LeaderWave { .. }) => {
 
                msg @ S(Sm::SessionGather { .. })
 
                | msg @ S(Sm::YouAreMyParent)
 
                | msg @ S(Sm::MyPortInfo(..))
 
                | msg @ S(Sm::LeaderAnnounce { .. })
 
                | msg @ S(Sm::LeaderWave { .. }) => {
 
                    log!(cu.logger, "discarding old message {:?} during election", msg);
 
                }
 
            }
 
        }
 
    } else {
 
        // by computing it myself
 
        log!(cu.logger, "I am the leader! I will optimize this session");
 
        leader_session_map_optimize(&mut *cu.logger, unoptimized_map)?
 
    };
 
    log!(
 
        cu.logger,
 
        "Optimized info map is {:?}. Sending to children {:?}",
 
        &optimized_map,
 
        comm.neighborhood.children.iter()
 
    );
 
    log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map);
 
    let optimized_info =
 
        optimized_map.get(&cu.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone();
 
    let msg = S(SessionScatter { optimized_map });
 
    let msg = S(Sm::SessionScatter { optimized_map });
 
    for &child in comm.neighborhood.children.iter() {
 
        comm.endpoint_manager.send_to_setup(child, &msg)?;
 
    }
 
    apply_optimizations(cu, comm, optimized_info)?;
 
    log!(cu.logger, "Session optimizations applied");
 
    Ok(())
 
}
 
fn leader_session_map_optimize(
 
    logger: &mut dyn Logger,
 
    unoptimized_map: HashMap<ConnectorId, SessionInfo>,
 
) -> Result<HashMap<ConnectorId, SessionInfo>, ConnectError> {
 
    log!(logger, "Session map optimize START");
 
    log!(logger, "Session map optimize END");
 
    Ok(unoptimized_map)
 
}
 
fn apply_optimizations(
 
    cu: &mut ConnectorUnphased,
 
    comm: &mut ConnectorCommunication,
 
    session_info: SessionInfo,
 
) -> Result<(), ConnectError> {
 
    let SessionInfo { proto_components, port_info, serde_proto_description, getter_for_incoming } =
 
        session_info;
 
    let SessionInfo {
 
        proto_components,
 
        port_info,
 
        serde_proto_description,
 
        endpoint_incoming_to_getter,
 
    } = session_info;
 
    // TODO some info which should be read-only can be mutated with the current scheme
 
    cu.port_info = port_info;
 
    cu.proto_components = proto_components;
 
    cu.proto_description = serde_proto_description.0;
 
    for (ee, getter) in comm.endpoint_manager.endpoint_exts.iter_mut().zip(getter_for_incoming) {
 
    for (ee, getter) in
 
        comm.endpoint_manager.endpoint_exts.iter_mut().zip(endpoint_incoming_to_getter)
 
    {
 
        ee.getter_for_incoming = getter;
 
    }
 
    Ok(())
 
}
0 comments (0 inline, 0 general)