Changeset - ada07cee1f99
Christopher Esterhuyse - 2020-07-08 13:32:24
christopher.esterhuyse@gmail.com
Refactored the endpoint manager to support sending and receiving of messages. Moved some behavior from src/runtime/communication.rs to src/runtime/endpoints.rs.
6 files changed with 485 insertions and 157 deletions:
src/runtime/communication.rs
 
@@ -58,12 +58,20 @@ impl ReplaceBoolTrue for bool {
 
        *self = true;
 
        !was
 
    }
 
}
 

	
 
////////////////
 
impl RoundCtxTrait for RoundCtx {
 
    fn get_deadline(&self) -> &Option<Instant> {
 
        &self.deadline
 
    }
 
    fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) {
 
        self.getter_buffer.getter_add(getter, msg)
 
    }
 
}
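// Aside: RoundCtxTrait (defined in src/runtime/mod.rs) is the narrow surface that
// the relocated endpoint code sees: read the round deadline, enqueue a received
// payload. A minimal test double (hypothetical, for illustration only) shows how
// small that contract is:
//
//     struct DummyRoundCtx {
//         deadline: Option<Instant>,
//         got: Vec<(PortId, SendPayloadMsg)>,
//     }
//     impl RoundCtxTrait for DummyRoundCtx {
//         fn get_deadline(&self) -> &Option<Instant> {
//             &self.deadline
//         }
//         fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) {
//             self.got.push((getter, msg));
//         }
//     }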
 
impl Connector {
 
    pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> {
 
        use GottenError as Ge;
 
        let Self { phased, .. } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(Ge::NoPreviousRound),
 
@@ -298,26 +306,31 @@ impl Connector {
 
                // thanks to the native_branch_spec_var, each batch has a distinct predicate
 
                unreachable!()
 
            }
 
        }
 
        // restore the invariant: !native_batches.is_empty()
 
        comm.native_batches.push(Default::default());
 

	
 
        comm.endpoint_manager.udp_endpoints_round_start(&mut *cu.logger, &mut rctx.spec_var_stream);
 
        // Call to another big method; keep running this round until a distributed decision is reached
 
        let decision = Self::sync_reach_decision(
 
            cu,
 
            comm,
 
            &mut branching_native,
 
            &mut branching_proto_components,
 
            &mut rctx,
 
        )?;
 
        log!(cu.logger, "Committing to decision {:?}!", &decision);
 
        comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.logger, &decision)?;
 

	
 
        // propagate the decision to children
 
        let msg = Msg::CommMsg(CommMsg {
 
            round_index: comm.round_index,
 
            contents: CommMsgContents::Announce { decision: decision.clone() },
 
            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Announce {
 
                decision: decision.clone(),
 
            }),
 
        });
 
        log!(
 
            cu.logger,
 
            "Announcing decision {:?} through child endpoints {:?}",
 
            &msg,
 
            &comm.neighborhood.children
 
@@ -501,13 +514,15 @@ impl Connector {
 
                match comm.neighborhood.parent {
 
                    Some(parent) => {
 
                        log!(cu.logger, "Forwarding to my parent {:?}", parent);
 
                        let suggestion = Decision::Success(solution);
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::Suggest { suggestion },
 
                            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Suggest {
 
                                suggestion,
 
                            }),
 
                        });
 
                        comm.endpoint_manager.send_to_comms(parent, &msg)?;
 
                    }
 
                    None => {
 
                        log!(cu.logger, "No parent. Deciding on solution {:?}", &solution);
 
                        return Ok(Decision::Success(solution));
 
@@ -516,79 +531,44 @@ impl Connector {
 
            }
 

	
 
            // stuck! make progress by receiving a msg
 
            // try recv messages arriving through endpoints
 
            log!(cu.logger, "No decision yet. Let's recv an endpoint msg...");
 
            {
 
                let (endpoint_index, msg) = loop {
 
                    match comm
 
                let (endpoint_index, comm_ctrl_msg): (usize, CommCtrlMsg) = match comm
 
                    .endpoint_manager
 
                        .try_recv_any_comms(&mut *cu.logger, rctx.deadline)?
 
                    .try_recv_any_comms(&mut *cu.logger, &cu.port_info, rctx, comm.round_index)?
 
                {
 
                        None => {
 
                    CommRecvOk::NewControlMsg { net_endpoint_index, msg } => {
 
                        (net_endpoint_index, msg)
 
                    }
 
                    CommRecvOk::NewPayloadMsgs => continue 'undecided,
 
                    CommRecvOk::TimeoutWithoutNew => {
 
                        log!(cu.logger, "Reached user-defined deadling without decision...");
 
                        if let Some(parent) = comm.neighborhood.parent {
 
                            if already_requested_failure.replace_with_true() {
 
                                Self::request_failure(cu, comm, parent)?
 
                            } else {
 
                                log!(cu.logger, "Already requested failure");
 
                            }
 
                        } else {
 
                            log!(cu.logger, "As the leader, deciding on timeout");
 
                            return Ok(Decision::Failure);
 
                        }
 
                        rctx.deadline = None;
 
                        }
 
                        Some((endpoint_index, msg)) => break (endpoint_index, msg),
 
                    }
 
                };
 
                log!(cu.logger, "Received from endpoint {} msg {:?}", endpoint_index, &msg);
 
                let comm_msg_contents = match msg {
 
                    Msg::SetupMsg(..) => {
 
                        log!(cu.logger, "Discarding setup message; that phase is over");
 
                        continue 'undecided;
 
                    }
 
                    Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(&comm.round_index) {
 
                        Ordering::Equal => comm_msg.contents,
 
                        Ordering::Less => {
 
                            log!(
 
                                cu.logger,
 
                                "We are in round {}, but msg is for round {}. Discard",
 
                                comm_msg.round_index,
 
                                comm.round_index,
 
                            );
 
                            drop(comm_msg);
 
                            continue 'undecided;
 
                        }
 
                        Ordering::Greater => {
 
                            log!(
 
                                cu.logger,
 
                                "We are in round {}, but msg is for round {}. Buffer",
 
                                comm_msg.round_index,
 
                                comm.round_index,
 
                            );
 
                            comm.endpoint_manager
 
                                .delayed_messages
 
                                .push((endpoint_index, Msg::CommMsg(comm_msg)));
 
                            continue 'undecided;
 
                        }
 
                    },
 
                };
 
                match comm_msg_contents {
 
                    CommMsgContents::SendPayload(send_payload_msg) => {
 
                        let getter = comm.endpoint_manager.net_endpoint_exts[endpoint_index]
 
                            .getter_for_incoming;
 
                        assert!(cu.port_info.polarities.get(&getter) == Some(&Getter));
 
                log!(
 
                    cu.logger,
 
                            "Msg routed to getter port {:?}. Buffer for recv loop",
 
                            getter,
 
                    "Received from endpoint {} ctrl msg  {:?}",
 
                    endpoint_index,
 
                    &comm_ctrl_msg
 
                );
 
                        rctx.getter_buffer.getter_add(getter, send_payload_msg);
 
                    }
 
                    CommMsgContents::Suggest { suggestion } => {
 
                match comm_ctrl_msg {
 
                    CommCtrlMsg::Suggest { suggestion } => {
 
                        // only accept this control msg through a child endpoint
 
                        if comm.neighborhood.children.contains(&endpoint_index) {
 
                            match suggestion {
 
                                Decision::Success(predicate) => {
 
                                    // child solution contributes to local solution
 
                                    log!(cu.logger, "Child provided solution {:?}", &predicate);
 
@@ -623,13 +603,13 @@ impl Connector {
 
                                "Discarding suggestion {:?} from non-child endpoint idx {:?}",
 
                                &suggestion,
 
                                endpoint_index
 
                            );
 
                        }
 
                    }
 
                    CommMsgContents::Announce { decision } => {
 
                    CommCtrlMsg::Announce { decision } => {
 
                        if Some(endpoint_index) == comm.neighborhood.parent {
 
                            // adopt this decision
 
                            return Ok(decision);
 
                        } else {
 
                            log!(
 
                                cu.logger,
 
@@ -650,13 +630,13 @@ impl Connector {
 
        parent: usize,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        log!(cu.logger, "Forwarding to my parent {:?}", parent);
 
        let suggestion = Decision::Failure;
 
        let msg = Msg::CommMsg(CommMsg {
 
            round_index: comm.round_index,
 
            contents: CommMsgContents::Suggest { suggestion },
 
            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Suggest { suggestion }),
 
        });
 
        comm.endpoint_manager.send_to_comms(parent, &msg)
 
    }
 
}
 
impl BranchingNative {
 
    fn feed_msg(
 
@@ -755,13 +735,13 @@ impl BranchingNative {
 
            logger,
 
            "Collapsing native with {} branch preds {:?}",
 
            self.branches.len(),
 
            self.branches.keys()
 
        );
 
        for (branch_predicate, branch) in self.branches {
 
            if branch.to_get.is_empty() && solution_predicate.consistent_with(&branch_predicate) {
 
            if branch.to_get.is_empty() && branch_predicate.assigns_subset(solution_predicate) {
 
                let NativeBranch { index, gotten, .. } = branch;
 
                log!(logger, "Collapsed native has gotten {:?}", &gotten);
 
                return RoundOk { batch_index: index, gotten };
 
            }
 
        }
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
@@ -934,13 +914,13 @@ impl BranchingProtoComponent {
 
        log!(cu.logger, "component settles down with branches: {:?}", branches.keys());
 
        Ok(())
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
 
        let BranchingProtoComponent { ports, branches } = self;
 
        for (branch_predicate, branch) in branches {
 
            if branch.ended && solution_predicate.consistent_with(&branch_predicate) {
 
            if branch.ended && branch_predicate.assigns_subset(solution_predicate) {
 
                let ProtoComponentBranch { state, .. } = branch;
 
                return ProtoComponent { state, ports };
 
            }
 
        }
 
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
    }
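Aside: the two collapse paths above switch from `solution_predicate.consistent_with(&branch_predicate)` to `branch_predicate.assigns_subset(solution_predicate)`. The difference matters: `consistent_with` (roughly: no variable is assigned two different values) still accepts a branch carrying an extra assignment the solution never mentions, while `assigns_subset` demands that every assignment of the branch literally appears in the solution. A self-contained sketch of the distinction, with simplified stand-in types rather than the real `Predicate`:

use std::collections::BTreeMap;

// Simplified stand-in for Predicate's SpecVar -> SpecVal assignment map.
type Pred = BTreeMap<u32, u16>;

// Mirrors Predicate::assigns_subset: every assignment in `sub` also occurs in `sup`.
fn assigns_subset(sub: &Pred, sup: &Pred) -> bool {
    sub.iter().all(|(var, val)| sup.get(var) == Some(val))
}

// Roughly mirrors Predicate::consistent_with: no variable assigned two different values.
fn consistent_with(a: &Pred, b: &Pred) -> bool {
    a.iter().all(|(var, val)| b.get(var).map_or(true, |v| v == val))
}

fn main() {
    let solution: Pred = [(0, 1)].into(); // solution assigns var 0 := 1
    let branch: Pred = [(0, 1), (7, 0)].into(); // branch additionally assigns var 7 := 0
    assert!(consistent_with(&solution, &branch)); // old check: no conflict, accepted
    assert!(!assigns_subset(&branch, &solution)); // new check: var 7 absent from solution, rejected
}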
src/runtime/endpoints.rs
 
use super::*;
 

	
 
struct MonitoredReader<R: Read> {
 
    bytes: usize,
 
    r: R,
 
}
 
#[derive(Debug)]
 
enum TryRecyAnyError {
 
    Timeout,
 
enum PollAndPopulateError {
 
    PollFailed,
 
    EndpointError { error: EndpointError, index: usize },
 
    Timeout,
 
}
 
struct TryRecvAnyNetError {
 
    error: NetEndpointError,
 
    index: usize,
 
}
 
/////////////////////
 
impl NetEndpoint {
 
    fn bincode_opts() -> impl bincode::config::Options {
 
        bincode::config::DefaultOptions::default()
 
    }
 
    pub(super) fn try_recv<T: serde::de::DeserializeOwned>(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
    ) -> Result<Option<T>, EndpointError> {
 
        use EndpointError as Ee;
 
    ) -> Result<Option<T>, NetEndpointError> {
 
        use NetEndpointError as Nee;
 
        // populate inbox as much as possible
 
        let before_len = self.inbox.len();
 
        'read_loop: loop {
 
            let res = self.stream.read_to_end(&mut self.inbox);
 
            match res {
 
                Err(e) if would_block(&e) => break 'read_loop,
 
                Ok(0) => break 'read_loop,
 
                Ok(_) => (),
 
                Err(_e) => return Err(Ee::BrokenEndpoint),
 
                Err(_e) => return Err(Nee::BrokenNetEndpoint),
 
            }
 
        }
 
        endptlog!(
 
            logger,
 
            "Inbox bytes [{:x?}| {:x?}]",
 
            DenseDebugHex(&self.inbox[..before_len]),
 
@@ -54,131 +56,365 @@ impl NetEndpoint {
 
                Ok(Some(msg))
 
            }
 
            Err(e) => match *e {
 
                bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => {
 
                    Ok(None)
 
                }
 
                _ => Err(Ee::MalformedMessage),
 
                _ => Err(Nee::MalformedMessage),
 
            },
 
        }
 
    }
 
    pub(super) fn send<T: serde::ser::Serialize>(&mut self, msg: &T) -> Result<(), EndpointError> {
 
    pub(super) fn send<T: serde::ser::Serialize>(
 
        &mut self,
 
        msg: &T,
 
    ) -> Result<(), NetEndpointError> {
 
        use bincode::config::Options;
 
        use EndpointError as Ee;
 
        Self::bincode_opts().serialize_into(&mut self.stream, msg).map_err(|_| Ee::BrokenEndpoint)
 
        use NetEndpointError as Nee;
 
        Self::bincode_opts()
 
            .serialize_into(&mut self.stream, msg)
 
            .map_err(|_| Nee::BrokenNetEndpoint)
 
    }
 
}
 

	
 
impl EndpointManager {
 
    pub(super) fn index_iter(&self) -> Range<usize> {
 
        0..self.num_net_endpoints()
 
    }
 
    pub(super) fn num_net_endpoints(&self) -> usize {
 
        self.net_endpoint_exts.len()
 
        self.net_endpoint_store.endpoint_exts.len()
 
    }
 
    pub(super) fn send_to_comms(
 
        &mut self,
 
        index: usize,
 
        msg: &Msg,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        use UnrecoverableSyncError as Use;
 
        let net_endpoint = &mut self.net_endpoint_exts[index].net_endpoint;
 
        net_endpoint.send(msg).map_err(|_| Use::BrokenEndpoint(index))
 
        let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
 
        net_endpoint.send(msg).map_err(|_| Use::BrokenNetEndpoint { index })
 
    }
 
    pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> {
 
        let net_endpoint = &mut self.net_endpoint_exts[index].net_endpoint;
 
        let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
 
        net_endpoint.send(msg).map_err(|err| {
 
            ConnectError::EndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err)
 
            ConnectError::NetEndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err)
 
        })
 
    }
 
    pub(super) fn try_recv_any_comms(
 

	
 
    /// Receive the first message of any kind at all.
 
    /// Why not return SetupMsg? Because often this message will be forwarded to several others,
 
    /// and by returning a Msg, it can be serialized in-place (NetEndpoints allow the sending of Msg types!)
 
    pub(super) fn try_recv_any_setup(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        deadline: Option<Instant>,
 
    ) -> Result<Option<(usize, Msg)>, UnrecoverableSyncError> {
 
        use {TryRecyAnyError as Trae, UnrecoverableSyncError as Use};
 
        match self.try_recv_any(logger, deadline) {
 
            Ok(tup) => Ok(Some(tup)),
 
            Err(Trae::Timeout) => Ok(None),
 
            Err(Trae::PollFailed) => Err(Use::PollFailed),
 
            Err(Trae::EndpointError { error: _, index }) => Err(Use::BrokenEndpoint(index)),
 
        deadline: &Option<Instant>,
 
    ) -> Result<(usize, Msg), ConnectError> {
 
        ///////////////////////////////////////////
 
        fn map_trane(
 
            trane: TryRecvAnyNetError,
 
            net_endpoint_store: &EndpointStore<NetEndpointExt>,
 
        ) -> ConnectError {
 
            ConnectError::NetEndpointSetupError(
 
                net_endpoint_store.endpoint_exts[trane.index]
 
                    .net_endpoint
 
                    .stream
 
                    .local_addr()
 
                    .unwrap(), // stream must already be connected
 
                trane.error,
 
            )
 
        }
 
        ///////////////////////////////////////////
 
        // try yield undelayed net message
 
        if let Some(tup) = self.undelayed_messages.pop() {
 
            endptlog!(logger, "RECV undelayed_msg {:?}", &tup);
 
            return Ok(tup);
 
        }
 
        loop {
 
            // try recv from some polled undrained NET endpoint
 
            if let Some(tup) = self
 
                .try_recv_undrained_net(logger)
 
                .map_err(|trane| map_trane(trane, &self.net_endpoint_store))?
 
            {
 
                return Ok(tup);
 
            }
 
    pub(super) fn try_recv_any_setup(
 
            // poll if time remains
 
            self.poll_and_populate(logger, deadline)?;
 
        }
 
    }
 

	
 
    // drops all Setup messages,
 
    // buffers all future round messages,
 
    // drops all previous round messages,
 
    // enqueues all current round SendPayload messages using round_ctx.getter_add
 
    // returns the first comm_ctrl_msg encountered
 
    // only polls until SOME message is enqueued
 
    pub(super) fn try_recv_any_comms(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        deadline: Option<Instant>,
 
    ) -> Result<(usize, Msg), ConnectError> {
 
        use {ConnectError as Ce, TryRecyAnyError as Trae};
 
        self.try_recv_any(logger, deadline).map_err(|err| match err {
 
            Trae::Timeout => Ce::Timeout,
 
            Trae::PollFailed => Ce::PollFailed,
 
            Trae::EndpointError { error, index } => Ce::EndpointSetupError(
 
                self.net_endpoint_exts[index].net_endpoint.stream.local_addr().unwrap(),
 
                error,
 
            ),
 
        })
 
    }
 
    fn try_recv_any(
 
        port_info: &PortInfo,
 
        round_ctx: &mut impl RoundCtxTrait,
 
        round_index: usize,
 
    ) -> Result<CommRecvOk, UnrecoverableSyncError> {
 
        ///////////////////////////////////////////
 
        impl EndpointManager {
 
            fn handle_msg(
 
                &mut self,
 
                logger: &mut dyn Logger,
 
        deadline: Option<Instant>,
 
    ) -> Result<(usize, Msg), TryRecyAnyError> {
 
        use TryRecyAnyError as Trea;
 
        // 1. try messages already buffered
 
        if let Some(x) = self.undelayed_messages.pop() {
 
            endptlog!(logger, "RECV undelayed_msg {:?}", &x);
 
            return Ok(x);
 
                round_ctx: &mut impl RoundCtxTrait,
 
                net_endpoint_index: usize,
 
                msg: Msg,
 
                round_index: usize,
 
                some_message_enqueued: &mut bool,
 
            ) -> Option<(usize, CommCtrlMsg)> {
 
                let comm_msg_contents = match msg {
 
                    Msg::SetupMsg(..) => return None,
 
                    Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(&round_index) {
 
                        Ordering::Equal => comm_msg.contents,
 
                        Ordering::Less => {
 
                            log!(
 
                                logger,
 
                                "We are in round {}, but msg is for round {}. Discard",
 
                                comm_msg.round_index,
 
                                round_index,
 
                            );
 
                            return None;
 
                        }
 
                        Ordering::Greater => {
 
                            log!(
 
                                logger,
 
                                "We are in round {}, but msg is for round {}. Buffer",
 
                                comm_msg.round_index,
 
                                round_index,
 
                            );
 
                            self.delayed_messages
 
                                .push((net_endpoint_index, Msg::CommMsg(comm_msg)));
 
                            return None;
 
                        }
 
                    },
 
                };
 
                match comm_msg_contents {
 
                    CommMsgContents::CommCtrl(comm_ctrl_msg) => {
 
                        Some((net_endpoint_index, comm_ctrl_msg))
 
                    }
 
                    CommMsgContents::SendPayload(send_payload_msg) => {
 
                        let getter = self.net_endpoint_store.endpoint_exts[net_endpoint_index]
 
                            .getter_for_incoming;
 
                        round_ctx.getter_add(getter, send_payload_msg);
 
                        *some_message_enqueued = true;
 
                        None
 
                    }
 
                }
 
            }
 
        }
 
        use {PollAndPopulateError as Pape, UnrecoverableSyncError as Use};
 
        ///////////////////////////////////////////
 
        let mut some_message_enqueued = false;
 
        // try yield undelayed net message
 
        while let Some((net_endpoint_index, msg)) = self.undelayed_messages.pop() {
 
            if let Some((net_endpoint_index, msg)) = self.handle_msg(
 
                logger,
 
                round_ctx,
 
                net_endpoint_index,
 
                msg,
 
                round_index,
 
                &mut some_message_enqueued,
 
            ) {
 
                return Ok(CommRecvOk::NewControlMsg { net_endpoint_index, msg });
 
            }
 
        }
 
        loop {
 
            // 2. try read a message from an endpoint that raised an event with poll() but wasn't drained
 
            while let Some(index) = self.polled_undrained.pop() {
 
                let net_endpoint = &mut self.net_endpoint_exts[index].net_endpoint;
 
            // try receive a net message
 
            while let Some((net_endpoint_index, msg)) = self.try_recv_undrained_net(logger)? {
 
                if let Some((net_endpoint_index, msg)) = self.handle_msg(
 
                    logger,
 
                    round_ctx,
 
                    net_endpoint_index,
 
                    msg,
 
                    round_index,
 
                    &mut some_message_enqueued,
 
                ) {
 
                    return Ok(CommRecvOk::NewControlMsg { net_endpoint_index, msg });
 
                }
 
            }
 
            // try receive a udp message
 
            let recv_buffer = self.udp_in_buffer.as_mut_slice();
 
            while let Some(index) = self.udp_endpoint_store.polled_undrained.pop() {
 
                let ee = &mut self.udp_endpoint_store.endpoint_exts[index];
 
                if let Ok(bytes_written) = ee.sock.recv(recv_buffer) {
 
                    // I received a payload!
 
                    self.udp_endpoint_store.polled_undrained.insert(index);
 
                    let payload = Payload::from(&recv_buffer[..bytes_written]);
 
                    let [branch_spec_var, port_spec_var] = [
 
                        ee.incoming_round_spec_var.unwrap(), // should not be NONE
 
                        port_info.spec_var_for(ee.getter_for_incoming),
 
                    ];
 
                    let branch_spec_val = SpecVal::nth_domain_element(ee.incoming_payloads.len());
 
                    ee.incoming_payloads.push(payload.clone());
 
                    let predicate = Predicate::default()
 
                        .inserted(branch_spec_var, branch_spec_val)
 
                        .inserted(port_spec_var, SpecVal::FIRING);
 
                    round_ctx
 
                        .getter_add(ee.getter_for_incoming, SendPayloadMsg { payload, predicate });
 
                    some_message_enqueued = true;
 
                }
 
            }
 
            if some_message_enqueued {
 
                return Ok(CommRecvOk::NewPayloadMsgs);
 
            }
 
            // poll if time remains
 
            match self.poll_and_populate(logger, round_ctx.get_deadline()) {
 
                Ok(()) => {} // continue looping
 
                Err(Pape::Timeout) => return Ok(CommRecvOk::TimeoutWithoutNew),
 
                Err(Pape::PollFailed) => return Err(Use::PollFailed),
 
            }
 
        }
 
    }
 
    fn try_recv_undrained_net(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
    ) -> Result<Option<(usize, Msg)>, TryRecvAnyNetError> {
 
        while let Some(index) = self.net_endpoint_store.polled_undrained.pop() {
 
            let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
 
            if let Some(msg) = net_endpoint
 
                .try_recv(logger)
 
                    .map_err(|error| Trea::EndpointError { error, index })?
 
                .map_err(|error| TryRecvAnyNetError { error, index })?
 
            {
 
                endptlog!(logger, "RECV polled_undrained {:?}", &msg);
 
                if !net_endpoint.inbox.is_empty() {
 
                    // there may be another message waiting!
 
                        self.polled_undrained.insert(index);
 
                    self.net_endpoint_store.polled_undrained.insert(index);
 
                }
 
                    return Ok((index, msg));
 
                return Ok(Some((index, msg)));
 
            }
 
        }
 
            // 3. No message yet. Do we have enough time to poll?
 
        Ok(None)
 
    }
 
    fn poll_and_populate(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        deadline: &Option<Instant>,
 
    ) -> Result<(), PollAndPopulateError> {
 
        use PollAndPopulateError as Pape;
 
        // No message yet. Do we have enough time to poll?
 
        let remaining = if let Some(deadline) = deadline {
 
                Some(deadline.checked_duration_since(Instant::now()).ok_or(Trea::Timeout)?)
 
            Some(deadline.checked_duration_since(Instant::now()).ok_or(Pape::Timeout)?)
 
        } else {
 
            None
 
        };
 
            self.poll.poll(&mut self.events, remaining).map_err(|_| Trea::PollFailed)?;
 
        // Yes we do! Poll with remaining time as poll deadline
 
        self.poll.poll(&mut self.events, remaining).map_err(|_| Pape::PollFailed)?;
 
        for event in self.events.iter() {
 
                let Token(index) = event.token();
 
                self.polled_undrained.insert(index);
 
            match TokenTarget::from(event.token()) {
 
                TokenTarget::Waker => {
 
                    // Can ignore. Residual event from endpoint manager setup procedure
 
                }
 
                TokenTarget::NetEndpoint { index } => {
 
                    self.net_endpoint_store.polled_undrained.insert(index);
 
                    endptlog!(
 
                        logger,
 
                    "RECV poll event {:?} for endpoint index {:?}. undrained: {:?}",
 
                        "RECV poll event {:?} for NET endpoint index {:?}. undrained: {:?}",
 
                        &event,
 
                        index,
 
                    self.polled_undrained.iter()
 
                        self.net_endpoint_store.polled_undrained.iter()
 
                    );
 
                }
 
                TokenTarget::UdpEndpoint { index } => {
 
                    self.udp_endpoint_store.polled_undrained.insert(index);
 
                    endptlog!(
 
                        logger,
 
                        "RECV poll event {:?} for UDP endpoint index {:?}. undrained: {:?}",
 
                        &event,
 
                        index,
 
                        self.udp_endpoint_store.polled_undrained.iter()
 
                    );
 
                }
 
            self.events.clear();
 
            }
 
        }
 
        self.events.clear();
 
        Ok(())
 
    }
 
    pub(super) fn undelay_all(&mut self) {
 
        if self.undelayed_messages.is_empty() {
 
            // fast path
 
            std::mem::swap(&mut self.delayed_messages, &mut self.undelayed_messages);
 
            return;
 
        }
 
        // slow path
 
        self.undelayed_messages.extend(self.delayed_messages.drain(..));
 
    }
 
    pub(super) fn udp_endpoints_round_start(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        spec_var_stream: &mut SpecVarStream,
 
    ) {
 
        log!(
 
            logger,
 
            "Starting round for {} udp endpoints",
 
            self.udp_endpoint_store.endpoint_exts.len()
 
        );
 
        for (index, ee) in self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate() {
 
            let spec_var = spec_var_stream.next();
 
            log!(logger, "Udp endpoint given {} spec var {:?} for this round", index, spec_var);
 
            ee.incoming_round_spec_var = Some(spec_var);
 
        }
 
    }
 
    pub(super) fn udp_endpoints_round_end(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        decision: &Decision,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        log!(
 
            logger,
 
            "Ending round for {} udp endpoints",
 
            self.udp_endpoint_store.endpoint_exts.len()
 
        );
 
        use UnrecoverableSyncError as Use;
 
        if let Decision::Success(solution_predicate) = decision {
 
            'endpoint_loop: for (index, ee) in
 
                self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate()
 
            {
 
                ee.incoming_round_spec_var = None; // shouldn't be accessed before it's overwritten next round; still adding for clarity.
 
                for (payload_predicate, payload) in ee.outgoing_payloads.drain() {
 
                    if payload_predicate.assigns_subset(solution_predicate) {
 
                        ee.sock
 
                            .send(payload.as_slice())
 
                            .map_err(|_| Use::BrokenNetEndpoint { index })?;
 
                        log!(
 
                            logger,
 
                            "Sent payload {:?} with pred {:?} through Udp endpoint {}",
 
                            &payload,
 
                            &payload_predicate,
 
                            index
 
                        );
 
                        continue 'endpoint_loop; // send at most one payload per endpoint per round
 
                    }
 
                }
 
                log!(logger, "Sent no message through Udp endpoint {}", index);
 
            }
 
        }
 
        Ok(())
 
    }
 
}
 
// impl UdpEndpointExt {
 
//     fn try_recv(
 
//         &mut self,
 
//         port_info: &PortInfo,
 
//         udp_in_buffer: &mut UdpInBuffer,
 
//     ) -> Option<SendPayloadMsg> {
 
//         let recv_buffer = udp_in_buffer.as_mut_slice();
 
//         let len = self.sock.recv(recv_buffer).ok()?;
 
//         let payload = Payload::from(&recv_buffer[..len]);
 
//         let branch_spec_var = self
 
//             .incoming_round_spec_var
 
//             .expect("Udp spec var should be Some(..) if recv() is called");
 
//         let branch_spec_val = SpecVal::nth_domain_element(self.incoming_payloads.len());
 
//         self.incoming_payloads.push(payload.clone());
 
//         let predicate = Predicate::default()
 
//             .inserted(branch_spec_var, branch_spec_val)
 
//             .inserted(port_info.spec_var_for(self.getter_for_incoming), SpecVal::FIRING);
 
//         Some(SendPayloadMsg { payload, predicate })
 
//     }
 
// }
 
impl Debug for NetEndpoint {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_struct("Endpoint").field("inbox", &self.inbox).finish()
 
    }
 
}
 
impl<R: Read> From<R> for MonitoredReader<R> {
 
@@ -200,6 +436,21 @@ impl<R: Read> Read for MonitoredReader<R> {
 
}
 
impl Into<Msg> for SetupMsg {
 
    fn into(self) -> Msg {
 
        Msg::SetupMsg(self)
 
    }
 
}
 
impl From<PollAndPopulateError> for ConnectError {
 
    fn from(pape: PollAndPopulateError) -> ConnectError {
 
        use {ConnectError as Ce, PollAndPopulateError as Pape};
 
        match pape {
 
            Pape::PollFailed => Ce::PollFailed,
 
            Pape::Timeout => Ce::Timeout,
 
        }
 
    }
 
}
 
impl From<TryRecvAnyNetError> for UnrecoverableSyncError {
 
    fn from(trane: TryRecvAnyNetError) -> UnrecoverableSyncError {
 
        let TryRecvAnyNetError { index, .. } = trane;
 
        UnrecoverableSyncError::BrokenNetEndpoint { index }
 
    }
 
}
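Aside: the heart of the relocated receive logic is the round-index triage in `handle_msg` above: setup messages are dropped, messages for a past round are discarded, messages for a future round are pushed onto `delayed_messages` (to be moved back in by `undelay_all`), and only current-round contents are acted on. A standalone sketch of that rule, with simplified types:

use std::cmp::Ordering;

// What to do with an incoming message, given the round it is stamped with.
enum Triage<M> {
    HandleNow(M), // current round: process its contents immediately
    Delay(M),     // future round: buffer until that round begins
    Discard,      // past round: can no longer influence anything
}

fn triage<M>(msg_round: usize, current_round: usize, msg: M) -> Triage<M> {
    match msg_round.cmp(&current_round) {
        Ordering::Equal => Triage::HandleNow(msg),
        Ordering::Greater => Triage::Delay(msg),
        Ordering::Less => Triage::Discard,
    }
}

With this rule applied, `handle_msg` only has to route current-round contents: CommCtrl messages are returned to the caller, while SendPayload messages are fed to `round_ctx.getter_add`.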
src/runtime/error.rs
 
@@ -6,13 +6,13 @@ pub enum ConnectError {
 
    PollInitFailed,
 
    Timeout,
 
    PollFailed,
 
    AcceptFailed(SocketAddr),
 
    AlreadyConnected,
 
    PortPeerPolarityMismatch(PortId),
 
    EndpointSetupError(SocketAddr, EndpointError),
 
    NetEndpointSetupError(SocketAddr, NetEndpointError),
 
    SetupAlgMisbehavior,
 
}
 
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
 
pub enum AddComponentError {
 
    NoSuchComponent,
 
    NonPortTypeParameters,
 
@@ -23,13 +23,14 @@ pub enum AddComponentError {
 
    DuplicateMovedPort(PortId),
 
}
 
////////////////////////
 
#[derive(Debug, Clone)]
 
pub enum UnrecoverableSyncError {
 
    PollFailed,
 
    BrokenEndpoint(usize),
 
    BrokenNetEndpoint { index: usize },
 
    BrokenUdpEndpoint { index: usize },
 
    MalformedStateError(MalformedStateError),
 
}
 
#[derive(Debug, Clone)]
 
pub enum SyncError {
 
    NotConnected,
 
    InconsistentProtoComponent(ProtoComponentId),
 
@@ -39,15 +40,15 @@ pub enum SyncError {
 
#[derive(Debug, Clone)]
 
pub enum MalformedStateError {
 
    PortCannotPut(PortId),
 
    GetterUnknownFor { putter: PortId },
 
}
 
#[derive(Debug, Clone)]
 
pub enum EndpointError {
 
pub enum NetEndpointError {
 
    MalformedMessage,
 
    BrokenEndpoint,
 
    BrokenNetEndpoint,
 
}
 
#[derive(Debug)]
 
pub enum PortOpError {
 
    WrongPolarity,
 
    UnknownPolarity,
 
    NotConnected,
src/runtime/mod.rs
 
@@ -120,12 +120,16 @@ struct CommMsg {
 
    round_index: usize,
 
    contents: CommMsgContents,
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum CommMsgContents {
 
    SendPayload(SendPayloadMsg),
 
    CommCtrl(CommCtrlMsg),
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum CommCtrlMsg {
 
    Suggest { suggestion: Decision }, // SINKWARD
 
    Announce { decision: Decision },  // SINKAWAYS
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct SendPayloadMsg {
 
    predicate: Predicate,
 
@@ -173,33 +177,45 @@ struct Neighborhood {
 
struct IdManager {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
    proto_component_suffix_stream: U32Stream,
 
}
 
#[derive(Debug)]
 
struct UdpInBuffer {
 
    byte_vec: Vec<u8>,
 
}
 
#[derive(Debug)]
 
struct SpecVarStream {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
}
 
#[derive(Debug)]
 
struct EndpointManager {
 
    // invariants:
 
    // 1. endpoint N is registered READ | WRITE with poller
 
    // 2. Events is empty
 
    poll: Poll,
 
    events: Events,
 
    polled_undrained: VecSet<usize>,
 
    delayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>,
 
    net_endpoint_exts: Vec<NetEndpointExt>,
 
    net_endpoint_store: EndpointStore<NetEndpointExt>,
 
    udp_endpoint_store: EndpointStore<UdpEndpointExt>,
 
    udp_in_buffer: UdpInBuffer,
 
}
 
#[derive(Debug)]
 
struct EndpointStore<T> {
 
    endpoint_exts: Vec<T>,
 
    polled_undrained: VecSet<usize>,
 
}
 
#[derive(Debug)]
 
struct UdpEndpointExt {
 
    sock: UdpSocket,
 
    sock: UdpSocket, // already bound and connected
 
    outgoing_payloads: HashMap<Predicate, Payload>,
 
    incoming_round_spec_var: Option<SpecVar>,
 
    getter_for_incoming: PortId,
 
    outgoing_buffer: HashMap<Predicate, Payload>,
 
    incoming_buffer: Vec<Payload>,
 
    incoming_payloads: Vec<Payload>,
 
}
 
#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
 
struct PortInfo {
 
    polarities: HashMap<PortId, Polarity>,
 
    peers: HashMap<PortId, PortId>,
 
    routes: HashMap<PortId, Route>,
 
@@ -244,36 +260,45 @@ struct NativeBatch {
 
}
 
enum TokenTarget {
 
    NetEndpoint { index: usize },
 
    UdpEndpoint { index: usize },
 
    Waker,
 
}
 
trait RoundCtxTrait {
 
    fn get_deadline(&self) -> &Option<Instant>;
 
    fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg);
 
}
 
enum CommRecvOk {
 
    TimeoutWithoutNew,
 
    NewPayloadMsgs,
 
    NewControlMsg { net_endpoint_index: usize, msg: CommCtrlMsg },
 
}
 
////////////////
 
fn would_block(err: &std::io::Error) -> bool {
 
    err.kind() == std::io::ErrorKind::WouldBlock
 
}
 
impl TokenTarget {
 
    const HALFWAY_INDEX: usize = usize::MAX / 2;
 
    const MAX_INDEX: usize = usize::MAX;
 
    const WAKER_TOKEN: usize = Self::MAX_INDEX;
 
}
 
impl From<Token> for TokenTarget {
 
    fn from(Token(index): Token) -> Self {
 
        if index == Self::MAX_INDEX {
 
        if index == Self::WAKER_TOKEN {
 
            TokenTarget::Waker
 
        } else if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) {
 
            TokenTarget::UdpEndpoint { index: shifted }
 
        } else {
 
            TokenTarget::NetEndpoint { index }
 
        }
 
    }
 
}
 
impl Into<Token> for TokenTarget {
 
    fn into(self) -> Token {
 
        match self {
 
            TokenTarget::Waker => Token(Self::MAX_INDEX),
 
            TokenTarget::Waker => Token(Self::WAKER_TOKEN),
 
            TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX),
 
            TokenTarget::NetEndpoint { index } => Token(index),
 
        }
 
    }
 
}
 
impl<T: std::cmp::Ord> VecSet<T> {
 
@@ -423,12 +448,23 @@ impl Connector {
 
impl Predicate {
 
    #[inline]
 
    pub fn inserted(mut self, k: SpecVar, v: SpecVal) -> Self {
 
        self.assigned.insert(k, v);
 
        self
 
    }
 

	
 
    pub fn assigns_subset(&self, maybe_superset: &Self) -> bool {
 
        for (var, val) in self.assigned.iter() {
 
            match maybe_superset.assigned.get(var) {
 
                Some(val2) if val2 == val => {}
 
                _ => return false,
 
            }
 
        }
 
        true
 
    }
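    // e.g. a predicate assigning only {v0 := 1} assigns a subset of one assigning
    // {v0 := 1, v1 := 0}, but not vice versa: unlike mere consistency, this
    // check is one-directional.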
 

	
 
    // returns true IFF self.unify would return Equivalent OR FormerNotLatter
 
    pub fn consistent_with(&self, other: &Self) -> bool {
 
        let [larger, smaller] =
 
            if self.assigned.len() > other.assigned.len() { [self, other] } else { [other, self] };
 

	
 
        for (var, val) in smaller.assigned.iter() {
 
@@ -554,15 +590,35 @@ impl SpecVal {
 
    const FIRING: Self = SpecVal(1);
 
    const SILENT: Self = SpecVal(0);
 
    fn is_firing(self) -> bool {
 
        self == Self::FIRING
 
        // all else treated as SILENT
 
    }
 
    fn nth_domain_element(n: usize) -> Self {
 
        let n: u16 = n.try_into().unwrap();
 
        SpecVal(n)
 
    }
 
    fn iter_domain() -> impl Iterator<Item = Self> {
 
        (0..).map(SpecVal)
 
    }
 
}
 
impl Debug for SpecVal {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        self.0.fmt(f)
 
    }
 
}
 
impl Default for UdpInBuffer {
 
    fn default() -> Self {
 
        let mut byte_vec = Vec::with_capacity(Self::CAPACITY);
 
        unsafe {
 
            // safe! this vector is guaranteed to have sufficient capacity
 
            byte_vec.set_len(Self::CAPACITY);
 
        }
 
        Self { byte_vec }
 
    }
 
}
 
impl UdpInBuffer {
 
    const CAPACITY: usize = u16::MAX as usize;
 
    fn as_mut_slice(&mut self) -> &mut [u8] {
 
        self.byte_vec.as_mut_slice()
 
    }
 
}
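A caution on `UdpInBuffer::default` above: `set_len` makes the vector's length cover bytes that were never written, so `as_mut_slice` hands out uninitialized memory; the capacity argument alone does not make that sound. A sketch of a zero-initializing alternative, trading one up-front fill for the unsafe block:

impl Default for UdpInBuffer {
    fn default() -> Self {
        // vec![0u8; N] allocates and zero-fills in one step; no unsafe needed,
        // and recv() will overwrite the prefix it actually uses.
        Self { byte_vec: vec![0u8; Self::CAPACITY] }
    }
}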
src/runtime/setup.rs
 
@@ -24,42 +24,54 @@ impl Connector {
 
                surplus_sockets,
 
            })),
 
        }
 
    }
 
    pub fn new_udp_port(
 
        &mut self,
 
        polarity: Polarity,
 
        local_addr: SocketAddr,
 
        peer_addr: SocketAddr,
 
    ) -> Result<[PortId; 2], WrongStateError> {
 
        let Self { unphased: _up, phased } = self;
 
    ) -> Result<PortId, WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(_setup) => {
 
                let _udp_endpoint_setup = UdpEndpointSetup { local_addr, peer_addr };
 
                todo!()
 
            ConnectorPhased::Setup(setup) => {
 
                let udp_endpoint_setup = UdpEndpointSetup { local_addr, peer_addr };
 
                let udp_index = setup.udp_endpoint_setups.len();
 
                let [port_nat, port_udp] =
 
                    [cu.id_manager.new_port_id(), cu.id_manager.new_port_id()];
 
                cu.native_ports.insert(port_nat);
 
                cu.port_info.peers.insert(port_nat, port_udp);
 
                cu.port_info.peers.insert(port_udp, port_nat);
 
                cu.port_info.routes.insert(port_nat, Route::LocalComponent(ComponentId::Native));
 
                cu.port_info.routes.insert(port_udp, Route::UdpEndpoint { index: udp_index });
 
                cu.port_info.polarities.insert(port_nat, polarity);
 
                cu.port_info.polarities.insert(port_udp, !polarity);
 
                setup.udp_endpoint_setups.push((port_nat, udp_endpoint_setup));
 
                Ok(port_nat)
 
            }
 
        }
 
    }
 
    pub fn new_net_port(
 
        &mut self,
 
        polarity: Polarity,
 
        sock_addr: SocketAddr,
 
        endpoint_polarity: EndpointPolarity,
 
    ) -> Result<PortId, WrongStateError> {
 
        let Self { unphased: up, phased } = self;
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                let endpoint_setup = NetEndpointSetup { sock_addr, endpoint_polarity };
 
                let p = up.id_manager.new_port_id();
 
                up.native_ports.insert(p);
 
                let p = cu.id_manager.new_port_id();
 
                cu.native_ports.insert(p);
 
                // {polarity, route} known. {peer} unknown.
 
                up.port_info.polarities.insert(p, polarity);
 
                up.port_info.routes.insert(p, Route::LocalComponent(ComponentId::Native));
 
                cu.port_info.polarities.insert(p, polarity);
 
                cu.port_info.routes.insert(p, Route::LocalComponent(ComponentId::Native));
 
                log!(
 
                    up.logger,
 
                    cu.logger,
 
                    "Added net port {:?} with polarity {:?} and endpoint setup {:?} ",
 
                    p,
 
                    polarity,
 
                    &endpoint_setup
 
                );
 
                setup.net_endpoint_setups.push((p, endpoint_setup));
 
@@ -80,49 +92,49 @@ impl Connector {
 
                let deadline = timeout.map(|to| Instant::now() + to);
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let mut endpoint_manager = new_endpoint_manager(
 
                    &mut *cu.logger,
 
                    &setup.net_endpoint_setups,
 
                    &mut cu.port_info,
 
                    deadline,
 
                    &deadline,
 
                )?;
 
                log!(
 
                    cu.logger,
 
                    "Successfully connected {} endpoints",
 
                    endpoint_manager.net_endpoint_exts.len()
 
                    endpoint_manager.net_endpoint_store.endpoint_exts.len()
 
                );
 
                // leader election and tree construction
 
                let neighborhood = init_neighborhood(
 
                    cu.id_manager.connector_id,
 
                    &mut *cu.logger,
 
                    &mut endpoint_manager,
 
                    deadline,
 
                    &deadline,
 
                )?;
 
                log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                let mut comm = ConnectorCommunication {
 
                    round_index: 0,
 
                    endpoint_manager,
 
                    neighborhood,
 
                    native_batches: vec![Default::default()],
 
                    round_result: Ok(None),
 
                };
 
                if cfg!(feature = "session_optimization") {
 
                    session_optimize(cu, &mut comm, deadline)?;
 
                    session_optimize(cu, &mut comm, &deadline)?;
 
                }
 
                log!(cu.logger, "connect() finished. setup phase complete");
 
                self.phased = ConnectorPhased::Communication(Box::new(comm));
 
                Ok(())
 
            }
 
        }
 
    }
 
}
 
fn new_endpoint_manager(
 
    logger: &mut dyn Logger,
 
    endpoint_setups: &[(PortId, NetEndpointSetup)],
 
    port_info: &mut PortInfo,
 
    deadline: Option<Instant>,
 
    deadline: &Option<Instant>,
 
) -> Result<EndpointManager, ConnectError> {
 
    ////////////////////////////////////////////
 
    use std::sync::atomic::AtomicBool;
 
    use ConnectError as Ce;
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    struct Todo {
 
@@ -170,13 +182,14 @@ fn new_endpoint_manager(
 
        waker: mio::Waker,
 
    }
 

	
 
    let mut waker_state: Option<Arc<WakerState>> = None;
 
    let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?;
 
    let mut events = Events::with_capacity(endpoint_setups.len() * 2 + 4);
 
    let mut polled_undrained = VecSet::default();
 
    let mut net_polled_undrained = VecSet::default();
 
    let udp_polled_undrained = VecSet::default();
 
    let mut delayed_messages = vec![];
 

	
 
    // 2. create a registered (TcpListener/Endpoint) for passive / active respectively
 
    let mut todos = endpoint_setups
 
        .iter()
 
        .enumerate()
 
@@ -331,28 +344,31 @@ fn new_endpoint_manager(
 
                                polarity: local_polarity,
 
                                port: todo.local_port,
 
                            }));
 
                            net_endpoint
 
                                .send(&msg)
 
                                .map_err(|e| {
 
                                    Ce::EndpointSetupError(
 
                                    Ce::NetEndpointSetupError(
 
                                        net_endpoint.stream.local_addr().unwrap(),
 
                                        e,
 
                                    )
 
                                })
 
                                .unwrap();
 
                            log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
 
                            todo.sent_local_port = true;
 
                        }
 
                        if event.is_readable() && todo.recv_peer_port.is_none() {
 
                            // can read and didn't recv setup msg yet? Do so!
 
                            let maybe_msg = net_endpoint.try_recv(logger).map_err(|e| {
 
                                Ce::EndpointSetupError(net_endpoint.stream.local_addr().unwrap(), e)
 
                                Ce::NetEndpointSetupError(
 
                                    net_endpoint.stream.local_addr().unwrap(),
 
                                    e,
 
                                )
 
                            })?;
 
                            if maybe_msg.is_some() && !net_endpoint.inbox.is_empty() {
 
                                polled_undrained.insert(index);
 
                                net_polled_undrained.insert(index);
 
                            }
 
                            match maybe_msg {
 
                                None => {} // msg deserialization incomplete
 
                                Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => {
 
                                    log!(
 
                                        logger,
 
@@ -404,13 +420,20 @@ fn new_endpoint_manager(
 
                    }
 
                }
 
            }
 
        }
 
        events.clear();
 
    }
 
    // all todos must be the NetEndpoint variants! unwrap and collect them
 
    log!(logger, "Endpoint setup complete! Cleaning up and building structures");
 
    if let Some(arc) = waker_state {
 
        log!(logger, "Sending waker the stop signal");
 
        arc.continue_signal.store(false, std::sync::atomic::Ordering::SeqCst);
 
        // TODO leave the waker registered?
 
    }
 
    let udp_endpoint_exts = vec![];
 

	
 
    let net_endpoint_exts = todos
 
        .into_iter()
 
        .enumerate()
 
        .map(|(index, Todo { todo_endpoint, local_port, .. })| NetEndpointExt {
 
            net_endpoint: match todo_endpoint {
 
                TodoEndpoint::NetEndpoint(mut net_endpoint) => {
 
@@ -422,32 +445,34 @@ fn new_endpoint_manager(
 
                }
 
                _ => unreachable!(),
 
            },
 
            getter_for_incoming: local_port,
 
        })
 
        .collect();
 
    if let Some(arc) = waker_state {
 
        log!(logger, "Sending waker the stop signal");
 
        arc.continue_signal.store(false, std::sync::atomic::Ordering::SeqCst);
 
        // TODO leave the waker registered?
 
    }
 
    Ok(EndpointManager {
 
        poll,
 
        events,
 
        polled_undrained,
 
        undelayed_messages: delayed_messages, // no longer delayed
 
        delayed_messages: Default::default(),
 
        net_endpoint_exts,
 
        net_endpoint_store: EndpointStore {
 
            endpoint_exts: net_endpoint_exts,
 
            polled_undrained: net_polled_undrained,
 
        },
 
        udp_endpoint_store: EndpointStore {
 
            endpoint_exts: udp_endpoint_exts,
 
            polled_undrained: udp_polled_undrained,
 
        },
 
        udp_in_buffer: Default::default(),
 
    })
 
}
 

	
 
fn init_neighborhood(
 
    connector_id: ConnectorId,
 
    logger: &mut dyn Logger,
 
    em: &mut EndpointManager,
 
    deadline: Option<Instant>,
 
    deadline: &Option<Instant>,
 
) -> Result<Neighborhood, ConnectError> {
 
    ////////////////////////////////
 
    use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
 
    #[derive(Debug)]
 
    struct WaveState {
 
        parent: Option<usize>,
 
@@ -642,13 +667,13 @@ fn init_neighborhood(
 
    Ok(neighborhood)
 
}
 

	
 
fn session_optimize(
 
    cu: &mut ConnectorUnphased,
 
    comm: &mut ConnectorCommunication,
 
    deadline: Option<Instant>,
 
    deadline: &Option<Instant>,
 
) -> Result<(), ConnectError> {
 
    ////////////////////////////////////////
 
    use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
 
    ////////////////////////////////////////
 
    log!(cu.logger, "Beginning session optimization");
 
    // populate session_info_map from a message per child
 
@@ -706,13 +731,14 @@ fn session_optimize(
 
    let my_session_info = SessionInfo {
 
        port_info: cu.port_info.clone(),
 
        proto_components: cu.proto_components.clone(),
 
        serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()),
 
        endpoint_incoming_to_getter: comm
 
            .endpoint_manager
 
            .net_endpoint_exts
 
            .net_endpoint_store
 
            .endpoint_exts
 
            .iter()
 
            .map(|ee| ee.getter_for_incoming)
 
            .collect(),
 
    };
 
    unoptimized_map.insert(cu.id_manager.connector_id, my_session_info);
 
    log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map);
 
@@ -795,13 +821,17 @@ fn apply_optimizations(
 
        endpoint_incoming_to_getter,
 
    } = session_info;
 
    // TODO some info which should be read-only can be mutated with the current scheme
 
    cu.port_info = port_info;
 
    cu.proto_components = proto_components;
 
    cu.proto_description = serde_proto_description.0;
 
    for (ee, getter) in
 
        comm.endpoint_manager.net_endpoint_exts.iter_mut().zip(endpoint_incoming_to_getter)
 
    for (ee, getter) in comm
 
        .endpoint_manager
 
        .net_endpoint_store
 
        .endpoint_exts
 
        .iter_mut()
 
        .zip(endpoint_incoming_to_getter)
 
    {
 
        ee.getter_for_incoming = getter;
 
    }
 
    Ok(())
 
}
src/runtime/tests.rs
 
@@ -654,6 +654,16 @@ fn multi_recover() {
 
                assert_eq!(res.is_ok(), succeeds);
 
            }
 
        });
 
    })
 
    .unwrap();
 
}
 

	
 
#[test]
 
fn udp_self_connect() {
 
    let test_log_path = Path::new("./logs/udp_self_connect");
 
    let sock_addrs = [next_test_addr(), next_test_addr()];
 
    let mut c = file_logged_connector(0, test_log_path);
 
    c.new_udp_port(Putter, sock_addrs[0], sock_addrs[1]).unwrap();
 
    c.new_udp_port(Getter, sock_addrs[1], sock_addrs[0]).unwrap();
 
    c.connect(SEC1).unwrap();
 
}
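A natural follow-up once connection works is a round-trip test. The sketch below is hypothetical and not part of this changeset: it reuses the helpers above (`file_logged_connector`, `next_test_addr`, `SEC1`) and assumes the connector's usual `put`/`get`/`sync`/`gotten` calls.

#[test]
fn udp_self_txrx_sketch() {
    let test_log_path = Path::new("./logs/udp_self_txrx_sketch");
    let sock_addrs = [next_test_addr(), next_test_addr()];
    let mut c = file_logged_connector(0, test_log_path);
    let p = c.new_udp_port(Putter, sock_addrs[0], sock_addrs[1]).unwrap();
    let g = c.new_udp_port(Getter, sock_addrs[1], sock_addrs[0]).unwrap();
    c.connect(SEC1).unwrap();
    c.put(p, Payload::from(&b"hello"[..])).unwrap(); // assumed API
    c.get(g).unwrap();                               // assumed API
    c.sync(SEC1).unwrap();                           // assumed API
    assert_eq!(c.gotten(g).unwrap().as_slice(), b"hello");
}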