Changeset - d67249fd4593
[Not reviewed]
0 8 0
Christopher Esterhuyse - 5 years ago 2020-07-02 12:35:40
christopher.esterhuyse@gmail.com
logging using hex, less glob imports, and endpoint_logging feature flag
8 files changed with 119 insertions and 91 deletions:
0 comments (0 inline, 0 general)
.gitignore
Show inline comments
 
@@ -7,4 +7,5 @@ examples/*/*.exe
 
examples/*.dll
 
examples/reowolf*
 
examples/*.txt
 
logs
 
logs/*
 
logs/*/*
Cargo.toml
Show inline comments
 
@@ -36,4 +36,5 @@ crate-type = ["cdylib"]
 

	
 
[features]
 
default = ["ffi"]
 
ffi = [] # no feature dependencies
 
\ No newline at end of file
 
ffi = [] # no feature dependencies
 
endpoint_logging = [] # see src/macros where a conditional check include endpoint logging
 
\ No newline at end of file
src/common.rs
Show inline comments
 
@@ -87,6 +87,7 @@ pub(crate) enum SyncBlocker {
 
    CouldntCheckFiring(PortId),
 
    PutMsg(PortId, Payload),
 
}
 
pub(crate) struct DenseDebugHex<'a>(pub &'a [u8]);
 

	
 
///////////////////// IMPL /////////////////////
 
impl U32Stream {
 
@@ -176,7 +177,7 @@ impl Debug for ProtoComponentId {
 
}
 
impl Debug for Payload {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "Payload{:x?}", self.as_slice())
 
        write!(f, "Payload[{:?}]", DenseDebugHex(self.as_slice()))
 
    }
 
}
 
impl std::ops::Not for Polarity {
 
@@ -189,3 +190,11 @@ impl std::ops::Not for Polarity {
 
        }
 
    }
 
}
 
impl Debug for DenseDebugHex<'_> {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        for b in self.0 {
 
            write!(f, "{:02X?}", b)?;
 
        }
 
        Ok(())
 
    }
 
}
src/macros.rs
Show inline comments
 
macro_rules! endptlog {
 
    ($logger:expr, $($arg:tt)*) => {{
 
        // let w = $logger.line_writer();
 
        // let _ = writeln!(w, $($arg)*);
 
    	if cfg!(feature = "endpoint_logging") {
 
	        let w = $logger.line_writer();
 
	        let _ = writeln!(w, $($arg)*);
 
	    }
 
    }};
 
}
 
