Changeset - 9f8f7a65f90d
Christopher Esterhuyse - 2020-09-23 16:56:44
christopher.esterhuyse@gmail.com
Simplified the setup procedure's reconnection logic: got rid of the finicky waker and waker token, relying instead on simple arithmetic for the timeout. Added more doc comments in the setup and endpoint modules.
7 files changed with 329 insertions and 208 deletions:
src/common.rs
 
@@ -100,8 +100,8 @@ pub(crate) enum SyncBlocker {
 
    PutMsg(PortId, Payload),
 
    NondetChoice { n: u16 },
 
}
 
struct DenseDebugHex<'a>(pub &'a [u8]);
 
struct DebuggableIter<I: Iterator<Item = T> + Clone, T: Debug>(pub(crate) I);
 
pub(crate) struct DenseDebugHex<'a>(pub &'a [u8]);
 
pub(crate) struct DebuggableIter<I: Iterator<Item = T> + Clone, T: Debug>(pub(crate) I);
 
///////////////////// IMPL /////////////////////
 
impl IdParts for Id {
 
    fn id_parts(self) -> (ConnectorId, U32Suffix) {
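Aside: this hunk only widens the visibility of the two debug helpers above. For orientation, a minimal sketch of what DenseDebugHex presumably does (hypothetical impl, not part of this changeset): render a byte slice as packed hex.

    use std::fmt::{self, Debug, Formatter};

    pub(crate) struct DenseDebugHex<'a>(pub &'a [u8]);

    impl Debug for DenseDebugHex<'_> {
        // print each byte as two uppercase hex digits, no separators
        fn fmt(&self, f: &mut Formatter) -> fmt::Result {
            for b in self.0 {
                write!(f, "{:02X}", b)?;
            }
            Ok(())
        }
    }

    fn main() {
        println!("{:?}", DenseDebugHex(&[0xDE, 0xAD, 0xBE, 0xEF])); // DEADBEEF
    }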
src/macros.rs
 
@@ -14,7 +14,9 @@ macro_rules! log {
 
        // }
 
    }};
 
    (@ENDPT, $logger:expr, $($arg:tt)*) => {{
 
        // ignore
 
        // if let Some(w) = $logger.line_writer() {
 
        //     let _ = writeln!(w, $($arg)*);
 
        // }
 
    }};
 
    ($logger:expr, $($arg:tt)*) => {{
 
        // if let Some(w) = $logger.line_writer() {
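Aside: every log! arm above is compiled out in this changeset. When enabled, each arm forwards to the logger's line writer, as the commented-out bodies show. A self-contained sketch of the enabled plain arm (the Logger stand-in here is hypothetical):

    use std::io::Write;

    struct Logger(Option<std::io::Stdout>);
    impl Logger {
        fn line_writer(&mut self) -> Option<&mut dyn Write> {
            self.0.as_mut().map(|s| s as &mut dyn Write)
        }
    }

    macro_rules! log {
        ($logger:expr, $($arg:tt)*) => {{
            // write one formatted line iff the logger is active
            if let Some(w) = $logger.line_writer() {
                let _ = writeln!(w, $($arg)*);
            }
        }};
    }

    fn main() {
        let mut logger = Logger(Some(std::io::stdout()));
        log!(&mut logger, "hello {}", 42);
    }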
src/runtime/communication.rs
 
@@ -152,7 +152,7 @@ impl Connector {
 
    ) -> Result<&mut NativeBatch, PortOpError> {
 
        use PortOpError as Poe;
 
        let Self { unphased: cu, phased } = self;
 
        let info = cu.ips.port_info.get(&port).ok_or(Poe::UnknownPolarity)?;
 
        let info = cu.ips.port_info.map.get(&port).ok_or(Poe::UnknownPolarity)?;
 
        if info.owner != cu.native_component_id {
 
            return Err(Poe::PortUnavailable);
 
        }
 
@@ -376,18 +376,18 @@ impl Connector {
 
                );
 
                let firing_ports: HashSet<PortId> = firing_iter.clone().collect();
 
                for port in firing_iter {
 
                    let var = cu.ips.spec_var_for(port);
 
                    let var = cu.ips.port_info.spec_var_for(port);
 
                    predicate.assigned.insert(var, SpecVal::FIRING);
 
                }
 
                // all silent ports have SpecVal::SILENT
 
                for port in cu.ips.ports_owned_by(cu.native_component_id) {
 
                for port in cu.ips.port_info.ports_owned_by(cu.native_component_id) {
 
                    if firing_ports.contains(port) {
 
                        // this one is FIRING
 
                        continue;
 
                    }
 
                    let var = cu.ips.spec_var_for(*port);
 
                    let var = cu.ips.port_info.spec_var_for(*port);
 
                    if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) {
 
                        log!(cu.logger(), "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var);
 
                        log!(&mut *cu.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var);
 
                        continue 'native_branches;
 
                    }
 
                }
 
@@ -406,7 +406,7 @@ impl Connector {
 
                    putter
 
                );
 
                // sanity check
 
                assert_eq!(Putter, cu.ips.port_info.get(&putter).unwrap().polarity);
 
                assert_eq!(Putter, cu.ips.port_info.map.get(&putter).unwrap().polarity);
 
                rctx.putter_push(cu, putter, msg);
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
@@ -580,7 +580,7 @@ impl Connector {
 
            log!(cu.logger(), "Decision loop! have {} messages to recv", rctx.payload_inbox.len());
 
            while let Some((getter, send_payload_msg)) = rctx.getter_pop() {
 
                log!(@MARK, cu.logger(), "handling payload msg for getter {:?} of {:?}", getter, &send_payload_msg);
 
                let getter_info = rctx.ips.port_info.get(&getter).unwrap();
 
                let getter_info = rctx.ips.port_info.map.get(&getter).unwrap();
 
                let cid = getter_info.owner; // the id of the component owning `getter` port
 
                assert_eq!(Getter, getter_info.polarity); // sanity check
 
                log!(
 
@@ -832,7 +832,7 @@ impl BranchingNative {
 
        bn_temp: MapTempGuard<'_, Predicate, NativeBranch>,
 
    ) {
 
        log!(cu.logger(), "feeding native getter {:?} {:?}", getter, &send_payload_msg);
 
        assert_eq!(Getter, rctx.ips.port_info.get(&getter).unwrap().polarity);
 
        assert_eq!(Getter, rctx.ips.port_info.map.get(&getter).unwrap().polarity);
 
        let mut draining = bn_temp;
 
        let finished = &mut self.branches;
 
        std::mem::swap(draining.0, finished);
 
@@ -840,7 +840,7 @@ impl BranchingNative {
 
        // consistent with that of the received message.
 
        for (predicate, mut branch) in draining.drain() {
 
            log!(cu.logger(), "visiting native branch {:?} with {:?}", &branch, &predicate);
 
            let var = rctx.ips.spec_var_for(getter);
 
            let var = rctx.ips.port_info.spec_var_for(getter);
 
            if predicate.query(var) != Some(SpecVal::FIRING) {
 
                // optimization. Don't bother trying this branch,
 
                // because the resulting branch would have an inconsistent predicate.
 
@@ -1046,7 +1046,7 @@ impl BranchingProtoComponent {
 
                }
 
                B::CouldntCheckFiring(port) => {
 
                    // sanity check: `CouldntCheckFiring` returned IFF the variable is speculatively assigned
 
                    let var = rctx.ips.spec_var_for(port);
 
                    let var = rctx.ips.port_info.spec_var_for(port);
 
                    assert!(predicate.query(var).is_none());
 
                    // speculate on the two possible values of `var`. Schedule both branches to be rerun.
 
                    drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone());
 
@@ -1054,9 +1054,9 @@ impl BranchingProtoComponent {
 
                }
 
                B::PutMsg(putter, payload) => {
 
                    // sanity check: The given port indeed has `Putter` polarity
 
                    assert_eq!(Putter, rctx.ips.port_info.get(&putter).unwrap().polarity);
 
                    assert_eq!(Putter, rctx.ips.port_info.map.get(&putter).unwrap().polarity);
 
                    // assign FIRING to this port's associated firing variable
 
                    let var = rctx.ips.spec_var_for(putter);
 
                    let var = rctx.ips.port_info.spec_var_for(putter);
 
                    let was = predicate.assigned.insert(var, SpecVal::FIRING);
 
                    if was == Some(SpecVal::SILENT) {
 
                        // Discard the branch, as it clearly has contradictory requirements for this value.
 
@@ -1079,8 +1079,8 @@ impl BranchingProtoComponent {
 
                B::SyncBlockEnd => {
 
                    // This branch reached the end of its synchronous block
 
                    // assign all variables of owned ports that DIDN'T fire to SILENT
 
                    for port in rctx.ips.ports_owned_by(proto_component_id) {
 
                        let var = rctx.ips.spec_var_for(*port);
 
                    for port in rctx.ips.port_info.ports_owned_by(proto_component_id) {
 
                        let var = rctx.ips.port_info.spec_var_for(*port);
 
                        let actually_exchanged = branch.inner.did_put_or_get.contains(port);
 
                        let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
 
                        let speculated_to_fire = val == SpecVal::FIRING;
 
@@ -1195,7 +1195,7 @@ impl BranchingProtoComponent {
 
        BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?;
 
        // swap the blocked branches back
 
        std::mem::swap(blocked.0, &mut self.branches);
 
        log!(cu.logger(), "component settles down with branches: {:?}", branches.keys());
 
        log!(cu.logger(), "component settles down with branches: {:?}", self.branches.keys());
 
        Ok(())
 
    }
 

	
 
@@ -1362,7 +1362,7 @@ impl NonsyncProtoContext<'_> {
 
        for port in moved_ports.iter() {
 
            assert_eq!(
 
                self.proto_component_id,
 
                self.ips.port_info.get(port).unwrap().owner
 
                self.ips.port_info.map.get(port).unwrap().owner
 
            );
 
        }
 
        // Create the new component, and schedule it to be run
 
@@ -1378,7 +1378,7 @@ impl NonsyncProtoContext<'_> {
 
        self.unrun_components.push((new_cid, state));
 
        // Update the ownership of the moved ports
 
        for port in moved_ports.iter() {
 
            self.ips.port_info.get_mut(port).unwrap().owner = new_cid;
 
            self.ips.port_info.map.get_mut(port).unwrap().owner = new_cid;
 
        }
 
    }
 

	
 
@@ -1388,7 +1388,7 @@ impl NonsyncProtoContext<'_> {
 
        // adds two new associated ports, related to each other, and exposed to the proto component
 
        let mut new_cid_fn = || self.ips.id_manager.new_port_id();
 
        let [o, i] = [new_cid_fn(), new_cid_fn()];
 
        self.ips.port_info.insert(
 
        self.ips.port_info.map.insert(
 
            o,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
@@ -1397,7 +1397,7 @@ impl NonsyncProtoContext<'_> {
 
                owner: self.proto_component_id,
 
            },
 
        );
 
        self.ips.port_info.insert(
 
        self.ips.port_info.map.insert(
 
            i,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
@@ -1420,7 +1420,7 @@ impl SyncProtoContext<'_> {
 
    // The component calls the runtime back, inspecting whether its associated
 
    // predicate has already determined a (speculative) value for the given port's firing variable.
 
    pub(crate) fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        let var = self.rctx.ips.spec_var_for(port);
 
        let var = self.rctx.ips.port_info.spec_var_for(port);
 
        self.predicate.query(var).map(SpecVal::is_firing)
 
    }
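Aside: every edit in this file is the same mechanical rewrite: port lookups and the spec_var_for/ports_owned_by helpers now go through the new PortInfoMap newtype (defined in src/runtime/mod.rs below), so raw map access reads port_info.map.get(..). A self-contained sketch of the pattern with stand-in id types:

    use std::collections::HashMap;

    // stand-in for the real PortId => PortInfo map; here port id => owner id
    #[derive(Default)]
    struct PortInfoMap {
        map: HashMap<u32, u32>,
    }

    impl PortInfoMap {
        // helper methods move from IdAndPortState onto the newtype itself
        fn ports_owned_by(&self, owner: u32) -> impl Iterator<Item = &u32> {
            self.map.iter().filter(move |(_, o)| **o == owner).map(|(p, _)| p)
        }
    }

    fn main() {
        let mut pim = PortInfoMap::default();
        pim.map.insert(7, 1);
        pim.map.insert(8, 2);
        assert_eq!(pim.ports_owned_by(1).count(), 1);
        assert!(pim.map.get(&7).is_some()); // raw access goes through .map
    }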
 

	
src/runtime/endpoints.rs
 
use super::*;
 

	
 
// A wrapper for some Read type, delegating read calls
 
// to the contained Read structure, but snooping on
 
// the number of bytes it reads, to be inspected later.
 
struct MonitoredReader<R: Read> {
 
    bytes: usize,
 
    r: R,
 
}
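A sketch of the delegation this wrapper needs; the real impls fall outside this hunk, but try_recv below relies on MonitoredReader::from and bytes_read:

    use std::io::Read;

    impl<R: Read> From<R> for MonitoredReader<R> {
        fn from(r: R) -> Self {
            Self { bytes: 0, r }
        }
    }

    impl<R: Read> MonitoredReader<R> {
        fn bytes_read(&self) -> usize {
            self.bytes
        }
    }

    impl<R: Read> Read for MonitoredReader<R> {
        // delegate the read, but count every byte that passes through
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let n = self.r.read(buf)?;
            self.bytes += n;
            Ok(n)
        }
    }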
 

	
 
enum PollAndPopulateError {
 
    PollFailed,
 
    Timeout,
 
}
 

	
 
struct TryRecvAnyNetError {
 
    error: NetEndpointError,
 
    index: usize,
 
}
 
/////////////////////
 
impl NetEndpoint {
 
    // Returns the bincode configuration the NetEndpoint uses pervasively
 
    // for ser/de operations.
 
    fn bincode_opts() -> impl bincode::config::Options {
 
        // uses variable-length encoding everywhere; great!
 
        bincode::config::DefaultOptions::default()
 
    }
 

	
 
    // Attempt to return some deserializable T-type from
 
    // the inbox or network stream
 
    pub(super) fn try_recv<T: serde::de::DeserializeOwned>(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
    ) -> Result<Option<T>, NetEndpointError> {
 
        use NetEndpointError as Nee;
 
        // populate inbox as much as possible
 
        // populate inbox with bytes as much as possible (mio::TcpStream is nonblocking)
 
        let before_len = self.inbox.len();
 
        'read_loop: loop {
 
            let res = self.stream.read_to_end(&mut self.inbox);
 
@@ -40,12 +51,16 @@ impl NetEndpoint {
 
            DenseDebugHex(&self.inbox[..before_len]),
 
            DenseDebugHex(&self.inbox[before_len..]),
 
        );
 
        // Try to deserialize from the inbox, monitoring how many bytes
 
        // the deserialization process consumes. In the event of
 
        // success, this makes clear where the message ends
 
        let mut monitored = MonitoredReader::from(&self.inbox[..]);
 
        use bincode::config::Options;
 
        match Self::bincode_opts().deserialize_from(&mut monitored) {
 
            Ok(msg) => {
 
                let msg_size = monitored.bytes_read();
 
                self.inbox.drain(0..(msg_size.try_into().unwrap()));
 
                // inbox[..msg_size] was deserialized into one message!
 
                self.inbox.drain(..msg_size);
 
                log!(
 
                    @ENDPT,
 
                    logger,
 
@@ -59,12 +74,15 @@ impl NetEndpoint {
 
            }
 
            Err(e) => match *e {
 
                bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => {
 
                    // Contents of inbox insufficient for deserializing a message
 
                    Ok(None)
 
                }
 
                _ => Err(Nee::MalformedMessage),
 
            },
 
        }
 
    }
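Note the framing needs no length prefix: bincode's variable-length encoding makes each message self-delimiting, so a partial decode fails with UnexpectedEof and we just wait for more bytes. A standalone sketch of the same pattern, substituting std::io::Cursor for MonitoredReader (bincode 1.x API assumed):

    use bincode::config::Options;
    use std::io::Cursor;

    fn try_decode<T: serde::de::DeserializeOwned>(inbox: &mut Vec<u8>) -> Option<T> {
        let mut cursor = Cursor::new(&inbox[..]);
        match bincode::config::DefaultOptions::default().deserialize_from(&mut cursor) {
            Ok(msg) => {
                // cursor.position() marks exactly where the message ended
                let consumed = cursor.position() as usize;
                inbox.drain(..consumed);
                Some(msg)
            }
            Err(_) => None, // includes the "not enough bytes yet" case
        }
    }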
 

	
 
    // Send the given serializable type into the stream
 
    pub(super) fn send<T: serde::ser::Serialize>(
 
        &mut self,
 
        msg: &T,
 
@@ -86,6 +104,18 @@ impl EndpointManager {
 
    pub(super) fn num_net_endpoints(&self) -> usize {
 
        self.net_endpoint_store.endpoint_exts.len()
 
    }
 

	
 
    // Setup-phase particular send procedure.
 
    // Used pervasively, allows for some brevity with the ? operator.
 
    pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> {
 
        let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
 
        net_endpoint.send(msg).map_err(|err| {
 
            ConnectError::NetEndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err)
 
        })
 
    }
 

	
 
    // Communication-phase particular send procedure.
 
    // Used pervasively, allows for some brevity with the ? operator.
 
    pub(super) fn send_to_comms(
 
        &mut self,
 
        index: usize,
 
@@ -95,12 +125,6 @@ impl EndpointManager {
 
        let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
 
        net_endpoint.send(msg).map_err(|_| Use::BrokenNetEndpoint { index })
 
    }
 
    pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> {
 
        let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
 
        net_endpoint.send(msg).map_err(|err| {
 
            ConnectError::NetEndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err)
 
        })
 
    }
 

	
 
    /// Receive the first message of any kind at all.
 
    /// Why not return SetupMsg? Because often this message will be forwarded to several others,
 
@@ -110,7 +134,8 @@ impl EndpointManager {
 
        logger: &mut dyn Logger,
 
        deadline: &Option<Instant>,
 
    ) -> Result<(usize, Msg), ConnectError> {
 
        ///////////////////////////////////////////
 
        // helper function, mapping a TryRecvAnyNetError
 
        // into a ConnectError
 
        fn map_trane(
 
            trane: TryRecvAnyNetError,
 
            net_endpoint_store: &EndpointStore<NetEndpointExt>,
 
@@ -124,7 +149,6 @@ impl EndpointManager {
 
                trane.error,
 
            )
 
        }
 
        ///////////////////////////////////////////
 
        // try yield undelayed net message
 
        if let Some(tup) = self.undelayed_messages.pop() {
 
            log!(@ENDPT, logger, "RECV undelayed_msg {:?}", &tup);
 
@@ -156,7 +180,11 @@ impl EndpointManager {
 
        round_index: usize,
 
    ) -> Result<CommRecvOk, UnrecoverableSyncError> {
 
        ///////////////////////////////////////////
 
        // adds scoped functionality for EndpointManager
 
        impl EndpointManager {
 
            // Given some Msg structure in a particular context,
 
            // return a control message for the current round;
 
            // if it's a payload message, buffer it instead
 
            fn handle_msg(
 
                &mut self,
 
                cu: &mut impl CuUndecided,
 
@@ -167,10 +195,11 @@ impl EndpointManager {
 
                some_message_enqueued: &mut bool,
 
            ) -> Option<(usize, CommCtrlMsg)> {
 
                let comm_msg_contents = match msg {
 
                    Msg::SetupMsg(..) => return None,
 
                    Msg::SetupMsg(..) => return None, // discard setup messages
 
                    Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(&round_index) {
 
                        Ordering::Equal => comm_msg.contents,
 
                        Ordering::Equal => comm_msg.contents, // ok, keep going
 
                        Ordering::Less => {
 
                            // discard this message
 
                            log!(
 
                                cu.logger(),
 
                                "We are in round {}, but msg is for round {}. Discard",
 
@@ -180,6 +209,7 @@ impl EndpointManager {
 
                            return None;
 
                        }
 
                        Ordering::Greater => {
 
                            // "delay" this message, enqueueing it for a future round
 
                            log!(
 
                                cu.logger(),
 
                                "We are in round {}, but msg is for round {}. Buffer",
 
@@ -191,9 +221,15 @@ impl EndpointManager {
 
                        }
 
                    },
 
                };
 
                // inspect the contents of this contemporary message, sorting it
 
                match comm_msg_contents {
 
                    CommMsgContents::CommCtrl(comm_ctrl_msg) => Some((net_index, comm_ctrl_msg)),
 
                    CommMsgContents::CommCtrl(comm_ctrl_msg) => {
 
                        // yes! this is a CommCtrlMsg
 
                        Some((net_index, comm_ctrl_msg))
 
                    }
 
                    CommMsgContents::SendPayload(send_payload_msg) => {
 
                        // Enqueue this payload message
 
                        // Still not a CommCtrlMsg, so return None
 
                        let getter =
 
                            self.net_endpoint_store.endpoint_exts[net_index].getter_for_incoming;
 
                        rctx.getter_push(getter, send_payload_msg);
 
@@ -206,7 +242,7 @@ impl EndpointManager {
 
        use {PollAndPopulateError as Pape, UnrecoverableSyncError as Use};
 
        ///////////////////////////////////////////
 
        let mut some_message_enqueued = false;
 
        // try yield undelayed net message
 
        // pop undelayed messages, handling them. Return the first CommCtrlMsg popped
 
        while let Some((net_index, msg)) = self.undelayed_messages.pop() {
 
            if let Some((net_index, msg)) =
 
                self.handle_msg(cu, rctx, net_index, msg, round_index, &mut some_message_enqueued)
 
@@ -215,7 +251,8 @@ impl EndpointManager {
 
            }
 
        }
 
        loop {
 
            // try receive a net message
 
            // drain endpoints of incoming messages (without blocking)
 
            // return first CommCtrlMsg received
 
            while let Some((net_index, msg)) = self.try_recv_undrained_net(cu.logger())? {
 
                if let Some((net_index, msg)) = self.handle_msg(
 
                    cu,
 
@@ -237,7 +274,7 @@ impl EndpointManager {
 
                    self.udp_endpoint_store.polled_undrained.insert(index);
 
                    if !ee.received_this_round {
 
                        let payload = Payload::from(&recv_buffer[..bytes_written]);
 
                        let port_spec_var = rctx.ips.spec_var_for(ee.getter_for_incoming);
 
                        let port_spec_var = rctx.ips.port_info.spec_var_for(ee.getter_for_incoming);
 
                        let predicate = Predicate::singleton(port_spec_var, SpecVal::FIRING);
 
                        rctx.getter_push(
 
                            ee.getter_for_incoming,
 
@@ -261,6 +298,8 @@ impl EndpointManager {
 
            }
 
        }
 
    }
 

	
 
    // Try receive some message from any net endpoint without blocking
 
    fn try_recv_undrained_net(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
@@ -281,6 +320,9 @@ impl EndpointManager {
 
        }
 
        Ok(None)
 
    }
 

	
 
    // Poll the network, raising `polled_undrained` flags for endpoints
 
    // as they receive events.
 
    fn poll_and_populate(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
@@ -297,9 +339,6 @@ impl EndpointManager {
 
        self.poll.poll(&mut self.events, remaining).map_err(|_| Pape::PollFailed)?;
 
        for event in self.events.iter() {
 
            match TokenTarget::from(event.token()) {
 
                TokenTarget::Waker => {
 
                    // Can ignore. Residual event from endpoint manager setup procedure
 
                }
 
                TokenTarget::NetEndpoint { index } => {
 
                    self.net_endpoint_store.polled_undrained.insert(index);
 
                    log!(
 
@@ -327,6 +366,8 @@ impl EndpointManager {
 
        self.events.clear();
 
        Ok(())
 
    }
 

	
 
    // Move all delayed messages to undelayed, making it possible to yield them
 
    pub(super) fn undelay_all(&mut self) {
 
        if self.undelayed_messages.is_empty() {
 
            // fast path
 
@@ -336,6 +377,8 @@ impl EndpointManager {
 
        // slow path
 
        self.undelayed_messages.extend(self.delayed_messages.drain(..));
 
    }
 

	
 
    // End the synchronous round for the udp endpoints given the round decision
 
    pub(super) fn udp_endpoints_round_end(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
@@ -349,6 +392,11 @@ impl EndpointManager {
 
        );
 
        use UnrecoverableSyncError as Use;
 
        if let Decision::Success(solution_predicate) = decision {
 
            // Similar to a native component, we commit the branch of the component
 
            // consistent with the predicate decided upon, making its effects visible
 
            // to the world outside the connector's internals.
 
            // In this case, this takes the form of emptying the component's outbox buffer,
 
            // actually sending payloads 'on the wire' as UDP messages.
 
            for (index, ee) in self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate() {
 
                'outgoing_loop: for (payload_predicate, payload) in ee.outgoing_payloads.drain() {
 
                    if payload_predicate.assigns_subset(solution_predicate) {
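Here assigns_subset presumably holds iff every speculative assignment in the payload's predicate also appears in the solution predicate; only such payloads are committed to the wire. A hedged standalone sketch with stand-in types:

    use std::collections::BTreeMap;

    // stand-in for Predicate: speculative variable => assigned value
    fn assigns_subset(payload: &BTreeMap<u32, bool>, solution: &BTreeMap<u32, bool>) -> bool {
        payload.iter().all(|(var, val)| solution.get(var) == Some(val))
    }

    fn main() {
        let payload: BTreeMap<u32, bool> = vec![(1, true)].into_iter().collect();
        let solution: BTreeMap<u32, bool> = vec![(1, true), (2, false)].into_iter().collect();
        assert!(assigns_subset(&payload, &solution)); // sent on the wire
        assert!(!assigns_subset(&solution, &payload)); // not a subset
    }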
src/runtime/logging.rs
 
use super::*;
 

	
 
// Used in the loggers' format string
 
fn secs_since_unix_epoch() -> f64 {
 
    std::time::SystemTime::now()
 
        .duration_since(std::time::UNIX_EPOCH)
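The hunk cuts off here; for reference, a complete version of this helper presumably reads as follows (the tail is an assumption):

    fn secs_since_unix_epoch() -> f64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_secs_f64())
            .unwrap_or(0.0) // clock set before 1970: log 0.0 rather than panic
    }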
src/runtime/mod.rs
 
@@ -147,7 +147,7 @@ enum SetupMsg {
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct SessionInfo {
 
    serde_proto_description: SerdeProtocolDescription,
 
    port_info: HashMap<PortId, PortInfo>,
 
    port_info: PortInfoMap,
 
    endpoint_incoming_to_getter: Vec<PortId>,
 
    proto_components: HashMap<ComponentId, ComponentState>,
 
}
 
@@ -313,12 +313,19 @@ struct MyPortInfo {
 
    owner: ComponentId,
 
}
 

	
 
// Newtype around port info map, allowing the implementation of some
 
// useful methods
 
#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize)]
 
struct PortInfoMap {
 
    map: HashMap<PortId, PortInfo>,
 
}
 

	
 
// A convenient substructure for containing port info and the ID manager.
 
// Houses the bulk of the connector's persistent state between rounds.
 
// It turns out several situations require access to both things.
 
#[derive(Debug, Clone)]
 
struct IdAndPortState {
 
    port_info: HashMap<PortId, PortInfo>,
 
    port_info: PortInfoMap,
 
    id_manager: IdManager,
 
}
 

	
 
@@ -434,7 +441,6 @@ struct NativeBatch {
 
enum TokenTarget {
 
    NetEndpoint { index: usize },
 
    UdpEndpoint { index: usize },
 
    Waker,
 
}
 

	
 
// Returned by the endpoint manager as a result of comm_recv, telling the connector what happened,
 
@@ -475,18 +481,15 @@ impl<T: std::cmp::Ord> VecSet<T> {
 
        self.vec.pop()
 
    }
 
}
 
impl IdAndPortState {
 
impl PortInfoMap {
 
    fn ports_owned_by(&self, owner: ComponentId) -> impl Iterator<Item = &PortId> {
 
        self.port_info
 
            .iter()
 
            .filter(move |(_, port_info)| port_info.owner == owner)
 
            .map(|(port, _)| port)
 
        self.map.iter().filter(move |(_, port_info)| port_info.owner == owner).map(|(port, _)| port)
 
    }
 
    fn spec_var_for(&self, port: PortId) -> SpecVar {
 
        // Every port maps to a speculative variable
 
        // Two distinct ports map to the same variable
 
        // IFF they are two ends of the same logical channel.
 
        let info = self.port_info.get(&port).unwrap();
 
        let info = self.map.get(&port).unwrap();
 
        SpecVar(match info.polarity {
 
            Getter => port,
 
            Putter => info.peer.unwrap(),
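In other words, for a channel with putter p and getter g, both ends resolve to SpecVar(g): the getter names itself, and the putter names its peer. A self-contained sketch of the invariant with stand-in types:

    use std::collections::HashMap;

    #[derive(Clone, Copy)]
    enum Polarity { Putter, Getter }
    #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
    struct PortId(u32);
    #[derive(PartialEq, Debug)]
    struct SpecVar(PortId);

    struct Info { polarity: Polarity, peer: Option<PortId> }

    fn spec_var_for(map: &HashMap<PortId, Info>, port: PortId) -> SpecVar {
        let info = &map[&port];
        SpecVar(match info.polarity {
            Polarity::Getter => port,
            Polarity::Putter => info.peer.unwrap(),
        })
    }

    fn main() {
        let (p, g) = (PortId(0), PortId(1));
        let mut map = HashMap::new();
        map.insert(p, Info { polarity: Polarity::Putter, peer: Some(g) });
        map.insert(g, Info { polarity: Polarity::Getter, peer: Some(p) });
        // both ends of one logical channel name the same firing variable
        assert_eq!(spec_var_for(&map, p), spec_var_for(&map, g));
    }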
 
@@ -528,7 +531,7 @@ impl IdManager {
 
}
 
impl Drop for Connector {
 
    fn drop(&mut self) {
 
        log!(&mut *self.unphased.inner.logger, "Connector dropping. Goodbye!");
 
        log!(self.unphased.logger(), "Connector dropping. Goodbye!");
 
    }
 
}
 
// Given a slice of ports, return the first port (if any) that appears more than once
 
@@ -594,7 +597,7 @@ impl Connector {
 
        // - they are each others' peers
 
        // - they are owned by a local component with id `cid`
 
        // - polarity putter, getter respectively
 
        cu.ips.port_info.insert(
 
        cu.ips.port_info.map.insert(
 
            o,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
@@ -603,7 +606,7 @@ impl Connector {
 
                polarity: Putter,
 
            },
 
        );
 
        cu.ips.port_info.insert(
 
        cu.ips.port_info.map.insert(
 
            i,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
@@ -642,7 +645,7 @@ impl Connector {
 
            return Err(Ace::WrongNumberOfParamaters { expected: expected_polarities.len() });
 
        }
 
        for (&expected_polarity, &port) in expected_polarities.iter().zip(ports.iter()) {
 
            let info = cu.ips.port_info.get(&port).ok_or(Ace::UnknownPort(port))?;
 
            let info = cu.ips.port_info.map.get(&port).ok_or(Ace::UnknownPort(port))?;
 
            if info.owner != cu.native_component_id {
 
                return Err(Ace::UnknownPort(port));
 
            }
 
@@ -658,7 +661,7 @@ impl Connector {
 
            .insert(new_cid, cu.proto_description.new_main_component(identifier, ports));
 
        // update the ownership of moved ports
 
        for port in ports.iter() {
 
            match cu.ips.port_info.get_mut(port) {
 
            match cu.ips.port_info.map.get_mut(port) {
 
                Some(port_info) => port_info.owner = new_cid,
 
                None => unreachable!(),
 
            }
 
@@ -787,7 +790,7 @@ impl RoundCtx {
 

	
 
    // buffer a message along with the ID of the putter who sent it
 
    fn putter_push(&mut self, cu: &mut impl CuUndecided, putter: PortId, msg: SendPayloadMsg) {
 
        if let Some(getter) = self.ips.port_info.get(&putter).unwrap().peer {
 
        if let Some(getter) = self.ips.port_info.map.get(&putter).unwrap().peer {
 
            log!(cu.logger(), "Putter add (putter:{:?} => getter:{:?})", putter, getter);
 
            self.getter_push(getter, msg);
 
        } else {
src/runtime/setup.rs
 
use crate::common::*;
 
use crate::runtime::*;
 

	
 
#[derive(Default)]
 
struct ExtraPortInfo {
 
    info: HashMap<PortId, PortInfo>,
 
    peers: HashMap<PortId, PortId>,
 
}
 

	
 
impl TokenTarget {
 
    // subdivides the domain of usize into
 
    // [NET_ENDPOINT][UDP_ENDPOINT  ]
 
    // ^0            ^usize::MAX/2   ^usize::MAX
 
    const HALFWAY_INDEX: usize = usize::MAX / 2;
 
    const MAX_INDEX: usize = usize::MAX;
 
    const WAKER_TOKEN: usize = Self::MAX_INDEX;
 
}
 
impl From<Token> for TokenTarget {
 
    fn from(Token(index): Token) -> Self {
 
        if index == Self::WAKER_TOKEN {
 
            TokenTarget::Waker
 
        } else if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) {
 
        if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) {
 
            TokenTarget::UdpEndpoint { index: shifted }
 
        } else {
 
            TokenTarget::NetEndpoint { index }
 
@@ -20,7 +26,6 @@ impl From<Token> for TokenTarget {
 
impl Into<Token> for TokenTarget {
 
    fn into(self) -> Token {
 
        match self {
 
            TokenTarget::Waker => Token(Self::WAKER_TOKEN),
 
            TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX),
 
            TokenTarget::NetEndpoint { index } => Token(index),
 
        }
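With the waker token gone, the token space is plain arithmetic: net endpoint indices occupy [0, usize::MAX/2) and udp endpoint indices are shifted above that. A self-contained sketch of the round trip:

    const HALFWAY_INDEX: usize = usize::MAX / 2;

    // udp tokens are shifted up by HALFWAY_INDEX; net tokens are the index itself
    fn to_token(udp: bool, index: usize) -> usize {
        if udp { index + HALFWAY_INDEX } else { index }
    }

    fn from_token(token: usize) -> (bool, usize) {
        match token.checked_sub(HALFWAY_INDEX) {
            Some(shifted) => (true, shifted), // at or above halfway: udp
            None => (false, token),           // below halfway: net
        }
    }

    fn main() {
        assert_eq!(from_token(to_token(false, 3)), (false, 3));
        assert_eq!(from_token(to_token(true, 3)), (true, 3));
    }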
 
@@ -60,6 +65,7 @@ impl Connector {
 
            })),
 
        }
 
    }
 

	
 
    /// Conceptually, this returning [p0, g1] is sugar for:
 
    /// 1. create port pair [p0, g0]
 
    /// 2. create port pair [p1, g1]
 
@@ -76,19 +82,12 @@ impl Connector {
 
            ConnectorPhased::Setup(setup) => {
 
                let udp_index = setup.udp_endpoint_setups.len();
 
                let udp_cid = cu.ips.id_manager.new_component_id();
 
                // allocates 4 new port identifiers, two for each logical channel,
 
                // one channel per direction (into and out of the component)
 
                let mut npid = || cu.ips.id_manager.new_port_id();
 
                let [nin, nout, uin, uout] = [npid(), npid(), npid(), npid()];
 

	
 
                cu.ips.port_info.insert(
 
                    nin,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
                        polarity: Getter,
 
                        peer: Some(uout),
 
                        owner: cu.native_component_id,
 
                    },
 
                );
 
                cu.ips.port_info.insert(
 
                // allocate the native->udp_mediator channel's ports
 
                cu.ips.port_info.map.insert(
 
                    nout,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
@@ -97,7 +96,7 @@ impl Connector {
 
                        owner: cu.native_component_id,
 
                    },
 
                );
 
                cu.ips.port_info.insert(
 
                cu.ips.port_info.map.insert(
 
                    uin,
 
                    PortInfo {
 
                        route: Route::UdpEndpoint { index: udp_index },
 
@@ -106,7 +105,8 @@ impl Connector {
 
                        owner: udp_cid,
 
                    },
 
                );
 
                cu.ips.port_info.insert(
 
                // allocate the udp_mediator->native channel's ports
 
                cu.ips.port_info.map.insert(
 
                    uout,
 
                    PortInfo {
 
                        route: Route::UdpEndpoint { index: udp_index },
 
@@ -115,11 +115,23 @@ impl Connector {
 
                        owner: udp_cid,
 
                    },
 
                );
 
                cu.ips.port_info.map.insert(
 
                    nin,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
                        polarity: Getter,
 
                        peer: Some(uout),
 
                        owner: cu.native_component_id,
 
                    },
 
                );
 
                // allocate the two ports owned by the UdpMediator component
 
                // Remember to set up this UdpEndpoint during `connect` later.
 
                setup.udp_endpoint_setups.push(UdpEndpointSetup {
 
                    local_addr,
 
                    peer_addr,
 
                    getter_for_incoming: nin,
 
                });
 
                // Return the native's output, input port pair
 
                Ok([nout, nin])
 
            }
 
        }
 
@@ -138,8 +150,9 @@ impl Connector {
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                // allocate a single dangling port with a `None` peer (for now)
 
                let new_pid = cu.ips.id_manager.new_port_id();
 
                cu.ips.port_info.insert(
 
                cu.ips.port_info.map.insert(
 
                    new_pid,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
@@ -156,6 +169,7 @@ impl Connector {
 
                    &sock_addr,
 
                    endpoint_polarity
 
                );
 
                // Remember to set up this NetEndpoint during `connect` later.
 
                setup.net_endpoint_setups.push(NetEndpointSetup {
 
                    sock_addr,
 
                    endpoint_polarity,
 
@@ -186,11 +200,11 @@ impl Connector {
 
                log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                let deadline = timeout.map(|to| Instant::now() + to);
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let mut endpoint_manager = new_endpoint_manager(
 
                let (mut endpoint_manager, mut extra_port_info) = setup_endpoints_and_pair_ports(
 
                    &mut *cu.logger,
 
                    &setup.net_endpoint_setups,
 
                    &setup.udp_endpoint_setups,
 
                    &mut cu.ips.port_info,
 
                    &cu.ips.port_info,
 
                    &deadline,
 
                )?;
 
                log!(
 
@@ -200,7 +214,8 @@ impl Connector {
 
                    &cu.ips.port_info,
 
                    &endpoint_manager,
 
                );
 
                // leader election and tree construction
 
                // leader election and tree construction. Learn our role in the consensus tree
 
                // by learning who our children and parent (neighbors) are in it.
 
                let neighborhood = init_neighborhood(
 
                    cu.ips.id_manager.connector_id,
 
                    &mut *cu.logger,
 
@@ -208,80 +223,110 @@ impl Connector {
 
                    &deadline,
 
                )?;
 
                log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                // Put it all together with an initial round index of zero.
 
                let mut comm = ConnectorCommunication {
 
                    round_index: 0,
 
                    endpoint_manager,
 
                    neighborhood,
 
                    native_batches: vec![Default::default()],
 
                    round_result: Ok(None),
 
                    round_result: Ok(None), // no previous round yet
 
                };
 
                if cfg!(feature = "session_optimization") {
 
                    // Perform the session optimization procedure, which may modify the
 
                    // internals of the connector, rerouting ports, moving around connectors etc.
 
                    session_optimize(cu, &mut comm, &deadline)?;
 
                }
 
                log!(cu.logger, "connect() finished. setup phase complete");
 
                // Connect procedure successful! Commit changes by...
 
                // ... committing new port info for ConnectorUnphased
 
                for (port, info) in extra_port_info.info.drain() {
 
                    cu.ips.port_info.map.insert(port, info);
 
                }
 
                for (port, peer) in extra_port_info.peers.drain() {
 
                    cu.ips.port_info.map.get_mut(&port).unwrap().peer = Some(peer);
 
                }
 
                // ... replacing the connector's phase with "communication"
 
                *phased = ConnectorPhased::Communication(Box::new(comm));
 
                Ok(())
 
            }
 
        }
 
    }
 
}
 
fn new_endpoint_manager(
 

	
 
// Given a set of net_ and udp_ endpoints to set up,
 
// port information to flesh out (by discovering peers through channels)
 
// and a deadline in which to do it,
 
// try to return:
 
// - An EndpointManager, containing all the set up endpoints
 
// - new information about ports acquired through the newly-created channels
 
fn setup_endpoints_and_pair_ports(
 
    logger: &mut dyn Logger,
 
    net_endpoint_setups: &[NetEndpointSetup],
 
    udp_endpoint_setups: &[UdpEndpointSetup],
 
    port_info: &mut HashMap<PortId, PortInfo>,
 
    port_info: &PortInfoMap,
 
    deadline: &Option<Instant>,
 
) -> Result<EndpointManager, ConnectError> {
 
    ////////////////////////////////////////////
 
    use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
 
) -> Result<(EndpointManager, ExtraPortInfo), ConnectError> {
 
    use ConnectError as Ce;
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    const WAKER_PERIOD: Duration = Duration::from_millis(300);
 
    struct WakerState {
 
        continue_signal: AtomicBool,
 
        waker: mio::Waker,
 
    }
 
    impl WakerState {
 
        fn waker_loop(&self) {
 
            while self.continue_signal.load(SeqCst) {
 
                std::thread::sleep(WAKER_PERIOD);
 
                let _ = self.waker.wake();
 
            }
 
        }
 
        fn waker_stop(&self) {
 
            self.continue_signal.store(false, SeqCst);
 
            // TODO keep waker registered?
 
        }
 
    }
 
    struct Todo {
 
    const RETRY_PERIOD: Duration = Duration::from_millis(200);
 

	
 
    // The structure shared between this ("setup") thread and that of the waker.
 
    // The waker thread periodically sends signals.
 
    // struct WakerState {
 
    //     continue_signal: AtomicBool,
 
    //     waker: mio::Waker,
 
    // }
 
    // impl WakerState {
 
    //     // The waker thread runs this UNTIL the continue signal is set to false
 
    //     fn waker_loop(&self) {
 
    //         while self.continue_signal.load(SeqCst) {
 
    //             std::thread::sleep(WAKER_PERIOD);
 
    //             let _ = self.waker.wake();
 
    //         }
 
    //     }
 
    //     // The setup thread runs this to set the continue signal to false.
 
    //     fn waker_stop(&self) {
 
    //         self.continue_signal.store(false, SeqCst);
 
    //     }
 
    // }
 

	
 
    // The data for a net endpoint's setup in progress
 
    struct NetTodo {
 
        // becomes completed once sent_local_port && recv_peer_port.is_some()
 
        // we send local port if we haven't already and we receive a writable event
 
        // we recv peer port if we haven't already and we receive a readable event
 
        todo_endpoint: TodoEndpoint,
 
        todo_endpoint: NetTodoEndpoint,
 
        endpoint_setup: NetEndpointSetup,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
    }
 

	
 
    // The data for a udp endpoint's setup in progress
 
    struct UdpTodo {
 
        // becomes completed once we receive our first writable event
 
        getter_for_incoming: PortId,
 
        sock: UdpSocket,
 
    }
 
    enum TodoEndpoint {
 
        Accepting(TcpListener),
 
        NetEndpoint(NetEndpoint),
 

	
 
    // Substructure of `NetTodo`, which represents the endpoint itself
 
    enum NetTodoEndpoint {
 
        Accepting(TcpListener),       // awaiting its peer initiating the connection
 
        PeerInfoRecving(NetEndpoint), // awaiting info about peer port through the channel
 
    }
 

	
 
    ////////////////////////////////////////////
 

	
 
    // 1. Start to construct EndpointManager
 
    let mut waker_state: Option<Arc<WakerState>> = None;
 
    // Start to construct our return values
 
    // let mut waker_state: Option<Arc<WakerState>> = None;
 
    let mut extra_port_info = ExtraPortInfo::default();
 
    let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?;
 
    let mut events =
 
        Events::with_capacity((net_endpoint_setups.len() + udp_endpoint_setups.len()) * 2 + 4);
 
    let [mut net_polled_undrained, udp_polled_undrained] = [VecSet::default(), VecSet::default()];
 
    let mut delayed_messages = vec![];
 
    let mut last_retry_at = Instant::now();
 

	
 
    // 2. Create net/udp TODOs, each already registered with poll
 
    // Create net/udp todo structures, each already registered with poll
 
    let mut net_todos = net_endpoint_setups
 
        .iter()
 
        .enumerate()
 
@@ -292,21 +337,21 @@ fn new_endpoint_manager(
 
                let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
 
                    .expect("mio::TcpStream connect should not fail!");
 
                poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                TodoEndpoint::NetEndpoint(NetEndpoint { stream, inbox: vec![] })
 
                NetTodoEndpoint::PeerInfoRecving(NetEndpoint { stream, inbox: vec![] })
 
            } else {
 
                let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
 
                    .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?;
 
                poll.registry().register(&mut listener, token, BOTH).unwrap();
 
                TodoEndpoint::Accepting(listener)
 
                NetTodoEndpoint::Accepting(listener)
 
            };
 
            Ok(Todo {
 
            Ok(NetTodo {
 
                todo_endpoint,
 
                sent_local_port: false,
 
                recv_peer_port: None,
 
                endpoint_setup: endpoint_setup.clone(),
 
            })
 
        })
 
        .collect::<Result<Vec<Todo>, ConnectError>>()?;
 
        .collect::<Result<Vec<NetTodo>, ConnectError>>()?;
 
    let udp_todos = udp_endpoint_setups
 
        .iter()
 
        .enumerate()
 
@@ -322,8 +367,8 @@ fn new_endpoint_manager(
 
        })
 
        .collect::<Result<Vec<UdpTodo>, ConnectError>>()?;
 

	
 
    // Initially, (1) no net connections have failed, and (2) all udp and net endpoint setups are incomplete
 
    let mut net_connect_retry_later: HashSet<usize> = Default::default();
 
    // Initially no net connections have failed, and all udp and net endpoint setups are incomplete
 
    let mut net_connect_to_retry: HashSet<usize> = Default::default();
 
    let mut setup_incomplete: HashSet<TokenTarget> = {
 
        let net_todo_targets_iter =
 
            (0..net_todos.len()).map(|index| TokenTarget::NetEndpoint { index });
 
@@ -333,47 +378,49 @@ fn new_endpoint_manager(
 
    };
 
    // progress by reacting to poll events. continue until every endpoint is set up
 
    while !setup_incomplete.is_empty() {
 
        // recompute the time left to poll for progress
 
        let remaining = if let Some(deadline) = deadline {
 
            Some(deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?)
 
            deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?.min(RETRY_PERIOD)
 
        } else {
 
            None
 
            RETRY_PERIOD
 
        };
 
        poll.poll(&mut events, remaining).map_err(|_| Ce::PollFailed)?;
 
        // block until one of:
 
        // (a) `events` has been populated with 1+ elements,
 
        // (b) the timeout elapses, or
 
        // (c) RETRY_PERIOD elapses
 
        poll.poll(&mut events, Some(remaining)).map_err(|_| Ce::PollFailed)?;
 
        if last_retry_at.elapsed() > RETRY_PERIOD {
 
            // Retry all net connections and reset `last_retry_at`
 
            last_retry_at = Instant::now();
 
            for net_index in net_connect_to_retry.drain() {
 
                // Restart connect procedure for this net endpoint
 
                let net_todo = &mut net_todos[net_index];
 
                log!(
 
                    logger,
 
                    "Restarting connection with endpoint {:?} {:?}",
 
                    net_index,
 
                    net_todo.endpoint_setup.sock_addr
 
                );
 
                match &mut net_todo.todo_endpoint {
 
                    NetTodoEndpoint::PeerInfoRecving(endpoint) => {
 
                        let mut new_stream = TcpStream::connect(net_todo.endpoint_setup.sock_addr)
 
                            .expect("mio::TcpStream connect should not fail!");
 
                        std::mem::swap(&mut endpoint.stream, &mut new_stream);
 
                        let token = TokenTarget::NetEndpoint { index: net_index }.into();
 
                        poll.registry().register(&mut endpoint.stream, token, BOTH).unwrap();
 
                    }
 
                    _ => unreachable!(),
 
                }
 
            }
 
        }
 
        for event in events.iter() {
 
            let token = event.token();
 
            // figure out which endpoint the event belonged to
 
            let token_target = TokenTarget::from(token);
 
            match token_target {
 
                TokenTarget::Waker => {
 
                    log!(
 
                        logger,
 
                        "Notification from waker. connect_failed is {:?}",
 
                        net_connect_retry_later.iter()
 
                    );
 
                    assert!(waker_state.is_some());
 
                    for net_index in net_connect_retry_later.drain() {
 
                        let net_todo = &mut net_todos[net_index];
 
                        log!(
 
                            logger,
 
                            "Restarting connection with endpoint {:?} {:?}",
 
                            net_index,
 
                            net_todo.endpoint_setup.sock_addr
 
                        );
 
                        match &mut net_todo.todo_endpoint {
 
                            TodoEndpoint::NetEndpoint(endpoint) => {
 
                                let mut new_stream =
 
                                    TcpStream::connect(net_todo.endpoint_setup.sock_addr)
 
                                        .expect("mio::TcpStream connect should not fail!");
 
                                std::mem::swap(&mut endpoint.stream, &mut new_stream);
 
                                let token = TokenTarget::NetEndpoint { index: net_index }.into();
 
                                poll.registry()
 
                                    .register(&mut endpoint.stream, token, BOTH)
 
                                    .unwrap();
 
                            }
 
                            _ => unreachable!(),
 
                        }
 
                    }
 
                }
 
                TokenTarget::UdpEndpoint { index } => {
 
                    // UdpEndpoints are easy to complete.
 
                    // Their setup event just has to succeed without error
 
                    if !setup_incomplete.contains(&token_target) {
 
                        // spurious wakeup. this endpoint has already been set up!
 
                        continue;
 
@@ -385,9 +432,12 @@ fn new_endpoint_manager(
 
                    setup_incomplete.remove(&token_target);
 
                }
 
                TokenTarget::NetEndpoint { index } => {
 
                    // NetEndpoints are complex to complete,
 
                    // they must accept/connect to their peer,
 
                    // and then exchange port info successfully
 
                    let net_todo = &mut net_todos[index];
 
                    if let TodoEndpoint::Accepting(listener) = &mut net_todo.todo_endpoint {
 
                        // FIRST try complete this connection
 
                    if let NetTodoEndpoint::Accepting(listener) = &mut net_todo.todo_endpoint {
 
                        // Passive endpoint that will first try to accept the peer's connection
 
                        match listener.accept() {
 
                            Err(e) if err_would_block(&e) => continue, // spurious wakeup
 
                            Err(_) => {
 
@@ -406,51 +456,32 @@ fn new_endpoint_manager(
 
                                    peer_addr
 
                                );
 
                                let net_endpoint = NetEndpoint { stream, inbox: vec![] };
 
                                net_todo.todo_endpoint = TodoEndpoint::NetEndpoint(net_endpoint);
 
                                net_todo.todo_endpoint =
 
                                    NetTodoEndpoint::PeerInfoRecving(net_endpoint);
 
                            }
 
                        }
 
                    }
 
                    if let TodoEndpoint::NetEndpoint(net_endpoint) = &mut net_todo.todo_endpoint {
 
                    // OK now let's try and finish exchanging port info
 
                    if let NetTodoEndpoint::PeerInfoRecving(net_endpoint) =
 
                        &mut net_todo.todo_endpoint
 
                    {
 
                        if event.is_error() {
 
                            // event signals some error! :(
 
                            if net_todo.endpoint_setup.endpoint_polarity
 
                                == EndpointPolarity::Passive
 
                            {
 
                                // right now you cannot retry an acceptor. return failure
 
                                // bailing out, as the acceptor is currently unrecoverable
 
                                return Err(Ce::AcceptFailed(
 
                                    net_endpoint.stream.local_addr().unwrap(),
 
                                ));
 
                            }
 
                            // this actively-connecting endpoint failed to connect!
 
                            if net_connect_retry_later.insert(index) {
 
                                log!(
 
                                    logger,
 
                                    "Connection failed for {:?}. List is {:?}",
 
                                    index,
 
                                    net_connect_retry_later.iter()
 
                                );
 
                                poll.registry().deregister(&mut net_endpoint.stream).unwrap();
 
                            } else {
 
                                // spurious wakeup. already scheduled to retry connect later
 
                                continue;
 
                            }
 
                            if waker_state.is_none() {
 
                                log!(logger, "First connect failure. Starting waker thread");
 
                                let arc = Arc::new(WakerState {
 
                                    waker: mio::Waker::new(
 
                                        poll.registry(),
 
                                        TokenTarget::Waker.into(),
 
                                    )
 
                                    .unwrap(),
 
                                    continue_signal: true.into(),
 
                                });
 
                                let moved_arc = arc.clone();
 
                                waker_state = Some(arc);
 
                                std::thread::spawn(move || moved_arc.waker_loop());
 
                            }
 
                            // We will schedule it for a retry
 
                            net_connect_to_retry.insert(index);
 
                            continue;
 
                        }
 
                        // event wasn't ERROR
 
                        if net_connect_retry_later.contains(&index) {
 
                        if net_connect_to_retry.contains(&index) {
 
                            // spurious wakeup. already scheduled to retry connect later
 
                            continue;
 
                        }
 
@@ -462,8 +493,9 @@ fn new_endpoint_manager(
 
                            continue;
 
                        }
 
                        let local_info = port_info
 
                            .get_mut(&net_todo.endpoint_setup.getter_for_incoming)
 
                            .unwrap();
 
                            .map
 
                            .get(&net_todo.endpoint_setup.getter_for_incoming)
 
                            .expect("Net Setup's getter port info isn't known"); // unreachable
 
                        if event.is_writable() && !net_todo.sent_local_port {
 
                            // can write and didn't send setup msg yet? Do so!
 
                            let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
 
@@ -484,7 +516,7 @@ fn new_endpoint_manager(
 
                            net_todo.sent_local_port = true;
 
                        }
 
                        if event.is_readable() && net_todo.recv_peer_port.is_none() {
 
                            // can read and didn't recv setup msg yet? Do so!
 
                            // can read and didn't finish recving setup msg yet? Do so!
 
                            let maybe_msg = net_endpoint.try_recv(logger).map_err(|e| {
 
                                Ce::NetEndpointSetupError(
 
                                    net_endpoint.stream.local_addr().unwrap(),
 
@@ -509,15 +541,21 @@ fn new_endpoint_manager(
 
                                        ));
 
                                    }
 
                                    net_todo.recv_peer_port = Some(peer_info.port);
 
                                    // 1. finally learned the peer of this port!
 
                                    local_info.peer = Some(peer_info.port);
 
                                    // 2. learned the info of this peer port
 
                                    port_info.entry(peer_info.port).or_insert(PortInfo {
 
                                        peer: Some(net_todo.endpoint_setup.getter_for_incoming),
 
                                        polarity: peer_info.polarity,
 
                                        owner: peer_info.owner,
 
                                        route: Route::NetEndpoint { index },
 
                                    });
 
                                    // finally learned the peer of this port!
 
                                    extra_port_info.peers.insert(
 
                                        net_todo.endpoint_setup.getter_for_incoming,
 
                                        peer_info.port,
 
                                    );
 
                                    // learned the info of this peer port
 
                                    if !port_info.map.contains_key(&peer_info.port) {
 
                                        let info = PortInfo {
 
                                            peer: Some(net_todo.endpoint_setup.getter_for_incoming),
 
                                            polarity: peer_info.polarity,
 
                                            owner: peer_info.owner,
 
                                            route: Route::NetEndpoint { index },
 
                                        };
 
                                        extra_port_info.info.insert(peer_info.port, info);
 
                                    }
 
                                }
 
                                Some(inappropriate_msg) => {
 
                                    log!(
 
@@ -542,15 +580,12 @@ fn new_endpoint_manager(
 
        events.clear();
 
    }
 
    log!(logger, "Endpoint setup complete! Cleaning up and building structures");
 
    if let Some(ws) = waker_state.take() {
 
        ws.waker_stop();
 
    }
 
    let net_endpoint_exts = net_todos
 
        .into_iter()
 
        .enumerate()
 
        .map(|(index, Todo { todo_endpoint, endpoint_setup, .. })| NetEndpointExt {
 
        .map(|(index, NetTodo { todo_endpoint, endpoint_setup, .. })| NetEndpointExt {
 
            net_endpoint: match todo_endpoint {
 
                TodoEndpoint::NetEndpoint(mut net_endpoint) => {
 
                NetTodoEndpoint::PeerInfoRecving(mut net_endpoint) => {
 
                    let token = TokenTarget::NetEndpoint { index }.into();
 
                    poll.registry()
 
                        .reregister(&mut net_endpoint.stream, token, Interest::READABLE)
 
@@ -577,7 +612,7 @@ fn new_endpoint_manager(
 
            }
 
        })
 
        .collect();
 
    Ok(EndpointManager {
 
    let endpoint_manager = EndpointManager {
 
        poll,
 
        events,
 
        undelayed_messages: delayed_messages, // no longer delayed
 
@@ -591,22 +626,33 @@ fn new_endpoint_manager(
 
            polled_undrained: udp_polled_undrained,
 
        },
 
        udp_in_buffer: Default::default(),
 
    })
 
    };
 
    Ok((endpoint_manager, extra_port_info))
 
}
 

	
 
// Given a fully-formed endpoint manager,
 
// construct the consensus tree with:
 
// 1. decentralized leader election
 
// 2. centralized tree construction
 
fn init_neighborhood(
 
    connector_id: ConnectorId,
 
    logger: &mut dyn Logger,
 
    em: &mut EndpointManager,
 
    deadline: &Option<Instant>,
 
) -> Result<Neighborhood, ConnectError> {
 
    ////////////////////////////////
 
    use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
 

	
 
    // storage structure for the state of a distributed wave
 
    // (for readability)
 
    #[derive(Debug)]
 
    struct WaveState {
 
        parent: Option<usize>,
 
        leader: ConnectorId,
 
    }
 

	
 
    // kick off a leader-election wave rooted at myself
 
    // given the desired wave information
 
    // (e.g. don't inform my parent if they exist)
 
    fn do_wave(
 
        em: &mut EndpointManager,
 
        awaiting: &mut HashSet<usize>,
 
@@ -661,6 +707,8 @@ fn init_neighborhood(
 
            log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
            match msg {
 
                S(Sm::LeaderAnnounce { tree_leader }) => {
 
                    // A neighbor explicitly tells me who is the leader
 
                    // they become my parent, and I adopt their announced leader
 
                    let election_result =
 
                        WaveState { leader: tree_leader, parent: Some(recv_index) };
 
                    log!(logger, "Election lost! Result {:?}", &election_result);
 
@@ -734,6 +782,8 @@ fn init_neighborhood(
 
    };
 

	
 
    // starting algorithm 2. Send a message to every neighbor
 
    // namely, send "YouAreMyParent" to parent (if they exist),
 
    // and LeaderAnnounce to everyone else
 
    log!(logger, "Starting tree construction. Step 1: send one msg per neighbor");
 
    awaiting.clear();
 
    for index in em.index_iter() {
 
@@ -747,6 +797,8 @@ fn init_neighborhood(
 
            )?;
 
        }
 
    }
 
    // Receive one message from each neighbor to learn
 
    // whether they consider me their parent or not.
 
    let mut children = vec![];
 
    em.undelay_all();
 
    while !awaiting.is_empty() {
 
@@ -755,7 +807,7 @@ fn init_neighborhood(
 
        log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
        match msg {
 
            S(Sm::LeaderAnnounce { .. }) => {
 
                // not a child
 
                // `recv_index` is not my child
 
                log!(
 
                    logger,
 
                    "Got reply from non-child index {:?}. Children: {:?}",
 
@@ -776,6 +828,7 @@ fn init_neighborhood(
 
                    );
 
                    return Err(Ce::SetupAlgMisbehavior);
 
                }
 
                // `recv_index` is my child
 
                children.push(recv_index);
 
            }
 
            msg @ S(Sm::MyPortInfo(_)) | msg @ S(Sm::LeaderWave { .. }) => {
 
@@ -789,6 +842,7 @@ fn init_neighborhood(
 
            }
 
        }
 
    }
 
    // Neighborhood complete!
 
    children.shrink_to_fit();
 
    let neighborhood =
 
        Neighborhood { parent: election_result.parent, children: VecSet::new(children) };
 
@@ -796,14 +850,17 @@ fn init_neighborhood(
 
    Ok(neighborhood)
 
}
 

	
 
// Connectors collect a map of type ConnectorId=>SessionInfo,
 
// representing a global view of the session's state at the leader.
 
// The leader rewrites its contents however it likes (currently: nothing happens)
 
// and the map is broadcast again, so each peer can make local changes that
 
// reflect the results of the rewrite.
 
fn session_optimize(
 
    cu: &mut ConnectorUnphased,
 
    comm: &mut ConnectorCommunication,
 
    deadline: &Option<Instant>,
 
) -> Result<(), ConnectError> {
 
    ////////////////////////////////////////
 
    use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
 
    ////////////////////////////////////////
 
    log!(cu.logger, "Beginning session optimization");
 
    // populate session_info_map from a message per child
 
    let mut unoptimized_map: HashMap<ConnectorId, SessionInfo> = Default::default();
 
@@ -857,6 +914,7 @@ fn session_optimize(
 
        "Gathered all children's maps. ConnectorId set is... {:?}",
 
        unoptimized_map.keys()
 
    );
 
    // add my own session info to the map
 
    let my_session_info = SessionInfo {
 
        port_info: cu.ips.port_info.clone(),
 
        proto_components: cu.proto_components.clone(),
 
@@ -871,7 +929,6 @@ fn session_optimize(
 
    };
 
    unoptimized_map.insert(cu.ips.id_manager.connector_id, my_session_info);
 
    log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map);
 

	
 
    // acquire the optimized info...
 
    let optimized_map = if let Some(parent) = comm.neighborhood.parent {
 
        // ... as a message from my parent
 
@@ -920,25 +977,35 @@ fn session_optimize(
 
        comm.neighborhood.children.iter()
 
    );
 
    log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map);
 
    // extract my own ConnectorId's entry
 
    let optimized_info =
 
        optimized_map.get(&cu.ips.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone();
 
    // broadcast the optimized session info to my children
 
    let msg = S(Sm::SessionScatter { optimized_map });
 
    for &child in comm.neighborhood.children.iter() {
 
        comm.endpoint_manager.send_to_setup(child, &msg)?;
 
    }
 
    apply_optimizations(cu, comm, optimized_info)?;
 
    // apply local optimizations
 
    apply_my_optimizations(cu, comm, optimized_info)?;
 
    log!(cu.logger, "Session optimizations applied");
 
    Ok(())
 
}
 

	
 
// Defines the optimization function, consuming an unoptimized map
 
// and returning an optimized map.
 
fn leader_session_map_optimize(
 
    logger: &mut dyn Logger,
 
    unoptimized_map: HashMap<ConnectorId, SessionInfo>,
 
) -> Result<HashMap<ConnectorId, SessionInfo>, ConnectError> {
 
    log!(logger, "Session map optimize START");
 
    // currently, it's the identity function
 
    log!(logger, "Session map optimize END");
 
    Ok(unoptimized_map)
 
}
 
fn apply_optimizations(
 

	
 
// Modify the given connector's internals to reflect
 
// the given session info
 
fn apply_my_optimizations(
 
    cu: &mut ConnectorUnphased,
 
    comm: &mut ConnectorCommunication,
 
    session_info: SessionInfo,
 
@@ -949,7 +1016,7 @@ fn apply_optimizations(
 
        serde_proto_description,
 
        endpoint_incoming_to_getter,
 
    } = session_info;
 
    // TODO some info which should be read-only can be mutated with the current scheme
 
    // simply overwrite the contents
 
    cu.ips.port_info = port_info;
 
    cu.proto_components = proto_components;
 
    cu.proto_description = serde_proto_description.0;