macro_rules! log {
src/runtime/communication.rs
Show inline comments
 
@@ -60,23 +60,23 @@ impl ReplaceBoolTrue for bool {
 
////////////////
 
impl Connector {
 
    pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> {
 
        use GottenError::*;
 
        use GottenError as Ge;
 
        let Self { phased, .. } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NoPreviousRound),
 
            ConnectorPhased::Setup { .. } => Err(Ge::NoPreviousRound),
 
            ConnectorPhased::Communication(comm) => match &comm.round_result {
 
                Err(_) => Err(PreviousSyncFailed),
 
                Ok(None) => Err(NoPreviousRound),
 
                Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(PortDidntGet),
 
                Err(_) => Err(Ge::PreviousSyncFailed),
 
                Ok(None) => Err(Ge::NoPreviousRound),
 
                Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(Ge::PortDidntGet),
 
            },
 
        }
 
    }
 
    pub fn next_batch(&mut self) -> Result<usize, NextBatchError> {
 
        // returns index of new batch
 
        use NextBatchError::*;
 
        use NextBatchError as Nbe;
 
        let Self { phased, .. } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Setup { .. } => Err(Nbe::NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                comm.native_batches.push(Default::default());
 
                Ok(comm.native_batches.len() - 1)
 
@@ -88,18 +88,18 @@ impl Connector {
 
        port: PortId,
 
        expect_polarity: Polarity,
 
    ) -> Result<&mut NativeBatch, PortOpError> {
 
        use PortOpError::*;
 
        use PortOpError as Poe;
 
        let Self { unphased, phased } = self;
 
        if !unphased.native_ports.contains(&port) {
 
            return Err(PortUnavailable);
 
            return Err(Poe::PortUnavailable);
 
        }
 
        match unphased.port_info.polarities.get(&port) {
 
            Some(p) if *p == expect_polarity => {}
 
            Some(_) => return Err(WrongPolarity),
 
            None => return Err(UnknownPolarity),
 
            Some(_) => return Err(Poe::WrongPolarity),
 
            None => return Err(Poe::UnknownPolarity),
 
        }
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(NotConnected),
 
            ConnectorPhased::Setup { .. } => Err(Poe::NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                let batch = comm.native_batches.last_mut().unwrap(); // length >= 1 is invariant
 
                Ok(batch)
 
@@ -107,22 +107,22 @@ impl Connector {
 
        }
 
    }
 
    pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> {
 
        use PortOpError::*;
 
        use PortOpError as Poe;
 
        let batch = self.port_op_access(port, Putter)?;
 
        if batch.to_put.contains_key(&port) {
 
            Err(MultipleOpsOnPort)
 
            Err(Poe::MultipleOpsOnPort)
 
        } else {
 
            batch.to_put.insert(port, payload);
 
            Ok(())
 
        }
 
    }
 
    pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> {
 
        use PortOpError::*;
 
        use PortOpError as Poe;
 
        let batch = self.port_op_access(port, Getter)?;
 
        if batch.to_get.insert(port) {
 
            Ok(())
 
        } else {
 
            Err(MultipleOpsOnPort)
 
            Err(Poe::MultipleOpsOnPort)
 
        }
 
    }
 
    // entrypoint for caller. overwrites round result enum, and returns what happened
 
@@ -423,7 +423,6 @@ impl Connector {
 
                    Some(Route::LocalComponent(ComponentId::Native)) => branching_native.feed_msg(
 
                        cu,
 
                        &mut solution_storage,
 
                        // &mut Pay
 
                        getter,
 
                        &send_payload_msg,
 
                    ),
 
@@ -432,7 +431,6 @@ impl Connector {
 
                            branching_proto_components.get_mut(proto_component_id)
 
                        {
 
                            let proto_component_id = *proto_component_id;
 
                            // let ConnectorUnphased { port_info, proto_description, .. } = cu;
 
                            branching_component.feed_msg(
 
                                cu,
 
                                &mut solution_storage,
src/runtime/endpoints.rs
Show inline comments
 
@@ -10,7 +10,6 @@ enum TryRecyAnyError {
 
    PollFailed,
 
    EndpointError { error: EndpointError, index: usize },
 
}
 

	
 
/////////////////////
 
impl Endpoint {
 
    fn bincode_opts() -> impl bincode::config::Options {
 
@@ -20,40 +19,52 @@ impl Endpoint {
 
        &mut self,
 
        logger: &mut dyn Logger,
 
    ) -> Result<Option<T>, EndpointError> {
 
        use EndpointError::*;
 
        use EndpointError as Ee;
 
        // populate inbox as much as possible
 
        let before_len = self.inbox.len();
 
        'read_loop: loop {
 
            let res = self.stream.read_to_end(&mut self.inbox);
 
            endptlog!(logger, "Stream read to end {:?}", &res);
 
            match res {
 
                Err(e) if would_block(&e) => break 'read_loop,
 
                Ok(0) => break 'read_loop,
 
                Ok(_) => (),
 
                Err(_e) => return Err(BrokenEndpoint),
 
                Err(_e) => return Err(Ee::BrokenEndpoint),
 
            }
 
        }
 
        endptlog!(logger, "Inbox bytes {:x?}", &self.inbox);
 
        endptlog!(
 
            logger,
 
            "Inbox bytes [{:x?}| {:x?}]",
 
            DenseDebugHex(&self.inbox[..before_len]),
 
            DenseDebugHex(&self.inbox[before_len..]),
 
        );
 
        let mut monitored = MonitoredReader::from(&self.inbox[..]);
 
        use bincode::config::Options;
 
        match Self::bincode_opts().deserialize_from(&mut monitored) {
 
            Ok(msg) => {
 
                let msg_size = monitored.bytes_read();
 
                self.inbox.drain(0..(msg_size.try_into().unwrap()));
 
                endptlog!(
 
                    logger,
 
                    "Yielding msg. Inbox len {}-{}=={}: [{:?}]",
 
                    self.inbox.len() + msg_size,
 
                    msg_size,
 
                    self.inbox.len(),
 
                    DenseDebugHex(&self.inbox[..]),
 
                );
 
                Ok(Some(msg))
 
            }
 
            Err(e) => match *e {
 
                bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => {
 
                    Ok(None)
 
                }
 
                _ => Err(MalformedMessage),
 
                _ => Err(Ee::MalformedMessage),
 
            },
 
        }
 
    }
 
    pub(super) fn send<T: serde::ser::Serialize>(&mut self, msg: &T) -> Result<(), EndpointError> {
 
        use bincode::config::Options;
 
        Self::bincode_opts()
 
            .serialize_into(&mut self.stream, msg)
 
            .map_err(|_| EndpointError::BrokenEndpoint)
 
        use EndpointError as Ee;
 
        Self::bincode_opts().serialize_into(&mut self.stream, msg).map_err(|_| Ee::BrokenEndpoint)
 
    }
 
}
 

	
 
@@ -74,9 +85,6 @@ impl EndpointManager {
 
            ConnectError::EndpointSetupError(endpoint.stream.local_addr().unwrap(), err)
 
        })
 
    }
 
    pub(super) fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), EndpointError> {
 
        self.endpoint_exts[index].endpoint.send(msg)
 
    }
 
    pub(super) fn try_recv_any_comms(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
@@ -185,7 +193,6 @@ impl<R: Read> Read for MonitoredReader<R> {
 
        Ok(n)
 
    }
 
}
 

	
 
impl Into<Msg> for SetupMsg {
 
    fn into(self) -> Msg {
 
        Msg::SetupMsg(self)
src/runtime/mod.rs
Show inline comments
 
@@ -93,7 +93,7 @@ enum SetupMsg {
 
struct SessionInfo {
 
    serde_proto_description: SerdeProtocolDescription,
 
    port_info: PortInfo,
 
    getter_for_incoming: Vec<PortId>,
 
    endpoint_incoming_to_getter: Vec<PortId>,
 
    proto_components: HashMap<ProtoComponentId, ProtoComponent>,
 
}
 
#[derive(Debug, Clone)]
 
@@ -304,19 +304,19 @@ impl Connector {
 
        ports: &[PortId],
 
    ) -> Result<(), AddComponentError> {
 
        // called by the USER. moves ports owned by the NATIVE
 
        use AddComponentError::*;
 
        use AddComponentError as Ace;
 
        // 1. check if this is OK
 
        let cu = &mut self.unphased;
 
        let polarities = cu.proto_description.component_polarities(identifier)?;
 
        if polarities.len() != ports.len() {
 
            return Err(WrongNumberOfParamaters { expected: polarities.len() });
 
            return Err(Ace::WrongNumberOfParamaters { expected: polarities.len() });
 
        }
 
        for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) {
 
            if !cu.native_ports.contains(port) {
 
                return Err(UnknownPort(*port));
 
                return Err(Ace::UnknownPort(*port));
 
            }
 
            if expected_polarity != *cu.port_info.polarities.get(port).unwrap() {
 
                return Err(WrongPortPolarity { port: *port, expected_polarity });
 
                return Err(Ace::WrongPortPolarity { port: *port, expected_polarity });
 
            }
 
        }
 
        // 3. remove ports from old component & update port->route
src/runtime/setup.rs
Show inline comments
 
@@ -50,12 +50,12 @@ impl Connector {
 
        }
 
    }
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<(), ConnectError> {
 
        use ConnectError::*;
 
        use ConnectError as Ce;
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(cu.logger, "Call to connecting in connected state");
 
                Err(AlreadyConnected)
 
                Err(Ce::AlreadyConnected)
 
            }
 
            ConnectorPhased::Setup { endpoint_setups, .. } => {
 
                log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
@@ -103,7 +103,7 @@ fn new_endpoint_manager(
 
) -> Result<EndpointManager, ConnectError> {
 
    ////////////////////////////////////////////
 
    use std::sync::atomic::AtomicBool;
 
    use ConnectError::*;
 
    use ConnectError as Ce;
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    struct Todo {
 
        todo_endpoint: TodoEndpoint,
 
@@ -129,7 +129,7 @@ fn new_endpoint_manager(
 
            TodoEndpoint::Endpoint(Endpoint { stream, inbox: vec![] })
 
        } else {
 
            let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
 
                .map_err(|_| BindFailed(endpoint_setup.sock_addr))?;
 
                .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?;
 
            poll.registry().register(&mut listener, token, BOTH).unwrap();
 
            TodoEndpoint::Accepting(listener)
 
        };
 
@@ -150,7 +150,7 @@ fn new_endpoint_manager(
 
    assert!(endpoint_setups.len() < WAKER_TOKEN.0); // using MAX usize as waker token
 

	
 
    let mut waker_continue_signal: Option<Arc<AtomicBool>> = None;
 
    let mut poll = Poll::new().map_err(|_| PollInitFailed)?;
 
    let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?;
 
    let mut events = Events::with_capacity(endpoint_setups.len() * 2 + 4);
 
    let mut polled_undrained = VecSet::default();
 
    let mut delayed_messages = vec![];
 
@@ -175,11 +175,11 @@ fn new_endpoint_manager(
 
    let mut setup_incomplete: HashSet<usize> = (0..todos.len()).collect();
 
    while !setup_incomplete.is_empty() {
 
        let remaining = if let Some(deadline) = deadline {
 
            Some(deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?)
 
            Some(deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?)
 
        } else {
 
            None
 
        };
 
        poll.poll(&mut events, remaining).map_err(|_| PollFailed)?;
 
        poll.poll(&mut events, remaining).map_err(|_| Ce::PollFailed)?;
 
        for event in events.iter() {
 
            let token = event.token();
 
            let Token(index) = token;
 
@@ -232,7 +232,7 @@ fn new_endpoint_manager(
 
                        }
 
                        Err(_) => {
 
                            log!(logger, "accept() failure on index {}", index);
 
                            return Err(AcceptFailed(listener.local_addr().unwrap()));
 
                            return Err(Ce::AcceptFailed(listener.local_addr().unwrap()));
 
                        }
 
                    }
 
                }
 
@@ -240,7 +240,7 @@ fn new_endpoint_manager(
 
                    if event.is_error() {
 
                        if todo.endpoint_setup.endpoint_polarity == EndpointPolarity::Passive {
 
                            // right now you cannot retry an acceptor.
 
                            return Err(AcceptFailed(endpoint.stream.local_addr().unwrap()));
 
                            return Err(Ce::AcceptFailed(endpoint.stream.local_addr().unwrap()));
 
                        }
 
                        if connect_failed.insert(index) {
 
                            log!(
 
@@ -288,7 +288,7 @@ fn new_endpoint_manager(
 
                        endpoint
 
                            .send(&msg)
 
                            .map_err(|e| {
 
                                EndpointSetupError(endpoint.stream.local_addr().unwrap(), e)
 
                                Ce::EndpointSetupError(endpoint.stream.local_addr().unwrap(), e)
 
                            })
 
                            .unwrap();
 
                        log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
 
@@ -296,7 +296,7 @@ fn new_endpoint_manager(
 
                    }
 
                    if event.is_readable() && todo.recv_peer_port.is_none() {
 
                        let maybe_msg = endpoint.try_recv(logger).map_err(|e| {
 
                            EndpointSetupError(endpoint.stream.local_addr().unwrap(), e)
 
                            Ce::EndpointSetupError(endpoint.stream.local_addr().unwrap(), e)
 
                        })?;
 
                        if maybe_msg.is_some() && !endpoint.inbox.is_empty() {
 
                            polled_undrained.insert(index);
 
@@ -386,7 +386,7 @@ fn init_neighborhood(
 
    deadline: Option<Instant>,
 
) -> Result<Neighborhood, ConnectError> {
 
    ////////////////////////////////
 
    use {ConnectError::*, Msg::SetupMsg as S, SetupMsg::*};
 
    use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
 
    #[derive(Debug)]
 
    struct WaveState {
 
        parent: Option<usize>,
 
@@ -398,7 +398,7 @@ fn init_neighborhood(
 
        ws: &WaveState,
 
    ) -> Result<(), ConnectError> {
 
        awaiting.clear();
 
        let msg = S(LeaderWave { wave_leader: ws.leader });
 
        let msg = S(Sm::LeaderWave { wave_leader: ws.leader });
 
        for index in em.index_iter() {
 
            if Some(index) != ws.parent {
 
                em.send_to_setup(index, &msg)?;
 
@@ -445,7 +445,7 @@ fn init_neighborhood(
 
            let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?;
 
            log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
            match msg {
 
                S(LeaderAnnounce { tree_leader }) => {
 
                S(Sm::LeaderAnnounce { tree_leader }) => {
 
                    let election_result =
 
                        WaveState { leader: tree_leader, parent: Some(recv_index) };
 
                    log!(logger, "Election lost! Result {:?}", &election_result);
 
@@ -453,7 +453,7 @@ fn init_neighborhood(
 
                    assert_ne!(election_result.leader, connector_id);
 
                    break 'election election_result;
 
                }
 
                S(LeaderWave { wave_leader }) => {
 
                S(Sm::LeaderWave { wave_leader }) => {
 
                    use Ordering as O;
 
                    match wave_leader.cmp(&best_wave.leader) {
 
                        O::Less => log!(
 
@@ -504,12 +504,12 @@ fn init_neighborhood(
 
                        }
 
                    }
 
                }
 
                msg @ S(YouAreMyParent) | msg @ S(MyPortInfo(_)) => {
 
                msg @ S(Sm::YouAreMyParent) | msg @ S(Sm::MyPortInfo(_)) => {
 
                    log!(logger, "Endpont {:?} sent unexpected msg! {:?}", recv_index, &msg);
 
                    return Err(SetupAlgMisbehavior);
 
                    return Err(Ce::SetupAlgMisbehavior);
 
                }
 
                msg @ S(SessionScatter { .. })
 
                | msg @ S(SessionGather { .. })
 
                msg @ S(Sm::SessionScatter { .. })
 
                | msg @ S(Sm::SessionGather { .. })
 
                | msg @ Msg::CommMsg { .. } => {
 
                    log!(logger, "delaying msg {:?} during election algorithm", msg);
 
                    em.delayed_messages.push((recv_index, msg));
 
@@ -523,10 +523,13 @@ fn init_neighborhood(
 
    awaiting.clear();
 
    for index in em.index_iter() {
 
        if Some(index) == election_result.parent {
 
            em.send_to_setup(index, &S(YouAreMyParent))?;
 
            em.send_to_setup(index, &S(Sm::YouAreMyParent))?;
 
        } else {
 
            awaiting.insert(index);
 
            em.send_to_setup(index, &S(LeaderAnnounce { tree_leader: election_result.leader }))?;
 
            em.send_to_setup(
 
                index,
 
                &S(Sm::LeaderAnnounce { tree_leader: election_result.leader }),
 
            )?;
 
        }
 
    }
 
    let mut children = vec![];
 
@@ -536,7 +539,7 @@ fn init_neighborhood(
 
        let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?;
 
        log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
        match msg {
 
            S(LeaderAnnounce { .. }) => {
 
            S(Sm::LeaderAnnounce { .. }) => {
 
                // not a child
 
                log!(
 
                    logger,
 
@@ -545,10 +548,10 @@ fn init_neighborhood(
 
                    children.iter()
 
                );
 
                if !awaiting.remove(&recv_index) {
 
                    return Err(SetupAlgMisbehavior);
 
                    return Err(Ce::SetupAlgMisbehavior);
 
                }
 
            }
 
            S(YouAreMyParent) => {
 
            S(Sm::YouAreMyParent) => {
 
                if !awaiting.remove(&recv_index) {
 
                    log!(
 
                        logger,
 
@@ -556,15 +559,15 @@ fn init_neighborhood(
 
                        recv_index,
 
                        children.iter()
 
                    );
 
                    return Err(SetupAlgMisbehavior);
 
                    return Err(Ce::SetupAlgMisbehavior);
 
                }
 
                children.push(recv_index);
 
            }
 
            msg @ S(MyPortInfo(_)) | msg @ S(LeaderWave { .. }) => {
 
            msg @ S(Sm::MyPortInfo(_)) | msg @ S(Sm::LeaderWave { .. }) => {
 
                log!(logger, "discarding old message {:?} during election", msg);
 
            }
 
            msg @ S(SessionScatter { .. })
 
            | msg @ S(SessionGather { .. })
 
            msg @ S(Sm::SessionScatter { .. })
 
            | msg @ S(Sm::SessionGather { .. })
 
            | msg @ Msg::CommMsg { .. } => {
 
                log!(logger, "delaying msg {:?} during election", msg);
 
                em.delayed_messages.push((recv_index, msg));
 
@@ -584,7 +587,7 @@ fn session_optimize(
 
    deadline: Option<Instant>,
 
) -> Result<(), ConnectError> {
 
    ////////////////////////////////////////
 
    use {ConnectError::*, Msg::SetupMsg as S, SetupMsg::*};
 
    use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
 
    ////////////////////////////////////////
 
    log!(cu.logger, "Beginning session optimization");
 
    // populate session_info_map from a message per child
 
@@ -601,7 +604,7 @@ fn session_optimize(
 
            comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?;
 
        log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
        match msg {
 
            S(SessionGather { unoptimized_map: child_unoptimized_map }) => {
 
            S(Sm::SessionGather { unoptimized_map: child_unoptimized_map }) => {
 
                if !awaiting.remove(&recv_index) {
 
                    log!(
 
                        cu.logger,
 
@@ -609,24 +612,24 @@ fn session_optimize(
 
                        recv_index,
 
                        &child_unoptimized_map
 
                    );
 
                    return Err(SetupAlgMisbehavior);
 
                    return Err(Ce::SetupAlgMisbehavior);
 
                }
 
                unoptimized_map.extend(child_unoptimized_map.into_iter());
 
            }
 
            msg @ S(YouAreMyParent)
 
            | msg @ S(MyPortInfo(..))
 
            | msg @ S(LeaderAnnounce { .. })
 
            | msg @ S(LeaderWave { .. }) => {
 
            msg @ S(Sm::YouAreMyParent)
 
            | msg @ S(Sm::MyPortInfo(..))
 
            | msg @ S(Sm::LeaderAnnounce { .. })
 
            | msg @ S(Sm::LeaderWave { .. }) => {
 
                log!(cu.logger, "discarding old message {:?} during election", msg);
 
            }
 
            msg @ S(SessionScatter { .. }) => {
 
            msg @ S(Sm::SessionScatter { .. }) => {
 
                log!(
 
                    cu.logger,
 
                    "Endpoint {:?} sent unexpected scatter! {:?} I've not contributed yet!",
 
                    recv_index,
 
                    &msg
 
                );
 
                return Err(SetupAlgMisbehavior);
 
                return Err(Ce::SetupAlgMisbehavior);
 
            }
 
            msg @ Msg::CommMsg(..) => {
 
                log!(cu.logger, "delaying msg {:?} during session optimization", msg);
 
@@ -643,7 +646,7 @@ fn session_optimize(
 
        port_info: cu.port_info.clone(),
 
        proto_components: cu.proto_components.clone(),
 
        serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()),
 
        getter_for_incoming: comm
 
        endpoint_incoming_to_getter: comm
 
            .endpoint_manager
 
            .endpoint_exts
 
            .iter()
 
@@ -657,7 +660,7 @@ fn session_optimize(
 
    let optimized_map = if let Some(parent) = comm.neighborhood.parent {
 
        // ... as a message from my parent
 
        log!(cu.logger, "Forwarding gathered info to parent {:?}", parent);
 
        let msg = S(SessionGather { unoptimized_map });
 
        let msg = S(Sm::SessionGather { unoptimized_map });
 
        comm.endpoint_manager.send_to_setup(parent, &msg)?;
 
        'scatter_loop: loop {
 
            log!(
 
@@ -669,10 +672,10 @@ fn session_optimize(
 
                comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?;
 
            log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
            match msg {
 
                S(SessionScatter { optimized_map }) => {
 
                S(Sm::SessionScatter { optimized_map }) => {
 
                    if recv_index != parent {
 
                        log!(cu.logger, "I expected the scatter from my parent only!");
 
                        return Err(SetupAlgMisbehavior);
 
                        return Err(Ce::SetupAlgMisbehavior);
 
                    }
 
                    break 'scatter_loop optimized_map;
 
                }
 
@@ -680,11 +683,11 @@ fn session_optimize(
 
                    log!(cu.logger, "delaying msg {:?} during scatter recv", msg);
 
                    comm.endpoint_manager.delayed_messages.push((recv_index, msg));
 
                }
 
                msg @ S(SessionGather { .. })
 
                | msg @ S(YouAreMyParent)
 
                | msg @ S(MyPortInfo(..))
 
                | msg @ S(LeaderAnnounce { .. })
 
                | msg @ S(LeaderWave { .. }) => {
 
                msg @ S(Sm::SessionGather { .. })
 
                | msg @ S(Sm::YouAreMyParent)
 
                | msg @ S(Sm::MyPortInfo(..))
 
                | msg @ S(Sm::LeaderAnnounce { .. })
 
                | msg @ S(Sm::LeaderWave { .. }) => {
 
                    log!(cu.logger, "discarding old message {:?} during election", msg);
 
                }
 
            }
 
@@ -703,7 +706,7 @@ fn session_optimize(
 
    log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map);
 
    let optimized_info =
 
        optimized_map.get(&cu.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone();
 
    let msg = S(SessionScatter { optimized_map });
 
    let msg = S(Sm::SessionScatter { optimized_map });
 
    for &child in comm.neighborhood.children.iter() {
 
        comm.endpoint_manager.send_to_setup(child, &msg)?;
 
    }
 
@@ -724,12 +727,19 @@ fn apply_optimizations(
 
    comm: &mut ConnectorCommunication,
 
    session_info: SessionInfo,
 
) -> Result<(), ConnectError> {
 
    let SessionInfo { proto_components, port_info, serde_proto_description, getter_for_incoming } =
 
        session_info;
 
    let SessionInfo {
 
        proto_components,
 
        port_info,
 
        serde_proto_description,
 
        endpoint_incoming_to_getter,
 
    } = session_info;
 
    // TODO some info which should be read-only can be mutated with the current scheme
 
    cu.port_info = port_info;
 
    cu.proto_components = proto_components;
 
    cu.proto_description = serde_proto_description.0;
 
    for (ee, getter) in comm.endpoint_manager.endpoint_exts.iter_mut().zip(getter_for_incoming) {
 
    for (ee, getter) in
 
        comm.endpoint_manager.endpoint_exts.iter_mut().zip(endpoint_incoming_to_getter)
 
    {
 
        ee.getter_for_incoming = getter;
 
    }
 
    Ok(())
0 comments (0 inline, 0 general)